Skip to content

Workflows

River Pro workflows allow you to define a graph of interdependent jobs to express complex, multi-step workflows, including fan-out and fan-in execution. Workflows are a powerful tool for orchestrating tasks that depend on each other, and can be used to model a wide range of business processes.

Workflows are modeled as a directed acyclic graph (DAG), where each task may specify dependencies on other tasks. Tasks will not begin excecution until all of their dependencies have completed successfully. Additionally, scheduled jobs will respect their ScheduledAt time and will not begin until that time has passed and their dependencies have been satisfied.

Tasks may run in parallel if their dependencies have been met, enabling intensive jobs to be distributed across many machines.

Workflows also include a web UI as part of River UI, which allows you to visualize the state of your workflows and tasks in real-time. Check out the live demo to see it in action.


Basic usage

Workflows are powered by the riverpro package within River Pro. If you haven't yet, install River Pro and run the workflow migration line.

Workflow migrations

Workflows use River's existing database structure. However, to perform optimally they require additional indexes added as part of the workflow migration line.

Workflows are created with a workflow builder struct using riverpro.NewWorkflow(), and tasks are added to the workflow until it is prepared for insertion. Jobs and args are defined like any other River job.

import (
    "riverqueue.com/riverpro"
)

// MyJobArgs is a sample River JobArgs struct
type MyJobArgs struct {
    // ...
}

func (MyJobArgs) Kind() string { return "my_job" }

func SampleWorkflow() *riverpro.Workflow {
    // Create a new workflow:
    workflow := riverpro.NewWorkflow(&riverpro.WorkflowOpts{Name: "My first workflow"})

    // Add a first task to the workflow, named "a":
    taskA := workflow.Add("a", MyJobArgs{}, nil, nil)

    // Fan-out to tasks b1 and b2, which  both depend on task a:
    taskB1 := workflow.Add("b1", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskA.Name}})
    taskB2 := workflow.Add("b2", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskA.Name}})

    // Fan-in to task c, which depends on both b1 and b2:
    taskC := workflow.Add("c", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskB1.Name, taskB2.Name}})

    var _ = taskC // avoids "declared and not used" error

    return workflow
}

func main() {
    ctx := context.Background()
    riverClient, err := river.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
        Config: river.Config{
            Queues: map[string]river.QueueConfig{
                river.QueueDefault: {MaxWorkers: 100},
            },
            Workers: workers,
        },
    })

    // Prepare the workflow for insertion and validate it:
    result, err := riverClient.WorkflowPrepare(ctx, SampleWorkflow())
    if err != nil {
        panic(err)
    }

    // The result.Jobs field holds a slice of river.InsertManyOpts which can be
    // enqueued with a riverClient.InsertMany / InsertManyTx call:
    if _, err := riverClient.InsertMany(ctx, result.Jobs); err != nil {
        panic(err)
    }

    // continue execution, stop client, etc...
}

Error handling and retries

Individual tasks within a workflow may error, and are subject to the same retry rules as any other job. This enables one of the key benefits of using workflows: by breaking down complex multi-step routines into individually retryable components, each piece becomes easier to reason about and easier to build in an idempotent way. Workflow tasks also enable granular control over the retry behavior and timeouts of each piece of work.

Example of breaking down a complex workflow

To illustrate how a complex process can be broken apart into workflow tasks, consider a monthly billing job.

  1. At the start of the process, there may be a slow, computationally-intensive task that crunches data and makes many queries. This step can be safely retried as many times as necessary until it saves its results transactionally along with marking the job as completed.

  2. The next step in the workflow may be to create a charge on Stripe. This task can be retried independently of the previous step, without ever needing to repeat the computationally-intensive billing calculation. It can also leverage the job's unique ID as part of the Stripe idempotency key. This task can retry as many times as necessary until it receives a final response from Stripe and saves its result to the billing record.

  3. After that, the next task can generate a receipt PDF and put it on cloud storage, safely retrying if necessary without repeating the billing computations or the credit card charge call.

  4. A final task will email that receipt to the user, but can be given a MaxAttempts: 2 in order to avoid spamming customers if something is misbehaving with the email API. This property is necessary since most of the big email APIs still haven't figured out Stripe-style API idempotency, and you don't want to spam your customers if the job keeps retrying.

Each task in this workflow benefits from being able to retry independently of the others. Splitting the tasks apart makes them simpler to understand, easier to implement correctly in a retryable fashion, and avoids unnecessary repeat work in the event of a retry.

Tasks with failed dependencies

A tasks's dependencies are considered to have failed when they are:

  • Discarded due to exceeding their retry limit
  • Cancelled
  • Deleted (no longer existing in the database)

