River middleware allows job insertion and execution to be wrapped with custom logic. It can be used to add logging, telemetry, or other shared functionality to your River jobs.
Job insertion middleware
Job insertion middleware runs around a job being inserted into the queue. It is configured globally on the river.Client with the JobInsertMiddleware field:
type traceInsertMiddleware struct {
	// Embed JobInsertMiddlewareDefaults for forward compatibility
	// in case additional methods are added to the interface:
	river.JobInsertMiddlewareDefaults
}

func (m *traceInsertMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
	// Extract a trace ID from the context, and add it to the job metadata.
	// Metadata is a useful place to store attributes that are not specific to a
	// single job or which are tied to higher level functionality.
	traceID := trace.FromContext(ctx).TraceID
	for _, params := range manyParams {
		var err error
		params.Metadata, err = sjson.SetBytes(params.Metadata, "trace_id", traceID)
		if err != nil {
			return nil, err
		}
	}
	// Call the next middleware in the chain, or the insert itself:
	return doInner(ctx)
}
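The metadata update above relies on sjson. As a minimal sketch of the same step using only the standard library, the helper below sets one key on a JSON object held as bytes. The name `setMetadataKey` is hypothetical and not part of River; it assumes the metadata is either empty or a JSON object.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// setMetadataKey is a hypothetical helper (not part of River) that returns
// the JSON object in metadata with key set to value. Empty or null metadata
// is treated as an empty object.
func setMetadataKey(metadata []byte, key string, value any) ([]byte, error) {
	fields := map[string]json.RawMessage{}
	if len(metadata) > 0 {
		if err := json.Unmarshal(metadata, &fields); err != nil {
			return nil, fmt.Errorf("metadata is not a JSON object: %w", err)
		}
	}
	if fields == nil {
		// Unmarshalling JSON null leaves the map nil; start from an empty one.
		fields = map[string]json.RawMessage{}
	}
	encoded, err := json.Marshal(value)
	if err != nil {
		return nil, err
	}
	fields[key] = encoded
	return json.Marshal(fields)
}

func main() {
	out, err := setMetadataKey([]byte(`{"tenant":"acme"}`), "trace_id", "abc123")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Existing keys are kept as raw JSON, so values the middleware does not know about pass through without being decoded.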
Worker middleware
Worker middleware runs around a job being executed by a worker. It is configured globally on the river.Client with the WorkerMiddleware field:
type traceWorkerMiddleware struct {
	// Embed WorkerMiddlewareDefaults for forward compatibility
	// in case additional methods are added to the interface:
	river.WorkerMiddlewareDefaults
}

func (m *traceWorkerMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
	// Extract the trace ID from the job metadata.
	result := gjson.GetBytes(job.Metadata, "trace_id")
	if !result.Exists() {
		// Log a message about the missing trace ID, or return an error if
		// its absence is unexpected.
		return errors.New("job metadata has no trace_id")
	}
	traceID := result.String()
	// Add the trace ID to the context and call the next middleware in the chain:
	ctx = trace.WithContext(ctx, traceID)
	return doInner(ctx)
}
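The comment in the middleware above distinguishes a missing trace ID from an unexpected failure. A minimal standard-library sketch of that distinction follows; `traceIDFromMetadata` is a hypothetical helper, not part of River or gjson, and it assumes the trace ID is stored as a JSON string under `trace_id`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// traceIDFromMetadata is a hypothetical helper (not part of River). It
// returns ok=false when trace_id is absent or null, and an error when the
// metadata cannot be decoded or trace_id is not a string.
func traceIDFromMetadata(metadata []byte) (traceID string, ok bool, err error) {
	if len(metadata) == 0 {
		return "", false, nil
	}
	var fields struct {
		TraceID *string `json:"trace_id"`
	}
	if err := json.Unmarshal(metadata, &fields); err != nil {
		return "", false, fmt.Errorf("decoding job metadata: %w", err)
	}
	if fields.TraceID == nil {
		return "", false, nil
	}
	return *fields.TraceID, true, nil
}

func main() {
	id, ok, err := traceIDFromMetadata([]byte(`{"trace_id":"abc123"}`))
	fmt.Println(id, ok, err)
}
```

A middleware built on this could log and continue when ok is false, and return the error only when decoding fails.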
Worker middleware can also be configured at the individual worker level, allowing for more fine-grained control over which middleware is applied to which workers:
type myWorker[T myJobArgs] struct {
	river.WorkerDefaults[T]
}

func (w *myWorker[T]) Middleware(job *river.Job[T]) []rivertype.WorkerMiddleware {
	return []rivertype.WorkerMiddleware{
		&traceWorkerMiddleware{},
	}
}
Worker-specific middleware always runs after globally configured middleware.
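To illustrate that ordering, here is a self-contained sketch of how a middleware chain might be assembled. It is not River's implementation: `workerMiddleware`, `namedMiddleware`, `chain`, and `runOrder` are hypothetical names, and the interface is reduced to a single method without the job argument.

```go
package main

import (
	"context"
	"fmt"
)

// workerMiddleware is a hypothetical stand-in for a worker middleware
// interface, reduced to the one method this sketch needs.
type workerMiddleware interface {
	Work(ctx context.Context, doInner func(context.Context) error) error
}

// namedMiddleware records when it is entered and left.
type namedMiddleware struct {
	name   string
	events *[]string
}

func (m namedMiddleware) Work(ctx context.Context, doInner func(context.Context) error) error {
	*m.events = append(*m.events, "enter "+m.name)
	err := doInner(ctx)
	*m.events = append(*m.events, "leave "+m.name)
	return err
}

// chain wraps work so that the first middleware in the list is outermost.
func chain(middleware []workerMiddleware, work func(context.Context) error) func(context.Context) error {
	for i := len(middleware) - 1; i >= 0; i-- {
		mw, inner := middleware[i], work
		work = func(ctx context.Context) error { return mw.Work(ctx, inner) }
	}
	return work
}

// runOrder runs one global and one worker-specific middleware around a
// no-op job and returns the recorded sequence of events.
func runOrder() []string {
	var events []string
	global := []workerMiddleware{namedMiddleware{"global", &events}}
	perWorker := []workerMiddleware{namedMiddleware{"worker", &events}}

	// Global middleware comes first in the list, so it wraps the rest.
	all := append(append([]workerMiddleware{}, global...), perWorker...)
	run := chain(all, func(context.Context) error {
		events = append(events, "work")
		return nil
	})
	_ = run(context.Background())
	return events
}

func main() {
	for _, event := range runOrder() {
		fmt.Println(event)
	}
}
```

In this sketch the global middleware is entered first and left last, so anything it places on the context is visible to the worker-specific middleware and to the job itself.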