riverbatch

v0.19.0 · Published: Oct 8, 2025 · License: Proprietary
Package riverbatch provides functionality for batching multiple River jobs together for execution as a single group. Batched jobs must all have the same kind, and are enabled with 3 steps:

  1. The job args type must implement JobArgsWithBatchOpts with a `BatchOpts` method alongside the usual `Kind` method. Batching can be partitioned by job args (or a subset) if riverpro.BatchOpts.ByArgs is true.

  2. The worker must implement ManyWorker with a `WorkMany` method for processing a slice of jobs of the same kind.

  3. In the worker's `Work` method, call the riverbatch.Work helper to perform batching.

For example:

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/riverbatch"
)

type MyBatchArgs struct{}

func (MyBatchArgs) Kind() string                  { return "my_batch" }
func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} }

type MyWorker struct {
	river.WorkerDefaults[MyBatchArgs]

	opts riverbatch.WorkerOpts
}

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
	return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, w, job, &w.opts)
}

func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
	// Execute the batch. Return `nil` to mark the entire batch as successful, a
	// regular error to mark the entire batch as failed with that same error, or
	// a MultiError to return individualized errors per job.
	return nil
}

See riverpro for a complete executable example.

Implementation notes

For efficient querying of partitioned batch jobs, batches utilize a key that is precomputed at insert time based on the job args and batch options.

Batching is performed lazily without any centralized coordination. When a batch job starts and the Work helper is invoked, the job acts as a leader and fetches available jobs with the same batch key, polling for more jobs at the specified interval until either the max count or batch timeout is reached. Once all jobs are fetched, the worker executes the batch with [ManyWorker.WorkMany].

With this architecture it is possible for multiple batches with the same key to be active at the same time. To avoid this, batching can be combined with a global concurrency limit of 1 and `ByKind: true` to ensure there are no redundant batches.

The batch leader job will occupy a worker slot from the moment it is fetched until the batch is completed. This may result in jobs running for longer than expected and should be considered when choosing `MaxWorkers`.

Index

Constants

const (
	MaxCountDefault     = 100
	MaxDelayDefault     = 5 * time.Second
	PollIntervalDefault = 1 * time.Second
)

Variables

This section is empty.

Functions

func Work

func Work[T JobArgsWithBatchOpts, TTx any](ctx context.Context, worker ManyWorker[T], job *river.Job[T], opts *WorkerOpts) error

Work fetches a batch of jobs and executes them as a group with riverbatch.ManyWorker.WorkMany, instead of executing only a single job.

Client or Worker level timeout settings apply to the entire batch and are inclusive of the time spent polling to collect it.

Returns an error if any invalid WorkerOpts are provided.

func WorkFunc

func WorkFunc[T JobArgsWithBatchOpts, TTx any](f func(context.Context, []*river.Job[T]) error, opts *WorkerOpts) river.Worker[T]

WorkFunc wraps a function to implement the Worker interface. A job args struct implementing river.JobArgs will still be required to specify a Kind.

For example:

river.AddWorker(workers, riverbatch.WorkFunc(func(ctx context.Context, jobs []*river.Job[WorkFuncArgs]) error {
	fmt.Printf("Received a batch of %d jobs", len(jobs))
	return nil
}, &riverbatch.WorkerOpts{MaxCount: 100, MaxDelay: 5 * time.Second, PollInterval: 1 * time.Second}))

func WorkFuncSafely

func WorkFuncSafely[T JobArgsWithBatchOpts, TTx any](f func(context.Context, []*river.Job[T]) error, opts *WorkerOpts) (river.Worker[T], error)

WorkFuncSafely is the same as WorkFunc, but returns an error instead of panicking upon invalid options.

Types

type JobArgsWithBatchOpts

type JobArgsWithBatchOpts interface {
	river.JobArgs
	BatchOpts() riverpro.BatchOpts
}

type ManyWorker

type ManyWorker[T JobArgsWithBatchOpts] interface {
	// WorkMany performs the jobs and returns an error if failed. The context
	// will be configured with a timeout according to the worker settings and may
	// be cancelled for other reasons.
	//
	// If no error is returned, all the jobs are assumed to have succeeded and
	// will be marked completed.
	//
	// To mark only certain jobs as failed, cancelled, etc., utilize the MultiError
	// type to record errors on a per-job basis.
	//
	// As with any worker, it is important to respect context cancellation so
	// that the client can respond to shutdown requests; there is no way to
	// cancel a running job that does not respect context cancellation, other
	// than terminating the process.
	WorkMany(ctx context.Context, jobs []*river.Job[T]) error
}

ManyWorker is like a river.Worker, but defines a `WorkMany` method for working a batch of jobs instead of a `Work` method for working a single job.

type MultiError

type MultiError struct {/* contains filtered or unexported fields */}

func NewMultiError

func NewMultiError() *MultiError

func (*MultiError) Add

func (e *MultiError) Add(job *rivertype.JobRow, err error)

func (*MultiError) AddByID

func (e *MultiError) AddByID(jobID int64, err error)

func (*MultiError) Error

func (e *MultiError) Error() string

func (*MultiError) Get

func (e *MultiError) Get(job *rivertype.JobRow) error

func (*MultiError) GetByID

func (e *MultiError) GetByID(jobID int64) error

func (*MultiError) Is

func (e *MultiError) Is(target error) bool

type WorkerOpts

type WorkerOpts struct {
	// MaxCount is the maximum number of jobs to include in a batch.
	// Must be greater than 0 and less than math.MaxInt32.
	//
	// Default is 100 (MaxCountDefault).
	MaxCount int

	// MaxDelay is the maximum amount of time to wait for a full batch with
	// MaxCount jobs to be available. Once the maximum delay has elapsed, a final
	// attempt will be made to fetch more jobs (up to MaxCount) before executing
	// the batch with however many jobs were fetched.
	//
	// Must be 0 or greater. Default is 5 seconds (MaxDelayDefault).
	MaxDelay time.Duration

	// PollInterval is the interval at which to poll for more jobs to add to the
	// batch until the batch is full or the MaxDelay is reached.
	//
	// Must be 0 or greater. Default is 1 second (PollIntervalDefault).
	PollInterval time.Duration
}

WorkerOpts are the batching options for a ManyWorker.