Middleware

River middleware allows job insertion and execution to be wrapped with custom logic. Middleware can be used to add logging, telemetry, or other shared functionality to your River jobs. Middleware is a similar concept to hooks, except that a middleware invocation stays on the stack for the entirety of the inner call instead of finishing immediately.

rivertype.Middleware is a trivial interface implemented by embedding river.MiddlewareDefaults:

type traceMiddleware struct {
	river.MiddlewareDefaults
}

Middleware operations

Middleware has no effect until it implements one or more of the middleware operation interfaces. The middleware above could be made to trace job inserts by implementing rivertype.JobInsertMiddleware:

func (*traceMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
	// Spans are started from a tracer; "my_app" is a placeholder tracer name.
	ctx, span := otel.Tracer("my_app").Start(ctx, "my_app.insert_many")
	defer span.End()

	for _, params := range manyParams {
		var metadataMap map[string]any
		if err := json.Unmarshal(params.Metadata, &metadataMap); err != nil {
			return nil, err
		}

		// SpanID and TraceID marshal to JSON as hex strings.
		metadataMap["span_id"] = span.SpanContext().SpanID()
		metadataMap["trace_id"] = span.SpanContext().TraceID()

		var err error
		params.Metadata, err = json.Marshal(metadataMap)
		if err != nil {
			return nil, err
		}
	}

	return doInner(ctx)
}

The middleware produces a span for the duration of the operation (ending with defer span.End() after the insert finishes) and adds a span ID and trace ID to each inserted job. If modifying insert parameters were all it did, hooks would be a better choice than middleware because hooks don't occupy a position on the stack for the duration of the insert.
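The metadata round trip above (unmarshal, add keys, marshal) can be pulled into a small helper. The following is a minimal sketch using only the standard library; mergeMetadata is a hypothetical name and not part of River, and it assumes metadata may arrive empty or as JSON null, which it treats as an empty object:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeMetadata is a hypothetical helper (not part of River). It adds the keys
// in extra to the JSON object encoded in metadata and returns the re-encoded
// result. Empty metadata and JSON null are treated as an empty object.
func mergeMetadata(metadata []byte, extra map[string]any) ([]byte, error) {
	merged := map[string]any{}
	if len(metadata) > 0 {
		if err := json.Unmarshal(metadata, &merged); err != nil {
			return nil, fmt.Errorf("unmarshaling metadata: %w", err)
		}
	}
	if merged == nil {
		// JSON null unmarshals to a nil map, which can't be assigned to.
		merged = map[string]any{}
	}
	for key, value := range extra {
		merged[key] = value
	}
	return json.Marshal(merged)
}

func main() {
	out, err := mergeMetadata([]byte(`{"tenant":"acme"}`), map[string]any{"trace_id": "abc123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"tenant":"acme","trace_id":"abc123"}
}
```

Existing keys are preserved, and keys in extra overwrite any existing key of the same name.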

traceMiddleware could be extended to also trace job work by implementing rivertype.WorkerMiddleware:

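func (*traceMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
	type spanAndTraceMetadata struct {
		SpanID  string `json:"span_id"`
		TraceID string `json:"trace_id"`
	}

	var spanAndTrace spanAndTraceMetadata
	if err := json.Unmarshal(job.Metadata, &spanAndTrace); err != nil {
		return err
	}

	// IDs are stored in metadata as hex strings, so parse them back into
	// OpenTelemetry's ID types. Link to the inserting span only if both parse.
	var startOpts []trace.SpanStartOption
	spanID, spanErr := trace.SpanIDFromHex(spanAndTrace.SpanID)
	traceID, traceErr := trace.TraceIDFromHex(spanAndTrace.TraceID)
	if spanErr == nil && traceErr == nil {
		startOpts = append(startOpts, trace.WithLinks(trace.Link{
			SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
				SpanID:  spanID,
				TraceID: traceID,
			}),
		}))
	}

	// "my_app" is a placeholder tracer name.
	ctx, span := otel.Tracer("my_app").Start(ctx, "my_app.work", startOpts...)
	defer span.End()

	return doInner(ctx)
}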
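Jobs inserted before the tracing middleware was configured won't carry span_id or trace_id in their metadata, so worker-side code may want to distinguish "no IDs present" from "IDs present but malformed". The following is a minimal standard-library sketch of that distinction. parseTraceRef, traceRef, and decodeHexInto are hypothetical names, and the sizes assume OpenTelemetry's 8-byte span IDs and 16-byte trace IDs encoded as hex:

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// traceRef holds decoded IDs (8-byte span ID, 16-byte trace ID).
type traceRef struct {
	SpanID  [8]byte
	TraceID [16]byte
}

// parseTraceRef is a hypothetical helper. The bool result is false when the
// metadata carries no IDs at all; an error means IDs were present but invalid.
func parseTraceRef(metadata []byte) (traceRef, bool, error) {
	var ref traceRef
	var raw struct {
		SpanID  string `json:"span_id"`
		TraceID string `json:"trace_id"`
	}
	if len(metadata) > 0 {
		if err := json.Unmarshal(metadata, &raw); err != nil {
			return ref, false, fmt.Errorf("unmarshaling metadata: %w", err)
		}
	}
	if raw.SpanID == "" || raw.TraceID == "" {
		return ref, false, nil
	}
	if err := decodeHexInto(ref.SpanID[:], raw.SpanID); err != nil {
		return ref, false, fmt.Errorf("span_id: %w", err)
	}
	if err := decodeHexInto(ref.TraceID[:], raw.TraceID); err != nil {
		return ref, false, fmt.Errorf("trace_id: %w", err)
	}
	return ref, true, nil
}

// decodeHexInto decodes s into dst, requiring an exact length match.
func decodeHexInto(dst []byte, s string) error {
	if len(s) != hex.EncodedLen(len(dst)) {
		return fmt.Errorf("want %d hex characters, got %d", hex.EncodedLen(len(dst)), len(s))
	}
	_, err := hex.Decode(dst, []byte(s))
	return err
}

func main() {
	_, ok, err := parseTraceRef([]byte(`{}`))
	fmt.Println(ok, err) // false <nil>
}
```

A worker middleware built this way could start an unlinked span when the bool is false rather than failing the job.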

List of all middleware operations

Full list of middleware operation interfaces:

  • rivertype.JobInsertMiddleware
  • rivertype.WorkerMiddleware

Configuring middleware

A global set of middleware that runs for every job is configurable on a River client:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	// Order is significant.
	Middleware: []rivertype.Middleware{
		&BothInsertAndWorkBeginMiddleware{},
		&InsertBeginMiddleware{},
		&WorkBeginMiddleware{},
	},
})

The effect of each middleware in the list depends on the operation interfaces it implements. For example, implementing rivertype.JobInsertMiddleware will invoke the middleware around the insertion of a batch of jobs, and implementing rivertype.WorkerMiddleware will invoke it around jobs being worked. Middleware may implement multiple operations. Middleware implementing no operations will have no functional effect.

Order in the list is significant, with middleware that appears first running before middleware that appears later.

Per-worker middleware

Worker middleware can also be configured at the individual worker level, enabling finer-grained control over which middleware is applied to which workers:

type workerWithMiddleware[T myJobArgs] struct {
	river.WorkerDefaults[T]
}

func (*workerWithMiddleware[T]) Middleware(job *rivertype.JobRow) []rivertype.WorkerMiddleware {
	return []rivertype.WorkerMiddleware{
		traceWorkerMiddleware{},
	}
}

Worker-specific middleware always run after globally configured middleware.

There's no per-job-args equivalent for middleware because a batch of jobs may contain multiple kinds of jobs.

Testing interface compliance

Each configured middleware is checked against the middleware operation interfaces before being run. Because rivertype.Middleware is a trivial interface that provides little in the way of type safety, it's easy to implement a desired operation not quite right (e.g. a return value is accidentally left off), in which case the middleware simply won't be invoked for that operation. To protect against this possibility, it's recommended that interface compliance is checked in code using a trivial assignment:

var (
	_ rivertype.JobInsertMiddleware = &traceMiddleware{}
	_ rivertype.WorkerMiddleware    = &traceMiddleware{}
)

traceMiddleware unexpectedly failing to implement rivertype.JobInsertMiddleware would be caught early because it'd cause a compilation failure.
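The failure mode is easy to reproduce without River. The sketch below uses a simplified stand-in interface (workerMiddleware) and hypothetical types to show that a runtime check silently reports false for a method with a missing return value, while the assignment form turns the same mistake into a compile error:

```go
package main

import (
	"context"
	"fmt"
)

// workerMiddleware is a simplified stand-in for an operation interface.
type workerMiddleware interface {
	Work(ctx context.Context, doInner func(context.Context) error) error
}

// goodMiddleware implements Work with the expected signature.
type goodMiddleware struct{}

func (*goodMiddleware) Work(ctx context.Context, doInner func(context.Context) error) error {
	return doInner(ctx)
}

// brokenMiddleware accidentally leaves off the error return value, so it
// doesn't satisfy workerMiddleware even though the method name matches.
type brokenMiddleware struct{}

func (*brokenMiddleware) Work(ctx context.Context, doInner func(context.Context) error) {
	_ = doInner(ctx)
}

// implementsWork mimics a runtime interface check: a mismatch produces no
// error, just a false result.
func implementsWork(middleware any) bool {
	_, ok := middleware.(workerMiddleware)
	return ok
}

// Compile-time check. Uncommenting the second line fails the build.
var _ workerMiddleware = &goodMiddleware{}

// var _ workerMiddleware = &brokenMiddleware{}

func main() {
	fmt.Println(implementsWork(&goodMiddleware{}))   // true
	fmt.Println(implementsWork(&brokenMiddleware{})) // false
}
```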

Differences from hooks

Middleware is a similar concept to hooks, except that middleware wraps an inner call, whereas hooks are invoked and finish immediately. This leads to some important considerations for their use:

  • Middleware wrap operations, so unlike hooks, modifications to context last for the duration of the operation. This makes them more suitable where context additions need to be durable, like adding an OpenTelemetry span or timing the duration of an operation.
  • Middleware are less granular than hooks. Hooks provide InsertBegin, which is invoked for every inserted job. Middleware provides InsertMany, which is invoked for every inserted job batch.
  • Because middleware wrap operations, each one adds an extra frame to the stack for the duration of the operation. This results in deeper stack traces that are harder to read and reason about.

Because hooks operate more granularly and don't go on the stack, generally prefer the use of hooks over middleware, and fall back to middleware in cases where hooks are too restrictive.
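The context point is the one that most often forces the fallback to middleware. As a minimal standard-library sketch, assume a hook-shaped function that returns only an error and a middleware-shaped function that receives the inner call. tagHook, tagMiddleware, and the other names here are hypothetical:

```go
package main

import (
	"context"
	"fmt"
)

type requestIDKey struct{}

// tagHook mimics a hook: it's invoked, returns immediately, and has no way to
// hand a derived context to the operation that follows.
func tagHook(ctx context.Context) error {
	_ = context.WithValue(ctx, requestIDKey{}, "req-1") // discarded on return
	return nil
}

// tagMiddleware mimics middleware: it wraps the operation, so the derived
// context is passed down and lasts for the operation's duration.
func tagMiddleware(ctx context.Context, doInner func(context.Context) error) error {
	return doInner(context.WithValue(ctx, requestIDKey{}, "req-1"))
}

// readTag stands in for the operation and reports what it finds on the context.
func readTag(ctx context.Context) string {
	tag, _ := ctx.Value(requestIDKey{}).(string)
	return tag
}

// seenAfterHook runs the hook, then the operation, on the same context.
func seenAfterHook() string {
	ctx := context.Background()
	if err := tagHook(ctx); err != nil {
		return ""
	}
	return readTag(ctx)
}

// seenInsideMiddleware runs the operation wrapped by the middleware.
func seenInsideMiddleware() string {
	var seen string
	_ = tagMiddleware(context.Background(), func(ctx context.Context) error {
		seen = readTag(ctx)
		return nil
	})
	return seen
}

func main() {
	fmt.Printf("hook: %q\n", seenAfterHook())              // hook: ""
	fmt.Printf("middleware: %q\n", seenInsideMiddleware()) // middleware: "req-1"
}
```

The same shape applies to timing an operation or holding an OpenTelemetry span open: both need code that runs before and after the inner call, which only the wrapping form provides.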