Writing reliable workers

One of the biggest benefits of River is that jobs can be enqueued transactionally and atomically along with other database changes. Once a job is committed to the database, River will not lose track of it and will ensure that it is executed.

However, even the most robust and reliable applications will inevitably encounter job errors or other failures, particularly at scale. Any River job may be retried multiple times due to scenarios including:

  • An error returned by the worker's Work() method
  • An error preventing the completed job's state from being saved to the database
  • A process crash or goroutine panic

River is designed to retry jobs in the event of an error. This at-least-once execution pattern ensures that programming errors, crashes, or transient network failures do not result in a job being lost or forgotten. It is therefore important to write workers with the expectation that jobs can and will be retried.


Jobs are automatically completed or errored

When River fetches a new job from the database, it is passed to the worker's Work() method for execution. If nil is returned from this method, the job is presumed to have succeeded and will be (asynchronously) marked as such. However, if an error is returned, River will save it to the database, increment the error count, and schedule the job for a retry in the future.
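
As a sketch of what this looks like in practice, the worker below returns nil on success and an error otherwise (sendEmail is a hypothetical helper):

type SendEmailArgs struct {
    To string `json:"to"`
}

func (SendEmailArgs) Kind() string { return "send_email" }

type SendEmailWorker struct {
    river.WorkerDefaults[SendEmailArgs]
}

func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
    if err := sendEmail(ctx, job.Args.To); err != nil {
        // A returned error is saved to the database, the error count is
        // incremented, and the job is scheduled for a retry.
        return err
    }

    // Returning nil marks the job as succeeded; River completes it
    // asynchronously.
    return nil
}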


Timeouts and contexts

Go does not provide a way for a goroutine to cancel or terminate another goroutine. This means that the only practical way for River to time out or cancel jobs is with context cancellation.

Respect context cancellation

Workers should respect context cancellation for timeouts, job cancellation, and safe, rapid shutdown. If a worker blocks and does not respect ctx.Done(), these features will not function and River will be stuck waiting for the job to complete.

To ensure that workers always respect a cancelled or timed-out context, any child contexts they create or pass downstream should inherit from the context provided in Work().
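
For example, a worker that makes an outbound HTTP call can derive a child context from the one passed to Work() rather than from context.Background(), so that River's timeout or cancellation propagates to the request (the URL and per-request timeout here are illustrative):

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error {
    // Derive from the Work() context so cancellation and timeouts propagate.
    // Deriving from context.Background() would sever that link.
    reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, "https://api.example.com/resource", nil)
    if err != nil {
        return err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    return nil
}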

Client job timeouts

By default, clients will time out all jobs after 1 minute (DefaultJobTimeout). This can be customized for all jobs at the client level with the JobTimeout config:

client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    JobTimeout: 10 * time.Minute,
})

River can also run jobs with no timeout, though it is not recommended to do this globally:

client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    JobTimeout: -1,
})

Worker-level job timeouts

Timeouts can also be customized at the level of an individual worker:

func (w *MyWorker) Timeout(*Job[T]) time.Duration { return 30 * time.Second }

Some use cases require a job to run for an indefinite amount of time. To disable timeouts on a worker, return -1:

func (w *MyWorker) Timeout(*Job[T]) time.Duration { return -1 }

Jobs with no timeout are still subject to being manually cancelled (coming soon) or cancelled as part of a rapid shutdown via StopAndCancel, but this will only work if the worker respects the context's cancellation.

Cancellation and shutdown

A job's context could be cancelled for other reasons, such as the job itself being cancelled remotely (coming soon) or the server entering a rapid graceful shutdown. Just like with timeouts, the worker should respect a cancelled context by selecting on <-ctx.Done() alongside any potentially slow operation, as sketched below.
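
Here's a minimal sketch of that pattern, where doSlowWork is a hypothetical stand-in for an operation that can't take a context directly:

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error {
    resultCh := make(chan error, 1)

    go func() {
        resultCh <- doSlowWork(job.Args)
    }()

    select {
    case err := <-resultCh:
        return err
    case <-ctx.Done():
        // Timeout, remote cancellation, or rapid shutdown: return promptly
        // so River isn't left waiting on the job.
        return ctx.Err()
    }
}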


Job idempotency

A database's ACID properties will ensure that any particular insert operation occurs exactly once or errors. This guarantee can be augmented with unique jobs so that only one job will be inserted based on a set of uniquely defining properties.
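
As a brief sketch, assuming a client has already been constructed and SendEmailArgs is a job args type, unique options can limit insertion to one job per set of args within a period:

_, err := client.Insert(ctx, SendEmailArgs{To: "user@example.com"}, &river.InsertOpts{
    UniqueOpts: river.UniqueOpts{
        // Only one job with these args may be inserted per 24 hour period.
        ByArgs:   true,
        ByPeriod: 24 * time.Hour,
    },
})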

Once a job is inserted, it will be worked with at-least-once semantics. A successful job in a normal system will run exactly once in the overwhelming majority of cases, but there are exceptions. If the job errors, it'll be reworked until it succeeds, and side effects from earlier attempts may persist between runs. Rarer cases are possible too, like a job finishing successfully but failing to be marked as completed, in which case it'll be rescued and run again.

Jobs should be written so that they can still succeed even if run multiple times, which is generally accomplished by making every operation in the job idempotent.
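
One common approach, sketched below, is to check whether a previous attempt already produced the job's side effect before performing it again (the invoice helpers here are hypothetical):

func (w *InvoiceWorker) Work(ctx context.Context, job *river.Job[InvoiceArgs]) error {
    // If an earlier attempt already created the invoice, the retry becomes
    // a no-op and the job can succeed immediately.
    invoice, err := findInvoiceByOrderID(ctx, w.dbPool, job.Args.OrderID)
    if err != nil {
        return err
    }
    if invoice != nil {
        return nil
    }

    // Otherwise, perform the side effect as usual.
    return createInvoice(ctx, w.dbPool, job.Args.OrderID)
}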

Idempotency with transactions

River workers have a shortcut to easier idempotency: they can open a transaction in the work body and, in case of an error, roll its changes back so that the next attempt starts with a clean slate.

func (w *TxWorker) Work(ctx context.Context, job *river.Job[TxArgs]) error {
    tx, err := w.dbPool.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    ...

    if err != nil {
        // Rollback occurs, reverting database changes made during the run.
        return err
    }

    return tx.Commit(ctx)
}

But it's important to understand that even a transaction is not a full idempotency guarantee. Consider:

  • A work function may have called out to external systems while running, like HTTP APIs or other non-transactional data stores (e.g. Elasticsearch). Those changes will not be rolled back with the transaction.

  • To maximize work throughput, jobs are normally completed out-of-band of a work function, and in rare cases that completion could fail even if the worker succeeded and committed its transaction. See below for a possible remedy.

Transactions may also have performance and operational side effects. A long-lived transaction will tie up a Postgres backend while it's open, and will contribute to database bloat because any rows still technically visible to the transaction have to be retained until it's closed, even if they've since been deleted or updated to a new state. Workers may want to avoid opening a transaction when one isn't needed, or when jobs are expected to be long running.

Transactionally completing jobs alongside other changes

A way to hedge against possible failures while completing a job and get a little closer to an exactly once guarantee is to wrap operations in a transaction and complete the job as part of the same transaction using JobCompleteTx. A successful commit guarantees that the job will never rerun. A failed commit discards all changes so the next run starts fresh.

func (w *TxWorker) Work(ctx context.Context, job *river.Job[TxArgs]) error {
    tx, err := w.dbPool.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    ...

    _, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
    if err != nil {
        return err
    }

    return tx.Commit(ctx)
}

River normally completes jobs in large, efficient batches, so there is a throughput trade-off to completing jobs this way. Opening a transaction also has a small marginal cost, so this approach adds some overhead to the worker unless a transaction was already in use.

See transactional job completion for more details.