Package riverpro is an extension for River, the robust high-performance job processing system for Go and Postgres.
See homepage, docs, and the River UI.
River Pro extends River through a "pro" client and driver that wrap the standard River variants. This allows additional functionality to be injected into the River client.
    import (
        "github.com/riverqueue/river"

        "riverqueue.com/riverpro"
        "riverqueue.com/riverpro/driver/riverpropgxv5"
    )

    riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
        // Standard River configuration is embedded in the Pro config:
        Config: river.Config{
            Queues: map[string]river.QueueConfig{
                river.QueueDefault: {MaxWorkers: 100},
            },
            Workers: workers,
        },
    })
Once the Pro driver is in place, River Pro functionality can be used.
River Pro sequences guarantee that a specific series of jobs will be executed in a one-at-a-time sequential order relative to other jobs in the same sequence. Sequences are partitioned based upon a "sequence key" that is computed from various job attributes such as its kind and args (or a subset of args).
Jobs across sequences may run in parallel. Unlike unique jobs, sequences allow an infinite number of jobs to be queued up in the sequence, even though only one job will be worked at a time.
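To make the ordering guarantee concrete, the following is a minimal in-memory sketch, not River Pro's implementation, of which jobs would be eligible to run at a given moment: only the earliest unfinished job of each sequence key, while different keys proceed independently. The `job` type and the `eligible` function are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// job is a hypothetical, simplified stand-in for a queued job.
type job struct {
	ID          int
	SequenceKey string
	Done        bool
}

// eligible returns the IDs of jobs that may run now: the first unfinished job
// of each sequence key. Jobs are assumed to be ordered by insertion.
func eligible(jobs []job) []int {
	seen := map[string]bool{}
	var ids []int
	for _, j := range jobs {
		if j.Done || seen[j.SequenceKey] {
			continue
		}
		seen[j.SequenceKey] = true
		ids = append(ids, j.ID)
	}
	return ids
}

func main() {
	jobs := []job{
		{ID: 1, SequenceKey: "customer-a"},
		{ID: 2, SequenceKey: "customer-a"},
		{ID: 3, SequenceKey: "customer-b"},
	}
	fmt.Println(eligible(jobs)) // [1 3]

	// Once job 1 finishes, job 2 becomes the head of its sequence.
	jobs[0].Done = true
	fmt.Println(eligible(jobs)) // [2 3]
}
```

Any number of jobs can wait behind the head of a sequence, which is the difference from unique jobs noted above.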
See the sequence docs for more information on how to use sequences.
River Pro workflows allow you to define a graph of interdependent jobs to express complex, multi-step workflows, including fan-out and fan-in execution. Workflows are a powerful tool for orchestrating tasks that depend on each other, and can be used to model a wide range of business processes.
Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. Jobs will not be available for work until all of their dependencies have completed successfully. In addition, jobs that are scheduled to run in the future at time T will not execute until all their dependencies have completed _and_ the current time is greater than T.
    import (
        "context"
        "fmt"
        "log"

        "riverqueue.com/riverpro"
        "riverqueue.com/riverpro/driver/riverpropgxv5"
    )

    ctx := context.Background()

    client, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
        // ...
    })
    if err != nil {
        log.Fatal(err)
    }

    workflow := riverpro.NewWorkflow(&riverpro.WorkflowOpts{Name: "Send invoice to Alice"})
    taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil)
    workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverpro.WorkflowTaskOpts{
        Deps: []string{taskA.Name},
    })

    result, err := client.WorkflowPrepare(ctx, workflow)
    if err != nil {
        log.Fatal(err)
    }

    count, err := client.InsertMany(ctx, result.Jobs)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("inserted %d jobs\n", count)
By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with WorkflowOpts, or on a per-task basis with riverpro.WorkflowTaskOpts.
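As a rough illustration of the default cascade and its overrides, the sketch below decides whether a task should be cancelled from the final states of its dependencies. It is not the engine's code: `depState`, `ignoreOpts`, and `shouldCancel` are hypothetical names, with the option fields named after the Ignore*Deps fields of WorkflowOpts.

```go
package main

import "fmt"

// depState is a hypothetical enumeration of dependency outcomes.
type depState string

const (
	depCompleted depState = "completed"
	depCancelled depState = "cancelled"
	depDiscarded depState = "discarded"
	depDeleted   depState = "deleted"
)

// ignoreOpts is a hypothetical struct with one flag per failure outcome.
type ignoreOpts struct {
	IgnoreCancelledDeps bool
	IgnoreDeletedDeps   bool
	IgnoreDiscardedDeps bool
}

// shouldCancel reports whether a task should be cancelled because one of its
// dependencies ended in a failure state that is not being ignored.
func shouldCancel(deps []depState, opts ignoreOpts) bool {
	for _, s := range deps {
		switch s {
		case depCancelled:
			if !opts.IgnoreCancelledDeps {
				return true
			}
		case depDeleted:
			if !opts.IgnoreDeletedDeps {
				return true
			}
		case depDiscarded:
			if !opts.IgnoreDiscardedDeps {
				return true
			}
		}
	}
	return false
}

func main() {
	deps := []depState{depCompleted, depDiscarded}
	fmt.Println(shouldCancel(deps, ignoreOpts{}))                          // true
	fmt.Println(shouldCancel(deps, ignoreOpts{IgnoreDiscardedDeps: true})) // false
}
```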
See the workflow docs for more information on how to use workflows.
Client is a client for River Pro. It is a superset of the main river.Client type, with additional methods and behavioral changes for working with Pro features.
ClientFromContext returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.
It panics if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.
When testing Worker.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.
ClientFromContextSafely returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.
It returns an error if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.
When testing Worker.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.
NewClient creates a new River Pro client using the provided driver and configuration.
func (c *Client[TTx]) WorkflowCancel(ctx context.Context, workflowID string) (*WorkflowCancelResult, error)
WorkflowCancel cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.
See WorkflowCancelTx for a version that allows cancellation within a broader database transaction.
The returned WorkflowCancelResult reports the job rows where cancellation was attempted. The error is non-nil if the operation failed.
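The notion of a non-finalized task can be illustrated with a small filter. This sketch assumes a hypothetical `jobState` type with locally defined constants and a `cancellable` helper; it only mirrors the rule stated above (completed, discarded, and cancelled tasks are left alone) and does not touch a database.

```go
package main

import (
	"fmt"
	"sort"
)

// jobState is a hypothetical, locally defined job state type.
type jobState string

const (
	stateAvailable jobState = "available"
	stateRunning   jobState = "running"
	stateScheduled jobState = "scheduled"
	stateCompleted jobState = "completed"
	stateCancelled jobState = "cancelled"
	stateDiscarded jobState = "discarded"
)

// finalized reports whether a task has already reached a terminal state.
func finalized(s jobState) bool {
	switch s {
	case stateCompleted, stateCancelled, stateDiscarded:
		return true
	}
	return false
}

// cancellable returns the sorted names of tasks that a workflow-wide
// cancellation would attempt to cancel.
func cancellable(tasks map[string]jobState) []string {
	var names []string
	for name, s := range tasks {
		if !finalized(s) {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	tasks := map[string]jobState{
		"charge":  stateCompleted,
		"email":   stateRunning,
		"report":  stateScheduled,
		"cleanup": stateCancelled,
	}
	fmt.Println(cancellable(tasks)) // [email report]
}
```

A running task such as "email" in this example is included in the attempt, yet it may still finish before it observes the cancellation.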
func (c *Client[TTx]) WorkflowCancelTx(ctx context.Context, tx TTx, workflowID string) (*WorkflowCancelResult, error)
WorkflowCancelTx cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.
This variant operates within an existing transaction. See WorkflowCancel for the non-transaction variant.
The returned WorkflowCancelResult reports the job rows where cancellation was attempted. The error is non-nil if the operation failed.
func (c *Client[TTx]) WorkflowPrepare(ctx context.Context, workflow *Workflow) (*WorkflowPrepareResult, error)
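WorkflowPrepare validates the jobs in the workflow and prepares them for insertion. Conditions that make a workflow invalid are described by the DependencyCycleError, DuplicateTaskError, and MissingDependencyError types.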
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
func (c *Client[TTx]) WorkflowPrepareTx(ctx context.Context, tx TTx, workflow *Workflow) (*WorkflowPrepareResult, error)
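WorkflowPrepareTx validates the jobs in the workflow and prepares them for insertion. Conditions that make a workflow invalid are described by the DependencyCycleError, DuplicateTaskError, and MissingDependencyError types.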
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
    type Config struct {
        river.Config

        // SequenceSchedulerInterval is the amount of time to wait between runs of
        // the sequence scheduler.
        SequenceSchedulerInterval time.Duration

        // WorkflowRescuerInterval is the amount of time to wait between runs of
        // the workflow rescuer.
        WorkflowRescuerInterval time.Duration
    }
Config holds configuration for the riverpro.Client. The config is primarily managed with an embedded river.Config, along with additional fields for Pro-specific configuration.
type DependencyCycleError struct { DepStack []string }
DependencyCycleError is returned when a dependency cycle is detected in the workflow. A cycle occurs when a task depends on itself, either directly or indirectly through other tasks.
func (e *DependencyCycleError) Error() string
func (e *DependencyCycleError) Is(target error) bool
type DuplicateTaskError struct { TaskName string }
DuplicateTaskError is returned when a task name is added to the same workflow more than once. Task names must be unique within a workflow.
func (e *DuplicateTaskError) Error() string
func (e *DuplicateTaskError) Is(target error) bool
MissingDependencyError is returned when a named dependency is missing from the workflow. A task cannot depend on another task that does not exist in the workflow.
func (e *MissingDependencyError) Error() string
func (e *MissingDependencyError) Is(target error) bool
    type SequenceOpts struct {
        // ByArgs indicates that the job's encoded arguments should be used to compute
        // a sequence key.
        //
        // Default is false, meaning that input arguments will not be considered for
        // sequencing.
        //
        // When set to true, the entire encoded args field will be included in the
        // sequence hash, which requires care to ensure that no irrelevant args are
        // factored into the sequence key. It is also possible to use a subset of
        // the args by indicating on the `JobArgs` struct which fields should be
        // included in the sequence using struct tags:
        //
        //     type MyJobArgs struct {
        //         CustomerID string `json:"customer_id" river:"sequence"`
        //         TraceID    string `json:"trace_id"`
        //     }
        //
        // In this example, only the encoded `customer_id` key will be included in the
        // sequence key and the `trace_id` key will be ignored.
        //
        // All keys are sorted alphabetically before hashing to ensure consistent
        // results.
        ByArgs bool

        // ByQueue indicates that sequences should be partitioned by queue.
        //
        // Default is false, meaning that as long as any other sequence property is
        // enabled, sequences will be constructed for a kind across all queues.
        ByQueue bool

        // ContinueOnCancelled indicates that the sequence should continue to be
        // processed even if the job is cancelled. Without this setting, the sequence
        // will be halted if any job in the sequence is cancelled.
        ContinueOnCancelled bool

        // ContinueOnDiscarded indicates that the sequence should continue to be
        // processed even if the job repeatedly errors and is discarded. Without this
        // setting, the sequence will be halted if any job in the sequence is
        // discarded.
        ContinueOnDiscarded bool

        // ExcludeKind indicates that the job kind should not be included in the
        // sequence partition key. This is useful when you want to include multiple
        // job kinds in the same sequence.
        ExcludeKind bool
    }
SequenceOpts are options for defining sequence partitioning on JobArgs.
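To show how these options could combine into a partition key, here is a rough sketch. It is not River Pro's algorithm: the `opts` struct, the `sequenceKey` function, the key layout, and the choice of SHA-256 are all assumptions for illustration. The args map is assumed to already contain only the fields selected for sequencing, and the sketch relies on json.Marshal emitting map keys in sorted order to get a stable encoding.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"strings"
)

// opts is a hypothetical subset of SequenceOpts relevant to key computation.
type opts struct {
	ByArgs      bool
	ByQueue     bool
	ExcludeKind bool
}

// sequenceKey derives a partition key from the enabled job attributes.
func sequenceKey(kind, queue string, args map[string]any, o opts) (string, error) {
	var parts []string
	if !o.ExcludeKind {
		parts = append(parts, "kind="+kind)
	}
	if o.ByQueue {
		parts = append(parts, "queue="+queue)
	}
	if o.ByArgs {
		// json.Marshal sorts map keys, so the encoding does not depend on
		// the order in which args were added to the map.
		encoded, err := json.Marshal(args)
		if err != nil {
			return "", err
		}
		parts = append(parts, "args="+string(encoded))
	}
	sum := sha256.Sum256([]byte(strings.Join(parts, "&")))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	o := opts{ByArgs: true}
	a, _ := sequenceKey("invoice", "default", map[string]any{"customer_id": "c1"}, o)
	b, _ := sequenceKey("invoice", "bulk", map[string]any{"customer_id": "c1"}, o)
	c, _ := sequenceKey("invoice", "default", map[string]any{"customer_id": "c2"}, o)

	fmt.Println(a == b) // true: the queue is ignored unless ByQueue is set
	fmt.Println(a == c) // false: a different customer yields a different sequence
}
```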
type Workflow struct {/* contains filtered or unexported fields */}
Workflow is a collection of jobs which can have dependencies on other jobs in the workflow.
func NewWorkflow(opts *WorkflowOpts) *Workflow
NewWorkflow creates a new workflow. If an ID is not specified in opts, it will be generated.
func WorkflowFromExisting(job *rivertype.JobRow, opts *WorkflowOpts) (*Workflow, error)
WorkflowFromExisting initiates a workflow from an existing job in the workflow. This enables jobs to be added to an existing workflow. If the job is not part of a workflow, an error will be returned.
The workflow ID and Name will be derived from the existing job, and the corresponding values in opts will be ignored.
func (w *Workflow) Add(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *WorkflowTaskOpts) WorkflowTask
Add adds a task to the workflow. The task name must be unique within the workflow.
    type WorkflowOpts struct {
        ID                  string
        IgnoreCancelledDeps bool
        IgnoreDeletedDeps   bool
        IgnoreDiscardedDeps bool
        Name                string
    }
WorkflowOpts are options for creating a new workflow.
    type WorkflowPrepareResult struct {
        // Jobs is a list of jobs to insert into the database. It's structured as a
        // list of InsertManyParams so the jobs can be inserted with InsertMany.
        Jobs []river.InsertManyParams
    }
WorkflowPrepareResult is the result of preparing a workflow. It contains a list of jobs to insert.
    type WorkflowTask struct {
        // Name is the name of the workflow task.
        Name string
    }
WorkflowTask is a reference to a task in a workflow.
    type WorkflowTaskOpts struct {
        // Deps are the names of other tasks in the workflow that this task depends
        // on. If any of the dependencies are cancelled, deleted, or discarded after
        // failing repeatedly, the task will be cancelled.
        Deps []string

        // IgnoreCancelledDeps specifies whether to ignore cancelled dependencies,
        // instead treating them as if they had completed successfully.
        //
        // If specified, this overrides the IgnoreCancelledDeps option specified in
        // the workflow.
        IgnoreCancelledDeps *bool

        // IgnoreDeletedDeps specifies whether to ignore deleted dependencies,
        // instead treating them as if they had completed successfully.
        //
        // If specified, this overrides the IgnoreDeletedDeps option specified in
        // the workflow.
        IgnoreDeletedDeps *bool

        // IgnoreDiscardedDeps specifies whether to ignore discarded dependencies,
        // instead treating them as if they had completed successfully.
        //
        // If specified, this overrides the IgnoreDiscardedDeps option specified in
        // the workflow.
        IgnoreDiscardedDeps *bool
    }
WorkflowTaskOpts are options for adding a task to a workflow.
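The task-level fields are pointers so that "not specified" (nil) can be told apart from an explicit false. A minimal sketch of that override resolution follows, using a hypothetical `effective` function and a small `ptr` helper. It illustrates the precedence described in the field comments and is not taken from riverpro.

```go
package main

import "fmt"

// effective resolves a per-task override against the workflow-level default:
// a nil override means "inherit the workflow setting".
func effective(workflowDefault bool, taskOverride *bool) bool {
	if taskOverride != nil {
		return *taskOverride
	}
	return workflowDefault
}

// ptr is a small helper for taking the address of a literal value.
func ptr[T any](v T) *T { return &v }

func main() {
	fmt.Println(effective(true, nil))        // true: inherited from the workflow
	fmt.Println(effective(true, ptr(false))) // false: the task opts back in to cancellation
	fmt.Println(effective(false, ptr(true))) // true: the task ignores failed dependencies
}
```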
Path | Synopsis |
---|---|
cmd/riverpro module | |
driver module | |
riverworkflow | Package riverworkflow provides a workflow engine for River. |