Parallel for 3.5

Some time ago I blogged about a simple way to process a collection of lengthy operations on separate threads. The new Parallel Extensions library currently available as CTP, hits right on the spot for CPU-intensive operations. It is less suited for IO or network intensive operations with lots of waiting, and the thread pool is invaluable. The new language features provides new possibilities for implementing such a method.

class Program  
{ 
    static void Main(string[] args) 
    { 
        IEnumerable<int> ints = new int[] { 19, 18, 17, 14, 15 }; 
        ints.ThreadPoolForEach( 
            delegate(int i) 
            { 
                Console.WriteLine("start " 
                    + Thread.CurrentThread.ManagedThreadId); 
                Thread.Sleep(i * 1000); 
                Console.WriteLine("end " 
                    + Thread.CurrentThread.ManagedThreadId); 
            }); 
    } 
} 
public static void ThreadPoolForEach<T>(this IEnumerable<T> items, Action<T> command)  
{
    Stack<WaitHandle> waitHandles = new Stack<WaitHandle>();            
    Exception lastException = null;
    int i = 0;
    foreach (T item in items) {
        ManualResetEvent waitHandle = new ManualResetEvent(false);
        waitHandles.Push(waitHandle);
        ThreadPool.QueueUserWorkItem(
            delegate(object o) {
                try {
                    command((T)o);
                }
                catch (Exception e) {
                    lastException = e;
                }
                finally {
                    waitHandle.Set();
                }
            }, item);
        if (++i == 64) { // WaitAll can wait for no more than 64 handles
            i = 0;
            Mutex.WaitAll(waitHandles.ToArray());
            waitHandles.Clear();
        }
    }
    if (i>0)
        Mutex.WaitAll(waitHandles.ToArray());
    if (lastException != null)
        throw lastException;
}
Google
m@kli.dk @klinkby RSS feed  GitHub