Batch processing and chunking utilities for Go. Generic, concurrent, zero dependencies.

go get github.com/philiprehberger/go-batch
import "github.com/philiprehberger/go-batch"
records := []Record{...} // 1000 records
for _, chunk := range batch.Chunk(records, 100) {
db.BulkInsert(chunk) // insert 100 at a time
}
import "github.com/philiprehberger/go-batch"
err := batch.Process(ctx, userIDs, 50, func(ctx context.Context, ids []int) error {
return sendNotifications(ctx, ids)
}, batch.WithWorkers(4))
import "github.com/philiprehberger/go-batch"
errs := batch.ProcessWithErrors(records, 100, 4, func(recs []Record) error {
return db.BulkInsert(recs)
})
for _, err := range errs {
log.Printf("batch failed: %v", err)
}
import "github.com/philiprehberger/go-batch"
type Order struct {
Status string
ID int
}
orders := []Order{{Status: "pending", ID: 1}, {Status: "shipped", ID: 2}, {Status: "pending", ID: 3}}
grouped := batch.ChunkBy(orders, func(o Order) string { return o.Status })
// grouped["pending"] = [{pending 1} {pending 3}]
// grouped["shipped"] = [{shipped 2}]
import "github.com/philiprehberger/go-batch"
acc := batch.NewAccumulator(func(events []Event) {
publishEvents(events)
}, batch.FlushSize[Event](100), batch.FlushInterval[Event](5*time.Second))
for event := range incomingEvents {
acc.Add(event)
}
acc.Stop()
acc := batch.NewAccumulator(func(events []Event) {
publishEvents(events)
}, batch.FlushSize[Event](100))
acc.Add(event1)
acc.Add(event2)
stats := acc.Stats()
fmt.Printf("flushes: %d, total: %d, pending: %d\n", stats.FlushCount, stats.TotalItems, stats.Pending)
acc := batch.NewAccumulator[Event](func(events []Event) {
publishEvents(events)
}, batch.FlushSize[Event](100))
acc.Add(event1)
acc.Add(event2)
buffered := acc.Peek() // returns copy of buffered items without flushing
fmt.Println(len(buffered)) // 2
| Function | Description |
|---|---|
| Chunk[T](items, size) | Split slice into batches |
| ChunkBy[T, K](items, key) | Group items into map by key function |
| Process[T](ctx, items, size, fn, opts...) | Concurrent batch processing |
| ProcessWithErrors[T](items, size, workers, fn) | Concurrent processing, returns error slice |
| WithWorkers(n) | Set number of concurrent workers |
| NewAccumulator[T](fn, opts...) | Auto-flushing item accumulator |
| FlushSize[T](n) | Flush when N items accumulated |
| FlushInterval[T](d) | Flush on time interval |
| OnFlush[T](fn) | Callback invoked after each flush |
| Accumulator.Stats() | Returns flush count, total items, pending |
| Accumulator.Peek() | Returns copy of buffered items without flushing |
go test ./...
go vet ./...
If you find this project useful: