Subscriptions

River clients support subscriptions using Client.Subscribe, which returns a channel over which events are received containing job information.


Subscribing to job events

Especially in mature systems, it's useful to receive detailed information on exactly what's happening inside a job queue, for example to emit custom telemetry such as logs and metrics. River clients support this through Client.Subscribe, which returns a channel emitting events about jobs moving through the client's workers.

Along with a reference to the job that was worked, events contain statistics such as how long the job took to run and how long it waited in the queue. See Event and JobStatistics.

func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
    ...
}

// Event wraps an event that occurred within a River client, like a job being
// completed.
type Event struct {
    // Kind is the kind of event. Receivers should read this field and respond
    // accordingly. Subscriptions will only receive event kinds that they
    // requested when creating a subscription with Subscribe.
    Kind EventKind

    // Job contains job-related information.
    Job *rivertype.JobRow

    // JobStats are statistics about the run of a job.
    JobStats *JobStatistics
}

// JobStatistics contains information about a single execution of a job.
type JobStatistics struct {
    CompleteDuration  time.Duration // Time it took to set the job completed, discarded, or errored.
    QueueWaitDuration time.Duration // Time the job spent waiting in available state before starting execution.
    RunDuration       time.Duration // Time job spent running (measured around job worker.)
}
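As a rough illustration of how these statistics might feed custom telemetry, the sketch below renders them as a single log line. To stay runnable without a database or the River module, it declares a local stand-in JobStatistics with the same three fields shown above; formatStats is a hypothetical helper, not part of River.

```go
package main

import (
	"fmt"
	"time"
)

// JobStatistics is a local stand-in mirroring the fields of River's
// JobStatistics shown above, so this sketch compiles on its own.
type JobStatistics struct {
	CompleteDuration  time.Duration
	QueueWaitDuration time.Duration
	RunDuration       time.Duration
}

// formatStats is a hypothetical helper that renders the statistics of one
// job execution as a log line. A nil stats pointer is handled defensively.
func formatStats(kind string, stats *JobStatistics) string {
	if stats == nil {
		return fmt.Sprintf("kind=%s stats=none", kind)
	}
	return fmt.Sprintf("kind=%s queue_wait=%s run=%s complete=%s",
		kind, stats.QueueWaitDuration, stats.RunDuration, stats.CompleteDuration)
}

func main() {
	// Made-up durations, purely for illustration.
	fmt.Println(formatStats("job_completed", &JobStatistics{
		CompleteDuration:  5 * time.Millisecond,
		QueueWaitDuration: 2 * time.Second,
		RunDuration:       150 * time.Millisecond,
	}))
}
```

In a real subscriber, the same fields would be read from event.JobStats and could be sent to a metrics library instead of being formatted as text.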

Subscribe takes variadic args containing the kinds of events a subscriber would like to receive:

subscribeChan, subscribeCancel :=
    riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

See the Subscription example for complete code.

Subscribe returns a subscription channel and a cancel function, the latter making it possible to close the subscription. Some notable properties:

  • Subscription channels are buffered channels of size 100 to prevent slow consumers from stalling clients as they distribute events. Receivers doing heavy lifting as they receive events (e.g. network sends) should have their own buffering system so they don't fall behind the subscription and accidentally drop events.

  • Subscription channels are closed when clients stop. Channels carry *Event, so a receive on a closed channel yields nil, and subscribers can detect shutdown by checking for a nil event.

  • Cancel functions should generally be invoked using a defer, but it's not strictly necessary to do so unless subscribers want to close their subscription prematurely. Subscriptions are terminated automatically on shutdown.

  • Clients support an arbitrary number of subscriptions, so it's okay to Subscribe multiple times for different uses.

  • Events are only distributed for jobs worked by the specific client that was subscribed to. When running multiple clients across multiple nodes, it's necessary to subscribe to all of them to receive events for all jobs in the entirety of the cluster.
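Two of the properties above, detecting a closed subscription through a nil event and adding subscriber-side buffering, can be sketched as follows. This is a minimal illustration under stated assumptions: Event is a local stand-in for River's Event type, the channel is created by hand rather than returned from Subscribe, and relay and consume are hypothetical helper names.

```go
package main

import "fmt"

// Event is a local stand-in for River's Event with only the field needed here.
type Event struct {
	Kind string
}

// relay copies events from the subscription channel into a larger,
// subscriber-owned buffer so that slow processing is less likely to back up
// the subscription itself. It closes the output once the input is closed.
// Note that relay still blocks if the larger buffer fills up.
func relay(events <-chan *Event, bufferSize int) <-chan *Event {
	out := make(chan *Event, bufferSize)
	go func() {
		defer close(out)
		for event := range events {
			out <- event
		}
	}()
	return out
}

// consume handles events until it receives nil, which is what a receive on a
// closed channel of *Event yields. It returns the number of events handled.
func consume(events <-chan *Event, handle func(*Event)) int {
	processed := 0
	for {
		event := <-events
		if event == nil {
			return processed // channel closed: the client has stopped
		}
		handle(event)
		processed++
	}
}

func main() {
	// Simulated subscription channel; with River this would be the channel
	// returned by riverClient.Subscribe(...).
	events := make(chan *Event, 100)
	events <- &Event{Kind: "job_completed"}
	events <- &Event{Kind: "job_completed"}
	close(events) // simulates the client stopping

	n := consume(relay(events, 1000), func(e *Event) {
		fmt.Println("received", e.Kind)
	})
	fmt.Println("processed", n)
}
```

The buffer size of 1000 is arbitrary. A subscriber doing network sends would pick a size based on expected event volume and decide what to do when that buffer fills as well.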

Listening for multiple event kinds

Subscribe takes variadic args so that subscribers can receive multiple kinds of events:

subscribeChan, subscribeCancel := riverClient.Subscribe(
    river.EventKindJobCompleted,
    river.EventKindJobFailed,
)
defer subscribeCancel()

Event kinds

A list of all currently available event kinds:

  • EventKindJobCancelled: Emitted when a job is cancelled.
  • EventKindJobCompleted: Emitted when a job is successfully completed.
  • EventKindJobFailed: Emitted when a job either errors and will be retried, or when it errors for the last time and will be discarded. Callers can use job fields like Attempt and State to differentiate the two possibilities.
  • EventKindJobSnoozed: Emitted when a job is snoozed.
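For EventKindJobFailed, telling a retry apart from a final failure might look like the sketch below. It uses a local stand-in JobRow carrying only Attempt, MaxAttempts, and State, and it assumes that a job that will not be retried is reported in a "discarded" state; the string value and the describeFailure helper are assumptions for illustration, so check them against the rivertype package before relying on them.

```go
package main

import "fmt"

// JobRow is a local stand-in for rivertype.JobRow with only the fields this
// sketch reads.
type JobRow struct {
	Attempt     int
	MaxAttempts int
	State       string
}

// stateDiscarded is assumed to match the state River assigns to a job that
// has errored for the last time.
const stateDiscarded = "discarded"

// describeFailure is a hypothetical helper that summarizes a failed job,
// distinguishing a final failure from one that will be retried.
func describeFailure(job *JobRow) string {
	if job.State == stateDiscarded {
		return fmt.Sprintf("discarded after %d attempt(s)", job.Attempt)
	}
	return fmt.Sprintf("will retry (attempt %d of %d)", job.Attempt, job.MaxAttempts)
}

func main() {
	// Made-up job rows, purely for illustration.
	fmt.Println(describeFailure(&JobRow{Attempt: 1, MaxAttempts: 25, State: "retryable"}))
	fmt.Println(describeFailure(&JobRow{Attempt: 25, MaxAttempts: 25, State: stateDiscarded}))
}
```

Checking State rather than comparing Attempt to MaxAttempts keeps the logic tied to what the queue actually decided for the job.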

Forward compatibility

Subscribe purposefully doesn't provide a shortcut for subscribing to all available event kinds (each must be specified explicitly) to ensure forward compatibility.

If new event kinds are added in future versions of River, existing subscription routines may not be able to handle them safely. Subscribers who want the new kinds must add them to their Subscribe parameters manually, which also gives them the opportunity to check that their implementation supports them.
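One way to keep the subscribed kinds and the handled kinds in step is to derive the Subscribe arguments from a table of handlers, so a kind is only requested once code exists to process it. The sketch below is an illustration of that idea, not a River feature: EventKind and Event are local stand-ins, the kind strings are placeholders, and kindsOf and dispatch are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
)

// EventKind and Event are local stand-ins for River's types of the same name.
type EventKind string

type Event struct {
	Kind EventKind
}

// kindsOf returns every kind that has a handler, sorted for stable output.
// With River, the result would be passed as riverClient.Subscribe(kinds...).
func kindsOf(handlers map[EventKind]func(*Event)) []EventKind {
	kinds := make([]EventKind, 0, len(handlers))
	for kind := range handlers {
		kinds = append(kinds, kind)
	}
	sort.Slice(kinds, func(i, j int) bool { return kinds[i] < kinds[j] })
	return kinds
}

// dispatch routes an event to its handler and reports whether one existed.
func dispatch(handlers map[EventKind]func(*Event), event *Event) bool {
	handle, ok := handlers[event.Kind]
	if !ok {
		return false // no handler registered for this kind
	}
	handle(event)
	return true
}

func main() {
	// Placeholder kind strings; real code would use River's EventKind constants.
	handlers := map[EventKind]func(*Event){
		"job_completed": func(e *Event) { fmt.Println("handling", e.Kind) },
		"job_failed":    func(e *Event) { fmt.Println("handling", e.Kind) },
	}
	fmt.Println("subscribing to:", kindsOf(handlers))
	fmt.Println("handled:", dispatch(handlers, &Event{Kind: "job_completed"}))
}
```

Adding support for a new event kind then becomes a single change: registering a handler for it also adds it to the subscription.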