Bounded goroutine pool with backpressure and futures for Go
go get github.com/philiprehberger/go-worker-poolBounded goroutine pool with backpressure and futures for Go
go get github.com/philiprehberger/go-worker-pool
import "github.com/philiprehberger/go-worker-pool"
p := workerpool.New(4) // max 4 concurrent goroutines
for i := 0; i < 100; i++ {
p.Submit(func() {
// do work
})
}
p.Wait() // block until all tasks complete
f := workerpool.Go(p, func() (int, error) {
return computeExpensiveValue(), nil
})
// do other work...
val, err := f.Get() // block until result is ready
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := p.SubmitCtx(ctx, func() {
// do work
})
if err != nil {
// context was cancelled while waiting for a worker slot
}
err := p.SubmitTimeout(func() {
// do work
}, 2*time.Second)
if errors.Is(err, workerpool.ErrSubmitTimeout) {
// no worker slot available within 2 seconds
}
// With futures
f, err := workerpool.GoTimeout(p, func() (int, error) {
return computeValue(), nil
}, 2*time.Second)
if err != nil {
// timed out waiting for a worker slot
}
val, err := f.Get()
s := p.Stats()
fmt.Printf("workers=%d active=%d completed=%d\n",
s.Workers, s.Active, s.Completed)
p := workerpool.New(4)
// Scale up under load
p.Resize(8)
// Scale down — excess workers drain naturally
p.Resize(2)
// Wait for all active tasks to finish, then resume accepting work
p.Drain()
| Function / Type | Description |
|---|---|
New(concurrency int) *Pool | Create a new pool with the given concurrency limit |
(*Pool) Submit(fn func()) | Submit work; blocks if all workers are busy |
(*Pool) SubmitCtx(ctx, fn) error | Submit with context; returns ctx.Err() if cancelled while waiting |
(*Pool) SubmitTimeout(fn, d) error | Submit with timeout; returns ErrSubmitTimeout if deadline expires |
(*Pool) Wait() | Block until all submitted work completes |
(*Pool) Stop() | Mark stopped and wait; further submits panic |
(*Pool) Running() int | Approximate number of active goroutines |
(*Pool) Stats() PoolStats | Snapshot of workers, active, queued, and completed counts |
(*Pool) Resize(n int) | Dynamically adjust max concurrency |
(*Pool) Drain() | Wait for active tasks to finish, then resume accepting work |
Go[T](p, fn) *Future[T] | Submit work that returns a value; returns a Future |
GoTimeout[T](p, fn, d) (*Future[T], error) | Submit for result with timeout on submission |
(*Future[T]) Get() (T, error) | Block until result is ready |
(*Future[T]) Done() bool | Non-blocking check if complete |
ErrSubmitTimeout | Sentinel error for timed-out submissions |
PoolStats | Struct: Workers, Active, Queued, Completed |
go test ./...
go vet ./...
If you find this project useful: