River

riverpro

package module
v0.24.0 Latest
Published: May 19, 2026 License: Proprietary

Package riverpro is an extension for River, the robust high-performance job processing system for Go and Postgres.

See homepage, docs, and the River UI.

Overview

River Pro extends River through the use of a "pro" client and driver that wraps the standard River variants. This allows for additional functionality to be injected into the River client.

import (
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
	Config: riverpro.Config{
		// Standard River configuration is embedded in the Pro config:
		Config: river.Config{
			Queues: map[string]river.QueueConfig{
					river.QueueDefault: {MaxWorkers: 100},
			},
			Workers: workers,
		},
	},
})

Once the Pro driver is in place, River Pro functionality can be used.

Batching

River Pro batching allows simultaneous execution of many similar jobs together as a single group instead of executing each job individually. See riverqueue.com/riverpro/riverbatch for more details.

Sequences

River Pro sequences guarantee that a specific series of jobs will be executed in a one-at-a-time sequential order relative to other jobs in the same sequence. Sequences are partitioned based upon a "sequence key" that is computed from various job attributes such as its kind and args (or a subset of args).

Jobs across sequences may run in parallel. Unlike unique jobs, sequences allow an infinite number of jobs to be queued up in the sequence, even though only one job will be worked at a time.

See the sequence docs for more information on how to use sequences.

Workflows

River Pro workflows allow you to define a graph of interdependent jobs to express complex, multi-step workflows, including fan-out and fan-in execution. Workflows are a powerful tool for orchestrating tasks that depend on each other, and can be used to model a wide range of business processes.

Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. Jobs will not be available for work until all of their dependencies have completed successfully. In addition, jobs that are scheduled to run in the future at time T will not execute until all their dependencies have completed _and_ the current time is greater than T.

import (
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

ctx := context.Background()

client, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
	// ...
})

workflow := client.NewWorkflow(&river.WorkflowOpts{Name: "Send invoice to Alice"})
taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil)
workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverpro.WorkflowTaskOpts{
	Deps: []string{taskA.Name},
})
result, err := workflow.Prepare(ctx)
if err != nil {
	log.Fatal(err)
}
count, err := client.InsertMany(ctx, result.Jobs)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("inserted %d jobs\n", count)

By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with WorkflowOpts, or on a per-task basis with WorkflowTaskOpts.

See the workflow docs for more information on how to use workflows.

Example (BatchWorker)

Example_batchWorker demonstrates how to use a riverqueue.com/riverpro/riverbatch.ManyWorker to process a set of jobs together as a single unit.

package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"slices"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/sliceutil"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverbatch"
)

type BulkTerminateInstanceArgs struct {
	InstanceID int64 `json:"customer_id" river:"batch"`
}

func (BulkTerminateInstanceArgs) Kind() string { return "bulk_terminate_instance" }

func (a BulkTerminateInstanceArgs) BatchOpts() riverpro.BatchOpts {
	return riverpro.BatchOpts{}
}

type BulkTerminateInstanceWorker struct {
	river.WorkerDefaults[BulkTerminateInstanceArgs]
}

func (w *BulkTerminateInstanceWorker) Work(ctx context.Context, job *river.Job[BulkTerminateInstanceArgs]) error {
	return riverbatch.Work[BulkTerminateInstanceArgs, pgx.Tx](ctx, w, job, &riverbatch.WorkerOpts{
		MaxCount: 5,
		// Very short delays to make the example fast:
		MaxDelay:     10 * time.Millisecond,
		PollInterval: 10 * time.Millisecond,
	})
}

func (w *BulkTerminateInstanceWorker) NextRetry(job *river.Job[BulkTerminateInstanceArgs]) time.Time {
	// Always retry immediately for this example:
	return time.Now()
}

func (w *BulkTerminateInstanceWorker) WorkMany(ctx context.Context, jobs []*river.Job[BulkTerminateInstanceArgs]) error {
	sortedIDs := sliceutil.Map(jobs, func(job *river.Job[BulkTerminateInstanceArgs]) int64 { return job.Args.InstanceID })
	slices.Sort(sortedIDs)

	fmt.Printf("Terminating a batch of %d instances with ids=%v\n", len(jobs), sortedIDs)

	err := riverbatch.NewMultiError()
	for _, job := range jobs {
		// instance IDs divisible by 10 fail with an error, but only on the first attempt:
		if job.Args.InstanceID%10 == 0 && job.Attempt == 1 {
			err.AddByID(job.ID, errors.New("transient failure"))
		}
	}

	return err
}

// Example_batchWorker demonstrates how to use a [riverqueue.com/riverpro/riverbatch.ManyWorker] to
// process a set of jobs together as a single unit.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	worker := &BulkTerminateInstanceWorker{}
	river.AddWorker(workers, worker)

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Logger:  slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError, ReplaceAttr: slogutil.NoLevelTime})),
			Queues:  map[string]river.QueueConfig{},
			Workers: workers,
		},
		ProQueues: map[string]riverpro.QueueConfig{
			river.QueueDefault: {
				Concurrency: riverpro.ConcurrencyConfig{
					// Allow only batch leader job to run at once globally, per kind
					// (separate job kinds get batched separately):
					GlobalLimit: 1,
					Partition: riverpro.PartitionConfig{
						ByKind: true,
					},
				},
				MaxWorkers: 10,
			},
		},
	}))
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	subscribeChan, cancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer cancel()

	// Insert 10 jobs at once:
	insertParams := make([]river.InsertManyParams, 10)
	for i := range insertParams {
		insertParams[i] = river.InsertManyParams{Args: BulkTerminateInstanceArgs{
			InstanceID: int64(i) + 1,
		}}
	}

	if _, err := riverClient.InsertMany(ctx, insertParams); err != nil {
		panic(err)
	}

	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 10)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

Terminating a batch of 5 instances with ids=[1 2 3 4 5]
Terminating a batch of 5 instances with ids=[6 7 8 9 10]
Terminating a batch of 1 instances with ids=[10]
Example (ClientSetup)

Example_clientSetup demonstrates end-to-end River Pro setup with Postgres, workers, subscriptions, and a running client.

package main

import (
	"cmp"
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdbtest"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

type clientSetupArgs struct{}

func (clientSetupArgs) Kind() string { return "notify_customer" }

// Example_clientSetup demonstrates end-to-end River Pro setup with Postgres,
// workers, subscriptions, and a running client.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[clientSetupArgs]) error {
		fmt.Println("customer notified order_id=ord_123")
		return nil
	}))

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "workflow_wiring"})
	workflow.Add("notify_customer", clientSetupArgs{}, nil, nil)

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Wait for completion before stopping so output remains deterministic.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

// initTestConfig initializes properties on a given River Pro config with
// defaults suitable for example tests, including an isolated test schema and
// example-friendly logger. It's meant to keep the main example code more terse
// and its use can be removed when copy/pasting from examples.
//
// This helper is intentionally duplicated across example packages so setup
// stays discoverable in each package's docs without introducing an internal
// example-only helper package.
//
// Most workflow examples call this helper to keep setup terse while leaving
// `pgxpool.New` and `riverpro.NewClient` wiring visible in each example.
func initTestConfig(ctx context.Context, dbPool *pgxpool.Pool, config *riverpro.Config) *riverpro.Config {
	// Keep example schema and test-only behavior deterministic.
	config.Schema = riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpropgxv5.New(dbPool), nil)
	config.TestOnly = true

	if config.Workers != nil {
		config.FetchCooldown = cmp.Or(config.FetchCooldown, 50*time.Millisecond)
		config.FetchPollInterval = cmp.Or(config.FetchPollInterval, 50*time.Millisecond)
		if config.Logger == nil {
			// Keep internal client logs off stdout so doctest output stays stable.
			config.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
				Level:       slog.LevelWarn,
				ReplaceAttr: slogutil.NoLevelTime,
			}))
		}
		if config.Queues == nil {
			config.Queues = map[string]river.QueueConfig{
				river.QueueDefault: {MaxWorkers: 100},
			}
		}
	}

	return config
}

Output:

customer notified order_id=ord_123
Example (DurablePeriodicJob)

Example_durablePeriodicJob demonstrates the use of a durable periodic job. With durable periodic jobs, next run times are stored to the database and therefore run reliably at their target times even across client restarts.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

type PeriodicJobArgs struct{}

// Kind is the unique string name for this job.
func (PeriodicJobArgs) Kind() string { return "periodic" }

// PeriodicJobWorker is a job worker for sorting strings.
type PeriodicJobWorker struct {
	river.WorkerDefaults[PeriodicJobArgs]
}

func (w *PeriodicJobWorker) Work(ctx context.Context, job *river.Job[PeriodicJobArgs]) error {
	fmt.Printf("This job will run once immediately then once every 15 minutes\n")
	return nil
}

// Example_durablePeriodicJob demonstrates the use of a durable periodic job.
// With durable periodic jobs, next run times are stored to the database and
// therefore run reliably at their target times even across client restarts.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &PeriodicJobWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			PeriodicJobs: []*river.PeriodicJob{
				river.NewPeriodicJob(
					&onceImmediatelyThenEvery15Minutes{},
					func() (river.JobArgs, *river.InsertOpts) {
						return PeriodicJobArgs{}, nil
					},
					&river.PeriodicJobOpts{
						// inclusion of an ID makes periodic job durable (as
						// long as DurablePeriodicJob.Enable below is true);
						// must be unique across periodic jobs
						ID: "my_periodic_job",
					},
				),
			},
			Workers: workers,
		},
		DurablePeriodicJobs: riverpro.DurablePeriodicJobsConfig{
			Enabled: true,
		},
	}))
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	// There's no need to explicitly insert a periodic job. One will be inserted
	// (and worked soon after) as the client starts up.
	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Wait for jobs to complete. Only needed for purposes of the example test.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

// A bit contrived, but a custom schedule that runs once immediately, then once
// every 15 minutes. This lets us get away without having to include the
// RunOnStart option (whose use doesn't make much sense with durable periodic
// jobs), but also have the test execute its first job immediately so there
// isn't a wait that slows down the test suite.
type onceImmediatelyThenEvery15Minutes struct{ calledOnce bool }

func (s *onceImmediatelyThenEvery15Minutes) Next(t time.Time) time.Time {
	if s.calledOnce {
		return t.Add(15 * time.Minute)
	}

	s.calledOnce = true
	return t
}

Output:

This job will run once immediately then once every 15 minutes
Example (EphemeralJob)

Example_ephemeralJob demonstrates ephemeral jobs, which are jobs that are immediately deleted upon completion, trading off some operational visibility for potential stability as fewer operations are needed later to prune them.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"
	"github.com/riverqueue/river/rivertype"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

type EphemeralJobArgs struct{}

func (a EphemeralJobArgs) Kind() string { return "ephemeral" }

func (a EphemeralJobArgs) EphemeralOpts() riverpro.EphemeralOpts { return riverpro.EphemeralOpts{} }

type EphemeralJobWorker struct {
	river.WorkerDefaults[EphemeralJobArgs]
}

func (w *EphemeralJobWorker) Work(ctx context.Context, job *river.Job[EphemeralJobArgs]) error {
	fmt.Printf("This job will run and be immediately deleted from the database\n")
	return nil
}

// Example_ephemeralJob demonstrates ephemeral jobs, which are jobs that are
// immediately deleted upon completion, trading off some operational visibility
// for potential stability as fewer operations are needed later to prune them.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &EphemeralJobWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	insertRes, err := riverClient.Insert(ctx, EphemeralJobArgs{}, nil)
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Wait for jobs to complete. Only needed for purposes of the example test.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

	_, err = riverClient.JobGet(ctx, insertRes.Job.ID)
	if errors.Is(err, rivertype.ErrNotFound) {
		fmt.Printf("Job was worked and is now gone from the database")
	} else {
		panic("expected to get rivertype.ErrrNotFound")
	}

}

Output:

This job will run and be immediately deleted from the database
Job was worked and is now gone from the database
Example (EphemeralQueue)

Example_ephemeralQueue demonstrates addings a job to an ephemeral queue, which is a queue where jobs are immediately deleted upon completion, trading off some operational visibility for potential stability as fewer operations are needed later to prune them.

ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
	panic(err)
}
defer dbPool.Close()

workers := river.NewWorkers()
river.AddWorker(workers, &testworker.NoOpWorker{})

// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
	Config: river.Config{
		Workers: workers,
	},
	ProQueues: map[string]riverpro.QueueConfig{
		"my_ephemeral_queue": {
			Ephemeral: riverpro.QueueEphemeralConfig{
				Enabled: true,
			},
			MaxWorkers: 100,
		},
	},
}))
if err != nil {
	panic(err)
}

// Out of example scope, but used to wait until a job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

insertRes, err := riverClient.Insert(ctx, testworker.NoOpArgs{}, &river.InsertOpts{Queue: "my_ephemeral_queue"})
if err != nil {
	panic(err)
}

if err := riverClient.Start(ctx); err != nil {
	panic(err)
}

// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
	panic(err)
}

_, err = riverClient.JobGet(ctx, insertRes.Job.ID)
if errors.Is(err, rivertype.ErrNotFound) {
	fmt.Printf("Job was worked and is now gone from the database")
} else {
	panic("expected to get rivertype.ErrrNotFound")
}