By default, all tasks with failed dependencies are cancelled. This behavior can be customized at the level of an individual workflow using the IgnoreCancelledDeps, IgnoreDiscardedDeps, and IgnoreDeletedDeps options on either the workflow Opts or the TaskOpts. These options allow you to control whether a task's dependency should be considered successful despite it having entered into one of these failed states.

Creating a new workflow

New workflows are created with riverpro.NewWorkflow(), which takes an optional riverpro.WorkflowOpts struct. It's best to give your workflow a human-readable Name to make its jobs easier to identify (particularly in River UI), though this is not required.

workflow := riverpro.NewWorkflow(&riverpro.WorkflowOpts{Name: "Onboard_Customer_12345"})

The workflow's ID is automatically generated and does not need to be specified. It may be customized as part of the workflow opts. If you do customize the workflow ID, it is essential that the ID be globally unique, because the workflow ID is used for scheduling of tasks within the workflow. Additionally, it's best to choose an ID scheme that lends itself to lexicographical sorting like the default ULID scheme.

Once created, tasks can be added with the .Add() method. Each task must have a unique name within the workflow, and the task's name is used to specify dependencies between tasks:

taskA := workflow.Add("a", MyJobArgs{}, nil, nil)
taskB := workflow.Add("b", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskA.Name}})

Finally, the workflow must be prepared for insertion with Client.WorkflowPrepare() or Client.WorkflowPrepareTx(). These functions validate the workflow's dependency graph, ensuring there are no cycles or missing dependencies that would cause it to fail. They return a riverpro.WorkflowPrepareResult which contains river.InsertManyParams that can be inserted into the database:

// client is a riverpro.Client:
result, err := client.WorkflowPrepare(ctx, workflow)
if err != nil {
    return err
}

if _, err := client.InsertMany(ctx, result.Jobs); err != nil {
    return err
}

Workflows are independent

Each individual instance of a workflow is independent of others and has its own tasks. This means you can use a different set of tasks for each instance of a workflow, even if they are named or constructed similarly.

If tasks are added to an existing workflow, they're only added to that specific instance of the workflow.

Adding tasks to an existing workflow

Tasks may be dynamically added to an existing workflow using the same API as when creating a new workflow. This is useful for workflows whose steps are based on data that can't be known in advance when the workflow is initially created.

First, the workflow must be initiated from an existing job in the workflow:

workflow, err := riverpro.WorkflowFromExisting(job.JobRow, nil)
if err != nil {
    return err
}

Tasks are added to the workflow as usual:

task := workflow.Add("new_task", MyJobArgs{}, nil, nil)

Finally, the workflow must be prepared for insertion and validated:

result, err := riverClient.WorkflowPrepare(ctx, workflow)
if err != nil {
    panic(err)
}

// Insert the new task(s):
if _, err := riverClient.InsertMany(ctx, result.Jobs); err != nil {
    panic(err)
}

Here is a complete example of doing this within a worker for another task in the workflow:

type MyWorker struct {
    river.WorkerDefaults[MyWorkerArgs]
    dbPool *pgxpool.Pool
}

func (*MyWorker) Work(ctx context.Context, job *river.Job[MyWorkerArgs]) error {
    riverClient := riverpro.ClientFromContext(ctx)

    // Get the workflow from the existing job:
    workflow, err := riverpro.WorkflowFromExisting(job.JobRow, nil)
    if err != nil {
        return err
    }

    // Add a new task to the workflow:
    task := workflow.Add("new_task", MyJobArgs{}, nil, nil)

    // Open a transaction so we can insert new tasks and complete this one atomically:
    tx, err := w.dbPool.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    // Prepare the workflow for insertion and validate it:
    result, err := riverClient.WorkflowPrepareTx(ctx, tx, workflow)
    if err != nil {
        return err
    }

    // Insert the new task:
    if _, err := riverClient.InsertManyTx(ctx, tx, result.Jobs); err != nil {
        return err
    }

    // Complete the current task:
    if err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job); err != nil {
        return err
    }

    // Commit the transaction and return:
    return tx.Commit(ctx)
}

Workflows in River UI

Workflows also include a web UI as part of River UI, which lets you visualize the state of your workflows and tasks in real-time. This functionality works automatically if you are using the workflow feature.

Check out the live demo to see it in action.

Cancelling a workflow

Workflows can be controlled using the riverpro.Client type. The WorkflowCancel and WorkflowCancelTx methods allow you to cancel all non-finalized tasks in a workflow. These methods are useful for cleaning up workflows that are no longer needed or have failed.

result, err := riverClient.WorkflowCancel(ctx, workflowID)
if err != nil {
    // handle error
}
fmt.Printf("cancelled %d jobs", len(result.CancelledJobs))