Sequences

River Pro sequences guarantee that a specific series of jobs will be executed in a one-at-a-time sequential order relative to other jobs in the same sequence. Sequences are partitioned based upon a "sequence key" that is computed from various job attributes such as its kind and args (or a subset of args).

Jobs in different sequences may run in parallel. Unlike unique jobs, sequences allow any number of jobs to be queued up in the sequence, even though only one job will be worked at a time.

Sequences are a feature of River Pro ✨. If you haven't yet, install River Pro and run the sequence migration line.

Added in River Pro v0.5.0.


Basic usage

Sequences are enabled by implementing the optional SequenceOpts() method on your JobArgs struct:

type MyJobArgs struct {
    CustomerID string `json:"customer_id"`
}

func (MyJobArgs) Kind() string { return "my_job" }

func (MyJobArgs) SequenceOpts() riverpro.SequenceOpts {
    // Use the default sequence partitioning based solely on the job kind.
    return riverpro.SequenceOpts{}
}

The riverpro.SequenceOpts struct configures the sequence partitioning for the job. By default, all jobs of the same kind will run in a single sequence.

When a job is inserted, River automatically computes its sequence key from these options. The returned SequenceOpts should typically be the same for all jobs of a given kind.

Sequence migrations

Sequences require additional database schema changes added as part of the sequence migration line.

Understanding sequences

Sequences allow jobs to be executed in a guaranteed one-at-a-time sequential order relative to other jobs in the same sequence. Consider the following example, where jobs are partitioned into sequences based on a customer_id field:

[Diagram: jobs partitioned into sequences by customer_id]

Jobs within a sequence are always ordered by when they were inserted (id ASC). However, jobs in different sequences may run in parallel. In the above example, this means that job 7 could begin executing before job 6 has even begun because they are in different sequences, whereas job 6 must wait for job 3 to complete because they're in the same sequence.
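The ordering rule can be sketched with a small standalone model. This is only an illustration of the rule described above, not River's scheduler: the Job type, its SequenceKey field, and the HeadOfEachSequence function are hypothetical names, and the key values are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// Job is a simplified stand-in for a queued job; it is not River's job row type.
type Job struct {
	ID          int64
	SequenceKey string // hypothetical precomputed key, e.g. derived from customer_id
	Done        bool
}

// HeadOfEachSequence returns the lowest-ID unfinished job in every sequence.
// Only these jobs may be running or started; all later jobs in the same
// sequence have to wait.
func HeadOfEachSequence(jobs []Job) []int64 {
	head := map[string]int64{}
	for _, j := range jobs {
		if j.Done {
			continue
		}
		if cur, ok := head[j.SequenceKey]; !ok || j.ID < cur {
			head[j.SequenceKey] = j.ID
		}
	}
	ids := make([]int64, 0, len(head))
	for _, id := range head {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })
	return ids
}

func main() {
	jobs := []Job{
		{ID: 3, SequenceKey: "customer_a"},
		{ID: 6, SequenceKey: "customer_a"},
		{ID: 7, SequenceKey: "customer_b"},
	}
	// Jobs 3 and 7 head their sequences; job 6 waits behind job 3.
	fmt.Println(HeadOfEachSequence(jobs)) // [3 7]
}
```

Once job 3 is marked Done, job 6 becomes the head of its sequence, regardless of what job 7 is doing.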

Sequence options

Sequence options are configured with the riverpro.SequenceOpts type. Using an empty/default SequenceOpts struct will sequence the jobs based solely on the job kind, so that all jobs of that type will run in a single sequence.

This can be customized using the fields on the struct:

type SequenceOpts struct {
    ByArgs bool
    ByQueue bool
    ContinueOnCancelled bool
    ContinueOnDiscarded bool
    ExcludeKind bool
}

Sequencing by arguments

Many use cases will require using some of the job's arguments, such as a customer or tenant ID, to allow for more granular sequencing. When enabled, ByArgs uses all of the job's encoded arguments to compute the sequence key.

It's also possible to use a subset of the args by indicating on the JobArgs struct which fields should be included in the sequence using struct tags:

type MyJobArgs struct {
    CustomerID string `json:"customer_id" river:"sequence"`
    TraceID    string `json:"trace_id"`
}

func (MyJobArgs) Kind() string { return "my_job" }

func (MyJobArgs) SequenceOpts() riverpro.SequenceOpts {
    return riverpro.SequenceOpts{ByArgs: true}
}

A sequence can also span across multiple job kinds by setting ExcludeKind to true. In the above example, if ExcludeKind were set to true, the sequence key would be computed based solely on the customer_id field. If other job types were also sequenced in the same way, they would run in a single global sequence for that particular customer ID.
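To make the effect of the struct tags and ExcludeKind concrete, here is a rough sketch of how a key could be derived from the tagged fields. It is not River Pro's implementation, and the real key format is not described here: sketchSequenceKey and its output format are assumptions made purely for illustration, using only the standard library.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strings"
)

type MyJobArgs struct {
	CustomerID string `json:"customer_id" river:"sequence"`
	TraceID    string `json:"trace_id"`
}

func (MyJobArgs) Kind() string { return "my_job" }

// sketchSequenceKey is a hypothetical helper, NOT River Pro's real key format.
// It keeps fields tagged river:"sequence", or every field when none are tagged,
// and prefixes the job kind unless excludeKind is set.
func sketchSequenceKey(kind string, args any, excludeKind bool) (string, error) {
	v := reflect.ValueOf(args)
	if v.Kind() != reflect.Struct {
		return "", errors.New("args must be a struct value")
	}
	t := v.Type()
	all, tagged := map[string]any{}, map[string]any{}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		if name == "" {
			name = f.Name
		}
		all[name] = v.Field(i).Interface()
		if f.Tag.Get("river") == "sequence" {
			tagged[name] = v.Field(i).Interface()
		}
	}
	chosen := tagged
	if len(tagged) == 0 {
		chosen = all
	}
	encoded, err := json.Marshal(chosen) // map keys are emitted in sorted order
	if err != nil {
		return "", err
	}
	if excludeKind {
		return string(encoded), nil
	}
	return kind + ":" + string(encoded), nil
}

func main() {
	a := MyJobArgs{CustomerID: "cust_1", TraceID: "trace_a"}
	b := MyJobArgs{CustomerID: "cust_1", TraceID: "trace_b"}
	c := MyJobArgs{CustomerID: "cust_2", TraceID: "trace_c"}

	ka, _ := sketchSequenceKey(a.Kind(), a, false)
	kb, _ := sketchSequenceKey(b.Kind(), b, false)
	kc, _ := sketchSequenceKey(c.Kind(), c, false)
	fmt.Println(ka == kb) // true: same customer, trace_id is ignored
	fmt.Println(ka == kc) // false: different customer

	shared, _ := sketchSequenceKey(a.Kind(), a, true)
	fmt.Println(shared) // no kind prefix, so other kinds could share this key
}
```

In this sketch, leaving trace_id untagged is what keeps per-request values from splitting one customer's jobs into many single-job sequences.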

Halted sequences

By default, a sequence will halt if any job in the sequence is cancelled or discarded. This is useful for preserving the guarantee of sequential execution.

However, this isn't always desirable. This behavior can be customized with the ContinueOnCancelled and ContinueOnDiscarded options. When set to true, the sequence will continue to be processed even if a job in the sequence is cancelled or discarded:

riverpro.SequenceOpts{
    ContinueOnCancelled: true,
    ContinueOnDiscarded: true,
}

Recovering halted sequences

When a sequence is halted due to a cancelled or discarded job, it can be recovered by retrying the job that caused the sequence to halt. This will resume the sequence from the halted job onward. Alternatively, you can manually retry a future job in the sequence to skip the halted job and continue processing.
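The halting and recovery behavior can be modeled in a few lines. The sketch below rests on a simplifying assumption that is not confirmed by this page: the next pending job is treated as blocked only when the job immediately before it was cancelled or discarded and the matching Continue option is off. The State and haltOpts types and the halted function are local, hypothetical stand-ins rather than River Pro APIs.

```go
package main

import "fmt"

// State mirrors a few River job state names; the type itself is a local stand-in.
type State string

const (
	Completed State = "completed"
	Cancelled State = "cancelled"
	Discarded State = "discarded"
	Pending   State = "pending"
)

// haltOpts is a local stand-in for the two riverpro.SequenceOpts fields.
type haltOpts struct {
	ContinueOnCancelled bool
	ContinueOnDiscarded bool
}

// halted reports whether the first pending job in seq (insertion order) is
// blocked. Assumption: only the job immediately before it is considered.
func halted(seq []State, opts haltOpts) bool {
	for i, s := range seq {
		if s != Pending {
			continue
		}
		if i == 0 {
			return false
		}
		switch seq[i-1] {
		case Cancelled:
			return !opts.ContinueOnCancelled
		case Discarded:
			return !opts.ContinueOnDiscarded
		}
		return false
	}
	return false // no pending jobs left
}

func main() {
	seq := []State{Completed, Discarded, Pending, Pending}
	fmt.Println(halted(seq, haltOpts{}))                          // true
	fmt.Println(halted(seq, haltOpts{ContinueOnDiscarded: true})) // false

	// Recovery 1: the discarded job is retried and completes.
	retried := []State{Completed, Completed, Pending, Pending}
	fmt.Println(halted(retried, haltOpts{})) // false

	// Recovery 2: a later job is retried manually, skipping the discarded one.
	skipped := []State{Completed, Discarded, Completed, Pending}
	fmt.Println(halted(skipped, haltOpts{})) // false
}
```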

Limitations

Sequences prevent concurrent execution under normal operation, but they cannot guarantee it under manual intervention, such as manually retrying previously completed jobs or upcoming pending jobs in the sequence.

If there are no actively running jobs in a sequence, the first job in that sequence may encounter higher latency before being moved to available by the sequence maintenance process. This latency does not apply to subsequent jobs in the sequence if they are already enqueued when the previous job completes; such subsequent jobs will be scheduled immediately.

Sequences are not compatible with workflows. Attempting to insert a sequence-based job in a workflow will result in an error.