Measuring River with OpenTelemetry

After an application has fully graduated to production, it's critical to have expansive and timely insight into its health. That's doubly true for a job queue, which while undoubtedly is one of the most critical components in any stack, is by extension one that acts as a lightning rod for degradation and failure.

To that end, River now has an OpenTelemetry package, designed for use with the open metrics and tracing standard, and which is compatible with a wide array of vendors including the best known names in the industry like DataDog or Sentry.

Pull the package into an existing Go module with go get:

Terminal window
go get -u github.com/riverqueue/rivercontrib/otelriver

Then, install it as middleware on River client:

import "github.com/riverqueue/rivercontrib/otelriver"
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Middleware: []rivertype.Middleware{
// Install the OpenTelemetry middleware to run for all jobs inserted
// or worked by this River client.
otelriver.NewMiddleware(nil),
},
})

otelriver's default invocation will pick up a global metrics/trace provider automatically, so no work is necessary beyond configuring OpenTelemetry for your preferred vendor. Here's how to do do that with DataDog:

import (
ddotel "github.com/DataDog/dd-trace-go/v2/ddtrace/opentelemetry"
"go.opentelemetry.io/otel"
)
provider := ddotel.NewTracerProvider()
defer func() { _ = provider.Shutdown() }()
otel.SetTracerProvider(provider)
_, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
Middleware: []rivertype.Middleware{
otelriver.NewMiddleware(nil),
},
})

See the full example for use of otelriver with DataDog. Other providers should have similar configuration instructions for their use with OpenTelemetry.

Traces and metrics

The package produces two main traces:

  • river.insert_many: Traced across a batch insert of jobs. In River, all jobs are inserted as part of a batch (therefore the name "many"), although they'll be batches of one in cases where only one job is being inserted.
  • river.work: Traced across a single job being worked.

It also emits a number of metrics:

  • river.insert_count: Number of individual jobs inserted.
  • river.insert_many_count: Number of job batches inserted.
  • river.insert_many_duration: Gauge of the duration of a batch insert operation.
  • river.work_count: Number of jobs worked.
  • river.work_duration: Gauge of the duration of a single job worked (in seconds).

Operations are tagged with a status attribute of ok, error, or panic so metrics can be filtered to only successes or only failures. Work operations are tagged with kind and queue to help with additional customization.

Let us know what's next

We know that we probably didn't cover all functionality that everyone would like, but our goal with this release is to get a first pass in people's hands that's somewhat useful, and iterate from there.

Are there specific features that you'd like to see? More spans? More metrics? Distributed traving? Let us know what you'd like to see next.