Resumable jobs break a worker into discrete steps so that if a job fails partway through, the next attempt can skip already-completed steps and pick up where it left off. This is useful for long-running or resource-intensive jobs with multiple stages, where re-running everything from scratch on each retry would be wasteful or cause unwanted side effects.
Basic steps with ResumableStep
ResumableStep wraps a named unit of work. On the first attempt, all steps run in order. If a step returns an error, River records the last successfully completed step in the job's metadata. On the next attempt, all steps up to and including the recorded step are skipped, and execution resumes at the next one.
    type DataPipelineArgs struct {
        SourceURL string `json:"source_url"`
    }

    func (DataPipelineArgs) Kind() string { return "data_pipeline" }

    type DataPipelineWorker struct {
        river.WorkerDefaults[DataPipelineArgs]
    }

    func (w *DataPipelineWorker) Work(ctx context.Context, job *river.Job[DataPipelineArgs]) error {
        river.ResumableStep(ctx, "download", nil, func(ctx context.Context) error {
            return downloadData(ctx, job.Args.SourceURL)
        })

        // If River was forced to stop between download and transform or midway into
        // transform, the next run of this job skips download and picks up here.
        river.ResumableStep(ctx, "transform", nil, func(ctx context.Context) error {
            return transformData(ctx)
        })

        river.ResumableStep(ctx, "load", nil, func(ctx context.Context) error {
            return loadData(ctx)
        })

        return nil
    }

If transform returns an error, download is recorded as completed. On the next attempt, download is skipped and execution begins at transform.
Step names must be unique within a worker. Code outside of steps still runs on every attempt, so generally speaking, all executable logic should be placed inside steps.
See the ResumableStep example for complete code.
Cursor steps with ResumableStepCursor
ResumableStepCursor is a variant of ResumableStep that receives a cursor value from a previous attempt. This lets a step that processes items in a loop (and where each iteration of the loop is expensive enough to merit being skipped) resume from the last successfully processed item rather than starting over.
The cursor type is user-defined and can be any JSON-serializable value: a plain integer in the simplest case, or a struct with named fields, either for clarity or to hold several cursors for nested loops.
    type BatchProcessArgs struct {
        IDs []int `json:"ids"`
    }

    func (BatchProcessArgs) Kind() string { return "batch_process" }

    type BatchCursor struct {
        LastProcessedID int `json:"last_processed_id"`
    }

    type BatchProcessWorker struct {
        river.WorkerDefaults[BatchProcessArgs]
    }

    func (w *BatchProcessWorker) Work(ctx context.Context, job *river.Job[BatchProcessArgs]) error {
        river.ResumableStepCursor(ctx, "process_ids", nil, func(ctx context.Context, cursor BatchCursor) error {
            for _, id := range job.Args.IDs {
                // Skip IDs that were already processed on a previous attempt.
                // This comparison assumes IDs are positive and sorted in
                // ascending order.
                if id <= cursor.LastProcessedID {
                    continue
                }

                if err := processID(ctx, id); err != nil {
                    return err
                }

                // Record progress after each item so the next attempt can
                // resume after it.
                if err := river.ResumableSetCursor(ctx, BatchCursor{
                    LastProcessedID: id,
                }); err != nil {
                    return err
                }
            }

            return nil
        })

        return nil
    }

On the first attempt, the cursor is the zero value of the cursor type. ResumableSetCursor records a checkpoint as progress is made. If the job fails, the cursor is saved in job metadata and passed to the step on the next attempt.
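To make the "JSON-serializable" requirement concrete, here is a small sketch using only the standard library. It assumes the cursor is encoded with encoding/json, as the struct tags suggest; roundTrip is a hypothetical helper and says nothing about how River stores the value internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BatchCursor mirrors the cursor type used in the worker above.
type BatchCursor struct {
	LastProcessedID int `json:"last_processed_id"`
}

// roundTrip encodes a cursor to JSON and decodes it again, which is roughly
// what any JSON-backed store has to do between attempts.
func roundTrip(c BatchCursor) (string, BatchCursor, error) {
	data, err := json.Marshal(c)
	if err != nil {
		return "", BatchCursor{}, err
	}
	var decoded BatchCursor
	if err := json.Unmarshal(data, &decoded); err != nil {
		return "", BatchCursor{}, err
	}
	return string(data), decoded, nil
}

func main() {
	encoded, decoded, err := roundTrip(BatchCursor{LastProcessedID: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)                 // {"last_processed_id":42}
	fmt.Println(decoded.LastProcessedID) // 42

	// With no saved cursor, a step sees the zero value of the cursor type.
	var first BatchCursor
	fmt.Println(first.LastProcessedID) // 0
}
```

Note that encoding/json only includes exported struct fields, so a cursor field that starts with a lowercase letter would not survive the round trip.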
Steps and cursor steps can be freely mixed in the same worker. Steps are skipped or re-executed according to the same rules: completed steps are skipped, and a cursor step with saved cursor data is re-executed with that cursor.
See the ResumableStepCursor example for complete code.
Nested loops
The cursor type is user-defined, so it can track progress across multiple levels of iteration. For a nested loop, define a struct with a field for each level:
    type NestedCursor struct {
        LastProcessedID    int `json:"last_processed_id"`
        LastProcessedSubID int `json:"last_processed_sub_id"`
    }

    func (w *NestedWorker) Work(ctx context.Context, job *river.Job[NestedArgs]) error {
        river.ResumableStepCursor(ctx, "process_ids", nil, func(ctx context.Context, cursor NestedCursor) error {
            for _, id := range job.Args.IDs {
                // Skip IDs that were fully processed on a previous attempt. The
                // comparison is strict so that the ID recorded in the cursor,
                // which may be only partially processed, is revisited.
                if id < cursor.LastProcessedID {
                    continue
                }

                for _, subID := range job.Args.SubIDs[id] {
                    // Within the recorded ID, skip sub-IDs already processed.
                    if id == cursor.LastProcessedID && subID <= cursor.LastProcessedSubID {
                        continue
                    }

                    if err := processSubID(ctx, id, subID); err != nil {
                        return err
                    }

                    if err := river.ResumableSetCursor(ctx, NestedCursor{
                        LastProcessedID:    id,
                        LastProcessedSubID: subID,
                    }); err != nil {
                        return err
                    }
                }
            }

            return nil
        })

        return nil
    }

On resume, the outer loop skips fully completed IDs and the inner loop skips sub-IDs already processed within the current ID. Each call to ResumableSetCursor records both levels so no work is repeated.
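The two-level skip logic is easy to get subtly wrong, so it can help to check it in isolation. The sketch below uses only the standard library; pending is a hypothetical helper that lists the (id, subID) pairs a given cursor would leave to process. It assumes IDs and sub-IDs are positive and in ascending order, and it uses a strict less-than in the outer loop so that a partially completed ID is revisited.

```go
package main

import "fmt"

// NestedCursor mirrors the cursor type from the worker above.
type NestedCursor struct {
	LastProcessedID    int
	LastProcessedSubID int
}

// pending returns the "id/subID" pairs that still need processing for a
// given cursor. IDs and sub-IDs are assumed positive and ascending.
func pending(ids []int, subIDs map[int][]int, cursor NestedCursor) []string {
	var out []string
	for _, id := range ids {
		if id < cursor.LastProcessedID {
			continue // every sub-ID of this ID was already processed
		}
		for _, subID := range subIDs[id] {
			if id == cursor.LastProcessedID && subID <= cursor.LastProcessedSubID {
				continue // already processed within the partially completed ID
			}
			out = append(out, fmt.Sprintf("%d/%d", id, subID))
		}
	}
	return out
}

func main() {
	ids := []int{1, 2, 3}
	subIDs := map[int][]int{1: {1, 2}, 2: {1, 2, 3}, 3: {1}}

	// First attempt: zero-value cursor, so everything is pending.
	fmt.Println(pending(ids, subIDs, NestedCursor{}))
	// [1/1 1/2 2/1 2/2 2/3 3/1]

	// Retry after a failure that happened right after 2/1 was recorded.
	fmt.Println(pending(ids, subIDs, NestedCursor{LastProcessedID: 2, LastProcessedSubID: 1}))
	// [2/2 2/3 3/1]
}
```

If the outer comparison were less-than-or-equal instead, the retry would skip ID 2 entirely and sub-IDs 2/2 and 2/3 would never be processed.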
Durable checkpoints with ResumableSetStepTx
By default, step progress and cursors are saved to job metadata after the worker returns. This is efficient, but means that if the process crashes or is forcefully killed mid-step, the progress recorded by ResumableSetCursor may be lost.
ResumableSetStepTx and ResumableSetStepCursorTx provide a stronger guarantee by persisting step progress immediately as part of a database transaction. If the transaction commits, the checkpoint is durable. If it rolls back, the checkpoint is discarded along with the other work.
    func (w *DurableWorker) Work(ctx context.Context, job *river.Job[DurableArgs]) error {
        river.ResumableStep(ctx, "durable_step", nil, func(ctx context.Context) error {
            tx, err := w.dbPool.Begin(ctx)
            if err != nil {
                return err
            }
            defer tx.Rollback(ctx)

            // Do work within the transaction...
            if err := doWork(ctx, tx); err != nil {
                return err
            }

            // Persist step completion as part of the same transaction.
            if _, err := river.ResumableSetStepTx[*riverpgxv5.Driver](
                ctx, tx, job,
            ); err != nil {
                return err
            }

            return tx.Commit(ctx)
        })

        return nil
    }

Use the transactional variants when a step performs database writes that must be atomic with the step's completion record. For steps that are idempotent or where the default post-execution persistence is sufficient, ResumableStep and ResumableSetCursor are simpler and don't require an extra database operation.
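The atomicity argument can be illustrated with a toy in-memory model. This is only a sketch: store, tx, and runStep are hypothetical types and functions that stand in for a real database and have nothing to do with River's or pgx's actual APIs. The point is that when the step's data and its checkpoint are staged in the same transaction, either both become visible or neither does.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for a database.
type store struct {
	rows map[string]string
}

// tx stages writes and applies them to the store only on commit.
type tx struct {
	store  *store
	staged map[string]string
}

func (s *store) begin() *tx {
	return &tx{store: s, staged: map[string]string{}}
}

func (t *tx) set(key, value string) { t.staged[key] = value }

// commit applies every staged write at once.
func (t *tx) commit() {
	for k, v := range t.staged {
		t.store.rows[k] = v
	}
	t.staged = nil
}

// rollback discards every staged write.
func (t *tx) rollback() { t.staged = nil }

// runStep writes the step's data and its checkpoint in one transaction, then
// commits or rolls back depending on whether the work succeeded.
func runStep(s *store, succeed bool) {
	t := s.begin()
	t.set("data", "written by durable_step")
	t.set("checkpoint", "durable_step")
	if succeed {
		t.commit()
		return
	}
	t.rollback()
}

func main() {
	s := &store{rows: map[string]string{}}

	// Failed attempt: neither the data nor the checkpoint survives.
	runStep(s, false)
	fmt.Println(len(s.rows)) // 0

	// Successful attempt: both become visible together.
	runStep(s, true)
	fmt.Println(s.rows["data"] != "", s.rows["checkpoint"]) // true durable_step
}
```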
Testing resumable workers
The rivertest package includes helpers for testing resumable workers at specific steps without needing to orchestrate real failures.
This is particularly useful for making sure that jobs run successfully even when they pick up midway. It's relatively easy, for example, to accidentally hit a nil pointer dereference because a skipped step would have initialized the pointer had the job run all steps from the beginning. Jobs with skippable steps need a fallback route to avoid that, and tests make sure that the fallback route works.
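As a rough illustration of such a fallback route, consider the sketch below. It is a standalone model rather than a River worker: dataset, loadDataset, and run are hypothetical, and the skipDownload flag simulates a retry on which the first step is skipped.

```go
package main

import "fmt"

// dataset is a hypothetical value that the first step would normally produce.
type dataset struct {
	rows []string
}

// loadDataset stands in for re-reading previously downloaded data from
// wherever the first step stored it.
func loadDataset() *dataset {
	return &dataset{rows: []string{"a", "b"}}
}

// run models one attempt. When skipDownload is true, the "download" step is
// skipped, as it would be on a retry after that step had completed.
func run(skipDownload bool) int {
	var data *dataset

	if !skipDownload {
		data = loadDataset() // the "download" step initializes the pointer
	}

	// The "transform" step: fall back to loading the data if an earlier
	// attempt did the download and this attempt skipped it. Without this
	// check, data.rows below would dereference a nil pointer on a retry.
	if data == nil {
		data = loadDataset()
	}
	return len(data.rows)
}

func main() {
	fmt.Println(run(false)) // 2
	fmt.Println(run(true))  // 2
}
```

A test that starts the job after the first step exercises exactly the branch that a full run from the beginning never reaches.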
Testing from a specific step
ResumableStepAfter configures a job to skip all steps up to and including the named step, simulating a retry after that step completed successfully:
    func TestWorker_ResumableStepAfter(t *testing.T) {
        result, err := rivertest.NewWorker(t, driver, config, &MyWorker{}).
            Work(ctx, t, tx, MyArgs{}, rivertest.ResumableStepAfter(&river.InsertOpts{}, "download"))
        require.NoError(t, err)
        require.Equal(t, river.EventKindJobCompleted, result.EventKind)
    }

This runs only the steps that come after download, letting you test later steps in isolation.
Testing with a cursor
ResumableStepAtCursor configures a job to resume at a cursor step with specific cursor data, simulating a retry after partial progress:
    func TestWorker_ResumeWithCursor(t *testing.T) {
        result, err := rivertest.NewWorker(t, driver, config, &BatchProcessWorker{}).
            Work(ctx, t, tx, BatchProcessArgs{IDs: []int{1, 2, 3}},
                rivertest.ResumableStepAtCursor(&river.InsertOpts{}, "process_ids", BatchCursor{LastProcessedID: 1}))
        require.NoError(t, err)
        require.Equal(t, river.EventKindJobCompleted, result.EventKind)
    }

This runs the process_ids step with a cursor indicating that ID 1 was already processed, so only IDs 2 and 3 are processed. All steps before process_ids are skipped.
Both helpers modify InsertOpts in place and return the same pointer, so they can be used inline as shown above, or applied to existing options:
    opts := &river.InsertOpts{Queue: "custom"}
    rivertest.ResumableStepAfter(opts, "step1")
    result, err := tw.Work(ctx, t, tx, args, opts)