River

riverworkflow

package
v0.5.1 Go to latest
Published: Oct 18, 2024 License: Proprietary

Package riverworkflow provides a workflow engine for River.

See homepage and docs.

Overview

Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. Jobs will not be available for work until all of their dependencies have completed successfully. In addition, jobs that are scheduled to run in the future at time X will not execute until all their dependencies have completed _and_ the current time is greater than X.

⚠️ Most direct usage of this package has been deprecated in favor of the equivalent functionality in the parent riverpro package.

import (
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/riverworkflow"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

workflow := riverworkflow.New(&river.WorkflowOpts{Name: "Send invoice to Alice"})
taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil)
workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverworkflow.TaskOpts{
	Deps: []string{taskA.Name},
})
result, err := riverworkflow.Prepare(context.Background(), riverClient, workflow)
if err != nil {
	log.Fatal(err)
}
count, err := riverClient.InsertMany(ctx, result.Jobs)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("inserted %d jobs\n", count)

By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with Opts, or on a per-task basis with TaskOpts.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IDFromJobRow

func IDFromJobRow(job *rivertype.JobRow) string

func IDFromMetadata

func IDFromMetadata(metadata []byte) string

func JobListParams

func JobListParams(job *rivertype.JobRow, params *river.JobListParams) (*river.JobListParams, error)

func JobListParamsByID

func JobListParamsByID(workflowID string, params *river.JobListParams) (*river.JobListParams, error)

Types

type DependencyCycleError

type DependencyCycleError struct {
	DepStack []string
}

DependencyCycleError is returned when a dependency cycle is detected in the workflow. A cycle occurs when a task depends on itself, either directly or indirectly through other tasks.

func (*DependencyCycleError) Error

func (e *DependencyCycleError) Error() string

func (*DependencyCycleError) Is

func (e *DependencyCycleError) Is(target error) bool

type DuplicateTaskError

type DuplicateTaskError struct {
	TaskName string
}

DuplicateTaskError is returned when a task name is added to the same workflow more than once. Task names must be unique within a workflow.

func (*DuplicateTaskError) Error

func (e *DuplicateTaskError) Error() string

func (*DuplicateTaskError) Is

func (e *DuplicateTaskError) Is(target error) bool

type MissingDependencyError

type MissingDependencyError struct {
	ParentTaskName string
	TaskName       string
}

MissingDependencyError is returned when a named dependency is missing from the workflow. A task cannot depend on another task that does not exist in the workflow.

func (*MissingDependencyError) Error

func (e *MissingDependencyError) Error() string

func (*MissingDependencyError) Is

func (e *MissingDependencyError) Is(target error) bool

type Opts

type Opts struct {
	ID                  string
	IgnoreCancelledDeps bool
	IgnoreDeletedDeps   bool
	IgnoreDiscardedDeps bool
	Name                string
}

Opts are options for creating a new workflow.

type PrepareResult

type PrepareResult struct {
	// Jobs is a list of jobs to insert into the database. It's structured as a
	// list of InsertManyParams so the jobs can be inserted with InsertMany.
	Jobs []river.InsertManyParams
}

PrepareResult is the result of preparing a workflow. It contains a list of jobs to insert.

func Prepare deprecated

func Prepare[TTx any](ctx context.Context, client *river.Client[TTx], workflow *Workflow) (*PrepareResult, error)

Prepare validates the jobs in the workflow and prepares them for insertion. Validations ensure that:

* All specified dependencies correspond to other task names in the workflow * Task names are unique * There are no cycles

If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.

For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.

Deprecated: Use WorkflowPrepare on riverpro.Client instead.

func PrepareTx deprecated

func PrepareTx[TTx any](ctx context.Context, client *river.Client[TTx], tx TTx, workflow *Workflow) (*PrepareResult, error)

PrepareTx validates the jobs in the workflow and prepares them for insertion. Validations ensure that:

* All specified dependencies correspond to other task names in the workflow * Task names are unique * There are no cycles

If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.

For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.

Deprecated: Use WorkflowPrepareTx on riverpro.Client instead.

type Task

type Task struct {
	// Name is the name of the workflow task.
	Name string
}

Task is a reference to a task in a workflow.

type TaskOpts

type TaskOpts struct {
	// Deps are the names of other tasks in the workflow that this task depends
	// on. If any of the dependencies are cancelled, deleted, or discarded after
	// failing repeatedly, the task will be cancelled.
	Deps []string
	// IgnoreCancelledDeps specifies whether to ignore cancelled dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreCancelledDeps option specified in
	// the workflow.
	IgnoreCancelledDeps *bool
	// IgnoreDeletedDeps specifies whether to ignore deleted dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreDeletedDeps option specified in
	// the workflow.
	IgnoreDeletedDeps *bool
	// IgnoreDiscardedDeps specifies whether to ignore discarded dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreDiscardedDeps option specified in
	// the workflow.
	IgnoreDiscardedDeps *bool
}

TaskOpts are options for adding a task to a workflow.

type Workflow

type Workflow struct {/* contains filtered or unexported fields */}

Workflow is a collection of jobs which can have dependencies on other jobs in the workflow.

func FromExisting deprecated

func FromExisting(job *rivertype.JobRow, opts *Opts) (*Workflow, error)

Deprecated: Use riverpro.WorkflowFromExisting instead.

func New deprecated

func New(opts *Opts) *Workflow

New creates a new workflow. If an ID is not specified in opts, it will be generated.

Deprecated: Use riverpro.NewWorkflow instead.

func (*Workflow) Add

func (w *Workflow) Add(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *TaskOpts) Task

Add adds a task to the workflow. The task name must be unique within the workflow.

func (*Workflow) ID

func (w *Workflow) ID() string

ID returns the ID of the workflow.