
Batching

Batching groups together many jobs of the same kind and executes them at the same time. A batch "leader" briefly waits for additional jobs that match the same batch key, then invokes your worker's WorkMany to process the group efficiently.

Batching is a feature of River Pro ✨. If you haven't already, install River Pro and run the pro migration line.


Basic usage

Enable batching for a job kind in three steps:

  1. Implement BatchOpts() on your job args (alongside Kind()), returning a riverpro.BatchOpts to enable batching and configure how batches are formed.
  2. Implement ManyWorker by adding a WorkMany method that processes a slice of jobs of the same kind.
  3. In your Work method, delegate to the batching helper (riverbatch.Work) so a fetched job can prepare and execute a batch.
type MyBatchArgs struct{}

func (MyBatchArgs) Kind() string { return "my_batch" }

// Enable batching for this job kind. Customize options as needed.
func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} }

type MyWorker struct {
	river.WorkerDefaults[MyBatchArgs]
}

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
	// Invoke the batch helper so this job can gather a batch and run WorkMany.
	return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, job, w, nil)
}

func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
	// Process the entire batch at once.
	//
	// Return nil to mark the entire batch as successful, a regular error to
	// fail the entire batch with the same error, or a MultiError to return
	// individualized errors per job.
	return nil
}
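
Registering the worker and inserting jobs doesn't change with batching enabled: batch jobs are inserted like any other job and only grouped when worked. A minimal sketch, assuming an already-constructed client and context:

workers := river.NewWorkers()
river.AddWorker(workers, &MyWorker{})

// Inserts are ordinary job inserts. The batch key is computed at insert
// time so leaders can later find matching jobs efficiently.
if _, err := client.Insert(ctx, MyBatchArgs{}, nil); err != nil {
	// handle insert error
}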

How batching works

When a batchable job is fetched, it becomes the leader for its batch key, which is derived from the job kind and the configured options. The batch key is precomputed at insert time for efficient lookups.

Once a leader is fetched, it polls at the configured interval for additional jobs matching the same batch key. When the maximum batch size is reached or the maximum delay elapses, WorkMany on the leader's worker is invoked once with all of the collected jobs (see ManyWorker).
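
Conceptually, the leader's gather loop looks something like the following. This is an illustrative sketch only, not River Pro's actual implementation; the fetchMatching callback is hypothetical and stands in for fetching more available jobs that share the leader's batch key:

// Illustrative only: a simplified view of what a batch leader does after
// it's fetched. Not River Pro's actual implementation.
func gatherBatch(
	ctx context.Context,
	leader *river.Job[MyBatchArgs],
	opts riverbatch.WorkerOpts,
	// fetchMatching is hypothetical: fetch up to `limit` available jobs
	// with the same batch key as the leader.
	fetchMatching func(ctx context.Context, limit int) []*river.Job[MyBatchArgs],
) []*river.Job[MyBatchArgs] {
	batch := []*river.Job[MyBatchArgs]{leader}
	deadline := time.After(opts.MaxDelay)
	ticker := time.NewTicker(opts.PollInterval)
	defer ticker.Stop()

	for len(batch) < opts.MaxCount {
		select {
		case <-deadline:
			return batch // MaxDelay elapsed: run with whatever was collected
		case <-ticker.C:
			batch = append(batch, fetchMatching(ctx, opts.MaxCount-len(batch))...)
		}
	}
	return batch // full batch collected before the deadline
}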

Multiple leaders with the same key

River Pro's batching design is fully decentralized with no central coordinator. Multiple leaders with the same key may exist concurrently.

Operational considerations

The batch leader occupies a worker slot from the moment it is fetched until WorkMany completes. This can make jobs run longer than expected while a leader waits to collect a full batch. Account for this when choosing MaxWorkers and MaxDelay; a sizing sketch follows the list below.

  • If partitioning by arguments, choose partitions that reflect isolation boundaries (e.g., per customer) to avoid over-aggregation and long waits.
  • Tune batch size, timeout, and poll interval to balance throughput and latency. Larger batches improve efficiency but increase tail latency for early jobs in the group.
  • WorkMany can return per-job errors via a MultiError to mark individual jobs failed while succeeding others in the same batch.
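
For example, with a 10-second MaxDelay each leader can hold a worker slot for that long before its batch runs, so queues working batchable kinds need headroom. A minimal sketch of sizing MaxWorkers in the client config (the queue name and numbers are illustrative):

&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		// Leaders occupy a worker slot while gathering a batch (up to
		// MaxDelay), so leave headroom beyond steady-state load.
		"my_queue": {MaxWorkers: 100},
	},
}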

Configuration

Batches are always formed from jobs of a single kind; however, their formation and partitioning can be further configured with riverpro.BatchOpts returned from BatchOpts() on your job args type.

Once a leader job is fetched, it will begin polling for more jobs that match the same batch key at a configured interval until either a maximum batch size is reached or a maximum wait time elapses. To customize the batching behavior, you can override the default options with riverbatch.WorkerOpts when calling the riverbatch.Work helper:

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
	// Override the default batch options so that we always wait 10 seconds
	// for a full batch before executing:
	return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, job, w, &riverbatch.WorkerOpts{
		MaxCount:     1000,             // 100 by default
		MaxDelay:     10 * time.Second, // 5 seconds by default
		PollInterval: 10 * time.Second, // 1 second by default
	})
}

Common considerations when choosing options (see WorkerOpts):

  • Maximum batch size (MaxCount): upper bound on jobs collected per batch.
  • Batch timeout (MaxDelay): how long a leader waits to gather more jobs before executing.
  • Poll interval (PollInterval): how frequently the leader checks for additional matching jobs while waiting.

Batching by arguments

Batches are always formed per job kind, meaning that when any batchable job is fetched, it will attempt to fill a batch with other available jobs of the same kind.

To achieve more granular batching, you can further partition by job arguments (via BatchOpts.ByArgs) to form independent batches per argument set (e.g., per customer or tenant).

When BatchOpts.ByArgs is enabled, the batch key includes all of the job's encoded args (sorted prior to hashing). You can opt to include only a subset of args by marking fields on your JobArgs struct with the river:"batch" tag:

type MyBatchArgs struct {
	CustomerID string `json:"customer_id" river:"batch"`
	TraceID    string `json:"trace_id"`
}

func (MyBatchArgs) Kind() string { return "my_batch" }

func (MyBatchArgs) BatchOpts() riverpro.BatchOpts {
	return riverpro.BatchOpts{ByArgs: true}
}

Only customer_id will contribute to the batch key; jobs with different trace_ids but the same customer_id will be batched together.
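
As a quick sketch (assuming a client with the worker above registered), both of these inserted jobs share a batch key and are eligible to be worked in the same batch:

// Same customer_id, different trace_id: the trace_id field has no
// river:"batch" tag, so it doesn't affect the batch key.
_, err := client.InsertMany(ctx, []river.InsertManyParams{
	{Args: MyBatchArgs{CustomerID: "cust_123", TraceID: "trace_a"}},
	{Args: MyBatchArgs{CustomerID: "cust_123", TraceID: "trace_b"}},
})
if err != nil {
	// handle insert error
}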

Prefer specific args for batching

Including all arguments can lead to very high cardinality, which may prevent effective batching. Prefer selecting stable identifiers (e.g., customer_id) via river:"batch" tags instead of hashing the entire args payload.

Avoiding redundant batches

Because leaders are decentralized, more than one leader may begin forming batches for the same key at the same time. If you want to avoid redundant overlapping batches, combine batching with a global concurrency limit of 1, partitioned by kind:

&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		"my_queue": {
			Concurrency: riverpro.ConcurrencyConfig{
				GlobalLimit: 1,
				Partition:   riverpro.PartitionConfig{ByKind: true},
			},
			MaxWorkers: 100,
		},
	},
}

If you're batching by arguments as well, your concurrency limits should be partitioned by the same arguments.
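
For example, with the customer_id batching argument from earlier, the concurrency partition can be keyed on the same field. A sketch, assuming PartitionConfig's ByArgs takes the JSON keys of the args to partition by (see the concurrency limits docs for the exact options):

&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		"my_queue": {
			Concurrency: riverpro.ConcurrencyConfig{
				GlobalLimit: 1,
				// Partition by kind and by the same customer_id argument used
				// for batching, so each customer gets at most one leader.
				Partition: riverpro.PartitionConfig{
					ByArgs: []string{"customer_id"},
					ByKind: true,
				},
			},
			MaxWorkers: 100,
		},
	},
}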

See Concurrency limits for details.

Function-worker helpers

Instead of implementing a full worker type, you can use the riverbatch.WorkFunc or riverbatch.WorkFuncSafely helpers to define a batch worker with just a function for working the batch:

workers := river.NewWorkers()
river.AddWorker(workers, riverbatch.WorkFunc[MyBatchArgs, pgx.Tx](
	func(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
		return nil // success
	},
	&riverbatch.WorkerOpts{MaxCount: 4, MaxDelay: time.Millisecond, PollInterval: 10 * time.Millisecond},
))

These are similar to work functions for regular workers.

Compatibility

Batching is compatible with all other River and River Pro features, including concurrency limits, workflows, and sequences.