Easy Parallelization and Smooth Multitasking - Limiting the Thread Pool
There are two very good, fundamental reasons to limit the size of the thread pool: time and memory. With a large collection, the system may not have enough memory to allocate one thread per item.
Creating a thread doesn't just create a Thread instance; it also allocates space for the thread's stack. Each thread gets a 1 MB stack by default, so modern systems can support dozens of processes with dozens of threads per process. However, thousands of threads are a serious strain, and tens of thousands of 1 MB stacks are not even possible with 32-bit addressing.
As far as time goes, smooth multitasking depends on most threads being blocked most of the time. All multitasking involves some overhead, and two or more compute-intensive tasks sharing a processor will run slower than the same tasks run sequentially.
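The memory limit mentioned above is simple arithmetic, and a few lines of code make it concrete. This is only a rough sketch: the StackBudget class and MaxStacks method are hypothetical names, and the calculation pretends that the entire address space is available for thread stacks, which is never true in practice.

```csharp
using System;

static class StackBudget
{
    // Upper bound on how many thread stacks fit in an address space,
    // ignoring the code, heap and system reservations that also need room.
    public static long MaxStacks(int addressBits, long stackBytes)
    {
        long addressSpace = 1L << addressBits;
        return addressSpace / stackBytes;
    }

    static void Main()
    {
        const long OneMegabyte = 1024 * 1024;
        // 2^32 bytes divided into 1 MB stacks
        Console.WriteLine(MaxStacks(32, OneMegabyte)); // prints 4096
    }
}
```

Even this generous bound is only a few thousand threads, well short of tens of thousands.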
When your threads are IO bound (or otherwise spend most of their time blocked), it's perfectly fine to rely on the thread pool's MaxThreads mechanism to manage execution. A modern processor can easily handle quite a few threads, each doing something like downloading large files; each thread does a relatively modest amount of computation in the intervals between waits, and making one or two handshakes wait a few microseconds has little impact on the total run time.
But if you are trying to use multiple processors to speed up a compute-intensive iteration, the default thread limit is much too generous. Each delegate takes less clock time if it essentially has a processor to itself, rather than having to multitask with several other delegates. You can minimize total runtime if you match the number of background threads to the number of processors (and to the amount of other background activity).
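As a rough illustration of matching the thread count to the hardware, the sketch below reads Environment.ProcessorCount and leaves room for other background activity. The WorkerCount class, the Suggest method and the idea of a fixed "reserved" count are assumptions made for this example, not something the EasyParallelization project defines.

```csharp
using System;

static class WorkerCount
{
    // Hypothetical helper: one worker per processor, minus any processors
    // set aside for other background activity, but never fewer than one.
    public static int Suggest(int processorCount, int reserved)
    {
        return Math.Max(1, processorCount - reserved);
    }

    static void Main()
    {
        int processors = Environment.ProcessorCount;
        Console.WriteLine("Processors: {0}, suggested workers: {1}",
            processors, Suggest(processors, 0));
    }
}
```

How many processors to hold back is a judgment call that depends on what else the machine is doing.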
The (System.Threading) Semaphore class is a good way to limit your thread use. Where the Mutex class and the lock statement support binary locking (an object can be locked by only one thread at a time, and all other threads must wait), the Semaphore class supports n-way locking. A Semaphore can be fully locked or fully unlocked, but it can also be partially locked.
When you create a Semaphore, you specify how many threads can lock it at a time. If you specify a limit of two, then the first two threads that lock the Semaphore by calling WaitOne obtain the lock right away, and WaitOne returns more or less immediately. But the third thread that tries to lock the Semaphore by calling WaitOne will be blocked until another thread partially unlocks the Semaphore by calling Release.
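The two-slot behavior described above can be observed on a single thread. In this minimal sketch, WaitOne is given a zero timeout so that it reports failure instead of blocking; the SemaphoreDemo class is a hypothetical name, and very old framework versions may need the WaitOne(0, false) overload instead.

```csharp
using System;
using System.Threading;

static class SemaphoreDemo
{
    // Try to lock a two-slot Semaphore three times, release one slot,
    // then try once more. A zero timeout means "don't wait".
    public static bool[] Run()
    {
        using (Semaphore limit = new Semaphore(2, 2))
        {
            bool first = limit.WaitOne(0);   // takes slot 1
            bool second = limit.WaitOne(0);  // takes slot 2
            bool third = limit.WaitOne(0);   // no slot free: a plain WaitOne() would block here
            limit.Release();                 // partially unlock
            bool fourth = limit.WaitOne(0);  // the freed slot is available again
            return new bool[] { first, second, third, fourth };
        }
    }

    static void Main()
    {
        foreach (bool acquired in Run())
            Console.WriteLine(acquired); // prints True, True, False, True
    }
}
```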
Crucially, the thread that locks the Semaphore does not have to be the thread that unlocks the Semaphore. To process a collection in parallel, one thread per processor, the original thread locks a Semaphore before doing a BeginInvoke, and each asynch delegate unlocks the Semaphore before returning. The original thread thus blocks as soon as it has the maximum number of agents running, and wakes up to spawn a new agent each time an existing agent finishes. This is queuing behavior much like you get when the ThreadPool runs out of available threads and, as above, the actual run time depends on how the requests are ordered.
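The pattern of locking in one thread and unlocking in another can be sketched without delegates at all. The example below is an assumption-laden stand-in: it uses ThreadPool.QueueUserWorkItem rather than BeginInvoke, a Thread.Sleep in place of real work, and a hypothetical ThrottleDemo class that records the largest number of work items seen running at once.

```csharp
using System;
using System.Threading;

static class ThrottleDemo
{
    // Runs itemCount dummy work items on the thread pool, at most maxWorkers
    // at a time, and returns the peak number observed running together.
    public static int Run(int itemCount, int maxWorkers)
    {
        int running = 0, peak = 0, remaining = itemCount;
        object gate = new object();
        using (ManualResetEvent allDone = new ManualResetEvent(itemCount == 0))
        using (Semaphore limit = new Semaphore(maxWorkers, maxWorkers))
        {
            for (int i = 0; i < itemCount; i++)
            {
                limit.WaitOne(); // the spawning thread takes a slot...
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    lock (gate) { running++; if (running > peak) peak = running; }
                    Thread.Sleep(20); // stand-in for real work
                    lock (gate) { running--; }
                    limit.Release(); // ...and the worker thread gives it back
                    if (Interlocked.Decrement(ref remaining) == 0) allDone.Set();
                });
            }
            // Wait for every worker, so the Semaphore is not disposed
            // while a worker may still call Release.
            allDone.WaitOne();
        }
        return peak;
    }

    static void Main()
    {
        Console.WriteLine("Peak concurrency: {0}", Run(6, 2)); // never more than 2
    }
}
```

The peak can never exceed maxWorkers, because a work item is only queued after the spawning thread has taken a slot and the slot is only returned after the item has finished.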
Actual code will help this abstract discussion make sense. I start with a simple, synchronous loop; do a naïve parallelization that relies on the MaxThreads queuing mechanism; and finally show a Semaphore-limited loop. The following snippets are all from the EasyParallelization VS.2005 project, which defines a GetString delegate — delegate string GetString(string Parameter) — that takes a string and returns a string. You can create a GetString delegate to a method like File.ReadAllText, or to the (EasyParallelization) Http.GetURL method, which uses a (System.Net) WebRequest to return a string containing the text of the web page.
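For readers without the project at hand, here is a minimal sketch of a GetString delegate bound to File.ReadAllText. The delegate declaration follows the signature given above; the GetStringDemo class, the ReadViaDelegate method and the use of a temporary file are assumptions made purely so the example is self-contained.

```csharp
using System;
using System.IO;

// Same shape as the delegate described in the text
delegate string GetString(string Parameter);

static class GetStringDemo
{
    // Writes text to a temporary file, then reads it back through a GetString delegate.
    public static string ReadViaDelegate(string text)
    {
        string path = Path.GetTempFileName();
        try
        {
            File.WriteAllText(path, text);
            // Method group conversion picks the ReadAllText(string) overload
            GetString getFn = File.ReadAllText;
            return getFn(path);
        }
        finally
        {
            File.Delete(path);
        }
    }

    static void Main()
    {
        Console.WriteLine(ReadViaDelegate("hello")); // prints hello
    }
}
```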
The SerialGetString method is basically just a simple foreach loop that synchronously invokes its GetFn parameter once for each string in a params array.
static string[] SerialGetString(GetString GetFn,
    params string[] Parameters)
{
    List<string> Results =
        new List<string>(Parameters.Length);
    foreach (string Parameter in Parameters)
        Results.Add(GetFn(Parameter));
    // The Results array is in parameter order
    // Copy List<string> to a string[]
    return Results.ToArray();
}
There's nothing very unusual here. The SerialGetString method uses the familiar idiom of creating and populating a List<>, then using the List's ToArray method to return a fixed size array.
A premature optimizer may object to using a temporary List<>, instead of allocating a single string[] Results = new string[Parameters.Length] and populating that, but the overhead of populating the variable-length List<> and then copying the list to the result array is not that high (especially since we do know the result size), and the foreach loop is a lot smaller and simpler than the for loop that a fixed array would require.
The key point about this method is that, as above, the total runtime for the SerialGetString method is the sum of all the delegate run times, plus some overhead.
The simple asynchronous version of the SerialGetString method is not all that different.
static string[] ParallelGetString(GetString GetFn,
    params string[] Parameters)
{
    List<IAsyncResult> Cookies =
        new List<IAsyncResult>(Parameters.Length);
    // Use BeginInvoke to spawn a thread per Parameter
    foreach (string Parameter in Parameters)
        Cookies.Add(GetFn.BeginInvoke(Parameter, null, null));
    List<string> Results =
        new List<string>(Parameters.Length);
    // Use EndInvoke to get each thread's result
    foreach (IAsyncResult Cookie in Cookies)
        Results.Add(GetFn.EndInvoke(Cookie));
    return Results.ToArray();
}
As you can see, instead of calling GetFn directly, the ParallelGetString method first creates an appropriately-sized List<IAsyncResult>, and then repeatedly calls GetFn.BeginInvoke, saving BeginInvoke's IAsyncResult result to the temporary List<> in much the same way as the synchronous version saves string results. When this first foreach loop is done, the ParallelGetString method has a list of IAsyncResult 'cookies', in Parameters order.
By the time the EndInvoke loop reaches a given cookie, its delegate may have already returned, may still be running, or (if there are more Parameters than thread pool threads) may still be waiting in the queue. If the delegate has already returned, calling GetFn.EndInvoke returns right away, returning a string (that is, GetFn.EndInvoke does not return an object that will have to be cast to a string). If the delegate has not finished yet, GetFn.EndInvoke blocks until it does, but it still returns a string.
The end result is a method that acts much like the SerialGetString method: It takes an array of strings and returns an array of strings, and it doesn't return until it's done a GetFn on every parameter. The big difference is that, as above, the total runtime for the ParallelGetString method may be as low as the single slowest download plus the extra overhead.
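The timing claims for the serial and parallel versions can be checked with a small model. This sketch is an idealization, not a measurement: the RunTimeModel class is hypothetical, overhead is ignored, durations are in arbitrary units, and each task is assumed to go to whichever worker frees up first, in the order given.

```csharp
using System;

static class RunTimeModel
{
    // Idealized total run time for tasks handed, in order, to whichever
    // of `workers` workers becomes free first. Overhead is ignored.
    public static int TotalTime(int workers, params int[] durations)
    {
        int[] freeAt = new int[workers]; // time at which each worker is next idle
        int finish = 0;
        foreach (int duration in durations)
        {
            int earliest = 0;
            for (int w = 1; w < workers; w++)
                if (freeAt[w] < freeAt[earliest]) earliest = w;
            freeAt[earliest] += duration;
            if (freeAt[earliest] > finish) finish = freeAt[earliest];
        }
        return finish;
    }

    static void Main()
    {
        Console.WriteLine(TotalTime(1, 4, 1, 1)); // serial: the sum, prints 6
        Console.WriteLine(TotalTime(3, 4, 1, 1)); // one worker per task: the slowest, prints 4
        Console.WriteLine(TotalTime(2, 1, 1, 4)); // two workers, slow task last, prints 5
        Console.WriteLine(TotalTime(2, 4, 1, 1)); // two workers, slow task first, prints 4
    }
}
```

The last two lines show why ordering matters once requests have to queue: the same three tasks on the same two workers finish sooner when the slowest one starts first.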
It may be a bit hard to see with lines wrapped so short, but the ParallelGetString method only takes five statements, where the SerialGetString method takes three. It creates one more temporary List<>, and it splits the Invoke loop into a BeginInvoke loop and an EndInvoke loop.
The code for the thread-limiting overload is more complicated than the above ParallelGetString method, but it really only does three new things: it creates a Semaphore; the main thread blocks in the first foreach, so that it doesn't BeginInvoke until there is a processor available; and it uses an anonymous method to create ThreadProc, a wrapper delegate that calls the GetFn delegate synchronously and then releases the Semaphore.
My first version of this method called GetFn.BeginInvoke, passing it an AsyncCallback that released the Semaphore. It turns out that EndInvoke only blocks until the asynch delegate returns; it's entirely possible for a thread that was blocked in EndInvoke to resume before the background thread executes the AsyncCallback. If the newly unblocked thread has already passed out of the using statement and closed the Semaphore, you get an exception when a subsequent AsyncCallback tries to Release the Semaphore.
static string[] ParallelGetString(GetString GetFn,
    int MaxThreads, params string[] Parameters)
{
    List<IAsyncResult> Cookies =
        new List<IAsyncResult>(Parameters.Length);
    // A Semaphore that will only allow
    // MaxThreads to run at a time
    using (Semaphore Limit =
        new Semaphore(MaxThreads, MaxThreads))
    {
        // This thread proc "captures" Limit and GetFn
        GetString ThreadProc = delegate(string Parameter)
        {
            string Result = GetFn(Parameter); // Synchronous!
            Limit.Release();
            return Result;
        };
        foreach (string Parameter in Parameters)
        {
            // block in the parallelizing thread
            Limit.WaitOne();
            // release in the worker thread
            Cookies.Add(ThreadProc.BeginInvoke(Parameter, null, null));
        }
        List<string> Results =
            new List<string>(Parameters.Length);
        foreach (IAsyncResult Cookie in Cookies)
            Results.Add(ThreadProc.EndInvoke(Cookie));
        return Results.ToArray();
    }
}
This thread-limiting ParallelGetString overload does a couple of new things before the first foreach loop. First, it creates a Semaphore, in a using block that will automatically Dispose of it. Second, it creates the GetString wrapper, ThreadProc, that it invokes asynchronously. This has the paradoxical effect that Limit.Release appears in the code before Limit.WaitOne does — but the WaitOne always executes first.
This is because the BeginInvoke loop locks the Semaphore (by calling Limit.WaitOne) before it invokes ThreadProc, the wrapper delegate. As above, so long as the Semaphore is not fully locked, the WaitOne calls return immediately, and the first thread spawns a new thread via BeginInvoke. However, once MaxThreads background threads have been spawned, calling Limit.WaitOne will block the first thread until one of the running delegates partially unlocks the Semaphore by calling Limit.Release before returning.
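Delegate BeginInvoke and EndInvoke are not supported on newer .NET runtimes, so the listing above will not run there as written. The following is one possible adaptation, not the project's code: it keeps the same Semaphore discipline but swaps in ThreadPool.QueueUserWorkItem, writes each result straight into its slot in a string[], and uses a ManualResetEvent plus a counter in place of the EndInvoke loop. The LimitedParallel class name is hypothetical, and exception handling is omitted on the assumption that GetFn does not throw.

```csharp
using System;
using System.Threading;

delegate string GetString(string Parameter);

static class LimitedParallel
{
    public static string[] ParallelGetString(GetString GetFn,
        int MaxThreads, params string[] Parameters)
    {
        string[] Results = new string[Parameters.Length];
        int Remaining = Parameters.Length;
        using (ManualResetEvent AllDone =
            new ManualResetEvent(Parameters.Length == 0))
        using (Semaphore Limit = new Semaphore(MaxThreads, MaxThreads))
        {
            for (int Index = 0; Index < Parameters.Length; Index++)
            {
                int Slot = Index; // per-iteration copy for the closure
                Limit.WaitOne();  // block in the parallelizing thread
                ThreadPool.QueueUserWorkItem(delegate(object Unused)
                {
                    // Results stay in Parameters order because each
                    // worker writes only to its own slot
                    Results[Slot] = GetFn(Parameters[Slot]);
                    Limit.Release(); // release in the worker thread
                    if (Interlocked.Decrement(ref Remaining) == 0)
                        AllDone.Set();
                });
            }
            // Every worker has released the Semaphore before this returns
            AllDone.WaitOne();
        }
        return Results;
    }

    static void Main()
    {
        GetString Upper = delegate(string s) { return s.ToUpperInvariant(); };
        string[] Results = ParallelGetString(
            Upper, Environment.ProcessorCount, "a", "b", "c");
        Console.WriteLine(string.Join(",", Results)); // prints A,B,C
    }
}
```

Because the method waits for the last worker before leaving the using blocks, no worker can call Release on a Semaphore that has already been closed.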