// Output:
// Job was worked and is now gone from the database
Example (GlobalConcurrencyLimiting)

Example_globalConcurrencyLimiting demonstrates how to use global concurrency limiting to limit the number of jobs of a given kind that can run at once.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdbtest"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

type ConcurrentLimitedArgs struct{}

func (ConcurrentLimitedArgs) Kind() string { return "concurrent_limited" }

type ConcurrentLimitedWorker struct {
	river.WorkerDefaults[ConcurrentLimitedArgs]

	jobsRunning *atomic.Int64
}

func (w *ConcurrentLimitedWorker) Work(ctx context.Context, job *river.Job[ConcurrentLimitedArgs]) error {
	w.jobsRunning.Add(1)
	defer w.jobsRunning.Add(-1)

	if count := w.jobsRunning.Load(); count > 2 {
		fmt.Printf("too many jobs running: %d\n", count)
		return fmt.Errorf("too many jobs running: %d", count)
	}

	return nil
}

// Example_globalConcurrencyLimiting demonstrates how to use global concurrency
// limiting to limit the number of jobs of a given kind that can run at once.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	worker := &ConcurrentLimitedWorker{jobsRunning: &atomic.Int64{}}
	river.AddWorker(workers, worker)

	startNewClientAndSubscribe := func() (*riverpro.Client[pgx.Tx], <-chan *river.Event, func()) {
		riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
			Config: river.Config{
				Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
				// Poll quickly so the example is fast:
				FetchCooldown:     50 * time.Millisecond,
				FetchPollInterval: 50 * time.Millisecond,
				Schema:            riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
				TestOnly:          true,                                                                            // suitable only for use in tests; remove for live environments
				Workers:           workers,
			},
			ProQueues: map[string]riverpro.QueueConfig{
				river.QueueDefault: {
					Concurrency: riverpro.ConcurrencyConfig{
						// Allow up to 2 of each kind of job to run at once globally.
						GlobalLimit: 2,
						// Allow no more than 1 of each kind of job to run at once per client.
						LocalLimit: 1,
						Partition: riverpro.PartitionConfig{
							ByKind: true,
						},
					},
					MaxWorkers: 10,
				},
			},
		})
		if err != nil {
			panic(err)
		}

		if err := riverClient.Start(ctx); err != nil {
			panic(err)
		}

		subscribeChan, cancel := riverClient.Subscribe(river.EventKindJobCompleted)
		return riverClient, subscribeChan, cancel
	}

	// Use two clients to simulate two different workers. In real usage, these
	// would probably be on different machines.
	client1, subscribeChan1, cancel1 := startNewClientAndSubscribe()
	defer cancel1()
	client2, subscribeChan2, cancel2 := startNewClientAndSubscribe()
	defer cancel2()

	insertParams := make([]river.InsertManyParams, 10)
	for i := range insertParams {
		insertParams[i] = river.InsertManyParams{Args: ConcurrentLimitedArgs{}}
	}

	if _, err = client1.InsertMany(ctx, insertParams); err != nil {
		panic(err)
	}

	shutdown := func() {
		cancel() // intiate hard shutdown on both clients to stop fetching jobs
		<-client1.Stopped()
		<-client2.Stopped()
	}

	jobsCompleted := 0

	for range 10 {
		// Do an extra check to ensure no more than 2 jobs are running:
		numRunning := worker.jobsRunning.Load()
		if numRunning > 2 {
			fmt.Printf("Unexpected number of jobs running: %d\n", numRunning)
			shutdown()
			return
		}

		select {
		case <-time.After(2 * time.Second):
			fmt.Println("timed out waiting for jobs to complete")
			shutdown()
			return
		case <-subscribeChan1:
		case <-subscribeChan2:
		}
		jobsCompleted++
	}

	fmt.Printf("Jobs completed: %d\n", jobsCompleted)
	shutdown()

}

Output:

Jobs completed: 10
Example (PerQueueRetention)

Example_perQueueRetention demonstrates the use of per-queue job retention settings for cancelled, completed, and discarded jobs. This example test doesn't show much beyond the syntax for configuration custom retention on a pro queue.

ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
	panic(err)
}
defer dbPool.Close()

workers := river.NewWorkers()
river.AddWorker(workers, &testworker.NoOpWorker{})

// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
_, err = riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
	Config: river.Config{
		Queues:  map[string]river.QueueConfig{},
		Workers: workers,
	},
	ProQueues: map[string]riverpro.QueueConfig{
		river.QueueDefault: {
			CancelledJobRetentionPeriod: 24 * time.Hour,
			CompletedJobRetentionPeriod: 24 * time.Hour,
			DiscardedJobRetentionPeriod: 7 * 24 * time.Hour,
			MaxWorkers:                  100,
		},
	},
}))
if err != nil {
	panic(err)
}

// Output:
Example (WorkflowDependencyOutput)

Example_workflowDependencyOutput demonstrates loading dependency task output from inside a downstream worker.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type shipOrderArgs struct {
	OrderID string
}

func (shipOrderArgs) Kind() string { return "ship_order" }

type shipOrderWorker struct {
	river.WorkerDefaults[shipOrderArgs]
}

