Skip to content

Transactional job completion

River supports completing jobs in the same transaction as other worker operations with JobCompleteTx, guaranteeing that if the transaction successfully commits, the job will never rerun, even in the presence of intermittent failure.


Completing a job atomically

Normally, jobs are marked complete out-of-band from the work functions of those jobs. This is usually good because it lets River apply optimizations during completion so that queue throughput is as high as possible, but the downside is that there's a small chance of failure between when a job's work function is invoked and when a River client sets it as complete. Say its process is terminated at that exact instant for example.

Should a job fail to be set complete, it'll be rescued and rerun. This duplicates work that was already successful, but is the only way River can guarantee that the job was run.

All workers should always expect to be run at least once anyway, but those that'd like to minimize the probability of an accidental rerun can use JobCompleteTx to mark a job as complete in the same transaction as other operations. A successful commit guarantees that the job will never rerun. A failed commit discards all other changes so the next rerun starts with a fresh slate.

type TxWorker struct {
    river.WorkerDefaults[TxArgs]
    dbPool *pgxpool.Pool
}

func (w *TxWorker) Work(ctx context.Context, job *river.Job[TxArgs]) error {
    tx, err := w.dbPool.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    ...

    _, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
    if err != nil {
        return err
    }

    if err = tx.Commit(ctx); err != nil {
        return err
    }

    return nil
}

See the CompleteJobWithinTx example for complete code.

Why the generic parameter?

JobCompleteTx is a top-level river package function instead of one on the client. This is for user convenience so that a River client doesn't need to be threaded onto every worker, but that convenience requires a concession. Because JobCompleteTx doesn't have a client pointer to work with, it needs to take a generic parameter of the River driver in use ([*riverpgxv5.Driver] above) to give it enough information to complete a job.

The injected driver will always be the same type as the driver passed into NewClient to create a River client.