# philiprehberger-retry_queue

Batch processor with per-item retry, backoff, and dead letter collection.
## Installation

Add to your Gemfile:

```ruby
gem "philiprehberger-retry_queue"
```

Or install directly:

```shell
gem install philiprehberger-retry_queue
```
## Quick start

```ruby
require "philiprehberger/retry_queue"

result = Philiprehberger::RetryQueue.process(items, max_retries: 3) do |item|
  process_item(item)
end

puts result.succeeded.size # => number of successful items
puts result.failed.size    # => number of failed items
```
### Backoff

Customize the delay between retries by passing a `backoff:` callable:

```ruby
result = Philiprehberger::RetryQueue.process(items, max_retries: 5, backoff: ->(n) { n * 0.5 }) do |item|
  external_api_call(item)
end
```
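The linear backoff above yields a simple delay schedule. A sketch, assuming the lambda is called with the 1-based attempt number (an assumption about the callback contract, not confirmed by the gem's docs):

```ruby
# Delay schedule produced by the backoff lambda above (illustrative).
backoff = ->(n) { n * 0.5 }

schedule = (1..5).map { |n| backoff.call(n) }
# => [0.5, 1.0, 1.5, 2.0, 2.5]
```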
### Jitter

Reduce thundering-herd risk by randomizing the backoff delay. Pass a fraction in
`0.0..1.0`; the computed delay is multiplied by `1 + rand * jitter`. A value of
`0.0` (the default) disables jitter.

```ruby
result = Philiprehberger::RetryQueue.process(items, max_retries: 3, jitter: 0.3) do |item|
  external_api_call(item)
end
```

Values outside `0.0..1.0` and non-Numeric values raise `ArgumentError`.
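The jitter formula can be sketched in plain Ruby. This is illustrative arithmetic, not the gem's internals:

```ruby
# Sketch of the jitter math described above: the computed backoff
# delay is multiplied by 1 + rand * jitter.
backoff = ->(n) { n * 0.5 }
jitter  = 0.3

def jittered_delay(backoff, attempt, jitter)
  backoff.call(attempt) * (1 + rand * jitter)
end

base  = backoff.call(2)                     # 1.0 second base delay
delay = jittered_delay(backoff, 2, jitter)
# delay falls in base...(base * 1.3): at least 1.0, strictly below 1.3
```

With `jitter: 0.3`, concurrent workers that would otherwise all sleep exactly 1.0 s now spread out across a 30% window.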
### Selective retries

Limit which errors trigger a retry with `retry_on:`:

```ruby
result = Philiprehberger::RetryQueue.process(items, max_retries: 3, retry_on: [Net::OpenTimeout, Timeout::Error]) do |item|
  api_call(item)
end
# Only Net::OpenTimeout and Timeout::Error trigger retries;
# all other errors send the item straight to failed
```
### Retry hooks

`on_retry:` accepts one or more callables, each invoked with the item, the error, and the attempt number:

```ruby
logger_hook  = ->(item, error, attempt) { puts "Retrying #{item}: #{error.message} (attempt #{attempt})" }
metrics_hook = ->(item, _error, _attempt) { increment_counter("retry.#{item}") }

result = Philiprehberger::RetryQueue.process(items, max_retries: 3, on_retry: [logger_hook, metrics_hook]) do |item|
  process_item(item)
end
```
### Dead letter hook

```ruby
on_failure = ->(item, error) { Rails.logger.error("Dead-lettered #{item}: #{error.message}") }

result = Philiprehberger::RetryQueue.process(items, max_retries: 3, on_failure: on_failure) do |item|
  process_item(item)
end
```

The hook fires once per item that exhausts its retries, just as the item is recorded in
`Result#failed`. Exceptions raised inside the hook are swallowed so a faulty callback cannot
break the queue.
### Reprocessing failed items

```ruby
result = Philiprehberger::RetryQueue.process(jobs, max_retries: 2) do |job|
  job.execute!
end

reprocessed = result.reprocess_failed do |item, error|
  fallback_handler(item, error)
end

puts reprocessed.succeeded.size # => items recovered during reprocessing
puts reprocessed.failed.size    # => items that failed reprocessing too
```
### Stats

```ruby
result = Philiprehberger::RetryQueue.process(records, max_retries: 3) do |record|
  save(record)
end

stats = result.stats
# => { total: 100, succeeded: 97, failed: 3, success_rate: 0.97, elapsed: 1.23 }
```
### Success and failure rates

```ruby
result = Philiprehberger::RetryQueue.process(items, max_retries: 3) { |item| call(item) }

result.success_rate # => 0.92
result.failure_rate # => 0.08
```

`Result#failure_rate` is the counterpart to `#success_rate`: for any non-empty
`Result`, `success_rate + failure_rate` sums to `1.0`. Empty batches return `0.0`
for both.
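The invariant above is plain arithmetic; a sketch of how such rates could be computed (illustrative, not the gem's source):

```ruby
# Rates for a batch of 100 items, 92 succeeded and 8 failed.
succeeded = 92
failed    = 8
total     = succeeded + failed

# Guard the empty-batch case so both rates come back 0.0, as documented.
success_rate = total.zero? ? 0.0 : succeeded.fdiv(total)  # => 0.92
failure_rate = total.zero? ? 0.0 : failed.fdiv(total)     # => 0.08
success_rate + failure_rate  # ~ 1.0 (within float precision)
```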
## API

| Method | Description |
|---|---|
| `.process(items, max_retries:, concurrency:, backoff:, retry_on:, on_retry:, on_failure:, jitter:) { \|item\| }` | Process items with retry logic |
| `max_retries:` | Integer >= 0; `0` means one attempt with no retries (not zero attempts) |
| `jitter:` | Numeric in `0.0..1.0`; multiplies the backoff delay by `1 + rand * jitter`. Defaults to `0.0` |
| `on_failure:` | Callable `(item, error)` invoked once per item that exhausts retries; hook errors are swallowed |
| `Result#succeeded` | Array of successfully processed items |
| `Result#failed` | Array of hashes with `:item`, `:error`, `:attempts` |
| `Result#stats` | Hash with `:total`, `:succeeded`, `:failed`, `:success_rate`, `:elapsed` |
| `Result#success_rate` | Float in `0.0..1.0`; ratio of succeeded to total items (`0.0` for empty batches) |
| `Result#failure_rate` | Ratio of failed items to total (`0.0..1.0`); pairs with `success_rate` |
| `Result#empty?` | `true` when `stats[:total]` is 0 |
| `Result#size` | Returns `stats[:total]` (succeeded + failed count) |
| `Result#reprocess_failed { \|item, error\| }` | Reprocess failed items; returns a new `Result` |
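The `max_retries: 0` semantics in the table (one attempt, no retries) can be sketched with a minimal retry loop. This is an illustrative reimplementation, not the gem's source:

```ruby
# Sketch of the retry semantics: attempts run up to max_retries + 1 times,
# and only errors matching retry_on trigger a retry.
def attempt_with_retries(item, max_retries:, retry_on: [StandardError])
  attempts = 0
  begin
    attempts += 1
    [:succeeded, yield(item), attempts]
  rescue *retry_on => e
    retry if attempts <= max_retries
    [:failed, e, attempts]
  end
end

calls = 0
status, _error, attempts = attempt_with_retries(:job, max_retries: 0) do
  calls += 1
  raise "boom"
end
# Exactly one attempt was made: status == :failed, attempts == 1
```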
## Development

```shell
bundle install
bundle exec rspec
bundle exec rubocop
```
If you find this project useful, consider giving it a star.