func (w *shipOrderWorker) Work(ctx context.Context, job *river.Job[shipOrderArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	deps, err := workflow.LoadDepsByJob(ctx, job.JobRow, nil)
	if err != nil {
		return err
	}

	// Dependency outputs are loaded by dependency task name.
	var scoreOutput fraudScoreOutput
	if err := deps.Output("score_fraud", &scoreOutput); err != nil {
		return err
	}

	fmt.Printf("loaded dependency fraud score=%d\n", scoreOutput.Score)
	return nil
}

type fraudScoreArgs struct {
	OrderID string
}

func (fraudScoreArgs) Kind() string { return "score_fraud" }

type fraudScoreOutput struct {
	Score int `json:"score"`
}

type fraudScoreWorker struct {
	river.WorkerDefaults[fraudScoreArgs]
}

func (w *fraudScoreWorker) Work(ctx context.Context, job *river.Job[fraudScoreArgs]) error {
	// Record structured output so downstream tasks can read it via `LoadDeps*`.
	output := fraudScoreOutput{Score: 92}
	if err := river.RecordOutput(ctx, output); err != nil {
		return err
	}

	fmt.Printf("fraud scored: %d\n", output.Score)
	return nil
}

// Example_workflowDependencyOutput demonstrates loading dependency task output
// from inside a downstream worker.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &shipOrderWorker{})
	river.AddWorker(workers, &fraudScoreWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "order_fulfillment"})
	riskTask := workflow.Add("score_fraud", fraudScoreArgs{OrderID: "ord_123"}, nil, nil)
	workflow.Add("ship_order", shipOrderArgs{OrderID: "ord_123"}, nil, &riverpro.WorkflowTaskOpts{
		Deps: []string{riskTask.Name},
		Wait: &riverworkflow.WaitSpec{
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTerm("fraud_score_ready", `deps["score_fraud"].output.score >= 90`),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Wait for score_fraud and then ship_order.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 2)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

fraud scored: 92
loaded dependency fraud score=92
Example (WorkflowTaskSignalData)

Example_workflowTaskSignalData demonstrates emitting one signal, meeting a wait, and loading a task-scoped signal payload from inside the waiting worker.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"
	"github.com/riverqueue/river/rivertype"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type orderApprovalArgs struct {
	OrderID string
}

func (orderApprovalArgs) Kind() string { return "approve_order" }

type orderApprovalSignalPayload struct {
	Approved bool   `json:"approved"`
	Reviewer string `json:"reviewer"`
}

type orderApprovalWorker struct {
	river.WorkerDefaults[orderApprovalArgs]
}

func (w *orderApprovalWorker) Work(ctx context.Context, job *river.Job[orderApprovalArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	reviewSignals, err := workflow.Signals().ListForTask(ctx, taskName, &riverpro.WorkflowSignalListForTaskParams{
		Desc:  true,
		Key:   "manual_review",
		Limit: 1,
	})
	if err != nil {
		return err
	}
	if len(reviewSignals.Signals) == 0 {
		return rivertype.ErrNotFound
	}
	reviewSignal := reviewSignals.Signals[0]

	var reviewPayload orderApprovalSignalPayload
	if err := json.Unmarshal(reviewSignal.Payload, &reviewPayload); err != nil {
		return err
	}

	fmt.Printf("manual review approved reviewer=%s\n", reviewPayload.Reviewer)
	return nil
}

// Example_workflowTaskSignalData demonstrates emitting one signal, meeting a
// wait, and loading a task-scoped signal payload from inside the waiting worker.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &orderApprovalWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "order_fulfillment"})
	workflow.Add("approve_order", orderApprovalArgs{OrderID: "ord_123"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			// Wait condition requires a positive manual review signal with a reviewer value.
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("manual_review_received", "manual_review", `payload.approved == true && payload.reviewer != ""`),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// This signal resolves the wait and provides the payload the worker reads.
	if _, err := workflow.Signals().Emit(ctx, "manual_review", orderApprovalSignalPayload{
		Approved: true,
		Reviewer: "manager:bob",
	}, nil); err != nil {
		panic(err)
	}

	// Wait for approve_order to complete after the signal resolves the wait.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

manual review approved reviewer=manager:bob
Example (WorkflowWaitMixedTermsAndRawCEL)

Example_workflowWaitMixedTermsAndRawCEL demonstrates a structured term and raw CEL signal input in the same top-level wait expression.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type mixedRiskHoldDecisionArgs struct {
	OrderID string
}

func (mixedRiskHoldDecisionArgs) Kind() string { return "resolve_mixed_risk_hold" }

type mixedRiskHoldDecisionWorker struct {
	river.WorkerDefaults[mixedRiskHoldDecisionArgs]
}

func (w *mixedRiskHoldDecisionWorker) Work(ctx context.Context, job *river.Job[mixedRiskHoldDecisionArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	self, err := workflow.LoadTask(ctx, taskName)
	if err != nil {
		return err
	}
	if self.Wait == nil || self.Wait.ResolvedAt == nil {
		return fmt.Errorf("task %q expected wait result", taskName)
	}
	overrideInput := self.Wait.SignalInput("risk_override")
	if overrideInput == nil || overrideInput.Result == nil || overrideInput.Result.IncludedCount == 0 {
		return fmt.Errorf("task %q expected risk_override wait evidence", taskName)
	}
	riskHoldTerm := self.Wait.Term("risk_hold_received")
	if riskHoldTerm != nil && riskHoldTerm.Result != nil && riskHoldTerm.Result.Satisfied {
		return fmt.Errorf("task %q resolved through risk_hold_received unexpectedly", taskName)
	}

	fmt.Println("risk override cleared hold")
	return nil
}

// Example_workflowWaitMixedTermsAndRawCEL demonstrates a structured term and
// raw CEL signal input in the same top-level wait expression.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &mixedRiskHoldDecisionWorker{})

	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "order_fulfillment"})
	workflow.Add("resolve_mixed_risk_hold", mixedRiskHoldDecisionArgs{OrderID: "ord_456"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			Expr: `risk_hold_received || signals["risk_override"].exists(s, s.payload.active == false)`,
			Inputs: riverworkflow.WaitInputs{
				Signals: []string{"risk_override"},
			},
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("risk_hold_received", "risk_hold", `payload.active == true`),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err := workflow.Signals().Emit(ctx, "risk_override", map[string]any{"active": false}, nil); err != nil {
		panic(err)
	}

	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

risk override cleared hold
Example (WorkflowWaitRawCEL)

Example_workflowWaitRawCEL demonstrates a wait written directly as CEL over explicitly declared signal inputs.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type rawCELApprovalArgs struct {
	OrderID string
}

func (rawCELApprovalArgs) Kind() string { return "approve_order_raw_cel" }

type rawCELApprovalSignalPayload struct {
	Approved bool   `json:"approved"`
	Reviewer string `json:"reviewer"`
}

type rawCELApprovalWorker struct {
	river.WorkerDefaults[rawCELApprovalArgs]
}

func (w *rawCELApprovalWorker) Work(ctx context.Context, job *river.Job[rawCELApprovalArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	reviewSignals, err := workflow.Signals().ListForTask(ctx, taskName, &riverpro.WorkflowSignalListForTaskParams{
		Desc:  true,
		Key:   "manual_review",
		Limit: 1,
	})
	if err != nil {
		return err
	}
	reviewSignal := reviewSignals.Signals[0]

	var reviewPayload rawCELApprovalSignalPayload
	if err := json.Unmarshal(reviewSignal.Payload, &reviewPayload); err != nil {
		return err
	}

	fmt.Printf("manual review approved reviewer=%s\n", reviewPayload.Reviewer)
	return nil
}

// Example_workflowWaitRawCEL demonstrates a wait written directly as CEL over
// explicitly declared signal inputs.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &rawCELApprovalWorker{})

	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "order_fulfillment"})
	workflow.Add("approve_order_raw_cel", rawCELApprovalArgs{OrderID: "ord_456"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			Expr: `signals["manual_review"].exists(s, s.payload.approved == true && s.payload.reviewer != "")`,
			Inputs: riverworkflow.WaitInputs{
				Signals: []string{"manual_review"},
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err := workflow.Signals().Emit(ctx, "manual_review", rawCELApprovalSignalPayload{
		Approved: true,
		Reviewer: "manager:alice",
	}, nil); err != nil {
		panic(err)
	}

	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

manual review approved reviewer=manager:alice
Example (WorkflowWaitResult_timeoutVsSignal)

Example_workflowWaitResult_timeoutVsSignal shows how to introspect a task's wait after it resolves. This example determines whether a manual-review signal arrived or an SLA timer fired.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"
	"github.com/riverqueue/river/rivertype"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type shipmentDecisionArgs struct {
	OrderID string
}

func (shipmentDecisionArgs) Kind() string { return "decide_shipment" }

type reviewDecisionPayload struct {
	Approved bool   `json:"approved"`
	Reviewer string `json:"reviewer"`
}

type shipmentDecisionWorker struct {
	river.WorkerDefaults[shipmentDecisionArgs]
}

func (*shipmentDecisionWorker) Work(ctx context.Context, job *river.Job[shipmentDecisionArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	// Load this task's wait so we can inspect why it became resolved.
	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	self, err := workflow.LoadTask(ctx, taskName)
	if err != nil {
		return err
	}

	if self.Wait == nil || self.Wait.ResolvedAt == nil {
		return fmt.Errorf("task %q expected wait result", taskName)
	}

	// Apply explicit precedence: if both signal and timer are present in the
	// same evaluation window, the signal path wins and timeout is a fallback.
	latestSignals, err := workflow.Signals().ListForTask(ctx, taskName, &riverpro.WorkflowSignalListForTaskParams{
		Desc:  true,
		Key:   "manual_review",
		Limit: 1,
	})
	if err != nil {
		if !errors.Is(err, rivertype.ErrNotFound) {
			return err
		}
		// No included manual-review signal: treat the timer path as an
		// explicit fallback path for this task.
		timer := self.Wait.TimerInput("review_sla")
		if timer != nil && timer.Result != nil && timer.Result.Fired {
			fmt.Printf("order %s auto-released after review SLA\n", job.Args.OrderID)
			return nil
		}
		return fmt.Errorf("task %q expected manual_review signal or review_sla timer fallback", taskName)
	}
	if len(latestSignals.Signals) == 0 {
		timer := self.Wait.TimerInput("review_sla")
		if timer != nil && timer.Result != nil && timer.Result.Fired {
			fmt.Printf("order %s auto-released after review SLA\n", job.Args.OrderID)
			return nil
		}
		return fmt.Errorf("task %q expected manual_review signal or review_sla timer fallback", taskName)
	}
	latest := latestSignals.Signals[0]

	var review reviewDecisionPayload
	if err := json.Unmarshal(latest.Payload, &review); err != nil {
		return err
	}

	fmt.Printf("order %s approved by %s\n", job.Args.OrderID, review.Reviewer)
	return nil
}

// Example_workflowWaitResult_timeoutVsSignal shows how to introspect a
// task's wait after it resolves. This example determines whether a
// manual-review signal arrived or an SLA timer fired.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &shipmentDecisionWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
		WorkflowTimerPollerInterval: 10 * time.Millisecond,
	}))
	if err != nil {
		panic(err)
	}

	signalPath := riverClient.NewWorkflow(&riverpro.WorkflowOpts{ID: "wf_wait_result_signal"})
	signalPath.Add("decide_shipment", shipmentDecisionArgs{OrderID: "ord_123"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			Expr: "manual_review_received || review_sla",
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("manual_review_received", "manual_review", `payload.approved == true && payload.reviewer != ""`),
				riverworkflow.WaitTermTimer(riverworkflow.TimerAfterWaitStarted("review_sla", 250*time.Millisecond)),
			},
		},
	})

	prepareRes, err := signalPath.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	defer func() {
		if err := riverClient.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	// First workflow is met by manual-review signal.
	if _, err := signalPath.Signals().Emit(ctx, "manual_review", reviewDecisionPayload{
		Approved: true,
		Reviewer: "manager:bob",
	}, nil); err != nil {
		panic(err)
	}
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	// Second workflow has no review signal and is met by SLA timeout.
	timeoutPath := riverClient.NewWorkflow(&riverpro.WorkflowOpts{ID: "wf_wait_result_timeout"})
	timeoutPath.Add("decide_shipment", shipmentDecisionArgs{OrderID: "ord_456"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			Expr: "manual_review_received || review_sla",
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("manual_review_received", "manual_review", `payload.approved == true && payload.reviewer != ""`),
				// Use a shorter timer so the timeout branch is deterministic.
				riverworkflow.WaitTermTimer(riverworkflow.TimerAfterWaitStarted("review_sla", 75*time.Millisecond)),
			},
		},
	})
	timeoutPrepareRes, err := timeoutPath.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, timeoutPrepareRes.Jobs); err != nil {
		panic(err)
	}
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

}

Output:

order ord_123 approved by manager:bob
order ord_456 auto-released after review SLA
Example (WorkflowWaitSignalQuorum)

Example_workflowWaitSignalQuorum demonstrates a structured signal term that requires more than one matching signal before the task can run.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type quorumApprovalArgs struct {
	OrderID string
}

func (quorumApprovalArgs) Kind() string { return "approve_order_quorum" }

type quorumApprovalSignalPayload struct {
	Approved bool   `json:"approved"`
	Reviewer string `json:"reviewer"`
}

type quorumApprovalWorker struct {
	river.WorkerDefaults[quorumApprovalArgs]
}

func (w *quorumApprovalWorker) Work(ctx context.Context, job *river.Job[quorumApprovalArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	reviewSignals, err := workflow.Signals().ListForTask(ctx, taskName, &riverpro.WorkflowSignalListForTaskParams{
		Desc:  true,
		Key:   "manual_review",
		Limit: 1,
	})
	if err != nil {
		return err
	}
	reviewSignal := reviewSignals.Signals[0]

	var reviewPayload quorumApprovalSignalPayload
	if err := json.Unmarshal(reviewSignal.Payload, &reviewPayload); err != nil {
		return err
	}

	fmt.Printf("manual review approved reviewer=%s\n", reviewPayload.Reviewer)
	return nil
}

// Example_workflowWaitSignalQuorum demonstrates a structured signal term that
// requires more than one matching signal before the task can run.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &quorumApprovalWorker{})

	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "order_fulfillment"})
	workflow.Add("approve_order_quorum", quorumApprovalArgs{OrderID: "ord_789"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("two_reviews_received", "manual_review", `payload.approved == true && payload.reviewer != ""`).Count(2),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err := workflow.Signals().Emit(ctx, "manual_review", quorumApprovalSignalPayload{
		Approved: true,
		Reviewer: "manager:alice",
	}, nil); err != nil {
		panic(err)
	}
	if _, err := workflow.Signals().Emit(ctx, "manual_review", quorumApprovalSignalPayload{
		Approved: true,
		Reviewer: "manager:bob",
	}, nil); err != nil {
		panic(err)
	}

	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

manual review approved reviewer=manager:bob
Example (WorkflowWaitTimerFallback)

Example_workflowWaitTimerFallback demonstrates a wait that can resolve because a signal arrived or because a timer fired.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type riskHoldDecisionArgs struct {
	OrderID string
}

func (riskHoldDecisionArgs) Kind() string { return "resolve_risk_hold" }

type riskHoldDecisionWorker struct {
	river.WorkerDefaults[riskHoldDecisionArgs]
}

func (w *riskHoldDecisionWorker) Work(ctx context.Context, job *river.Job[riskHoldDecisionArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	// Load the workflow snapshot so we can inspect this task's wait
	// result details, including fired timers.
	allTasks, err := workflow.LoadAll(ctx, nil)
	if err != nil {
		return err
	}
	self := allTasks.Get(taskName)
	if self == nil {
		return fmt.Errorf("task %q was not found in workflow snapshot", taskName)
	}

	if self.Wait == nil || self.Wait.ResolvedAt == nil {
		return fmt.Errorf("task %q expected wait result", taskName)
	}

	timer := self.Wait.TimerInput("hold_timeout")
	if timer != nil && timer.Result != nil && timer.Result.Fired {
		fmt.Println("risk hold timed out; proceeding without signal")
		return nil
	}

	fmt.Println("risk hold signal received")
	return nil
}

// Example_workflowWaitTimerFallback demonstrates a wait that
// can resolve because a signal arrived or because a timer fired.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &riskHoldDecisionWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
		WorkflowTimerPollerInterval: 10 * time.Millisecond,
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{Name: "order_fulfillment"})
	workflow.Add("resolve_risk_hold", riskHoldDecisionArgs{OrderID: "ord_123"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			// Wait condition resolves when the risk_hold signal exists, or once hold_timeout fires.
			Expr: "hold_timeout || risk_hold_received",
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermTimer(riverworkflow.TimerAfterWaitStarted("hold_timeout", 75*time.Millisecond)),
				riverworkflow.WaitTermSignal("risk_hold_received", "risk_hold", `payload.active == true`),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// No signal is emitted, so the timer path will resolve the wait.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

risk hold timed out; proceeding without signal

Index

Examples

Constants

const (
	WorkflowTaskPendingReasonDependencies        = workflowinternal.TaskPendingReasonDependencies
	WorkflowTaskPendingReasonDependenciesAndWait = workflowinternal.TaskPendingReasonDependenciesAndWait
	WorkflowTaskPendingReasonNone                = workflowinternal.TaskPendingReasonNone
	WorkflowTaskPendingReasonWait                = workflowinternal.TaskPendingReasonWait
)

Variables

This section is empty.

Functions

func ContextWithClient

func ContextWithClient[TTx any](ctx context.Context, client *Client[TTx]) context.Context

ContextWithClient returns a new context with the provided Client set on it.

func ReindexerIndexNamesDefault

func ReindexerIndexNamesDefault() []string

ReindexerIndexNamesDefault returns River Pro's default set of indexes for periodic reindexing.

The returned list starts with River's defaults and appends Pro-managed indexes, then deduplicates while preserving first-seen order.

This default list is only used when river.Config.ReindexerIndexNames is nil. If callers provide a non-nil list, that override is used as-is.

Types

type BatchOpts

type BatchOpts struct {
	// ByArgs indicates that the job's encoded arguments should be used to compute
	// a batch key.
	//
	// Default is false, meaning that job arguments will not be considered for
	// batching.
	//
	// When set to true, the entire encoded args field will be included in the
	// batch hash, which requires care to ensure that no irrelevant args are
	// factored into the batch key. It is also possible to use a subset of
	// the args by indicating on the `JobArgs` struct which fields should be
	// included in the batch using struct tags:
	//
	//  type MyJobArgs struct {
	//   CustomerID string `json:"customer_id" river:"batch"`
	//   TraceID string `json:"trace_id"
	//  }
	//
	// In this example, only the encoded `customer_id` key will be included in the
	// batch key and the `trace_id` key will be ignored.
	//
	// All keys are sorted alphabetically before hashing to ensure consistent
	// results.
	ByArgs bool
}

BatchOpts are options for defining batch partitioning on JobArgs.

type Client

type Client[TTx any] struct {
	*river.Client[TTx]
	// contains filtered or unexported fields
}

Client is a client for River Pro. It is a superset of the main river.Client type, with additional methods and behavioral changes for working with Pro features.

func ClientFromContext

func ClientFromContext[TTx any](ctx context.Context) *Client[TTx]

ClientFromContext returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.

It panics if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.

When testing JobArgs.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.

func ClientFromContextSafely

func ClientFromContextSafely[TTx any](ctx context.Context) (*Client[TTx], error)

ClientFromContextSafely returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.

It returns an error if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.

When testing JobArgs.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.

func NewClient

func NewClient[TTx any](proDriver driver.ProDriver[TTx], config *Config) (*Client[TTx], error)

NewClient creates a new River Pro client using the provided driver and configuration.

func (*Client[TTx]) InsertMany

func (c *Client[TTx]) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)

InsertMany inserts jobs and maps workflow task-name unique violations to DuplicateTaskError.

func (*Client[TTx]) InsertManyTx

func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)

InsertManyTx inserts jobs in an existing transaction and maps workflow task-name unique violations to DuplicateTaskError.

func (*Client[TTx]) JobDeadLetterGet

func (c *Client[TTx]) JobDeadLetterGet(ctx context.Context, jobID int64) (*rivertype.JobRow, error)

JobDeadLetterGet gets a job from the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded). rivertype.ErrNotFound is returned if it doesn't exist.

Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.

func (*Client[TTx]) JobDeadLetterGetTx

func (c *Client[TTx]) JobDeadLetterGetTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error)

JobDeadLetterGetTx gets a job from the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded). rivertype.ErrNotFound is returned if it doesn't exist.

Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.

This variant operates within an existing transaction. See JobDeadLetterRetry for the non-transaction variant.

func (*Client[TTx]) JobDeadLetterRetry

func (c *Client[TTx]) JobDeadLetterRetry(ctx context.Context, jobID int64) (*rivertype.JobInsertResult, error)

JobDeadLetterRetry removes a job in the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded) and places it back in the live queue with a state of `available`. Its ID, args, metadata, and other core properties are retained, but its attempts and errors are reset as if it's a brand new job.

Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.

func (*Client[TTx]) JobDeadLetterRetryTx

func (c *Client[TTx]) JobDeadLetterRetryTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobInsertResult, error)

