River includes test helpers in the river/rivertest
package to verify that River jobs are being inserted as expected.
Testing job inserts
Job inserts are verified with the RequireInserted*
family of helpers provided by river/rivertest
. They're designed to be symmetrical with the Client.Insert
functions, so there's variants to verify a single insert or many, and on a database pool or a transaction.
Check the insertion of a single job with RequireInsertedTx
:
import (
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertest"
)
type RequiredArgs struct {
Message string `json:"message"`
}
func (RequiredArgs) Kind() string { return "required" }
func TestInsert(t *testing.T) {
...
tx, err := dbPool.Begin(ctx)
if err != nil {
// handle error
}
defer tx.Rollback(ctx)
_, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{ Message: "Hello."}, nil)
if err != nil {
// handle error
}
job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
fmt.Printf("Test passed with message: %s\n", job.Args.Message)
}
See the RequiredInserted
example for complete code.
RequiredInsertedTx
takes a testing.TB
why is provided by the argument to any Go test or benchmark as its first parameter like t *testing.T
. If the job was inserted, it returns the inserted job. If not, the test fails:
--- FAIL: TestRequireInsertedTx (0.00s)
--- FAIL: TestRequireInsertedTx/FailsWithoutInsert (0.12s)
rivertest.go:352:
River assertion failure:
No jobs found with kind: required
Arguments assertions
The RequireInserted*
functions use arguments sent to them like &RequiredArgs{}
only to extract an expected job kind that's used to query for matching jobs. Any properties set in these job args are ignored. To check specific properties of an inserted job, assertions should be made against helper return values using built-in Go comparisons, or an assertion library of choice like testify/require
.
import "github.com/stretchr/testify/require"
...
job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
require.Equal(t, "Hello.", job.Args.Message)
Job args properties are not compared
Job arg structs sent to RequireInserted*
functions are only used to extract a job kind. Any properties on them are not checked for equality with inserted jobs. Separate assertions on return values are required.
Requiring options
RequireInsertedOptions
can be used to assert various insertion options like maximum number of attempts, priority, queue, and scheduled time. RequireInsert*
functions takes them as an optional last parameter:
_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
Priority: 1,
Queue: river.QueueDefault,
})
Requiring many
Similar to requiring a single insert, many insertions can be checked simultaneously for a nominal performance benefit:
jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
{Args: &FirstRequiredArgs{}},
{Args: &SecondRequiredArgs{}},
{Args: &FirstRequiredArgs{}},
})
for i, job := range jobs {
fmt.Printf("Job %d args: %s\n", i, string(job.EncodedArgs))
}
See the RequiredManyInserted
example for complete code.
RequireManyInserted*
functions take a mixed set of job args of different kinds, so unlike the the single job check, they return JobRow
instead of Job[T]
, so arguments will have to be unmarshaled from job.EncodedArgs
to be inspected.
The slice of jobs returned is ordered identically to the []rivertest.ExpectedJob
input slice.
When checking many jobs at once, RequireManyInserted*
expects all jobs of any included kinds to be included in the expectation list. The snippet above passes if exactly two FirstRequiredArgs
and one SecondRequiredArgs
were inserted, but if a third FirstRequiredArgs
was inserted in addition to the first two, it'd fail.
Requiring on a pool
The examples above show requiring insertions on a transaction, but non-Tx
variants are provided to check against a database pool instead:
_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)
Because a driver is passed to the function directly, the [*riverpgx5.Driver]
type parameter can be omitted as Go can infer it.
Test transactions
Sophisticated users may want to make use of test transactions in their tests which are rolled back after each test case, insulating tests running in parallel from each other, and avoiding tainting global state. In such cases, it may not be convenient to inject a full database pool to River's client. To support test transactions, River clients can be initialized without database pool by passing nil
to their driver:
// a nil database pool is sent to the driver
riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
The initialized client is more limited, supporting only inserts on transactions with InsertTx
and InsertManyTx
. Calls to the non-transactional variants Insert
and InsertMany
, or trying to start the client with Start
, will fail.
Logging
River defaults to producing informational logging, and some logging may be emitted while tests run. Although not hugely harmful, logging output won't be indented to match other test output, and in the presence of t.Parallel()
will be interleaved so as to become unusable for debugging.
To avoid these problems, we recommend bridging slog
and testing
using a package like slogt
, which will send River's log output to t.Log
so it can be cleanly collated by Go's test framework:
import "github.com/neilotoole/slogt"
func TestRiverInsertions(t *testing.T) {
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slogt.New(t),
...
})
...
}
Testing workers
Workers in River are plain old Go structs, and no special testing utilities are provided for them. They're testable by initializing a worker, invoking its Work
function, and checking against the results:
type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}
func (SortArgs) Kind() string { return "sort" }
type SortWorker struct {
river.WorkerDefaults[SortArgs]
}
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}
func TestSortWorker(t *testing.T) {
err := (&SortWorker{}).Run(ctx, &river.Job{Args: JobArgs{
Strings: []string{
"whale", "tiger", "bear",
},
}
if err != nil {
// handle error
}
}