Designing River's hooks and middleware for change resistance and future extensibility
A long-requested feature in River has been middleware, a concept similar to what's found in a Ruby Rack application or other web stacks, but for a job queue. Middleware in web stacks gives users a way to add common, custom code around core operations like serving an HTTP request, often for concerns like authentication, logging, or telemetry via OpenTelemetry spans. Middleware in a job queue has similar uses, but wraps job insertions and jobs being worked.
River ships a slightly unconventional hybrid system. It provides a middleware API, but also a second one for function "hooks". Hooks are less flexible than middleware, but friendlier for developer experience (DX) and operations, with some differences that we'll touch on below.
A basic middleware example
The best way to demonstrate a concept is by example. Here's a simplified OpenTelemetry middleware, trimmed down from the official package for demonstration purposes:
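import (
	"context"
	"encoding/json"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
	"go.opentelemetry.io/otel"
)

type traceMiddleware struct {
	river.MiddlewareDefaults
}

func (*traceMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
	ctx, span := otel.GetTracerProvider().Tracer("my_app").Start(ctx, "my_app.insert_many")
	defer span.End()

	// Stash this span's IDs in each job's metadata so the job can be linked
	// back to its insertion when it's worked.
	for _, params := range manyParams {
		var metadataMap map[string]any
		if len(params.Metadata) > 0 {
			if err := json.Unmarshal(params.Metadata, &metadataMap); err != nil {
				return nil, err
			}
		}
		if metadataMap == nil { // metadata was empty or JSON null
			metadataMap = make(map[string]any)
		}

		metadataMap["span_id"] = span.SpanContext().SpanID().String()
		metadataMap["trace_id"] = span.SpanContext().TraceID().String()

		var err error
		params.Metadata, err = json.Marshal(metadataMap)
		if err != nil {
			return nil, err
		}
	}

	return doInner(ctx)
}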
It's installed in a River client through its Middleware configuration:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	// Order is significant.
	Middleware: []rivertype.Middleware{
		&traceMiddleware{},
	},
})
When inserting any batch of jobs, River invokes the middleware's InsertMany, passing a doInner function that must be called to perform the actual insertion. Multiple middleware are nested in reverse order of definition. This produces a stack where doInner calls down into any middleware nested below the current one, and eventually into a final step that inserts the batch of jobs. Inside River's client, the wrapping looks like this:
middleware := c.middlewareLookup.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert)
if len(middleware) > 0 {
	// Wrap middlewares in reverse order so the one defined first is wrapped
	// as the outermost function and is first to receive the operation.
	for i := len(middleware) - 1; i >= 0; i-- {
		var (
			middlewareItem  = middleware[i].(rivertype.JobInsertMiddleware)
			previousDoInner = doInner
		)
		doInner = func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
			return middlewareItem.InsertMany(ctx, insertParams, previousDoInner)
		}
	}
}
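The following standalone sketch shows the ordering that loop produces. It reuses the same reverse-wrapping technique, but with simplified stand-in types: recordingMiddleware, buildStack, and the error-only doInner signature are hypothetical and are not River's API. Each middleware records when it's entered and when it exits:

```go
package main

import (
	"context"
	"fmt"
)

// insertFunc is a simplified, hypothetical stand-in for doInner. River's real
// version also returns []*rivertype.JobInsertResult.
type insertFunc func(ctx context.Context) error

// recordingMiddleware is a hypothetical middleware that logs entry and exit
// so the nesting order is visible.
type recordingMiddleware struct {
	name   string
	events *[]string
}

func (m *recordingMiddleware) InsertMany(ctx context.Context, doInner insertFunc) error {
	*m.events = append(*m.events, m.name+" before")
	err := doInner(ctx)
	*m.events = append(*m.events, m.name+" after")
	return err
}

// buildStack wraps the final step in reverse order, like the loop above, so
// middleware[0] ends up as the outermost layer.
func buildStack(middleware []*recordingMiddleware, final insertFunc) insertFunc {
	doInner := final
	for i := len(middleware) - 1; i >= 0; i-- {
		var (
			item            = middleware[i]
			previousDoInner = doInner
		)
		doInner = func(ctx context.Context) error {
			return item.InsertMany(ctx, previousDoInner)
		}
	}
	return doInner
}

// runStack builds a two-middleware stack around a fake insert and returns
// the order in which everything ran.
func runStack() []string {
	var events []string
	middleware := []*recordingMiddleware{
		{name: "first", events: &events},
		{name: "second", events: &events},
	}
	doInner := buildStack(middleware, func(ctx context.Context) error {
		events = append(events, "insert")
		return nil
	})
	if err := doInner(context.Background()); err != nil {
		panic(err)
	}
	return events
}

func main() {
	fmt.Println(runStack())
	// [first before second before insert second after first after]
}
```

The first middleware in the slice runs first and finishes last. This is why the order of Config.Middleware is significant.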
Interface driven design
When constructing a middleware stack around a specific operation, River checks each middleware to see whether it implements the operation's Go interface. For example, middleware implementing JobInsertMiddleware is invoked around job insertion:
type JobInsertMiddleware interface {
	Middleware

	// InsertMany is invoked around a batch insert operation. Implementations
	// must always include a call to doInner to call down the middleware stack
	// and perform the insertion, and may run custom code before and after.
	//
	// Returning an error from this function will fail the overarching insert
	// operation, even if the inner insertion originally succeeded.
	InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
}
Middleware may support more than one operation by implementing multiple middleware interfaces. Here's WorkerMiddleware, a second interface that's invoked while a job is worked:
type WorkerMiddleware interface {
	Middleware

	// Work is invoked after a job's JSON args are unmarshaled and before the
	// job is worked. Implementations must always include a call to doInner to
	// call down the middleware stack and work the job, and may run custom
	// code before and after.
	//
	// Returning an error from this function will fail the overarching work
	// operation, even if the inner work originally succeeded.
	Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error
}
The traceMiddleware OpenTelemetry example above implements JobInsertMiddleware. Let's give it a WorkerMiddleware.Work implementation too. With it, jobs support distributed tracing, giving insight from initial insertion all the way to when they're finally worked:
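// Also requires importing "go.opentelemetry.io/otel/trace".
func (*traceMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
	type spanAndTraceMetadata struct {
		SpanID  string `json:"span_id"`
		TraceID string `json:"trace_id"`
	}

	var spanAndTrace spanAndTraceMetadata
	if err := json.Unmarshal(job.Metadata, &spanAndTrace); err != nil {
		return err
	}

	// IDs were stored as hex strings, so convert them back to OpenTelemetry's
	// types. Jobs inserted without trace metadata are worked without a link.
	var opts []trace.SpanStartOption
	spanID, spanErr := trace.SpanIDFromHex(spanAndTrace.SpanID)
	traceID, traceErr := trace.TraceIDFromHex(spanAndTrace.TraceID)
	if spanErr == nil && traceErr == nil {
		opts = append(opts, trace.WithLinks(trace.Link{
			SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
				SpanID:  spanID,
				TraceID: traceID,
			}),
		}))
	}

	ctx, span := otel.GetTracerProvider().Tracer("my_app").Start(ctx, "my_app.work", opts...)
	defer span.End()

	return doInner(ctx)
}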
No changes to client initialization code are needed after making traceMiddleware support multiple interfaces. On startup, River tabulates which middleware comply with which middleware interfaces and remembers which ones to invoke for each supported operation. As a result, there's minimal accounting overhead once a client is up and running.
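Below is a sketch of how that kind of startup tabulation could work using Go type assertions. Everything here is a simplified stand-in: the interfaces drop most parameters, and middlewareLookup and newMiddlewareLookup are hypothetical, not River's internal middlewarelookup package. The sketch also includes a middleware whose Work method forgets its error return:

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for River's rivertype interfaces.
type Middleware interface{ IsMiddleware() bool }

type JobInsertMiddleware interface {
	Middleware
	InsertMany(ctx context.Context, doInner func(context.Context) error) error
}

type WorkerMiddleware interface {
	Middleware
	Work(ctx context.Context, doInner func(context.Context) error) error
}

type MiddlewareDefaults struct{}

func (MiddlewareDefaults) IsMiddleware() bool { return true }

// bothMiddleware implements both operation interfaces.
type bothMiddleware struct{ MiddlewareDefaults }

func (*bothMiddleware) InsertMany(ctx context.Context, doInner func(context.Context) error) error {
	return doInner(ctx)
}

func (*bothMiddleware) Work(ctx context.Context, doInner func(context.Context) error) error {
	return doInner(ctx)
}

// typoMiddleware's Work is missing the error return, so it satisfies only
// Middleware and would silently never be invoked around work.
type typoMiddleware struct{ MiddlewareDefaults }

func (*typoMiddleware) Work(ctx context.Context, doInner func(context.Context) error) {
	_ = doInner(ctx)
}

// middlewareLookup is a hypothetical table, built once at startup, of which
// middleware to run for each operation.
type middlewareLookup struct {
	insert []JobInsertMiddleware
	work   []WorkerMiddleware
}

func newMiddlewareLookup(all []Middleware) *middlewareLookup {
	lookup := &middlewareLookup{}
	for _, m := range all {
		if im, ok := m.(JobInsertMiddleware); ok {
			lookup.insert = append(lookup.insert, im)
		}
		if wm, ok := m.(WorkerMiddleware); ok {
			lookup.work = append(lookup.work, wm)
		}
	}
	return lookup
}

func main() {
	lookup := newMiddlewareLookup([]Middleware{&bothMiddleware{}, &typoMiddleware{}})
	fmt.Println(len(lookup.insert), len(lookup.work)) // 1 1
}
```

typoMiddleware satisfies Middleware, so the configuration accepts it. However, the type assertion for WorkerMiddleware fails, so it never lands in the work list. Compile-time interface compliance checks guard against exactly this kind of silent failure.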
This design sacrifices some type safety in exchange for flexibility. Implementing the lowest-common-denominator interface, rivertype.Middleware, is trivial. A middleware can therefore satisfy it while accidentally failing to satisfy a more specific interface (say, a parameter or return type is subtly wrong), in which case the middleware is silently never called. To protect against this possibility, we recommend adding interface compliance checks that assign middleware to each of the interfaces it's expected to implement:
var (
	_ rivertype.JobInsertMiddleware = &traceMiddleware{}
	_ rivertype.WorkerMiddleware    = &traceMiddleware{}
)
If a middleware accidentally doesn't implement an expected interface, compilation fails, revealing the problem instantly.
A major advantage of using interfaces as proxies for supported operations is that River ends up with many small middleware interfaces instead of one big one. New middleware functionality is added as a new interface that middleware may or may not implement. No existing middleware breaks due to interface non-compliance.
Hooks: A lighter middleware alternative
Middleware is a powerful and flexible paradigm, but it's not without its disadvantages:
- Middleware wraps inner operations, so every middleware adds frames to stack traces, making error output that much harder to read. Big production apps might have middleware stacks so deep that users have to scroll back in their terminal to see the top.
- It needs to be tightly coupled to how River operates internally. JobInsertMiddleware defines an InsertMany instead of an Insert because River always inserts jobs in batches (even when those are batches of one). In most cases it'd be more convenient to work with only one inserted job at a time, but middleware can't support that.
- Coarser operations lead to more limited flexibility. Inserted batches may contain multiple kinds of jobs, so it'd be difficult (or at least not performant) to have middleware that runs only on the insertion of a specific kind of job.
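To make the batch problem concrete, consider a hypothetical middleware that only cares about one job kind. This sketch uses simplified stand-in types that are not River's real types: JobInsertParams carries map-based Metadata, and doInner returns only an error. The middleware still receives every job in the batch and has to filter by hand:

```go
package main

import (
	"context"
	"fmt"
)

// JobInsertParams is a simplified stand-in. River stores metadata as []byte;
// a map keeps this sketch short.
type JobInsertParams struct {
	Kind     string
	Metadata map[string]any
}

// emailOnlyMiddleware is hypothetical. It only wants "email" jobs, but it's
// handed the whole mixed batch, so it must scan and skip everything else.
type emailOnlyMiddleware struct{}

func (emailOnlyMiddleware) InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) error) error {
	for _, params := range manyParams {
		if params.Kind != "email" {
			continue // other kinds sharing the batch
		}
		params.Metadata["tagged"] = true
	}
	return doInner(ctx)
}

// tagBatch runs the middleware over a mixed batch with a no-op insert.
func tagBatch() []*JobInsertParams {
	batch := []*JobInsertParams{
		{Kind: "email", Metadata: map[string]any{}},
		{Kind: "sms", Metadata: map[string]any{}},
		{Kind: "email", Metadata: map[string]any{}},
	}
	err := emailOnlyMiddleware{}.InsertMany(context.Background(), batch, func(context.Context) error { return nil })
	if err != nil {
		panic(err)
	}
	return batch
}

func main() {
	for _, params := range tagBatch() {
		fmt.Println(params.Kind, params.Metadata["tagged"] == true)
	}
}
```

Anything the middleware does around doInner, like timing, also covers every other kind of job that happened to share the batch.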
To work around these problems, River implements hooks. Think of hooks as middleware's close conceptual cousin. They're used almost identically, but behave a little differently, making them more suitable for some cases and less so for others.
A simple logging hook:
type logHook struct {
	river.HookDefaults
}

func (*logHook) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error {
	fmt.Printf("inserting job with kind %q\n", params.Kind)
	return nil
}

func (*logHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
	fmt.Printf("working job with kind %q\n", job.Kind)
	return nil
}
It's close to the middleware defined above, but with a couple of important differences:
- There aren't doInner functions to call into. Hooks are invoked, do their work, and pop off the stack immediately.
- InsertBegin operates on a single inserted job instead of a batch of them.
Hooks are configured on a River client almost identically to middleware, swapping Config.Middleware for Config.Hooks:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	Hooks: []rivertype.Hook{
		&logHook{},
	},
})
Hooks present a simpler interface and don't have to live on the stack for the entire duration of an operation, so they should be preferred over middleware wherever either would work. But because they're short-lived, they're not suitable for code that specifically needs to measure across entire operations.
Going back to the OpenTelemetry example, we don't want the insertion span to end (via defer span.End()) until the insertion is really finished, so we must use middleware. The same applies anywhere we need to modify context for use in workers, because context changes made in a hook are discarded as soon as the function returns.
func (*traceMiddleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
	ctx, span := otel.GetTracerProvider().Tracer("my_app").Start(ctx, "my_app.insert_many")
	defer span.End()

	...
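Plain Go shows why context changes can't escape a hook. The following standalone sketch compares a function that can only return an error with one that receives a doInner to call. The names hookStyle, middlewareStyle, and ctxKey are hypothetical and are not River APIs:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

// hookStyle mimics a hook. It can derive a new context, but it has no
// doInner to hand it to, so the derived context is dropped on return.
func hookStyle(ctx context.Context) error {
	derived := context.WithValue(ctx, ctxKey{}, "from hook")
	_ = derived // nowhere to send it
	return nil
}

// middlewareStyle passes its derived context down through doInner.
func middlewareStyle(ctx context.Context, doInner func(context.Context) error) error {
	return doInner(context.WithValue(ctx, ctxKey{}, "from middleware"))
}

// observe returns what inner code sees in each case.
func observe() (seenAfterHook, seenInMiddleware any) {
	ctx := context.Background()

	_ = hookStyle(ctx)
	seenAfterHook = ctx.Value(ctxKey{}) // still nil

	_ = middlewareStyle(ctx, func(innerCtx context.Context) error {
		seenInMiddleware = innerCtx.Value(ctxKey{})
		return nil
	})
	return seenAfterHook, seenInMiddleware
}

func main() {
	fmt.Println(observe()) // <nil> from middleware
}
```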
Extensibility without breaking changes
To recap middleware and hooks:
- River supports middleware for customization across the full duration of operations, such as cases where an operation needs to be timed or where context needs to be modified for use by inner functions.
- River also supports hooks for customization that's more easily applied to single jobs or job kinds. Hooks are also lighter weight, because they return immediately instead of occupying a slot on the stack.
- Both middleware and hooks are interface driven, and can tie into one or more operations by implementing specific interfaces like JobInsertMiddleware or HookWorkBegin. Interface checks are made on initialization, so there's minimal overhead once the client is up and running.
Our hope is that the interface approach covers all currently known requirements for hooks and middleware, while also providing a solid foundation for River to be more extensible down the road as needed, with no breaking changes to existing middleware/hooks.