JobDeadLetterRetryTx removes a job in the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded) and places it back in the live queue with a state of `available`. Its ID, args, metadata, and other core properties are retained, but its attempts and errors are reset as if it's a brand new job.

Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.

This variant operates within an existing transaction. See JobDeadLetterRetry for the non-transaction variant.

func (*Client[TTx]) NewWorkflow

func (c *Client[TTx]) NewWorkflow(opts *WorkflowOpts) *WorkflowT[TTx]

NewWorkflow creates a new workflow. If an ID is not specified in opts, it will be generated.

func (*Client[TTx]) Queues

func (c *Client[TTx]) Queues() *QueueBundle

Queues returns the currently configured set of queues for the client, and can be used to add new ones.

func (*Client[TTx]) WorkflowCancel

func (c *Client[TTx]) WorkflowCancel(ctx context.Context, workflowID string) (*WorkflowCancelResult, error)

WorkflowCancel cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.

See WorkflowCancelTx for a version that allows cancellation within a broader database transaction.

The return value is a list of job rows where cancellation was attempted, along with a potential error.

func (*Client[TTx]) WorkflowCancelTx

func (c *Client[TTx]) WorkflowCancelTx(ctx context.Context, tx TTx, workflowID string) (*WorkflowCancelResult, error)

WorkflowCancelTx cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.

This variant operates within an existing transaction. See WorkflowCancel for the non-transaction variant.

The return value is a list of job rows where cancellation was attempted, along with a potential error.

func (*Client[TTx]) WorkflowFromExisting

func (c *Client[TTx]) WorkflowFromExisting(job *rivertype.JobRow, opts *WorkflowOpts) (*WorkflowT[TTx], error)

WorkflowFromExisting initiates a workflow from an existing job in the workflow. This enables jobs to be added to an existing workflow. If the job is not part of a workflow, an error will be returned.

The workflow ID and Name will be derived from the existing job, and the corresponding values in opts will be ignored.

func (*Client[TTx]) WorkflowFromExistingID

func (c *Client[TTx]) WorkflowFromExistingID(ctx context.Context, workflowID string, opts *WorkflowOpts) (*WorkflowT[TTx], error)

WorkflowFromExistingID initiates a workflow from an existing workflow row. The workflow ID comes from the workflowID argument, and the workflow Name is loaded directly from that workflow row. The corresponding values in opts will be ignored.

func WorkflowPrepare deprecated

func (c *Client[TTx]) WorkflowPrepare(ctx context.Context, workflow *Workflow) (*WorkflowPrepareResult, error)

WorkflowPrepare validates the jobs in the workflow and prepares them for insertion. Validations ensure that:

  • All specified dependencies correspond to other task names in the workflow
  • Task names are unique
  • There are no cycles

If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.

For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, existing task names are loaded from the database prior to validation.

Deprecated: Use Workflow.Prepare instead.

func WorkflowPrepareTx deprecated

func (c *Client[TTx]) WorkflowPrepareTx(ctx context.Context, tx TTx, workflow *Workflow) (*WorkflowPrepareResult, error)

WorkflowPrepareTx validates the jobs in the workflow and prepares them for insertion. Validations ensure that:

  • All specified dependencies correspond to other task names in the workflow
  • Task names are unique
  • There are no cycles

If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.

For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, existing task names are loaded from the database prior to validation.

Deprecated: Use Workflow.PrepareTx instead.

type ConcurrencyConfig

type ConcurrencyConfig struct {
	// GlobalLimit is the maximum number of jobs that can run concurrently
	// across all workers.
	GlobalLimit int

	// LocalLimit is the maximum number of jobs that can run concurrently
	// on a single worker.
	LocalLimit int

	// Partition is the configuration for partitioning concurrency limits. By
	// default, concurrency limits are not partitioned and apply to the entire
	// queue.
	Partition PartitionConfig
}

ConcurrencyConfig is a configuration for concurrency limits. Either a global or local limit (or both) must be specified in order for concurrency limits to be enforced.

type Config

type Config struct {
	river.Config

	// DeadLetter configures a dead letter queue, which optionally transitions
	// discarded jobs to the `river_job_dead_letter` table instead of deleting
	// them after river.Config.DiscardedJobRetentionPeriod has elapsed.
	DeadLetter DeadLetterConfig

	// DurablePeriodicJobs configures periodic jobs that are persisted to the
	// database. Unlike standard in-process jobs—whose state is lost on
	// restarts or leader elections— these durable jobs retain their state,
	// enabling more precise and reliable scheduling.
	DurablePeriodicJobs DurablePeriodicJobsConfig

	// PartitionKeyCacheTTL is the duration to cache partition keys for available
	// jobs. With concurrency limiting enabled, fetches of available jobs with
	// cached partition keys can be much faster than those without caching.
	// Without concurrency limiting enabled, this value is unused.
	//
	// This value limits how quickly the first available job for a given partition
	// key will be seen and fetched and is a direct trade-off for higher
	// throughput at the expense of higher latency. If this value is lower than
	// the fetch cooldown, caching is effectively disabled. To explicitly disable
	// partition key caching altogether, set this to -1. Other negative values are
	// not allowed.
	//
	// Defaults to 1 second.
	PartitionKeyCacheTTL time.Duration

	// ProQueues holds the configuration for queues that use Pro-specific
	// features, including concurrency limits. A given queue can be configured
	// as either a Pro queue or a standard queue, but not both.
	ProQueues map[string]QueueConfig

	// SequenceSchedulerInterval is the amount of time to wait between runs of the sequence scheduler.
	SequenceSchedulerInterval time.Duration

	// WorkflowAwareRetention enables workflow-level retention. When enabled,
	// finalized workflows are cleaned as a unit according to
	// WorkflowClosedRetentionPeriod and WorkflowCancelledRetentionPeriod.
	// Workflow jobs are not deleted by per-job retention, and discarded workflow
	// jobs are not moved to the dead letter queue before the workflow is cleaned.
	//
	// When disabled, workflow jobs use the normal job retention settings,
	// including per-queue overrides and dead letter movement. Workflow metadata
	// is cleaned after all jobs for a finalized workflow have been deleted.
	//
	// Defaults to false.
	WorkflowAwareRetention bool

	// WorkflowCancelledRetentionPeriod is the amount of time to keep cancelled
	// workflow data before permanent deletion.
	//
	// This setting is only used when WorkflowAwareRetention is enabled. When
	// WorkflowAwareRetention is disabled, cancelled workflow jobs are deleted by
	// the normal cancelled job retention settings instead, and workflow metadata
	// is cleaned after all jobs for a finalized workflow have been deleted.
	//
	// Increase this for longer audit/history visibility of cancelled workflows.
	// Decrease it to reduce storage usage. This setting only affects cleanup
	// timing; it does not affect active workflow execution correctness.
	//
	// The special value -1 disables deletion of cancelled workflows.
	//
	// Defaults to 24 hours.
	WorkflowCancelledRetentionPeriod time.Duration

	// WorkflowClosedRetentionPeriod is the amount of time to keep closed
	// workflow data before permanent deletion.
	//
	// This setting is only used when WorkflowAwareRetention is enabled. When
	// WorkflowAwareRetention is disabled, completed and discarded workflow jobs
	// are deleted by the normal completed/discarded job retention settings
	// instead, and workflow metadata is cleaned after all jobs for a finalized
	// workflow have been deleted.
	//
	// Increase this for longer post-completion visibility and diagnostics.
	// Decrease it to reduce storage usage. This setting only affects cleanup
	// timing; it does not affect active workflow execution correctness.
	//
	// The special value -1 disables deletion of closed workflows.
	//
	// Defaults to 24 hours.
	WorkflowClosedRetentionPeriod time.Duration

	// WorkflowEvaluatorBatchSize controls how many workflows are evaluated per
	// transaction batch.
	//
	// Defaults to 100.
	//
	// Higher values can improve throughput for large backlogs, but increase
	// transaction size and may worsen per-workflow tail latency. Lower values
	// improve fairness and responsiveness, but may reduce total throughput.
	WorkflowEvaluatorBatchSize int

	// WorkflowRescuerInterval is the amount of time to wait between runs of the workflow rescuer.
	WorkflowRescuerInterval time.Duration

	// WorkflowTimerPollerInterval is the amount of time to wait between runs of
	// the workflow timer poller.
	//
	// Defaults to 1 second.
	//
	// Lower values reduce timer wake-up lag for timer-driven wait tasks, but
	// increase database polling load. Higher values reduce polling load, but can
	// delay timer-triggered staging by up to roughly one interval.
	WorkflowTimerPollerInterval time.Duration
}

Config holds configuration for the riverpro.Client. The config is primarily managed with an embedded river.Config, along with additional fields for Pro-specific configuration.

Reindexer defaults are applied through the embedded river.Config.ReindexerIndexNames field:

The non-nil override behavior allows opting out of Pro-managed indexes, but it also means callers are responsible for including any indexes they still want reindexed.

func (*Config) WithDefaults

func (c *Config) WithDefaults() *Config

WithDefaults returns a copy of the Config with all default values applied.

type DeadLetterConfig

type DeadLetterConfig struct {
	// Enaabled enables the dead letter queue.
	Enabled bool
}

DeadLetterConfig configures a dead letter queue, which optionally transitions discarded jobs to the `river_job_dead_letter` table instead of deleting them after DiscardedJobRetentionPeriod has elapsed.

See also river.Config.DiscardedJobRetentionPeriod, which will define the period before jobs are moved to the dead letter queue.

type DependencyCycleError

type DependencyCycleError struct {
	DepStack []string
}

DependencyCycleError is returned when a dependency cycle is detected in the workflow. A cycle occurs when a task depends on itself, either directly or indirectly through other tasks.

func (*DependencyCycleError) Error

func (e *DependencyCycleError) Error() string

func (*DependencyCycleError) Is

func (e *DependencyCycleError) Is(target error) bool

type DuplicateTaskError

type DuplicateTaskError struct {
	TaskName string
}

DuplicateTaskError is returned when a task name is added to the same workflow more than once. Task names must be unique within a workflow.

func (*DuplicateTaskError) Error

func (e *DuplicateTaskError) Error() string

func (*DuplicateTaskError) Is

func (e *DuplicateTaskError) Is(target error) bool

type DurablePeriodicJobsConfig

type DurablePeriodicJobsConfig struct {
	// Enabled activates durable periodic jobs. All periodic jobs with a
	// configured ID will become durable and persist their state in the
	// database.
	Enabled bool

	// NextRunAtRatchetFunc provides a function that "ratchets" the next run
	// times of durable jobs pulled from the database by potentially modifying
	// them before they're enqueued and have a new next run time set for them.
	//
	// The primary purpose of this option is to protect against cases where
	// durable jobs were present in the database that had frequent run periods,
	// but where all clients may have been offline for a long period of time,
	// and when a client finally does come back online many jobs of the same job
	// are scheduled in quick succession because the next run at timestamp from
	// the database was so old. For example, a periodic job runs once every
	// minute, and is being persisted to the database on that schedule. All
	// clients go offline for one hour before coming back. When coming back
	// online, the job is scheduled 60 times in quick succession as it's run at
	// -60+1 minutes, -60+2 minutes, -60+3 minutes, -60+4 minutes, etc. because
	// next run times are calculated based on the last run time rather than now.
	//
	// Clients in deployed environments should generally be running all the
	// time, but a common place where something like this might happen is local
	// development where software devs may have their app offline for extended
	// periods of time.
	//
	// A reasonable option is to ratchet "next run at" times to the current
	// time. Any "next run at" times that are in the future stay in the future,
	// but those that have fallen behind will be run immediately and get a new
	// scheduled time based off now instead of a time in the past:
	//
	// NextRunAtRatchetFunc: func(nextRunAt, now time.Time) time.Time {
	//  if nextRunAt.Before(now) {
	//   return now
	//  }
	//  return nextRunAt
	// }
	//
	// Handling old timestamps can of course be done from each periodic job's
	// ScheduleFunc as well, but it's easy to forget to do so, and may not be
	// desirable to have to consider them for every possible periodic job.
	//
	// The default behavior is to never ratchet next run at times, which is
	// equivalent to:
	//
	//                   NextRunAtRatchetFunc: func(nextRunAt, now time.Time) time.Time { return nextRunAt }
	NextRunAtRatchetFunc func(nextRunAt, now time.Time) time.Time

	// StaleThreshold is the amount of time after which database persisted
	// periodic jobs are considered stale and eligible to be pruned based on
	// their `updated_at` timestamp. While running, the periodic job enqueuer
	// service periodically bumps the `updated_at` timestamp of all jobs that it
	// knows about. Periodic jobs that are no longer registered no longer have
	// their timestamps touched, and are eventually removed. This keeps the
	// database reasonably clean while also keeping records around for a little
	// time for purposes of visibility and debugging.
	//
	// Defaults to 24 hours.
	StaleThreshold time.Duration

	// StartStaggerSpread is the interval over which to reschedule any jobs that
	// were scheduled before or at the current time if StartStaggerThreshold was
	// reached on start. When start stagger activates, jobs will have a random
	// duration between one millisecond and StartStaggerSpread added to their
	// next run time.
	//
	// Start stagger is designed to protect against the case where a client's
	// been offline for some time, comes online to find a great number of
	// periodic jobs that need immediate scheduling, and would otherwise end up
	// scheduling them all to run nearly simultaneously. Instead, start times
	// are staggered randomly to the queue more breathing room.
	//
	// Defaults to 1 minute.
	StartStaggerSpread time.Duration

	// StartStaggerThreshold is the number of jobs found that scheduled before
	// or at the current time on start after which the client will stagger their
	// next run time randomly between one miillisecond and StartStaggerSpread.
	//
	// Start stagger is designed to protect against the case where a client's
	// been offline for some time, comes online to find a great number of
	// periodic jobs that need immediate scheduling, and would otherwise end up
	// scheduling them all to run nearly simultaneously. Instead, start times
	// are staggered randomly to the queue more breathing room.
	//
	// Defaults to 500 jobs. Disable start stagger completely by setting to -1.
	StartStaggerThreshold int
}

