Batching groups together many jobs of the same kind and executes them at the same time. A batch "leader" briefly waits for additional jobs that match the same batch key, then invokes your worker's `WorkMany` to process the group efficiently.
Batching is a feature of River Pro ✨. If you haven't yet, install River Pro and run the pro migration line.
Basic usage
Enable batching for a job kind in three steps:
- Implement `BatchOpts()` on your job args (alongside `Kind()`), returning a `riverpro.BatchOpts` to enable batching and configure how batches are formed.
- Implement `ManyWorker` by adding a `WorkMany` method that processes a slice of jobs of the same kind.
- In your `Work` method, delegate to the batching helper (`riverbatch.Work`) so a fetched job can prepare and execute a batch.
```go
type MyBatchArgs struct{}

func (MyBatchArgs) Kind() string { return "my_batch" }

// Enable batching for this job kind. Customize options as needed.
func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} }

type MyWorker struct {
    river.WorkerDefaults[MyBatchArgs]
}

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
    // Invoke the batch helper so this job can gather a batch and run WorkMany.
    return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, job, w, nil)
}

func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
    // Process the entire batch at once.
    //
    // Return nil to mark the entire batch as successful, a regular error to
    // fail the entire batch with the same error, or a MultiError to return
    // individualized errors per job.
    return nil
}
```
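Registration is then the same as for any other River worker. A minimal sketch (client construction and database setup omitted):

```go
// Register the batch worker on a workers bundle as usual; nothing about
// batching changes how workers are registered.
workers := river.NewWorkers()
river.AddWorker(workers, &MyWorker{})
```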
How batching works
When a batchable job is fetched, it becomes a leader for a batch key derived from the job kind and configured options. The batch key is precomputed at insert time for efficient lookups.

Once a leader is fetched, it polls at the configured interval for more jobs matching the same batch key. When the maximum count is reached or the maximum delay has elapsed, the leader's worker's `WorkMany` is invoked once with the collected jobs (see `ManyWorker`).
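As a rough illustration (not River Pro's actual implementation; `fetchMatching` and `batchKey` below are purely hypothetical helpers), the leader's gathering behavior can be pictured like this:

```go
// Illustrative pseudocode of a leader's gather loop, not the real
// implementation. fetchMatching and batchKey are hypothetical helpers.
func gatherBatch(ctx context.Context, leader *river.Job[MyBatchArgs], opts *riverbatch.WorkerOpts) []*river.Job[MyBatchArgs] {
    batch := []*river.Job[MyBatchArgs]{leader}
    deadline := time.Now().Add(opts.MaxDelay)

    for len(batch) < opts.MaxCount && time.Now().Before(deadline) {
        select {
        case <-ctx.Done():
            return batch
        case <-time.After(opts.PollInterval):
        }
        // Gather more available jobs that share the leader's batch key.
        batch = append(batch, fetchMatching(ctx, batchKey(leader), opts.MaxCount-len(batch))...)
    }
    return batch
}
```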
Multiple leaders with the same key
River Pro's batching design is fully decentralized with no central coordinator. Multiple leaders with the same key may exist concurrently.
Operational considerations
The batch leader occupies a worker slot from the moment it is fetched until `WorkMany` completes. This can make jobs run longer than expected while a leader waits to collect a full batch. Account for this when choosing `MaxWorkers` and the max delay.
- If partitioning by arguments, choose partitions that reflect isolation boundaries (e.g., per customer) to avoid over-aggregation and long waits.
- Tune batch size, timeout, and poll interval to balance throughput and latency. Larger batches improve efficiency but increase tail latency for early jobs in the group.
- `WorkMany` can return per-job errors via a `MultiError` to mark individual jobs failed while succeeding others in the same batch.
Configuration
Batches are always formed from jobs of a single kind; however, their formation and partitioning can be further configured with the `riverpro.BatchOpts` returned from `BatchOpts()` on your job args type.
Once a leader job is fetched, it will begin polling for more jobs that match the same batch key at a configured interval until either a maximum batch size is reached or a maximum wait time elapses. To customize the batching behavior, you can override the default options with `riverbatch.WorkerOpts` when calling the `riverbatch.Work` helper:
```go
func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
    // Override the default batch options so that we always wait 10 seconds
    // for a full batch before executing:
    return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, job, w, &riverbatch.WorkerOpts{
        MaxCount:     1000,             // 100 by default
        MaxDelay:     10 * time.Second, // 5 seconds by default
        PollInterval: 10 * time.Second, // 1 second by default
    })
}
```
Batches are gathered by a leader job for a short window: the leader polls for more jobs at a configured interval until either a maximum batch size is reached or a timeout elapses.
Common considerations when choosing options (see `WorkerOpts`):
- Maximum batch size (`MaxCount`): upper bound on jobs collected per batch.
- Batch timeout (`MaxDelay`): how long a leader waits to gather more jobs before executing.
- Poll interval (`PollInterval`): how frequently the leader checks for additional matching jobs while waiting.
Batching by arguments
Batches are always formed per job kind, meaning that when any batchable job is fetched, it will attempt to fill a batch with other available jobs of the same kind.
To achieve more granular batching, you can further partition by job arguments (via `BatchOpts.ByArgs`) to form independent batches per argument set (e.g., per customer or tenant).
When `BatchOpts.ByArgs` is enabled, the batch key includes all of the job's encoded args (sorted prior to hashing). You can opt to include only a subset of args by marking fields on your `JobArgs` struct with the `river:"batch"` tag:
```go
type MyBatchArgs struct {
    CustomerID string `json:"customer_id" river:"batch"`
    TraceID    string `json:"trace_id"`
}

func (MyBatchArgs) Kind() string { return "my_batch" }

func (MyBatchArgs) BatchOpts() riverpro.BatchOpts {
    return riverpro.BatchOpts{ByArgs: true}
}
```
Only `customer_id` will contribute to the batch key; jobs with different `trace_id`s but the same `customer_id` will be batched together.
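As a hedged sketch (assuming a configured River client named `client` with this worker registered), two inserts like the following would be eligible for the same batch:

```go
// Both jobs share customer_id, so they can be collected into the same batch
// even though their trace IDs differ. "client" is a configured River client
// (an assumption of this sketch); error handling is omitted for brevity.
_, _ = client.Insert(ctx, MyBatchArgs{CustomerID: "cus_123", TraceID: "trace_a"}, nil)
_, _ = client.Insert(ctx, MyBatchArgs{CustomerID: "cus_123", TraceID: "trace_b"}, nil)
```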
Prefer specific args for batching
Including all arguments can lead to very high cardinality, which may prevent effective batching. Prefer selecting stable identifiers (e.g., `customer_id`) via `river:"batch"` tags instead of hashing the entire args payload.
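For contrast, a hypothetical args type like the one below (with `ByArgs` enabled but no `river:"batch"` tags) would hash a unique per-job field into the batch key and effectively prevent jobs from ever sharing a batch:

```go
// Anti-example (hypothetical fields): with ByArgs enabled and no river:"batch"
// tags, every encoded arg contributes to the batch key, so the unique
// RequestID gives nearly every job its own key and defeats batching.
type NoisyBatchArgs struct {
    CustomerID string `json:"customer_id"`
    RequestID  string `json:"request_id"` // unique per job
}

func (NoisyBatchArgs) Kind() string { return "noisy_batch" }

func (NoisyBatchArgs) BatchOpts() riverpro.BatchOpts {
    return riverpro.BatchOpts{ByArgs: true}
}
```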
Avoiding redundant batches
Because leaders are decentralized, more than one leader may begin forming batches for the same key at the same time. If you want to avoid redundant overlapping batches, combine batching with a global concurrency limit of 1, partitioned by kind:
```go
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "my_queue": {
            Concurrency: riverpro.ConcurrencyConfig{
                GlobalLimit: 1,
                Partition:   riverpro.PartitionConfig{ByKind: true},
            },
            MaxWorkers: 100,
        },
    },
}
```
If you're also batching by arguments, your concurrency limits should also be partitioned by the same arguments.
See Concurrency limits for details.
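As a hedged sketch of that combination (assuming `PartitionConfig` accepts the JSON keys used for batching via a `ByArgs` field; see the concurrency limits documentation for the exact shape), the queue configuration might look like:

```go
// Sketch only: partition the global limit of 1 by kind and by the same
// customer_id key used for batching, so each customer gets its own leader.
// The ByArgs field here is an assumption; confirm it against the
// concurrency limits docs.
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "my_queue": {
            Concurrency: riverpro.ConcurrencyConfig{
                GlobalLimit: 1,
                Partition: riverpro.PartitionConfig{
                    ByArgs: []string{"customer_id"},
                    ByKind: true,
                },
            },
            MaxWorkers: 100,
        },
    },
}
```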
Function-worker helpers
Instead of implementing a full worker type, you can use the `riverbatch.WorkFunc` or `riverbatch.WorkFuncSafely` helpers to define a batch worker with just a function for working the batch:
```go
workers := river.NewWorkers()

river.AddWorker(workers, riverbatch.WorkFunc[MyBatchArgs, pgx.Tx](
    func(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
        return nil // success
    },
    &riverbatch.WorkerOpts{MaxCount: 4, MaxDelay: time.Millisecond, PollInterval: 10 * time.Millisecond},
))
```
These are similar to work functions for regular workers.
Compatibility
Batching is compatible with all other River and River Pro features, including concurrency limits, workflows, and sequences.