riverworkflow

package
v0.24.0-beta.2
Published: Apr 20, 2026 License: Proprietary

Package riverworkflow provides workflow helpers and gate types for River Pro's workflow engine.

Most of the core workflow functionality is exposed through the [riverpro.Workflow] type and the [riverpro] package. See the River homepage and documentation for details.

Constants

This section is empty.

Variables

This section is empty.

Functions

func DepsFromJobRow

func DepsFromJobRow(job *rivertype.JobRow) []string

DepsFromJobRow extracts the dependency task names from a job row.

func DepsFromMetadata

func DepsFromMetadata(metadata []byte) []string

DepsFromMetadata extracts the dependency task names directly from a job's metadata.

func IDFromJobRow

func IDFromJobRow(job *rivertype.JobRow) string

IDFromJobRow extracts the workflow ID from a job row.

func IDFromMetadata

func IDFromMetadata(metadata []byte) string

IDFromMetadata extracts the workflow ID directly from a job's metadata.

func JobListParams

func JobListParams(job *rivertype.JobRow, params *river.JobListParams) (*river.JobListParams, error)

JobListParams extracts the workflow ID from a job row and returns a river.JobListParams with that workflow ID set, so that the job list is filtered to the job's workflow.

func JobListParamsByID

func JobListParamsByID(workflowID string, params *river.JobListParams) (*river.JobListParams, error)

JobListParamsByID returns a river.JobListParams with the given workflow ID set, so that the job list is filtered to that workflow.

func NameFromJobRow

func NameFromJobRow(job *rivertype.JobRow) string

NameFromJobRow extracts the workflow's name from a job row.

func NameFromMetadata

func NameFromMetadata(metadata []byte) string

NameFromMetadata extracts the workflow's name directly from a job's metadata.

func TaskFromJobRow

func TaskFromJobRow(job *rivertype.JobRow) string

TaskFromJobRow extracts the workflow task's name from a job row.

func TaskFromMetadata

func TaskFromMetadata(metadata []byte) string

TaskFromMetadata extracts the workflow task's name directly from a job's metadata.

Types

type Gate

type Gate struct {/* contains filtered or unexported fields */}

Gate is a read-only view of a task's gate metadata.

Use its methods to inspect gate declarations, lifecycle phase, and satisfaction details.

func GateFromMetadata

func GateFromMetadata(metadata []byte) (Gate, error)

GateFromMetadata parses task metadata and returns the task's gate view.

It returns a zero Gate and a nil error when no gate metadata is present. Malformed metadata returns a GateMetadataDecodeError whose Field names the part that failed to decode (`metadata`, `river:workflow_gate`, or `river:workflow_gate_state`).

func (Gate) Active

func (g Gate) Active() bool

Active reports whether the gate has become active.

func (Gate) ActiveAt

func (g Gate) ActiveAt() (time.Time, bool)

ActiveAt returns when the gate became active.

The second return value is false if the task has no gate or the gate is not active.

func (Gate) DeclaredSignals

func (g Gate) DeclaredSignals() []string

DeclaredSignals returns the gate's declared signal keys.

It returns an empty slice if the task has no gate.

func (Gate) Enabled

func (g Gate) Enabled() bool

Enabled reports whether the task has a gate.

func (Gate) Expr

func (g Gate) Expr() string

Expr returns the configured gate CEL expression.

It returns an empty string if the task has no gate.

func (Gate) HasTimer

func (g Gate) HasTimer(name string) bool

HasTimer reports whether the gate declares a timer with the given name.

func (Gate) MarshalJSON

func (g Gate) MarshalJSON() ([]byte, error)

MarshalJSON marshals a gate as its structured view.

func (Gate) Phase

func (g Gate) Phase() GatePhase

Phase returns a high-level gate lifecycle phase.

func (Gate) Satisfaction

func (g Gate) Satisfaction() (GateSatisfaction, bool)

Satisfaction returns the gate-pass details captured when this gate was marked satisfied.

The returned data includes satisfaction time, attempt number, signal boundaries, and timer status so callers can inspect what satisfied the gate.

The boolean return value is false when the task has no gate, the gate is not satisfied, or no satisfaction snapshot is available in gate state.

func (Gate) Satisfied

func (g Gate) Satisfied() bool

Satisfied reports whether the gate has reached a satisfied state.

func (Gate) SatisfiedAt

func (g Gate) SatisfiedAt() (time.Time, bool)

SatisfiedAt returns when the gate became satisfied.

The second return value is false if the task has no gate or the gate is not satisfied.

func (Gate) Timer

func (g Gate) Timer(name string) (GateTimer, bool)

Timer returns a read-only timer view for the named timer.

func (Gate) TimerNames

func (g Gate) TimerNames() []string

TimerNames returns declared timer names in sorted order.

func (Gate) Timers

func (g Gate) Timers() []GateTimer

Timers returns read-only timer views for declared timers.

func (Gate) View

func (g Gate) View() GateView

View returns a JSON-friendly, eagerly projected view of the gate.

type GateMetadataDecodeError

type GateMetadataDecodeError struct {
	// Err is the underlying decode failure.
	Err error
	// Field is the metadata field that failed decoding.
	Field string
}

GateMetadataDecodeError reports malformed task metadata while decoding a Gate from metadata.

func (*GateMetadataDecodeError) Error

func (e *GateMetadataDecodeError) Error() string

func (*GateMetadataDecodeError) Unwrap

func (e *GateMetadataDecodeError) Unwrap() error

Unwrap returns the underlying decode error.

type GatePhase

type GatePhase int

GatePhase summarizes the runtime lifecycle of a task gate.

const (
	// GatePhaseNone indicates the task has no gate.
	GatePhaseNone GatePhase = iota

	// GatePhaseInactive indicates a gate exists but has not been activated yet.
	GatePhaseInactive

	// GatePhaseWaiting indicates a gate is active but not yet satisfied.
	GatePhaseWaiting

	// GatePhaseSatisfied indicates a gate is satisfied.
	GatePhaseSatisfied
)

func (GatePhase) String

func (p GatePhase) String() string
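
The four phases can be read as a function of what Gate.Enabled, Gate.Active, and Gate.Satisfied report. The sketch below shows one plausible mapping using local stand-in constants. The precedence (satisfied checked before active) is an assumption, not something the package documents.

```go
package main

import "fmt"

// gatePhase mirrors the documented GatePhase constants for illustration.
type gatePhase int

const (
	phaseNone gatePhase = iota
	phaseInactive
	phaseWaiting
	phaseSatisfied
)

// phaseOf derives a phase from the three booleans a Gate exposes.
// The ordering of the checks is an assumption made for this sketch.
func phaseOf(enabled, active, satisfied bool) gatePhase {
	switch {
	case !enabled:
		return phaseNone // the task has no gate
	case satisfied:
		return phaseSatisfied
	case active:
		return phaseWaiting // active but not yet satisfied
	default:
		return phaseInactive // gate exists but has not been activated
	}
}

func main() {
	fmt.Println(phaseOf(true, true, false) == phaseWaiting)
}
```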

type GateSatisfaction

type GateSatisfaction struct {/* contains filtered or unexported fields */}

GateSatisfaction describes what made a task's gate pass.

It captures the evaluator timestamp, workflow attempt, per-signal-key boundaries, and timer status recorded when the gate became satisfied. Use it to inspect why a gate passed and to load the same set of visible signals later.

func (GateSatisfaction) AsOf

func (s GateSatisfaction) AsOf() time.Time

AsOf returns the evaluator timestamp used when the gate was satisfied.

func (GateSatisfaction) Attempt

func (s GateSatisfaction) Attempt() int

Attempt returns the workflow attempt that was active when this snapshot was captured.

func (GateSatisfaction) FiredTimerNames

func (s GateSatisfaction) FiredTimerNames() []string

FiredTimerNames returns the names of the timers that had fired at satisfaction time.

func (GateSatisfaction) HasFiredTimer

func (s GateSatisfaction) HasFiredTimer(name string) bool

HasFiredTimer reports whether the named timer had fired at satisfaction time.

func (GateSatisfaction) SignalBoundary

SignalBoundary returns the stored signal boundary for a key.

func (GateSatisfaction) SignalKeys

func (s GateSatisfaction) SignalKeys() []string

SignalKeys returns captured signal keys in sorted order.

func (GateSatisfaction) Timer

Timer returns captured timer status for a named timer.

type GateSatisfactionSignalBoundary

type GateSatisfactionSignalBoundary struct {
	// Count is the number of visible signals for the key at satisfaction time.
	Count int64
	// LastSignalID is the maximum visible signal ID for the key.
	LastSignalID int64
}

GateSatisfactionSignalBoundary bounds the visible signal set for a key at satisfaction time.
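
One way to read the boundary: signals for the key whose ID is at most LastSignalID were visible when the gate passed, and Count says how many of them there were. The sketch below reloads that set from a list of signals using local stand-in types. It assumes signal IDs only increase and that visibility depends on ID alone, neither of which is stated here.

```go
package main

import "fmt"

// signal and boundary are local stand-ins for the documented Signal and
// GateSatisfactionSignalBoundary types, reduced to the fields used here.
type signal struct {
	ID  int64
	Key string
}

type boundary struct {
	Count        int64
	LastSignalID int64
}

// visibleAt returns the signals for key that fall inside the boundary:
// same key and an ID no greater than LastSignalID.
func visibleAt(signals []signal, key string, b boundary) []signal {
	var out []signal
	for _, s := range signals {
		if s.Key == key && s.ID <= b.LastSignalID {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	all := []signal{{1, "approval"}, {2, "cancel"}, {3, "approval"}, {4, "approval"}}
	b := boundary{Count: 2, LastSignalID: 3}

	visible := visibleAt(all, "approval", b)
	// Count acts as a consistency check on the reloaded set.
	fmt.Println(len(visible), int64(len(visible)) == b.Count)
}
```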

type GateSatisfactionSignalView

type GateSatisfactionSignalView struct {
	// Count is the number of visible signals for the key at satisfaction time.
	Count int64
	// Key identifies the signal stream within the workflow.
	Key string
	// LastSignalID is the maximum visible signal ID for the key.
	LastSignalID int64
}

GateSatisfactionSignalView is a JSON-friendly signal boundary captured when a gate was satisfied.

type GateSatisfactionTimer

type GateSatisfactionTimer struct {
	// FireAt is the timer's effective fire time when known.
	FireAt *time.Time
	// Fired reports whether the timer was fired at satisfaction time.
	Fired bool
}

GateSatisfactionTimer stores timer status captured at satisfaction time.

type GateSatisfactionTimerView

type GateSatisfactionTimerView struct {
	// FireAt is the timer's effective fire time when known.
	FireAt *time.Time
	// Fired reports whether the timer was fired at satisfaction time.
	Fired bool
	// Name is the declared timer name.
	Name string
}

GateSatisfactionTimerView is a JSON-friendly timer status captured when a gate was satisfied.

type GateSatisfactionView

type GateSatisfactionView struct {
	// AsOf is the evaluator timestamp used when the gate was satisfied.
	AsOf time.Time
	// Attempt is the workflow attempt active when the snapshot was captured.
	Attempt int
	// Signals contains captured signal boundaries sorted by key.
	Signals []*GateSatisfactionSignalView
	// Timers contains captured timer statuses sorted by name.
	Timers []*GateSatisfactionTimerView
}

GateSatisfactionView is a JSON-friendly gate satisfaction snapshot.

type GateSpec

type GateSpec struct {
	// Expr is a CEL expression that must evaluate to true for the gate to be
	// satisfied.
	//
	// Available variables (field names use snake_case JSON tags):
	//   - signals: map[string][]SignalEvent
	//     - SignalEvent fields: attempt (int), created_at (time), id (int64),
	//       key (string), payload (map[string]any)
	//   - timers: map[string]TimerStatus
	//     - TimerStatus fields: fired (bool), fire_at (*time)
	//   - deps: map[string]Dep
	//     - Dep fields: finalized_at (*time), output (map[string]any),
	//       state (string job state)
	//   - workflow: WorkflowVars
	//     - WorkflowVars fields: attempt (int), created_at (time), id (string),
	//       metadata (map[string]any)
	//
	// Keys referenced in signals/timers/deps must be declared in
	// GateSpec.Signals, GateSpec.Timers, and the task's workflow deps.
	// Time-based waiting should use declared timers such as
	// `timers["timeout"].fired`.
	//
	// Examples:
	//   signals["approval"].exists(s, s.payload.approved == true)
	//   timers["timeout"].fired
	//   deps["build"].state == "completed" && deps["build"].output.sha != ""
	Expr string
	// Signals declares which signal keys the gate expression may read.
	//
	// Gate evaluation only exposes declared keys in `signals["key"]`, and signal
	// emissions on those keys enqueue gate reevaluation.
	Signals []string
	// Timers declares named timers used by the gate expression.
	//
	// Declared timers are materialized when the gate becomes active and are then
	// exposed to CEL as `timers["name"]` with `fire_at` and `fired` fields. Timer
	// progression also triggers reevaluation.
	Timers []Timer
}

GateSpec defines a CEL-based gate for a workflow task. Gate metadata is stored on the job and evaluated after deps are satisfied.

For common cases, prefer NewGateSignalOnly or NewGateTimerOnly. Use GateSpec directly for advanced CEL expressions.
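
To make the expression semantics concrete, here is a plain-Go rendering of `signals["approval"].exists(s, s.payload.approved == true) || timers["timeout"].fired`. It is a sketch with local stand-in types and does not use a CEL evaluator. Real CEL evaluation may treat missing keys or fields differently (for example by raising an evaluation error), so read it as an approximation of intent only.

```go
package main

import "fmt"

// signalEvent and timerStatus are local stand-ins for the variables the
// gate expression sees, reduced to the fields this expression reads.
type signalEvent struct {
	Payload map[string]any
}

type timerStatus struct {
	Fired bool
}

// gatePasses approximates:
//
//	signals["approval"].exists(s, s.payload.approved == true) || timers["timeout"].fired
func gatePasses(signals map[string][]signalEvent, timers map[string]timerStatus) bool {
	// exists(...): true if any approval signal carries approved == true.
	for _, s := range signals["approval"] {
		if approved, ok := s.Payload["approved"].(bool); ok && approved {
			return true
		}
	}
	// Otherwise the gate passes only once the timeout timer has fired.
	return timers["timeout"].Fired
}

func main() {
	signals := map[string][]signalEvent{
		"approval": {{Payload: map[string]any{"approved": false}}},
	}
	timers := map[string]timerStatus{"timeout": {Fired: false}}
	fmt.Println(gatePasses(signals, timers))

	timers["timeout"] = timerStatus{Fired: true}
	fmt.Println(gatePasses(signals, timers))
}
```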

Example

ExampleGateSpec demonstrates defining a gate with signals, timers, and CEL.

package main

import (
	"fmt"
	"time"

	"riverqueue.com/riverpro/riverworkflow"
)

func main() {
	gate := riverworkflow.GateSpec{
		Expr: "signals['approval'].exists(s, s.payload.approved == true) || timers['timeout'].fired || signals['cancel'].size() > 0",
		Signals: []string{
			"approval",
			"cancel",
		},
		Timers: []riverworkflow.Timer{
			riverworkflow.TimerAfter("timeout", 30*time.Minute),
		},
	}

	if err := gate.Validate(nil); err != nil {
		panic(err)
	}

	fmt.Println(gate.Timers[0].Name())
}

Output:

timeout

func NewGateSignalOnly

func NewGateSignalOnly(key string) *GateSpec

NewGateSignalOnly returns a gate that becomes satisfied when at least one signal exists for the provided key.

func NewGateTimerOnly

func NewGateTimerOnly(timer Timer) *GateSpec

NewGateTimerOnly returns a gate that becomes satisfied when the provided timer fires.

func (GateSpec) MarshalJSON

func (g GateSpec) MarshalJSON() ([]byte, error)

MarshalJSON writes gate metadata.

func (*GateSpec) UnmarshalJSON

func (g *GateSpec) UnmarshalJSON(data []byte) error

UnmarshalJSON reads gate metadata from persisted representation.

func (*GateSpec) Validate

func (g *GateSpec) Validate(deps []string) error

Validate checks the structural validity of a gate definition.

Validation checks timer anchors against deps and verifies the CEL expression and references.

type GateTimer

type GateTimer struct {
	// Anchor describes how a relative timer is anchored.
	//
	// Absolute timers use the zero [TimerAnchor].
	Anchor TimerAnchor
	// After is the declared relative duration for timers authored with `after`.
	After time.Duration
	// FireAt is the effective absolute fire time when known. For authored
	// absolute timers this is the declared fire time. For relative timers it is
	// populated once the gate is active and the anchor has been resolved.
	FireAt time.Time
	// HasAfter reports whether After was explicitly declared.
	HasAfter bool
	// HasFireAt reports whether FireAt is known.
	HasFireAt bool
	// Name is the timer name referenced in CEL (`timers["name"]`).
	Name string
}

GateTimer is a read-only timer view from a loaded gate.

It combines declared timer configuration with the runtime fire time once the gate has resolved it.

type GateTimerAnchorView

type GateTimerAnchorView struct {
	// Kind identifies the source time used to resolve a relative timer.
	Kind TimerAnchorKind
	// Task is the dependency task name for dependency-finalized timers.
	Task string
}

GateTimerAnchorView is a JSON-friendly timer anchor.

type GateTimerView

type GateTimerView struct {
	// After is the declared relative duration for timers authored with `after`.
	After string
	// AfterUS is the declared relative duration in microseconds.
	AfterUS *int64
	// Anchor describes how a relative timer is anchored.
	Anchor *GateTimerAnchorView
	// FireAt is the effective absolute fire time when known.
	FireAt *time.Time
	// HasAfter reports whether After was explicitly declared.
	HasAfter bool
	// HasFireAt reports whether FireAt is known.
	HasFireAt bool
	// Name is the timer name referenced in CEL.
	Name string
}

GateTimerView is a JSON-friendly timer view from a loaded gate.

type GateValidationError

type GateValidationError struct {
	// Err is the underlying validation failure.
	Err error
	// GateExpr is the gate CEL expression being validated, when available.
	GateExpr string
	// TaskName is the workflow task whose gate failed validation, when
	// available.
	TaskName string
	// WorkflowID is the ID of the workflow containing the failing task, when
	// available.
	WorkflowID string
}

GateValidationError reports that a GateSpec definition failed validation.

It wraps the underlying validation error from GateSpec.Validate and carries optional task/workflow context so callers can map the failure back to a specific node during workflow preparation.

TaskName, GateExpr, and WorkflowID may be empty when validation occurs outside workflow preparation.

func (*GateValidationError) Error

func (e *GateValidationError) Error() string

func (*GateValidationError) Unwrap

func (e *GateValidationError) Unwrap() error

Unwrap returns the underlying validation error.

type GateView

type GateView struct {
	// ActiveAt is when the gate became active, when known.
	ActiveAt *time.Time
	// DeclaredSignals contains signal keys referenced by the gate.
	DeclaredSignals []string
	// Enabled reports whether the task has a gate.
	Enabled bool
	// ExprCEL is the CEL expression that controls the gate.
	ExprCEL string
	// Phase summarizes the gate lifecycle.
	Phase string
	// Satisfaction captures what made the gate pass.
	Satisfaction *GateSatisfactionView
	// SatisfiedAt is when the gate became satisfied, when known.
	SatisfiedAt *time.Time
	// Timers contains declared timers sorted by name.
	Timers []*GateTimerView
}

GateView is a JSON-friendly, eagerly projected view of a task gate.

type Signal

type Signal struct {
	// Attempt is the workflow attempt number from when this signal was emitted.
	Attempt int
	// CreatedAt is when the signal row was inserted.
	CreatedAt time.Time
	// ID is the unique identifier of the signal row.
	ID int64
	// Key identifies the signal stream within the workflow. Gate
	// definitions reference signal keys (for example, signals["approval"]) to
	// decide which signals can satisfy a task's gate expression.
	Key string
	// Payload is the JSON payload associated with the signal.
	Payload json.RawMessage
	// Source is immutable JSON metadata describing the emitter for audit and
	// correlation (for example emitter type or request IDs).
	Source json.RawMessage
	// WorkflowID is the ID of the workflow that this signal belongs to.
	WorkflowID string
}

Signal is a durable workflow-scoped fact emitted to a workflow.

Signals are append-only and keyed within a workflow so multiple tasks can evaluate the same signal stream in gate expressions.

type SignalAttemptMismatchError

type SignalAttemptMismatchError struct {
	// RequestedAttempt is the attempt requested by the emitter.
	RequestedAttempt int
	// SignalAttempt is the attempt stamped on the inserted signal row.
	SignalAttempt int
	// WorkflowID is the ID of the workflow associated with the signal.
	WorkflowID string
}

SignalAttemptMismatchError indicates that a signal emit targeted a specific workflow attempt but was inserted against a different attempt.

func (*SignalAttemptMismatchError) Error

func (e *SignalAttemptMismatchError) Error() string

func (*SignalAttemptMismatchError) Is

func (e *SignalAttemptMismatchError) Is(target error) bool

type SignalKeyUndeclaredError

type SignalKeyUndeclaredError struct {
	Key string

	TaskName   string
	WorkflowID string
}

SignalKeyUndeclaredError is returned when task-scoped signal reads request a key that is not declared by the task's gate.

func (*SignalKeyUndeclaredError) Error

func (e *SignalKeyUndeclaredError) Error() string

func (*SignalKeyUndeclaredError) Is

func (e *SignalKeyUndeclaredError) Is(target error) bool

type SignalPayloadMismatchError

type SignalPayloadMismatchError struct {
	// SignalID is the existing signal ID returned by the idempotent insert path.
	SignalID *int64
	// WorkflowID is the ID of the workflow associated with the signal.
	WorkflowID string
}

SignalPayloadMismatchError indicates that a signal idempotency replay payload does not match the original payload for the same idempotency key.

func (*SignalPayloadMismatchError) Error

func (e *SignalPayloadMismatchError) Error() string

func (*SignalPayloadMismatchError) Is

func (e *SignalPayloadMismatchError) Is(target error) bool

type SignalTaskDeclaresNoSignalKeysError

type SignalTaskDeclaresNoSignalKeysError struct {
	TaskName   string
	WorkflowID string
}

SignalTaskDeclaresNoSignalKeysError is returned when task-scoped signal reads target a task that declares no signal keys. This includes tasks with no gate, timer-only gates, and dep-only gates.

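func (*SignalTaskDeclaresNoSignalKeysError) Error

func (e *SignalTaskDeclaresNoSignalKeysError) Error() string

func (*SignalTaskDeclaresNoSignalKeysError) Is

func (e *SignalTaskDeclaresNoSignalKeysError) Is(target error) bool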

type SignalUnknownTaskError

type SignalUnknownTaskError struct {
	TaskName   string
	WorkflowID string
}

SignalUnknownTaskError is returned when task-scoped signal reads target a task name that does not exist in the workflow.

func (*SignalUnknownTaskError) Error

func (e *SignalUnknownTaskError) Error() string

func (*SignalUnknownTaskError) Is

func (e *SignalUnknownTaskError) Is(target error) bool

type Timer

type Timer struct {/* contains filtered or unexported fields */}

Timer defines a timer dependency for a gate.

func TimerAfter

func TimerAfter(name string, after time.Duration) Timer

TimerAfter builds a relative timer anchored to gate activation by default. Name is the identifier used in CEL (timers["name"]). After is the relative delay (must be positive); it is stored with microsecond precision.

func TimerAfterDepFinalized

func TimerAfterDepFinalized(name string, after time.Duration, depTask string) Timer

TimerAfterDepFinalized builds a relative timer anchored to a dependency's finalization time.

Example

ExampleTimerAfterDepFinalized demonstrates building a dep-finalized timer.

package main

import (
	"fmt"
	"time"

	"riverqueue.com/riverpro/riverworkflow"
)

func main() {
	gate := riverworkflow.GateSpec{
		Expr: "timers['wait_for_build'].fired",
		Timers: []riverworkflow.Timer{
			riverworkflow.TimerAfterDepFinalized("wait_for_build", 10*time.Minute, "build_assets"),
		},
	}

	if err := gate.Validate([]string{"build_assets"}); err != nil {
		panic(err)
	}

	fmt.Println(gate.Expr)
	fmt.Println(gate.Timers[0].Name())
}

Output:

timers['wait_for_build'].fired
wait_for_build

func TimerAfterWorkflowCreated

func TimerAfterWorkflowCreated(name string, after time.Duration) Timer

TimerAfterWorkflowCreated builds a relative timer anchored to the workflow's creation time.

func TimerAt

func TimerAt(name string, fireAt time.Time) Timer

TimerAt builds an absolute timer that fires at the provided time. Name is the identifier used in CEL (timers["name"]).

func (Timer) After

func (t Timer) After() (time.Duration, bool)

After returns the relative delay for a timer created with TimerAfter.

func (Timer) Anchor

func (t Timer) Anchor() TimerAnchor

Anchor returns the anchor for a relative timer.

Absolute timers return the zero TimerAnchor.

func (Timer) FireAt

func (t Timer) FireAt() (time.Time, bool)

FireAt returns the absolute fire time for a timer created with TimerAt.

func (Timer) Name

func (t Timer) Name() string

Name returns the identifier used to reference the timer in CEL.

type TimerAnchor

type TimerAnchor struct {
	// DepTask is the dependency task name for dependency-finalized timers.
	DepTask string
	// Kind identifies the source time used to resolve a relative timer.
	Kind TimerAnchorKind
}

TimerAnchor describes how a timer is anchored when it is inspected.

Absolute timers use the zero value.

type TimerAnchorKind

type TimerAnchorKind string

TimerAnchorKind identifies the source time used to resolve a relative timer.

const (
	// TimerAnchorKindDepFinalizedAt anchors relative timers to a dependency's
	// finalization time.
	TimerAnchorKindDepFinalizedAt TimerAnchorKind = "dep_finalized_at"

	// TimerAnchorKindGateActiveAt anchors relative timers to gate activation
	// time.
	TimerAnchorKindGateActiveAt TimerAnchorKind = "gate_active_at"

	// TimerAnchorKindWorkflowCreatedAt anchors relative timers to the workflow's
	// creation time.
	TimerAnchorKindWorkflowCreatedAt TimerAnchorKind = "workflow_created_at"
)