DurablePeriodicJobsConfig configures periodic jobs that are persisted to the database. Unlike standard in-process jobs—whose state is lost on restarts or leader elections— these durable jobs retain their state, enabling more precise and reliable scheduling.

type EphemeralOpts

type EphemeralOpts struct{}

EphemeralOpts are options for ephemeral jobs. Currently, there are no available options for ephemeral jobs, but this is modeled as an options struct to futureproof it.

type HookJobDeadLetterMove

type HookJobDeadLetterMove interface {
	rivertype.Hook

	// JobDeadLetterMove is invoked when a discarded job is moved to the dead
	// letter queue.
	JobDeadLetterMove(ctx context.Context, job *rivertype.JobRow) error
}

HookJobDeadLetterMove is an interface to a hook that runs when a discarded job is moved to the dead letter queue.

type HookJobDeadLetterMoveFunc

type HookJobDeadLetterMoveFunc func(ctx context.Context, job *rivertype.JobRow) error

HookJobDeadLetterMoveFunc is a convenience helper for implementing HookJobDeadLetterMove using a simple function instead of a struct.

func (HookJobDeadLetterMoveFunc) IsHook

func (f HookJobDeadLetterMoveFunc) IsHook() bool

func (HookJobDeadLetterMoveFunc) JobDeadLetterMove

func (f HookJobDeadLetterMoveFunc) JobDeadLetterMove(ctx context.Context, job *rivertype.JobRow) error

type MissingDependencyError

type MissingDependencyError struct {
	ParentTaskName string
	TaskName       string
}

MissingDependencyError is returned when a named dependency is missing from the workflow. A task cannot depend on another task that does not exist in the workflow.

func (*MissingDependencyError) Error

func (e *MissingDependencyError) Error() string

func (*MissingDependencyError) Is

func (e *MissingDependencyError) Is(target error) bool

type PartitionConfig

type PartitionConfig struct {
	// ByArgs is the list of fields in the job arguments to partition by.
	// For example, you may wish to limit the number of jobs that can run
	// concurrently for a given `customer_id`.
	//
	// If ByArgs is explicitly set to an empty slice, all fields in the job's
	// arguments will be used. This is generally not desirable as it can lead
	// to high cardinality and a large number of partitions.
	//
	// When set to nil (the default), arguments are not used for partitioning.
	ByArgs []string

	// ByKind indicates that the concurrency limit should be partitioned by the
	// kind of the job.
	ByKind bool
}

PartitionConfig is a configuration for partitioning concurrency limits.

type QueueBundle

type QueueBundle struct {
	*river.QueueBundle
	// contains filtered or unexported fields
}

QueueBundle is a bundle for adding additional queues. It's made accessible through Client.Queues.

Unlike the river.QueueBundle, this bundle also enables Pro-specific configuration of queues.

func (*QueueBundle) AddPro

func (qb *QueueBundle) AddPro(name string, config QueueConfig) error

AddPro adds a Pro queue with the given name and configuration to the QueueBundle. It returns a nil error on success.

type QueueConfig

type QueueConfig struct {
	// Concurrency is the optional concurrency configuration for the queue.
	Concurrency ConcurrencyConfig

	// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	//
	// The special value -1 disables deletion of cancelled jobs.
	//
	// This setting is specific to retention for this particular queue. If left
	// unset, it'll default to river.Config.CancelledJobRetentionPeriod (shared
	// cancelled retention period for all queues) or 24 hours.
	CancelledJobRetentionPeriod time.Duration

	// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
	// around before they're removed permanently.
	//
	// The special value -1 disables deletion of completed jobs.
	//
	// This setting is specific to retention for this particular queue. If left
	// unset, it'll default to river.Config.CompletedJobRetentionPeriod (shared
	// completed retention period for all queues) or 24 hours.
	CompletedJobRetentionPeriod time.Duration

	// DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs
	// around before they're removed permanently.
	//
	// The special value -1 disables deletion of discarded jobs.
	//
	// This setting is specific to retention for this particular queue. If left
	// unset, it'll default to river.Config.DiscardedJobRetentionPeriod (shared
	// discarded retention period for all queues) or 7 days.
	//
	// If the dead letter queue is enabled with Config.DeadLetter.Enabled, it
	// supersedes this setting. Per-queue dead letter settings are currently
	// *not* supported, although we expect this limitation to be patched soon.
	DiscardedJobRetentionPeriod time.Duration

	// JobCleanerTimeout is the timeout used for cleaner queries for this queue
	// when configured on a Pro client.
	//
	// If zero, this queue inherits Config.JobCleanerTimeout.
	//
	// If non-zero, this overrides Config.JobCleanerTimeout for this queue's
	// retention cleanup queries.
	//
	// Increase this for very large queues whose cleanup queries may run longer.
	// Decrease it to fail slow cleanup queries faster and limit cleaner query
	// impact.
	JobCleanerTimeout time.Duration

	// Ephemeral are ephemeral settings for queue. Ephemeral jobs are jobs which
	// are deleted immediately upon completion instead of being pruned at a
	// later time by the job cleaner.
	Ephemeral QueueEphemeralConfig

	// FetchCooldown is the minimum amount of time to wait between fetches of new
	// jobs. Jobs will only be fetched *at most* this often, but if no new jobs
	// are coming in via LISTEN/NOTIFY then fetches may be delayed as long as
	// FetchPollInterval.
	//
	// Throughput is limited by this value.
	//
	// If non-zero, this overrides the FetchCooldown setting in the Client's
	// Config.
	FetchCooldown time.Duration

	// FetchPollInterval is the amount of time between periodic fetches for new
	// jobs. Typically new jobs will be picked up ~immediately after insert via
	// LISTEN/NOTIFY, but this provides a fallback.
	//
	// If non-zero, this overrides the FetchCooldown setting in the Client's
	// Config.
	FetchPollInterval time.Duration

	// MaxWorkers is the maximum number of workers to run for the queue, or put
	// otherwise, the maximum parallelism to run.
	//
	// This is the maximum number of workers within this particular client
	// instance, but note that it doesn't control the total number of workers
	// across parallel processes. Installations will want to calculate their
	// total number by multiplying this number by the number of parallel nodes
	// running River clients configured to the same database and queue.
	//
	// Requires a minimum of 1, and a maximum of 10,000.
	MaxWorkers int
}

QueueConfig is a configuration for a Pro queue. It extends the standard River queue configuration with additional Pro-specific settings.

type QueueEphemeralConfig

type QueueEphemeralConfig struct {
	// Enabled can be set to true to make this an ephemeral queue. All jobs
	// going to ephemeral queues are deleted immediately upon completion instead
	// of being pruned at a later time by the job cleaner. This happens
	// regardless of jobs in the queue implement their own EphemeralOpts.
	Enabled bool
}

QueueEphemeralConfig are ephemeral settings for a queue. Ephemeral jobs are jobs which are deleted immediately upon completion instead of being pruned at a later time by the job cleaner.

type SequenceOpts

type SequenceOpts struct {
	// ByArgs indicates that the job's encoded arguments should be used to compute
	// a sequence key.
	//
	// Default is false, meaning that input arguments will not be considered for
	// sequencing.
	//
	// When set to true, the entire encoded args field will be included in the
	// sequence hash, which requires care to ensure that no irrelevant args are
	// factored into the sequence key. It is also possible to use a subset of
	// the args by indicating on the `JobArgs` struct which fields should be
	// included in the sequence using struct tags:
	//
	//  type MyJobArgs struct {
	//   CustomerID string `json:"customer_id" river:"sequence"`
	//   TraceID string `json:"trace_id"
	//  }
	//
	// In this example, only the encoded `customer_id` key will be included in the
	// sequence key and the `trace_id` key will be ignored.
	//
	// All keys are sorted alphabetically before hashing to ensure consistent
	// results.
	ByArgs bool

	// ByQueue indicates that sequences should be partitioned by queue.
	//
	// Default is false, meaning that as long as any other sequence property is
	// enabled, sequences will be constructed for a kind across all queues.
	ByQueue bool

	// ContinueOnCancelled indicates that the sequence should continue to be
	// processed even if the job is cancelled. Without this setting, the sequence
	// will be halted if any job in the sequence is cancelled.
	ContinueOnCancelled bool

	// ContinueOnDiscarded indicates that the sequence should continue to be
	// processed even if the job repeatedly errors and is discarded. Without this
	// setting, the sequence will be halted if any job in the sequence is
	// discarded.
	ContinueOnDiscarded bool

	// ExcludeKind indicates that the job kind should not be included in the
	// sequence partition key. This is useful when you want to include multiple
	// job kinds in the same sequence.
	ExcludeKind bool
}

SequenceOpts are options for defining sequence partitioning on JobArgs.

type TaskHasNoOutputError

type TaskHasNoOutputError struct {
	JobID      int64
	TaskName   string
	WorkflowID string
}

TaskHasNoOutputError is returned when attempting to get the output of a task that has no output.

func (*TaskHasNoOutputError) Error

func (e *TaskHasNoOutputError) Error() string

func (*TaskHasNoOutputError) Is

func (e *TaskHasNoOutputError) Is(target error) bool

type Workflow deprecated

type Workflow WorkflowT[any]

Workflow is a collection of jobs which can have dependencies on other jobs in the workflow.

Deprecated: Use Client.NewWorkflow instead in order to gain access to transactional task loader methods and other functionality that requires access to a transaction.

func NewWorkflow deprecated

func NewWorkflow(opts *WorkflowOpts) *Workflow

NewWorkflow creates a new workflow. If an ID is not specified in opts, it will be generated.

Deprecated: Use Client.NewWorkflow instead in order to gain access to transactional task loader methods.

func WorkflowFromExisting deprecated

func WorkflowFromExisting(job *rivertype.JobRow, opts *WorkflowOpts) (*Workflow, error)

WorkflowFromExisting initiates a workflow from an existing job in the workflow. This enables jobs to be added to an existing workflow. If the job is not part of a workflow, an error will be returned.

The workflow ID and Name will be derived from the existing job, and the corresponding values in opts will be ignored.

Deprecated: Use Client.WorkflowFromExisting instead.

type WorkflowCancelResult

type WorkflowCancelResult struct {
	CancelledJobs []*rivertype.JobRow
}

type WorkflowLoadAllOpts

type WorkflowLoadAllOpts struct {
	PaginationLimit  int
	PaginationOffset int
}

WorkflowLoadAllOpts are options for loading all tasks in a workflow.

type WorkflowLoadDepsOpts

type WorkflowLoadDepsOpts struct {
	Recursive bool
}

WorkflowLoadDepsOpts are options for loading the dependencies of a task in a workflow.

type WorkflowOpts

type WorkflowOpts struct {
	ID                  string
	IgnoreCancelledDeps bool
	IgnoreDeletedDeps   bool
	IgnoreDiscardedDeps bool
	Name                string
}

WorkflowOpts are options for creating a new workflow.

type WorkflowPrepareResult

type WorkflowPrepareResult struct {
	// Jobs is a list of jobs to insert into the database. It's structured as a
	// list of InsertManyParams so the jobs can be inserted with InsertMany.
	Jobs []river.InsertManyParams
}

WorkflowPrepareResult is the result of preparing a workflow. It contains a list of jobs to insert.

type WorkflowRetryMode

type WorkflowRetryMode string
const (
	// WorkflowRetryModeAll indicates that all jobs will be retried. This is the
	// default.
	WorkflowRetryModeAll WorkflowRetryMode = "all"

	// WorkflowRetryModeFailedOnly indicates that only failed jobs (cancelled or
	// discarded) will be retried. Downstream jobs will also be retried if they
	// have failed dependencies, but not if they had chosen to ignore failed
	// dependencies and completed successfully.
	WorkflowRetryModeFailedOnly WorkflowRetryMode = "failed_only"

	// WorkflowRetryModeFailedAndDownstream indicates that failed jobs (cancelled
	// or discarded) will be retried, and all downstream jobs will also be retried
	// even if they previously completed successfully.
	WorkflowRetryModeFailedAndDownstream WorkflowRetryMode = "failed_and_downstream"
)

type WorkflowRetryOpts

type WorkflowRetryOpts struct {
	// Mode indicates the mode of retry, specifically which jobs to retry.
	// By default, all jobs will be retried, though this option can be used to
	// specify that only failed jobs should be retried. Retries preserve workflow
	// wait semantics: any retried task with a wait will
	// re-enter `pending` and must become resolved again before it can be staged.
	Mode WorkflowRetryMode

	// ResetHistory indicates whether to reset the history of the jobs when
	// retrying them, including resetting their attempt count back to 0 (starting
	// over their retry policy in the process) and clearing out known metadata
	// from past attempts. If not set, the max attempts will be incremented by 1
	// to allow for a single additional attempt.
	ResetHistory bool
}

