riverworkflow

package
v0.7.0
Published: Dec 16, 2024 License: Proprietary

Package riverworkflow provides a workflow engine for River.

See homepage and docs.

Overview

Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. A job does not become available for work until all of its dependencies have completed successfully. In addition, a job scheduled to run in the future at time X will not execute until all of its dependencies have completed _and_ the current time is later than X.

⚠️ Much of the direct usage of this package has been deprecated in favor of the equivalent functionality in the parent riverqueue.com/riverpro package.

import (
	"context"
	"fmt"
	"log"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/riverworkflow"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

// riverClient is assumed to be an existing client created with the riverpropgxv5 driver.
ctx := context.Background()

// Build a two-task workflow in which task_b depends on task_a.
workflow := riverworkflow.New(&riverpro.WorkflowOpts{Name: "Send invoice to Alice"})
taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil)
workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverworkflow.TaskOpts{
	Deps: []string{taskA.Name},
})

// Prepare the workflow, then insert the resulting jobs in one batch.
result, err := riverworkflow.Prepare(ctx, riverClient, workflow)
if err != nil {
	log.Fatal(err)
}
count, err := riverClient.InsertMany(ctx, result.Jobs)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("inserted %d jobs\n", count)

By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with [Opts], or on a per-task basis with [TaskOpts].
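The default cancellation rule and its overrides can be modelled as a small decision function. This is a hedged sketch only: the `ignoreOpts` struct, its field names, and `shouldCancel` are hypothetical and do not reflect the actual option fields of this package, which are not shown here.

```go
package main

import "fmt"

// depState is a hypothetical, simplified view of a dependency's final state.
type depState string

const (
	depCompleted depState = "completed"
	depCancelled depState = "cancelled"
	depDiscarded depState = "discarded"
	depDeleted   depState = "deleted"
)

// ignoreOpts is a hypothetical set of overrides; the real option names may differ.
type ignoreOpts struct {
	ignoreCancelled bool
	ignoreDiscarded bool
	ignoreDeleted   bool
}

// shouldCancel reports whether a task should be cancelled given the states of
// its dependencies. With zero-value opts, any cancelled, discarded, or deleted
// dependency cancels the task, mirroring the default described above.
func shouldCancel(deps []depState, opts ignoreOpts) bool {
	for _, s := range deps {
		switch s {
		case depCancelled:
			if !opts.ignoreCancelled {
				return true
			}
		case depDiscarded:
			if !opts.ignoreDiscarded {
				return true
			}
		case depDeleted:
			if !opts.ignoreDeleted {
				return true
			}
		}
	}
	return false
}

func main() {
	deps := []depState{depCompleted, depDiscarded}
	fmt.Println(shouldCancel(deps, ignoreOpts{}))                      // default behavior
	fmt.Println(shouldCancel(deps, ignoreOpts{ignoreDiscarded: true})) // override applied
}
```

A per-task override would take the same shape as a per-workflow one in this model; only the scope at which the flags are supplied differs.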

Constants

This section is empty.

Variables

This section is empty.

Functions

func IDFromJobRow

func IDFromJobRow(job *rivertype.JobRow) string

IDFromJobRow extracts the workflow ID from a job row's metadata.

func IDFromMetadata

func IDFromMetadata(metadata []byte) string

IDFromMetadata extracts the workflow ID from metadata.
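To give a rough idea of what extracting an ID from metadata might involve, here is a standalone sketch. It assumes the metadata is a JSON object and that the workflow ID lives under a key named `workflow_id`; both the key name and the choice to return an empty string on missing or malformed input are assumptions made for illustration, not documented behavior of IDFromMetadata. The lowercase `idFromMetadata` helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workflowIDKey is an assumed key name; the real metadata layout may differ.
const workflowIDKey = "workflow_id"

// idFromMetadata is a hypothetical stand-in for IDFromMetadata. It returns the
// string stored under workflowIDKey, or "" if the metadata is not a JSON
// object or the key is missing or not a string.
func idFromMetadata(metadata []byte) string {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(metadata, &fields); err != nil {
		return ""
	}
	var id string
	if err := json.Unmarshal(fields[workflowIDKey], &id); err != nil {
		return ""
	}
	return id
}

func main() {
	metadata := []byte(`{"workflow_id": "wf_123", "task": "task_a"}`)
	fmt.Println(idFromMetadata(metadata))
}
```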

func JobListParams

func JobListParams(job *rivertype.JobRow, params *river.JobListParams) (*river.JobListParams, error)

JobListParams extracts the workflow ID from a job row and returns a river.JobListParams with the workflow ID set in order to filter the job list by workflow.

func JobListParamsByID

func JobListParamsByID(workflowID string, params *river.JobListParams) (*river.JobListParams, error)

JobListParamsByID returns a river.JobListParams with the workflow ID set in order to filter the job list by workflow.

func NameFromJobRow

func NameFromJobRow(job *rivertype.JobRow) string

NameFromJobRow extracts the workflow's name from a job row's metadata.

func TaskFromJobRow

func TaskFromJobRow(job *rivertype.JobRow) string

TaskFromJobRow extracts the workflow task's name from a job row's metadata.

Types

This section is empty.