Package riverbatch provides functionality for batching multiple River jobs together for execution as a single group. Batched jobs must all have the same kind.
A fetched "leader" job briefly waits for additional jobs that share the same batch key, then invokes your worker's [ManyWorker.WorkMany] to process the group efficiently.
Enable batching for a job kind in three steps:
 1. Implement `BatchOpts()` on your job args (alongside `Kind()`), returning a riverpro.BatchOpts to enable batching and configure how batches are formed. Partitioning by arguments can be enabled with `ByArgs`.
 2. Implement ManyWorker by adding a `WorkMany` method that processes a slice of jobs of the same kind.
 3. In your `Work` method, delegate to the batching helper [Work] so a fetched job can prepare and execute a batch.
For example:
	import (
		"context"
		"time"

		"github.com/jackc/pgx/v5"
		"github.com/riverqueue/river"
		"riverqueue.com/riverpro"
		"riverqueue.com/riverpro/riverbatch"
	)

	type MyBatchArgs struct{}

	func (MyBatchArgs) Kind() string { return "my_batch" }

	// Enable batching for this job kind. Customize options as needed.
	func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} }

	type MyWorker struct {
		river.WorkerDefaults[MyBatchArgs]
	}

	func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
		// Invoke the batch helper so this job can gather a batch and run WorkMany.
		return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, w, job, nil)
	}

	func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
		// Process the entire batch at once.
		//
		// Return nil to succeed the entire batch, a regular error to fail the
		// entire batch with the same error, or a [MultiError] to return
		// individualized errors per job.
		return nil
	}
Customize batching behavior when calling Work with WorkerOpts:
	func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
		return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, w, job, &riverbatch.WorkerOpts{
			MaxCount:     1000,             // 100 by default
			MaxDelay:     10 * time.Second, // 5s by default
			PollInterval: 10 * time.Second, // 1s by default
		})
	}
Batches are always formed per job kind. For more granular batching, you can partition by job arguments via riverpro.BatchOpts.ByArgs. When `ByArgs` is enabled, the batch key includes either all encoded args or only the fields you mark with the struct tag `river:"batch"`:
	type MyBatchArgs struct {
		CustomerID string `json:"customer_id" river:"batch"`
		TraceID    string `json:"trace_id"`
	}

	func (MyBatchArgs) Kind() string { return "my_batch" }

	func (MyBatchArgs) BatchOpts() riverpro.BatchOpts {
		return riverpro.BatchOpts{ByArgs: true}
	}
Only `customer_id` contributes to the batch key; jobs with different `trace_id` values but the same `customer_id` are batched together. Prefer selecting stable identifiers via tags to avoid excessive cardinality.
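The actual key derivation is internal to River Pro, but the tagging rule can be illustrated with a self-contained sketch: JSON-encode only the fields tagged `river:"batch"`, or all encoded args when no field carries the tag. The `batchKey` helper below is hypothetical and exists only to demonstrate the rule:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

type MyBatchArgs struct {
	CustomerID string `json:"customer_id" river:"batch"`
	TraceID    string `json:"trace_id"`
}

// batchKey derives an illustrative batch key: encode only the fields tagged
// `river:"batch"`, falling back to all fields when none are tagged.
func batchKey(args any) string {
	v := reflect.ValueOf(args)
	t := v.Type()
	tagged := map[string]any{}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if strings.Contains(f.Tag.Get("river"), "batch") {
			name := strings.Split(f.Tag.Get("json"), ",")[0]
			tagged[name] = v.Field(i).Interface()
		}
	}
	var enc []byte
	if len(tagged) > 0 {
		enc, _ = json.Marshal(tagged)
	} else {
		enc, _ = json.Marshal(args)
	}
	return string(enc)
}

func main() {
	a := MyBatchArgs{CustomerID: "cust_1", TraceID: "t1"}
	b := MyBatchArgs{CustomerID: "cust_1", TraceID: "t2"}
	c := MyBatchArgs{CustomerID: "cust_2", TraceID: "t1"}
	fmt.Println(batchKey(a) == batchKey(b)) // true: trace_id is ignored
	fmt.Println(batchKey(a) == batchKey(c)) // false: customer_id differs
}
```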
Batching is fully decentralized; multiple leaders for the same key may run concurrently. If you want to ensure only one leader per key at a time, combine batching with a global concurrency limit of 1 partitioned by kind:
	&riverpro.Config{
		ProQueues: map[string]riverpro.QueueConfig{
			"my_queue": {
				Concurrency: riverpro.ConcurrencyConfig{
					GlobalLimit: 1,
					Partition:   riverpro.PartitionConfig{ByKind: true},
				},
				MaxWorkers: 100,
			},
		},
	}
If you also batch by arguments, apply matching concurrency partitions (for example, `Partition.ByArgs`).
Instead of implementing a full worker type, you can use WorkFunc or WorkFuncSafely to define a batch worker with just a function:
	workers := river.NewWorkers()
	river.AddWorker(workers, riverbatch.WorkFunc[MyBatchArgs, pgx.Tx](
		func(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
			return nil // success
		},
		&riverbatch.WorkerOpts{MaxCount: 4, MaxDelay: time.Millisecond, PollInterval: 10 * time.Millisecond},
	))
Batching works with other River and River Pro features including concurrency limits, workflows, and sequences.
See riverpro and repository examples for end-to-end usage.
func Work[T JobArgsWithBatchOpts, TTx any](ctx context.Context, worker ManyWorker[T], job *river.Job[T], opts *WorkerOpts) error
Work fetches a batch of jobs and executes them as a group with riverbatch.ManyWorker.WorkMany, instead of executing only a single job.
Client- or worker-level timeout settings apply to the entire batch and include the time spent polling for additional jobs.
Returns an error if any invalid WorkerOpts are provided.
func WorkFunc[T JobArgsWithBatchOpts, TTx any](f func(context.Context, []*river.Job[T]) error, opts *WorkerOpts) river.Worker[T]
WorkFunc wraps a function to implement the Worker interface. A job args struct implementing river.JobArgs will still be required to specify a Kind.
For example:
	river.AddWorker(workers, riverbatch.WorkFunc[WorkFuncArgs, pgx.Tx](func(ctx context.Context, jobs []*river.Job[WorkFuncArgs]) error {
		fmt.Printf("Received a batch of %d jobs\n", len(jobs))
		return nil
	}, &riverbatch.WorkerOpts{MaxCount: 100, PollInterval: 1 * time.Second, MaxDelay: 5 * time.Second}))
type ManyWorker[T JobArgsWithBatchOpts] interface {
	// WorkMany performs the jobs and returns an error if they failed. The
	// context will be configured with a timeout according to the worker
	// settings and may be cancelled for other reasons.
	//
	// If no error is returned, all the jobs are assumed to have succeeded and
	// will be marked completed.
	//
	// To mark only certain jobs as failed, cancelled, etc., utilize the
	// MultiError type to record errors on a per-job basis.
	//
	// As with any worker, it is important to respect context cancellation so
	// the client can respond to shutdown requests; there is no way to cancel
	// a running job that does not respect context cancellation, other than
	// terminating the process.
	WorkMany(ctx context.Context, jobs []*river.Job[T]) error
}
ManyWorker is the batch counterpart of a river.Worker: it defines a `WorkMany` method for working a batch of jobs instead of a `Work` method for working a single job.
type MultiError struct {/* contains filtered or unexported fields */}
func NewMultiError() *MultiError
func (e *MultiError) Add(job *rivertype.JobRow, err error)
func (e *MultiError) AddByID(jobID int64, err error)
func (e *MultiError) Error() string
func (e *MultiError) Get(job *rivertype.JobRow) error
func (e *MultiError) GetByID(jobID int64) error
func (e *MultiError) Is(target error) bool
type WorkerOpts struct {
	// MaxCount is the maximum number of jobs to include in a batch.
	// Must be greater than 0 and less than math.MaxInt32.
	//
	// Default is 100 (MaxCountDefault).
	MaxCount int

	// MaxDelay is the maximum amount of time to wait for a full batch with
	// MaxCount jobs to be available. Once the maximum delay has elapsed, a
	// final attempt will be made to fetch more jobs (up to MaxCount) before
	// executing the batch with however many jobs were fetched.
	//
	// Must be 0 or greater. Default is 5 seconds (MaxDelayDefault).
	MaxDelay time.Duration

	// PollInterval is the interval at which to poll for more jobs to add to
	// the batch until the batch is full or MaxDelay is reached.
	//
	// Must be 0 or greater. Default is 1 second (PollIntervalDefault).
	PollInterval time.Duration
}
WorkerOpts are the batching options for a ManyWorker.