Philiprehberger.BatchProcessor

Process large collections in configurable batches with progress reporting, error handling, async execution, streaming IAsyncEnumerable support, checkpoint/resume, and adaptive batch sizing.
Installation
dotnet add package Philiprehberger.BatchProcessor
Usage
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 1000).ToList();
var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
await ProcessItemsAsync(batch);
});
Console.WriteLine($"Processed {result.SuccessCount} items in {result.TotalDuration.TotalSeconds:F2}s");
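Conceptually, a batch size of 100 splits 1,000 items into 10 consecutive batches, and only the last batch can be shorter than the rest. The sketch below shows that partitioning step on its own using the BCL's `Enumerable.Chunk` (.NET 6 or later). It illustrates the idea and is not the library's internal code.

```csharp
using System;
using System.Linq;

var items = Enumerable.Range(1, 1000).ToList();
const int batchSize = 100;

// Chunk yields consecutive arrays of at most batchSize elements;
// only the final chunk can be shorter.
int[][] batches = items.Chunk(batchSize).ToArray();

// The number of batches is the ceiling of items / batchSize.
int expectedBatches = (items.Count + batchSize - 1) / batchSize;

// An uneven input: 1,050 items give 10 full batches plus one of 50.
int[][] uneven = Enumerable.Range(1, 1050).Chunk(batchSize).ToArray();

Console.WriteLine($"{batches.Length} batches; uneven input: {uneven.Length} batches, last has {uneven[^1].Length} items");
```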
Progress Reporting
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 500).ToList();
var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
await SendToApiAsync(batch);
}, new BatchOptions
{
OnProgress = progress =>
{
Console.WriteLine($"Progress: {progress.Percent:F1}% ({progress.ProcessedCount}/{progress.TotalCount})");
}
});
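The progress figures described in the BatchProgress table come from simple arithmetic. This sketch computes them for 500 items in batches of 50 after three batches. The Snapshot helper is hypothetical, and the library's exact rounding may differ.

```csharp
using System;

// Hypothetical helper: derive progress figures after `completedBatches`
// batches of size `batchSize` out of `totalCount` items.
static (int Processed, int TotalBatches, double Percent) Snapshot(
    int totalCount, int batchSize, int completedBatches)
{
    int totalBatches = (totalCount + batchSize - 1) / batchSize;          // ceiling division
    int processed = Math.Min(completedBatches * batchSize, totalCount);  // last batch may be short
    double percent = totalCount == 0 ? 100.0 : processed * 100.0 / totalCount;
    return (processed, totalBatches, percent);
}

var afterThree = Snapshot(totalCount: 500, batchSize: 50, completedBatches: 3);
Console.WriteLine($"Progress: {afterThree.Percent:F1}% ({afterThree.Processed}/500), {afterThree.TotalBatches} batches total");
```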
Parallel Execution with Error Handling
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 1000).ToList();
var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
await SendToApiAsync(batch);
}, new BatchOptions
{
MaxDegreeOfParallelism = 4,
OnBatchError = BatchErrorHandling.Skip,
RetryCount = 2
});
Console.WriteLine($"Success: {result.SuccessCount}, Failures: {result.FailureCount}");
Console.WriteLine($"Errors: {result.Errors.Count}");
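Options like MaxDegreeOfParallelism, RetryCount, and BatchErrorHandling.Skip are commonly built from a semaphore that caps how many batches run at once, plus a retry loop around each batch. The following is a minimal sketch of that pattern, assuming those semantics. RunAsync and the list of failures it returns are hypothetical and do not describe the library's actual internals.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: run batches with at most `maxParallel` in flight,
// retry a failed batch up to `retryCount` more times, then skip it and
// record its last exception instead of aborting the whole run.
static async Task<List<(int BatchIndex, Exception Error)>> RunAsync<T>(
    IReadOnlyList<T[]> batches, Func<T[], Task> process, int maxParallel, int retryCount)
{
    var errors = new ConcurrentBag<(int BatchIndex, Exception Error)>();
    using var gate = new SemaphoreSlim(maxParallel);

    var tasks = batches.Select(async (batch, index) =>
    {
        await gate.WaitAsync();
        try
        {
            for (int attempt = 0; ; attempt++)
            {
                try
                {
                    await process(batch);
                    return; // batch succeeded
                }
                catch (Exception) when (attempt < retryCount)
                {
                    // Retries remain: loop and try the same batch again.
                }
                catch (Exception ex)
                {
                    errors.Add((index, ex)); // retries exhausted: skip this batch
                    return;
                }
            }
        }
        finally
        {
            gate.Release();
        }
    }).ToList();

    await Task.WhenAll(tasks);
    return errors.OrderBy(e => e.BatchIndex).ToList();
}

// Demo: 100 items in 20 batches of 5; the batch starting at 36 (index 7) always fails.
var batches = Enumerable.Range(1, 100).Chunk(5).ToList();
int attempts = 0;
var failures = await RunAsync(batches, async batch =>
{
    await Task.Yield();
    if (batch[0] == 36)
    {
        Interlocked.Increment(ref attempts);
        throw new InvalidOperationException("downstream service rejected the batch");
    }
}, maxParallel: 4, retryCount: 2);

Console.WriteLine($"Failed batches: {string.Join(", ", failures.Select(f => f.BatchIndex))}; attempts on batch 7: {attempts}");
```

With retryCount: 2, the failing batch is attempted three times (one initial try plus two retries) before it is skipped. The other 19 batches still complete.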
Per-Batch Timeout
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 1000).ToList();
var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
await SendToSlowServiceAsync(batch);
}, new BatchOptions
{
BatchTimeout = TimeSpan.FromSeconds(30),
OnBatchError = BatchErrorHandling.Skip
});
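You can sketch a per-batch timeout with `Task.WaitAsync(TimeSpan)` (.NET 6 or later), which throws `TimeoutException` if the timeout elapses before the task completes. `WaitAsync` only stops waiting and does not stop the work itself. For that reason, the sketch also cancels a token that the batch can observe. TryRunWithTimeoutAsync is a hypothetical helper, not the library's implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: run one batch, giving up after `timeout`.
// On timeout the token is cancelled so cooperative batch work can stop too.
static async Task<bool> TryRunWithTimeoutAsync(
    Func<CancellationToken, Task> batchWork, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource();
    try
    {
        await batchWork(cts.Token).WaitAsync(timeout);
        return true;
    }
    catch (TimeoutException)
    {
        cts.Cancel(); // signal the abandoned batch to stop
        return false;
    }
}

bool fast = await TryRunWithTimeoutAsync(ct => Task.Delay(10, ct), TimeSpan.FromSeconds(5));
bool slow = await TryRunWithTimeoutAsync(ct => Task.Delay(5000, ct), TimeSpan.FromMilliseconds(50));
Console.WriteLine($"fast batch finished: {fast}, slow batch finished: {slow}");
```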
Per-Item Error Tracking
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 100).ToList();
var result = await BatchProcessor.ProcessAsync(items, batchSize: 10, async batch =>
{
await ProcessBatchAsync(batch);
});
Console.WriteLine($"Succeeded: {result.SucceededCount}, Failed: {result.FailedCount}");
foreach (var failure in result.Failures)
{
Console.WriteLine($"Item {failure.Item} failed: {failure.Exception?.Message}");
}
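One simple way to derive per-item results from batch-level work is to mark every item in a batch as failed, with the batch's exception, whenever that batch throws. The sketch below assumes that mapping; it is not necessarily how the library attributes failures. ToItemResultsAsync is hypothetical, and its tuples only mirror the shape of BatchItemResult<T>.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sketch: process each batch and expand its outcome into
// (Item, Success, Error) tuples, mirroring the BatchItemResult<T> shape.
static async Task<List<(T Item, bool Success, Exception? Error)>> ToItemResultsAsync<T>(
    IEnumerable<T[]> batches, Func<T[], Task> process)
{
    var results = new List<(T Item, bool Success, Exception? Error)>();
    foreach (var batch in batches)
    {
        Exception? error = null;
        try { await process(batch); }
        catch (Exception ex) { error = ex; } // assumption: a batch failure fails all of its items
        results.AddRange(batch.Select(item => (item, error is null, error)));
    }
    return results;
}

// Demo: 10 items in batches of 3; the batch containing 5 throws.
var batches = Enumerable.Range(1, 10).Chunk(3);
var results = await ToItemResultsAsync(batches, batch =>
    batch.Contains(5)
        ? Task.FromException(new InvalidOperationException("bad batch"))
        : Task.CompletedTask);

var failures = results.Where(r => !r.Success).ToList();
Console.WriteLine($"Succeeded: {results.Count - failures.Count}, Failed: {failures.Count}");
```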
Batch Completed Callback
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 500).ToList();
var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
await ProcessBatchAsync(batch);
}, new BatchOptions
{
OnBatchCompleted = e =>
{
Console.WriteLine($"Batch {e.BatchIndex}: {e.SuccessCount} ok, {e.FailureCount} failed in {e.Elapsed.TotalMilliseconds}ms");
}
});
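A completion callback like OnBatchCompleted is typically fed by timing each batch with `System.Diagnostics.Stopwatch`. In this minimal sketch, the RunTimedAsync helper and the tuple passed to the callback are hypothetical stand-ins for the library's BatchCompletedEventArgs.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sketch: time each batch and report (index, count, elapsed)
// to a callback after the batch finishes, similar in spirit to OnBatchCompleted.
static async Task RunTimedAsync<T>(
    IEnumerable<T[]> batches, Func<T[], Task> process,
    Action<(int BatchIndex, int ItemCount, TimeSpan Elapsed)> onCompleted)
{
    int index = 0;
    foreach (var batch in batches)
    {
        var sw = Stopwatch.StartNew();
        await process(batch);
        sw.Stop();
        onCompleted((index++, batch.Length, sw.Elapsed)); // zero-based index, like BatchIndex
    }
}

var reports = new List<(int BatchIndex, int ItemCount, TimeSpan Elapsed)>();
await RunTimedAsync(Enumerable.Range(1, 120).Chunk(50), _ => Task.Delay(20), reports.Add);

foreach (var r in reports)
    Console.WriteLine($"Batch {r.BatchIndex}: {r.ItemCount} items in {r.Elapsed.TotalMilliseconds:F0}ms");
```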
Cancellation Token
using Philiprehberger.BatchProcessor;
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
var items = Enumerable.Range(1, 10000).ToList();
var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
await ProcessBatchAsync(batch);
}, cancellationToken: cts.Token);
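Cancellation in a batch loop is usually cooperative. The loop checks the token before each batch starts and also passes it into the batch work. This sketch shows the check-between-batches part and follows the common .NET convention of throwing `OperationCanceledException`. RunUntilCancelledAsync is a hypothetical helper, and the library's own cancellation behavior may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: stop between batches once cancellation is requested.
static async Task<int> RunUntilCancelledAsync<T>(
    IEnumerable<T[]> batches, Func<T[], CancellationToken, Task> process, CancellationToken token)
{
    int completed = 0;
    foreach (var batch in batches)
    {
        token.ThrowIfCancellationRequested(); // checked before each batch starts
        await process(batch, token);          // the batch work can observe the token too
        completed++;
    }
    return completed;
}

using var cts = new CancellationTokenSource();
int processedBatches = 0;
bool cancelled = false;
try
{
    await RunUntilCancelledAsync(Enumerable.Range(1, 1000).Chunk(100), (batch, ct) =>
    {
        if (++processedBatches == 3) cts.Cancel(); // simulate a cancel request mid-run
        return Task.CompletedTask;
    }, cts.Token);
}
catch (OperationCanceledException)
{
    cancelled = true;
}
Console.WriteLine($"Cancelled: {cancelled} after {processedBatches} of 10 batches");
```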
Streaming with IAsyncEnumerable
Process items from an async stream without materializing the full collection:
using Philiprehberger.BatchProcessor;
async IAsyncEnumerable<Order> GetOrdersAsync()
{
await foreach (var order in database.StreamOrdersAsync())
{
yield return order;
}
}
var result = await BatchProcessor.ProcessStreamAsync(GetOrdersAsync(), batchSize: 50, async batch =>
{
await SendToWarehouseAsync(batch);
}, new BatchOptions
{
MaxDegreeOfParallelism = 3,
OnBatchError = BatchErrorHandling.Skip
});
Console.WriteLine($"Streamed {result.SuccessCount} orders, {result.FailureCount} failed");
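To batch an `IAsyncEnumerable<T>` without materializing it, you buffer up to batchSize items, hand off each full buffer, and flush any remainder when the stream ends. Here is a minimal sketch of that buffering step. BatchAsync and Numbers are hypothetical helpers and not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch: group a stream into lists of at most `batchSize`
// items while holding only one batch in memory at a time.
static async IAsyncEnumerable<List<T>> BatchAsync<T>(IAsyncEnumerable<T> source, int batchSize)
{
    var buffer = new List<T>(batchSize);
    await foreach (var item in source)
    {
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer;
            buffer = new List<T>(batchSize); // fresh list so consumers can keep the previous one
        }
    }
    if (buffer.Count > 0)
        yield return buffer; // final, possibly short batch
}

// Stand-in async source, e.g. rows arriving from a database cursor.
static async IAsyncEnumerable<int> Numbers(int count)
{
    for (int i = 1; i <= count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var sizes = new List<int>();
await foreach (var batch in BatchAsync(Numbers(7), batchSize: 3))
    sizes.Add(batch.Count);

Console.WriteLine(string.Join(", ", sizes));
```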
Checkpoint and Resume
Persist progress after each batch and resume from where processing left off:
using Philiprehberger.BatchProcessor;
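var items = Enumerable.Range(1, 1000).ToList();
var lastCheckpoint = LoadCheckpointFromDisk(); // last completed batch index, e.g. 5; return -1 on a first run so processing starts at batch 0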
var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
await ProcessBatchAsync(batch);
}, new BatchOptions
{
ResumeFromBatch = lastCheckpoint + 1,
CheckpointCallback = batchIndex =>
{
SaveCheckpointToDisk(batchIndex);
}
});
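You must supply the LoadCheckpointFromDisk and SaveCheckpointToDisk calls in the example yourself. One minimal file-based sketch follows. It assumes a small text file holds the zero-based index of the last completed batch, and that -1 means nothing has completed yet, so `lastCheckpoint + 1` is 0 on a first run. These versions take an explicit path argument and are hypothetical.

```csharp
using System;
using System.IO;

// Hypothetical checkpoint helpers: the file stores the zero-based index of the
// last completed batch; -1 means no batch has completed yet.
static int LoadCheckpointFromDisk(string path) =>
    File.Exists(path) && int.TryParse(File.ReadAllText(path), out var index) ? index : -1;

static void SaveCheckpointToDisk(string path, int batchIndex)
{
    // Write to a temp file, then move it over the real one. On file systems
    // with atomic rename, an interrupted write leaves the old checkpoint intact.
    var tmp = path + ".tmp";
    File.WriteAllText(tmp, batchIndex.ToString());
    File.Move(tmp, path, overwrite: true);
}

var checkpointPath = Path.Combine(Path.GetTempPath(), $"checkpoint-{Guid.NewGuid():N}.txt");
int before = LoadCheckpointFromDisk(checkpointPath); // no file yet
SaveCheckpointToDisk(checkpointPath, 5);
int after = LoadCheckpointFromDisk(checkpointPath);
Console.WriteLine($"First run resumes from batch {before + 1}; after saving 5, resume from batch {after + 1}");
File.Delete(checkpointPath);
```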
Streaming with Per-Item Results
When you need per-item failure tracking on a streamed source, use ProcessStreamWithItemsAsync:
using Philiprehberger.BatchProcessor;
async IAsyncEnumerable<Order> GetOrdersAsync()
{
await foreach (var order in database.StreamOrdersAsync())
{
yield return order;
}
}
var result = await BatchProcessor.ProcessStreamWithItemsAsync(GetOrdersAsync(), batchSize: 50, async batch =>
{
await SendToWarehouseAsync(batch);
}, new BatchOptions
{
OnBatchError = BatchErrorHandling.Skip
});
foreach (var failure in result.Failures)
{
Console.WriteLine($"Order {failure.Item} failed: {failure.Exception?.Message}");
}
Adaptive Batch Sizing
Automatically adjust the batch size, within MinBatchSize and MaxBatchSize, based on measured throughput and aiming for TargetThroughput items per second:
using Philiprehberger.BatchProcessor;
var items = Enumerable.Range(1, 10000).ToList();
var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
await SendToApiAsync(batch);
}, new BatchOptions
{
AdaptiveBatching = new AdaptiveBatchOptions
{
MinBatchSize = 10,
MaxBatchSize = 500,
TargetThroughput = 200 // items per second
},
OnBatchCompleted = e =>
{
Console.WriteLine($"Batch {e.BatchIndex}: {e.ItemCount} items in {e.Elapsed.TotalMilliseconds}ms");
}
});
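Here is one illustration of throughput-driven sizing. It is an assumption and not necessarily the library's rule. The sketch uses an AIMD-style policy (additive increase, multiplicative decrease). While measured throughput meets the target, the next batch grows by a fixed step. When throughput falls short, the batch is halved. The result is always clamped to the configured bounds. NextBatchSize and the step of 10 are hypothetical.

```csharp
using System;

// Hypothetical AIMD-style rule: measured throughput is items per second for
// the batch that just finished. Meeting the target grows the next batch by a
// fixed step; missing it halves the batch. The result stays within [min, max].
static int NextBatchSize(int current, int itemCount, TimeSpan elapsed,
                         double targetThroughput, int min, int max, int step = 10)
{
    double seconds = Math.Max(elapsed.TotalSeconds, 1e-6); // avoid dividing by zero
    double measured = itemCount / seconds;
    int next = measured >= targetThroughput ? current + step : current / 2;
    return Math.Clamp(next, min, max);
}

// A 50-item batch took 0.2 s (250 items/s), beating a 200 items/s target.
int grown = NextBatchSize(50, 50, TimeSpan.FromSeconds(0.2), 200, min: 10, max: 500);
// A 50-item batch took 0.5 s (100 items/s), missing the target.
int shrunk = NextBatchSize(50, 50, TimeSpan.FromSeconds(0.5), 200, min: 10, max: 500);
Console.WriteLine($"grow -> {grown}, shrink -> {shrunk}");
```

Additive growth probes for a larger size slowly. Halving backs off quickly when batches start to drag. The clamp keeps both moves within MinBatchSize and MaxBatchSize.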
API
BatchProcessor
| Method | Description |
|---|---|
| `Process<T>(items, batchSize, processor, options?, cancellationToken?)` | Process items in batches asynchronously. Returns a `BatchResult`. |
| `ProcessAsync<T>(items, batchSize, processor, options?, cancellationToken?)` | Process items in batches with per-item error tracking. Returns a `BatchResult<T>`. |
| `ProcessStreamAsync<T>(source, batchSize, processor, options?, cancellationToken?)` | Process items from an `IAsyncEnumerable<T>` source in batches. Returns a `BatchResult`. |
| `ProcessStreamWithItemsAsync<T>(source, batchSize, processor, options?, cancellationToken?)` | Process items from an `IAsyncEnumerable<T>` source with per-item error tracking. Returns a `BatchResult<T>`. |
BatchOptions
| Property | Type | Default | Description |
|---|---|---|---|
| `MaxDegreeOfParallelism` | `int` | `1` | Maximum number of batches processed concurrently. |
| `OnProgress` | `Action<BatchProgress>?` | `null` | Callback invoked after each batch completes. |
| `OnBatchError` | `BatchErrorHandling` | `Abort` | Error handling strategy: `Abort` or `Skip`. |
| `RetryCount` | `int` | `0` | Number of times to retry a failed batch. |
| `BatchTimeout` | `TimeSpan?` | `null` | Timeout per batch; a `TimeoutException` is thrown if it is exceeded. |
| `OnBatchCompleted` | `Action<BatchCompletedEventArgs>?` | `null` | Callback with the batch index, item count, elapsed time, and success/failure counts. |
| `CheckpointCallback` | `Action<int>?` | `null` | Callback invoked after each batch with its zero-based batch index, for checkpoint/resume. |
| `ResumeFromBatch` | `int` | `0` | Zero-based batch index to resume from; earlier batches are skipped. |
| `AdaptiveBatching` | `AdaptiveBatchOptions?` | `null` | Adaptive batch sizing configuration. |
AdaptiveBatchOptions
| Property | Type | Default | Description |
|---|---|---|---|
| `MinBatchSize` | `int` | `1` | Minimum batch size for adaptive sizing. |
| `MaxBatchSize` | `int` | `1000` | Maximum batch size for adaptive sizing. |
| `TargetThroughput` | `double` | `100.0` | Target throughput in items per second. |
BatchProgress
| Property | Type | Description |
|---|---|---|
| `ProcessedCount` | `int` | Number of items processed so far. |
| `TotalCount` | `int` | Total number of items. |
| `CurrentBatch` | `int` | Current batch number (1-based). |
| `TotalBatches` | `int` | Total number of batches. |
| `Percent` | `double` | Completion percentage (0-100). |
BatchResult
| Property | Type | Description |
|---|---|---|
| `SuccessCount` | `int` | Number of successfully processed items. |
| `FailureCount` | `int` | Number of items in failed batches. |
| `TotalDuration` | `TimeSpan` | Total processing duration. |
| `Errors` | `IReadOnlyList<BatchError>` | List of batch errors. |
BatchResult<T>
| Property | Type | Description |
|---|---|---|
| `Items` | `IReadOnlyList<BatchItemResult<T>>` | Per-item results for every processed item. |
| `SucceededCount` | `int` | Number of items that succeeded. |
| `FailedCount` | `int` | Number of items that failed. |
| `Failures` | `IReadOnlyList<BatchItemResult<T>>` | Per-item results for failed items only. |
| `TotalDuration` | `TimeSpan` | Total processing duration. |
| `Errors` | `IReadOnlyList<BatchError>` | List of batch-level errors. |
BatchItemResult<T>
| Property | Type | Description |
|---|---|---|
| `Item` | `T` | The item that was processed. |
| `Success` | `bool` | Whether the item was processed successfully. |
| `Exception` | `Exception?` | The exception that occurred, or `null` if successful. |
BatchCompletedEventArgs
| Property | Type | Description |
|---|---|---|
| `BatchIndex` | `int` | Zero-based index of the completed batch. |
| `ItemCount` | `int` | Number of items in the batch. |
| `Elapsed` | `TimeSpan` | Time spent processing the batch. |
| `SuccessCount` | `int` | Number of items that succeeded in this batch. |
| `FailureCount` | `int` | Number of items that failed in this batch. |
BatchError
| Property | Type | Description |
|---|---|---|
| `BatchIndex` | `int` | Zero-based index of the failed batch. |
| `Exception` | `Exception` | The exception that occurred. |
Development
dotnet build src/Philiprehberger.BatchProcessor.csproj --configuration Release
Support
If you find this project useful:
⭐ Star the repo
🐛 Report issues
💡 Suggest features
❤️ Sponsor development
🌐 All Open Source Projects
💻 GitHub Profile
🔗 LinkedIn Profile
License
MIT