Package riverworkflow provides a workflow engine for River.
Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. Jobs will not be available for work until all of their dependencies have completed successfully. In addition, jobs that are scheduled to run in the future at time X will not execute until all their dependencies have completed _and_ the current time is greater than X.
⚠️ Much of the direct usage of this package has been deprecated in favor of the equivalent functionality in the parent riverqueue.com/riverpro package.
import ( "riverqueue.com/riverpro" "riverqueue.com/riverpro/riverworkflow" "riverqueue.com/riverpro/driver/riverpropgxv5" ) workflow := riverworkflow.New(&river.WorkflowOpts{Name: "Send invoice to Alice"}) taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil) workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverworkflow.TaskOpts{ Deps: []string{taskA.Name}, }) result, err := riverworkflow.Prepare(context.Background(), riverClient, workflow) if err != nil { log.Fatal(err) } count, err := riverClient.InsertMany(ctx, result.Jobs) if err != nil { log.Fatal(err) } fmt.Printf("inserted %d jobs\n", count)
By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with Opts, or on a per-task basis with TaskOpts.
This section is empty.
This section is empty.
IDFromJobRow extracts the workflow ID from a job row's metadata.
IDFromMetadata extracts the workflow ID from metadata.
func JobListParams(job *rivertype.JobRow, params *river.JobListParams) (*river.JobListParams, error)
JobListParams extracts the workflow ID from a job row and returns a river.JobListParams with the workflow ID set in order to filter the job list by workflow.
func JobListParamsByID(workflowID string, params *river.JobListParams) (*river.JobListParams, error)
JobListParamsByID returns a river.JobListParams with the workflow ID set in order to filter the job list by workflow.
NameFromJobRow extracts the workflow's name from a job row's metadata.
type DependencyCycleError struct { DepStack []string }
DependencyCycleError is returned when a dependency cycle is detected in the workflow. A cycle occurs when a task depends on itself, either directly or indirectly through other tasks.
Deprecated: Use riverqueue.com/riverpro.DependencyCycleError instead.
func (e *DependencyCycleError) Error() string
func (e *DependencyCycleError) Is(target error) bool
type DuplicateTaskError struct { TaskName string }
DuplicateTaskError is returned when a task name is added to the same workflow more than once. Task names must be unique within a workflow.
Deprecated: Use riverqueue.com/riverpro.DuplicateTaskError instead.
func (e *DuplicateTaskError) Error() string
func (e *DuplicateTaskError) Is(target error) bool
MissingDependencyError is returned when a named dependency is missing from the workflow. A task cannot depend on another task that does not exist in the workflow.
Deprecated: Use riverqueue.com/riverpro.MissingDependencyError instead.
func (e *MissingDependencyError) Error() string
func (e *MissingDependencyError) Is(target error) bool
type Opts struct { ID string IgnoreCancelledDeps bool IgnoreDeletedDeps bool IgnoreDiscardedDeps bool Name string }
Opts are options for creating a new workflow.
Deprecated: Use riverqueue.com/riverpro.WorkflowOpts instead.
type PrepareResult struct { // Jobs is a list of jobs to insert into the database. It's structured as a // list of InsertManyParams so the jobs can be inserted with InsertMany. Jobs []river.InsertManyParams }
PrepareResult is the result of preparing a workflow. It contains a list of jobs to insert.
Deprecated: Use riverqueue.com/riverpro.WorkflowPrepareResult instead.
func Prepare[TTx any](ctx context.Context, client *river.Client[TTx], workflow *Workflow) (*PrepareResult, error)
Prepare validates the jobs in the workflow and prepares them for insertion. Validations ensure that:
* All specified dependencies correspond to other task names in the workflow * Task names are unique * There are no cycles
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
Deprecated: Use riverqueue.com/riverpro.Client.WorkflowPrepare instead.
func PrepareTx[TTx any](ctx context.Context, client *river.Client[TTx], tx TTx, workflow *Workflow) (*PrepareResult, error)
PrepareTx validates the jobs in the workflow and prepares them for insertion. Validations ensure that:
* All specified dependencies correspond to other task names in the workflow * Task names are unique * There are no cycles
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
Deprecated: Use riverqueue.com/riverpro.Client.WorkflowPrepareTx instead.
type Task struct { // Name is the name of the workflow task. Name string }
Task is a reference to a task in a workflow.
Deprecated: Use riverqueue.com/riverpro.WorkflowTask instead.
type TaskOpts struct { // Deps are the names of other tasks in the workflow that this task depends // on. If any of the dependencies are cancelled, deleted, or discarded after // failing repeatedly, the task will be cancelled. Deps []string // IgnoreCancelledDeps specifies whether to ignore cancelled dependencies, // instead treating them as if they had completed successfully. // // If specified, this overrides the IgnoreCancelledDeps option specified in // the workflow. IgnoreCancelledDeps *bool // IgnoreDeletedDeps specifies whether to ignore deleted dependencies, // instead treating them as if they had completed successfully. // // If specified, this overrides the IgnoreDeletedDeps option specified in // the workflow. IgnoreDeletedDeps *bool // IgnoreDiscardedDeps specifies whether to ignore discarded dependencies, // instead treating them as if they had completed successfully. // // If specified, this overrides the IgnoreDiscardedDeps option specified in // the workflow. IgnoreDiscardedDeps *bool }
TaskOpts are options for adding a task to a workflow.
Deprecated: Use riverqueue.com/riverpro.WorkflowTaskOpts instead.
type Workflow struct {/* contains filtered or unexported fields */}
Workflow is a collection of jobs which can have dependencies on other jobs in the workflow.
Deprecated: Use riverqueue.com/riverpro.Workflow instead.
New creates a new workflow. If an ID is not specified in opts, it will be generated.
Deprecated: Use riverqueue.com/riverpro.NewWorkflow instead.