type WorkflowRetryResult

type WorkflowRetryResult struct {
	Jobs []*rivertype.JobRow
}

type WorkflowRetryStillActiveError

type WorkflowRetryStillActiveError struct {
	WorkflowID string
}

WorkflowRetryStillActiveError is returned when a workflow is still active (containing some jobs in available, pending, retryable, running, or scheduled states) and cannot yet be retried.

func (*WorkflowRetryStillActiveError) Error

func (*WorkflowRetryStillActiveError) Is

type WorkflowSignalEmitOpts

type WorkflowSignalEmitOpts struct {
	// Attempt targets a specific workflow attempt. If nil, the current attempt
	// at insert time is targeted.
	Attempt *int
	// IdempotencyKey enables deduplication for emits on the same workflow and
	// signal key.
	//
	// If empty, no key is persisted and each emit creates a new signal row.
	//
	// If non-empty, reusing the same key returns the existing result only when
	// payload is semantically equal (Postgres jsonb equality). A semantic
	// mismatch returns [riverworkflow.SignalPayloadMismatchError].
	IdempotencyKey string
	// Source is optional metadata persisted with the signal's source object.
	// Use it to attach emitter context (for example request IDs, actor IDs, or
	// integration names) for audit and correlation.
	//
	// It must be a valid JSON object. Empty defaults to `{}`.
	//
	// Pass a Go value (for example map/struct) to marshal as JSON, or raw JSON in
	// []byte/json.RawMessage.
	Source any
}

WorkflowSignalEmitOpts configures workflow signal emission.

type WorkflowSignalEmitResult

type WorkflowSignalEmitResult struct {
	// Attempt is the workflow attempt snapshot captured on the emitted signal.
	Attempt int
	// CreatedAt is when the signal row was inserted.
	CreatedAt time.Time
	// ID is the database identity of the signal row.
	ID int64
	// IdempotencyKey is the key used for deduplication. Empty means no key was
	// provided for this emit.
	IdempotencyKey string
	// Key is the workflow signal key for the emitted signal. Wait logic reads
	// signals by this key (for example, signals["approval"]).
	Key string
	// SkippedAsDuplicate is true when an idempotent emit reused an existing
	// signal row instead of inserting a new one.
	SkippedAsDuplicate bool
	// WorkflowID is the ID of the workflow that received the signal.
	WorkflowID string
}

WorkflowSignalEmitResult reports the persisted signal row created by emit.

For idempotent replays, this is the original result for the prior successful emit.

type WorkflowSignalLatestForTaskOpts

type WorkflowSignalLatestForTaskOpts struct {
	// Attempt limits lookup to one workflow attempt. Leave nil to use task
	// visibility defaults. When wait evidence exists, the evidence attempt is
	// used; otherwise, the current workflow attempt is used.
	Attempt *int
	// IncludeAfterResolution controls task-scoped cutoff behavior. If false and
	// the selected attempt has persisted wait evidence, rows after the
	// evidence's per-key LastIncludedID boundaries are excluded. If true, reads
	// include the full selected attempt, including signals recorded after the
	// wait resolved.
	IncludeAfterResolution bool
}

WorkflowSignalLatestForTaskOpts controls how task-scoped latest-signal lookups are resolved for WorkflowSignals.LatestForTask and WorkflowSignals.LatestForTaskTx.

type WorkflowSignalListForTaskParams

type WorkflowSignalListForTaskParams struct {
	// Attempt limits lookup to one workflow attempt. Leave nil to use task
	// visibility defaults. When wait evidence exists, the evidence attempt is
	// used; otherwise, the current workflow attempt is used.
	Attempt *int
	// CursorID controls pagination.
	//
	// Leave this as 0 to fetch the first page. For later pages, pass
	// [WorkflowSignalListResult.NextCursorID] from the prior response when
	// [WorkflowSignalListResult.HasMore] is true.
	//
	// The cursor is exclusive and follows sort direction:
	//   - Desc=false (default): returns rows with `id > CursorID`.
	//   - Desc=true: returns rows with `id < CursorID`.
	CursorID int64
	// Desc controls row ordering. false means ascending by ID; true means
	// descending by ID.
	Desc bool
	// IncludeAfterResolution controls task-scoped cutoff behavior. If false and
	// the selected attempt has persisted wait evidence, rows after the
	// evidence's per-key LastIncludedID boundaries are excluded. If true, reads
	// include the full selected attempt, including signals recorded after the
	// wait resolved.
	IncludeAfterResolution bool
	// Key filters rows by declared signal input key.
	Key string
	// Limit controls page size. If unset or <= 0, a default of 100 is used. The
	// maximum is capped at 1000.
	Limit int
}

WorkflowSignalListForTaskParams controls task-scoped signal listing for WorkflowSignals.ListForTask and WorkflowSignals.ListForTaskTx.

type WorkflowSignalListParams

type WorkflowSignalListParams struct {
	// Attempt filters rows by attempt number.
	Attempt *int
	// CursorID controls pagination.
	//
	// Leave this as 0 to fetch the first page. For later pages, pass
	// [WorkflowSignalListResult.NextCursorID] from the prior response when
	// [WorkflowSignalListResult.HasMore] is true.
	//
	// The cursor is exclusive and follows sort direction:
	//   - Desc=false (default): returns rows with `id > CursorID`.
	//   - Desc=true: returns rows with `id < CursorID`.
	CursorID int64
	// Desc controls row ordering. false means ascending by ID; true means
	// descending by ID.
	Desc bool
	// Key filters rows by signal key.
	Key string
	// Limit controls page size. If unset or <= 0, a default of 100 is used. The
	// maximum is capped at 1000.
	Limit int
}

WorkflowSignalListParams configures workflow-wide signal reads.

These filters do not check task waits or task-declared keys.

type WorkflowSignalListResult

type WorkflowSignalListResult struct {
	// HasMore reports whether another page exists for the same filters and
	// ordering.
	HasMore bool
	// NextCursorID is the pagination value for the next signal list call.
	//
	// Pass it back as CursorID with the same filters and Desc setting on the next
	// call to [WorkflowSignals.List] when [WorkflowSignalListResult.HasMore] is true.
	// Nil means there is no next page.
	NextCursorID *int64
	Signals      []riverworkflow.Signal
}

WorkflowSignalListResult is a page of workflow signal rows.

type WorkflowSignals

type WorkflowSignals[TTx any] struct {/* contains filtered or unexported fields */}

WorkflowSignals provides workflow-scoped signal operations.

func (*WorkflowSignals[TTx]) Emit

Emit emits a workflow signal and enqueues the workflow for evaluation. payload may be a Go value to marshal, or raw JSON in []byte/json.RawMessage.

If payload is nil, empty []byte, or empty json.RawMessage, it is normalized to `{}`.

This method returns structured errors from package riverworkflow: riverworkflow.SignalAttemptMismatchError and riverworkflow.SignalPayloadMismatchError.

Example (ManualReview)

ExampleWorkflowSignals_Emit_manualReview demonstrates emitting a signal to satisfy a wait and run the waiting task.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type manualReviewTaskArgs struct {
	OrderID string
}

func (manualReviewTaskArgs) Kind() string {
	return "apply_manual_review"
}

type manualReviewSignalPayload struct {
	Approved bool `json:"approved"`
}

type manualReviewTaskWorker struct {
	river.WorkerDefaults[manualReviewTaskArgs]
}

func (*manualReviewTaskWorker) Work(_ context.Context, job *river.Job[manualReviewTaskArgs]) error {
	fmt.Printf("manual review applied order_id=%s\n", job.Args.OrderID)
	return nil
}

// ExampleWorkflowSignals_Emit_manualReview demonstrates emitting a signal to
// satisfy a wait and run the waiting task.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &manualReviewTaskWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{ID: "wf_signal_basic"})
	workflow.Add("apply_manual_review", manualReviewTaskArgs{OrderID: "ord_123"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			// The task remains blocked until a positive manual_review signal arrives.
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("manual_review_received", "manual_review", `payload.approved == true`),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	defer func() {
		if err := riverClient.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	if _, err := workflow.Signals().Emit(ctx, "manual_review", manualReviewSignalPayload{Approved: true}, nil); err != nil {
		panic(err)
	}
	// Wait for the once-blocked task to run after signal delivery.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
}

Output:

manual review applied order_id=ord_123

func (*WorkflowSignals[TTx]) EmitTx

func (s *WorkflowSignals[TTx]) EmitTx(ctx context.Context, tx TTx, key string, payload any, opts *WorkflowSignalEmitOpts) (*WorkflowSignalEmitResult, error)

EmitTx emits a workflow signal inside an existing transaction. payload may be a Go value to marshal, or raw JSON in []byte/json.RawMessage.

If payload is nil, empty []byte, or empty json.RawMessage, it is normalized to `{}`.

func (*WorkflowSignals[TTx]) LatestForTask

func (s *WorkflowSignals[TTx]) LatestForTask(ctx context.Context, taskName string, key string, opts *WorkflowSignalLatestForTaskOpts) (riverworkflow.Signal, error)

LatestForTask loads the newest signal for key that one task is allowed to read.

rivertype.ErrNotFound is returned when the task scope is valid but no matching signal exists yet.

One of riverworkflow.SignalUnknownTaskError, riverworkflow.WaitTaskDeclaresNoWaitError, riverworkflow.SignalTaskDeclaresNoSignalKeysError, or riverworkflow.SignalKeyUndeclaredError is returned when taskName or key does not describe a valid task-visible scope.

Use WorkflowSignals.ListForTask when you need pagination or more than one row.

Example (LatestEvidenceSignal)

ExampleWorkflowSignals_LatestForTask_latestEvidenceSignal shows how to fetch the most recent signal included in a specific task's wait evidence when the wait became resolved from within that task's worker.

For the single-signal baseline, see Example_workflowTaskSignalData.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type overrideDecisionArgs struct {
	OrderID string
}

const overrideDecisionTaskName = "resolve_risk_hold"

func (overrideDecisionArgs) Kind() string {
	return overrideDecisionTaskName
}

type overrideSignalPayload struct {
	ApprovedBy string `json:"approved_by"`
	Reason     string `json:"reason"`
}

type overrideDecisionWorker struct {
	river.WorkerDefaults[overrideDecisionArgs]
}

func (w *overrideDecisionWorker) Work(ctx context.Context, job *river.Job[overrideDecisionArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	// We intentionally read through the task API so the key is checked against
	// the task's wait declaration.
	latestSignal, err := workflow.Signals().LatestForTask(ctx, taskName, "risk_override", nil)
	if err != nil {
		return err
	}

	var override overrideSignalPayload
	if err := json.Unmarshal(latestSignal.Payload, &override); err != nil {
		return err
	}

	fmt.Printf("risk override by %s: %s\n", override.ApprovedBy, override.Reason)
	return nil
}

// ExampleWorkflowSignals_LatestForTask_latestEvidenceSignal shows how to fetch the
// most recent signal included in a specific task's wait evidence when the wait
// became resolved from within that task's worker.
//
// For the single-signal baseline, see Example_workflowTaskSignalData.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &overrideDecisionWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{ID: "wf_latest_override"})
	workflow.Add(overrideDecisionTaskName, overrideDecisionArgs{OrderID: "ord_123"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			// This task starts once the risk override from bob has arrived.
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("bob_override_received", "risk_override", `payload.approved_by == "bob"`).Label("Bob override received"),
			},
		},
	})

	// Prepare once, then insert the workflow DAG.
	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	// Subscribe before start so we can deterministically wait for task completion.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	defer func() {
		if err := riverClient.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	// Alice's signal is stored, but it does not match this task's wait term.
	if _, err := workflow.Signals().Emit(ctx, "risk_override", map[string]any{
		"approved_by": "alice",
		"reason":      "initial triage",
	}, nil); err != nil {
		panic(err)
	}

	// Bob's signal resolves the wait and becomes the latest included evidence.
	if _, err := workflow.Signals().Emit(ctx, "risk_override", map[string]any{
		"approved_by": "bob",
		"reason":      "fraud false-positive",
	}, nil); err != nil {
		panic(err)
	}
	// Worker prints the latest included risk_override signal.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

}

Output:

risk override by bob: fraud false-positive

func (*WorkflowSignals[TTx]) LatestForTaskTx

func (s *WorkflowSignals[TTx]) LatestForTaskTx(ctx context.Context, tx TTx, taskName string, key string, opts *WorkflowSignalLatestForTaskOpts) (riverworkflow.Signal, error)

LatestForTaskTx loads the newest signal for key that one task is allowed to read inside an existing transaction.

rivertype.ErrNotFound is returned when the task scope is valid but no matching signal exists yet.

One of riverworkflow.SignalUnknownTaskError, riverworkflow.WaitTaskDeclaresNoWaitError, riverworkflow.SignalTaskDeclaresNoSignalKeysError, or riverworkflow.SignalKeyUndeclaredError is returned when taskName or key does not describe a valid task-visible scope.

