Maintenance services

River includes a number of auxiliary services for features and queue maintenance. These perform functions like cleaning cancelled, completed, and discarded jobs from the database, periodically rebuilding indexes to optimize performance, and rescuing stuck jobs. One client at a time runs maintenance services, determined by leader election.


Cleaner

River leaves cancelled, completed, and discarded jobs in the database even after they reach the end of their lifecycle so that operators can inspect them. Left to accumulate forever, they would eventually grow the jobs table enough to consume excessive storage and degrade performance.

To prevent the jobs table from growing without bound, River includes a job cleaner process that prunes old jobs. It wakes up at a regular interval and deletes completed, cancelled, and permanently failed (discarded) jobs whose last attempt is older than the retention period.

The default retention periods vary by job state, and each is configurable in Config:

  • Cancelled: CancelledJobRetentionPeriod, defaults to 24 hours.
  • Completed: CompletedJobRetentionPeriod, defaults to 24 hours.
  • Discarded: DiscardedJobRetentionPeriod, defaults to 7 days.
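
The retention rule can be illustrated with a small standalone sketch. This is not River's cleaner code: the function names, the string state values, and the idea of passing a job's age directly are all assumptions made for illustration, and only the default periods come from the list above.

```go
package main

import (
	"fmt"
	"time"
)

const day = 24 * time.Hour

// retentionFor returns the default retention period for a finalized job
// state, using the defaults listed above. The boolean is false for states
// that are not finalized and therefore never pruned. (Hypothetical helper,
// not part of River's API.)
func retentionFor(state string) (time.Duration, bool) {
	switch state {
	case "cancelled", "completed":
		return 1 * day, true
	case "discarded":
		return 7 * day, true
	default:
		return 0, false
	}
}

// shouldDelete reports whether a job in the given state, whose last attempt
// was age ago, has outlived its retention period.
func shouldDelete(state string, age time.Duration) bool {
	retention, finalized := retentionFor(state)
	if !finalized {
		return false
	}
	return age > retention
}

func main() {
	fmt.Println(shouldDelete("completed", 2*day)) // true: older than 24 hours
	fmt.Println(shouldDelete("discarded", 2*day)) // false: still within 7 days
	fmt.Println(shouldDelete("running", 30*day))  // false: not a finalized state
}
```

The different defaults mean a discarded job stays available for inspection for a week, while completed and cancelled jobs are pruned after a day.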

Workflow-aware retention

River Pro workflows have auxiliary data in addition to their task jobs, including workflow rows, attempts, signals, timers, and evaluator worklist rows. By default, workflow task jobs still use the normal per-job retention behavior described above, including per-queue retention and dead letter queue movement for discarded jobs. After all jobs and dead letter rows for a finalized workflow are gone, the Pro cleaner removes the remaining workflow metadata.

For installations that want workflow history to remain together, enable riverpro.Config.WorkflowAwareRetention. With workflow-aware retention enabled, workflow task jobs are excluded from normal per-job cleanup, and discarded workflow tasks are not moved to the dead letter queue before the workflow is cleaned. Finalized workflows are deleted as a unit:

  • closed workflows use WorkflowClosedRetentionPeriod, which defaults to 24 hours.
  • cancelled workflows use WorkflowCancelledRetentionPeriod, which defaults to 24 hours.

    riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
        Config: river.Config{
            Queues: map[string]river.QueueConfig{
                river.QueueDefault: {MaxWorkers: 100},
            },
            Workers: workers,
        },
        // Delete finalized workflows as a unit instead of cleaning each task job.
        WorkflowAwareRetention: true,
        // Keep closed workflows for 30 days and cancelled workflows for 7 days.
        WorkflowClosedRetentionPeriod:    30 * 24 * time.Hour,
        WorkflowCancelledRetentionPeriod: 7 * 24 * time.Hour,
    })
    if err != nil {
        return err
    }

Set either workflow retention period to -1 to preserve workflows in that state indefinitely. Setting both periods to -1 keeps finalized workflow data indefinitely, including task jobs, dead letter jobs, attempts, timers, signals, and wait-condition evidence.

Workflow-aware retention changes cleanup semantics

When workflow-aware retention is enabled, workflow task jobs no longer follow their individual finalized timestamps, per-queue retention periods, or dead letter movement. Use it when workflow-level auditability is more important than cleaning each task according to normal job retention.


Periodic enqueuer

The periodic enqueuer schedules periodic jobs. On start, it calculates the next run time of every configured PeriodicJob, then runs in a loop:

  • Sleeps until the next run time of the first job that'll come due.
  • Runs the job and any others that'll come due within a small margin of now.
  • Recalculates the next run time of all jobs that ran.
  • Goes back to sleep and repeats the cycle.

The periodic enqueuer has no configuration aside from its assigned periodic jobs.


Queue cleaner

The river_queue table is used to track active queues and powers features such as pause and resume. To avoid this table remaining bloated with queues that are no longer in use, a queue cleaner maintenance process is responsible for periodically deleting the records of any queues which have not been used within the past 24 hours (based on their updated_at timestamp).

The queue cleaner is not currently configurable.


Reindexer

The reindexer periodically issues a REINDEX INDEX CONCURRENTLY to rebuild certain key job indexes. In most situations reindexing isn't expected to improve performance, but it can help in some degenerate cases, such as when a glut of jobs at one point bloated the B-tree index and subsequently left it with many empty or nearly empty pages. In such situations Postgres' indexes will never "collapse" of their own accord, but a REINDEX to rebuild them from scratch produces a new index with the live rows and without the empty space.

The reindexer rebuilds one index at a time in order to not put an undue amount of stress on the database.

By default the reindexer runs every day at midnight UTC, but it can be customized through Config.ReindexerSchedule with a custom scheduling function. As with periodic jobs, a cron package can be used to succinctly define a complex schedule. The reindexer can be disabled altogether by using river.NeverSchedule() as its schedule.
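
As a rough sketch of a custom schedule, the type below computes a weekly run time. It assumes that the schedule interface consists of a single method, Next(time.Time) time.Time, that returns the next run after the given time. The weeklySchedule type, the sundayAt3 value, and the utc helper are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"time"
)

// weeklySchedule fires once a week on the given weekday and hour, in UTC.
// Hypothetical type; the Next method shape is an assumption about what a
// schedule needs to provide.
type weeklySchedule struct {
	weekday time.Weekday
	hour    int
}

// Next returns the first matching time strictly after current.
func (s weeklySchedule) Next(current time.Time) time.Time {
	current = current.UTC()

	// Start from today's date at the configured hour.
	candidate := time.Date(current.Year(), current.Month(), current.Day(),
		s.hour, 0, 0, 0, time.UTC)

	// Move forward to the target weekday (0 to 6 days ahead).
	daysAhead := (int(s.weekday) - int(candidate.Weekday()) + 7) % 7
	candidate = candidate.AddDate(0, 0, daysAhead)

	// If that moment has already passed (or is exactly now), use next week.
	if !candidate.After(current) {
		candidate = candidate.AddDate(0, 0, 7)
	}
	return candidate
}

// sundayAt3 runs every Sunday at 03:00 UTC.
var sundayAt3 = weeklySchedule{weekday: time.Sunday, hour: 3}

// utc is a small helper for building whole-hour UTC timestamps.
func utc(year int, month time.Month, day, hour int) time.Time {
	return time.Date(year, month, day, hour, 0, 0, 0, time.UTC)
}

func main() {
	// 2024-01-10 is a Wednesday, so the next run is the following Sunday.
	next := sundayAt3.Next(utc(2024, 1, 10, 12))
	fmt.Println(next.Format(time.RFC3339)) // 2024-01-14T03:00:00Z
}
```

Under the stated assumption, a value such as sundayAt3 could be assigned to Config.ReindexerSchedule to move reindexing to a weekly low-traffic window.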

The reindexer is currently hardcoded to only reindex the GIN indexes river_job_args_index and river_job_metadata_index, as these are more prone to bloat than B-tree indexes.


Rescuer

The rescuer looks for "stuck" jobs and either enqueues them to be reworked, or discards them if they've hit their maximum allowed attempts. A job may become stuck in situations like:

  • A hardware crash causes the process to terminate before a job finishes its work or before it can be marked completed.
  • A bug. Think of a job that waits on a channel that nothing will ever send to, and which isn't using a select to respect context cancellation. The job waits for something that will never happen. The client will eventually try to cancel it according to its Config.JobTimeout configuration, but because the job can't be cancelled, nothing happens, and it'll only end once its parent process is terminated. It's important to design jobs to be cancellable to avoid this problem.
  • After the job ran, there was a problem persisting its new state to the database. This problem should be rare, and can be avoided completely with the use of transactional job completion.
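
The cancellability problem described in the second case above can be avoided by selecting on the context alongside the channel. The sketch below uses only the standard library; waitForResult and demoTimeout are hypothetical names, and the 50 millisecond timeout stands in for a job timeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForResult blocks until a value arrives on results or ctx is cancelled,
// whichever happens first. Because it selects on ctx.Done(), a timeout or
// cancellation always unblocks it.
func waitForResult(ctx context.Context, results <-chan string) (string, error) {
	select {
	case res := <-results:
		return res, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demoTimeout waits on a channel that nothing ever sends to, under a short
// timeout, and returns the resulting error.
func demoTimeout() error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	neverSent := make(chan string) // no goroutine will ever send on this
	_, err := waitForResult(ctx, neverSent)
	return err
}

func main() {
	err := demoTimeout()
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

A bare receive from the channel in place of the select would block forever, which is the stuck-job situation the rescuer exists to recover from.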

The duration after which a job is considered stuck and eligible for rescue can be configured with Config.RescueStuckJobsAfter. Its value:

  • Defaults to one hour, or JobTimeout plus one hour in case JobTimeout has been configured to be larger than one hour.
  • Must be greater than JobTimeout if both it and JobTimeout are configured.
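
The two rules above can be written out as a small function. This is a simplified sketch rather than River's validation code: resolveRescueAfter is a hypothetical name, and treating a zero duration as "not configured" is an assumption made to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// resolveRescueAfter returns the effective rescue threshold given the
// configured RescueStuckJobsAfter and JobTimeout values. A zero value means
// "not configured" in this sketch.
func resolveRescueAfter(rescueAfter, jobTimeout time.Duration) (time.Duration, error) {
	if rescueAfter == 0 {
		// Default: one hour, or JobTimeout plus one hour when JobTimeout is
		// larger than one hour.
		if jobTimeout > time.Hour {
			return jobTimeout + time.Hour, nil
		}
		return time.Hour, nil
	}

	// When both are configured, the rescue threshold must exceed the timeout.
	if jobTimeout != 0 && rescueAfter <= jobTimeout {
		return 0, errors.New("RescueStuckJobsAfter must be greater than JobTimeout")
	}
	return rescueAfter, nil
}

func main() {
	fmt.Println(resolveRescueAfter(0, 0))                            // 1h0m0s <nil>
	fmt.Println(resolveRescueAfter(0, 3*time.Hour))                  // 4h0m0s <nil>
	fmt.Println(resolveRescueAfter(30*time.Minute, 1*time.Hour))     // 0s plus an error
	fmt.Println(resolveRescueAfter(2*time.Hour, 10*time.Minute))     // 2h0m0s <nil>
}
```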

Jobs that have overridden the default timeout by implementing Timeout() on their worker will be given at least that timeout duration before they're considered stuck and eligible for rescue.

The rescuer bounds job duration

Config.RescueStuckJobsAfter is effectively an upper bound on how long jobs are allowed to run, because jobs that are still running after this duration will be rescheduled to run again, potentially alongside an existing execution attempt for the same job.


Scheduler

Jobs can be scheduled to run in the future for several reasons:

  • At insertion time, a ScheduledAt time was specified in the job's InsertOpts.
  • A worker may have snoozed the job to run again in the future.
  • The job may have errored on a previous execution and needs to be retried after a backoff duration.

Each time the scheduler runs, it queries for jobs that are ready to be attempted again and makes them available. It runs at a constant interval of 5 seconds, which is not configurable.
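
A single scheduler pass might look roughly like the following in-memory sketch. The job type and the state names "scheduled", "retryable", and "available" are assumptions used for illustration. The real scheduler queries the database rather than a slice.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical, minimal view of a job row.
type job struct {
	id          int
	state       string // assumed state names, e.g. "scheduled" or "retryable"
	scheduledAt time.Time
}

// promoteDue marks waiting jobs whose scheduled time has arrived as
// available and returns their IDs.
func promoteDue(jobs []*job, now time.Time) []int {
	var promoted []int
	for _, j := range jobs {
		waiting := j.state == "scheduled" || j.state == "retryable"
		if waiting && !j.scheduledAt.After(now) {
			j.state = "available"
			promoted = append(promoted, j.id)
		}
	}
	return promoted
}

// demo runs one pass over a fixed set of jobs and returns the promoted IDs.
func demo() []int {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	jobs := []*job{
		{id: 1, state: "scheduled", scheduledAt: now.Add(-time.Minute)}, // due
		{id: 2, state: "scheduled", scheduledAt: now.Add(time.Hour)},    // still in the future
		{id: 3, state: "retryable", scheduledAt: now.Add(-time.Second)}, // backoff has elapsed
		{id: 4, state: "running", scheduledAt: now.Add(-time.Hour)},     // not waiting to be scheduled
	}
	return promoteDue(jobs, now)
}

func main() {
	fmt.Println(demo()) // [1 3]
}
```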