Unique jobs

Jobs can be made unique, such that River guarantees only one exists for a given set of properties. Uniqueness can be enforced by args, period, queue, and state.


Unique properties

It's occasionally useful to ensure that background work is performed only once for a given set of conditions. For example, a job might do a daily reconciliation of a user's account; since it performs heavy lifting and there are many accounts, ideally it runs only once a day per account to save on worker resources. River can guarantee job uniqueness along dimensions based on a combination of job properties like arguments and insert period.

Jobs configure unique properties by implementing JobArgsWithInsertOpts and populating UniqueOpts as part of the InsertOpts returned, or by adding UniqueOpts at insertion time with Client.Insert or InsertTx.

type ReconcileAccountArgs struct {
    AccountID int `json:"account_id"`
}

func (ReconcileAccountArgs) Kind() string { return "reconcile_account" }

// InsertOpts returns custom insert options that every job of this type will
// inherit, including unique options.
func (ReconcileAccountArgs) InsertOpts() river.InsertOpts {
    return river.InsertOpts{
        UniqueOpts: river.UniqueOpts{
            ByArgs:   true,
            ByPeriod: 24 * time.Hour,
        },
    }
}

...

// First job insertion for account 1.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
if err != nil {
    panic(err)
}

// The same job is inserted a second time, but the insertion is skipped
// because its unique args match an existing job: it's meant to run only
// once per account per 24 hour period.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
if err != nil {
    panic(err)
}

// Because the job is unique ByArgs, another job for account 2 _is_ allowed.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil)
if err != nil {
    panic(err)
}

See the UniqueJob example for complete code.
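
UniqueOpts can also be supplied at insertion time instead of on the args type. A minimal sketch (assuming a ReconcileAccountArgs that doesn't define its own InsertOpts method):

// Unique options passed per insertion via InsertOpts. This sketch assumes
// ReconcileAccountArgs doesn't also return unique options from InsertOpts.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, &river.InsertOpts{
    UniqueOpts: river.UniqueOpts{
        ByArgs:   true,
        ByPeriod: 24 * time.Hour,
    },
})
if err != nil {
    panic(err)
}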

UniqueOpts provides options like ByArgs and ByPeriod that specify the dimensions along which jobs should be considered unique. Each option specified increases the specificity of the unique bound, thereby relaxing uniqueness (more options means less uniqueness, and more distinct jobs allowed). The job's kind is always taken into account when determining uniqueness, and an empty UniqueOpts struct implies no uniqueness. So for example:

  • A job with kind reconcile_account and UniqueOpts{ByPeriod: 24 * time.Hour} means that only one reconcile_account job can exist per 24 hour period.

  • A job with kind reconcile_account and UniqueOpts{ByPeriod: 24 * time.Hour, ByArgs: true} means that one reconcile_account job can exist per set of encoded job args per 24 hour period.

    So, a reconcile_account with args {"account_id":1} can coexist alongside {"account_id":2}.

  • A job with kind reconcile_account and UniqueOpts{} means that no uniqueness is enforced.

Unique by args

ByArgs (taking a boolean) indicates that uniqueness should be enforced by kind and encoded JSON job args. Given args like:

type ReconcileAccountArgs struct {
    AccountID int `json:"account_id"`
}

The struct ReconcileAccountArgs{AccountID: 1} (encoding to {"account_id":1}) is allowed to coexist with ReconcileAccountArgs{AccountID: 2} (encoding to {"account_id":2}), but if another ReconcileAccountArgs{AccountID: 1} were inserted, it'd be skipped on grounds of uniqueness.
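
To make the comparison concrete, here's a standalone sketch using the bytes, encoding/json, and fmt packages (illustrative only; River's internal comparison mechanics may differ):

// Illustration with encoding/json, not River API: ByArgs uniqueness compares
// jobs' encoded args, so args that encode differently may coexist.
argsOne, _ := json.Marshal(ReconcileAccountArgs{AccountID: 1}) // {"account_id":1}
argsTwo, _ := json.Marshal(ReconcileAccountArgs{AccountID: 2}) // {"account_id":2}
fmt.Println(bytes.Equal(argsOne, argsTwo)) // false, so both jobs are allowed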

Unique by period

ByPeriod (taking a duration) indicates that uniqueness should be enforced by kind and period. On insertion, the current time is rounded down to the nearest multiple of the given period, and a job is only inserted if there isn't already an existing job that'd run between this lower bound and the next multiple of the period.

For example, if a job is inserted with UniqueOpts{ByPeriod: 15 * time.Minute} and the current time is 15:21:00, it'll be unique for the interval of 15:15:00 to 15:30:00. A new job inserted at 15:28:00 will be skipped on grounds of uniqueness, but one inserted at 15:31:00 would be allowed.
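
As an illustrative sketch of the rounding (not River's internal code), Go's time.Time.Truncate performs the same round-down to a multiple of a period:

// Illustration only: round the current time down to the nearest multiple of
// the period to find the bucket within which the job is unique.
now := time.Date(2024, time.March, 1, 15, 21, 0, 0, time.UTC)
lowerBound := now.Truncate(15 * time.Minute)   // 15:15:00
upperBound := lowerBound.Add(15 * time.Minute) // 15:30:00
fmt.Println(lowerBound.Format("15:04:05"), upperBound.Format("15:04:05"))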

ByPeriod is the most commonly used unique property, and other properties are most likely to be specified along with it, rather than be configured by themselves.

Unique by queue

ByQueue (taking a boolean) indicates that uniqueness should be enforced by kind and queue.

For example, if a job with kind reconcile_account is inserted into queue default, a new insertion of reconcile_account into default would be skipped on grounds of uniqueness, but a reconcile_account inserted into queue high_priority would be allowed.
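
A sketch combining ByQueue with an explicit queue selection (high_priority is an assumed queue name that would need to be configured on the client):

// Uniqueness scoped by queue: the same kind may exist once per queue (within
// any other unique dimensions). "high_priority" is an assumed queue name.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, &river.InsertOpts{
    Queue: "high_priority",
    UniqueOpts: river.UniqueOpts{
        ByPeriod: 24 * time.Hour,
        ByQueue:  true,
    },
})
if err != nil {
    panic(err)
}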

Unique by state

ByState (taking a slice of JobState) indicates that uniqueness should be enforced by kind and job state. This is the only unique property that inherits a default if not explicitly assigned: all job states with the exception of JobStateCancelled and JobStateDiscarded:

[]rivertype.JobState{
    rivertype.JobStateAvailable,
    rivertype.JobStateCompleted,
    rivertype.JobStateRunning,
    rivertype.JobStateRetryable,
    rivertype.JobStateScheduled,
}

This default is the right setting for most unique jobs, but a custom value can be useful for tweaking behavior. For example, removing JobStateCompleted from the set above would mean that uniqueness is enforced only within active job states (i.e. jobs being run or available to be run), so that each time a job of this kind completes, a new one is allowed to be enqueued.
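
A sketch of that customization, omitting JobStateCompleted from the default set (reusing ReconcileAccountArgs from above):

// Custom ByState without JobStateCompleted: uniqueness applies only while a
// job is active, so a new one is allowed as soon as the previous completes.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, &river.InsertOpts{
    UniqueOpts: river.UniqueOpts{
        ByArgs: true,
        ByState: []rivertype.JobState{
            rivertype.JobStateAvailable,
            rivertype.JobStateRunning,
            rivertype.JobStateRetryable,
            rivertype.JobStateScheduled,
        },
    },
})
if err != nil {
    panic(err)
}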

Job retention horizons

When thinking about job state, remember that completed jobs aren't retained permanently. The default retention time for completed jobs is 24 hours, so with the default ByState, even with no unique period set, a new job is allowed to be inserted every 24 hours as the previous completed job is pruned.
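
Retention is configurable at the client level. A sketch, assuming the CompletedJobRetentionPeriod option present in recent River versions (pool and worker setup elided):

// Assumption: river.Config exposes CompletedJobRetentionPeriod, as in recent
// River versions. Longer retention also extends how long a completed job can
// suppress new unique inserts under the default ByState.
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    CompletedJobRetentionPeriod: 48 * time.Hour,
})
if err != nil {
    panic(err)
}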

Checking for skipped inserts

Insert functions return JobInsertResult, containing a UniqueSkippedAsDuplicate property that's set to true if an insert was skipped due to uniqueness:

insertRes, err := riverClient.Insert(ctx, SortArgs{...}, nil)
if err != nil {
    panic(err)
}
insertRes.UniqueSkippedAsDuplicate // true if insert was skipped as a duplicate

JobInsertResult.Job contains a newly inserted job row, or the preexisting one with matching unique conditions if insertion was skipped.

At least once

While River can ensure that a unique job is only inserted once, it can't guarantee that it will be worked exactly once. A unique job could be worked successfully, but fail to have its completed status persisted to the database, which would require that it be worked again for River to be sure it went through. As with other jobs, River provides an at-least-once guarantee for unique jobs.

Unique jobs execute at least once

Although unique jobs ensure that a given job will only be inserted once for the chosen properties, those jobs can still execute more than once due to River's at-least-once execution design.

See Reliable workers for more information.

Advisory locks

Job uniqueness is checked using Postgres advisory locks, which allow clients running in parallel to take a global lock before checking for the presence of a job. Because each unique job being checked needs to take a separate lock, trying to insert too many at once could lead to deadlocks between sessions, which is why job uniqueness is not supported for batch insertion.
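
For illustration only (not River's actual code), the Postgres primitive involved looks like the following, where lockKey stands in as a hypothetical int64 hash of the job's unique properties:

// Illustration of the primitive, not River's implementation: a transaction-
// scoped advisory lock serializes concurrent uniqueness checks. lockKey is a
// hypothetical int64 hash of the job's unique properties.
_, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockKey)
if err != nil {
    panic(err)
}
// ...check for an existing matching job and insert only if none is found; the
// lock is released automatically when the transaction commits or rolls back.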

Setting an advisory lock prefix

Advisory locks in Postgres share the same 64-bit number space across the entire database, and although unlikely, it's possible for two sessions both using locks but performing unrelated work to hash to the same lock value and conflict accidentally.

For users who'd like to guarantee that advisory locks taken by River never conflict with those taken by their own application, Config.AdvisoryLockPrefix can be set to a 32-bit prefix that River will use for all its locks. Keep in mind that, counterintuitively, setting this value might increase the likelihood of lock collisions, because River (and any other database user operating within its own prefix) has only 32 bits of space left to hash within.
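
A sketch of setting the prefix at client creation (driver and pool setup elided; the prefix value here is arbitrary):

// Confine River's advisory locks to a dedicated 32-bit prefix so they can't
// accidentally collide with locks taken elsewhere by the application.
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    AdvisoryLockPrefix: 123456,
})
if err != nil {
    panic(err)
}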
