Package riverbatch

v0.21.0
Published: Jan 15, 2026 License: Proprietary

Package riverbatch provides functionality for batching multiple River jobs together for execution as a single group. Batched jobs must all have the same kind.

A fetched "leader" job briefly waits for additional jobs that share the same batch key, then invokes your worker's [ManyWorker.WorkMany] to process the group efficiently.

Basic usage

Enable batching for a job kind in three steps:

  1. Implement `BatchOpts()` on your job args (alongside `Kind()`), returning a riverpro.BatchOpts to enable batching and configure how batches are formed. Partitioning by arguments can be enabled with `ByArgs`.

  2. Implement ManyWorker by adding a `WorkMany` method that processes a slice of jobs of the same kind.

  3. In your `Work` method, delegate to the batching helper riverbatch.Work so a fetched job can prepare and execute a batch.

For example:

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/riverbatch"
)

type MyBatchArgs struct{}

func (MyBatchArgs) Kind() string { return "my_batch" }

// Enable batching for this job kind. Customize options as needed.
func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} }

type MyWorker struct {
	river.WorkerDefaults[MyBatchArgs]
}

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
	// Invoke the batch helper so this job can gather a batch and run WorkMany.
	return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, w, job, nil)
}

func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
	// Process the entire batch at once.
	//
	// Return nil to succeed the entire batch, a regular error to fail the
	// entire batch with the same error, or a [MultiError] to return
	// individualized errors per job.
	return nil
}
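The MultiError mentioned in the comment above can be used to fail only some jobs in a batch. A hedged sketch: processOne is a hypothetical per-job helper, and it's assumed that jobs without a recorded error complete normally:

```go
func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
	errs := riverbatch.NewMultiError()
	failed := 0
	for _, job := range jobs {
		// processOne is a hypothetical helper; substitute real per-job logic.
		if err := processOne(ctx, job); err != nil {
			errs.Add(job.JobRow, err) // record an error for this job only
			failed++
		}
	}
	if failed == 0 {
		return nil // no recorded errors: the whole batch succeeds
	}
	return errs // individualized errors per job
}
```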

Configuration

Customize batching behavior when calling Work with WorkerOpts:

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
	return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, w, job, &riverbatch.WorkerOpts{
		MaxCount:     1000,             // 100 by default
		MaxDelay:     10 * time.Second, // 5s by default
		PollInterval: 10 * time.Second, // 1s by default
	})
}

Batching by arguments

Batches are always formed per job kind. For more granular batching, you can partition by job arguments via riverpro.BatchOpts.ByArgs. When `ByArgs` is enabled, the batch key includes either all encoded args or only the fields you mark with the struct tag `river:"batch"`:

type MyBatchArgs struct {
	CustomerID string `json:"customer_id" river:"batch"`
	TraceID    string `json:"trace_id"`
}

func (MyBatchArgs) Kind() string { return "my_batch" }

func (MyBatchArgs) BatchOpts() riverpro.BatchOpts {
	return riverpro.BatchOpts{ByArgs: true}
}

Only `customer_id` contributes to the batch key; jobs with different `trace_id` values but the same `customer_id` are batched together. Prefer selecting stable identifiers via tags to avoid excessive cardinality.

Avoiding redundant batches

Batching is fully decentralized; multiple leaders for the same key may run concurrently. If you want to ensure only one leader per key at a time, combine batching with a global concurrency limit of 1 partitioned by kind:

&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		"my_queue": {
			Concurrency: riverpro.ConcurrencyConfig{
				GlobalLimit: 1,
				Partition:   riverpro.PartitionConfig{ByKind: true},
			},
			MaxWorkers: 100,
		},
	},
}

If you also batch by arguments, apply matching concurrency partitions (for example, `Partition.ByArgs`).
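For example, a queue that batches by `customer_id` might partition its concurrency limit the same way. A sketch, assuming riverpro.PartitionConfig.ByArgs takes the JSON field names to partition by:

```go
&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		"my_queue": {
			Concurrency: riverpro.ConcurrencyConfig{
				// At most one leader per (kind, customer_id) pair at a time.
				GlobalLimit: 1,
				Partition: riverpro.PartitionConfig{
					ByKind: true,
					ByArgs: []string{"customer_id"},
				},
			},
			MaxWorkers: 100,
		},
	},
}
```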

Function-worker helpers

Instead of implementing a full worker type, you can use WorkFunc or WorkFuncSafely to define a batch worker with just a function:

workers := river.NewWorkers()
river.AddWorker(workers, riverbatch.WorkFunc[MyBatchArgs, pgx.Tx](
	func(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
		return nil // success
	},
	&riverbatch.WorkerOpts{MaxCount: 4, MaxDelay: time.Millisecond, PollInterval: 10 * time.Millisecond},
))

Operational considerations

Compatibility

Batching works with other River and River Pro features including concurrency limits, workflows, and sequences.

Implementation notes

See riverpro and repository examples for end-to-end usage.

Index

Constants

const (
	MaxCountDefault     = 100
	MaxDelayDefault     = 5 * time.Second
	PollIntervalDefault = 1 * time.Second
)

Variables

This section is empty.

Functions

func Work

func Work[T JobArgsWithBatchOpts, TTx any](ctx context.Context, worker ManyWorker[T], job *river.Job[T], opts *WorkerOpts) error

Work fetches a batch of jobs and executes them as a group with riverbatch.ManyWorker.WorkMany, instead of executing only a single job.

Client- or worker-level timeout settings apply to the entire batch and include the time spent polling for additional jobs.

Returns an error if any invalid WorkerOpts are provided.

func WorkFunc

func WorkFunc[T JobArgsWithBatchOpts, TTx any](f func(context.Context, []*river.Job[T]) error, opts *WorkerOpts) river.Worker[T]

WorkFunc wraps a function to implement the Worker interface. A job args struct implementing river.JobArgs will still be required to specify a Kind.

For example:

river.AddWorker(workers, riverbatch.WorkFunc[WorkFuncArgs, pgx.Tx](func(ctx context.Context, jobs []*river.Job[WorkFuncArgs]) error {
	fmt.Printf("Received a batch of %d jobs\n", len(jobs))
	return nil
}, &riverbatch.WorkerOpts{MaxCount: 100, MaxDelay: 5 * time.Second, PollInterval: 1 * time.Second}))

func WorkFuncSafely

func WorkFuncSafely[T JobArgsWithBatchOpts, TTx any](f func(context.Context, []*river.Job[T]) error, opts *WorkerOpts) (river.Worker[T], error)

WorkFuncSafely is the same as WorkFunc, but returns an error instead of panicking upon invalid options.
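A sketch of the difference in practice; the negative MaxDelay is deliberately invalid (WorkerOpts requires it to be 0 or greater), so the error path is exercised:

```go
worker, err := riverbatch.WorkFuncSafely[MyBatchArgs, pgx.Tx](
	func(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
		return nil
	},
	&riverbatch.WorkerOpts{MaxDelay: -time.Second}, // invalid: must be >= 0
)
if err != nil {
	// WorkFunc would panic here; WorkFuncSafely lets the caller decide.
	return err
}
river.AddWorker(workers, worker)
```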

Types

type JobArgsWithBatchOpts

type JobArgsWithBatchOpts interface {
	river.JobArgs
	BatchOpts() riverpro.BatchOpts
}

type ManyWorker

type ManyWorker[T JobArgsWithBatchOpts] interface {
	// WorkMany performs the jobs and returns an error if failed. The context
	// will be configured with a timeout according to the worker settings and may
	// be cancelled for other reasons.
	//
	// If no error is returned, all the jobs are assumed to have succeeded and
	// will be marked completed.
	//
	// To mark only certain jobs as failed, cancelled, etc., utilize the
	// MultiError type to record errors on a per-job basis.
	//
	// As with any worker, it is important to respect context cancellation so
	// that the client can respond to shutdown requests; there is no way to
	// cancel a running job that does not respect context cancellation, other
	// than terminating the process.
	WorkMany(ctx context.Context, jobs []*river.Job[T]) error
}

ManyWorker is the batch analogue of a river.Worker: it defines a `WorkMany` method for working a batch of jobs instead of a `Work` method for working a single job.

type MultiError

type MultiError struct {/* contains filtered or unexported fields */}

func NewMultiError

func NewMultiError() *MultiError

func (*MultiError) Add

func (e *MultiError) Add(job *rivertype.JobRow, err error)

func (*MultiError) AddByID

func (e *MultiError) AddByID(jobID int64, err error)

func (*MultiError) Error

func (e *MultiError) Error() string

func (*MultiError) Get

func (e *MultiError) Get(job *rivertype.JobRow) error

func (*MultiError) GetByID

func (e *MultiError) GetByID(jobID int64) error

func (*MultiError) Is

func (e *MultiError) Is(target error) bool

type WorkerOpts

type WorkerOpts struct {
	// MaxCount is the maximum number of jobs to include in a batch.
	// Must be greater than 0 and less than math.MaxInt32.
	//
	// Default is 100 (MaxCountDefault).
	MaxCount int

	// MaxDelay is the maximum amount of time to wait for a full batch with
	// MaxCount jobs to be available. Once the maximum delay has elapsed, a final
	// attempt will be made to fetch more jobs (up to MaxCount) before executing
	// the batch with however many jobs were fetched.
	//
	// Must be 0 or greater. Default is 5 seconds (MaxDelayDefault).
	MaxDelay time.Duration

	// PollInterval is the interval at which to poll for more jobs to add to the
	// batch until the batch is full or the MaxDelay is reached.
	//
	// Must be 0 or greater. Default is 1 second (PollIntervalDefault).
	PollInterval time.Duration
}

WorkerOpts are the batching options for a ManyWorker.