River Pro sequences guarantee that a specific series of jobs will be executed in a one-at-a-time sequential order relative to other jobs in the same sequence. Sequences are partitioned based upon a "sequence key" that is computed from various job attributes such as its kind and args (or a subset of args).
Jobs across sequences may run in parallel. Unlike unique jobs, sequences allow an infinite number of jobs to be queued up in the sequence, even though only one job will be worked at a time.
Sequences are a feature of River Pro ✨. If you haven't yet, install River Pro and run the sequence
migration line.
Added in River Pro v0.5.0.
Basic usage
Sequences are enabled by implementing an optional SequenceOpts()
interface on your JobArgs
struct:
type MyJobArgs struct {
    CustomerID string `json:"customer_id"`
}

func (MyJobArgs) Kind() string { return "my_job" }

func (MyJobArgs) SequenceOpts() riverpro.SequenceOpts {
    // Use the default sequence partitioning based solely on the job kind.
    return riverpro.SequenceOpts{}
}
The riverpro.SequenceOpts
struct configures the sequence partitioning for the job. By default, all jobs of the same kind will run in a sequence.
When a job is inserted, its sequence key is computed automatically from the sequence options. The returned SequenceOpts
should typically be the same for all jobs of a given kind so that their sequence keys are computed consistently.
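Insertion itself is unchanged. As a minimal sketch, assuming an already-configured client named client and a context ctx:

// Inserting a job whose args implement SequenceOpts() works like any other
// insert; the sequence key is derived from the args and options at insert time.
_, err := client.Insert(ctx, MyJobArgs{CustomerID: "cust_123"}, nil)
if err != nil {
    // Handle the insertion error.
}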
Sequence migrations
Sequences require additional database schema changes added as part of the sequence
migration line.
Understanding sequences
Sequences allow jobs to be executed in a guaranteed one-at-a-time sequential order relative to other jobs in the same sequence. Consider the following example, where jobs are partitioned into sequences based on a customer_id
field:
Jobs within a sequence are always ordered by when they were inserted (id ASC
). However, jobs across sequences may run in parallel. In the above example, this means that job 7
could begin executing before job 6
has even begun because they are in different sequences, whereas job 6
is waiting for job 3
to complete since they're in the same sequence.
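As a rough sketch of the same idea in code, assuming MyJobArgs is configured to sequence by its customer_id field (as shown under "Sequencing by arguments" below) and that client and ctx already exist:

// Jobs for different customers land in different sequences and may run in
// parallel; jobs for the same customer run one at a time in insertion order.
_, _ = client.Insert(ctx, MyJobArgs{CustomerID: "cust_a"}, nil)
_, _ = client.Insert(ctx, MyJobArgs{CustomerID: "cust_b"}, nil) // may run alongside cust_a's job
_, _ = client.Insert(ctx, MyJobArgs{CustomerID: "cust_a"}, nil) // waits for the first cust_a job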
Sequence options
Sequence options are configured with the riverpro.SequenceOpts
type. Using an empty/default SequenceOpts
struct will sequence the jobs based solely on the job kind, so that all jobs of that type will run in a single sequence.
This can be customized using the fields on the struct:
type SequenceOpts struct {
    ByArgs              bool
    ByQueue             bool
    ContinueOnCancelled bool
    ContinueOnDiscarded bool
    ExcludeKind         bool
}
Sequencing by arguments
Many use cases will require using some of the job's arguments, such as a customer or tenant ID, to allow for more granular sequencing. When enabled, ByArgs
utilizes all of the job's encoded arguments.
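For example, here's a sketch of ByArgs using every encoded argument; the OrderJobArgs type and its fields are purely illustrative:

type OrderJobArgs struct {
    CustomerID string `json:"customer_id"`
    OrderID    string `json:"order_id"`
}

func (OrderJobArgs) Kind() string { return "order_job" }

// With ByArgs and no field-level tags, the sequence key is derived from all
// encoded args, so only jobs with an identical customer_id/order_id pair
// share a sequence.
func (OrderJobArgs) SequenceOpts() riverpro.SequenceOpts {
    return riverpro.SequenceOpts{ByArgs: true}
}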
It's also possible to use a subset of the args by indicating on the JobArgs
struct which fields should be included in the sequence using struct tags:
type MyJobArgs struct {
    CustomerID string `json:"customer_id" river:"sequence"`
    TraceID    string `json:"trace_id"`
}

func (MyJobArgs) Kind() string { return "my_job" }

func (MyJobArgs) SequenceOpts() riverpro.SequenceOpts {
    return riverpro.SequenceOpts{ByArgs: true}
}
A sequence can also span across multiple job kinds by setting ExcludeKind
to true
. In the above example, if ExcludeKind
were set to true
, the sequence key would be computed based solely on the customer_id
field. If other job types were also sequenced in the same way, they would run in a single global sequence for that particular customer ID.
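A hedged sketch of that configuration, using an illustrative CustomerSyncArgs type:

type CustomerSyncArgs struct {
    CustomerID string `json:"customer_id" river:"sequence"`
}

func (CustomerSyncArgs) Kind() string { return "customer_sync" }

// ByArgs + ExcludeKind: the sequence key is computed solely from customer_id,
// so any other job kind configured the same way joins the same per-customer
// sequence.
func (CustomerSyncArgs) SequenceOpts() riverpro.SequenceOpts {
    return riverpro.SequenceOpts{ByArgs: true, ExcludeKind: true}
}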
Halted sequences
By default, a sequence will halt if any job in the sequence is cancelled or discarded. This is useful for preserving the guarantee of sequential execution.
However, this isn't always desirable. This behavior can be customized with the ContinueOnCancelled
and ContinueOnDiscarded
options. When set to true
, the sequence will continue to be processed even if a job in the sequence is cancelled or discarded:
riverpro.SequenceOpts{
    ContinueOnCancelled: true,
    ContinueOnDiscarded: true,
}
Recovering halted sequences
When a sequence is halted due to a cancelled or discarded job, it can be recovered by retrying the job that caused the sequence to halt. This will resume the sequence from the halted job onward. Alternatively, you can manually retry a future job in the sequence to skip the halted job and continue processing.
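As a minimal sketch, assuming the ID of the halted job has already been looked up and a configured client is available:

// Retrying the cancelled or discarded job resumes the sequence from that job
// onward. haltedJobID is assumed to have been found elsewhere (e.g. via the
// job list API or the UI).
_, err := client.JobRetry(ctx, haltedJobID)
if err != nil {
    // Handle the retry error.
}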
Limitations
Sequences prevent concurrent execution under normal operation, but they cannot do so in the face of manual intervention, such as manually retrying previously completed
or upcoming pending
jobs in the sequence.
If there are no actively running jobs in a sequence, the first job in that sequence may encounter higher latency before being moved to available
by the sequence maintenance process. This latency does not apply to subsequent jobs in the sequence if they are already enqueued when the previous job completes; such jobs are scheduled immediately.
Sequences are not compatible with workflows. Attempting to insert a sequence-based job in a workflow will result in an error.