Unique jobs
Jobs can be made unique, such that River guarantees that only one exists for a given set of properties. Jobs can be made unique by args, period, queue, and state.
Unique properties
It's occasionally useful to ensure that background work is only performed once for a given set of conditions. For example, a job that does a daily reconciliation of each user's account performs heavy lifting across many accounts, so ideally it runs only once a day per account to save on worker resources. River can guarantee job uniqueness along dimensions based on a combination of job properties like arguments and insert period.
Jobs configure unique properties by implementing JobArgsWithInsertOpts and populating UniqueOpts as part of the InsertOpts returned, or by adding UniqueOpts at insertion time with Client.Insert or InsertTx.
```go
type ReconcileAccountArgs struct {
	AccountID int `json:"account_id"`
}

func (ReconcileAccountArgs) Kind() string { return "reconcile_account" }

// InsertOpts returns custom insert options that every job of this type will
// inherit, including unique options.
func (ReconcileAccountArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		UniqueOpts: river.UniqueOpts{
			ByArgs:   true,
			ByPeriod: 24 * time.Hour,
		},
	}
}
```
```go
...

// First job insertion for account 1.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
if err != nil {
	panic(err)
}

// A second insertion for account 1 is skipped: a job with the same unique
// args already exists within the current 24 hour period.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
if err != nil {
	panic(err)
}

// Because the job is unique ByArgs, another job for account 2 _is_ allowed.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil)
if err != nil {
	panic(err)
}
```
See the UniqueJob example for complete code.
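Unique options can also be supplied at insertion time rather than on the args struct. A sketch, reusing the riverClient and ReconcileAccountArgs from the example above:

```go
// UniqueOpts passed at insertion time via InsertOpts, instead of being
// returned from the args type's InsertOpts method.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, &river.InsertOpts{
	UniqueOpts: river.UniqueOpts{
		ByArgs:   true,
		ByPeriod: 24 * time.Hour,
	},
})
if err != nil {
	panic(err)
}
```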
UniqueOpts provides options like ByArgs and ByPeriod that specify the dimensions along which jobs should be considered unique. Each option specified increases the specificity of the unique bound, thereby relaxing the uniqueness of the job (so more options means less uniqueness and more distinct jobs allowed). The job's kind is always taken into account to determine uniqueness, and an empty UniqueOpts struct implies no uniqueness. So for example:
- A job with kind reconcile_account and UniqueOpts{ByPeriod: 24 * time.Hour} means that only one reconcile_account job can exist for this 24 hour period.
- A job with kind reconcile_account and UniqueOpts{ByPeriod: 24 * time.Hour, ByArgs: true} means that one reconcile_account job can exist per set of encoded job args and this 24 hour period. So, a reconcile_account with args {"account_id":1} can coexist alongside one with {"account_id":2}.
- A job with kind reconcile_account and UniqueOpts{} means that no uniqueness is enforced.
Unique by args
ByArgs (taking a boolean) indicates that uniqueness should be enforced by kind and encoded JSON job args. Given args like:
```go
type ReconcileAccountArgs struct {
	AccountID int `json:"account_id"`
}
```
The struct ReconcileAccountArgs{AccountID: 1} (encoding to {"account_id":1}) is allowed to coexist with ReconcileAccountArgs{AccountID: 2} (encoding to {"account_id":2}), but if another ReconcileAccountArgs{AccountID: 1} were inserted, it'd be skipped on grounds of uniqueness.
Unique by period
ByPeriod (taking a duration) indicates that uniqueness should be enforced by kind and period. On insertion, the current time is rounded down to the nearest multiple of the given period, and a job is only inserted if there isn't already an existing job that'd run between this lower bound and the next multiple of the period.
For example, if a job is inserted with UniqueOpts{ByPeriod: 15 * time.Minute} and the current time is 15:21:00, it'll be unique for the interval of 15:15:00 to 15:30:00. A new job inserted at 15:28:00 will be skipped on grounds of uniqueness, but one inserted at 15:31:00 would be allowed.
ByPeriod is the most commonly used unique property, and other properties are most likely to be specified along with it, rather than be configured by themselves.
Unique by queue
ByQueue (taking a boolean) indicates that uniqueness should be enforced by kind and queue.
For example, if a job with kind reconcile_account is inserted into queue default, a new insertion of reconcile_account would be skipped on grounds of uniqueness, but a reconcile_account inserted to queue high_priority would be allowed.
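A sketch of that scenario, reusing the riverClient and ReconcileAccountArgs from the earlier example (the high_priority queue name is illustrative and would need to be configured on the client):

```go
// Inserted into the default queue.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, &river.InsertOpts{
	UniqueOpts: river.UniqueOpts{ByQueue: true},
})

// Allowed despite the identical kind, because it targets a different queue.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, &river.InsertOpts{
	Queue:      "high_priority",
	UniqueOpts: river.UniqueOpts{ByQueue: true},
})
```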
Unique by state
ByState (taking a slice of JobState) indicates that uniqueness should be enforced by kind and job state. This is the only unique property that inherits a default if not explicitly assigned: all job states with the exception of JobStateCancelled and JobStateDiscarded:
```go
[]rivertype.JobState{
	rivertype.JobStateAvailable,
	rivertype.JobStateCompleted,
	rivertype.JobStateRunning,
	rivertype.JobStateRetryable,
	rivertype.JobStateScheduled,
}
```
This default is usually the right setting for most unique jobs, but a custom value might be useful in tweaking behavior. For example, removing JobStateCompleted from the set above would mean that uniqueness would be enforced within active job states (i.e. being run or available to be run), so that each time a job with this kind completes, a new one is allowed to be enqueued.
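A sketch of that tweak, assuming the ReconcileAccountArgs type from the earlier example; the state set is the default minus JobStateCompleted:

```go
UniqueOpts: river.UniqueOpts{
	ByArgs: true,
	// Uniqueness enforced only while a matching job is active, so a new
	// job may be enqueued each time the previous one completes.
	ByState: []rivertype.JobState{
		rivertype.JobStateAvailable,
		rivertype.JobStateRunning,
		rivertype.JobStateRetryable,
		rivertype.JobStateScheduled,
	},
},
```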
Job retention horizons
When thinking about job state, remember that completed jobs aren't retained permanently. The default retention time for completed jobs is 24 hours, so with a default ByState, even with no unique period set, a new job would be allowed to be inserted every 24 hours as the previous completed job is pruned.
Checking for skipped inserts
Insert functions return a JobInsertResult containing a UniqueSkippedAsDuplicate property that's set to true if an insert was skipped due to uniqueness:
```go
insertRes, err := riverClient.Insert(ctx, SortArgs{...}, nil)
insertRes.UniqueSkippedAsDuplicate // true if job was skipped
```
JobInsertResult.Job contains the newly inserted job row, or the preexisting one with matching unique conditions if insertion was skipped.
At least once
While River can ensure that a unique job is only inserted once, it can't guarantee that it will be worked exactly once. A unique job could run successfully but fail to have its completed status persisted to the database, in which case it would have to be worked again for River to be sure it went through. Like other jobs, unique jobs come with an at-least-once guarantee.
Unique jobs execute at least once
Although unique jobs ensure that a given job will only be inserted once for the chosen properties, those jobs can still execute more than once due to River's at-least-once execution design.
See Reliable workers for more information.
Advisory locks
Job uniqueness is checked with the use of Postgres advisory locks, which allow clients running in parallel to take a global lock before checking for the presence of a job. Because each unique job being checked needs to take a separate lock, trying to insert too many at once could lead to deadlocks between sessions, which is why job uniqueness is not supported on batch insertion.
Setting an advisory lock prefix
Advisory locks in Postgres share the same 64-bit number space across the entire database, and although unlikely, it's possible for two sessions both using locks but performing unrelated work to hash to the same lock value and conflict accidentally.
For users who'd like to guarantee that advisory locks taken by River never conflict with those taken by their own application, Config.AdvisoryLockPrefix can be set to a 32-bit prefix that River will use for all its locks. Keep in mind that, counterintuitively, setting this value might increase the likelihood of lock collisions, because River (and other database users operating within their own prefix) only have 32 bits of space left over to hash inside.