River middleware allows job insertion and execution to be wrapped with custom logic. Middleware can be used to add logging, telemetry, or other shared functionality to your River jobs. Middleware is a similar concept to hooks, except that a middleware invocation stays on the stack for the entirety of an inner call instead of finishing immediately.
`rivertype.Middleware` is a trivial interface implemented by embedding `river.MiddlewareDefaults`:

```go
type traceMiddleware struct {
	river.MiddlewareDefaults
}
```
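To show why embedding alone is enough, here is a stdlib-only sketch of the pattern. `Middleware`, `MiddlewareDefaults`, and the `IsMiddleware` sentinel method are hypothetical stand-ins modeled on the idea, not copied from River's source. The defaults struct supplies the interface's only method, so any struct that embeds it satisfies the interface without declaring methods of its own.

```go
package main

import "fmt"

// Middleware is a hypothetical stand-in for a marker interface like
// rivertype.Middleware: a single sentinel method means only types that
// deliberately opt in satisfy it.
type Middleware interface {
	IsMiddleware() bool
}

// MiddlewareDefaults is a hypothetical stand-in for river.MiddlewareDefaults.
// Embedding it promotes IsMiddleware onto the embedding struct.
type MiddlewareDefaults struct{}

func (MiddlewareDefaults) IsMiddleware() bool { return true }

// traceMiddleware satisfies Middleware purely through embedding.
type traceMiddleware struct {
	MiddlewareDefaults
}

// Compile-time check that embedding is sufficient.
var _ Middleware = &traceMiddleware{}

func main() {
	var m Middleware = &traceMiddleware{}
	fmt.Println(m.IsMiddleware())
}
```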
Middleware operations
Middleware has no effect until it implements one or more middleware operation interfaces. The middleware above could be made to trace job inserts by implementing `rivertype.JobInsertMiddleware`:
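```go
import (
	"context"
	"encoding/json"

	"github.com/riverqueue/river/rivertype"
	"go.opentelemetry.io/otel"
)

func (*traceMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
	// "my_app" is a placeholder instrumentation name.
	ctx, span := otel.Tracer("my_app").Start(ctx, "my_app.insert_many")
	defer span.End()

	for _, params := range manyParams {
		metadataMap := map[string]any{}
		if len(params.Metadata) > 0 {
			if err := json.Unmarshal(params.Metadata, &metadataMap); err != nil {
				return nil, err
			}
		}
		if metadataMap == nil { // metadata was JSON null
			metadataMap = map[string]any{}
		}

		// Store hex-encoded IDs so they can be parsed back when the job is worked.
		metadataMap["span_id"] = span.SpanContext().SpanID().String()
		metadataMap["trace_id"] = span.SpanContext().TraceID().String()

		var err error
		params.Metadata, err = json.Marshal(metadataMap)
		if err != nil {
			return nil, err
		}
	}

	return doInner(ctx)
}
```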
The middleware produces a span for the duration of the operation (`defer span.End()` ends it after the insert finishes) and adds the span's span ID and trace ID to each inserted job's metadata. If modifying insert parameters were the only thing it needed to do, hooks would be a better fit than middleware because they don't need to occupy a position on the stack for the duration of the insert.
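The metadata-merging step is the part most likely to go wrong in practice, since a job's metadata may be empty or JSON `null`. The following stdlib-only sketch isolates that step. `addTraceMetadata` is a hypothetical helper, and plain hex strings stand in for the OpenTelemetry span and trace IDs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addTraceMetadata is a hypothetical helper that merges span and trace IDs
// into a job's existing JSON metadata, preserving any keys already present.
func addTraceMetadata(metadata []byte, spanID, traceID string) ([]byte, error) {
	metadataMap := map[string]any{}
	// Treat empty metadata as an empty object instead of failing to parse.
	if len(metadata) > 0 {
		if err := json.Unmarshal(metadata, &metadataMap); err != nil {
			return nil, err
		}
	}
	// Unmarshaling JSON null into a map sets it to nil, so re-initialize
	// before writing to avoid a panic on assignment to a nil map.
	if metadataMap == nil {
		metadataMap = map[string]any{}
	}
	metadataMap["span_id"] = spanID
	metadataMap["trace_id"] = traceID
	// json.Marshal writes map keys in sorted order.
	return json.Marshal(metadataMap)
}

func main() {
	out, err := addTraceMetadata([]byte(`{"tenant":"acme"}`), "00f067aa0ba902b7", "4bf92f3577b34da6a3ce929d0e0e4736")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Existing keys such as `tenant` survive the merge, so other code that relies on job metadata keeps working.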
`traceMiddleware` could be extended to also trace job work by implementing `rivertype.WorkerMiddleware`. The work span is linked back to the insert span using the IDs stored in the job's metadata:
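```go
import (
	"context"
	"encoding/json"

	"github.com/riverqueue/river/rivertype"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

func (*traceMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
	type spanAndTraceMetadata struct {
		SpanID  string `json:"span_id"`
		TraceID string `json:"trace_id"`
	}

	var spanAndTrace spanAndTraceMetadata
	if err := json.Unmarshal(job.Metadata, &spanAndTrace); err != nil {
		return err
	}

	// Metadata holds hex strings, so decode them into OpenTelemetry IDs. Only
	// link when both are valid (jobs inserted without this middleware lack them).
	var opts []trace.SpanStartOption
	spanID, spanErr := trace.SpanIDFromHex(spanAndTrace.SpanID)
	traceID, traceErr := trace.TraceIDFromHex(spanAndTrace.TraceID)
	if spanErr == nil && traceErr == nil {
		opts = append(opts, trace.WithLinks(trace.Link{
			SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
				SpanID:  spanID,
				TraceID: traceID,
			}),
		}))
	}

	ctx, span := otel.Tracer("my_app").Start(ctx, "my_app.work", opts...)
	defer span.End()

	return doInner(ctx)
}
```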
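Before a link can be built, the hex strings from metadata have to be decoded into fixed-size IDs: 8 bytes for a span ID and 16 bytes for a trace ID, per W3C Trace Context, which also treats all-zero IDs as invalid. This stdlib-only sketch shows that decoding step on its own. `decodeID` and `linkIDsFromMetadata` are hypothetical helpers, not OpenTelemetry or River APIs.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// spanAndTraceMetadata mirrors the metadata keys written at insert time.
type spanAndTraceMetadata struct {
	SpanID  string `json:"span_id"`
	TraceID string `json:"trace_id"`
}

// decodeID is a hypothetical helper that hex-decodes s into dst. It fails if
// the length is wrong, the string isn't valid hex, or the ID is all zeros.
func decodeID(dst []byte, s string) bool {
	if hex.DecodedLen(len(s)) != len(dst) {
		return false
	}
	if _, err := hex.Decode(dst, []byte(s)); err != nil {
		return false
	}
	for _, b := range dst {
		if b != 0 {
			return true
		}
	}
	return false
}

// linkIDsFromMetadata is a hypothetical helper returning the span and trace
// IDs to link to, with ok=false when the job carries no usable IDs.
func linkIDsFromMetadata(metadata []byte) ([8]byte, [16]byte, bool, error) {
	var (
		ids     spanAndTraceMetadata
		spanID  [8]byte
		traceID [16]byte
	)
	if err := json.Unmarshal(metadata, &ids); err != nil {
		return spanID, traceID, false, err
	}
	ok := decodeID(spanID[:], ids.SpanID) && decodeID(traceID[:], ids.TraceID)
	return spanID, traceID, ok, nil
}

func main() {
	spanID, traceID, ok, err := linkIDsFromMetadata([]byte(`{"span_id":"00f067aa0ba902b7","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(ok, hex.EncodeToString(spanID[:]), hex.EncodeToString(traceID[:]))
}
```

Treating missing IDs as "no link" rather than as an error keeps jobs inserted before the middleware was deployed workable.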
List of all middleware operations
Full list of middleware operation interfaces:
- `rivertype.JobInsertMiddleware`: Invoked around a batch of jobs being inserted.
- `rivertype.WorkerMiddleware`: Invoked around a job being worked.
Configuring middleware
A global set of middleware that runs for every job is configurable on a River client:
```go
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	// Order is significant.
	Middleware: []rivertype.Middleware{
		&BothInsertAndWorkBeginMiddleware{},
		&InsertBeginMiddleware{},
		&WorkBeginMiddleware{},
	},
})
if err != nil {
	// handle error
}
```
The effect of each middleware in the list depends on the operation interfaces it implements. For example, implementing `rivertype.JobInsertMiddleware` invokes the middleware around the insertion of a batch of jobs, and implementing `rivertype.WorkerMiddleware` invokes it around jobs being worked. Middleware may implement multiple operations. Middleware implementing no operations has no functional effect.
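One way a client could sort configured middleware by operation is with plain type assertions. The sketch below assumes that general mechanism rather than reproducing River's internal code, and its interfaces are hypothetical, simplified stand-ins (job parameters and rows are reduced to strings).

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical, simplified stand-ins for rivertype.Middleware,
// rivertype.JobInsertMiddleware, and rivertype.WorkerMiddleware.
type Middleware interface{ IsMiddleware() bool }

type JobInsertMiddleware interface {
	Middleware
	InsertMany(ctx context.Context, kinds []string, doInner func(context.Context) error) error
}

type WorkerMiddleware interface {
	Middleware
	Work(ctx context.Context, kind string, doInner func(context.Context) error) error
}

type MiddlewareDefaults struct{}

func (MiddlewareDefaults) IsMiddleware() bool { return true }

// insertOnly implements only the insert operation.
type insertOnly struct{ MiddlewareDefaults }

func (*insertOnly) InsertMany(ctx context.Context, kinds []string, doInner func(context.Context) error) error {
	return doInner(ctx)
}

// both implements both operations.
type both struct{ MiddlewareDefaults }

func (*both) InsertMany(ctx context.Context, kinds []string, doInner func(context.Context) error) error {
	return doInner(ctx)
}

func (*both) Work(ctx context.Context, kind string, doInner func(context.Context) error) error {
	return doInner(ctx)
}

// noOps implements no operations, so it lands in neither list.
type noOps struct{ MiddlewareDefaults }

// partition splits configured middleware by the operations they implement,
// keeping configuration order within each list.
func partition(all []Middleware) (inserts []JobInsertMiddleware, workers []WorkerMiddleware) {
	for _, m := range all {
		if im, ok := m.(JobInsertMiddleware); ok {
			inserts = append(inserts, im)
		}
		if wm, ok := m.(WorkerMiddleware); ok {
			workers = append(workers, wm)
		}
	}
	return inserts, workers
}

func main() {
	inserts, workers := partition([]Middleware{&both{}, &insertOnly{}, &noOps{}})
	fmt.Println(len(inserts), len(workers))
}
```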
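Order in the list is significant, with middleware that appears first running before middleware that appears later. Because middleware wraps, an earlier entry is the outer layer: it starts first and finishes last.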
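This list-order nesting can be sketched with the standard library alone. `workMiddleware`, `chain`, `recording`, and `runOrder` are hypothetical names for illustration, and the function type is a simplification rather than River's interface. Wrapping from the end of the list backwards makes index 0 the outermost layer.

```go
package main

import (
	"context"
	"fmt"
)

// workMiddleware is a hypothetical, simplified middleware signature: it
// receives the next step as doInner and decides when to call it.
type workMiddleware func(ctx context.Context, doInner func(context.Context) error) error

// chain composes middleware so the first element in the list is the
// outermost wrapper: it starts first and finishes last.
func chain(middleware []workMiddleware, work func(context.Context) error) func(context.Context) error {
	next := work
	// Wrap from the end of the list backwards so index 0 ends up outermost.
	for i := len(middleware) - 1; i >= 0; i-- {
		m, inner := middleware[i], next
		next = func(ctx context.Context) error {
			return m(ctx, inner)
		}
	}
	return next
}

// recording returns a middleware that logs before and after its inner call.
func recording(name string, log *[]string) workMiddleware {
	return func(ctx context.Context, doInner func(context.Context) error) error {
		*log = append(*log, name+" begin")
		err := doInner(ctx)
		*log = append(*log, name+" end")
		return err
	}
}

// runOrder runs one unit of work through two middleware and returns the log.
func runOrder() []string {
	var log []string
	work := func(ctx context.Context) error {
		log = append(log, "work")
		return nil
	}
	run := chain([]workMiddleware{recording("first", &log), recording("second", &log)}, work)
	_ = run(context.Background())
	return log
}

func main() {
	fmt.Println(runOrder())
}
```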
Per-worker middleware
Worker middleware can also be configured at the individual worker level, enabling finer-grained control over which middleware is applied to which workers:
```go
type workerWithMiddleware struct {
	river.WorkerDefaults[myJobArgs]
}

// Work(ctx, job) omitted for brevity.

func (*workerWithMiddleware) Middleware(job *rivertype.JobRow) []rivertype.WorkerMiddleware {
	return []rivertype.WorkerMiddleware{
		&traceMiddleware{},
	}
}
```
Worker-specific middleware always runs after globally configured middleware, nested inside it.
There's no per-job-args equivalent for insert middleware because a batch of inserted jobs may contain multiple kinds of jobs.
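Per-worker lookup works for job execution because each job being worked has exactly one kind, whereas an insert batch can mix kinds. The following stdlib-only sketch shows how the two lists could combine for a single job. `middlewareForJob` is a hypothetical helper, and keying by a kind string is a simplification of resolving the worker for a job.

```go
package main

import "fmt"

// middlewareForJob is a hypothetical helper combining global and
// worker-specific middleware for one job: global entries come first
// (outermost), and the worker's own entries follow, nested inside them.
func middlewareForJob(global []string, perWorker map[string][]string, kind string) []string {
	// Copy into a fresh slice so appending never aliases the global list.
	combined := make([]string, 0, len(global)+len(perWorker[kind]))
	combined = append(combined, global...)
	// A missing map entry yields a nil slice, so workers without their own
	// middleware just get the global list.
	combined = append(combined, perWorker[kind]...)
	return combined
}

func main() {
	global := []string{"logging", "metrics"}
	perWorker := map[string][]string{"email": {"trace"}}
	fmt.Println(middlewareForJob(global, perWorker, "email"))
	fmt.Println(middlewareForJob(global, perWorker, "report"))
}
```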
Testing interface compliance
Each configured middleware is checked against the middleware operation interfaces before being run. Because `rivertype.Middleware` is so trivial, it provides little in the way of type safety, and it's easy to implement a desired operation not quite right (e.g. a return value is accidentally left off), in which case the middleware is silently skipped for that operation. To protect against this, it's recommended to check interface compliance in code using a trivial assignment:
```go
var (
	_ rivertype.JobInsertMiddleware = &traceMiddleware{}
	_ rivertype.WorkerMiddleware    = &traceMiddleware{}
)
```
If `traceMiddleware` unexpectedly failed to implement `rivertype.JobInsertMiddleware`, the mistake would be caught early as a compilation failure.
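The failure mode those assignments guard against can be demonstrated at runtime. In this stdlib-only sketch, the interfaces are hypothetical, simplified stand-ins, and `brokenMiddleware` and `isWorkerMiddleware` are illustrative names. A `Work` method missing its `error` return still compiles, and the type is still a valid `Middleware`, but a runtime type assertion quietly reports that it isn't worker middleware.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical, simplified stand-ins for rivertype.Middleware and
// rivertype.WorkerMiddleware.
type Middleware interface{ IsMiddleware() bool }

type WorkerMiddleware interface {
	Middleware
	Work(ctx context.Context, kind string, doInner func(context.Context) error) error
}

type MiddlewareDefaults struct{}

func (MiddlewareDefaults) IsMiddleware() bool { return true }

// brokenMiddleware was meant to implement WorkerMiddleware, but its Work
// method accidentally omits the error return value.
type brokenMiddleware struct{ MiddlewareDefaults }

func (*brokenMiddleware) Work(ctx context.Context, kind string, doInner func(context.Context) error) {
	_ = doInner(ctx)
}

// isWorkerMiddleware reports whether m would be picked up as worker
// middleware when checked with a runtime type assertion.
func isWorkerMiddleware(m Middleware) bool {
	_, ok := m.(WorkerMiddleware)
	return ok
}

// Uncommenting this assignment turns the silent runtime miss into a
// compile error:
// var _ WorkerMiddleware = &brokenMiddleware{}

func main() {
	// Still a valid Middleware, so it can be configured without complaint...
	var m Middleware = &brokenMiddleware{}
	// ...but it is silently skipped as worker middleware.
	fmt.Println(isWorkerMiddleware(m))
}
```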
Differences from hooks
Middleware is a similar concept to hooks, except that hooks are invoked and finish immediately instead of wrapping an inner call. This leads to some important considerations for their use:
- Middleware wrap operations, so unlike hooks, modifications to context last for the duration of the operation. This makes them more suitable where context additions need to be durable, like adding an OpenTelemetry span or timing the duration of an operation.
- Middleware are less granular than hooks. Hooks provide `InsertBegin`, which is invoked for every inserted job. Middleware provides `InsertMany`, which is invoked for every inserted batch of jobs.
- Because middleware wrap operations, they add an extra frame to the stack for the operation's duration. This leads to deeper stack traces, which are harder to read and reason about.
Because hooks operate more granularly and don't stay on the stack, generally prefer hooks over middleware, and fall back to middleware in cases where hooks are too restrictive.