River includes test helpers in the river/rivertest package to verify that River jobs are being inserted as expected.
Testing job inserts
Job inserts are verified with the RequireInserted* family of helpers provided by river/rivertest. They're designed to be symmetrical with the Client.Insert functions, so there are variants to verify a single insert or many, and on a database pool or a transaction.
Check the insertion of a single job with RequireInsertedTx:
import (
	"fmt"
	"testing"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertest"
)

type RequiredArgs struct {
	Message string `json:"message"`
}

func (RequiredArgs) Kind() string { return "required" }

func TestInsert(t *testing.T) {
	...

	tx, err := dbPool.Begin(ctx)
	if err != nil {
		// handle error
	}
	defer tx.Rollback(ctx)

	_, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{Message: "Hello."}, nil)
	if err != nil {
		// handle error
	}

	// Only the kind of &RequiredArgs{} is used to find the inserted job.
	job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
	fmt.Printf("Test passed with message: %s\n", job.Args.Message)
}
See the RequireInserted example for complete code.
RequireInsertedTx takes a testing.TB, which is satisfied by the first parameter of any Go test or benchmark, like t *testing.T. If the job was inserted, it returns the inserted job. If not, the test fails:
--- FAIL: TestRequireInsertedTx (0.00s)
--- FAIL: TestRequireInsertedTx/FailsWithoutInsert (0.12s)
rivertest.go:352:
River assertion failure:
No jobs found with kind: required
Arguments assertions
The RequireInserted* functions use arguments sent to them like &RequiredArgs{} only to extract an expected job kind that's used to query for matching jobs. Any properties set in these job args are ignored. To check specific properties of an inserted job, assertions should be made against helper return values using built-in Go comparisons, or an assertion library of choice like testify/require.
import "github.com/stretchr/testify/require"
...
job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
require.Equal(t, "Hello.", job.Args.Message)
Job args properties are not compared
Job arg structs sent to RequireInserted* functions are only used to extract a job kind. Any properties on them are not checked for equality with inserted jobs. Separate assertions on return values are required.
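To make the point concrete, here is a minimal sketch of what "only the kind is extracted" means. The jobArgs interface and expectedKind function are local stand-ins written for this illustration and are not part of River's API; they only model the behavior described above.

```go
package main

import "fmt"

// jobArgs is a local stand-in (an assumption for this sketch) for anything
// that can report a job kind.
type jobArgs interface {
	Kind() string
}

type RequiredArgs struct {
	Message string `json:"message"`
}

func (RequiredArgs) Kind() string { return "required" }

// expectedKind models the only thing the helpers read from the args they are
// given: the kind. Every other field is left untouched and uncompared.
func expectedKind(args jobArgs) string {
	return args.Kind()
}

func main() {
	// Both values yield the same kind, so both would match the same jobs.
	fmt.Println(expectedKind(&RequiredArgs{}))
	fmt.Println(expectedKind(&RequiredArgs{Message: "ignored"}))
}
```

Because the two calls are indistinguishable to the helper, setting Message on the expected args has no effect on which jobs match.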
Requiring options
RequireInsertedOpts can be used to assert various insertion options like maximum number of attempts, priority, queue, and scheduled time. RequireInserted* functions take them as an optional last parameter:
_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
	Priority: 1,
	Queue:    river.QueueDefault,
})
Requiring many
Similar to requiring a single insert, many insertions can be checked simultaneously for a nominal performance benefit:
jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
	{Args: &FirstRequiredArgs{}},
	{Args: &SecondRequiredArgs{}},
	{Args: &FirstRequiredArgs{}},
})
for i, job := range jobs {
	fmt.Printf("Job %d args: %s\n", i, string(job.EncodedArgs))
}
See the RequireManyInserted example for complete code.
RequireManyInserted* functions take a mixed set of job args of different kinds, so unlike the single job check, they return JobRow instead of Job[T]. Arguments have to be unmarshaled from job.EncodedArgs to be inspected.
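Unmarshaling the encoded args might look like the sketch below. It uses only the standard library. The decodeArgs helper is hypothetical, as is the Message field on FirstRequiredArgs, and the byte slice in main stands in for job.EncodedArgs on a returned job row.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FirstRequiredArgs mirrors the args type named above. The Message field is
// an assumption made for this sketch.
type FirstRequiredArgs struct {
	Message string `json:"message"`
}

// decodeArgs is a hypothetical helper that unmarshals JSON-encoded job args
// into a typed args struct.
func decodeArgs[T any](encoded []byte) (T, error) {
	var args T
	if err := json.Unmarshal(encoded, &args); err != nil {
		return args, fmt.Errorf("decoding job args: %w", err)
	}
	return args, nil
}

func main() {
	// Stand-in for job.EncodedArgs.
	encoded := []byte(`{"message": "Hello."}`)

	args, err := decodeArgs[FirstRequiredArgs](encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(args.Message)
}
```

In a test, the decoded struct can then be checked with the same comparisons used for the single job helpers.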
The slice of jobs returned is ordered identically to the []rivertest.ExpectedJob input slice.
When checking many jobs at once, RequireManyInserted* expects all jobs of any included kinds to be included in the expectation list. The snippet above passes if exactly two FirstRequiredArgs and one SecondRequiredArgs were inserted, but if a third FirstRequiredArgs was inserted in addition to the first two, it'd fail.
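The matching rule can be modeled with a small counting function. This is an illustration of the rule as stated here, not River's implementation: matchesExpectation and the kind strings are hypothetical, and the sketch deliberately ignores ordering, which the real helper may also take into account.

```go
package main

import "fmt"

// matchesExpectation reports whether insertedKinds satisfies expectedKinds:
// for every kind named in the expectation, the number of inserted jobs of
// that kind must equal the number expected. Inserted jobs of kinds that the
// expectation never mentions are ignored.
func matchesExpectation(expectedKinds, insertedKinds []string) bool {
	expected := make(map[string]int)
	for _, kind := range expectedKinds {
		expected[kind]++
	}

	inserted := make(map[string]int)
	for _, kind := range insertedKinds {
		if _, ok := expected[kind]; ok {
			inserted[kind]++
		}
	}

	for kind, want := range expected {
		if inserted[kind] != want {
			return false
		}
	}
	return true
}

func main() {
	// Hypothetical kind strings for FirstRequiredArgs and SecondRequiredArgs.
	expected := []string{"first_required", "second_required", "first_required"}

	exact := []string{"first_required", "second_required", "first_required"}
	extra := append([]string{"first_required"}, exact...)

	fmt.Println(matchesExpectation(expected, exact)) // prints "true"
	fmt.Println(matchesExpectation(expected, extra)) // prints "false"
}
```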
Requiring on a pool
The examples above show requiring insertions on a transaction, but non-Tx variants are provided to check against a database pool instead:
_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)
Because a driver is passed to the function directly, the [*riverpgxv5.Driver] type parameter can be omitted as Go can infer it.
Test transactions
Sophisticated users may want to make use of test transactions in their tests which are rolled back after each test case, insulating tests running in parallel from each other, and avoiding tainting global state. In such cases, it may not be convenient to inject a full database pool into River's client. To support test transactions, River clients can be initialized without a database pool by passing nil to their driver:
// a nil database pool is sent to the driver
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
The initialized client is more limited, supporting only inserts on transactions with InsertTx and InsertManyTx. Calls to the non-transactional variants Insert and InsertMany, or trying to start the client with Start, will fail.
Logging
River defaults to producing informational logging, and some logging may be emitted while tests run. Although not hugely harmful, logging output won't be indented to match other test output, and in the presence of t.Parallel() will be interleaved so as to become unusable for debugging.
To avoid these problems, we recommend bridging slog and testing using a package like slogt, which will send River's log output to t.Log so it can be cleanly collated by Go's test framework:
import "github.com/neilotoole/slogt"

func TestRiverInsertions(t *testing.T) {
	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slogt.New(t),
		...
	})
	...
}
Testing workers
There are two main methods for testing workers:
- Directly invoking the worker's Work function.
- Using the rivertest.Worker helpers to simulate real worker execution.
Directly testing a worker
Workers in River are plain old Go structs, and can often be tested by initializing a worker, invoking its Work function directly, and checking the results:
type SortArgs struct {
	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {
	river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
	sort.Strings(job.Args.Strings)
	fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
	return nil
}

func TestSortWorker(t *testing.T) {
	ctx := context.Background()

	// Work sorts job.Args.Strings in place.
	err := (&SortWorker{}).Work(ctx, &river.Job[SortArgs]{Args: SortArgs{
		Strings: []string{
			"whale", "tiger", "bear",
		},
	}})
	if err != nil {
		// handle error
	}
}
Using the rivertest.Worker helpers
Some River features depend on the worker running in a real worker context or on the database, and can't be fully tested in isolation. Examples include river.JobCompleteTx and recorded output. To test workers comprehensively, River provides test helpers that exercise workers using a real work context and River's internal execution logic.
These features are exposed through the rivertest.Worker type, which must be initialized for the specific worker type being tested:
worker := &MyWorker{}
testWorker := rivertest.NewWorker(t, driver, config, worker)
The testWorker can then be used to execute as many jobs as desired:
result, err := testWorker.Work(ctx, t, tx, MyJobArgs{CustomerID: 123}, nil)
require.NoError(t, err)
require.Equal(t, river.EventKindJobCompleted, result.EventKind)
require.Equal(t, rivertype.JobStateCompleted, result.Job.State)
The Work function inserts a real job within the provided transaction, executes it, and records its result in the same transaction. The error return value includes any "real" errors the worker returned (excluding intentional snooze and cancel errors), as well as any recovered panics. The WorkResult struct includes the overall execution result (completed, failed, etc.) as well as the final job row recorded in the database following execution.
Transactions are not automatically rolled back
The Work function does not automatically roll back the transaction, nor does it commit it. The transaction is assumed to be owned by the caller, so it is the caller's responsibility to roll back after each individual job or batch of jobs tested.
To test execution of an existing job, use WorkJob:
// InsertTx returns an insert result and an error; the result carries the job row.
insertRes, err := client.InsertTx(ctx, tx, args, nil)
// ...
result, err := testWorker.WorkJob(ctx, t, tx, insertRes.Job)