Package riverpro is an extension for River, the robust high-performance job processing system for Go and Postgres.
See homepage, docs, and the River UI.
River Pro extends River through the use of a "pro" client and driver that wraps the standard River variants. This allows for additional functionality to be injected into the River client.
```go
import (
	"github.com/riverqueue/river"
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
	// Standard River configuration is embedded in the Pro config:
	Config: river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	},
})
```
Once the Pro driver is in place, River Pro functionality can be used.
River Pro sequences guarantee that a specific series of jobs will be executed in a one-at-a-time sequential order relative to other jobs in the same sequence. Sequences are partitioned based upon a "sequence key" that is computed from various job attributes such as its kind and args (or a subset of args).
Jobs in different sequences may run in parallel. Unlike unique jobs, sequences allow an unbounded number of jobs to be queued up in the sequence, even though only one job in a given sequence will be worked at a time.
See the sequence docs for more information on how to use sequences.
River Pro workflows allow you to define a graph of interdependent jobs to express complex, multi-step workflows, including fan-out and fan-in execution. Workflows are a powerful tool for orchestrating tasks that depend on each other, and can be used to model a wide range of business processes.
Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. Jobs will not be available for work until all of their dependencies have completed successfully. In addition, jobs that are scheduled to run in the future at time T will not execute until all their dependencies have completed _and_ the current time is greater than T.
```go
import (
	"context"
	"fmt"
	"log"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

ctx := context.Background()

client, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
	// ...
})
if err != nil {
	log.Fatal(err)
}

workflow := riverpro.NewWorkflow(&riverpro.WorkflowOpts{Name: "Send invoice to Alice"})
taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil)
workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverpro.WorkflowTaskOpts{
	Deps: []string{taskA.Name},
})

result, err := client.WorkflowPrepare(ctx, workflow)
if err != nil {
	log.Fatal(err)
}

if _, err := client.InsertMany(ctx, result.Jobs); err != nil {
	log.Fatal(err)
}
fmt.Printf("inserted %d jobs\n", len(result.Jobs))
```
By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with WorkflowOpts, or on a per-task basis with riverpro.WorkflowTaskOpts.
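The override precedence can be sketched as follows. The field names mirror WorkflowOpts and riverpro.WorkflowTaskOpts, but the `workflowDefaults` and `taskOverrides` types, the `shouldCancel` function, and the state strings are hypothetical. The key point is that task-level options are `*bool`, so an unset value (`nil`) falls back to the workflow setting while an explicit `false` still overrides it.

```go
package main

import "fmt"

// workflowDefaults mirrors the workflow-level options (plain bools).
type workflowDefaults struct {
	IgnoreCancelledDeps, IgnoreDeletedDeps, IgnoreDiscardedDeps bool
}

// taskOverrides mirrors the task-level options; nil means "not specified".
type taskOverrides struct {
	IgnoreCancelledDeps, IgnoreDeletedDeps, IgnoreDiscardedDeps *bool
}

// resolve returns the task-level override if set, else the workflow default.
func resolve(override *bool, def bool) bool {
	if override != nil {
		return *override
	}
	return def
}

// shouldCancel reports whether a task should be cancelled given the final
// state of one of its dependencies.
func shouldCancel(depState string, wf workflowDefaults, task taskOverrides) bool {
	switch depState {
	case "cancelled":
		return !resolve(task.IgnoreCancelledDeps, wf.IgnoreCancelledDeps)
	case "deleted":
		return !resolve(task.IgnoreDeletedDeps, wf.IgnoreDeletedDeps)
	case "discarded":
		return !resolve(task.IgnoreDiscardedDeps, wf.IgnoreDiscardedDeps)
	default:
		return false // e.g. "completed": the dependency succeeded
	}
}

func main() {
	ignore := true
	wf := workflowDefaults{} // defaults: cancel on any failed dependency
	fmt.Println(shouldCancel("discarded", wf, taskOverrides{}))                             // true
	fmt.Println(shouldCancel("discarded", wf, taskOverrides{IgnoreDiscardedDeps: &ignore})) // false
}
```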
See the workflow docs for more information on how to use workflows.
Example_globalConcurrencyLimiting demonstrates how to use global concurrency limiting to limit the number of jobs of a given kind that can run at once.
```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/util/slogutil"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/internal/internaltest"
)

type ConcurrentLimitedArgs struct{}

func (ConcurrentLimitedArgs) Kind() string { return "concurrent_limited" }

type ConcurrentLimitedWorker struct {
	river.WorkerDefaults[ConcurrentLimitedArgs]
	jobsRunning *atomic.Int64
}

func (w *ConcurrentLimitedWorker) Work(ctx context.Context, job *river.Job[ConcurrentLimitedArgs]) error {
	w.jobsRunning.Add(1)
	defer w.jobsRunning.Add(-1)

	if count := w.jobsRunning.Load(); count > 2 {
		fmt.Printf("too many jobs running: %d\n", count)
		return fmt.Errorf("too many jobs running: %d", count)
	}
	return nil
}

// Example_globalConcurrencyLimiting demonstrates how to use global concurrency
// limiting to limit the number of jobs of a given kind that can run at once.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	dbPool, err := pgxpool.NewWithConfig(ctx, internaltest.DatabaseConfig("river_test_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := internaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	worker := &ConcurrentLimitedWorker{jobsRunning: &atomic.Int64{}}
	river.AddWorker(workers, worker)

	startNewClientAndSubscribe := func() (*riverpro.Client[pgx.Tx], <-chan *river.Event, func()) {
		riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
			Config: river.Config{
				Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
				// Poll quickly so the example is fast:
				FetchCooldown:     50 * time.Millisecond,
				FetchPollInterval: 50 * time.Millisecond,
				TestOnly:          true, // suitable only for use in tests; remove for live environments
				Workers:           workers,
			},
			ProQueues: map[string]riverpro.QueueConfig{
				river.QueueDefault: {
					Concurrency: riverpro.ConcurrencyConfig{
						// Allow up to 2 of each kind of job to run at once globally.
						GlobalLimit: 2,
						// Allow no more than 1 of each kind of job to run at once per client.
						LocalLimit: 1,
						Partition: riverpro.PartitionConfig{
							ByKind: true,
						},
					},
					MaxWorkers: 10,
				},
			},
		})
		if err != nil {
			panic(err)
		}

		if err := riverClient.Start(ctx); err != nil {
			panic(err)
		}

		subscribeChan, unsubscribe := riverClient.Subscribe(river.EventKindJobCompleted)
		return riverClient, subscribeChan, unsubscribe
	}

	// Use two clients to simulate two different workers. In real usage, these
	// would probably be on different machines.
	client1, subscribeChan1, cancel1 := startNewClientAndSubscribe()
	defer cancel1()

	client2, subscribeChan2, cancel2 := startNewClientAndSubscribe()
	defer cancel2()

	insertParams := make([]river.InsertManyParams, 10)
	for i := range insertParams {
		insertParams[i] = river.InsertManyParams{Args: ConcurrentLimitedArgs{}}
	}

	if _, err = client1.InsertMany(ctx, insertParams); err != nil {
		panic(err)
	}

	shutdown := func() {
		cancel() // initiate hard shutdown on both clients to stop fetching jobs
		<-client1.Stopped()
		<-client2.Stopped()
	}

	jobsCompleted := 0
	for range 10 {
		// Do an extra check to ensure no more than 2 jobs are running:
		numRunning := worker.jobsRunning.Load()
		if numRunning > 2 {
			fmt.Printf("Unexpected number of jobs running: %d\n", numRunning)
			shutdown()
			return
		}

		select {
		case <-time.After(2 * time.Second):
			fmt.Println("timed out waiting for jobs to complete")
			shutdown()
			return
		case <-subscribeChan1:
		case <-subscribeChan2:
		}
		jobsCompleted++
	}

	fmt.Printf("Jobs completed: %d\n", jobsCompleted)
	shutdown()
}
```
Output:
Jobs completed: 10
Client is a client for River Pro. It is a superset of the main river.Client type, with additional methods and behavioral changes for working with Pro features.
ClientFromContext returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.
It panics if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.
When testing Worker.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.
This error-returning variant of ClientFromContext also returns the Client from the context. Like ClientFromContext, it can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.
Instead of panicking, it returns an error if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.
When testing Worker.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.
NewClient creates a new River Pro client using the provided driver and configuration.
func (c *Client[TTx]) Queues() *QueueBundle
Queues returns the currently configured set of queues for the client, and can be used to add new ones.
func (c *Client[TTx]) WorkflowCancel(ctx context.Context, workflowID string) (*WorkflowCancelResult, error)
WorkflowCancel cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.
See WorkflowCancelTx for a version that allows cancellation within a broader database transaction.
The returned WorkflowCancelResult describes the job rows where cancellation was attempted; a non-nil error is returned if the operation fails.
func (c *Client[TTx]) WorkflowCancelTx(ctx context.Context, tx TTx, workflowID string) (*WorkflowCancelResult, error)
WorkflowCancelTx cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.
This variant operates within an existing transaction. See WorkflowCancel for the non-transaction variant.
The returned WorkflowCancelResult describes the job rows where cancellation was attempted; a non-nil error is returned if the operation fails.
func (c *Client[TTx]) WorkflowPrepare(ctx context.Context, workflow *Workflow) (*WorkflowPrepareResult, error)
WorkflowPrepare validates the jobs in the workflow and prepares them for insertion. Validations ensure that:
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
func (c *Client[TTx]) WorkflowPrepareTx(ctx context.Context, tx TTx, workflow *Workflow) (*WorkflowPrepareResult, error)
WorkflowPrepareTx validates the jobs in the workflow and prepares them for insertion. Validations ensure that:
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
```go
type ConcurrencyConfig struct {
	// GlobalLimit is the maximum number of jobs that can run concurrently
	// across all workers.
	GlobalLimit int

	// LocalLimit is the maximum number of jobs that can run concurrently
	// on a single worker.
	LocalLimit int

	// Partition is the configuration for partitioning concurrency limits. By
	// default, concurrency limits are not partitioned and apply to the entire
	// queue.
	Partition PartitionConfig
}
```
ConcurrencyConfig is a configuration for concurrency limits. Either a global or local limit (or both) must be specified in order for concurrency limits to be enforced.
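How the two limits combine can be sketched as an admission check: a job may start only if neither limit would be exceeded. This is a hedged sketch, not River Pro's algorithm (which counts running jobs through the database); the `limits` type and `canStart` function are hypothetical, and treating a zero limit as "not enforced" is an assumption based on the sentence above.

```go
package main

import "fmt"

// limits mirrors the GlobalLimit/LocalLimit fields; zero means "not enforced"
// in this sketch.
type limits struct {
	GlobalLimit, LocalLimit int
}

// canStart reports whether one more job may start in a partition, given how
// many jobs of that partition are running across all clients (global) and
// on this client (local).
func canStart(l limits, runningGlobal, runningLocal int) bool {
	if l.GlobalLimit > 0 && runningGlobal >= l.GlobalLimit {
		return false
	}
	if l.LocalLimit > 0 && runningLocal >= l.LocalLimit {
		return false
	}
	return true
}

func main() {
	l := limits{GlobalLimit: 2, LocalLimit: 1} // values from Example_globalConcurrencyLimiting
	fmt.Println(canStart(l, 1, 0)) // true: under both limits
	fmt.Println(canStart(l, 1, 1)) // false: this client already runs one
	fmt.Println(canStart(l, 2, 0)) // false: global limit reached
}
```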
```go
type Config struct {
	river.Config

	// ProQueues holds the configuration for queues that use Pro-specific
	// features, including concurrency limits. A given queue can be configured
	// as either a Pro queue or a standard queue, but not both.
	ProQueues map[string]QueueConfig

	// SequenceSchedulerInterval is the amount of time to wait between runs of
	// the sequence scheduler.
	SequenceSchedulerInterval time.Duration

	// WorkflowRescuerInterval is the amount of time to wait between runs of
	// the workflow rescuer.
	WorkflowRescuerInterval time.Duration
}
```
Config holds configuration for the riverpro.Client. The config is primarily managed with an embedded river.Config, along with additional fields for Pro-specific configuration.
type DependencyCycleError struct { DepStack []string }
DependencyCycleError is returned when a dependency cycle is detected in the workflow. A cycle occurs when a task depends on itself, either directly or indirectly through other tasks.
func (e *DependencyCycleError) Error() string
func (e *DependencyCycleError) Is(target error) bool
type DuplicateTaskError struct { TaskName string }
DuplicateTaskError is returned when a task name is added to the same workflow more than once. Task names must be unique within a workflow.
func (e *DuplicateTaskError) Error() string
func (e *DuplicateTaskError) Is(target error) bool
MissingDependencyError is returned when a named dependency is missing from the workflow. A task cannot depend on another task that does not exist in the workflow.
func (e *MissingDependencyError) Error() string
func (e *MissingDependencyError) Is(target error) bool
```go
type PartitionConfig struct {
	// ByArgs is the list of fields in the job arguments to partition by.
	// For example, you may wish to limit the number of jobs that can run
	// concurrently for a given `customer_id`.
	//
	// If ByArgs is explicitly set to an empty slice, all fields in the job's
	// arguments will be used. This is generally not desirable as it can lead
	// to high cardinality and a large number of partitions.
	//
	// When set to nil (the default), arguments are not used for partitioning.
	ByArgs []string

	// ByKind indicates that the concurrency limit should be partitioned by the
	// kind of the job.
	ByKind bool
}
```
PartitionConfig is a configuration for partitioning concurrency limits.
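The three ByArgs cases (nil, empty, listed fields) and ByKind can be illustrated by building a partition key. This is a sketch under assumptions: `partitionKey` is a hypothetical function, only top-level JSON fields are considered, and the key format is invented for illustration; River Pro's real key format is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partitionKey builds a hypothetical partition key from a job's kind and
// selected top-level fields of its encoded args: byArgs == nil ignores args,
// an empty non-nil slice uses every field, otherwise only listed fields.
func partitionKey(kind string, encodedArgs []byte, byArgs []string, byKind bool) (string, error) {
	key := map[string]any{}
	if byKind {
		key["kind"] = kind
	}
	if byArgs != nil {
		var args map[string]any
		if err := json.Unmarshal(encodedArgs, &args); err != nil {
			return "", err
		}
		fields := byArgs
		if len(fields) == 0 {
			for name := range args {
				fields = append(fields, name)
			}
		}
		selected := map[string]any{}
		for _, name := range fields {
			selected[name] = args[name] // a missing field encodes as null
		}
		key["args"] = selected
	}
	// encoding/json writes map keys in sorted order, so the key is stable.
	out, err := json.Marshal(key)
	return string(out), err
}

func main() {
	args := []byte(`{"customer_id":"cus_123","trace_id":"abc"}`)
	k, _ := partitionKey("send_invoice", args, []string{"customer_id"}, true)
	fmt.Println(k) // {"args":{"customer_id":"cus_123"},"kind":"send_invoice"}
}
```

The empty-slice case shows why the docs warn about cardinality: every distinct value of every field, including something like a trace ID, produces its own partition.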
```go
type QueueBundle struct {
	*river.QueueBundle
	// contains filtered or unexported fields
}
```
QueueBundle is a bundle for adding additional queues. It's made accessible through Client.Queues.
Unlike the river.QueueBundle, this bundle also enables Pro-specific configuration of queues.
func (qb *QueueBundle) AddPro(name string, config QueueConfig) error
AddPro adds a Pro queue with the given name and configuration to the QueueBundle. It returns a nil error on success.
```go
type QueueConfig struct {
	// Concurrency is the optional concurrency configuration for the queue.
	Concurrency ConcurrencyConfig

	// MaxWorkers is the maximum number of workers to run for the queue, or put
	// otherwise, the maximum parallelism to run.
	//
	// This is the maximum number of workers within this particular client
	// instance, but note that it doesn't control the total number of workers
	// across parallel processes. Installations will want to calculate their
	// total number by multiplying this number by the number of parallel nodes
	// running River clients configured to the same database and queue.
	//
	// Requires a minimum of 1, and a maximum of 10,000.
	MaxWorkers int
}
```
QueueConfig is a configuration for a Pro queue. It extends the standard River queue configuration with additional Pro-specific settings.
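The MaxWorkers comment describes a simple multiplication for cluster-wide parallelism, plus a valid range. The hypothetical `totalWorkers` helper below just encodes that arithmetic and the documented 1 to 10,000 bound; it is not part of the River Pro API.

```go
package main

import "fmt"

// totalWorkers computes cluster-wide worker slots for a queue: per-client
// MaxWorkers times the number of clients sharing the database and queue.
// It rejects MaxWorkers values outside the documented 1 to 10,000 range.
func totalWorkers(maxWorkers, clients int) (int, error) {
	if maxWorkers < 1 || maxWorkers > 10_000 {
		return 0, fmt.Errorf("MaxWorkers must be between 1 and 10,000, got %d", maxWorkers)
	}
	return maxWorkers * clients, nil
}

func main() {
	n, _ := totalWorkers(10, 2) // the two clients in Example_globalConcurrencyLimiting
	fmt.Println(n)              // 20
}
```

In that example, 20 worker slots exist across both clients, yet the Concurrency settings (GlobalLimit 2, partitioned by kind) still cap the `concurrent_limited` kind at 2 running jobs.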
```go
type SequenceOpts struct {
	// ByArgs indicates that the job's encoded arguments should be used to compute
	// a sequence key.
	//
	// Default is false, meaning that input arguments will not be considered for
	// sequencing.
	//
	// When set to true, the entire encoded args field will be included in the
	// sequence hash, which requires care to ensure that no irrelevant args are
	// factored into the sequence key. It is also possible to use a subset of
	// the args by indicating on the `JobArgs` struct which fields should be
	// included in the sequence using struct tags:
	//
	//	type MyJobArgs struct {
	//		CustomerID string `json:"customer_id" river:"sequence"`
	//		TraceID    string `json:"trace_id"`
	//	}
	//
	// In this example, only the encoded `customer_id` key will be included in the
	// sequence key and the `trace_id` key will be ignored.
	//
	// All keys are sorted alphabetically before hashing to ensure consistent
	// results.
	ByArgs bool

	// ByQueue indicates that sequences should be partitioned by queue.
	//
	// Default is false, meaning that as long as any other sequence property is
	// enabled, sequences will be constructed for a kind across all queues.
	ByQueue bool

	// ContinueOnCancelled indicates that the sequence should continue to be
	// processed even if the job is cancelled. Without this setting, the sequence
	// will be halted if any job in the sequence is cancelled.
	ContinueOnCancelled bool

	// ContinueOnDiscarded indicates that the sequence should continue to be
	// processed even if the job repeatedly errors and is discarded. Without this
	// setting, the sequence will be halted if any job in the sequence is
	// discarded.
	ContinueOnDiscarded bool

	// ExcludeKind indicates that the job kind should not be included in the
	// sequence partition key. This is useful when you want to include multiple
	// job kinds in the same sequence.
	ExcludeKind bool
}
```
SequenceOpts are options for defining sequence partitioning on JobArgs.
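To illustrate how the `river:"sequence"` struct tag narrows the sequence key, here is a sketch using reflection. It is not River Pro's hashing scheme: `sequenceKey` and `hasSequenceTag` are hypothetical, SHA-256 is an assumed hash, treating the `river` tag as a comma-separated list is an assumption, and ByQueue/ExcludeKind are not modeled. Keys are sorted because encoding/json writes map keys in order, matching the "sorted alphabetically" note above.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strings"
)

// MyJobArgs matches the struct-tag example in the SequenceOpts docs.
type MyJobArgs struct {
	CustomerID string `json:"customer_id" river:"sequence"`
	TraceID    string `json:"trace_id"`
}

// hasSequenceTag reports whether a field's river tag lists "sequence"
// (comma-separated parsing is an assumption of this sketch).
func hasSequenceTag(tag reflect.StructTag) bool {
	for _, opt := range strings.Split(tag.Get("river"), ",") {
		if opt == "sequence" {
			return true
		}
	}
	return false
}

// sequenceKey hashes the job kind with the JSON-encoded values of fields
// tagged river:"sequence"; other fields are ignored.
func sequenceKey(kind string, args any) (string, error) {
	v := reflect.Indirect(reflect.ValueOf(args))
	if v.Kind() != reflect.Struct {
		return "", errors.New("args must be a struct")
	}
	fields := map[string]any{}
	for i := 0; i < v.NumField(); i++ {
		f := v.Type().Field(i)
		if !f.IsExported() || !hasSequenceTag(f.Tag) {
			continue
		}
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		if name == "" {
			name = f.Name
		}
		fields[name] = v.Field(i).Interface()
	}
	encoded, err := json.Marshal(fields) // map keys are emitted sorted
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(append([]byte(kind+":"), encoded...))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a, _ := sequenceKey("send_invoice", MyJobArgs{CustomerID: "cus_123", TraceID: "t1"})
	b, _ := sequenceKey("send_invoice", MyJobArgs{CustomerID: "cus_123", TraceID: "t2"})
	fmt.Println(a == b) // true: trace_id is not part of the sequence key
}
```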
type Workflow struct {/* contains filtered or unexported fields */}
Workflow is a collection of jobs which can have dependencies on other jobs in the workflow.
func NewWorkflow(opts *WorkflowOpts) *Workflow
NewWorkflow creates a new workflow. If an ID is not specified in opts, it will be generated.
func WorkflowFromExisting(job *rivertype.JobRow, opts *WorkflowOpts) (*Workflow, error)
WorkflowFromExisting initiates a workflow from an existing job in the workflow. This enables jobs to be added to an existing workflow. If the job is not part of a workflow, an error will be returned.
The workflow ID and Name will be derived from the existing job, and the corresponding values in opts will be ignored.
func (w *Workflow) Add(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *WorkflowTaskOpts) WorkflowTask
Add adds a task to the workflow. The task name must be unique within the workflow.
```go
type WorkflowOpts struct {
	ID                  string
	IgnoreCancelledDeps bool
	IgnoreDeletedDeps   bool
	IgnoreDiscardedDeps bool
	Name                string
}
```
WorkflowOpts are options for creating a new workflow.
```go
type WorkflowPrepareResult struct {
	// Jobs is a list of jobs to insert into the database. It's structured as a
	// list of InsertManyParams so the jobs can be inserted with InsertMany.
	Jobs []river.InsertManyParams
}
```
WorkflowPrepareResult is the result of preparing a workflow. It contains a list of jobs to insert.
```go
type WorkflowTask struct {
	// Name is the name of the workflow task.
	Name string
}
```
WorkflowTask is a reference to a task in a workflow.
```go
type WorkflowTaskOpts struct {
	// Deps are the names of other tasks in the workflow that this task depends
	// on. If any of the dependencies are cancelled, deleted, or discarded after
	// failing repeatedly, the task will be cancelled.
	Deps []string

	// IgnoreCancelledDeps specifies whether to ignore cancelled dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreCancelledDeps option specified in
	// the workflow.
	IgnoreCancelledDeps *bool

	// IgnoreDeletedDeps specifies whether to ignore deleted dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreDeletedDeps option specified in
	// the workflow.
	IgnoreDeletedDeps *bool

	// IgnoreDiscardedDeps specifies whether to ignore discarded dependencies,
	// instead treating them as if they had completed successfully.
	//
	// If specified, this overrides the IgnoreDiscardedDeps option specified in
	// the workflow.
	IgnoreDiscardedDeps *bool
}
```
WorkflowTaskOpts are options for adding a task to a workflow.
Path | Synopsis |
---|---|
cmd/riverpro module | |
driver module | |
riverencrypt | |
riverworkflow | Package riverworkflow provides a workflow engine for River. |