How would you implement parallel processing in .NET?
Quick Answer
Choose the parallel approach by workload:
- CPU-bound data parallelism across cores: `Parallel.For`/`Parallel.ForEach` and PLINQ (`AsParallel`).
- Independent tasks that should run concurrently: `Task.Run` + `Task.WhenAll`.
- I/O-bound concurrency: `async`/`await`, which doesn't block a thread while an operation is waiting.

Pay attention to partitioning, thread safety, and `MaxDegreeOfParallelism`. Don't parallelize I/O-bound work with CPU-parallel constructs; use async concurrency instead.
Detailed Answer
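.NET provides several approaches to parallel processing, and the right one depends on the workload. In the snippets below, `Item` and `Result` stand in for your own types, and helpers such as `ProcessItem`, `FetchDataAsync`, and `SaveResult` are placeholders for your own logic.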
1. Parallel.ForEach (CPU-bound operations):
Best for CPU-intensive operations on collections:
```csharp
public void ProcessLargeDataset(List<Item> items)
{
    Parallel.ForEach(items, new ParallelOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    },
    item =>
    {
        // CPU-intensive work
        var result = PerformComplexCalculation(item);
        // Runs on several threads at once, so SaveResult must be thread-safe
        SaveResult(result);
    });
}
```
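Shared writes like `SaveResult` can become the bottleneck when every iteration contributes to one result. A common remedy is the `Parallel.ForEach` overload with thread-local state. Each worker accumulates into its own subtotal (`localInit`), and the subtotals are merged once per worker (`localFinally`). The sketch below assumes the per-item work is simply summing a hypothetical array of numbers:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical input: the numbers 1..1,000,000.
long[] values = Enumerable.Range(1, 1_000_000).Select(i => (long)i).ToArray();
long total = 0;

Parallel.ForEach(
    values,
    // localInit: each worker starts with its own subtotal of 0.
    () => 0L,
    // body: add to the worker's private subtotal; no shared state is touched.
    (value, loopState, subtotal) => subtotal + value,
    // localFinally: merge each worker's subtotal into the shared total atomically.
    subtotal => Interlocked.Add(ref total, subtotal));

Console.WriteLine($"Total: {total}");
```

With this approach, contention drops from once per item to once per worker thread.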
2. Parallel.For (indexed iterations):
```csharp
public void ProcessArray(int[] numbers)
{
    Parallel.For(0, numbers.Length, i =>
    {
        // Each iteration reads and writes only numbers[i], so no locking is needed
        numbers[i] = ComplexCalculation(numbers[i]);
    });
}
```
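`Parallel.For` can also pass a `ParallelLoopState` to the body, which lets an iteration end the loop early. In this sketch, a hypothetical search looks for one value in a large array. Calling `Stop()` asks the loop not to start further iterations, and the returned `ParallelLoopResult` reports that the loop did not run to completion:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical data: 0..999,999, searched for a single value.
int[] data = Enumerable.Range(0, 1_000_000).ToArray();
const int target = 654_321;
int foundIndex = -1;

ParallelLoopResult result = Parallel.For(0, data.Length, (i, state) =>
{
    if (data[i] == target)
    {
        // Record the match, then stop scheduling new iterations.
        Volatile.Write(ref foundIndex, i);
        state.Stop();
    }
});

// IsCompleted is false because Stop() ended the loop early.
Console.WriteLine($"Found at {foundIndex}, completed: {result.IsCompleted}");
```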
3. PLINQ (Parallel LINQ):
For declarative parallel queries:
```csharp
public List<Result> ProcessWithPLINQ(List<Item> items)
{
    var results = items
        .AsParallel()
        .WithDegreeOfParallelism(8) // Use at most 8 concurrent tasks
        .Where(item => item.IsValid)
        .Select(item => ProcessItem(item))
        .ToList(); // Output order is not guaranteed
    return results;
}

// With ordering preserved
public List<Result> ProcessOrdered(List<Item> items)
{
    var results = items
        .AsParallel()
        .AsOrdered() // Maintain original order
        .Select(item => ProcessItem(item))
        .ToList();
    return results;
}
```
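The ordering difference between the two methods can be checked directly. This sketch uses a hypothetical array of integers to compare three queries: an unordered query, an `AsOrdered` query, and an aggregation (`Sum`) whose result doesn't depend on order:

```csharp
using System;
using System.Linq;

int[] input = Enumerable.Range(1, 20).ToArray();

// Without AsOrdered, PLINQ may emit results in any order.
int[] unordered = input.AsParallel().Select(x => x * x).ToArray();

// With AsOrdered, results follow the source order (at some buffering cost).
int[] ordered = input.AsParallel().AsOrdered().Select(x => x * x).ToArray();

// Aggregations such as Sum give the same answer regardless of order.
long sumOfSquares = input.AsParallel().Sum(x => (long)x * x);

Console.WriteLine(string.Join(", ", ordered));
```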
4. Task.WhenAll (I/O-bound operations):
Best for concurrent I/O operations, where each task spends most of its time waiting rather than using a CPU:
```csharp
public async Task<List<string>> ProcessConcurrentlyAsync(List<string> urls)
{
    // Start all requests; none of them blocks a thread while waiting
    var tasks = urls.Select(url => FetchDataAsync(url)).ToList();
    // Wait for all to complete; results are in the same order as urls
    var results = await Task.WhenAll(tasks);
    return results.ToList();
}
```
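One useful guarantee: `Task.WhenAll` returns results in the order the tasks were passed in, not the order they finished. The sketch below uses a hypothetical `FakeFetchAsync`, which calls `Task.Delay` in place of real I/O, and deliberately makes later URLs finish first:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for FetchDataAsync: simulated latency instead of a network call.
async Task<string> FakeFetchAsync(string url, int delayMs)
{
    await Task.Delay(delayMs);
    return $"content of {url}";
}

string[] urls = { "a", "b", "c", "d" };

// Later URLs get shorter delays, so they complete first.
Task<string>[] tasks = urls
    .Select((url, i) => FakeFetchAsync(url, (urls.Length - i) * 50))
    .ToArray();

// The result array still matches the order of `tasks`.
string[] results = await Task.WhenAll(tasks);
Console.WriteLine(string.Join(", ", results));
```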
5. Parallel processing with cancellation:
```csharp
public void ProcessWithCancellation(
    List<Item> items,
    CancellationToken cancellationToken)
{
    var options = new ParallelOptions
    {
        CancellationToken = cancellationToken,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    try
    {
        Parallel.ForEach(items, options, item =>
        {
            // Parallel.ForEach stops scheduling new iterations once the token is
            // cancelled; checking it in the body (or passing it into ProcessItem)
            // lets long-running work stop sooner
            cancellationToken.ThrowIfCancellationRequested();
            ProcessItem(item);
        });
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Processing cancelled");
    }
}
```
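This sketch exercises the cancellation path end to end. It cancels the token from inside the loop, standing in for a caller that cancels from elsewhere. The million hypothetical items and the cutoff of 100 are arbitrary. `Parallel.ForEach` stops starting new iterations and then throws `OperationCanceledException`:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] items = Enumerable.Range(0, 1_000_000).ToArray();
int processed = 0;
bool cancelled = false;

using var cts = new CancellationTokenSource();
var options = new ParallelOptions
{
    CancellationToken = cts.Token,
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

try
{
    Parallel.ForEach(items, options, item =>
    {
        // Simulate an external cancel request arriving after ~100 items.
        if (Interlocked.Increment(ref processed) == 100)
        {
            cts.Cancel();
        }
    });
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"Cancelled: {cancelled}, items processed: {processed}");
```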
6. Partitioning for better load balancing:
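```csharp
using System.Collections.Concurrent;

public void ProcessWithPartitioning(List<Item> items)
{
    // loadBalance: true hands out work in chunks on demand instead of
    // splitting the list into fixed, equal ranges up front; this helps
    // when some items take much longer than others
    var partitioner = Partitioner.Create(items, loadBalance: true);
    Parallel.ForEach(partitioner, item =>
    {
        ProcessItem(item);
    });
}
```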
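When per-item work is very cheap, the cost of invoking a delegate for every element can dominate. A range partitioner, `Partitioner.Create(fromInclusive, toExclusive)`, instead gives each worker a contiguous index range, so the delegate runs once per range. The sketch assumes the work is computing square roots into a preallocated array:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

double[] results = new double[1_000_000];

// Yields Tuple<int, int> ranges [Item1, Item2) covering 0..results.Length.
var ranges = Partitioner.Create(0, results.Length);

Parallel.ForEach(ranges, range =>
{
    // A plain loop over the range: one delegate call per chunk, not per element.
    for (int i = range.Item1; i < range.Item2; i++)
    {
        results[i] = Math.Sqrt(i);
    }
});

Console.WriteLine($"Last value: {results[^1]}");
```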
7. Thread-safe result collection:
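```csharp
using System.Collections.Concurrent;

public List<Result> ProcessAndCollectResults(List<Item> items)
{
    var results = new ConcurrentBag<Result>();
    Parallel.ForEach(items, item =>
    {
        var result = ProcessItem(item);
        results.Add(result); // Thread-safe
    });
    // ConcurrentBag is unordered: results are not guaranteed to match the order of items
    return results.ToList();
}
```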
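`ConcurrentBag<T>` does not preserve input order. Sometimes results must line up with their inputs, and the source is indexable. In that case, each result can be written into a preallocated array slot by index. No two iterations touch the same slot, so no locking is needed. A sketch with a hypothetical `ProcessItem`:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical per-item work.
static string ProcessItem(int item) => $"item-{item * 2}";

int[] items = Enumerable.Range(0, 10_000).ToArray();
string[] results = new string[items.Length];

// Each iteration writes only its own slot, so results[i] always
// corresponds to items[i] without any synchronization.
Parallel.For(0, items.Length, i =>
{
    results[i] = ProcessItem(items[i]);
});

Console.WriteLine($"{results[0]} ... {results[^1]}");
```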
8. Async parallel processing with throttling:
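```csharp
public async Task<List<string>> ProcessWithThrottlingAsync(
    List<string> urls,
    int maxConcurrency)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency);
    var tasks = new List<Task<string>>();
    foreach (var url in urls)
    {
        // Wait for a free slot before starting the next request
        await semaphore.WaitAsync();
        tasks.Add(FetchAndReleaseAsync(url));
    }
    // Typed as Task<string>, so WhenAll yields the fetched results
    var results = await Task.WhenAll(tasks);
    return results.ToList();

    // No Task.Run needed: the fetch is I/O-bound and doesn't occupy a thread while waiting
    async Task<string> FetchAndReleaseAsync(string url)
    {
        try
        {
            return await FetchDataAsync(url);
        }
        finally
        {
            // Free the slot whether the fetch succeeded or failed
            semaphore.Release();
        }
    }
}
```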
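On .NET 6 and later, `Parallel.ForEachAsync` provides throttled async iteration without a hand-rolled semaphore. Its `MaxDegreeOfParallelism` option caps how many bodies run at once. The sketch below uses a hypothetical `FakeFetchAsync`, a `Task.Delay` stand-in for real I/O, and records the highest number of overlapping calls so the cap can be checked:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int inFlight = 0;
int maxObserved = 0;
object gate = new object();

// Hypothetical stand-in for FetchDataAsync that tracks concurrent calls.
async Task<string> FakeFetchAsync(string url, CancellationToken ct)
{
    int now = Interlocked.Increment(ref inFlight);
    lock (gate)
    {
        maxObserved = Math.Max(maxObserved, now);
    }
    try
    {
        await Task.Delay(20, ct); // simulated I/O latency
        return $"content of {url}";
    }
    finally
    {
        Interlocked.Decrement(ref inFlight);
    }
}

string[] urls = Enumerable.Range(0, 30).Select(i => $"url-{i}").ToArray();
var results = new ConcurrentDictionary<string, string>();

// At most 3 bodies run at the same time.
await Parallel.ForEachAsync(
    urls,
    new ParallelOptions { MaxDegreeOfParallelism = 3 },
    async (url, ct) => results[url] = await FakeFetchAsync(url, ct));

Console.WriteLine($"Fetched {results.Count} URLs; peak concurrency {maxObserved}");
```

Compared with the semaphore version, the throttling, the waiting for completion, and exception propagation are all handled by `Parallel.ForEachAsync`.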
9. Producer-Consumer pattern with BlockingCollection:
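```csharp
using System.Collections.Concurrent;

public void ProcessWithProducerConsumer(IEnumerable<Item> items)
{
    // Bounded: Add blocks once 100 items are waiting (backpressure)
    using var queue = new BlockingCollection<Item>(boundedCapacity: 100);
    // Producer task
    var producer = Task.Run(() =>
    {
        try
        {
            foreach (var item in items)
            {
                queue.Add(item);
            }
        }
        finally
        {
            // Always signal completion so consumers don't block forever
            queue.CompleteAdding();
        }
    });
    // Consumer tasks
    var consumers = Enumerable.Range(0, Environment.ProcessorCount)
        .Select(_ => Task.Run(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                ProcessItem(item);
            }
        }))
        .ToArray();
    // Wait on the producer too, so its exceptions are observed
    Task.WaitAll(consumers.Append(producer).ToArray());
}
```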
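`BlockingCollection<T>` blocks threads while it waits. In async code, `System.Threading.Channels` offers a similar bounded producer-consumer queue whose waits are asynchronous. This minimal sketch assumes the "processing" is just summing hypothetical integers:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

long total = 0;

// Bounded channel: WriteAsync waits asynchronously when 100 items are buffered.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
});

Task producer = Task.Run(async () =>
{
    try
    {
        foreach (int item in Enumerable.Range(1, 1_000))
        {
            await channel.Writer.WriteAsync(item);
        }
    }
    finally
    {
        // Mark the channel complete so consumers' loops end.
        channel.Writer.Complete();
    }
});

Task[] consumers = Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Task.Run(async () =>
    {
        // ReadAllAsync yields items until the channel is completed and drained.
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            Interlocked.Add(ref total, item); // stand-in for ProcessItem
        }
    }))
    .ToArray();

await Task.WhenAll(consumers.Append(producer));
Console.WriteLine($"Total: {total}");
```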
10. Dataflow (TPL Dataflow) for complex pipelines:
```csharp
using System.Threading.Tasks.Dataflow;

// Item, RawData, and Result are placeholder types for each pipeline stage
public async Task ProcessPipelineAsync(List<Item> items)
{
    var downloadBlock = new TransformBlock<Item, RawData>(
        async item => await DownloadAsync(item),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10
        });
    var processBlock = new TransformBlock<RawData, Result>(
        data => ProcessData(data),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        });
    var saveBlock = new ActionBlock<Result>(
        async data => await SaveAsync(data),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5
        });
    // Link the pipeline; completion flows from each block to the next
    downloadBlock.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });
    processBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
    // Post items
    foreach (var item in items)
    {
        await downloadBlock.SendAsync(item);
    }
    // No more input; completion propagates down to saveBlock
    downloadBlock.Complete();
    await saveBlock.Completion;
}
```
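Because the stage types above are placeholders, here is a self-contained version of the same three-stage shape using integers. It has a hypothetical async "fetch" stage (a `Task.Delay` stand-in), a CPU stage that squares values, and a final stage that accumulates them. Setting `BoundedCapacity` on the first block makes `SendAsync` wait when that block is full, which gives the pipeline backpressure. Depending on the target framework, `System.Threading.Tasks.Dataflow` may need to be added as a NuGet package.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

long total = 0;
var link = new DataflowLinkOptions { PropagateCompletion = true };

// Stage 1 (stand-in for a download): async, bounded buffer for backpressure.
var fetch = new TransformBlock<int, int>(async n =>
{
    await Task.Delay(1);
    return n;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 16 });

// Stage 2 (stand-in for CPU-bound processing): squares each value.
var square = new TransformBlock<int, long>(n => (long)n * n,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

// Stage 3 (stand-in for a save): ActionBlock runs one item at a time by default;
// Interlocked keeps it correct if its parallelism is raised later.
var save = new ActionBlock<long>(v => Interlocked.Add(ref total, v));

fetch.LinkTo(square, link);
square.LinkTo(save, link);

foreach (int n in Enumerable.Range(1, 100))
{
    // Waits while the bounded first stage is full.
    await fetch.SendAsync(n);
}
fetch.Complete();
await save.Completion;

Console.WriteLine($"Sum of squares: {total}");
```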
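Best practices:
- Choose the right approach:
  - CPU-bound: `Parallel.For`, `Parallel.ForEach`, PLINQ
  - I/O-bound: `Task.WhenAll`, `async`/`await`
  - Complex pipelines: TPL Dataflow
- Control the degree of parallelism:
  ```csharp
  var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
  ```
- Handle exceptions properly. `Parallel.For` and `Parallel.ForEach` collect exceptions thrown by iterations into an `AggregateException`:
  ```csharp
  try
  {
      Parallel.ForEach(items, item => ProcessItem(item));
  }
  catch (AggregateException ae)
  {
      // _logger is assumed to be an injected ILogger
      foreach (var ex in ae.InnerExceptions)
      {
          _logger.LogError(ex, "Processing failed");
      }
  }
  ```
- Use thread-safe collections for shared results: `ConcurrentBag<T>`, `ConcurrentQueue<T>`, `ConcurrentDictionary<TKey, TValue>`.
- Avoid over-parallelization:
  - Running more CPU-bound work at once than there are cores adds context switching and cache contention without adding throughput.
  - Parallelism has per-item and coordination overhead. For small inputs or cheap per-item work, a sequential loop can be faster. Measure with a realistic workload before and after, and tune based on the results.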
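`AggregateException.Handle` makes exception handling more selective. It marks the exceptions you recognize as handled and rethrows the rest in a new `AggregateException`. In this sketch, a hypothetical rule makes every tenth item fail. Once an iteration throws, `Parallel.ForEach` stops starting new iterations, so the aggregate may contain fewer than all ten failures:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

int[] items = Enumerable.Range(0, 100).ToArray();
int failures = 0;

try
{
    Parallel.ForEach(items, item =>
    {
        // Hypothetical failure condition for illustration.
        if (item % 10 == 0)
        {
            throw new InvalidOperationException($"Item {item} failed");
        }
    });
}
catch (AggregateException ae)
{
    // Returning true marks an exception as handled; any exception for which
    // the predicate returns false is rethrown in a new AggregateException.
    ae.Handle(ex =>
    {
        if (ex is InvalidOperationException)
        {
            Console.WriteLine($"Logged: {ex.Message}");
            failures++;
            return true;
        }
        return false;
    });
}

Console.WriteLine($"Handled failures: {failures}");
```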