Test helpers

River includes test helpers in the river/rivertest package to verify that River jobs are being inserted as expected.

Testing job inserts

Job inserts are verified with the RequireInserted* family of helpers provided by river/rivertest. They're designed to be symmetrical with the Client.Insert functions, so there's variants to verify a single insert or many, and on a database pool or a transaction.

Check the insertion of a single job with RequireInsertedTx:

import (

type RequiredArgs struct {
    Message string `json:"message"`

func (RequiredArgs) Kind() string { return "required" }

func TestInsert(t *testing.T) {

    tx, err := dbPool.Begin(ctx)
    if err != nil {
        // handle error
    defer tx.Rollback(ctx)

    _, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{ Message: "Hello."}, nil)
    if err != nil {
        // handle error

    job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
    fmt.Printf("Test passed with message: %s\n", job.Args.Message)

See the RequiredInserted example for complete code.

RequiredInsertedTx takes a testing.TB why is provided by the argument to any Go test or benchmark as its first parameter like t *testing.T. If the job was inserted, it returns the inserted job. If not, the test fails:

--- FAIL: TestRequireInsertedTx (0.00s)
    --- FAIL: TestRequireInsertedTx/FailsWithoutInsert (0.12s)
                River assertion failure:
                No jobs found with kind: required

Arguments assertions

The RequireInserted* functions use arguments sent to them like &RequiredArgs{} only to extract an expected job kind that's used to query for matching jobs. Any properties set in these job args are ignored. To check specific properties of an inserted job, assertions should be made against helper return values using built-in Go comparisons, or an assertion library of choice like testify/require.

import 	"github.com/stretchr/testify/require"


job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
require.Equal(t, "Hello.", job.Args.Message)

Job args properties are not compared

Job arg structs sent to RequireInserted* functions are only used to extract a job kind. Any properties on them are not checked for equality with inserted jobs. Separate assertions on return values are required.

Requiring options

RequireInsertedOptions can be used to assert various insertion options like maximum number of attempts, priority, queue, and scheduled time. RequireInsert* functions takes them as an optional last parameter:

_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
    Priority: 1,
    Queue:    river.QueueDefault,

Requiring many

Similar to requiring a single insert, many insertions can be checked simultaneously for a nominal performance benefit:

jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
    {Args: &FirstRequiredArgs{}},
    {Args: &SecondRequiredArgs{}},
    {Args: &FirstRequiredArgs{}},
for i, job := range jobs {
    fmt.Printf("Job %d args: %s\n", i, string(job.EncodedArgs))

See the RequiredManyInserted example for complete code.

RequireManyInserted* functions take a mixed set of job args of different kinds, so unlike the the single job check, they return JobRow instead of Job[T], so arguments will have to be unmarshaled from job.EncodedArgs to be inspected.

The slice of jobs returned is ordered identically to the []rivertest.ExpectedJob input slice.

When checking many jobs at once, RequireManyInserted* expects all jobs of any included kinds to be included in the expectation list. The snippet above passes if exactly two FirstRequiredArgs and one SecondRequiredArgs were inserted, but if a third FirstRequiredArgs was inserted in addition to the first two, it'd fail.

Requiring on a pool

The examples above show requiring insertions on a transaction, but non-Tx variants are provided to check against a database pool instead:

_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)

Because a driver is passed to the function directly, the [*riverpgx5.Driver] type parameter can be omitted as Go can infer it.

Test transactions

Sophisticated users may want to make use of test transactions in their tests which are rolled back after each test case, insulating tests running in parallel from each other, and avoiding tainting global state. In such cases, it may not be convenient to inject a full database pool to River's client. To support test transactions, River clients can be initialized without database pool by passing nil to their driver:

// a nil database pool is sent to the driver
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})

The initialized client is more limited, supporting only inserts on transactions with InsertTx and InsertManyTx. Calls to the non-transactional variants Insert and InsertMany, or trying to start the client with Start, will fail.


River defaults to producing informational logging, and some logging may be emitted while tests run. Although not hugely harmful, logging output won't be indented to match other test output, and in the presence of t.Parallel() will be interleaved so as to become unusable for debugging.

To avoid these problems, we recommend bridging slog and testing using a package like slogt, which will send River's log output to t.Log so it can be cleanly collated by Go's test framework:

import "github.com/neilotoole/slogt"

func TestRiverInsertions(t *testing.T) {
  riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
      Logger: slogt.New(t),

Testing workers

Workers in River are plain old Go structs, and no special testing utilities are provided for them. They're testable by initializing a worker, invoking its Work function, and checking against the results:

type SortArgs struct {
    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil

func TestSortWorker(t *testing.T) {
    err := (&SortWorker{}).Run(ctx, &river.Job{Args: JobArgs{
        Strings: []string{
          "whale", "tiger", "bear",
    if err != nil {
        // handle error