Test helpers

River includes test helpers in the river/rivertest package to verify that River jobs are being inserted as expected.


Testing job inserts

Job inserts are verified with the RequireInserted* family of helpers provided by river/rivertest. They're designed to be symmetrical with the Client.Insert functions, so there are variants to verify a single insert or many, and on a database pool or a transaction.

Check the insertion of a single job with RequireInsertedTx:

import (
    "fmt"
    "testing"

    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
    "github.com/riverqueue/river/rivertest"
)

type RequiredArgs struct {
    Message string `json:"message"`
}

func (RequiredArgs) Kind() string { return "required" }

func TestInsert(t *testing.T) {
    ...

    tx, err := dbPool.Begin(ctx)
    if err != nil {
        // handle error
    }
    defer tx.Rollback(ctx)

    _, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{Message: "Hello."}, nil)
    if err != nil {
        // handle error
    }

    job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
    fmt.Printf("Test passed with message: %s\n", job.Args.Message)
}

See the RequireInserted example for complete code.

RequireInsertedTx takes a testing.TB after the context argument. Any Go test or benchmark provides one through its own parameter, like t *testing.T. If the job was inserted, it returns the inserted job. If not, the test fails:

--- FAIL: TestRequireInsertedTx (0.00s)
    --- FAIL: TestRequireInsertedTx/FailsWithoutInsert (0.12s)
        rivertest.go:352:
                River assertion failure:
                No jobs found with kind: required

Arguments assertions

The RequireInserted* functions use arguments sent to them like &RequiredArgs{} only to extract an expected job kind that's used to query for matching jobs. Any properties set in these job args are ignored. To check specific properties of an inserted job, assertions should be made against helper return values using built-in Go comparisons, or an assertion library of choice like testify/require.

import "github.com/stretchr/testify/require"

...

job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
require.Equal(t, "Hello.", job.Args.Message)

Job args properties are not compared

Job arg structs sent to RequireInserted* functions are only used to extract a job kind. Any properties on them are not checked for equality with inserted jobs. Separate assertions on return values are required.

Requiring options

RequireInsertedOpts can be used to assert various insertion options like maximum number of attempts, priority, queue, and scheduled time. RequireInserted* functions take it as an optional last parameter:

_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
    Priority: 1,
    Queue:    river.QueueDefault,
})

Requiring many

Similar to requiring a single insert, many insertions can be checked simultaneously for a nominal performance benefit:

jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
    {Args: &FirstRequiredArgs{}},
    {Args: &SecondRequiredArgs{}},
    {Args: &FirstRequiredArgs{}},
})
for i, job := range jobs {
    fmt.Printf("Job %d args: %s\n", i, string(job.EncodedArgs))
}

See the RequireManyInserted example for complete code.

RequireManyInserted* functions take a mixed set of job args of different kinds, so unlike the single job check, they return JobRow instead of Job[T]. Arguments have to be unmarshaled from job.EncodedArgs to be inspected.
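As a minimal sketch of that unmarshaling step, the helper below decodes JSON-encoded args into a typed struct after checking the kind. The jobRow type is a hypothetical stand-in that models only the Kind and EncodedArgs fields, the kind string "first_required" is assumed for illustration, and decodeArgs is not part of River:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobRow is a hypothetical stand-in for the row type returned by
// RequireManyInserted*. Only the two fields used below are modelled.
type jobRow struct {
	Kind        string
	EncodedArgs []byte
}

// FirstRequiredArgs mirrors the args type named in the snippet above.
// Its kind string is an assumption made for this sketch.
type FirstRequiredArgs struct {
	Message string `json:"message"`
}

func (FirstRequiredArgs) Kind() string { return "first_required" }

// decodeArgs unmarshals a row's JSON-encoded args into T, refusing rows
// whose kind does not match. T must be a non-pointer args type so that
// Kind can be called on its zero value.
func decodeArgs[T interface{ Kind() string }](row jobRow) (T, error) {
	var args T
	if row.Kind != args.Kind() {
		return args, fmt.Errorf("kind mismatch: row has %q, want %q", row.Kind, args.Kind())
	}
	if err := json.Unmarshal(row.EncodedArgs, &args); err != nil {
		return args, fmt.Errorf("unmarshal args: %w", err)
	}
	return args, nil
}
```

With real rows, the same json.Unmarshal call would be applied to job.EncodedArgs directly, followed by ordinary assertions on the decoded struct.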

The slice of jobs returned is ordered identically to the []rivertest.ExpectedJob input slice.

When checking many jobs at once, RequireManyInserted* expects all jobs of any included kinds to be included in the expectation list. The snippet above passes if exactly two FirstRequiredArgs and one SecondRequiredArgs were inserted, but if a third FirstRequiredArgs was inserted in addition to the first two, it'd fail.
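The counting rule described above can be modelled in a few lines. This is a simplified illustration rather than River's implementation: checkKinds is a hypothetical function that works on plain kind strings, ignores inserted jobs whose kind is absent from the expectation list, and does not model ordering:

```go
package main

import "fmt"

// checkKinds is a simplified model of the rule described above, not
// River's code. For every kind named in expected, the number of inserted
// jobs of that kind must match exactly. Kinds that do not appear in
// expected are ignored.
func checkKinds(expected, inserted []string) error {
	want := make(map[string]int)
	for _, kind := range expected {
		want[kind]++
	}

	got := make(map[string]int)
	for _, kind := range inserted {
		if _, ok := want[kind]; ok {
			got[kind]++
		}
	}

	for kind, n := range want {
		if got[kind] != n {
			return fmt.Errorf("kind %q: expected %d inserted, found %d", kind, n, got[kind])
		}
	}
	return nil
}
```

Under this model, expecting two "first" and one "second" passes when exactly those three jobs were inserted, and returns an error once a third "first" job is present.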

Requiring on a pool

The examples above show requiring insertions on a transaction, but non-Tx variants are provided to check against a database pool instead:

_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)

Because a driver is passed to the function directly, the [*riverpgxv5.Driver] type parameter can be omitted as Go can infer it.

Test transactions

Sophisticated users may want to make use of test transactions in their tests which are rolled back after each test case, insulating tests running in parallel from each other, and avoiding tainting global state. In such cases, it may not be convenient to inject a full database pool to River's client. To support test transactions, River clients can be initialized without a database pool by passing nil to their driver:

// a nil database pool is sent to the driver
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})

The initialized client is more limited, supporting only inserts on transactions with InsertTx and InsertManyTx. Calls to the non-transactional variants Insert and InsertMany, or trying to start the client with Start, will fail.

Logging

River defaults to producing informational logging, and some logging may be emitted while tests run. Although not hugely harmful, logging output won't be indented to match other test output, and in the presence of t.Parallel() will be interleaved so as to become unusable for debugging.

To avoid these problems, we recommend bridging slog and testing using a package like slogt, which will send River's log output to t.Log so it can be cleanly collated by Go's test framework:

import "github.com/neilotoole/slogt"

func TestRiverInsertions(t *testing.T) {
  riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
      Logger: slogt.New(t),
      ...
  })
  ...
}

Testing workers

There are two main methods for testing workers:

  1. Directly invoking the worker's Work function.
  2. Using the rivertest.Worker helpers to simulate real worker execution.

Directly testing a worker

Workers in River are plain old Go structs, so they can often be tested by initializing a worker, invoking its Work function directly, and checking the results:

type SortArgs struct {
    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil
}

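func TestSortWorker(t *testing.T) {
    ctx := context.Background()

    // Build the job by hand; only Args is needed to call Work directly.
    job := &river.Job[SortArgs]{Args: SortArgs{
        Strings: []string{
            "whale", "tiger", "bear",
        },
    }}

    err := (&SortWorker{}).Work(ctx, job)
    if err != nil {
        // handle error
    }
}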

Using the rivertest.Worker helpers

Some River features depend on the worker running in a real worker context or on the database, and can't be fully tested in isolation. Examples include river.JobCompleteTx and recorded output. To comprehensively test workers, River provides powerful test helpers to exercise workers using a real work context and River's internal execution logic.

These features are exposed through the rivertest.Worker type, which must be initialized for the specific worker type being tested:

worker := &MyWorker{}
testWorker := rivertest.NewWorker(t, driver, config, worker)

The testWorker can then be used to execute as many jobs as desired:

result, err := testWorker.Work(ctx, t, tx, MyJobArgs{CustomerID: 123}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, result.EventKind)
require.Equal(t, rivertype.JobStateCompleted, result.Job.State)

The Work function inserts a real job within the provided transaction, executes it, and records its result in the same transaction. The error return value includes any "real" errors the worker returned (excluding intentional snooze and cancel errors), as well as any recovered panics. The WorkResult struct includes the overall execution result (completed, failed, etc) as well as the final job row recorded in the database following execution.

Transactions are not automatically rolled back

The Work function does not automatically roll back the transaction, nor does it commit it. The transaction is assumed to be owned by the caller, so it is the caller's responsibility to roll back after each individual job or batch of jobs tested.

To test execution of an existing job, use WorkJob:

insertRes, err := client.InsertTx(ctx, tx, args, nil)
if err != nil {
    // handle error
}
// ...
result, err := testWorker.WorkJob(ctx, t, tx, insertRes.Job)