Use WorkflowSignals.ListForTaskTx when you need pagination or more than one row.

func (*WorkflowSignals[TTx]) List

List loads workflow signals using workflow-wide filters.

This method does not apply task wait visibility rules. It is useful for workflow-wide reads like audit or diagnostics where task-specific visibility is not needed.

To continue pagination, pass WorkflowSignalListResult.NextCursorID as WorkflowSignalListParams.CursorID when WorkflowSignalListResult.HasMore is true.

Example (WorkflowAuditPagination)

ExampleWorkflowSignals_List_workflowAuditPagination demonstrates a common audit export pattern: walk a key-scoped workflow signal timeline with cursor-based pagination.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
	panic(err)
}
defer dbPool.Close()

// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{}))
if err != nil {
	panic(err)
}

workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{ID: "wf_audit_timeline"})
if _, err := workflow.Signals().Emit(ctx, "state_transition", map[string]any{
	"actor": "system",
	"from":  "queued",
	"to":    "risk_scored",
}, nil); err != nil {
	panic(err)
}
if _, err := workflow.Signals().Emit(ctx, "state_transition", map[string]any{
	"actor": "rules-engine",
	"from":  "risk_scored",
	"to":    "manual_review",
}, nil); err != nil {
	panic(err)
}
if _, err := workflow.Signals().Emit(ctx, "state_transition", map[string]any{
	"actor": "reviewer:alice",
	"from":  "manual_review",
	"to":    "approved",
}, nil); err != nil {
	panic(err)
}

var cursorID int64
for {
	// Page through one signal key at a time using a cursor.
	page, err := workflow.Signals().List(ctx, &riverpro.WorkflowSignalListParams{
		CursorID: cursorID,
		Key:      "state_transition",
		Limit:    500,
	})
	if err != nil {
		panic(err)
	}

	for _, signal := range page.Signals {
		var transition struct {
			Actor string `json:"actor"`
			From  string `json:"from"`
			To    string `json:"to"`
		}
		if err := json.Unmarshal(signal.Payload, &transition); err != nil {
			panic(err)
		}

		fmt.Printf(
			"transition %s -> %s by %s\n",
			transition.From,
			transition.To,
			transition.Actor,
		)
	}

	// Stop when the server says there are no more rows.
	if !page.HasMore {
		break
	}
	cursorID = *page.NextCursorID
}

// Output:
// transition queued -> risk_scored by system
// transition risk_scored -> manual_review by rules-engine
// transition manual_review -> approved by reviewer:alice

func (*WorkflowSignals[TTx]) ListForTask

ListForTask loads signals visible to one task based on that task's wait declarations and persisted evidence boundary.

One of riverworkflow.SignalUnknownTaskError, riverworkflow.WaitTaskDeclaresNoWaitError, riverworkflow.SignalTaskDeclaresNoSignalKeysError, or riverworkflow.SignalKeyUndeclaredError is returned when taskName or key does not describe a valid task-visible scope.

To continue pagination, pass WorkflowSignalListResult.NextCursorID as WorkflowSignalListForTaskParams.CursorID when WorkflowSignalListResult.HasMore is true.

Example

ExampleWorkflowSignals_ListForTask shows a task loading the signals captured in its wait evidence, excluding later signals.

For full setup wiring including `initTestConfig`, see the package-level ClientSetup example.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type overrideAuditArgs struct {
	OrderID string
}

const overrideAuditTaskName = "resolve_risk_hold"

func (overrideAuditArgs) Kind() string {
	return overrideAuditTaskName
}

type overrideAuditWorker struct {
	river.WorkerDefaults[overrideAuditArgs]

	continueCh chan struct{}
	readyCh    chan struct{}
}

func (w *overrideAuditWorker) Work(ctx context.Context, job *river.Job[overrideAuditArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}

	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	// Tell the example goroutine that the worker has started and is ready to
	// test visibility boundaries.
	w.readyCh <- struct{}{}
	// Block until the example emits a post-resolution signal.
	<-w.continueCh

	// This call returns only signals included in this task's wait evidence.
	signals, err := workflow.Signals().ListForTask(ctx, taskName, &riverpro.WorkflowSignalListForTaskParams{
		Key: "risk_override",
	})
	if err != nil {
		return err
	}

	for _, signal := range signals.Signals {
		var override struct {
			ApprovedBy string `json:"approved_by"`
		}
		if err := json.Unmarshal(signal.Payload, &override); err != nil {
			return err
		}
		fmt.Printf("included override by %s\n", override.ApprovedBy)
	}

	return nil
}

// ExampleWorkflowSignals_ListForTask shows a task loading the signals
// captured in its wait evidence, excluding later signals.
//
// For full setup wiring including `initTestConfig`, see the package-level
// ClientSetup example.
func main() {
	ctx := context.Background()
	continueCh := make(chan struct{})
	readyCh := make(chan struct{})

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &overrideAuditWorker{
		continueCh: continueCh,
		readyCh:    readyCh,
	})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(&riverpro.WorkflowOpts{ID: "wf_risk_override_history"})
	workflow.Add(overrideAuditTaskName, overrideAuditArgs{OrderID: "ord_456"}, nil, &riverpro.WorkflowTaskOpts{
		Wait: &riverworkflow.WaitSpec{
			// This task runs once Carol's override is recorded, while the wait
			// evidence still includes earlier matching override signals.
			Terms: []riverworkflow.WaitTermSpec{
				riverworkflow.WaitTermSignal("carol_override_received", "risk_override", `payload.approved_by == "carol"`),
			},
		},
	})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}
	if _, err := riverClient.InsertMany(ctx, prepareRes.Jobs); err != nil {
		panic(err)
	}

	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	defer func() {
		if err := riverClient.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	// Alice and Bob are included as evidence; Carol's signal resolves the wait.
	for _, reviewer := range []string{"alice", "bob", "carol"} {
		if _, err := workflow.Signals().Emit(ctx, "risk_override", map[string]any{
			"approved_by": reviewer,
			"reason":      "manual review override",
		}, nil); err != nil {
			panic(err)
		}
	}
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), readyCh, 1)

	// This signal arrives after the wait is met and is excluded by default.
	if _, err := workflow.Signals().Emit(ctx, "risk_override", map[string]any{
		"approved_by": "dora",
		"reason":      "late audit note",
	}, nil); err != nil {
		panic(err)
	}
	// Let the worker continue after the late signal is written.
	close(continueCh)
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

}

Output:

included override by alice
included override by bob
included override by carol

func (*WorkflowSignals[TTx]) ListForTaskTx

func (s *WorkflowSignals[TTx]) ListForTaskTx(ctx context.Context, tx TTx, taskName string, opts *WorkflowSignalListForTaskParams) (*WorkflowSignalListResult, error)

ListForTaskTx loads signals visible to one task based on that task's wait declarations and persisted evidence boundary inside an existing transaction.

One of riverworkflow.SignalUnknownTaskError, riverworkflow.WaitTaskDeclaresNoWaitError, riverworkflow.SignalTaskDeclaresNoSignalKeysError, or riverworkflow.SignalKeyUndeclaredError is returned when taskName or key does not describe a valid task-visible scope.

To continue pagination, pass WorkflowSignalListResult.NextCursorID as WorkflowSignalListForTaskParams.CursorID when WorkflowSignalListResult.HasMore is true.

func (*WorkflowSignals[TTx]) ListTx

ListTx loads workflow signals using workflow-wide filters inside an existing transaction.

This method does not apply task wait visibility rules.

To continue pagination, pass WorkflowSignalListResult.NextCursorID as WorkflowSignalListParams.CursorID when WorkflowSignalListResult.HasMore is true.

type WorkflowT

type WorkflowT[TTx any] struct {/* contains filtered or unexported fields */}

WorkflowT is a collection of jobs which can have dependencies on other jobs in the workflow. Workflows form a directed acyclic graph (DAG) of jobs, with each one unblocking once its dependencies have been satisfied.

func (*WorkflowT[TTx]) Add

func (w *WorkflowT[TTx]) Add(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *WorkflowTaskOpts) WorkflowTask

Add adds a task to the workflow. The task name must be unique within the workflow.

Panics if attempting to use ephemeral jobs as workflow tasks.

func (*WorkflowT[TTx]) AddSafely

func (w *WorkflowT[TTx]) AddSafely(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *WorkflowTaskOpts) (WorkflowTask, error)

AddSafely adds a task to the workflow. The task name must be unique within the workflow.

Returns an error if attempting to use ephemeral jobs as workflow tasks.

func (*WorkflowT[TTx]) ID

func (w *WorkflowT[TTx]) ID() string

ID returns the ID of the workflow.

func (*WorkflowT[TTx]) LoadAll

func (w *WorkflowT[TTx]) LoadAll(ctx context.Context, opts *WorkflowLoadAllOpts) (*WorkflowTasks, error)

LoadAll loads all tasks in the workflow. If PaginationLimit is not set, all tasks are loaded.

Panics if PaginationLimit or PaginationOffset is less than 0.

func (*WorkflowT[TTx]) LoadAllTx

func (w *WorkflowT[TTx]) LoadAllTx(ctx context.Context, tx TTx, opts *WorkflowLoadAllOpts) (*WorkflowTasks, error)

LoadAllTx loads all tasks in the workflow. If PaginationLimit is not set, all tasks are loaded.

Panics if PaginationLimit is less than 0.

func (*WorkflowT[TTx]) LoadDeps

func (w *WorkflowT[TTx]) LoadDeps(ctx context.Context, taskName string, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)

LoadDeps loads the dependencies of a named task in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.

Example

ExampleWorkflowT_LoadDeps demonstrates loading workflow task dependencies.

For full setup wiring including, see `Example_clientSetup`.

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/riverworkflow"
)

type collectDocumentsArgs struct{}

func (collectDocumentsArgs) Kind() string { return "collect_documents" }

type collectDocumentsWorker struct {
	river.WorkerDefaults[collectDocumentsArgs]
}

func (w *collectDocumentsWorker) Work(ctx context.Context, job *river.Job[collectDocumentsArgs]) error {
	fmt.Println("documents collected")
	return nil
}

type approveLoanArgs struct{}

func (approveLoanArgs) Kind() string { return "approve_loan" }

type approveLoanWorker struct {
	river.WorkerDefaults[approveLoanArgs]
}

func (w *approveLoanWorker) Work(ctx context.Context, job *river.Job[approveLoanArgs]) error {
	taskName := riverworkflow.TaskFromJobRow(job.JobRow)
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}
	// Resolve dependencies from the current task's configured DAG edges.
	tasks, err := workflow.LoadDeps(ctx, taskName, &riverpro.WorkflowLoadDepsOpts{})
	if err != nil {
		return err
	}
	fmt.Printf("loaded %d dependency tasks for loan approval\n", tasks.Count())
	return nil
}

// ExampleWorkflowT_LoadDeps demonstrates loading workflow task dependencies.
//
// For full setup wiring including, see `Example_clientSetup`.
func main() { //nolint:dupl
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &collectDocumentsWorker{})
	river.AddWorker(workers, &approveLoanWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(nil)
	docTask := workflow.Add("collect_documents", collectDocumentsArgs{}, nil, nil)
	workflow.Add("approve_loan", approveLoanArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{docTask.Name}})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}

	_, err = riverClient.InsertMany(ctx, prepareRes.Jobs)
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until jobs are worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Wait for two jobs to complete.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 2)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

documents collected
loaded 1 dependency tasks for loan approval

func (*WorkflowT[TTx]) LoadDepsByJob

func (w *WorkflowT[TTx]) LoadDepsByJob(ctx context.Context, job *rivertype.JobRow, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)

LoadDepsByJob loads the dependencies of a job in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.

func (*WorkflowT[TTx]) LoadDepsByJobTx

func (w *WorkflowT[TTx]) LoadDepsByJobTx(ctx context.Context, tx TTx, job *rivertype.JobRow, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)

LoadDepsByJobTx loads the dependencies of a job in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.

func (*WorkflowT[TTx]) LoadDepsTx

func (w *WorkflowT[TTx]) LoadDepsTx(ctx context.Context, tx TTx, taskName string, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)

LoadDepsTx loads the dependencies of a named task in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.

func (*WorkflowT[TTx]) LoadOutput

func (w *WorkflowT[TTx]) LoadOutput(ctx context.Context, taskName string, v any) error

LoadOutput loads the output of the named task in the workflow and unmarshals it into v.

If loading more than one task's output (including dependencies) it is more efficient to avoid multiple database calls by first loading the tasks with WorkflowT.LoadDeps or WorkflowT.LoadAll and then use the WorkflowTasks.Output method.

If no task with that name is found, rivertype.ErrNotFound is returned.

Example

ExampleWorkflowT_LoadOutput demonstrates retrieving output from workflow task dependencies.

For full setup wiring including, see `Example_clientSetup`.

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/testutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

type shippingQuoteOutput struct {
	Cents int `json:"cents"`
}

type buyLabelArgs struct{}

func (buyLabelArgs) Kind() string { return "buy_label" }

type buyLabelWorker struct {
	river.WorkerDefaults[buyLabelArgs]
}

