const ( // MigrationLinePro is the migration line for River Pro features. MigrationLinePro = "pro" // MigrationLineSequence is the deprecated migration line for the sequence // feature. Users should migrate to use the unified pro line instead. // // Deprecated: Use MigrationLinePro instead. MigrationLineSequence = "sequence" // MigrationLineWorkflow is the deprecated migration line for the workflow // feature. Users should migrate to use the unified pro line instead. // // Deprecated: Use MigrationLinePro instead. MigrationLineWorkflow = "workflow" )
var ErrDeadlock = &DeadlockError{}
ErrDeadlock is returned when the driver detects a database deadlock.
var ErrStatementTimeout = &StatementTimeoutError{}
ErrStatementTimeout is returned when a Pro driver detects a database statement timeout.
This keeps SQLSTATE and driver-specific error parsing in driver packages so higher-level workflow services can classify transient failures without depending on pgx/libpq details.
MigrationLineProTruncateTables returns the tables that should be truncated for a given pro migration line at the given version. This is used only in tests so it panics in case a line or version isn't found, which is usually an indication that a new version neeeds to be added to the matrix below.
type DeadlockError struct {
Err error
}
DeadlockError wraps a driver-specific deadlock error.
func (e *DeadlockError) Error() string
func (e *DeadlockError) Is(target error) bool
func (e *DeadlockError) Unwrap() error
type JobDeleteNonWorkflowBeforeParams struct {
CancelledDoDelete bool
CancelledFinalizedAtHorizon time.Time
CompletedDoDelete bool
CompletedFinalizedAtHorizon time.Time
DiscardedDoDelete bool
DiscardedFinalizedAtHorizon time.Time
Max int
QueuesExcluded []string
QueuesIncluded []string
Schema string
}
type JobGetAvailableLimitedParams struct {
*riverdriver.JobGetAvailableParams
AvailablePartitionKeys []string
CurrentProducerPartitionKeys []string
CurrentProducerPartitionRunningCounts []int32
GlobalLimit int32
LocalLimit int32
PartitionByArgs []string
PartitionByKind bool
}
type PeriodicJobUpsertManyParams struct {
Jobs []*PeriodicJobUpsertParams
Schema string
}
type ProDriver[TTx any] interface { riverdriver.Driver[TTx] GetProExecutor() ProExecutor ProConfigInit(pilot riverpilot.Pilot) UnwrapProExecutor(tx TTx) ProExecutorTx }
type ProExecutor interface {
riverdriver.Executor
// BeginPro begins a new subtransaction returning a ProExecutorTx.
// ErrSubTxNotSupported may be returned if the executor is a transaction and
// the driver doesn't support subtransactions (like
// riverdriver/riverdatabasesql for database/sql).
BeginPro(ctx context.Context) (ProExecutorTx, error)
JobDeadLetterDeleteByID(ctx context.Context, params *JobDeadLetterDeleteByIDParams) (*rivertype.JobRow, error)
JobDeadLetterGetAll(ctx context.Context, params *JobDeadLetterGetAllParams) ([]*rivertype.JobRow, error)
JobDeadLetterGetByID(ctx context.Context, params *JobDeadLetterGetByIDParams) (*rivertype.JobRow, error)
JobDeadLetterMoveByID(ctx context.Context, params *JobDeadLetterMoveByIDParams) (*rivertype.JobRow, error)
JobDeadLetterMoveDiscarded(ctx context.Context, params *JobDeadLetterMoveDiscardedParams) ([]*rivertype.JobRow, error)
JobDeleteByIDMany(ctx context.Context, params *JobDeleteByIDManyParams) ([]*rivertype.JobRow, error)
JobDeleteNonWorkflowBefore(ctx context.Context, params *JobDeleteNonWorkflowBeforeParams) (int, error)
JobGetAvailableForBatch(ctx context.Context, params *JobGetAvailableForBatchParams) ([]*rivertype.JobRow, error)
JobGetAvailableLimited(ctx context.Context, params *JobGetAvailableLimitedParams) ([]*rivertype.JobRow, error)
JobGetAvailablePartitionKeys(ctx context.Context, params *JobGetAvailablePartitionKeysParams) ([]string, error)
PGTryAdvisoryXactLock(ctx context.Context, key int64) (bool, error)
PeriodicJobGetAll(ctx context.Context, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error)
PeriodicJobGetByID(ctx context.Context, params *PeriodicJobGetByIDParams) (*PeriodicJob, error)
PeriodicJobInsert(ctx context.Context, params *PeriodicJobInsertParams) (*PeriodicJob, error)
PeriodicJobKeepAliveAndReap(ctx context.Context, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error)
PeriodicJobUpsertMany(ctx context.Context, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error)
ProducerDelete(ctx context.Context, params *ProducerDeleteParams) error
ProducerGetByID(ctx context.Context, params *ProducerGetByIDParams) (*Producer, error)
QueueGetMetadataForInsert(ctx context.Context, params *QueueGetMetadataForInsertParams) ([]*QueueGetMetadataForInsertResult, error)
ProducerInsertOrUpdate(ctx context.Context, params *ProducerInsertOrUpdateParams) (*Producer, error)
ProducerKeepAlive(ctx context.Context, params *ProducerKeepAliveParams) (*Producer, error)
ProducerListByQueue(ctx context.Context, params *ProducerListByQueueParams) ([]*ProducerListByQueueResult, error)
ProducerUpdate(ctx context.Context, params *ProducerUpdateParams) (*Producer, error)
SequenceAppendMany(ctx context.Context, params *SequenceAppendManyParams) (int, error)
SequenceList(ctx context.Context, params *SequenceListParams) ([]*Sequence, error)
// SequencePromote promotes the next pending head for the given keys using
// SKIP LOCKED to avoid blocking on still-locked head rows. Returns promoted
// and skipped keys; callers must INSERT skipped keys into
// river_job_sequence to create a durable retry signal.
SequencePromote(ctx context.Context, params *SequencePromoteParams) (*SequencePromoteResult, error)
// SequencePromoteFromTable promotes sequences from the river_job_sequence
// table and returns aggregate information about the promotion work done.
SequencePromoteFromTable(ctx context.Context, params *SequencePromoteFromTableParams) (*SequencePromoteFromTableResult, error)
SequenceScanAndPromoteStalled(ctx context.Context, params *SequenceScanAndPromoteStalledParams) (*SequenceScanAndPromoteStalledResult, error)
TimeNow(ctx context.Context, params *TimeNowParams) (time.Time, error)
WorkflowAttemptInsert(ctx context.Context, params *WorkflowAttemptInsertParams) (*WorkflowAttempt, error)
WorkflowAttemptListByWorkflowID(ctx context.Context, params *WorkflowAttemptListByWorkflowIDParams) ([]*WorkflowAttempt, error)
WorkflowAttemptTaskInsert(ctx context.Context, params *WorkflowAttemptTaskInsertParams) (*WorkflowAttemptTask, error)
WorkflowAttemptTaskListByWorkflowID(ctx context.Context, params *WorkflowAttemptTaskListByWorkflowIDParams) ([]*WorkflowAttemptTask, error)
WorkflowCancel(ctx context.Context, params *WorkflowCancelParams) ([]*rivertype.JobRow, error)
WorkflowCancelWithDeletedDepsMany(ctx context.Context, params *WorkflowCancelWithDeletedDepsManyParams) (int64, error)
WorkflowCancelWithFailedDepsMany(ctx context.Context, params *WorkflowCancelWithFailedDepsManyParams) (int64, error)
WorkflowCleanupDeleteAttemptsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupDeleteAttemptTasksByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupDeleteDeadLetterJobsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupDeleteJobsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupDeleteSignalsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupDeleteTimersByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupDeleteWorkflowsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteWorkflowsByWorkflowIDsParams) error
WorkflowCleanupDeleteWorklistByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
WorkflowCleanupListFinalizedIDs(ctx context.Context, params *WorkflowCleanupListFinalizedIDsParams) ([]string, error)
WorkflowCleanupListFinalizedIDsWithoutJobs(ctx context.Context, params *WorkflowCleanupListFinalizedIDsWithoutJobsParams) ([]string, error)
WorkflowCountIncompleteJobs(ctx context.Context, params *WorkflowCountIncompleteJobsParams) (int64, error)
WorkflowFinalizeIfCompleteMany(ctx context.Context, params *WorkflowFinalizeIfCompleteManyParams) ([]string, error)
WorkflowGetByID(ctx context.Context, params *WorkflowGetByIDParams) (*Workflow, error)
WorkflowGetFinalizationCandidates(ctx context.Context, params *WorkflowGetFinalizationCandidatesParams) ([]string, error)
WorkflowGetLegacyBackfillIDs(ctx context.Context, params *WorkflowGetLegacyBackfillIDsParams) ([]string, error)
WorkflowHasWaitTasksMany(ctx context.Context, params *WorkflowHasWaitTasksManyParams) ([]string, error)
WorkflowInitFromJobs(ctx context.Context, params *WorkflowInitFromJobsParams) ([]string, error)
WorkflowInsertMany(ctx context.Context, params *WorkflowInsertManyParams) error
WorkflowJobGetByTaskName(ctx context.Context, params *WorkflowJobGetByTaskNameParams) (*rivertype.JobRow, error)
WorkflowJobList(ctx context.Context, params *WorkflowJobListParams) ([]*WorkflowTaskWithJob, error)
WorkflowListActive(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
WorkflowListAll(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
WorkflowListByIDs(ctx context.Context, params *WorkflowListByIDsParams) ([]string, error)
WorkflowListByIDsForWaitEval(ctx context.Context, params *WorkflowListByIDsForWaitEvalParams) ([]*WorkflowWaitWorkflow, error)
WorkflowListInactive(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
WorkflowLoadDepTasksAndIDs(ctx context.Context, params *WorkflowLoadDepTasksAndIDsParams) (map[string]*int64, error)
WorkflowLoadJobsWithDeps(ctx context.Context, params *WorkflowLoadJobsWithDepsParams) ([]*WorkflowTaskWithJob, error)
WorkflowLoadTaskWithDeps(ctx context.Context, params *WorkflowLoadTaskWithDepsParams) (*WorkflowTaskWithJob, error)
WorkflowLoadTaskNamesByWorkflowID(ctx context.Context, params *WorkflowLoadTaskNamesByWorkflowIDParams) ([]string, error)
WorkflowLoadTasksByNames(ctx context.Context, params *WorkflowLoadTasksByNamesParams) ([]*WorkflowTask, error)
WorkflowLockByIDsSkipLocked(ctx context.Context, params *WorkflowLockByIDsSkipLockedParams) ([]string, error)
WorkflowReadyTaskIDsByWorkflowIDs(ctx context.Context, params *WorkflowReadyTaskIDsByWorkflowIDsParams) ([]*WorkflowReadyTaskIDsByWorkflowIDsRow, error)
WorkflowRetry(ctx context.Context, params *WorkflowRetryParams) ([]*rivertype.JobRow, error)
WorkflowRetryLockAndCheckRunning(ctx context.Context, params *WorkflowRetryLockAndCheckRunningParams) (*WorkflowRetryLockAndCheckRunningResult, error)
WorkflowSignalInsert(ctx context.Context, params *WorkflowSignalInsertParams) (*WorkflowSignalInsertResult, error)
WorkflowSignalList(ctx context.Context, params *WorkflowSignalListParams) ([]*WorkflowSignal, error)
WorkflowSignalListByEvidence(ctx context.Context, params *WorkflowSignalListByEvidenceParams) ([]*WorkflowSignal, error)
WorkflowSignalListByKeys(ctx context.Context, params *WorkflowSignalListByKeysParams) ([]*WorkflowSignal, error)
WorkflowSignalListByWorkflowIDs(ctx context.Context, params *WorkflowSignalListByWorkflowIDsParams) ([]*WorkflowSignal, error)
WorkflowSignalStatsByWorkflowIDs(ctx context.Context, params *WorkflowSignalStatsByWorkflowIDsParams) ([]*WorkflowSignalStat, error)
// WorkflowStageJobsByIDMany promotes a set of candidate workflow task jobs by job ID.
//
// Important contract:
// - This method DOES NOT evaluate dependencies. Callers must first compute which
// task jobs are eligible to be staged (e.g. via WorkflowReadyTaskIDsByWorkflowIDs).
// - This method enforces waits: if a job has
// `river:workflow_wait` metadata, it will only be promoted
// after
// `river:workflow_wait_state.resolved_at` is set. Callers should generally:
// 1) call WorkflowWaitActivatableTaskIDsByWorkflowIDs to find waiting
// pending tasks whose deps are satisfied but whose wait state
// is not active,
// 2) call WorkflowWaitActivateByJobIDMany on those IDs to set `started_at`
// and timer `fire_at`, then
// 3) call WorkflowStageJobsByIDMany with ready IDs to promote only tasks
// with no wait or tasks whose wait is already resolved.
WorkflowStageJobsByIDMany(ctx context.Context, params *WorkflowStageJobsByIDManyParams) ([]*rivertype.JobRow, error)
WorkflowTimerConsumeDue(ctx context.Context, params *WorkflowTimerConsumeDueParams) ([]*WorkflowTimer, error)
WorkflowTimerDeleteByWorkflowIDs(ctx context.Context, params *WorkflowTimerDeleteByWorkflowIDsParams) error
WorkflowTimerGetByWorkflowID(ctx context.Context, params *WorkflowTimerGetByWorkflowIDParams) (*WorkflowTimer, error)
WorkflowTimerNextFireAtByWorkflowIDs(ctx context.Context, params *WorkflowTimerNextFireAtByWorkflowIDsParams) ([]*WorkflowTimerNextFireAtByWorkflowIDsRow, error)
WorkflowTimerUpsertMany(ctx context.Context, params *WorkflowTimerUpsertManyParams) error
WorkflowUnfinalizeIfActiveJobsMany(ctx context.Context, params *WorkflowUnfinalizeIfActiveJobsManyParams) ([]string, error)
WorkflowWaitActivatableTaskIDsByWorkflowIDs(ctx context.Context, params *WorkflowWaitActivatableTaskIDsByWorkflowIDsParams) ([]*WorkflowWaitActivatableTaskIDsByWorkflowIDsRow, error)
WorkflowWaitActivateByJobIDMany(ctx context.Context, params *WorkflowWaitActivateByJobIDManyParams) ([]int64, error)
WorkflowWaitActiveTaskListByWorkflowIDs(ctx context.Context, params *WorkflowWaitActiveTaskListByWorkflowIDsParams) ([]*WorkflowWaitActiveTask, error)
WorkflowWaitDepOutputListByWorkflowTaskPairs(ctx context.Context, params *WorkflowWaitDepOutputListByWorkflowTaskPairsParams) ([]*WorkflowWaitDepOutput, error)
WorkflowWaitEvalCursorUpdateByWorkflowIDMany(ctx context.Context, params *WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams) error
WorkflowWaitUpdateMetadataByJobIDMany(ctx context.Context, params *WorkflowWaitUpdateMetadataByJobIDManyParams) error
WorkflowWorklistDeleteByWorkflowIDsReturningReasons(ctx context.Context, params *WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams) ([]*WorkflowWorklistDeleteByWorkflowIDsReturningReasonsRow, error)
WorkflowWorklistInsertMany(ctx context.Context, params *WorkflowWorklistInsertManyParams) error
WorkflowWorklistListIDs(ctx context.Context, params *WorkflowWorklistListParams) ([]*WorkflowWorklistIDItem, error)
WorkflowWorklistList(ctx context.Context, params *WorkflowWorklistListParams) ([]*WorkflowWorklistItem, error)
}
type ProExecutorTx interface {
ProExecutor
riverdriver.ExecutorTx
}
type StatementTimeoutError struct {
Err error
}
StatementTimeoutError wraps a driver-specific statement timeout error.
func (e *StatementTimeoutError) Error() string
func (e *StatementTimeoutError) Is(target error) bool
func (e *StatementTimeoutError) Unwrap() error
type UniqueViolationError struct {
ConstraintName string
Detail string
Err error
KeyValues map[string]string
SQLState string
}
UniqueViolationError wraps a driver-specific unique-violation error.
func (e *UniqueViolationError) Error() string
func (e *UniqueViolationError) Is(target error) bool
func (e *UniqueViolationError) Unwrap() error
type Workflow struct {
CreatedAt time.Time
CurrentAttempt int
FinalizedAt *time.Time
ID string
Metadata []byte
Name *string
State string
UpdatedAt time.Time
WaitEvalCursorJobID *int64
}
Workflow is a row from river_workflow.
WorkflowGetByIDParams selects a workflow row by ID.
WorkflowHasWaitTasksManyParams checks whether workflows have pending waiting tasks.
WorkflowInitFromJobsParams inserts workflow rows based on existing job metadata.
WorkflowInsertManyParams inserts workflow rows if they do not already exist.
WorkflowListByIDsForWaitEvalParams lists workflow rows needed for wait evaluation.
WorkflowListByIDsParams lists workflow rows by ID.
WorkflowLockByIDsSkipLockedParams locks workflow rows if available.
WorkflowIDs are attempted in input order. LimitCount optionally caps how many rows are locked; values <= 0 mean "no cap".
type WorkflowRetryLockAndCheckRunningResult struct {
WorkflowIsActive bool
}
type WorkflowRetryMode string
const ( WorkflowRetryModeAll WorkflowRetryMode = "all" WorkflowRetryModeFailedOnly WorkflowRetryMode = "failed_only" WorkflowRetryModeFailedAndDownstream WorkflowRetryMode = "failed_and_downstream" )
type WorkflowSignal struct {
Attempt int
CreatedAt time.Time
ID int64
IdempotencyKey string
Key string
Metadata []byte
Payload []byte
Source []byte
WorkflowID string
}
WorkflowSignal is a signal sent to a workflow.
type WorkflowSignalInsertParams struct {
IdempotencyKey string
Key string
Metadata []byte
Payload []byte
RequestedAttempt *int
Schema string
Source []byte
WorkflowID string
}
WorkflowSignalInsertParams inserts a signal for a workflow.
type WorkflowSignalInsertResult struct {
WorkflowSignal
// CurrentAttempt is the workflow's observed current attempt when the query
// found a workflow row.
CurrentAttempt int
// PayloadSemanticEqual indicates whether the returned row payload is
// semantically equal (Postgres jsonb equality) to the request payload.
PayloadSemanticEqual bool
// SignalPresent reports whether the insert query returned a real signal row.
SignalPresent bool
// SkippedAsDuplicate is true when an idempotent emit reused an existing
// signal row instead of inserting a new one.
SkippedAsDuplicate bool
}
WorkflowSignalInsertResult is the result of WorkflowSignalInsert.
type WorkflowSignalListByEvidenceParams struct {
Attempt int
CursorID *int64
Desc bool
Keys []string
LastIncludedSignalIDs []int64
LimitCount int
Schema string
WorkflowID string
}
WorkflowSignalListByEvidenceParams fetches signal rows bounded by persisted task evidence.
type WorkflowSignalListByKeysParams struct {
Attempt *int
CursorID *int64
Desc bool
Keys []string
LimitCount int
Schema string
WorkflowID string
}
WorkflowSignalListByKeysParams fetches workflow signals by declared keys.
type WorkflowSignalListByWorkflowIDsParams struct {
Attempt int
Keys []string
Schema string
WorkflowIDs []string
}
WorkflowSignalListByWorkflowIDsParams fetches only the signal rows relevant to wait evaluation for the given workflows and current attempt.
type WorkflowSignalListParams struct {
Attempt *int
CursorID *int64
Desc bool
Key *string
LimitCount int
Schema string
WorkflowID string
}
WorkflowSignalListParams fetches signals for a workflow with pagination.
type WorkflowSignalStat struct {
Key string
LastSignalID int64
SignalCount int64
WorkflowID string
}
WorkflowSignalStat summarizes signals for one workflow/key pair.
type WorkflowSignalStatsByWorkflowIDsParams struct {
Attempt int
Keys []string
Schema string
WorkflowIDs []string
}
WorkflowSignalStatsByWorkflowIDsParams fetches signal aggregates relevant to wait evaluation for the given workflows and current attempt.
WorkflowTimer is a row from river_workflow_timer.
WorkflowTimerConsumeDueParams atomically consumes due workflow timer rows.
WorkflowTimerDeleteByWorkflowIDsParams deletes workflow timer rows.
WorkflowTimerGetByWorkflowIDParams gets a workflow timer row by workflow ID.
type WorkflowTimerNextFireAtByWorkflowIDsParams struct {
Now time.Time
Schema string
WorkflowIDs []string
}
WorkflowTimerNextFireAtByWorkflowIDsParams finds next pending timer deadlines for workflows.
type WorkflowTimerUpsertManyParams struct {
NextFireAts []time.Time
Schema string
WorkflowIDs []string
}
WorkflowTimerUpsertManyParams creates or updates workflow timers.
type WorkflowUnfinalizeIfActiveJobsManyParams struct {
Now time.Time
Schema string
WorkflowIDs []string
}
WorkflowUnfinalizeIfActiveJobsManyParams reopens finalized workflows with active jobs.
WorkflowWaitActivateByJobIDManyParams activates waiting tasks by job IDs.
WorkflowWaitActiveTask is an active waiting-task row for evaluation.
type WorkflowWaitActiveTaskListByWorkflowIDsParams struct {
LimitCount int
Schema string
WorkflowIDs []string
}
WorkflowWaitActiveTaskListByWorkflowIDsParams lists active waiting tasks.
type WorkflowWaitDepOutput struct {
FinalizedAt *time.Time
Output []byte
State *rivertype.JobState
Task string
WorkflowID string
}
WorkflowWaitDepOutput is a dep output row for wait evaluation.
type WorkflowWaitDepOutputListByWorkflowTaskPairsParams struct {
Schema string
Tasks []string
WorkflowIDs []string
}
WorkflowWaitDepOutputListByWorkflowTaskPairsParams lists dep outputs.
type WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams struct {
CursorJobIDs []int64
Schema string
WorkflowIDs []string
}
WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams updates wait eval cursors.
type WorkflowWaitUpdateMetadataByJobIDManyParams struct {
JobIDs []int64
Schema string
WaitStates [][]byte
}
WorkflowWaitUpdateMetadataByJobIDManyParams updates wait state on jobs using `jsonb_set` to surgically update only the `river:workflow_wait_state` key, preserving all other metadata.
type WorkflowWaitWorkflow struct {
CreatedAt time.Time
CurrentAttempt int
ID string
Metadata []byte
}
WorkflowWaitWorkflow is workflow metadata used during wait evaluation.
type WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams struct {
Schema string
WorkflowIDs []string
}
WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams deletes worklist rows and returns reasons.
WorkflowWorklistIDItem is a lightweight worklist row used for prefix scans.
WorkflowWorklistInsertManyParams inserts worklist rows for workflows.
WorkflowWorklistItem is a row from river_workflow_worklist.
| Path | Synopsis |
|---|---|
| riverprodatabasesql module | |
| riverprodrivertest module | |
| riverpropgxv5 module |