How would you implement parallel processing in .NET?

Quick Answer

Choose the approach by workload: `Parallel.For`/`Parallel.ForEach` and PLINQ (`AsParallel`) for CPU-bound data parallelism across cores; `Task.Run` + `Task.WhenAll` for independent tasks that should run concurrently; and `async`/`await` for I/O-bound concurrency, which does not need a thread per operation. Pay attention to partitioning, thread safety, and `MaxDegreeOfParallelism`. Don't wrap I/O-bound work in CPU-parallel constructs, since that only blocks thread-pool threads while they wait; use async concurrency instead.

Detailed Answer

.NET provides several approaches for parallel processing depending on your scenario:

1. Parallel.ForEach (CPU-bound operations):

Best for CPU-intensive operations on collections:

// Item is a placeholder element type
public void ProcessLargeDataset(List<Item> items)
{
    Parallel.ForEach(items, new ParallelOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    },
    item =>
    {
        // CPU-intensive work
        var result = PerformComplexCalculation(item);
        // Runs on several threads at once, so SaveResult must be thread-safe
        SaveResult(result);
    });
}

2. Parallel.For (indexed iterations):

public void ProcessArray(int[] numbers)
{
    Parallel.For(0, numbers.Length, i =>
    {
        // Process each element
        numbers[i] = ComplexCalculation(numbers[i]);
    });
}
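
The loop above is safe because each iteration writes only to its own array slot. When iterations must combine into one value, a shared accumulator needs synchronization. The following is a minimal sketch, not part of the original answer, that uses the thread-local overload of `Parallel.For` so each worker keeps a private subtotal and merges it once at the end. The class and method names (`ParallelSum`, `SumOfSquares`) are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSum
{
    // Sums the squares of all values using one private subtotal per worker.
    public static long SumOfSquares(int[] numbers)
    {
        long total = 0;

        Parallel.For(
            0, numbers.Length,
            // localInit: each worker starts with its own subtotal of zero
            () => 0L,
            // body: accumulate into the worker's subtotal, no locking needed
            (i, loopState, subtotal) => subtotal + (long)numbers[i] * numbers[i],
            // localFinally: merge each worker's subtotal into the shared total
            subtotal => Interlocked.Add(ref total, subtotal));

        return total;
    }
}
```

Calling `ParallelSum.SumOfSquares(new[] { 1, 2, 3 })` returns 14. The point of the design is that `Interlocked.Add` runs once per worker rather than once per element.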

3. PLINQ (Parallel LINQ):

For declarative parallel queries:

// Item and Result are placeholder types; ProcessItem is assumed to return Result
public List<Result> ProcessWithPLINQ(List<Item> items)
{
    var results = items
        .AsParallel()
        .WithDegreeOfParallelism(8)
        .Where(item => item.IsValid)
        .Select(item => ProcessItem(item))
        .ToList();
    
    return results;
}

// With ordering preserved
public List<Result> ProcessOrdered(List<Item> items)
{
    var results = items
        .AsParallel()
        .AsOrdered() // Maintain original order
        .Select(item => ProcessItem(item))
        .ToList();
    
    return results;
}

4. Task.WhenAll (I/O-bound operations):

Best for concurrent I/O operations:

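// Assumes FetchDataAsync returns Task<string>
public async Task<List<string>> ProcessConcurrentlyAsync(List<string> urls)
{
    // Start all requests; ToList forces the lazy Select to run now
    var tasks = urls.Select(url => FetchDataAsync(url)).ToList();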
    
    // Wait for all to complete
    var results = await Task.WhenAll(tasks);
    
    return results.ToList();
}
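
One detail worth knowing about the pattern above: awaiting the task returned by `Task.WhenAll` rethrows only the first exception, while the full set is available on that task's `Exception` property. The sketch below is illustrative only; `WhenAllErrors` and `RunAllAsync` are hypothetical names, and the operations are passed in as delegates so the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllErrors
{
    // Starts every operation, waits for all of them, and returns every failure.
    public static async Task<IReadOnlyList<Exception>> RunAllAsync(
        IEnumerable<Func<Task>> operations)
    {
        // ToList starts all operations before the first await
        List<Task> started = operations.Select(op => op()).ToList();
        Task all = Task.WhenAll(started);

        try
        {
            await all;
            return Array.Empty<Exception>();
        }
        catch (Exception)
        {
            // Exception is null when the tasks were canceled rather than faulted
            if (all.Exception is null) throw;

            // The aggregate holds one inner exception per faulted task
            return all.Exception.InnerExceptions;
        }
    }
}
```

With two failing operations and one successful one, the returned list contains two exceptions, even though `await` on its own would have surfaced only one.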

5. Parallel processing with cancellation:

public void ProcessWithCancellation(
    List<Item> items, // Item is a placeholder element type
    CancellationToken cancellationToken)
{
    var options = new ParallelOptions
    {
        CancellationToken = cancellationToken,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    
    try
    {
        Parallel.ForEach(items, options, item =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            ProcessItem(item);
        });
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Processing cancelled");
    }
}

6. Partitioning for better load balancing:

public void ProcessWithPartitioning(List<Item> items) // Item is a placeholder element type
{
    var partitioner = Partitioner.Create(items, loadBalance: true);
    
    Parallel.ForEach(partitioner, item =>
    {
        ProcessItem(item);
    });
}

7. Thread-safe result collection:

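// Item and Result are placeholder types
public List<Result> ProcessAndCollectResults(List<Item> items)
{
    // ConcurrentBag is unordered: results will not match the input order
    var results = new ConcurrentBag<Result>();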
    
    Parallel.ForEach(items, item =>
    {
        var result = ProcessItem(item);
        results.Add(result); // Thread-safe
    });
    
    return results.ToList();
}
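
`ConcurrentBag` does not preserve input order. If order matters and the input size is known, one alternative is to write each result into its own slot of a preallocated array, which needs no locking because no two iterations touch the same index. This is a sketch under that assumption; `OrderedMap` and `MapInOrder` are hypothetical names, and the input list must not be modified while the loop runs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderedMap
{
    // Applies transform to every item in parallel; results[i] corresponds to items[i].
    public static TResult[] MapInOrder<T, TResult>(
        IReadOnlyList<T> items,
        Func<T, TResult> transform)
    {
        var results = new TResult[items.Count];

        Parallel.For(0, items.Count, i =>
        {
            // Each iteration writes only to its own index, so no synchronization is needed
            results[i] = transform(items[i]);
        });

        return results;
    }
}
```

For example, `OrderedMap.MapInOrder(new List<int> { 1, 2, 3, 4 }, x => x * x)` yields the squares in the same order as the input.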

8. Async parallel processing with throttling:

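// Assumes FetchDataAsync returns Task<string>
public async Task<List<string>> ProcessWithThrottlingAsync(
    List<string> urls, 
    int maxConcurrency)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency);
    // Task<string> (not Task) so that Task.WhenAll returns the results
    var tasks = new List<Task<string>>();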
    
    foreach (var url in urls)
    {
        await semaphore.WaitAsync();
        
        var task = Task.Run(async () =>
        {
            try
            {
                return await FetchDataAsync(url);
            }
            finally
            {
                semaphore.Release();
            }
        });
        
        tasks.Add(task);
    }
    
    var results = await Task.WhenAll(tasks);
    return results.ToList();
}
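
On .NET 6 and later, `Parallel.ForEachAsync` provides the same kind of throttling without a hand-managed semaphore. The following is a rough sketch rather than a drop-in replacement: `ThrottledFetch`, `FetchAllAsync`, and the `fetch` delegate parameter are hypothetical, and results are keyed by URL, so it assumes the URLs are distinct.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledFetch
{
    // Runs fetch for every URL with at most maxConcurrency calls in flight.
    public static async Task<IReadOnlyDictionary<string, string>> FetchAllAsync(
        IEnumerable<string> urls,
        Func<string, CancellationToken, Task<string>> fetch,
        int maxConcurrency,
        CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, string>();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxConcurrency,
            CancellationToken = cancellationToken
        };

        await Parallel.ForEachAsync(urls, options, async (url, ct) =>
        {
            // ct is canceled if the caller cancels or another iteration fails
            results[url] = await fetch(url, ct);
        });

        return results;
    }
}
```

A caller would pass something like `(url, ct) => httpClient.GetStringAsync(url, ct)` as the `fetch` delegate, where `httpClient` is an `HttpClient` instance owned by the caller.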

9. Producer-Consumer pattern with BlockingCollection:

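// Item is a placeholder element type
public void ProcessWithProducerConsumer(IEnumerable<Item> items)
{
    // Bounded capacity makes the producer block when consumers fall behind
    var queue = new BlockingCollection<Item>(boundedCapacity: 100);
    
    // Producer task
    var producer = Task.Run(() =>
    {
        try
        {
            foreach (var item in items)
            {
                queue.Add(item);
            }
        }
        finally
        {
            // Always signal completion so consumers don't block forever
            queue.CompleteAdding();
        }
    });
    
    // Consumer tasks
    var consumers = Enumerable.Range(0, Environment.ProcessorCount)
        .Select(_ => Task.Run(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                ProcessItem(item);
            }
        }))
        .ToArray();
    
    Task.WaitAll(consumers);
    // Observe any exception thrown by the producer
    producer.Wait();
}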

10. Dataflow (TPL Dataflow) for complex pipelines:

using System.Threading.Tasks.Dataflow;

// Item, RawData, and Result are placeholder types for the three pipeline stages
public async Task ProcessPipelineAsync(List<Item> items)
{
    // I/O-bound stage: allow more concurrency than there are cores
    var downloadBlock = new TransformBlock<Item, RawData>(
        async item => await DownloadAsync(item),
        new ExecutionDataflowBlockOptions 
        { 
            MaxDegreeOfParallelism = 10 
        });
    
    // CPU-bound stage: one worker per core
    var processBlock = new TransformBlock<RawData, Result>(
        data => ProcessData(data),
        new ExecutionDataflowBlockOptions 
        { 
            MaxDegreeOfParallelism = Environment.ProcessorCount 
        });
    
    var saveBlock = new ActionBlock<Result>(
        async data => await SaveAsync(data),
        new ExecutionDataflowBlockOptions 
        { 
            MaxDegreeOfParallelism = 5 
        });
    
    // Link the pipeline
    downloadBlock.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });
    processBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
    
    // Post items
    foreach (var item in items)
    {
        await downloadBlock.SendAsync(item);
    }
    
    downloadBlock.Complete();
    await saveBlock.Completion;
}

Best practices:

  1. Choose the right approach:

    • CPU-bound: Parallel.ForEach, PLINQ
    • I/O-bound: Task.WhenAll, async/await
    • Complex pipelines: TPL Dataflow
  2. Control degree of parallelism:

    var options = new ParallelOptions 
    { 
        MaxDegreeOfParallelism = Environment.ProcessorCount 
    };
    
  3. Handle exceptions properly:

    try
    {
        Parallel.ForEach(items, item => ProcessItem(item));
    }
    catch (AggregateException ae)
    {
        foreach (var ex in ae.InnerExceptions)
        {
            _logger.LogError(ex, "Processing failed");
        }
    }
    
  4. Use thread-safe collections:

    • ConcurrentBag, ConcurrentQueue, ConcurrentDictionary
  5. Avoid over-parallelization:

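    • Running more threads than cores adds context switching and contention, which can make CPU-bound work slower than the sequential version
    • Measure with a realistic workload before and after parallelizing, and tune MaxDegreeOfParallelism from those numbers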
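
As a rough illustration of the "measure" advice, the sketch below times the same work sequentially and with `Parallel.ForEach` using `Stopwatch`. The names `ParallelBenchmark` and `Compare` are hypothetical. A single run like this is only indicative: it includes JIT and thread-pool warm-up, so repeat it several times or use a dedicated benchmarking tool before drawing conclusions.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ParallelBenchmark
{
    // Runs work over all items sequentially, then in parallel, and returns both timings.
    public static (TimeSpan Sequential, TimeSpan Parallelized) Compare<T>(
        IReadOnlyList<T> items,
        Action<T> work,
        int maxDegreeOfParallelism)
    {
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in items)
        {
            work(item);
        }
        TimeSpan sequential = stopwatch.Elapsed;

        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        stopwatch.Restart();
        // work runs on several threads here, so it must be thread-safe
        Parallel.ForEach(items, options, work);
        TimeSpan parallelized = stopwatch.Elapsed;

        return (sequential, parallelized);
    }
}
```

If the parallel timing is not clearly better for a representative input, the per-item work is probably too small to cover the scheduling overhead, and the sequential loop is the better choice.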