func (w *buyLabelWorker) Work(ctx context.Context, job *river.Job[buyLabelArgs]) error {
	client := riverpro.ClientFromContext[pgx.Tx](ctx)
	workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
	if err != nil {
		return err
	}
	// Load recorded output from the upstream task by task name.
	var quote shippingQuoteOutput
	if err := workflow.LoadOutput(ctx, "quote_shipping", &quote); err != nil {
		return err
	}
	fmt.Printf("shipping quote cents: %d\n", quote.Cents)
	return nil
}

type quoteShippingArgs struct{}

func (quoteShippingArgs) Kind() string { return "quote_shipping" }

type quoteShippingWorker struct {
	river.WorkerDefaults[quoteShippingArgs]
}

func (w *quoteShippingWorker) Work(ctx context.Context, job *river.Job[quoteShippingArgs]) error {
	// Record output that downstream tasks can consume through `LoadOutput`.
	output := shippingQuoteOutput{Cents: 4200}
	return river.RecordOutput(ctx, output)
}

// ExampleWorkflowT_LoadOutput demonstrates retrieving output from workflow task
// dependencies.
//
// For full setup wiring including, see `Example_clientSetup`.
func main() { //nolint:dupl
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &quoteShippingWorker{})
	river.AddWorker(workers, &buyLabelWorker{})

	// Shared config setup keeps this example focused; see `initTestConfig` for defaults.
	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), initTestConfig(ctx, dbPool, &riverpro.Config{
		Config: river.Config{
			Workers: workers,
		},
	}))
	if err != nil {
		panic(err)
	}

	workflow := riverClient.NewWorkflow(nil)
	quoteTask := workflow.Add("quote_shipping", quoteShippingArgs{}, nil, nil)
	workflow.Add("buy_label", buyLabelArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{quoteTask.Name}})

	prepareRes, err := workflow.Prepare(ctx)
	if err != nil {
		panic(err)
	}

	_, err = riverClient.InsertMany(ctx, prepareRes.Jobs)
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until jobs are worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Wait for two jobs to complete.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 2)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

Output:

shipping quote cents: 4200

func (*WorkflowT[TTx]) LoadOutputByJob

func (w *WorkflowT[TTx]) LoadOutputByJob(ctx context.Context, job *rivertype.JobRow, v any) error

LoadOutputByJob loads the output of the task corresponding to the given job in the workflow and unmarshals it into v.

func (*WorkflowT[TTx]) LoadOutputByJobTx

func (w *WorkflowT[TTx]) LoadOutputByJobTx(ctx context.Context, tx TTx, job *rivertype.JobRow, v any) error

LoadOutputByJobTx loads the output of the task corresponding to the given job in the workflow and unmarshals it into v.

func (*WorkflowT[TTx]) LoadOutputTx

func (w *WorkflowT[TTx]) LoadOutputTx(ctx context.Context, tx TTx, taskName string, v any) error

LoadOutputTx loads the output of the named task in the workflow and unmarshals it into v.

If loading more than one task's output (including dependencies) it is more efficient to avoid multiple database calls by first loading the tasks with WorkflowT.LoadDeps or WorkflowT.LoadAll and then use the WorkflowTasks.Output method.

If no task with that name is found, rivertype.ErrNotFound is returned.

func (*WorkflowT[TTx]) LoadTask

func (w *WorkflowT[TTx]) LoadTask(ctx context.Context, taskName string) (*WorkflowTaskWithJob, error)

LoadTask loads one named task in the workflow.

If no task with that name is found, rivertype.ErrNotFound is returned.

func (*WorkflowT[TTx]) LoadTaskTx

func (w *WorkflowT[TTx]) LoadTaskTx(ctx context.Context, tx TTx, taskName string) (*WorkflowTaskWithJob, error)

LoadTaskTx loads one named task in the workflow inside an existing transaction.

If no task with that name is found, rivertype.ErrNotFound is returned.

func (*WorkflowT[TTx]) Name

func (w *WorkflowT[TTx]) Name() string

Name returns the name of the workflow.

func (*WorkflowT[TTx]) Prepare

func (w *WorkflowT[TTx]) Prepare(ctx context.Context) (*WorkflowPrepareResult, error)

Prepare validates the jobs in the workflow and prepares them for insertion. Validations ensure that:

  • All specified dependencies correspond to other task names in the workflow
  • Task names are unique
  • There are no cycles

If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.

For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, existing task names are loaded from the database prior to validation.

func (*WorkflowT[TTx]) PrepareTx

func (w *WorkflowT[TTx]) PrepareTx(ctx context.Context, tx TTx) (*WorkflowPrepareResult, error)

PrepareTx validates the jobs in the workflow and prepares them for insertion. Validations ensure that:

  • All specified dependencies correspond to other task names in the workflow
  • Task names are unique
  • There are no cycles

If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.

For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, existing task names are loaded from the database prior to validation.

func (*WorkflowT[TTx]) Retry

func (w *WorkflowT[TTx]) Retry(ctx context.Context, opts *WorkflowRetryOpts) (*WorkflowRetryResult, error)

Retry retries the jobs in the workflow. By default, all jobs will be retried. This can be overridden with the Mode option.

Retry clears live per-attempt workflow metadata from retried jobs so they present as a fresh active attempt. Prior signals remain queryable by attempt, while timer rows are repaired by evaluator after the retry worklist enqueue.

Returns a WorkflowRetryStillActiveError if the workflow is still active.

func (*WorkflowT[TTx]) RetryTx

func (w *WorkflowT[TTx]) RetryTx(ctx context.Context, tx TTx, opts *WorkflowRetryOpts) (*WorkflowRetryResult, error)

RetryTx retries the jobs in the workflow. By default, all jobs will be retried. This can be overridden with the Mode option.

ResetHistory indicates whether to reset the history of the jobs when retrying them, including resetting their attempt count back to 0 (starting over their retry policy in the process) and clearing out known metadata from past attempts. If not set, the max attempts will be incremented by 1 to allow for a single additional attempt.

Retry does not directly modify workflow timer rows. The retry worklist enqueue hands timer repair to evaluator so `river_workflow_timer` remains evaluator-owned derived state.

Returns a WorkflowRetryStillActiveError if the workflow is still active.

func (*WorkflowT[TTx]) Signals

func (w *WorkflowT[TTx]) Signals() *WorkflowSignals[TTx]

Signals returns workflow-scoped signal operations.

func (*WorkflowT[TTx]) WaitDiagnostics

func (w *WorkflowT[TTx]) WaitDiagnostics(ctx context.Context, taskName string, opts *WorkflowWaitDiagnosticsOpts) (*riverworkflow.WaitDiagnostics, error)

WaitDiagnostics inspects the current state of a task wait without writing audit evidence.

If no task with that name is found, rivertype.ErrNotFound is returned. If the task exists but declares no wait, riverworkflow.WaitTaskDeclaresNoWaitError is returned. If the wait metadata cannot be decoded, WorkflowTaskWaitDecodeError is returned.

opts may be nil; see WorkflowWaitDiagnosticsOpts for the available knobs (most notably the signal scan limit). When the scan hits its limit, riverworkflow.WaitDiagnostics.Truncated is set and the diagnostic's expression result, eval error, and per-term match counts are best-effort.

func (*WorkflowT[TTx]) WaitDiagnosticsTx

func (w *WorkflowT[TTx]) WaitDiagnosticsTx(ctx context.Context, tx TTx, taskName string, opts *WorkflowWaitDiagnosticsOpts) (*riverworkflow.WaitDiagnostics, error)

WaitDiagnosticsTx inspects the current state of a task wait inside an existing transaction without writing audit evidence.

If no task with that name is found, rivertype.ErrNotFound is returned. If the task exists but declares no wait, riverworkflow.WaitTaskDeclaresNoWaitError is returned. If the wait metadata cannot be decoded, WorkflowTaskWaitDecodeError is returned.

opts may be nil; see WorkflowWaitDiagnosticsOpts for the available knobs (most notably the signal scan limit). When the scan hits its limit, riverworkflow.WaitDiagnostics.Truncated is set and the diagnostic's expression result, eval error, and per-term match counts are best-effort.

type WorkflowTask

type WorkflowTask struct {
	// Name is the name of the workflow task.
	Name string
}

WorkflowTask is a reference to a task in a workflow.

type WorkflowTaskOpts

type WorkflowTaskOpts struct {
	// Deps are the names of other tasks in the workflow that this task depends
	// on. If any of the dependencies are cancelled, deleted, or discarded after
	// failing repeatedly, the task will be cancelled.
	Deps []string
	// IgnoreCancelledDeps specifies whether to ignore cancelled dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreCancelledDeps option specified in
	// the workflow.
	IgnoreCancelledDeps *bool
	// IgnoreDeletedDeps specifies whether to ignore deleted dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreDeletedDeps option specified in
	// the workflow.
	IgnoreDeletedDeps *bool
	// IgnoreDiscardedDeps specifies whether to ignore discarded dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreDiscardedDeps option specified in
	// the workflow.
	IgnoreDiscardedDeps *bool
	// Wait defines a wait that must become resolved before the task can be staged.
	Wait *riverworkflow.WaitSpec
}

WorkflowTaskOpts are options for adding a task to a workflow.

type WorkflowTaskPendingReason

type WorkflowTaskPendingReason workflowinternal.TaskPendingReason

WorkflowTaskPendingReason describes why a pending workflow task is not currently runnable.

type WorkflowTaskWaitDecodeError

type WorkflowTaskWaitDecodeError struct {
	Err error

	JobID      int64
	TaskName   string
	WorkflowID string
}

WorkflowTaskWaitDecodeError is returned when task wait metadata cannot be decoded while loading workflow tasks.

It is produced by workflow task load paths (for example WorkflowT.LoadAll and WorkflowT.LoadDeps) and wraps an underlying riverworkflow.WaitMetadataDecodeError.

func (*WorkflowTaskWaitDecodeError) Error

func (*WorkflowTaskWaitDecodeError) Is

func (e *WorkflowTaskWaitDecodeError) Is(target error) bool

func (*WorkflowTaskWaitDecodeError) Unwrap

func (e *WorkflowTaskWaitDecodeError) Unwrap() error

type WorkflowTaskWithJob

type WorkflowTaskWithJob struct {
	// Deps are the names of other tasks in the workflow that this task depends
	// on.
	Deps []string
	// IgnoreCancelledDeps indicates that cancelled dependencies should be
	// treated as satisfied.
	IgnoreCancelledDeps bool
	// IgnoreDeletedDeps indicates that deleted dependencies should be treated as
	// satisfied.
	IgnoreDeletedDeps bool
	// IgnoreDiscardedDeps indicates that discarded dependencies should be
	// treated as satisfied.
	IgnoreDiscardedDeps bool
	// Job is the job row for the task.
	Job *rivertype.JobRow
	// Name is the name of the workflow task.
	Name string
	// PendingReason describes why a pending task is not currently runnable.
	PendingReason WorkflowTaskPendingReason
	// Wait contains wait details for this task.
	//
	// Nil means this task has no wait.
	Wait *riverworkflow.Wait
	// WorkflowID is the workflow that owns this task.
	WorkflowID string
}

WorkflowTaskWithJob is a loaded workflow task with its job row and task-specific workflow metadata.

In addition to task identity/dependencies, it includes wait details through Wait when the task has a wait.

func (*WorkflowTaskWithJob) Output

func (w *WorkflowTaskWithJob) Output(v any) error

Output unmarshals the JSON-encoded output of the task and stores the result in the value pointed to by v using json.Unmarshal.

As with json.Unmarshal, if v is nil or not a pointer, an InvalidUnmarshalError is returned.

If the task has no output, a TaskHasNoOutputError is returned.

type WorkflowTasks

type WorkflowTasks struct {/* contains filtered or unexported fields */}

WorkflowTasks is a loaded collection of tasks in a workflow.

func (*WorkflowTasks) Count

func (w *WorkflowTasks) Count() int

Count returns the number of tasks in the workflow.

func (*WorkflowTasks) Get

Get returns the task with the given name.

func (*WorkflowTasks) Names

func (w *WorkflowTasks) Names() []string

Names returns the names of all loaded tasks in this collection.

func (*WorkflowTasks) Output

func (w *WorkflowTasks) Output(name string, v any) error

Output unmarshals the JSON-encoded output of the named task and stores it in the value pointed to by v.

type WorkflowWaitDiagnosticsOpts

type WorkflowWaitDiagnosticsOpts struct {
	// SignalScanLimit caps the number of signal rows loaded across all
	// declared signal keys for the task wait's current attempt. When unset
	// or <= 0, a default of 10,000 is used. The maximum allowed value is
	// 100,000; values above the maximum are rejected with an error.
	// Diagnostics that need to inspect more rows than that should reach
	// for paginated reads via [WorkflowSignals.ListForTask] instead. When
	// the configured limit is hit,
	// [riverworkflow.WaitDiagnostics.Truncated] is set and the diagnostic's
	// expression result, eval error, per-term match counts, and per-signal
	// input counts are best-effort.
	SignalScanLimit int
}

WorkflowWaitDiagnosticsOpts controls how WorkflowT.WaitDiagnostics performs its live evaluation pass.

Directories

cmd/riverpro module
driver module
riverbatch Package riverbatch provides functionality for batching multiple River jobs together for execution as a single group.
riverencrypt
riverproshared/riverprosharedtest
riverworkflow Package riverworkflow provides workflow helpers and wait types for River Pro's workflow engine.