
OpenTelemetry

In addition to its standard logging system, River supports OpenTelemetry for operational insight into live production stacks. OpenTelemetry is an open metrics and tracing standard compatible with a wide array of vendors, including well-known names like DataDog and Sentry. River supports these services through OpenTelemetry rather than maintaining a vendor-specific package for each one.


Installing the OpenTelemetry middleware

River's OpenTelemetry plugin is distributed as a middleware in the rivercontrib repository.

Pull the package into an existing Go module with go get:

go get -u github.com/riverqueue/rivercontrib/otelriver

Then, install it as middleware on a River client:

import (
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertype"
	"github.com/riverqueue/rivercontrib/otelriver"
)

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	Middleware: []rivertype.Middleware{
		// Install the OpenTelemetry middleware to run for all jobs inserted
		// or worked by this River client.
		otelriver.NewMiddleware(nil),
	},
})

Global providers and DataDog example

otelriver's default invocation picks up a global metrics/trace provider automatically, so no work is necessary beyond configuring OpenTelemetry for your preferred vendor. Here's how to do that with DataDog:

import (
	ddotel "github.com/DataDog/dd-trace-go/v2/ddtrace/opentelemetry"
	"go.opentelemetry.io/otel"
)

provider := ddotel.NewTracerProvider()
defer func() { _ = provider.Shutdown() }()

otel.SetTracerProvider(provider)

riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
	Middleware: []rivertype.Middleware{
		otelriver.NewMiddleware(nil),
	},
})

See the full example for use of otelriver with DataDog. Other providers should have similar configuration instructions for their use with OpenTelemetry.

Injecting providers

Where using global providers isn't desirable (for example, to facilitate testing), providers can instead be injected via middleware configuration:

provider := ddotel.NewTracerProvider()
defer func() { _ = provider.Shutdown() }()

riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
	Middleware: []rivertype.Middleware{
		otelriver.NewMiddleware(&otelriver.MiddlewareConfig{
			TracerProvider: provider,
		}),
	},
})
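
To show why injection helps in tests, here is a minimal standard-library sketch of the general pattern: an explicitly supplied provider wins, and a nil configuration falls back to a process-wide default. It is not otelriver's source. The names spanStarter, recorder, config, resolve, and globalProvider are all hypothetical stand-ins chosen for illustration.

```go
package main

import "fmt"

// spanStarter is a hypothetical stand-in for a tracer provider.
type spanStarter interface {
	StartSpan(name string)
}

// recorder is a test double that remembers the names of started spans.
type recorder struct{ names []string }

func (r *recorder) StartSpan(name string) { r.names = append(r.names, name) }

// globalProvider plays the role of the process-wide default provider.
var globalProvider spanStarter = &recorder{}

// config models a middleware configuration with an optional provider.
type config struct{ Provider spanStarter }

// resolve returns the injected provider when one is set, and otherwise
// falls back to the global default.
func resolve(cfg *config) spanStarter {
	if cfg != nil && cfg.Provider != nil {
		return cfg.Provider
	}
	return globalProvider
}

func main() {
	injected := &recorder{}

	// With an injected provider, the span lands in the test double only.
	resolve(&config{Provider: injected}).StartSpan("river.work")

	// With a nil config, the span goes to the global default instead.
	resolve(nil).StartSpan("river.work")

	fmt.Println(len(injected.names), len(globalProvider.(*recorder).names)) // prints "1 1"
}
```

Because the injected recorder never touches global state, tests that each build their own recorder can run in parallel without seeing each other's spans.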

List of traces and metrics

The package produces two main traces:

  • river.insert_many: Traced across a batch insert of jobs. In River, all jobs are inserted as part of a batch (therefore the name "many"), although they'll be batches of one in cases where only one job is being inserted.
  • river.work: Traced across a single job being worked.

It also emits metrics:

  • river.insert_count: Number of individual jobs inserted.
  • river.insert_many_count: Number of job batches inserted.
  • river.insert_many_duration: Gauge of the duration of a batch insert operation.
  • river.work_count: Number of jobs worked.
  • river.work_duration: Gauge of the duration of a single job worked (in seconds).
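
The difference between river.insert_count and river.insert_many_count can be illustrated with a toy in-memory model. This is only a sketch of the counting semantics described above, not how otelriver records metrics. The insertMetrics type and its recordBatch method are hypothetical.

```go
package main

import "fmt"

// insertMetrics is a hypothetical in-memory stand-in for the two insert
// counters described above.
type insertMetrics struct {
	insertCount     int // models river.insert_count (individual jobs)
	insertManyCount int // models river.insert_many_count (batches)
}

// recordBatch accounts for one batch insert carrying n jobs.
func (m *insertMetrics) recordBatch(n int) {
	m.insertManyCount++
	m.insertCount += n
}

func main() {
	var m insertMetrics

	m.recordBatch(1) // a single-job insert still counts as a batch of one
	m.recordBatch(3) // one batch carrying three jobs

	fmt.Println(m.insertCount, m.insertManyCount) // prints "4 2"
}
```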

Operations are tagged with a status attribute of ok, error, or panic so metrics can be filtered to only successes or only failures. Work operations are additionally tagged with kind and queue so they can be filtered or grouped by job kind and queue.
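
As a rough illustration of how the three status values relate to a job's outcome, the sketch below classifies a function's result using only the standard library. It is an assumption about the general shape of such a wrapper, not otelriver's implementation. The classify function and errBoom variable are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical job failure used for illustration.
var errBoom = errors.New("boom")

// classify runs work and reports which status value ("ok", "error", or
// "panic") an instrumented wrapper could attach to the operation.
// For simplicity this sketch swallows the panic after classifying it; a real
// middleware would more likely record the status and let the panic propagate.
func classify(work func() error) (status string) {
	defer func() {
		if r := recover(); r != nil {
			status = "panic"
		}
	}()

	if err := work(); err != nil {
		return "error"
	}
	return "ok"
}

func main() {
	fmt.Println(classify(func() error { return nil }))     // prints "ok"
	fmt.Println(classify(func() error { return errBoom })) // prints "error"
	fmt.Println(classify(func() error { panic("bad") }))   // prints "panic"
}
```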