Inserting and working jobs

River's most basic use involves defining structs for job arguments and workers, starting a River client to work them, and inserting jobs to be worked.


Job args and workers

Jobs are defined in struct pairs, with an implementation of JobArgs and one of Worker[T JobArgs].

Job args contain json annotations and define how jobs are serialized to and from the database, along with a "kind", a stable string that uniquely identifies the job.

type SortArgs struct {
    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }
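To make the role of the json annotations concrete, here is a minimal sketch of how SortArgs looks once encoded. It uses only Go's standard encoding/json package; encodeArgs and decodeArgs are hypothetical helpers written for this illustration and are not part of River's API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SortArgs mirrors the job args struct defined above.
type SortArgs struct {
	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

// Kind returns the stable string that identifies this type of job.
func (SortArgs) Kind() string { return "sort" }

// encodeArgs is a hypothetical helper that serializes args to JSON text,
// using the field names given by the struct's json annotations.
func encodeArgs(args SortArgs) (string, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeArgs is a hypothetical helper that reverses encodeArgs.
func decodeArgs(encoded string) (SortArgs, error) {
	var args SortArgs
	err := json.Unmarshal([]byte(encoded), &args)
	return args, err
}

func main() {
	encoded, err := encodeArgs(SortArgs{Strings: []string{"whale", "tiger", "bear"}})
	if err != nil {
		panic(err)
	}
	// Prints: sort {"strings":["whale","tiger","bear"]}
	fmt.Println(SortArgs{}.Kind(), encoded)

	decoded, err := decodeArgs(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.Strings)
}
```

Because the annotation names end up in stored data, renaming a field's json name affects how previously encoded args are decoded, which is one reason to treat both the annotations and the kind string as stable.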

Workers expose a Work function that dictates how jobs run.

type SortWorker struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil
}

Other job args options

Job args can also implement JobArgsWithInsertOpts to provide default InsertOpts for the job at insertion time:

func (AlwaysHighPriorityArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		Queue: "high_priority",
	}
}

See the CustomInsertOpts example for complete code.

Other worker options

The use of WorkerDefaults provides default implementations for most functions in the Worker interface, but the defaults can be overridden as appropriate.

Timeout lets a worker provide a timeout:

func (w *LongRunningWorker) Timeout(job *river.Job[MyJobArgs]) time.Duration {
    return 1 * time.Hour
}

NextRetry lets a worker provide a custom retry schedule:

// NextRetry always schedules the next retry for 10 seconds from now.
func (w *ConstantRetryTimeWorker) NextRetry(job *river.Job[MyJobArgs]) time.Time {
    return time.Now().Add(10 * time.Second)
}
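A retry schedule does not have to be constant. The sketch below shows one possible alternative, a doubling backoff with a cap, written as plain functions so it runs with the standard library alone. It assumes the number of attempts made so far is available to the worker (a plain int stands in for it here); retryDelay, nextRetryAt, and maxRetryDelay are hypothetical names chosen for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// maxRetryDelay is a hypothetical upper bound chosen for this sketch.
const maxRetryDelay = 10 * time.Minute

// retryDelay returns how long to wait after the given attempt fails:
// 1s after the first attempt, doubling with each further attempt, and
// never exceeding maxRetryDelay. Attempts below 1 are treated as 1.
func retryDelay(attempt int) time.Duration {
	delay := time.Second
	for i := 1; i < attempt; i++ {
		delay *= 2
		if delay >= maxRetryDelay {
			return maxRetryDelay
		}
	}
	return delay
}

// nextRetryAt converts the delay into an absolute time, which is the shape
// of value that a NextRetry method returns.
func nextRetryAt(now time.Time, attempt int) time.Time {
	return now.Add(retryDelay(attempt))
}

func main() {
	now := time.Now()
	for attempt := 1; attempt <= 12; attempt++ {
		wait := nextRetryAt(now, attempt).Sub(now)
		fmt.Printf("attempt %d: retry in %s\n", attempt, wait)
	}
}
```

Keeping the delay calculation separate from the worker method makes the schedule easy to unit test without running any jobs.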

Registering workers

Jobs are uniquely identified by their "kind" string. Workers are registered on start up so that River knows how to assign jobs to workers:

workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})

AddWorker panics in case of invalid configuration. Given its succinct syntax, and that bad configuration should prevent a worker process from booting, panicking is probably a reasonable compromise for most applications. However, for those who find it distasteful, AddWorkerSafely is also provided:

workers := river.NewWorkers()
if err := river.AddWorkerSafely(workers, &SortWorker{}); err != nil {
    panic("handle this error")
}
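The difference between a panicking and an error-returning registration function can be illustrated with a toy registry keyed by kind. This is a simplified stand-in written for illustration only, not River's implementation; registry, add, and addSafely are hypothetical names.

```go
package main

import (
	"fmt"
)

// registry is a hypothetical stand-in for a workers bundle. It maps each
// job kind to the name of the worker that handles it.
type registry struct {
	byKind map[string]string
}

func newRegistry() *registry {
	return &registry{byKind: make(map[string]string)}
}

// addSafely registers worker under kind and returns an error if the kind is
// empty or already taken, leaving the existing registration untouched.
func (r *registry) addSafely(kind, worker string) error {
	if kind == "" {
		return fmt.Errorf("worker %q has an empty kind", worker)
	}
	if existing, ok := r.byKind[kind]; ok {
		return fmt.Errorf("kind %q is already registered to %q", kind, existing)
	}
	r.byKind[kind] = worker
	return nil
}

// add is the panicking variant: the same checks, but a failure aborts
// start up instead of being returned to the caller.
func (r *registry) add(kind, worker string) {
	if err := r.addSafely(kind, worker); err != nil {
		panic(err)
	}
}

func main() {
	r := newRegistry()
	r.add("sort", "SortWorker")

	// A second registration for the same kind is rejected.
	if err := r.addSafely("sort", "OtherSortWorker"); err != nil {
		fmt.Println("registration failed:", err)
	}
}
```

The error-returning form suits programs that want to report every configuration problem at once or shut down through their own error path, while the panicking form keeps start up code short.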

Starting a client

A River Client provides an interface for job insertion and manages job processing and maintenance services. A client is created with a database pool, driver, and config struct containing a Workers bundle and other settings. Here's a Client working one queue ("default") with up to 100 worker goroutines at a time:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 100},
    },
    Workers: workers,
})
if err != nil {
    panic(err)
}

// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
    panic(err)
}

Stopping

The client should also be stopped on program shutdown:

// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
    panic(err)
}

There are some complexities around ensuring clients stop cleanly, but also in a timely manner. Read Graceful shutdown for more details on River's stop modes.
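One common way to balance a clean stop against a timely one is to bound the wait with a context deadline. The sketch below shows that pattern using only the standard library. The stopper interface and stopWithTimeout helper are hypothetical names for this illustration, and fakeClient merely simulates something with a Stop(ctx) method; how the real client reacts when the context expires is an assumption here, so consult Graceful shutdown for the actual behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stopper is a hypothetical interface describing anything with a Stop
// method shaped like the call shown above.
type stopper interface {
	Stop(ctx context.Context) error
}

// stopWithTimeout asks s to stop and gives it at most timeout to finish.
func stopWithTimeout(s stopper, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return s.Stop(ctx)
}

// fakeClient simulates a client whose active jobs need drain to finish.
// It is a test double for this sketch, not a model of River's internals.
type fakeClient struct {
	drain time.Duration
}

func (f fakeClient) Stop(ctx context.Context) error {
	select {
	case <-time.After(f.drain):
		return nil // all simulated jobs finished in time
	case <-ctx.Done():
		return ctx.Err() // gave up waiting
	}
}

func main() {
	// Jobs finish well within the allowed time: no error.
	err := stopWithTimeout(fakeClient{drain: 10 * time.Millisecond}, time.Second)
	fmt.Println("fast drain error:", err)

	// Jobs take longer than allowed: the deadline error is returned.
	err = stopWithTimeout(fakeClient{drain: time.Second}, 10*time.Millisecond)
	fmt.Println("deadline exceeded:", errors.Is(err, context.DeadlineExceeded))
}
```

In a real program the timeout would typically be chosen to fit inside whatever grace period the deployment platform allows before it kills the process.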

Inserting jobs

Client.InsertTx is used in conjunction with an instance of job args to insert a job as part of a transaction:

_, err = riverClient.InsertTx(ctx, tx, SortArgs{
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)
if err != nil {
    panic(err)
}

See the InsertAndWork example for complete code.

Client.Insert, which doesn't take a transaction, is also available, although as described in Transactional enqueuing, inserting jobs in transactions is usually more appropriate to avoid bugs.

_, err = riverClient.Insert(ctx, SortArgs{
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)
if err != nil {
    panic(err)
}

See also Batch job insertion.