Periodic and cron jobs

Periodic jobs make it easy to enqueue a job regularly on a fixed interval or even a complex cron schedule.


Basic usage

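Periodic jobs are configured as part of the Client. Each job is created with NewPeriodicJob and added to a slice of type []*river.PeriodicJob, which is passed to the client through Config.PeriodicJobs. A job is defined by a schedule, a constructor function that returns the job args and insert options, and periodic job options.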

The following code configures an empty MyPeriodicJobArgs{} job to be inserted every 15 minutes:

periodicJobs := []*river.PeriodicJob{
    river.NewPeriodicJob(
        river.PeriodicInterval(15*time.Minute),
        func() (river.JobArgs, *river.InsertOpts) {
            return MyPeriodicJobArgs{}, nil
        },
        &river.PeriodicJobOpts{RunOnStart: true},
    ),
}

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    PeriodicJobs: periodicJobs,
    // ...
})

See the PeriodicJob example for complete code.

Note the use of the RunOnStart: true option. This option causes the job to be inserted immediately anytime a new leader is elected.

Complex cron schedules

For schedules that are more complex or that require precise control over run times, we recommend the robfig/cron package:

go get github.com/robfig/cron/v3

The cron package's Schedule interface is the same as River's PeriodicSchedule, meaning that you can use any schedule generated by that package directly in River.

cron.ParseStandard("0 * * * *") // every hour on the hour
cron.ParseStandard("30 * * * *") // every hour on the half hour
cron.ParseStandard("*/15 * * * *") // every 15th minute of every hour, i.e. :00, :15, :30, :45
cron.ParseStandard("@midnight") // midnight every night, UTC
cron.ParseStandard("CRON_TZ=America/Chicago @midnight") // midnight every night, in Chicago

Here's an example that runs MyPeriodicJobArgs{} every 15th minute:

schedule, err := cron.ParseStandard("*/15 * * * *")
if err != nil {
    panic(err) // include the parse error rather than a fixed message
}

periodicJobs := []*river.PeriodicJob{
    river.NewPeriodicJob(
        schedule,
        func() (river.JobArgs, *river.InsertOpts) {
            return MyPeriodicJobArgs{}, nil
        },
        nil,
    ),
}

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    PeriodicJobs: periodicJobs,
    // ...
})

See the CronJob example for complete code.

Adding periodic jobs after client start

River supports adding or removing periodic jobs after client start through the use of Client.PeriodicJobs. For example:

riverClient, err := river.NewClient(...)
if err != nil {
    panic(err)
}

if err := riverClient.Start(ctx); err != nil {
    panic(err)
}

// Add a periodic job after client has already started.
periodicJobHandle := riverClient.PeriodicJobs().Add(
    river.NewPeriodicJob(
        river.PeriodicInterval(15*time.Minute),
        func() (river.JobArgs, *river.InsertOpts) {
            return MyPeriodicJobArgs{}, nil
        },
        nil,
    ),
)

Adding or removing a job interrupts the wait loop of a running periodic enqueuer service. When a job is added, the enqueuer inserts it immediately if RunOnStart is enabled, and schedules its first regular run as appropriate.

Adding or removing periodic jobs has no effect unless the client is the cluster's elected leader. To guarantee that an operation takes effect, apply the same addition or removal on every River client in every running process.

Removing jobs with handle

Adding a periodic job returns a periodic job "handle", which can later be used to remove the job if necessary:

// Remove a periodic job using the handle returned by Client.PeriodicJobs().Add.
riverClient.PeriodicJobs().Remove(periodicJobHandle)

Details and caveats

Periodic jobs use the leader election system to ensure that only one worker is managing periodic jobs at any time (in a given database and schema). Periodic jobs are stateless: no state about when the next jobs will run is coordinated or persisted between workers. After a leader election (such as when the previous leader terminates), the new leader evaluates all the schedules it knows about starting at the current time. It also takes the RunOnStart option into account.

With this architecture, periodic jobs can occasionally be skipped. River's leader election is fast, but for a job that only runs at midnight every night, it's possible that the current leader is shut down at 23:59:59.99 and the new leader does not take over until 00:00:00.05. Because the new leader evaluates schedules from the current time, the midnight run it just missed is not inserted, and the next run falls on the following midnight.

Fortunately, many of these concerns can be addressed by combining periodic jobs with unique jobs and the RunOnStart option. For example, a job that is configured to be unique at the hourly level will only enqueue once in that hour no matter how many times an insert is attempted. Combined with RunOnStart, the new leader attempts an insert as soon as it takes over, so the above scenario no longer results in a skipped job.
