Features

Error and panic handling

Failure in a job queue is inevitable. River provides an interface to handle errors and panics for custom application telemetry and to provide execution feedback.


Implementing an error handler

At sufficient scale, it's inevitable that jobs will occasionally return errors, and even panic given unforeseen conditions. River has a comprehensive retry system to ensure that even in the presence of errors, no jobs are lost, and have a chance to be reworked in case of an intermittent failure or while a bug fix is deployed.

River rescues panics during work automatically, and will log information on any errors or panics that occur, but sophisticated applications will often want to hook up their own telemetry to handle these events. River provides the ErrorHandler interface to make this possible.

type ErrorHandler interface {
    // HandleError is invoked in case of an error occurring in a job. It's
    // used to add custom telemetry or provide feedback on an errored job.
    //
    // Context is descended from the one used to start the River client that
    // worked the job.
    HandleError(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult

    // HandleError is invoked in case of a panic occurring in a job. It's
    // used to add custom telemetry or provide feedback on a panicked job.
    //
    // Context is descended from the one used to start the River client that
    // worked the job.
    HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult
}

type ErrorHandlerResult struct {
    // SetCancelled can be set to true to fail the job immediately and
    // permanently. By default it'll continue to follow the configured retry
    // schedule.
    SetCancelled bool
}

Applications can provide an implementation for ErrorHandler and configure it when creating a client:

type CustomErrorHandler struct{}

func (*CustomErrorHandler) HandleError(ctx context.Context, job *river.rivertype.JobRow, err error) *river.ErrorHandlerResult {
    fmt.Printf("Job errored with: %s\n", err)
    return nil
}

func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *river.rivertype.JobRow, panicVal any) *river.ErrorHandlerResult {
    fmt.Printf("Job panicked with: %v\n", panicVal)
    return nil
}
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    ErrorHandler: &CustomErrorHandler{},
    ...
})

See the ErrorHandler example for complete code.

Reacting to errors and panics

ErrorHandler also lets an implementation provide feedback to job execution. ErrorHandlerResult.SetCancelled can be set to permanently cancel the job (preventing any future retries):

func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *river.rivertype.JobRow, panicVal any) *river.ErrorHandlerResult {
    fmt.Printf("Job panicked with: %v\n", panicVal)

    // Cancel the job to prevent it from being retried:
    return &river.ErrorHandlerResult{
        SetCancelled: true,
    }
}
Previous
Client from worker context