Parallel iteration with configurable thread pool and ordered results
Add to your Gemfile:
gem "philiprehberger-parallel_each"
Or install directly:
gem install philiprehberger-parallel_each
require "philiprehberger/parallel_each"
# Parallel map (results preserve input order)
results = Philiprehberger::ParallelEach.map(urls, concurrency: 8) do |url|
  fetch(url)
end
Philiprehberger::ParallelEach.each(items, concurrency: 4) do |item|
  process(item)
end
even = Philiprehberger::ParallelEach.select(numbers, concurrency: 4, &:even?)
odd = Philiprehberger::ParallelEach.reject(numbers, concurrency: 4, &:even?)
Evaluate a predicate on every element in parallel and split the collection into [truthy, falsy] arrays in a single pass, preserving input order within each array:
even, odd = Philiprehberger::ParallelEach.partition(numbers, concurrency: 4, &:even?)
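To illustrate what "single pass" means here, the following is a minimal sketch using only the Ruby standard library. It is not the gem's implementation: `sketch_parallel_partition` is a hypothetical helper, and it spawns one thread per element instead of using a bounded pool, purely to keep the example short. The predicate runs exactly once per element, and the split happens afterwards in input order.

```ruby
# Hypothetical helper, not part of the gem.
# Step 1: evaluate the predicate once per element, each in its own thread.
# Step 2: walk the inputs in order and route each one by its stored flag.
def sketch_parallel_partition(items, &predicate)
  items = items.to_a
  # Thread#value joins the thread and returns the block's result.
  flags = items.map { |item| Thread.new { predicate.call(item) } }.map(&:value)

  truthy = []
  falsy = []
  items.each_with_index do |item, index|
    (flags[index] ? truthy : falsy) << item
  end
  [truthy, falsy]
end

even, odd = sketch_parallel_partition([1, 2, 3, 4, 5, 6], &:even?)
# even => [2, 4, 6]
# odd  => [1, 3, 5]
```

Calling `select` and `reject` separately would evaluate the predicate twice per element, which matters when the predicate is expensive.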
admin = Philiprehberger::ParallelEach.find(users, concurrency: 4, &:admin?)
pairs = Philiprehberger::ParallelEach.flat_map(records, concurrency: 4) do |r|
  [r.id, r.name]
end
# map_with_index passes (item, index) to the block
labeled = Philiprehberger::ParallelEach.map_with_index(items, concurrency: 4) do |item, idx|
  "#{idx}: #{item}"
end
# each_with_index for side effects with index access
Philiprehberger::ParallelEach.each_with_index(items, concurrency: 4) do |item, idx|
  puts "Processing item #{idx}: #{item}"
end
has_admin = Philiprehberger::ParallelEach.any?(users, concurrency: 4, &:admin?)
all_valid = Philiprehberger::ParallelEach.all?(users, concurrency: 4, &:valid?)
no_errors = Philiprehberger::ParallelEach.none?(records, concurrency: 4, &:invalid?)
even_count = Philiprehberger::ParallelEach.count(numbers, concurrency: 4, &:even?)
total = Philiprehberger::ParallelEach.reduce([1, 2, 3, 4], 0) { |acc, item| acc + item }
All methods accept a concurrency: keyword argument that controls the thread pool size. It defaults to Etc.nprocessors (the number of available CPU cores).
# Use 2 threads
Philiprehberger::ParallelEach.map(items, concurrency: 2) { |i| i * 2 }
# Use all available cores (default)
Philiprehberger::ParallelEach.map(items) { |i| i * 2 }
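For readers curious how a bounded thread pool can still return results in input order, here is a rough sketch built only on the standard library's `Thread`, `Queue`, and `Etc`. It is an illustration under assumptions, not the gem's actual code: `sketch_parallel_map` is a hypothetical name, and the approach shown (a shared work queue plus a pre-sized result array indexed by input position) is just one common way to do it.

```ruby
require "etc"

# Hypothetical helper, not part of the gem.
# A fixed number of workers pull [item, index] jobs from a shared queue and
# store each result at the index of its input, so output order matches input
# order regardless of which worker finishes first.
def sketch_parallel_map(items, concurrency: Etc.nprocessors, &block)
  items = items.to_a
  results = Array.new(items.size)

  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # once closed and drained, pop returns nil instead of blocking

  worker_count = [concurrency, items.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      while (job = queue.pop)
        item, index = job
        results[index] = block.call(item)
      end
    end
  end

  workers.each(&:join)
  results
end

squares = sketch_parallel_map([1, 2, 3, 4, 5], concurrency: 2) { |n| n * n }
# squares => [1, 4, 9, 16, 25]
```

Because Ruby threads in MRI share a global lock for Ruby code, a pool like this mainly helps with I/O-bound blocks (network calls, file reads), so a `concurrency:` value above the core count can be reasonable for such workloads.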
After a parallel run, inspect the most recent run's stats with last_stats:
Philiprehberger::ParallelEach.map(items, concurrency: 4) { |i| process(i) }
Philiprehberger::ParallelEach.last_stats
# => { workers: 4, completed: 10, failed: 0, elapsed_seconds: 0.123 }
last_stats returns nil until the first run completes. elapsed_seconds is nil
when no run has finished. Stats are reset and updated on each run.
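As a rough picture of where numbers like these can come from, the sketch below counts successes and failures under a mutex and times the run with a monotonic clock. Everything in it is an assumption made for illustration: `sketch_run_with_stats` is a hypothetical helper, and the gem may define `completed` and `failed` differently (here, `completed` counts only blocks that returned without raising).

```ruby
# Hypothetical helper, not part of the gem.
# Runs the block over items on `workers` threads and returns a stats hash
# shaped like the one shown above.
def sketch_run_with_stats(items, workers:, &block)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  completed = 0
  failed = 0
  lock = Mutex.new

  queue = Queue.new
  items.each { |item| queue << [item] } # wrap so nil/false items stay truthy jobs
  queue.close

  threads = Array.new(workers) do
    Thread.new do
      while (job = queue.pop)
        begin
          block.call(job.first)
          lock.synchronize { completed += 1 }
        rescue StandardError
          lock.synchronize { failed += 1 }
        end
      end
    end
  end
  threads.each(&:join)

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  { workers: workers, completed: completed, failed: failed, elapsed_seconds: elapsed }
end

stats = sketch_run_with_stats([1, 2, 3, 4], workers: 2) do |n|
  raise "boom" if n == 3
  n * 2
end
# stats[:completed] => 3
# stats[:failed]    => 1
```

A monotonic clock is used instead of `Time.now` so that system clock adjustments during the run cannot produce a negative or skewed duration.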
If any block raises an exception, the first error is re-raised after all threads finish:
begin
  Philiprehberger::ParallelEach.map(items, concurrency: 4) do |item|
    raise ArgumentError, 'invalid' if item.nil?
    transform(item)
  end
rescue ArgumentError => e
  puts e.message # => "invalid"
end
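The "re-raise after all threads finish" behavior can be pictured with the following standard-library sketch. It is not taken from the gem: `sketch_raise_first_error` is a hypothetical helper, it uses one thread per item for brevity, and "first" here means the first error recorded in time, which may or may not match how the gem chooses which error to surface.

```ruby
# Hypothetical helper, not part of the gem.
# Each thread rescues its own error and records it only if no earlier error
# has been stored. The stored error is raised after every thread has joined,
# so no work is abandoned midway.
def sketch_raise_first_error(items, &block)
  first_error = nil
  lock = Mutex.new

  threads = items.map do |item|
    Thread.new do
      begin
        block.call(item)
      rescue StandardError => e
        lock.synchronize { first_error ||= e }
      end
    end
  end

  threads.each(&:join)
  raise first_error if first_error

  items
end

processed = Queue.new
message =
  begin
    sketch_raise_first_error([1, nil, 3]) do |item|
      raise ArgumentError, "invalid" if item.nil?
      processed << item
    end
    nil
  rescue ArgumentError => e
    e.message
  end
# message        => "invalid"
# processed.size => 2 (the non-nil items still ran to completion)
```

Rescuing inside each thread also avoids Ruby's default `report_on_exception` warning being printed for threads that die with an unhandled error.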
| Method | Description |
|---|---|
| `ParallelEach.map(collection, concurrency:) { \|item\| }` | Parallel map preserving input order |
| `ParallelEach.each(collection, concurrency:) { \|item\| }` | Parallel each, returns original collection |
| `ParallelEach.select(collection, concurrency:) { \|item\| }` | Parallel filter preserving input order |
| `ParallelEach.reject(collection, concurrency:) { \|item\| }` | Parallel inverse filter preserving input order |
| `ParallelEach.partition(collection, concurrency:) { \|item\| }` | Parallel partition returning `[truthy, falsy]` with order preserved |
| `ParallelEach.flat_map(collection, concurrency:) { \|item\| }` | Parallel `flat_map`, flattens one level |
| `ParallelEach.find(collection, concurrency:) { \|item\| }` | Short-circuit find, returns first match or `nil` |
| `ParallelEach.any?(collection, concurrency:) { \|item\| }` | Short-circuit `any?` |
| `ParallelEach.all?(collection, concurrency:) { \|item\| }` | Short-circuit `all?` |
| `ParallelEach.none?(collection, concurrency:) { \|item\| }` | Complement of `any?` |
| `ParallelEach.map_with_index(collection, concurrency:) { \|item, idx\| }` | Parallel map with index |
| `ParallelEach.each_with_index(collection, concurrency:) { \|item, idx\| }` | Parallel each with index |
| `ParallelEach.count(collection, concurrency:) { \|item\| }` | Count matching elements |
| `ParallelEach.reduce(collection, initial, concurrency:) { \|acc, item\| }` | Sequential reduction |
| `ParallelEach.last_stats` | Hash of stats from the most recent run (`workers`, `completed`, `failed`, `elapsed_seconds`), or `nil` |
bundle install
bundle exec rspec
bundle exec rubocop