Package riverbatch provides functionality for batching multiple River jobs together for execution as a single group. Batched jobs must all have the same kind, and batching is enabled in three steps:
The job args type must implement JobArgsWithBatchOpts with a `BatchOpts` method alongside the usual `Kind` method. Batching can be partitioned by job args (or a subset) if riverpro.BatchOpts.ByArgs is true.
The worker must implement ManyWorker with a `WorkMany` method for processing a slice of jobs of the same kind.
In the worker's `Work` method, call the riverbatch.Work helper to perform batching.
For example:
import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverbatch"
)
type MyBatchArgs struct {}
func (MyBatchArgs) Kind() string { return "my_batch" }
func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} }
type MyWorker struct {
river.WorkerDefaults[MyBatchArgs]
}
func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error {
return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, w, job, &riverbatch.WorkerOpts{})
}
func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error {
// Execute the batch. Return `nil` to mark the entire batch as successful, a
// regular error to mark the entire batch as failed with that same error, or
// a MultiError to return individualized errors per job.
return nil
}
See riverpro for a complete executable example.
For efficient querying of partitioned batch jobs, batches utilize a key that is precomputed at insert time based on the job args and batch options.
Batching is performed lazily without any centralized coordination. When a batch job starts and the Work helper is invoked, the job acts as a leader and fetches available jobs with the same batch key, polling for more jobs at the specified interval until either the max count or batch timeout is reached. Once all jobs are fetched, the worker executes the batch with [ManyWorker.WorkMany].
With this architecture it is possible for multiple batches with the same key to be active at the same time. To avoid this, batching can be combined with a global concurrency limit of 1 and `ByKind: true` to ensure there are no redundant batches.
The batch leader job will occupy a worker slot from the moment it is fetched until the batch is completed. This may result in jobs running for longer than expected and should be considered when choosing `MaxWorkers`.
func Work[T JobArgsWithBatchOpts, TTx any](ctx context.Context, worker ManyWorker[T], job *river.Job[T], opts *WorkerOpts) error
Work fetches a batch of jobs and executes them as a group with riverbatch.ManyWorker.WorkMany, instead of executing only a single job.
Client or Worker level timeout settings apply to the entire batch and are inclusive of the time spent polling for additional jobs.
Returns an error if any invalid WorkerOpts are provided.
func WorkFunc[T JobArgsWithBatchOpts, TTx any](f func(context.Context, []*river.Job[T]) error, opts *WorkerOpts) river.Worker[T]
WorkFunc wraps a function to implement the Worker interface. A job args struct implementing river.JobArgs will still be required to specify a Kind.
For example:
river.AddWorker(workers, riverbatch.WorkFunc(func(ctx context.Context, jobs []*river.Job[WorkFuncArgs]) error {
	fmt.Printf("Received a batch of %d jobs\n", len(jobs))
	return nil
}, &riverbatch.WorkerOpts{MaxCount: 100, PollInterval: 1 * time.Second, MaxDelay: 5 * time.Second}))
type ManyWorker[T JobArgsWithBatchOpts] interface {
	// WorkMany performs the jobs and returns an error if they failed. The
	// context will be configured with a timeout according to the worker
	// settings and may be cancelled for other reasons.
	//
	// If no error is returned, all the jobs are assumed to have succeeded and
	// will be marked completed.
	//
	// To mark only certain jobs as failed, cancelled, etc., utilize the
	// MultiError type to record errors on a per-job basis.
	//
	// As with any worker, it is important to respect context cancellation to
	// enable the client to respond to shutdown requests; there is no way to
	// cancel a running job that does not respect context cancellation, other
	// than terminating the process.
	WorkMany(ctx context.Context, jobs []*river.Job[T]) error
}
ManyWorker is a variant of river.Worker that defines a `WorkMany` method for working a batch of jobs instead of a `Work` method for working a single job.
type MultiError struct {/* contains filtered or unexported fields */}
func NewMultiError() *MultiError
func (e *MultiError) Add(job *rivertype.JobRow, err error)
func (e *MultiError) AddByID(jobID int64, err error)
func (e *MultiError) Error() string
func (e *MultiError) Get(job *rivertype.JobRow) error
func (e *MultiError) GetByID(jobID int64) error
func (e *MultiError) Is(target error) bool
type WorkerOpts struct {
// MaxCount is the maximum number of jobs to include in a batch.
// Must be greater than 0 and less than math.MaxInt32.
//
// Default is 100 (MaxCountDefault).
MaxCount int
// MaxDelay is the maximum amount of time to wait for a full batch with
// MaxCount jobs to be available. Once the maximum delay has elapsed, a final
// attempt will be made to fetch more jobs (up to MaxCount) before executing
// the batch with however many jobs were fetched.
//
// Must be 0 or greater. Default is 5 seconds (MaxDelayDefault).
MaxDelay time.Duration
// PollInterval is the interval at which to poll for more jobs to add to the
// batch until the batch is full or the MaxDelay is reached.
//
// Must be 0 or greater. Default is 1 second (PollIntervalDefault).
PollInterval time.Duration
}
WorkerOpts are the batching options for a ManyWorker.