
Writing reliable workers

One of the biggest benefits of River is that jobs can be enqueued transactionally and atomically along with other database changes. Once a job is committed to the database, River will not lose track of it and will ensure that it is executed.

However, even the most robust and reliable applications will inevitably encounter job errors or other failures, particularly at scale. Any River job may be retried multiple times, in scenarios including:

  • An error returned by the worker's Work() method
  • An error preventing the completed job's state from being saved to the database
  • A process crash or goroutine panic

River is designed to retry jobs in the event of an error. This at-least-once execution pattern ensures that programming errors, crashes, or transient network failures do not result in a job being lost or forgotten. It is therefore important to write workers with the expectation that jobs can and will be retried.


Jobs are automatically completed or errored

When River fetches a new job from the database, it is passed to the worker's Work() method for execution. If nil is returned from this method, the job is presumed to have succeeded and will be (asynchronously) marked as such. However, if an error is returned, River will save it to the database, increment the error count, and schedule the job for a retry in the future.


Timeouts and contexts

Go does not provide a way for a goroutine to cancel or terminate another goroutine. This means that the only practical way for River to time out or cancel jobs is with context cancellation.

Respect context cancellation

Workers should respect context cancellation for timeouts, job cancellation, and safe rapid shutdown. If a worker blocks and does not respect ctx.Done(), these features will not function and River will be stuck waiting for the job to complete.

To ensure that workers always respect a cancelled or timed-out context, any context a worker creates should be derived from the context passed to Work(), rather than from a fresh context such as context.Background().

Client job timeouts

By default, clients will time out all jobs after 1 minute (DefaultJobTimeout). This can be customized for all jobs at the client level with the JobTimeout config:

client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    JobTimeout: 10 * time.Minute,
})

River can also run jobs with no timeout, though it is not recommended to do this globally:

client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    JobTimeout: -1,
})

Worker-level job timeouts

Timeouts can also be customized at the level of an individual worker:

func (w *MyWorker) Timeout(*Job[T]) time.Duration { return 30 * time.Second }

Some use cases require a job to run for an indefinite amount of time. To disable timeouts on a worker, return -1:

func (w *MyWorker) Timeout(*Job[T]) time.Duration { return -1 }

Jobs with no timeout can still be cancelled manually (coming soon) or by a rapid shutdown via StopAndCancel, but either will only work if the worker respects cancellation of its context.

Cancellation and shutdown

A job's context could be cancelled for other reasons, such as the job itself being cancelled remotely (coming soon) or the server entering a rapid graceful shutdown. Just like with timeouts, the worker should respect a cancelled context by selecting on <-ctx.Done() around any potentially slow operation.


Writing retryable workers

Transactionally completing jobs alongside other changes

If the worker makes other changes to the local database, one way to improve reliability is to wrap those changes in a transaction and complete the job as part of the same transaction using JobCompleteTx. A successful commit guarantees that the job will never rerun. A failed commit discards all of the other changes, so the next attempt starts from a clean slate.

See transactional job completion.
