Package riverworkflow provides workflow helpers and wait types for River Pro's workflow engine.
Most of the core workflow functionality is exposed through the [riverpro.Workflow] type and the [riverpro] package. See the River homepage and documentation for more.
DepsFromJobRow extracts the dependency task names from a job row.
DepsFromMetadata extracts the dependency task names directly from a job's metadata.
IDFromJobRow extracts the workflow ID from a job row.
IDFromMetadata extracts the workflow ID directly from a job's metadata.
func JobListParams(job *rivertype.JobRow, params *river.JobListParams) (*river.JobListParams, error)
JobListParams extracts the workflow ID from a job row and returns a river.JobListParams with the workflow ID set in order to filter the job list by workflow.
func JobListParamsByID(workflowID string, params *river.JobListParams) (*river.JobListParams, error)
JobListParamsByID returns a river.JobListParams with the workflow ID set in order to filter the job list by workflow.
NameFromJobRow extracts the workflow's name from a job row.
NameFromMetadata extracts the workflow's name directly from a job's metadata.
TaskFromJobRow extracts the workflow task's name from a job row.
type Signal struct {
// Attempt is the workflow attempt number from when this signal was emitted.
Attempt int
// CreatedAt is when the signal row was inserted.
CreatedAt time.Time
// ID is the unique identifier of the signal row.
ID int64
// Key identifies the signal stream within the workflow.
Key string
// Payload is the JSON payload associated with the signal.
Payload json.RawMessage
// Source is immutable JSON metadata describing the emitter for audit and
// correlation.
Source json.RawMessage
// WorkflowID is the ID of the workflow that this signal belongs to.
WorkflowID string
}
Signal is a durable workflow-scoped fact emitted to a workflow.
type SignalAttemptMismatchError struct {
// RequestedAttempt is the attempt requested by the emitter.
RequestedAttempt int
// SignalAttempt is the attempt stamped on the inserted signal row.
SignalAttempt int
// WorkflowID is the ID of the workflow associated with the signal.
WorkflowID string
}
SignalAttemptMismatchError indicates that a signal emit targeted a specific workflow attempt but the signal was inserted against a different attempt.
func (e *SignalAttemptMismatchError) Error() string
func (e *SignalAttemptMismatchError) Is(target error) bool
SignalKeyUndeclaredError is returned when task-scoped signal reads request a key that is not declared by the task's wait.
func (e *SignalKeyUndeclaredError) Error() string
func (e *SignalKeyUndeclaredError) Is(target error) bool
type SignalPayloadMismatchError struct {
// SignalID is the existing signal ID returned by the idempotent insert path.
SignalID *int64
// WorkflowID is the ID of the workflow associated with the signal.
WorkflowID string
}
SignalPayloadMismatchError indicates that a signal idempotency replay payload does not match the original payload for the same idempotency key.
func (e *SignalPayloadMismatchError) Error() string
func (e *SignalPayloadMismatchError) Is(target error) bool
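The idempotency behavior behind SignalPayloadMismatchError can be pictured with a small in-memory sketch. Everything here is a hypothetical stand-in (signalStore, emit, errPayloadMismatch); it is not River's storage layer, and comparing payloads byte-for-byte is an assumption made only for illustration.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errPayloadMismatch is a hypothetical sentinel standing in for
// SignalPayloadMismatchError.
var errPayloadMismatch = errors.New("signal payload mismatch")

// storedSignal is the row remembered for one idempotency key.
type storedSignal struct {
	id      int64
	payload []byte
}

// signalStore is a hypothetical in-memory stand-in for signal storage.
type signalStore struct {
	nextID int64
	byKey  map[string]storedSignal
}

func newSignalStore() *signalStore {
	return &signalStore{byKey: make(map[string]storedSignal)}
}

// emit inserts a signal once per idempotency key. A replay with the same
// payload returns the existing ID; a replay with a different payload returns
// the existing ID together with errPayloadMismatch.
func (s *signalStore) emit(idempotencyKey string, payload []byte) (int64, error) {
	if prev, ok := s.byKey[idempotencyKey]; ok {
		if !bytes.Equal(prev.payload, payload) {
			return prev.id, errPayloadMismatch
		}
		return prev.id, nil
	}
	s.nextID++
	s.byKey[idempotencyKey] = storedSignal{id: s.nextID, payload: append([]byte(nil), payload...)}
	return s.nextID, nil
}

func main() {
	store := newSignalStore()
	id, err := store.emit("review-1", []byte(`{"approved":true}`))
	fmt.Println(id, err)
	id, err = store.emit("review-1", []byte(`{"approved":false}`))
	fmt.Println(id, errors.Is(err, errPayloadMismatch))
}
```

With the real package, callers would typically use errors.As to recover the SignalID and WorkflowID fields from the returned error.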
SignalTaskDeclaresNoSignalKeysError is returned when task-scoped signal reads target a task whose wait declares no signal keys. This includes timer-only waits and dep-only waits.
func (e *SignalTaskDeclaresNoSignalKeysError) Error() string
func (e *SignalTaskDeclaresNoSignalKeysError) Is(target error) bool
SignalUnknownTaskError is returned when task-scoped signal reads target a task name that does not exist in the workflow.
func (e *SignalUnknownTaskError) Error() string
func (e *SignalUnknownTaskError) Is(target error) bool
type Timer struct {/* contains filtered or unexported fields */}
Timer defines a timer dependency for a wait.
TimerAfterTaskFinalized builds a relative timer anchored to a direct dependency task's finalization time.
ExampleTimerAfterTaskFinalized demonstrates building a task-finalized timer.
package main

import (
	"fmt"
	"time"

	"riverqueue.com/riverpro/riverworkflow"
)

func main() {
	wait := riverworkflow.WaitSpec{
		Terms: []riverworkflow.WaitTermSpec{
			riverworkflow.WaitTermTimer(riverworkflow.TimerAfterTaskFinalized("wait_for_build", "build_assets", 10*time.Minute)),
		},
	}
	if err := wait.Validate([]string{"build_assets"}); err != nil {
		panic(err)
	}
	fmt.Println(wait.Timers()[0].Name())
}
Output:
wait_for_build
TimerAfterWaitStarted builds a relative timer anchored to wait start.
TimerAfterWorkflowCreated builds a relative timer anchored to workflow creation time.
TimerAt builds an absolute timer that fires at the provided time.
After returns the relative delay for a timer created with TimerAfterWaitStarted, TimerAfterTaskFinalized, or TimerAfterWorkflowCreated.
func (t Timer) Anchor() TimerAnchor
Anchor returns the anchor for a relative timer.
Absolute timers return the zero TimerAnchor.
type TimerAnchor struct {
// Kind identifies the source time used to resolve a relative timer.
Kind TimerAnchorKind
// TaskName is the dependency task name for task-finalized timers.
TaskName string
}
TimerAnchor describes how a timer is anchored when it is inspected.
Absolute timers use the zero value.
type TimerAnchorKind string
TimerAnchorKind identifies the source time used to resolve a relative timer.
const (
	// TimerAnchorKindTaskFinalizedAt anchors relative timers to a dependency
	// task's finalization time.
	TimerAnchorKindTaskFinalizedAt TimerAnchorKind = "task_finalized_at"
	// TimerAnchorKindWaitStartedAt anchors relative timers to wait start time.
	TimerAnchorKindWaitStartedAt TimerAnchorKind = "wait_started_at"
	// TimerAnchorKindWorkflowCreatedAt anchors relative timers to workflow
	// creation time.
	TimerAnchorKindWorkflowCreatedAt TimerAnchorKind = "workflow_created_at"
)
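As a rough illustration of how an anchor kind and a relative delay could combine into a fire time, the sketch below adds the delay to whichever source time the anchor selects. The types anchorKind and anchorTimes and the function resolveFireAt are local stand-ins invented for this example, not part of riverworkflow; only the three string values come from the constants above.

```go
package main

import (
	"fmt"
	"time"
)

// anchorKind mirrors the documented TimerAnchorKind string values. It is a
// local stand-in, not the riverworkflow type.
type anchorKind string

const (
	taskFinalizedAt   anchorKind = "task_finalized_at"
	waitStartedAt     anchorKind = "wait_started_at"
	workflowCreatedAt anchorKind = "workflow_created_at"
)

// anchorTimes holds the hypothetical source times a resolver would need.
type anchorTimes struct {
	taskFinalized   map[string]time.Time // keyed by dependency task name
	waitStarted     time.Time
	workflowCreated time.Time
}

// resolveFireAt returns the anchor time plus the delay. It reports false when
// the anchor time is not known yet, such as an unfinalized dependency.
func resolveFireAt(kind anchorKind, taskName string, after time.Duration, at anchorTimes) (time.Time, bool) {
	var base time.Time
	switch kind {
	case taskFinalizedAt:
		base = at.taskFinalized[taskName]
	case waitStartedAt:
		base = at.waitStarted
	case workflowCreatedAt:
		base = at.workflowCreated
	}
	if base.IsZero() {
		return time.Time{}, false
	}
	return base.Add(after), true
}

// demoFireAt resolves a 10-minute task-finalized timer against a fixed time.
func demoFireAt() string {
	finalized := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
	at := anchorTimes{taskFinalized: map[string]time.Time{"build_assets": finalized}}
	fireAt, ok := resolveFireAt(taskFinalizedAt, "build_assets", 10*time.Minute, at)
	if !ok {
		return "unresolved"
	}
	return fireAt.Format(time.RFC3339)
}

func main() {
	fmt.Println(demoFireAt()) // 2024-01-02T12:10:00Z
}
```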
type Wait struct {
// Evidence summarizes the bounded final evaluator evidence exposed for this
// wait.
Evidence *WaitEvidence
// Expr is the effective CEL expression over terms and raw inputs.
Expr string
// Inputs contains the loaded input view for the wait.
Inputs WaitInputState
// Phase summarizes the wait lifecycle.
Phase WaitPhase
// ResolvedAt is when the wait became resolved, when known.
ResolvedAt *time.Time
// StartedAt is when the wait became active, when known.
StartedAt *time.Time
// Summary is the bounded human-facing explanation captured when the wait
// resolved.
Summary string
// Terms contains authored terms in authored order with satisfied state when
// known.
Terms []WaitTermStatus
}
Wait is a loaded snapshot of a task's wait metadata.
WaitFromMetadata parses task metadata and returns the task's loaded wait snapshot.
It returns nil and no error when no wait metadata is present. Malformed metadata returns WaitMetadataDecodeError.
func (w *Wait) DepInput(taskName string) *WaitDepInput
DepInput returns the loaded dependency input for the provided task name.
func (w *Wait) SignalInput(key string) *WaitSignalInput
SignalInput returns the loaded signal input for the provided key.
func (w *Wait) Term(name string) *WaitTermStatus
Term returns the loaded term result for the provided name.
func (w *Wait) TimerInput(name string) *WaitTimerInput
TimerInput returns the loaded timer input for the provided name.
type WaitDepDiagnostic struct {
Available bool
FinalizedAt *time.Time
State string
TaskName string
}
WaitDepDiagnostic contains live dependency input diagnostics.
type WaitDepInput struct {
// Result contains final evidence for this dependency when the wait has
// resolved.
Result *WaitDepInputResult
// TaskName is the dependency task name.
TaskName string
}
WaitDepInput is a loaded dependency input for a task wait.
type WaitDepInputResult struct {
// Available reports whether the dependency output was available at result
// capture time.
Available bool
// FinalizedAt is when the dependency finalized, if known.
FinalizedAt *time.Time
// State is the dependency job state, when known.
State string
}
WaitDepInputResult is final dependency evidence for one wait dependency.
type WaitDiagnostics struct {
// EvalError describes a live CEL evaluation failure, when evaluation failed.
EvalError error
// ExprResult is the live top-level expression result when evaluation
// succeeded.
ExprResult *bool
// Inputs contains live input diagnostics.
Inputs WaitDiagnosticsInputs
// InspectedAt is when diagnostics were gathered, in UTC.
InspectedAt time.Time
// Phase is the loaded wait phase.
Phase WaitPhase
// SignalScanCount is the number of signal rows loaded across all declared
// signal keys before evaluation.
SignalScanCount int
// SignalScanLimit is the cap that was applied to the signal scan.
SignalScanLimit int
// Terms contains live term diagnostics in authored order.
Terms []WaitTermDiagnostic
// Truncated reports that more matching signal rows existed for the
// current attempt than SignalScanLimit allowed, so the evaluator only
// observed a capped prefix. A complete scan that loaded exactly
// SignalScanLimit rows leaves Truncated false. When true, the
// following fields are best-effort because they are derived from the
// capped prefix rather than the full signal stream: ExprResult,
// EvalError, per-term MatchedCount and Satisfied, and per-signal-input
// IncludedCount and LastID. Re-run with a higher
// [WorkflowWaitDiagnosticsOpts.SignalScanLimit] (up to its max)
// for a more complete scan.
Truncated bool
// WorkflowAttempt is the workflow attempt used for diagnostics.
WorkflowAttempt int
}
WaitDiagnostics contains live, read-only diagnostics for a task wait.
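The Truncated rule described above (more rows than the limit sets it, exactly the limit does not) can be sketched with a capped scan. scanSignals is a hypothetical helper over an in-memory slice; how River actually detects extra rows is not stated in this documentation.

```go
package main

import "fmt"

// scanSignals loads at most limit IDs from matching and reports whether more
// matching rows existed than the limit allowed. It assumes limit >= 0. A scan
// that fills the limit exactly is complete, so it is not marked truncated.
func scanSignals(matching []int64, limit int) (loaded []int64, truncated bool) {
	if len(matching) > limit {
		return matching[:limit], true
	}
	return matching, false
}

func main() {
	rows := []int64{101, 102, 103}

	loaded, truncated := scanSignals(rows, 3)
	fmt.Println(len(loaded), truncated) // 3 false

	loaded, truncated = scanSignals(rows, 2)
	fmt.Println(len(loaded), truncated) // 2 true
}
```

When the second case occurs, counts and the last ID derived from the loaded prefix are only lower bounds, which is why the affected fields are described as best-effort.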
type WaitDiagnosticsInputs struct {
Deps []WaitDepDiagnostic
Signals []WaitSignalDiagnostic
Timers []WaitTimerDiagnostic
}
WaitDiagnosticsInputs contains live diagnostics for wait inputs.
type WaitEvidence struct {
// EvaluatedAt is when the evaluator captured this evidence.
EvaluatedAt time.Time
// WorkflowAttempt is the workflow attempt used for the evaluation.
WorkflowAttempt int
}
WaitEvidence identifies the bounded final evaluator evidence for a task wait.
type WaitInputState struct {
// Deps contains dependency task inputs referenced by the wait.
Deps []WaitDepInput
// Signals contains declared signal key inputs.
Signals []WaitSignalInput
// Timers contains declared timer inputs.
Timers []WaitTimerInput
}
WaitInputState contains the loaded inputs for a task wait.
type WaitInputs struct {
// Signals declares workflow signal keys available through `signals`.
Signals []string
// Timers declares timers available through `timers`.
Timers []Timer
// contains filtered or unexported fields
}
WaitInputs declares signal and timer inputs available to raw wait CEL through `signals` and `timers`.
Structured signal and timer terms declare their own signal keys and timers, so they do not need to be repeated here unless raw CEL also needs direct access through `signals` or `timers`.
type WaitMetadataDecodeError struct {
// Err is the underlying decode failure.
Err error
// Field is the metadata field that failed decoding.
Field string
}
WaitMetadataDecodeError reports malformed task metadata while decoding a Wait from metadata.
func (e *WaitMetadataDecodeError) Error() string
Error implements error.
func (e *WaitMetadataDecodeError) Unwrap() error
Unwrap returns the underlying decode error.
type WaitPhase int
WaitPhase summarizes the runtime lifecycle of a task wait.
const (
	// WaitPhaseNotStarted indicates a wait exists but has not
	// started evaluating yet.
	WaitPhaseNotStarted WaitPhase = iota
	// WaitPhaseWaiting indicates a wait is active but not yet resolved.
	WaitPhaseWaiting
	// WaitPhaseResolved indicates a wait is resolved.
	WaitPhaseResolved
)
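Because WaitPhase is an integer enumeration, callers that log or display it may want a label. The sketch below uses a local stand-in type (waitPhase) with the same iota ordering; the label strings and the phaseLabel helper are hypothetical and are not defined by riverworkflow.

```go
package main

import "fmt"

// waitPhase is a local stand-in with the same iota ordering as the documented
// WaitPhase constants.
type waitPhase int

const (
	phaseNotStarted waitPhase = iota
	phaseWaiting
	phaseResolved
)

// phaseLabel maps a phase to a hypothetical display label and falls back to
// the numeric value for phases this sketch does not know about.
func phaseLabel(p waitPhase) string {
	switch p {
	case phaseNotStarted:
		return "not_started"
	case phaseWaiting:
		return "waiting"
	case phaseResolved:
		return "resolved"
	default:
		return fmt.Sprintf("unknown(%d)", int(p))
	}
}

func main() {
	fmt.Println(phaseLabel(phaseWaiting))
	fmt.Println(phaseLabel(waitPhase(7)))
}
```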
WaitSignalDiagnostic contains live signal input diagnostics.
type WaitSignalInput struct {
// Key identifies the signal stream within the workflow.
Key string
// Result contains final evidence for this signal key when the wait has
// resolved.
Result *WaitSignalInputResult
}
WaitSignalInput is a loaded signal input for a task wait.
type WaitSignalInputResult struct {
// IncludedCount is the number of signal rows included by the final evidence
// boundary for the key.
IncludedCount int64
// LastIncludedID is the maximum included signal ID for the key.
LastIncludedID *int64
}
WaitSignalInputResult is final signal evidence for one declared signal key.
type WaitSpec struct {
// Expr is the top-level CEL expression. It may be omitted for single-term
// waits.
Expr string
// Inputs declares signal and timer inputs available to Expr and generic
// terms.
Inputs WaitInputs
// Terms declares optional named predicates available as booleans to Expr.
Terms []WaitTermSpec
}
WaitSpec defines the condition that must become true before a workflow task can run.
Expr is a CEL boolean expression. It can read direct dependency outputs through literal `deps["task_name"]` references, signal and timer inputs declared in Inputs through `signals` and `timers`, inspect the current workflow through `workflow`, and reference each named term in Terms as a boolean variable. Terms are useful when a condition should also produce structured result evidence, such as which signal key matched, how many signal rows matched, or whether a timer fired.
ExampleWaitSpec demonstrates defining a wait with terms, timers, and CEL.
package main

import (
	"fmt"
	"time"

	"riverqueue.com/riverpro/riverworkflow"
)

func main() {
	wait := riverworkflow.WaitSpec{
		Expr: "approval_received || cancel_received || timeout",
		Terms: []riverworkflow.WaitTermSpec{
			riverworkflow.WaitTermSignal("approval_received", "approval", `payload.approved == true`),
			riverworkflow.WaitTermSignal("cancel_received", "cancel", `true`),
			riverworkflow.WaitTermTimer(riverworkflow.TimerAfterWaitStarted("timeout", 30*time.Minute)),
		},
	}
	if err := wait.Validate(nil); err != nil {
		panic(err)
	}
	fmt.Println(wait.Timers()[0].Name())
}
Output:
timeout
MarshalJSON implements json.Marshaler.
SignalKeys returns declared signal keys in first-declared order.
The result includes keys declared explicitly in Inputs and keys implied by structured signal terms built with WaitTermSignal.
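One way to read "first-declared order" is an order-preserving merge that keeps the first occurrence of each key. The sketch below is a stand-alone illustration; signalKeys is a hypothetical helper, and visiting explicit inputs before term-implied keys is an assumption the documentation does not confirm.

```go
package main

import "fmt"

// signalKeys merges explicitly declared keys with keys implied by signal
// terms, keeping only the first occurrence of each key. Visiting inputs
// before terms is an assumption made for this sketch.
func signalKeys(inputKeys, termKeys []string) []string {
	seen := make(map[string]struct{})
	var out []string
	for _, group := range [][]string{inputKeys, termKeys} {
		for _, key := range group {
			if _, ok := seen[key]; ok {
				continue
			}
			seen[key] = struct{}{}
			out = append(out, key)
		}
	}
	return out
}

func main() {
	keys := signalKeys(
		[]string{"approval", "cancel"},
		[]string{"cancel", "manual_review"},
	)
	fmt.Println(keys) // [approval cancel manual_review]
}
```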
Timers returns declared timers in authored order.
UnmarshalJSON implements json.Unmarshaler.
WaitTaskDeclaresNoWaitError is returned when wait diagnostics or task-scoped signal reads target a task that declares no wait.
func (e *WaitTaskDeclaresNoWaitError) Error() string
func (e *WaitTaskDeclaresNoWaitError) Is(target error) bool
type WaitTermDiagnostic struct {
LastMatchedID *int64
MatchedCount int64
Name string
RequiredCount int64
Satisfied bool
}
WaitTermDiagnostic contains live term diagnostics.
type WaitTermKind string
WaitTermKind identifies one authored wait term kind.
const (
	// WaitTermKindGeneric identifies a generic CEL term over raw inputs.
	WaitTermKindGeneric WaitTermKind = "generic"
	// WaitTermKindSignal identifies a structured signal term.
	WaitTermKindSignal WaitTermKind = "signal"
	// WaitTermKindTimer identifies a structured timer term.
	WaitTermKindTimer WaitTermKind = "timer"
)
type WaitTermSpec struct {/* contains filtered or unexported fields */}
WaitTermSpec defines one named wait term.
A term is evaluated before the top-level WaitSpec.Expr, and its name becomes a boolean variable that Expr can reference. Term names must be valid CEL identifiers, unique within the wait, and must not use CEL reserved words or wait input names such as `deps`, `signals`, `timers`, and `workflow`. Terms also let River record structured wait evidence for status and diagnostics.
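The naming rules above can be approximated with a small pre-check. This is a sketch under stated assumptions, not the validation that WaitSpec.Validate performs: checkTermNames is a hypothetical helper, the identifier pattern follows the usual CEL identifier shape, and the reserved set lists the four wait input names plus only a partial sample of CEL reserved words.

```go
package main

import (
	"fmt"
	"regexp"
)

// identPattern approximates the shape of a CEL identifier.
var identPattern = regexp.MustCompile(`^[_a-zA-Z][_a-zA-Z0-9]*$`)

// reserved holds the wait input names from the documentation plus a partial,
// illustrative sample of CEL reserved words.
var reserved = map[string]bool{
	"deps": true, "signals": true, "timers": true, "workflow": true,
	"true": true, "false": true, "null": true, "in": true,
}

// checkTermNames returns the first naming problem found, or nil.
func checkTermNames(names []string) error {
	seen := make(map[string]bool)
	for _, name := range names {
		switch {
		case !identPattern.MatchString(name):
			return fmt.Errorf("term %q is not a valid identifier", name)
		case reserved[name]:
			return fmt.Errorf("term %q uses a reserved name", name)
		case seen[name]:
			return fmt.Errorf("term %q is declared more than once", name)
		}
		seen[name] = true
	}
	return nil
}

func main() {
	fmt.Println(checkTermNames([]string{"approval_received", "timeout"}))
	fmt.Println(checkTermNames([]string{"approval_received", "workflow"}))
}
```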
func WaitTerm(name string, expr string) WaitTermSpec
WaitTerm builds a generic named CEL term over raw inputs.
The expression is evaluated in the same CEL environment as WaitSpec.Expr, with access to direct dependency outputs through `deps`, signal and timer inputs declared in WaitInputs, and the current workflow through `workflow`. Generic terms cannot reference other term names; term booleans are only available to WaitSpec.Expr.
Use a generic term for arbitrary CEL logic that does not need signal- or timer-specific result evidence, such as inspecting dependency output, combining multiple signal keys, or branching on workflow metadata. Raw signal inputs contain rows from the current workflow attempt only. Signal and timer keys may be declared in WaitInputs or by structured signal and timer terms.
func WaitTermSignal(name string, signalKey string, expr string) WaitTermSpec
WaitTermSignal builds a structured signal term that is satisfied by matching signal rows.
The term declares signalKey as a wait input and evaluates expr once for each signal row with that key in the current workflow attempt. expr is a CEL expression over a single signal, with the variables `attempt`, `created_at`, `id`, `key`, `payload`, and `source`. The term is satisfied when at least one signal matches or, if WaitTermSpec.Count is set, when at least that many matching signals are present.
Unlike WaitTerm, expr is not evaluated in the raw wait environment: `deps`, `signals`, `timers`, `workflow`, and other signal keys are not in scope. Use a generic term or WaitSpec.Expr for predicates that combine multiple inputs. Because `payload` and `source` are dynamic JSON maps, validation checks the expression shape but cannot prove every field access against future signal rows; write expr for the payload shape used by signalKey and guard optional fields when needed.
The top-level WaitSpec.Expr references the term by name as a boolean. River also records structured evidence for the term, including matched count, last matched signal ID, and the signal key. Prefer WaitTermSignal for common "wait for this signal" and signal quorum cases. Declaring a signal term also makes signalKey available to raw CEL as `signals["<signal key>"]`; it does not need to be repeated in WaitInputs.Signals.
package main

import (
	"fmt"

	"riverqueue.com/riverpro/riverworkflow"
)

func main() {
	// Count counts matching signal rows. Use signal idempotency keys if each
	// reviewer should only contribute once.
	wait := riverworkflow.WaitSpec{
		Terms: []riverworkflow.WaitTermSpec{
			riverworkflow.WaitTermSignal(
				"two_reviews_received",
				"manual_review",
				`payload.approved == true && payload.reviewer != ""`,
			).Count(2),
		},
	}
	if err := wait.Validate(nil); err != nil {
		panic(err)
	}
	fmt.Println(wait.SignalKeys()[0])
	fmt.Println(wait.Terms[0].CountRequirement())
}
Output:
manual_review
2
func WaitTermTimer(timer Timer) WaitTermSpec
WaitTermTimer builds a structured timer term.
The term name is the timer name, and the top-level WaitSpec.Expr references that name as a boolean. Because the timer name becomes a term name, it must satisfy the WaitTermSpec naming rules. A fired timer releases the task only when Expr references the timer term directly or through other logic.
Declaring a timer term also declares the timer as a wait input; it does not need to be repeated in WaitInputs.Timers. River records timer result evidence separately from raw CEL so status and diagnostics can show the configured timer and whether it fired.
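The point that a fired timer only releases the task when Expr references it can be shown without CEL. In this sketch plain Go functions stand in for compiled top-level expressions over term booleans; exprWithTimer and exprWithoutTimer are hypothetical names, and real waits evaluate CEL rather than Go closures.

```go
package main

import "fmt"

// exprWithTimer stands in for the CEL expression
// "approval_received || timeout".
func exprWithTimer(terms map[string]bool) bool {
	return terms["approval_received"] || terms["timeout"]
}

// exprWithoutTimer stands in for the CEL expression "approval_received",
// which never consults the timer term.
func exprWithoutTimer(terms map[string]bool) bool {
	return terms["approval_received"]
}

func main() {
	// Term results after the "timeout" timer fired with no approval signal.
	terms := map[string]bool{"approval_received": false, "timeout": true}

	fmt.Println(exprWithTimer(terms))    // true: the fired timer releases the task
	fmt.Println(exprWithoutTimer(terms)) // false: the fired timer is ignored
}
```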
func (t WaitTermSpec) Count(count int) WaitTermSpec
Count sets the minimum number of matched signal rows required for a WaitTermSignal term to become satisfied.
The value must be positive. Leave Count unset for the common case where a single matching signal is enough.
Count counts matching signal rows. It does not deduplicate by payload, source, or actor; use signal idempotency keys when the workflow needs one row per logical event. When unset, signal terms default to a required count of 1.
func (t WaitTermSpec) CountRequirement() int
CountRequirement returns the effective minimum matched-signal count for a signal term. It returns the authored value when set, otherwise 1.
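The quorum rule described for Count and CountRequirement can be sketched over an in-memory slice. The signalRow type and both helpers are hypothetical stand-ins; the Approved field plays the role of a term expression that has already been evaluated per row.

```go
package main

import "fmt"

// signalRow is a hypothetical, simplified signal row for this sketch.
type signalRow struct {
	ID       int64
	Approved bool
}

// effectiveCount mirrors the documented default: the authored value when set,
// otherwise 1.
func effectiveCount(authored int) int {
	if authored > 0 {
		return authored
	}
	return 1
}

// evaluateSignalTerm counts every matching row without deduplication and
// reports the highest matched ID and whether the quorum is met.
func evaluateSignalTerm(rows []signalRow, authored int) (matched int, lastID int64, satisfied bool) {
	for _, row := range rows {
		if !row.Approved {
			continue
		}
		matched++
		if row.ID > lastID {
			lastID = row.ID
		}
	}
	return matched, lastID, matched >= effectiveCount(authored)
}

func main() {
	rows := []signalRow{{ID: 7, Approved: true}, {ID: 8, Approved: false}, {ID: 9, Approved: true}}

	matched, lastID, ok := evaluateSignalTerm(rows, 0)
	fmt.Println(matched, lastID, ok) // 2 9 true

	matched, lastID, ok = evaluateSignalTerm(rows, 3)
	fmt.Println(matched, lastID, ok) // 2 9 false
}
```

Since every matching row counts, two rows from the same reviewer would satisfy a quorum of 2 in this sketch, which is the situation signal idempotency keys are meant to prevent.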
func (t WaitTermSpec) DisplayLabel() string
DisplayLabel returns the resolved user-facing label for the term.
It prefers an explicit authored label and otherwise derives a readable fallback from the term declaration.
func (t WaitTermSpec) Expr() string
Expr returns the authored predicate expression for generic and signal terms.
func (t WaitTermSpec) Kind() WaitTermKind
Kind returns the authored term kind.
func (t WaitTermSpec) Label(label string) WaitTermSpec
Label sets the user-facing label for a term.
Labels are used in wait status, diagnostics, and result summaries. If no label is set, River derives a readable fallback from the signal key, timer name, or term name.
func (t WaitTermSpec) SignalKey() string
SignalKey returns the authored signal key for a signal term.
func (t WaitTermSpec) Timer() Timer
Timer returns the authored timer for a timer term.
func (t WaitTermSpec) UserLabel() string
UserLabel returns the authored label, when available.
type WaitTermStatus struct {
// Expr is the authored CEL expression for generic and signal terms. Timer
// terms do not have an authored CEL expression.
Expr string
// Kind identifies the authored term kind.
Kind WaitTermKind
// Label is the authored label, when available.
Label string
// Name is the authored term name.
Name string
// RequiredCount is the authored minimum matched-signal count for
// structured signal terms; zero for non-signal terms. It is populated
// from the spec regardless of resolution state, so callers can surface
// the required quorum on a still-pending wait.
RequiredCount int64
// Result contains final term evidence when the wait has resolved.
Result *WaitTermStatusResult
// SignalKey is the authored signal key for structured signal terms.
SignalKey string
// TimerName is the authored timer name for structured timer terms.
TimerName string
}
WaitTermStatus is one loaded authored term result.
type WaitTermStatusResult struct {
// LastMatchedID is the maximum matched signal ID for structured signal
// terms.
LastMatchedID *int64
// MatchedCount is the number of matched signal rows for structured signal
// terms.
MatchedCount int64
// Satisfied reports whether the term evaluated true.
Satisfied bool
}
WaitTermStatusResult is final evidence for one authored term.
WaitTimerDiagnostic contains live timer input diagnostics.
type WaitTimerInput struct {
// After is the declared relative duration for timers authored with `after`.
After *time.Duration
// Anchor describes how a relative timer is anchored.
Anchor *TimerAnchor
// FireAt is the effective absolute fire time when known. For authored
// absolute timers this is the declared fire time. For relative timers it is
// populated once the wait is active and the anchor has been resolved.
FireAt *time.Time
// Name is the timer name referenced by wait expressions.
Name string
// Result contains final evidence for this timer when the wait has resolved.
Result *WaitTimerInputResult
}
WaitTimerInput is a read-only timer input view from a loaded wait.
It combines authored timer configuration with runtime fire-time and final resolution-result evidence when available.
type WaitTimerInputResult struct {
// FireAt is the timer's effective fire time when known.
FireAt *time.Time
// Fired reports whether the timer had fired at result capture time.
Fired bool
}
WaitTimerInputResult is final timer evidence for one declared timer.
type WaitValidationError struct {
// Err is the underlying validation failure.
Err error
// TaskName is the workflow task whose wait failed validation,
// when available.
TaskName string
// WaitExpr is the top-level wait CEL expression being
// validated, when available.
WaitExpr string
// WorkflowID is the ID of the workflow containing the failing task, when
// available.
WorkflowID string
}
WaitValidationError reports that a WaitSpec definition failed validation.
It wraps the underlying validation error from WaitSpec.Validate and carries optional task/workflow context so callers can map the failure back to a specific node during workflow preparation.
TaskName, WaitExpr, and WorkflowID may be empty when validation occurs outside workflow preparation.
func (e *WaitValidationError) Error() string
Error implements error.
func (e *WaitValidationError) Unwrap() error
Unwrap returns the underlying validation error.
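Because the error exposes Unwrap, the standard errors.As and errors.Is helpers should be able to reach both the context fields and the underlying cause. The sketch below demonstrates the pattern with a local stand-in type (waitValidationError) that mirrors the documented fields; the message format, errUnknownTerm, sampleErr, and describe are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// waitValidationError is a local stand-in mirroring the documented fields of
// WaitValidationError. Its message format is invented for this sketch.
type waitValidationError struct {
	Err        error
	TaskName   string
	WaitExpr   string
	WorkflowID string
}

func (e *waitValidationError) Error() string {
	return fmt.Sprintf("task %q: invalid wait: %v", e.TaskName, e.Err)
}

func (e *waitValidationError) Unwrap() error { return e.Err }

// errUnknownTerm is a hypothetical underlying validation failure.
var errUnknownTerm = errors.New("unknown term")

// sampleErr builds a validation error wrapped by one more layer of context.
func sampleErr() error {
	inner := &waitValidationError{Err: errUnknownTerm, TaskName: "deploy", WaitExpr: "approved || timeout"}
	return fmt.Errorf("prepare workflow: %w", inner)
}

// describe recovers task context when the chain contains a validation error.
func describe(err error) string {
	var verr *waitValidationError
	if errors.As(err, &verr) {
		return fmt.Sprintf("task=%s expr=%s cause=%v", verr.TaskName, verr.WaitExpr, verr.Err)
	}
	return "other: " + err.Error()
}

func main() {
	err := sampleErr()
	fmt.Println(describe(err))
	fmt.Println(errors.Is(err, errUnknownTerm)) // true: Unwrap exposes the cause
}
```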