Package riverpro is an extension for River, the robust high-performance job processing system for Go and Postgres.
See homepage, docs, and the River UI.
River Pro extends River through the use of a "pro" client and driver that wraps the standard River variants. This allows for additional functionality to be injected into the River client.
import (
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
)
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
	// Standard River configuration is embedded in the Pro config:
	Config: river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	},
})
Once the Pro driver is in place, River Pro functionality can be used.
River Pro batching allows simultaneous execution of many similar jobs together as a single group instead of executing each job individually. See riverqueue.com/riverpro/riverbatch for more details.
River Pro sequences guarantee that a specific series of jobs will be executed in a one-at-a-time sequential order relative to other jobs in the same sequence. Sequences are partitioned based upon a "sequence key" that is computed from various job attributes such as its kind and args (or a subset of args).
Jobs across sequences may run in parallel. Unlike unique jobs, sequences allow an infinite number of jobs to be queued up in the sequence, even though only one job will be worked at a time.
See the sequence docs for more information on how to use sequences.
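As a minimal sketch, assuming sequence options are declared on the args type via a SequenceOpts method (mirroring the BatchOpts pattern shown in the batch example below) and using a hypothetical args type:

type SendUserEmailArgs struct {
	// The river:"sequence" tag combined with ByArgs below means only
	// user_id is factored into the sequence key:
	UserID  int64  `json:"user_id" river:"sequence"`
	Subject string `json:"subject"`
}

func (SendUserEmailArgs) Kind() string { return "send_user_email" }

// SequenceOpts partitions sequences by the tagged user_id arg: emails to the
// same user run one at a time in insertion order, while emails to different
// users may run in parallel.
func (SendUserEmailArgs) SequenceOpts() riverpro.SequenceOpts {
	return riverpro.SequenceOpts{ByArgs: true}
}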
River Pro workflows allow you to define a graph of interdependent jobs to express complex, multi-step workflows, including fan-out and fan-in execution. Workflows are a powerful tool for orchestrating tasks that depend on each other, and can be used to model a wide range of business processes.
Workflows allow you to compose jobs in a directed acyclic graph (DAG) where each job can depend on the completion of other jobs. Jobs will not be available for work until all of their dependencies have completed successfully. In addition, jobs that are scheduled to run in the future at time T will not execute until all their dependencies have completed _and_ the current time is greater than T.
import (
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
)
ctx := context.Background()
client, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
// ...
})
workflow := client.NewWorkflow(&riverpro.WorkflowOpts{Name: "Send invoice to Alice"})
taskA := workflow.Add("task_a", issueInvoice{Email: "alice@example.com"}, nil, nil)
workflow.Add("task_b", sendInvoiceEmailArgs{Email: "alice@example.com"}, nil, &riverpro.WorkflowTaskOpts{
Deps: []string{taskA.Name},
})
result, err := workflow.Prepare(ctx)
if err != nil {
log.Fatal(err)
}
count, err := client.InsertMany(ctx, result.Jobs)
if err != nil {
log.Fatal(err)
}
fmt.Printf("inserted %d jobs\n", count)
By default, the workflow engine will automatically cancel workflow jobs if any of their dependencies are cancelled, discarded, or deleted. This behavior can be overridden on a per-workflow basis with WorkflowOpts, or on a per-task basis with WorkflowTaskOpts.
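For example, a brief sketch that keeps downstream tasks eligible even when a dependency is cancelled or discarded, using the flags on WorkflowOpts documented below:

workflow := client.NewWorkflow(&riverpro.WorkflowOpts{
	Name: "resilient workflow",
	// Don't cancel downstream tasks when a dependency is cancelled or
	// discarded:
	IgnoreCancelledDeps: true,
	IgnoreDiscardedDeps: true,
})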
See the workflow docs for more information on how to use workflows.
Example_batchWorker demonstrates how to use a riverqueue.com/riverpro/riverbatch.ManyWorker to process a set of jobs together as a single unit.
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"sort"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
"riverqueue.com/riverpro/riverbatch"
)
type BulkTerminateInstanceArgs struct {
InstanceID int64 `json:"instance_id" river:"batch"`
}
func (BulkTerminateInstanceArgs) Kind() string { return "bulk_terminate_instance" }
func (a BulkTerminateInstanceArgs) BatchOpts() riverpro.BatchOpts {
return riverpro.BatchOpts{}
}
type BulkTerminateInstanceWorker struct {
river.WorkerDefaults[BulkTerminateInstanceArgs]
}
func (w *BulkTerminateInstanceWorker) Work(ctx context.Context, job *river.Job[BulkTerminateInstanceArgs]) error {
return riverbatch.Work[BulkTerminateInstanceArgs, pgx.Tx](ctx, w, job, &riverbatch.WorkerOpts{
MaxCount: 5,
// Very short delays to make the example fast:
MaxDelay: 10 * time.Millisecond,
PollInterval: 10 * time.Millisecond,
})
}
func (w *BulkTerminateInstanceWorker) NextRetry(job *river.Job[BulkTerminateInstanceArgs]) time.Time {
// Always retry immediately for this example:
return time.Now()
}
func (w *BulkTerminateInstanceWorker) WorkMany(ctx context.Context, jobs []*river.Job[BulkTerminateInstanceArgs]) error {
sortedIDs := sliceutil.Map(jobs, func(job *river.Job[BulkTerminateInstanceArgs]) int64 { return job.Args.InstanceID })
sort.Slice(sortedIDs, func(i, j int) bool { return sortedIDs[i] < sortedIDs[j] })
fmt.Printf("Terminating a batch of %d instances with ids=%v\n", len(jobs), sortedIDs)
err := riverbatch.NewMultiError()
for _, job := range jobs {
// instance IDs divisible by 10 fail with an error, but only on the first attempt:
if job.Args.InstanceID%10 == 0 && job.Attempt == 1 {
err.AddByID(job.ID, errors.New("transient failure"))
}
}
return err
}
// Example_batchWorker demonstrates how to use a [riverqueue.com/riverpro/riverbatch.ManyWorker] to
// process a set of jobs together as a single unit.
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
worker := &BulkTerminateInstanceWorker{}
river.AddWorker(workers, worker)
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelError}),
// Poll quickly so the example is fast:
FetchCooldown: 50 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
},
ProQueues: map[string]riverpro.QueueConfig{
river.QueueDefault: {
Concurrency: riverpro.ConcurrencyConfig{
// Allow only one batch leader job to run at once globally, per kind
// (separate job kinds get batched separately):
GlobalLimit: 1,
Partition: riverpro.PartitionConfig{
ByKind: true,
},
},
MaxWorkers: 10,
},
},
})
if err != nil {
panic(err)
}
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
subscribeChan, cancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer cancel()
// Insert 10 jobs at once:
insertParams := make([]river.InsertManyParams, 10)
for i := range insertParams {
insertParams[i] = river.InsertManyParams{Args: BulkTerminateInstanceArgs{
InstanceID: int64(i) + 1,
}}
}
if _, err = riverClient.InsertMany(ctx, insertParams); err != nil {
panic(err)
}
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 10)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
}
Output:
Terminating a batch of 5 instances with ids=[1 2 3 4 5]
Terminating a batch of 5 instances with ids=[6 7 8 9 10]
Terminating a batch of 1 instances with ids=[10]
Example_durablePeriodicJob demonstrates the use of a durable periodic job. With durable periodic jobs, next run times are stored to the database and therefore run reliably at their target times even across client restarts.
package main
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
)
type PeriodicJobArgs struct{}
// Kind is the unique string name for this job.
func (PeriodicJobArgs) Kind() string { return "periodic" }
// PeriodicJobWorker is a worker for the periodic job.
type PeriodicJobWorker struct {
river.WorkerDefaults[PeriodicJobArgs]
}
func (w *PeriodicJobWorker) Work(ctx context.Context, job *river.Job[PeriodicJobArgs]) error {
fmt.Printf("This job will run once immediately then once every 15 minutes\n")
return nil
}
// Example_durablePeriodicJob demonstrates the use of a durable periodic job.
// With durable periodic jobs, next run times are stored to the database and
// therefore run reliably at their target times even across client restarts.
func main() {
ctx := context.Background()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
river.AddWorker(workers, &PeriodicJobWorker{})
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
PeriodicJobs: []*river.PeriodicJob{
river.NewPeriodicJob(
&onceImmediatelyThenEvery15Minutes{},
func() (river.JobArgs, *river.InsertOpts) {
return PeriodicJobArgs{}, nil
},
&river.PeriodicJobOpts{
// Inclusion of an ID makes the periodic job durable (as
// long as DurablePeriodicJobs.Enabled below is true); it
// must be unique across periodic jobs.
ID: "my_periodic_job",
},
),
},
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.TestingTB(testutil.PanicTB()), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
},
DurablePeriodicJobs: riverpro.DurablePeriodicJobsConfig{
Enabled: true,
},
})
if err != nil {
panic(err)
}
// Out of example scope, but used to wait until a job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()
// There's no need to explicitly insert a periodic job. One will be inserted
// (and worked soon after) as the client starts up.
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
}
// A bit contrived, but a custom schedule that runs once immediately, then once
// every 15 minutes. This lets us get away without having to include the
// RunOnStart option (whose use doesn't make much sense with durable periodic
// jobs), but also have the test execute its first job immediately so there
// isn't a wait that slows down the test suite.
type onceImmediatelyThenEvery15Minutes struct{ calledOnce bool }
func (s *onceImmediatelyThenEvery15Minutes) Next(t time.Time) time.Time {
if s.calledOnce {
return t.Add(15 * time.Minute)
}
s.calledOnce = true
return t
}
Output:
This job will run once immediately then once every 15 minutes
Example_ephemeralJob demonstrates ephemeral jobs, which are jobs that are immediately deleted upon completion, trading off some operational visibility for potential stability as fewer operations are needed later to prune them.
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivertype"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
)
type EphemeralJobArgs struct{}
func (a EphemeralJobArgs) Kind() string { return "ephemeral" }
func (a EphemeralJobArgs) EphemeralOpts() riverpro.EphemeralOpts { return riverpro.EphemeralOpts{} }
type EphemeralJobWorker struct {
river.WorkerDefaults[EphemeralJobArgs]
}
func (w *EphemeralJobWorker) Work(ctx context.Context, job *river.Job[EphemeralJobArgs]) error {
fmt.Printf("This job will run and be immediately deleted from the database\n")
return nil
}
// Example_ephemeralJob demonstrates ephemeral jobs, which are jobs that are
// immediately deleted upon completion, trading off some operational visibility
// for potential stability as fewer operations are needed later to prune them.
func main() {
ctx := context.Background()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
river.AddWorker(workers, &EphemeralJobWorker{})
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.TestingTB(testutil.PanicTB()), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
},
})
if err != nil {
panic(err)
}
// Out of example scope, but used to wait until a job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()
insertRes, err := riverClient.Insert(ctx, EphemeralJobArgs{}, nil)
if err != nil {
panic(err)
}
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
_, err = riverClient.JobGet(ctx, insertRes.Job.ID)
if errors.Is(err, rivertype.ErrNotFound) {
fmt.Printf("Job was worked and is now gone from the database")
} else {
panic("expected to get rivertype.ErrrNotFound")
}
}
Output:
This job will run and be immediately deleted from the database
Job was worked and is now gone from the database
Example_ephemeralQueue demonstrates adding a job to an ephemeral queue, which is a queue where jobs are immediately deleted upon completion, trading off some operational visibility for potential stability as fewer operations are needed later to prune them.
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivertype"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
"riverqueue.com/riverpro/internal/testworker"
)
func main() {
ctx := context.Background()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
river.AddWorker(workers, &testworker.NoOpWorker{})
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Schema: riverdbtest.TestSchema(ctx, testutil.TestingTB(testutil.PanicTB()), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
},
ProQueues: map[string]riverpro.QueueConfig{
"my_ephemeral_queue": {
Ephemeral: riverpro.QueueEphemeralConfig{
Enabled: true,
},
MaxWorkers: 100,
},
},
})
if err != nil {
panic(err)
}
// Out of example scope, but used to wait until a job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()
insertRes, err := riverClient.Insert(ctx, testworker.NoOpArgs{}, &river.InsertOpts{Queue: "my_ephemeral_queue"})
if err != nil {
panic(err)
}
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
_, err = riverClient.JobGet(ctx, insertRes.Job.ID)
if errors.Is(err, rivertype.ErrNotFound) {
fmt.Printf("Job was worked and is now gone from the database")
} else {
panic("expected to get rivertype.ErrrNotFound")
}
}
Output:
Job was worked and is now gone from the database
Example_globalConcurrencyLimiting demonstrates how to use global concurrency limiting to limit the number of jobs of a given kind that can run at once.
package main
import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
)
type ConcurrentLimitedArgs struct{}
func (ConcurrentLimitedArgs) Kind() string { return "concurrent_limited" }
type ConcurrentLimitedWorker struct {
river.WorkerDefaults[ConcurrentLimitedArgs]
jobsRunning *atomic.Int64
}
func (w *ConcurrentLimitedWorker) Work(ctx context.Context, job *river.Job[ConcurrentLimitedArgs]) error {
w.jobsRunning.Add(1)
defer w.jobsRunning.Add(-1)
if count := w.jobsRunning.Load(); count > 2 {
fmt.Printf("too many jobs running: %d\n", count)
return fmt.Errorf("too many jobs running: %d", count)
}
return nil
}
// Example_globalConcurrencyLimiting demonstrates how to use global concurrency
// limiting to limit the number of jobs of a given kind that can run at once.
func main() {
ctx, cancel := context.WithCancel(context.Background())
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
worker := &ConcurrentLimitedWorker{jobsRunning: &atomic.Int64{}}
river.AddWorker(workers, worker)
startNewClientAndSubscribe := func() (*riverpro.Client[pgx.Tx], <-chan *river.Event, func()) {
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
// Poll quickly so the example is fast:
FetchCooldown: 50 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
},
ProQueues: map[string]riverpro.QueueConfig{
river.QueueDefault: {
Concurrency: riverpro.ConcurrencyConfig{
// Allow up to 2 of each kind of job to run at once globally.
GlobalLimit: 2,
// Allow no more than 1 of each kind of job to run at once per client.
LocalLimit: 1,
Partition: riverpro.PartitionConfig{
ByKind: true,
},
},
MaxWorkers: 10,
},
},
})
if err != nil {
panic(err)
}
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
subscribeChan, cancel := riverClient.Subscribe(river.EventKindJobCompleted)
return riverClient, subscribeChan, cancel
}
// Use two clients to simulate two different workers. In real usage, these
// would probably be on different machines.
client1, subscribeChan1, cancel1 := startNewClientAndSubscribe()
defer cancel1()
client2, subscribeChan2, cancel2 := startNewClientAndSubscribe()
defer cancel2()
insertParams := make([]river.InsertManyParams, 10)
for i := range insertParams {
insertParams[i] = river.InsertManyParams{Args: ConcurrentLimitedArgs{}}
}
if _, err = client1.InsertMany(ctx, insertParams); err != nil {
panic(err)
}
shutdown := func() {
cancel() // initiate hard shutdown on both clients to stop fetching jobs
<-client1.Stopped()
<-client2.Stopped()
}
jobsCompleted := 0
for range 10 {
// Do an extra check to ensure no more than 2 jobs are running:
numRunning := worker.jobsRunning.Load()
if numRunning > 2 {
fmt.Printf("Unexpected number of jobs running: %d\n", numRunning)
shutdown()
return
}
select {
case <-time.After(2 * time.Second):
fmt.Println("timed out waiting for jobs to complete")
shutdown()
return
case <-subscribeChan1:
case <-subscribeChan2:
}
jobsCompleted++
}
fmt.Printf("Jobs completed: %d\n", jobsCompleted)
shutdown()
}
Output:
Jobs completed: 10
Example_perQueueRetention demonstrates the use of per-queue job retention settings for cancelled, completed, and discarded jobs. This example test doesn't show much beyond the syntax for configuring custom retention on a pro queue.
package main
import (
"context"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/testutil"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
"riverqueue.com/riverpro/internal/testworker"
)
func main() {
ctx := context.Background()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
river.AddWorker(workers, &testworker.NoOpWorker{})
_, err = riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Schema: riverdbtest.TestSchema(ctx, testutil.TestingTB(testutil.PanicTB()), riverpropgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
},
ProQueues: map[string]riverpro.QueueConfig{
river.QueueDefault: {
CancelledJobRetentionPeriod: 24 * time.Hour,
CompletedJobRetentionPeriod: 24 * time.Hour,
DiscardedJobRetentionPeriod: 7 * 24 * time.Hour,
MaxWorkers: 100,
},
},
})
if err != nil {
panic(err)
}
}
type BatchOpts struct {
// ByArgs indicates that the job's encoded arguments should be used to compute
// a batch key.
//
// Default is false, meaning that job arguments will not be considered for
// batching.
//
// When set to true, the entire encoded args field will be included in the
// batch hash, which requires care to ensure that no irrelevant args are
// factored into the batch key. It is also possible to use a subset of
// the args by indicating on the `JobArgs` struct which fields should be
// included in the batch using struct tags:
//
// type MyJobArgs struct {
// CustomerID string `json:"customer_id" river:"batch"`
// TraceID string `json:"trace_id"`
// }
//
// In this example, only the encoded `customer_id` key will be included in the
// batch key and the `trace_id` key will be ignored.
//
// All keys are sorted alphabetically before hashing to ensure consistent
// results.
ByArgs bool
}
BatchOpts are options for defining batch partitioning on JobArgs.
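For example, a short sketch (with a hypothetical args type) that batches only by the tagged customer_id field, following the BatchOpts method pattern from the batch example above:

type SyncCustomerArgs struct {
	// Only customer_id contributes to the batch key:
	CustomerID string `json:"customer_id" river:"batch"`
	TraceID    string `json:"trace_id"`
}

func (SyncCustomerArgs) Kind() string { return "sync_customer" }

// BatchOpts opts into args-based batch partitioning:
func (SyncCustomerArgs) BatchOpts() riverpro.BatchOpts {
	return riverpro.BatchOpts{ByArgs: true}
}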
Client is a client for River Pro. It is a superset of the main river.Client type, with additional methods and behavioral changes for working with Pro features.
ClientFromContext returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.
It panics if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.
When testing JobArgs.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.
ClientFromContextSafely returns the Client from the context. This function can only be used within a Worker's Work() method because that is the only place River sets the Client on the context.
It returns an error if the context does not contain a Client, which will never happen from the context provided to a Worker's Work() method.
When testing JobArgs.Work implementations, it might be useful to use ContextWithClient to initialize a context that has an available client.
NewClient creates a new River Pro client using the provided driver and configuration.
JobDeadLetterGet gets a job from the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded). rivertype.ErrNotFound is returned if it doesn't exist.
Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.
func (c *Client[TTx]) JobDeadLetterGetTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error)
JobDeadLetterGetTx gets a job from the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded). rivertype.ErrNotFound is returned if it doesn't exist.
Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.
This variant operates within an existing transaction. See JobDeadLetterRetry for the non-transaction variant.
func (c *Client[TTx]) JobDeadLetterRetry(ctx context.Context, jobID int64) (*rivertype.JobInsertResult, error)
JobDeadLetterRetry removes a job in the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded) and places it back in the live queue with a state of `available`. Its ID, args, metadata, and other core properties are retained, but its attempts and errors are reset as if it's a brand new job.
Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.
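A hedged usage sketch tying the two calls together (jobID is assumed to reference a previously discarded job):

// Inspect the dead-lettered job; returns rivertype.ErrNotFound if the job
// isn't in the dead letter queue:
deadJob, err := riverClient.JobDeadLetterGet(ctx, jobID)
if err != nil {
	panic(err)
}

// Move it back to the live queue as available, with attempts and errors
// reset as if it were a brand new job:
insertRes, err := riverClient.JobDeadLetterRetry(ctx, deadJob.ID)
if err != nil {
	panic(err)
}
fmt.Printf("retried job %d\n", insertRes.Job.ID)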
func (c *Client[TTx]) JobDeadLetterRetryTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobInsertResult, error)
JobDeadLetterRetryTx removes a job in the dead letter queue (i.e. present in the `river_job_dead_letter` table after having failed enough times to be discarded) and places it back in the live queue with a state of `available`. Its ID, args, metadata, and other core properties are retained, but its attempts and errors are reset as if it's a brand new job.
Jobs are only moved to the dead letter queue in the first place if the dead letter queue is enabled with Config.DeadLetter.Enabled.
This variant operates within an existing transaction. See JobDeadLetterRetry for the non-transaction variant.
func (c *Client[TTx]) NewWorkflow(opts *WorkflowOpts) *WorkflowT[TTx]
NewWorkflow creates a new workflow. If an ID is not specified in opts, it will be generated.
func (c *Client[TTx]) Queues() *QueueBundle
Queues returns the currently configured set of queues for the client, and can be used to add new ones.
func (c *Client[TTx]) WorkflowCancel(ctx context.Context, workflowID string) (*WorkflowCancelResult, error)
WorkflowCancel cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.
See WorkflowCancelTx for a version that allows cancellation within a broader database transaction.
The return value is a list of job rows where cancellation was attempted, along with a potential error.
func (c *Client[TTx]) WorkflowCancelTx(ctx context.Context, tx TTx, workflowID string) (*WorkflowCancelResult, error)
WorkflowCancelTx cancels all non-finalized tasks in a workflow using the workflow's ID. Non-finalized tasks are those which are not already completed, discarded, or cancelled. As with normal job cancellation, in-flight tasks *may* complete prior to receiving notification of the cancellation.
This variant operates within an existing transaction. See WorkflowCancel for the non-transaction variant.
The return value is a list of job rows where cancellation was attempted, along with a potential error.
func (c *Client[TTx]) WorkflowFromExisting(job *rivertype.JobRow, opts *WorkflowOpts) (*WorkflowT[TTx], error)
WorkflowFromExisting initiates a workflow from an existing job in the workflow. This enables jobs to be added to an existing workflow. If the job is not part of a workflow, an error will be returned.
The workflow ID and Name will be derived from the existing job, and the corresponding values in opts will be ignored.
func (c *Client[TTx]) WorkflowPrepare(ctx context.Context, workflow *Workflow) (*WorkflowPrepareResult, error)
WorkflowPrepare validates the jobs in the workflow and prepares them for insertion. Validations ensure that the workflow is well formed: for example, that task names are unique, that all named dependencies exist, and that there are no dependency cycles.
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
Deprecated: Use Workflow.Prepare instead.
func (c *Client[TTx]) WorkflowPrepareTx(ctx context.Context, tx TTx, workflow *Workflow) (*WorkflowPrepareResult, error)
WorkflowPrepareTx validates the jobs in the workflow and prepares them for insertion. Validations ensure that the workflow is well formed: for example, that task names are unique, that all named dependencies exist, and that there are no dependency cycles.
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
Deprecated: Use Workflow.PrepareTx instead.
type ConcurrencyConfig struct {
// GlobalLimit is the maximum number of jobs that can run concurrently
// across all workers.
GlobalLimit int
// LocalLimit is the maximum number of jobs that can run concurrently
// on a single worker.
LocalLimit int
// Partition is the configuration for partitioning concurrency limits. By
// default, concurrency limits are not partitioned and apply to the entire
// queue.
Partition PartitionConfig
}
ConcurrencyConfig is a configuration for concurrency limits. Either a global or local limit (or both) must be specified in order for concurrency limits to be enforced.
type Config struct {
river.Config
// DeadLetter configures a dead letter queue, which optionally transitions
// discarded jobs to the `river_job_dead_letter` table instead of deleting
// them after river.Config.DiscardedJobRetentionPeriod has elapsed.
DeadLetter DeadLetterConfig
// DurablePeriodicJobs configures periodic jobs that are persisted to the
// database. Unlike standard in-process jobs—whose state is lost on
// restarts or leader elections—these durable jobs retain their state,
// enabling more precise and reliable scheduling.
DurablePeriodicJobs DurablePeriodicJobsConfig
// PartitionKeyCacheTTL is the duration to cache partition keys for available
// jobs. With concurrency limiting enabled, fetches of available jobs with
// cached partition keys can be much faster than those without caching.
// Without concurrency limiting enabled, this value is unused.
//
// This value limits how quickly the first available job for a given partition
// key will be seen and fetched and is a direct trade-off for higher
// throughput at the expense of higher latency. If this value is lower than
// the fetch cooldown, caching is effectively disabled. To explicitly disable
// partition key caching altogether, set this to -1. Other negative values are
// not allowed.
//
// Defaults to 1 second.
PartitionKeyCacheTTL time.Duration
// ProQueues holds the configuration for queues that use Pro-specific
// features, including concurrency limits. A given queue can be configured
// as either a Pro queue or a standard queue, but not both.
ProQueues map[string]QueueConfig
// SequenceSchedulerInterval is the amount of time to wait between runs of the sequence scheduler.
SequenceSchedulerInterval time.Duration
// WorkflowRescuerInterval is the amount of time to wait between runs of the workflow rescuer.
WorkflowRescuerInterval time.Duration
}
Config holds configuration for the riverpro.Client. The config is primarily managed with an embedded river.Config, along with additional fields for Pro-specific configuration.
type DeadLetterConfig struct {
// Enabled enables the dead letter queue.
Enabled bool
}
DeadLetterConfig configures a dead letter queue, which optionally transitions discarded jobs to the `river_job_dead_letter` table instead of deleting them after DiscardedJobRetentionPeriod has elapsed.
See also river.Config.DiscardedJobRetentionPeriod, which will define the period before jobs are moved to the dead letter queue.
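A minimal sketch of enabling the dead letter queue on the Pro config (the retention period is shown for context):

riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
	Config: river.Config{
		// Discarded jobs move to the dead letter queue after this period
		// instead of being deleted:
		DiscardedJobRetentionPeriod: 7 * 24 * time.Hour,
		Workers:                     workers,
	},
	DeadLetter: riverpro.DeadLetterConfig{
		Enabled: true,
	},
})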
type DependencyCycleError struct {
DepStack []string
}
DependencyCycleError is returned when a dependency cycle is detected in the workflow. A cycle occurs when a task depends on itself, either directly or indirectly through other tasks.
func (e *DependencyCycleError) Error() string
func (e *DependencyCycleError) Is(target error) bool
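A brief sketch of detecting the cycle during preparation; the task name and args value here are illustrative, and the cycle is a direct self-dependency:

// A direct cycle: the task names itself as a dependency.
workflow.Add("task_a", argsA, nil, &riverpro.WorkflowTaskOpts{Deps: []string{"task_a"}})

if _, err := workflow.Prepare(ctx); err != nil {
	var cycleErr *riverpro.DependencyCycleError
	if errors.As(err, &cycleErr) {
		fmt.Printf("dependency cycle through: %v\n", cycleErr.DepStack)
	}
}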
type DuplicateTaskError struct {
TaskName string
}
DuplicateTaskError is returned when a task name is added to the same workflow more than once. Task names must be unique within a workflow.
func (e *DuplicateTaskError) Error() string
func (e *DuplicateTaskError) Is(target error) bool
type DurablePeriodicJobsConfig struct {
// Enabled activates durable periodic jobs. All periodic jobs with a
// configured ID will become durable and persist their state in the
// database.
Enabled bool
// NextRunAtRatchetFunc provides a function that "ratchets" the next run
// times of durable jobs pulled from the database by potentially modifying
// them before they're enqueued and have a new next run time set for them.
//
// The primary purpose of this option is to protect against cases where
// durable jobs with frequent run periods were present in the database, but
// all clients were offline for a long period of time, so that when a client
// finally comes back online, many runs of the same job are scheduled in
// quick succession because the next run timestamp from the database is so
// old. For example, a periodic job runs once every minute and is persisted
// to the database on that schedule. All clients go offline for one hour
// before coming back. When coming back online, the job is scheduled 60
// times in quick succession as it's run at -60+1 minutes, -60+2 minutes,
// -60+3 minutes, -60+4 minutes, etc. because next run times are calculated
// based on the last run time rather than now.
//
// Clients in deployed environments should generally be running all the
// time, but a common place where something like this might happen is local
// development where software devs may have their app offline for extended
// periods of time.
//
// A reasonable option is to ratchet "next run at" times to the current
// time. Any "next run at" times that are in the future stay in the future,
// but those that have fallen behind will be run immediately and get a new
// scheduled time based off now instead of a time in the past:
//
// NextRunAtRatchetFunc: func(nextRunAt, now time.Time) time.Time {
// if nextRunAt.Before(now) {
// return now
// }
// return nextRunAt
// }
//
// Handling old timestamps can of course be done from each periodic job's
// ScheduleFunc as well, but it's easy to forget to do so, and may not be
// desirable to have to consider them for every possible periodic job.
//
// The default behavior is to never ratchet next run at times, which is
// equivalent to:
//
// NextRunAtRatchetFunc: func(nextRunAt, now time.Time) time.Time { return nextRunAt }
NextRunAtRatchetFunc func(nextRunAt, now time.Time) time.Time
// StaleThreshold is the amount of time after which database persisted
// periodic jobs are considered stale and eligible to be pruned based on
// their `updated_at` timestamp. While running, the periodic job enqueuer
// service periodically bumps the `updated_at` timestamp of all jobs that it
// knows about. Periodic jobs that are no longer registered no longer have
// their timestamps touched, and are eventually removed. This keeps the
// database reasonably clean while also keeping records around for a little
// time for purposes of visibility and debugging.
//
// Defaults to 24 hours.
StaleThreshold time.Duration
// StartStaggerSpread is the interval over which to reschedule any jobs that
// were scheduled before or at the current time if StartStaggerThreshold was
// reached on start. When start stagger activates, jobs will have a random
// duration between one millisecond and StartStaggerSpread added to their
// next run time.
//
// Start stagger is designed to protect against the case where a client's
// been offline for some time, comes online to find a great number of
// periodic jobs that need immediate scheduling, and would otherwise end up
// scheduling them all to run nearly simultaneously. Instead, start times
// are staggered randomly to give the queue more breathing room.
//
// Defaults to 1 minute.
StartStaggerSpread time.Duration
// StartStaggerThreshold is the number of jobs found scheduled before or at
// the current time on start after which the client will stagger their next
// run times randomly between one millisecond and StartStaggerSpread.
//
// Start stagger is designed to protect against the case where a client's
// been offline for some time, comes online to find a great number of
// periodic jobs that need immediate scheduling, and would otherwise end up
// scheduling them all to run nearly simultaneously. Instead, start times
// are staggered randomly to give the queue more breathing room.
//
// Defaults to 500 jobs. Disable start stagger completely by setting to -1.
StartStaggerThreshold int
}
DurablePeriodicJobsConfig configures periodic jobs that are persisted to the database. Unlike standard in-process jobs—whose state is lost on restarts or leader elections—these durable jobs retain their state, enabling more precise and reliable scheduling.
type EphemeralOpts struct{}
EphemeralOpts are options for ephemeral jobs. Currently, there are no available options for ephemeral jobs, but this is modeled as an options struct to futureproof it.
type HookJobDeadLetterMove interface {
rivertype.Hook
// JobDeadLetterMove is invoked when a discarded job is moved to the dead
// letter queue.
JobDeadLetterMove(ctx context.Context, job *rivertype.JobRow) error
}
HookJobDeadLetterMove is an interface to a hook that runs when a discarded job is moved to the dead letter queue.
HookJobDeadLetterMoveFunc is a convenience helper for implementing HookJobDeadLetterMove using a simple function instead of a struct.
func (f HookJobDeadLetterMoveFunc) IsHook() bool
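A hedged sketch of defining and registering such a hook; the conversion below assumes HookJobDeadLetterMoveFunc's underlying signature matches the JobDeadLetterMove method, and registration via river.Config.Hooks is an assumption:

hook := riverpro.HookJobDeadLetterMoveFunc(func(ctx context.Context, job *rivertype.JobRow) error {
	// Alert on every job that lands in the dead letter queue:
	slog.Warn("job moved to dead letter queue", "job_id", job.ID, "kind", job.Kind)
	return nil
})

config := &riverpro.Config{
	Config: river.Config{
		Hooks: []rivertype.Hook{hook},
	},
	DeadLetter: riverpro.DeadLetterConfig{Enabled: true},
}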
MissingDependencyError is returned when a named dependency is missing from the workflow. A task cannot depend on another task that does not exist in the workflow.
func (e *MissingDependencyError) Error() string
func (e *MissingDependencyError) Is(target error) bool
type PartitionConfig struct {
// ByArgs is the list of fields in the job arguments to partition by.
// For example, you may wish to limit the number of jobs that can run
// concurrently for a given `customer_id`.
//
// If ByArgs is explicitly set to an empty slice, all fields in the job's
// arguments will be used. This is generally not desirable as it can lead
// to high cardinality and a large number of partitions.
//
// When set to nil (the default), arguments are not used for partitioning.
ByArgs []string
// ByKind indicates that the concurrency limit should be partitioned by the
// kind of the job.
ByKind bool
}
PartitionConfig is a configuration for partitioning concurrency limits.
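For example, a hedged fragment of a riverpro.Config literal that limits running jobs per customer, assuming job args carry a customer_id field:

ProQueues: map[string]riverpro.QueueConfig{
	river.QueueDefault: {
		Concurrency: riverpro.ConcurrencyConfig{
			// At most 5 jobs running at once per distinct customer_id:
			GlobalLimit: 5,
			Partition: riverpro.PartitionConfig{
				ByArgs: []string{"customer_id"},
			},
		},
		MaxWorkers: 100,
	},
},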
type QueueBundle struct {
*river.QueueBundle
// contains filtered or unexported fields
}
QueueBundle is a bundle for adding additional queues. It's made accessible through Client.Queues.
Unlike the river.QueueBundle, this bundle also enables Pro-specific configuration of queues.
func (qb *QueueBundle) AddPro(name string, config QueueConfig) error
AddPro adds a Pro queue with the given name and configuration to the QueueBundle. It returns a nil error on success.
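A short usage sketch (queue name and limits are illustrative):

// Add a Pro queue to an existing client via its queue bundle:
if err := riverClient.Queues().AddPro("reports", riverpro.QueueConfig{
	Concurrency: riverpro.ConcurrencyConfig{
		GlobalLimit: 10,
		Partition:   riverpro.PartitionConfig{ByKind: true},
	},
	MaxWorkers: 50,
}); err != nil {
	panic(err)
}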
type QueueConfig struct {
// Concurrency is the optional concurrency configuration for the queue.
Concurrency ConcurrencyConfig
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of cancelled jobs.
//
// This setting is specific to retention for this particular queue. If left
// unset, it'll default to river.Config.CancelledJobRetentionPeriod (shared
// cancelled retention period for all queues) or 24 hours.
CancelledJobRetentionPeriod time.Duration
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of completed jobs.
//
// This setting is specific to retention for this particular queue. If left
// unset, it'll default to river.Config.CompletedJobRetentionPeriod (shared
// completed retention period for all queues) or 24 hours.
CompletedJobRetentionPeriod time.Duration
// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of discarded jobs.
//
// This setting is specific to retention for this particular queue. If left
// unset, it'll default to river.Config.DiscardedJobRetentionPeriod (shared
// discarded retention period for all queues) or 7 days.
//
// If the dead letter queue is enabled with Config.DeadLetter.Enabled, it
// supersedes this setting. Per-queue dead letter settings are currently
// *not* supported, although we expect this limitation to be patched soon.
DiscardedJobRetentionPeriod time.Duration
// Ephemeral are ephemeral settings for queue. Ephemeral jobs are jobs which
// are deleted immediately upon completion instead of being pruned at a
// later time by the job cleaner.
Ephemeral QueueEphemeralConfig
// MaxWorkers is the maximum number of workers to run for the queue, or put
// otherwise, the maximum parallelism to run.
//
// This is the maximum number of workers within this particular client
// instance, but note that it doesn't control the total number of workers
// across parallel processes. Installations will want to calculate their
// total number by multiplying this number by the number of parallel nodes
// running River clients configured to the same database and queue.
//
// Requires a minimum of 1, and a maximum of 10,000.
MaxWorkers int
}
QueueConfig is a configuration for a Pro queue. It extends the standard River queue configuration with additional Pro-specific settings.
type QueueEphemeralConfig struct {
// Enabled can be set to true to make this an ephemeral queue. All jobs
// going to ephemeral queues are deleted immediately upon completion instead
// of being pruned at a later time by the job cleaner. This happens
// regardless of whether jobs in the queue implement their own EphemeralOpts.
Enabled bool
}
QueueEphemeralConfig are ephemeral settings for a queue. Ephemeral jobs are jobs which are deleted immediately upon completion instead of being pruned at a later time by the job cleaner.
type SequenceOpts struct {
// ByArgs indicates that the job's encoded arguments should be used to compute
// a sequence key.
//
// Default is false, meaning that input arguments will not be considered for
// sequencing.
//
// When set to true, the entire encoded args field will be included in the
// sequence hash, which requires care to ensure that no irrelevant args are
// factored into the sequence key. It is also possible to use a subset of
// the args by indicating on the `JobArgs` struct which fields should be
// included in the sequence using struct tags:
//
// type MyJobArgs struct {
// CustomerID string `json:"customer_id" river:"sequence"`
// TraceID string `json:"trace_id"`
// }
//
// In this example, only the encoded `customer_id` key will be included in the
// sequence key and the `trace_id` key will be ignored.
//
// All keys are sorted alphabetically before hashing to ensure consistent
// results.
ByArgs bool
// ByQueue indicates that sequences should be partitioned by queue.
//
// Default is false, meaning that as long as any other sequence property is
// enabled, sequences will be constructed for a kind across all queues.
ByQueue bool
// ContinueOnCancelled indicates that the sequence should continue to be
// processed even if the job is cancelled. Without this setting, the sequence
// will be halted if any job in the sequence is cancelled.
ContinueOnCancelled bool
// ContinueOnDiscarded indicates that the sequence should continue to be
// processed even if the job repeatedly errors and is discarded. Without this
// setting, the sequence will be halted if any job in the sequence is
// discarded.
ContinueOnDiscarded bool
// ExcludeKind indicates that the job kind should not be included in the
// sequence partition key. This is useful when you want to include multiple
// job kinds in the same sequence.
ExcludeKind bool
}
SequenceOpts are options for defining sequence partitioning on JobArgs.
TaskHasNoOutputError is returned when attempting to get the output of a task that has no output.
func (e *TaskHasNoOutputError) Error() string
func (e *TaskHasNoOutputError) Is(target error) bool
Workflow is a collection of jobs which can have dependencies on other jobs in the workflow.
Deprecated: Use Client.NewWorkflow instead in order to gain access to transactional task loader methods and other functionality that requires access to a transaction.
func NewWorkflow(opts *WorkflowOpts) *Workflow
NewWorkflow creates a new workflow. If an ID is not specified in opts, it will be generated.
Deprecated: Use Client.NewWorkflow instead in order to gain access to transactional task loader methods.
func WorkflowFromExisting(job *rivertype.JobRow, opts *WorkflowOpts) (*Workflow, error)
WorkflowFromExisting initiates a workflow from an existing job in the workflow. This enables jobs to be added to an existing workflow. If the job is not part of a workflow, an error will be returned.
The workflow ID and Name will be derived from the existing job, and the corresponding values in opts will be ignored.
Deprecated: Use Client.WorkflowFromExisting instead.
WorkflowLoadAllOpts are options for loading all tasks in a workflow.
type WorkflowLoadDepsOpts struct {
Recursive bool
}
WorkflowLoadDepsOpts are options for loading the dependencies of a task in a workflow.
type WorkflowOpts struct {
ID string
IgnoreCancelledDeps bool
IgnoreDeletedDeps bool
IgnoreDiscardedDeps bool
Name string
}
WorkflowOpts are options for creating a new workflow.
type WorkflowPrepareResult struct {
// Jobs is a list of jobs to insert into the database. It's structured as a
// list of InsertManyParams so the jobs can be inserted with InsertMany.
Jobs []river.InsertManyParams
}
WorkflowPrepareResult is the result of preparing a workflow. It contains a list of jobs to insert.
type WorkflowRetryMode string
const (
	// WorkflowRetryModeAll indicates that all jobs will be retried. This is
	// the default.
	WorkflowRetryModeAll WorkflowRetryMode = "all"

	// WorkflowRetryModeFailedOnly indicates that only failed jobs (cancelled
	// or discarded) will be retried. Downstream jobs will also be retried if
	// they have failed dependencies, but not if they had chosen to ignore
	// failed dependencies and completed successfully.
	WorkflowRetryModeFailedOnly WorkflowRetryMode = "failed_only"

	// WorkflowRetryModeFailedAndDownstream indicates that failed jobs
	// (cancelled or discarded) will be retried, and all downstream jobs will
	// also be retried even if they previously completed successfully.
	WorkflowRetryModeFailedAndDownstream WorkflowRetryMode = "failed_and_downstream"
)
type WorkflowRetryOpts struct {
// Mode indicates the mode of retry, specifically which jobs to retry.
// By default, all jobs will be retried, though this option can be used to
// specify that only failed jobs should be retried.
Mode WorkflowRetryMode
// ResetHistory indicates whether to reset the history of the jobs when
// retrying them, including resetting their attempt count back to 0 (starting
// over their retry policy in the process) and clearing out known metadata
// from past attempts. If not set, the max attempts will be incremented by 1
// to allow for a single additional attempt.
ResetHistory bool
}
type WorkflowRetryStillActiveError struct {
WorkflowID string
}
WorkflowRetryStillActiveError is returned when a workflow is still active (containing some jobs in available, pending, retryable, running, or scheduled states) and cannot yet be retried.
func (e *WorkflowRetryStillActiveError) Error() string
func (e *WorkflowRetryStillActiveError) Is(target error) bool
type WorkflowT[TTx any] struct {/* contains filtered or unexported fields */}
WorkflowT is a collection of jobs which can have dependencies on other jobs in the workflow. Workflows form a directed acyclic graph (DAG) of jobs, with each one unblocking once its dependencies have been satisfied.
func (w *WorkflowT[TTx]) Add(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *WorkflowTaskOpts) WorkflowTask
Add adds a task to the workflow. The task name must be unique within the workflow.
Panics if attempting to use ephemeral jobs as workflow tasks.
func (w *WorkflowT[TTx]) AddSafely(taskName string, args river.JobArgs, insertOpts *river.InsertOpts, opts *WorkflowTaskOpts) (WorkflowTask, error)
AddSafely adds a task to the workflow. The task name must be unique within the workflow.
Returns an error if attempting to use ephemeral jobs as workflow tasks.
func (w *WorkflowT[TTx]) LoadAll(ctx context.Context, opts *WorkflowLoadAllOpts) (*WorkflowTasks, error)
LoadAll loads all tasks in the workflow. If PaginationLimit is not set, all tasks are loaded.
Panics if PaginationLimit or PaginationOffset is less than 0.
func (w *WorkflowT[TTx]) LoadAllTx(ctx context.Context, tx TTx, opts *WorkflowLoadAllOpts) (*WorkflowTasks, error)
LoadAllTx loads all tasks in the workflow. If PaginationLimit is not set, all tasks are loaded.
Panics if PaginationLimit is less than 0.
func (w *WorkflowT[TTx]) LoadDeps(ctx context.Context, taskName string, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)
LoadDeps loads the dependencies of a named task in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.
ExampleWorkflowT_LoadDeps demonstrates loading workflow task dependencies.
package main
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
"riverqueue.com/riverpro/riverworkflow"
)
type ProducerArgs struct{}
func (ProducerArgs) Kind() string { return "producer" }
type ProducerWorker struct {
river.WorkerDefaults[ProducerArgs]
}
func (w *ProducerWorker) Work(ctx context.Context, job *river.Job[ProducerArgs]) error {
fmt.Println("Producer task completed")
return nil
}
type ConsumerArgs struct{}
func (ConsumerArgs) Kind() string { return "consumer" }
type ConsumerWorker struct {
river.WorkerDefaults[ConsumerArgs]
}
func (w *ConsumerWorker) Work(ctx context.Context, job *river.Job[ConsumerArgs]) error {
taskName := riverworkflow.TaskFromJobRow(job.JobRow)
client := riverpro.ClientFromContext[pgx.Tx](ctx)
workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
if err != nil {
return err
}
tasks, err := workflow.LoadDeps(ctx, taskName, &riverpro.WorkflowLoadDepsOpts{})
if err != nil {
return err
}
fmt.Printf("Loaded %d dependency tasks\n", tasks.Count())
return nil
}
// ExampleWorkflowT_LoadDeps demonstrates loading workflow task dependencies.
func main() { //nolint:dupl
ctx := context.Background()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
river.AddWorker(workers, &ProducerWorker{})
river.AddWorker(workers, &ConsumerWorker{})
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
// Poll quickly so the example is fast:
FetchCooldown: 50 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.TestingTB(testutil.PanicTB()), riverpropgxv5.New(dbPool), nil),
TestOnly: true,
Workers: workers,
},
})
if err != nil {
panic(err)
}
workflow := riverClient.NewWorkflow(nil)
producerTask := workflow.Add("producer", ProducerArgs{}, nil, nil)
workflow.Add("consumer", ConsumerArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{producerTask.Name}})
prepareRes, err := workflow.Prepare(ctx)
if err != nil {
panic(err)
}
_, err = riverClient.InsertMany(ctx, prepareRes.Jobs)
if err != nil {
panic(err)
}
// Out of example scope, but used to wait until jobs are worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
// Wait for two jobs to complete.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 2)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
}
Output:
Producer task completed
Loaded 1 dependency tasks
func (w *WorkflowT[TTx]) LoadDepsByJob(ctx context.Context, job *rivertype.JobRow, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)
LoadDepsByJob loads the dependencies of a job in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.
func (w *WorkflowT[TTx]) LoadDepsByJobTx(ctx context.Context, tx TTx, job *rivertype.JobRow, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)
LoadDepsByJobTx loads the dependencies of a job in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.
func (w *WorkflowT[TTx]) LoadDepsTx(ctx context.Context, tx TTx, taskName string, opts *WorkflowLoadDepsOpts) (*WorkflowTasks, error)
LoadDepsTx loads the dependencies of a named task in the workflow. If Recursive is true, it will also load the dependencies of the dependencies.
LoadOutput loads the output of the named task in the workflow and unmarshals it into v.
If loading more than one task's output (including dependencies) it is more efficient to avoid multiple database calls by first loading the tasks with WorkflowT.LoadDeps or WorkflowT.LoadAll and then use the WorkflowTasks.Output method.
If no task with that name is found, rivertype.ErrNotFound is returned.
ExampleWorkflowT_LoadOutput demonstrates retrieving output from workflow task dependencies.
package main
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"riverqueue.com/riverpro"
"riverqueue.com/riverpro/driver/riverpropgxv5"
)
type OutputExampleStruct struct {
Value int `json:"value"`
}
type OutputProducerArgs struct{}
func (OutputProducerArgs) Kind() string { return "output_producer" }
type OutputProducerWorker struct {
river.WorkerDefaults[OutputProducerArgs]
}
func (w *OutputProducerWorker) Work(ctx context.Context, job *river.Job[OutputProducerArgs]) error {
output := OutputExampleStruct{Value: 42}
return river.RecordOutput(ctx, output)
}
type OutputConsumerArgs struct{}
func (OutputConsumerArgs) Kind() string { return "output_consumer" }
type OutputConsumerWorker struct {
river.WorkerDefaults[OutputConsumerArgs]
}
func (w *OutputConsumerWorker) Work(ctx context.Context, job *river.Job[OutputConsumerArgs]) error {
client := riverpro.ClientFromContext[pgx.Tx](ctx)
workflow, err := client.WorkflowFromExisting(job.JobRow, nil)
if err != nil {
return err
}
var out OutputExampleStruct
if err := workflow.LoadOutput(ctx, "output_producer", &out); err != nil {
return err
}
fmt.Printf("Producer output value: %d\n", out.Value)
return nil
}
// ExampleWorkflowT_LoadOutput demonstrates retrieving output from workflow task
// dependencies.
func main() { //nolint:dupl
ctx := context.Background()
dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()
workers := river.NewWorkers()
river.AddWorker(workers, &OutputProducerWorker{})
river.AddWorker(workers, &OutputConsumerWorker{})
riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
Config: river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
// Poll quickly so the example is fast:
FetchCooldown: 50 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.TestingTB(testutil.PanicTB()), riverpropgxv5.New(dbPool), nil),
TestOnly: true,
Workers: workers,
},
})
if err != nil {
panic(err)
}
workflow := riverClient.NewWorkflow(nil)
producerTask := workflow.Add("output_producer", OutputProducerArgs{}, nil, nil)
workflow.Add("output_consumer", OutputConsumerArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{producerTask.Name}})
prepareRes, err := workflow.Prepare(ctx)
if err != nil {
panic(err)
}
_, err = riverClient.InsertMany(ctx, prepareRes.Jobs)
if err != nil {
panic(err)
}
// Out of example scope, but used to wait until jobs are worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
// Wait for two jobs to complete.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 2)
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
}
Output:
Producer output value: 42
func (w *WorkflowT[TTx]) LoadOutputByJob(ctx context.Context, job *rivertype.JobRow, v any) error
LoadOutputByJob loads the output of the task corresponding to the given job in the workflow and unmarshals it into v.
func (w *WorkflowT[TTx]) LoadOutputByJobTx(ctx context.Context, tx TTx, job *rivertype.JobRow, v any) error
LoadOutputByJobTx loads the output of the task corresponding to the given job in the workflow and unmarshals it into v.
func (w *WorkflowT[TTx]) LoadOutputTx(ctx context.Context, tx TTx, taskName string, v any) error
LoadOutputTx loads the output of the named task in the workflow and unmarshals it into v.
If loading more than one task's output (including dependencies), it is more efficient to avoid multiple database calls by first loading the tasks with WorkflowT.LoadDeps or WorkflowT.LoadAll and then using the WorkflowTasks.Output method.
If no task with that name is found, rivertype.ErrNotFound is returned.
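A minimal sketch of the transactional variant, assuming the pool and workflow from the example above:
tx, err := dbPool.Begin(ctx)
if err != nil {
	log.Fatal(err)
}
defer tx.Rollback(ctx)
var out OutputExampleStruct
if err := workflow.LoadOutputTx(ctx, tx, "output_producer", &out); err != nil {
	log.Fatal(err)
}
if err := tx.Commit(ctx); err != nil {
	log.Fatal(err)
}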
func (w *WorkflowT[TTx]) Prepare(ctx context.Context) (*WorkflowPrepareResult, error)
Prepare validates the jobs in the workflow and prepares them for insertion. Among other things, validations ensure that every dependency references a known task and that the dependency graph remains acyclic.
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
func (w *WorkflowT[TTx]) PrepareTx(ctx context.Context, tx TTx) (*WorkflowPrepareResult, error)
PrepareTx validates the jobs in the workflow and prepares them for insertion. Among other things, validations ensure that every dependency references a known task and that the dependency graph remains acyclic.
If validation succeeds, the result will include a Jobs field with a list of InsertManyParams for insertion into the database.
For new workflows, ctx and client are not used and this operation is entirely in-memory. For pre-existing workflows, any tasks listed as dependencies must be loaded from the database prior to validation.
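This allows a workflow's jobs to be inserted atomically alongside other writes in one transaction. A sketch, assuming the client, pool, and workflow from the examples above, and River's transactional InsertManyTx for the insert:
tx, err := dbPool.Begin(ctx)
if err != nil {
	log.Fatal(err)
}
defer tx.Rollback(ctx)
prepareRes, err := workflow.PrepareTx(ctx, tx)
if err != nil {
	log.Fatal(err)
}
if _, err := riverClient.InsertManyTx(ctx, tx, prepareRes.Jobs); err != nil {
	log.Fatal(err)
}
if err := tx.Commit(ctx); err != nil {
	log.Fatal(err)
}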
func (w *WorkflowT[TTx]) Retry(ctx context.Context, opts *WorkflowRetryOpts) (*WorkflowRetryResult, error)
Retry retries the jobs in the workflow. By default, all jobs will be retried. This can be overridden with the Mode option.
Returns a WorkflowRetryStillActiveError if the workflow is still active.
func (w *WorkflowT[TTx]) RetryTx(ctx context.Context, tx TTx, opts *WorkflowRetryOpts) (*WorkflowRetryResult, error)
RetryTx retries the jobs in the workflow. By default, all jobs will be retried. This can be overridden with the Mode option.
ResetHistory indicates whether to reset the history of the jobs when retrying them, including resetting their attempt count back to 0 (starting over their retry policy in the process) and clearing out known metadata from past attempts. If not set, the max attempts will be incremented by 1 to allow for a single additional attempt.
Returns a WorkflowRetryStillActiveError if the workflow is still active.
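A sketch of a from-scratch retry, assuming jobRow is a job row belonging to a finished workflow:
workflow, err := riverClient.WorkflowFromExisting(jobRow, nil)
if err != nil {
	log.Fatal(err)
}
// ResetHistory restarts the jobs' retry policies instead of granting a single
// extra attempt. Retry errors with WorkflowRetryStillActiveError if any job in
// the workflow is still active.
if _, err := workflow.Retry(ctx, &riverpro.WorkflowRetryOpts{ResetHistory: true}); err != nil {
	log.Fatal(err)
}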
type WorkflowTask struct {
// Name is the name of the workflow task.
Name string
}
WorkflowTask is a reference to a task in a workflow.
type WorkflowTaskOpts struct {
// Deps are the names of other tasks in the workflow that this task depends
// on. If any of the dependencies are cancelled, deleted, or discarded after
// failing repeatedly, the task will be cancelled.
Deps []string
// IgnoreCancelledDeps specifies whether to ignore cancelled dependencies,
// instead treating them as if they had completed successfully.
//
// If specified, this overrides the IgnoreCancelledDeps option specified in
// the workflow.
IgnoreCancelledDeps *bool
// IgnoreDeletedDeps specifies whether to ignore deleted dependencies,
// instead treating them as if they had completed successfully.
//
// If specified, this overrides the IgnoreDeletedDeps option specified in
// the workflow.
IgnoreDeletedDeps *bool
// IgnoreDiscardedDeps specifies whether to ignore discarded dependencies,
// instead treating them as if they had completed successfully.
//
// If specified, this overrides the IgnoreDiscardedDeps option specified in
// the workflow.
IgnoreDiscardedDeps *bool
}
WorkflowTaskOpts are options for adding a task to a workflow.
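For example, a single task can opt out of the default cancellation propagation while the rest of the workflow keeps it; the task names and args type here are placeholders:
ignoreCancelled := true
workflow.Add("cleanup", CleanupArgs{}, nil, &riverpro.WorkflowTaskOpts{
	Deps: []string{"main_work"},
	// Run "cleanup" even if "main_work" is cancelled, treating the
	// cancellation as a successful completion:
	IgnoreCancelledDeps: &ignoreCancelled,
})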
type WorkflowTaskWithJob struct {
// Deps are the names of other tasks in the workflow that this task depends
// on.
Deps []string
// Job is the job row for the task.
Job *rivertype.JobRow
// Name is the name of the workflow task.
Name string
}
func (w *WorkflowTaskWithJob) Output(v any) error
Output unmarshals the JSON-encoded output of the task and stores the result in the value pointed to by v using json.Unmarshal.
As with json.Unmarshal, if v is nil or not a pointer, an InvalidUnmarshalError is returned.
If the task has no output, a TaskHasNoOutputError is returned.
type WorkflowTasks struct {/* contains filtered or unexported fields */}
WorkflowTasks is a loaded collection of tasks in a workflow.
func (w *WorkflowTasks) Count() int
Count returns the number of tasks in the workflow.
func (w *WorkflowTasks) Get(name string) *WorkflowTaskWithJob
Get returns the task with the given name.
func (w *WorkflowTasks) Names() []string
Names returns the names of all loaded tasks in this collection.
func (w *WorkflowTasks) Output(name string, v any) error
Output unmarshals the JSON-encoded output of the named task and stores it in the value pointed to by v.
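A sketch tying these together, assuming tasks was returned by one of the LoadDeps variants above and reusing OutputExampleStruct from the earlier example:
fmt.Printf("loaded %d tasks: %v\n", tasks.Count(), tasks.Names())
// Inspect a single task's job row, then unmarshal its recorded output:
task := tasks.Get("output_producer")
fmt.Printf("task %s is in state %s\n", task.Name, task.Job.State)
var out OutputExampleStruct
if err := task.Output(&out); err != nil {
	log.Fatal(err)
}
// Equivalently, unmarshal by name directly from the collection:
if err := tasks.Output("output_producer", &out); err != nil {
	log.Fatal(err)
}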
| Path | Synopsis |
|---|---|
| cmd/riverpro module | |
| driver module | |
| riverbatch | Package riverbatch provides functionality for batching multiple River jobs together for execution as a single group. |
| riverencrypt | |
| riverproshared/riverprosharedtest | |
| riverworkflow | Package riverworkflow provides workflow helpers for River Pro's workflow engine. |