Hopefully your background jobs always succeed, but in case they don’t, River has you covered with a robust retry system that tracks them through to success. But there are cases where a full retry is suboptimally expensive. A long-running analytical SQL query may have executed whose results are fully cacheable, and running it again would be unnecessary pressure on the database. Or in the age of AI, this might be an agent step that’s both long and expensive to run. Avoiding doing it twice saves tokens, and by extension, money.
Today we’re shipping resumable jobs — jobs that pick up where they left off on retry. They’re complementary to River’s workflows. Useful on their own if you’re not using the Pro product, or good for breaking a single large workflow task into resumable steps if you are.
Step functions
The core of resumable jobs is river.ResumableStep. It’s a simple helper that defines steps as functions:
func (w *DataPipelineWorker) Work(ctx context.Context, job *river.Job[DataPipelineArgs]) error { river.ResumableStep(ctx, "download", nil, func(ctx context.Context) error { return downloadData(ctx, job.Args.SourceURL) })
// If River was forced to stop between download and transform or midway into // transform, the next run of this job skips download and picks up here. river.ResumableStep(ctx, "transform", nil, func(ctx context.Context) error { return transformData(ctx) })
river.ResumableStep(ctx, "load", nil, func(ctx context.Context) error { return loadData(ctx) })
return nil}A design goal of resumable jobs was that they should just be normal River jobs with no special syntax. We think we achieved that — there’s no separate interface to implement or struct to embed, with the only necessity being the use of helpers inside normal Work functions.
In the code above:
-
A normal successful execution invokes
download,transform, thenloadin sequence. The job is technically completed with a checkpoint ofload(the last step), but that won’t be used again because the job won’t re-run. -
If
downloadsucceeds, but there’s a failure betweendownload/transformor insidetransform, the job errors with a checkpoint ofdownload. The next time it’s retried,downloadis skipped and River goes directly totransform. -
Checkpoints are set on a job regardless of success or failure. The only time they’re not is in the event of a process crash or forcible kill mid-step, which needs a different helper to handle elegantly (see
ResumableSetStepTxbelow).
Cursors for heavy loops
A step is expected to be a heavyweight unit of work, but large workloads can occur within steps as well. A common pattern is to iterate over a collection dynamically and process each item within, each iteration of which can be resource-intensive enough to merit its own checkpoint.
Along with the name of a step, River can also store a cursor for each step. The simplest cursor is the last ID that was processed in the loop:
func (w *ResumableCursorWorker) Work(ctx context.Context, job *river.Job[ResumableCursorArgs]) error { river.ResumableStepCursor(ctx, "process_ids", nil, func(ctx context.Context, lastProcessedID int) error { for _, id := range job.Args.IDs { if id <= lastProcessedID { continue }
fmt.Printf("Processed %d\n", id) if err := river.ResumableSetCursor(ctx, id); err != nil { return err } }
return nil })
return nil}The value of the cursor is user-defined, so more complex values are supported too. Here’s a multi-level loop iteration with two IDs in the cursor:
type ResumableCursorArgs struct { IDs []int `json:"ids"` SubIDs map[int][]int `json:"sub_ids"`}
type ResumableCursor struct { LastProcessedID int `json:"last_processed_id"` LastProcessedSubID int `json:"last_processed_sub_id"` // points to only subID of LastProcessedID}
type ResumableCursorWorker struct { river.WorkerDefaults[ResumableCursorArgs]}
func (w *ResumableCursorWorker) Work(ctx context.Context, job *river.Job[ResumableCursorArgs]) error { river.ResumableStepCursor(ctx, "process_ids", nil, func(ctx context.Context, cursor ResumableCursor) error { for _, id := range job.Args.IDs { if id <= cursor.LastProcessedID { continue }
for _, subID := range job.Args.SubIDs[id] { if id == cursor.LastProcessedID && subID <= cursor.LastProcessedSubID { continue }
fmt.Printf("Processed %d / %d\n", id, subID) if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id, LastProcessedSubID: subID}); err != nil { return err } }
fmt.Printf("Processed %d\n", id) if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil { return err } }
return nil })
return nil}Transactional checkpoints
One of River’s strongest differentiators is its transactional guarantees, so we’d be remiss to ship this feature without considering how it interacts with transactions.
By default, a job’s latest step is persisted when the job completes or errors out on return. River’s executor is pretty robust, so generally, this is sufficient to be assured that a new step gets set under almost all conditions, but there are cases where a step fails to be recorded, like in the event of a total process crash or forcible kill mid-step.
In cases where it’s critical that work isn’t repeated when a step is complete, River provides ResumableSetStepTx. Like its analog JobCompleteTx, it’s called in a transaction along with other work:
river.ResumableStep(ctx, "durable_step", nil, func(ctx context.Context) error { tx, err := w.dbPool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx)
// Do work within the transaction. if err := doWork(ctx, tx); err != nil { return err }
// Persist step completion as part of the same transaction. if _, err := river.ResumableSetStepTx[*riverpgxv5.Driver]( ctx, tx, job, ); err != nil { return err }
return tx.Commit(ctx)})If the work commits, the step commits with it. If a crash occurs, the transaction is either committed already, or nothing commits and the step is ready to be retried.
Continuations designed for Go
We modeled the design of resumable jobs heavily on Active::Job continuations in Rails, and at first assumed that there’s no way we’d be able to achieve an API comparable to what’s possible in DSL-friendly Ruby code, but it came together more nicely than expected. The result is concise, but explicit enough to be a good citizen of the Go ecosystem (or at least we think so).
Meanwhile, we hit pretty comprehensive functionality:
- Minimal syntax for easy cases with less demanding requirements.
- Arbitrary cursor support from simple scalars to complex cursors for multi-level loops.
- Transactional checkpointing of steps/cursors for maximum robustness in case of crash or killed process.
- Test helpers (not covered in this post, but see testing resumable workers).
Resumable jobs are available today in version 0.37.0 and we’d love for you to try them out. As usual, drop us a line if you run into problems or have other feedback. See also the resumable job docs.