River

driver

package module
v0.24.0 Latest
Published: May 19, 2026 License: Proprietary

Index

Constants

const (
	// MigrationLinePro is the migration line for River Pro features.
	MigrationLinePro = "pro"

	// MigrationLineSequence is the deprecated migration line for the sequence
	// feature. Users should migrate to use the unified pro line instead.
	//
	// Deprecated: Use MigrationLinePro instead.
	MigrationLineSequence = "sequence"

	// MigrationLineWorkflow is the deprecated migration line for the workflow
	// feature. Users should migrate to use the unified pro line instead.
	//
	// Deprecated: Use MigrationLinePro instead.
	MigrationLineWorkflow = "workflow"
)

Variables

var ErrDeadlock = &DeadlockError{}

ErrDeadlock is returned when the driver detects a database deadlock.

var ErrStatementTimeout = &StatementTimeoutError{}

ErrStatementTimeout is returned when a Pro driver detects a database statement timeout.

This keeps SQLSTATE and driver-specific error parsing in driver packages so higher-level workflow services can classify transient failures without depending on pgx/libpq details.

Functions

func MigrationLineProTruncateTables

func MigrationLineProTruncateTables(line string, version int) []string

MigrationLineProTruncateTables returns the tables that should be truncated for a given pro migration line at the given version. This is used only in tests so it panics in case a line or version isn't found, which is usually an indication that a new version neeeds to be added to the matrix below.

Types

type DeadlockError

type DeadlockError struct {
	Err error
}

DeadlockError wraps a driver-specific deadlock error.

func (*DeadlockError) Error

func (e *DeadlockError) Error() string

func (*DeadlockError) Is

func (e *DeadlockError) Is(target error) bool

func (*DeadlockError) Unwrap

func (e *DeadlockError) Unwrap() error

type JobDeadLetterDeleteByIDParams

type JobDeadLetterDeleteByIDParams struct {
	ID     int64
	Schema string
}

type JobDeadLetterGetAllParams

type JobDeadLetterGetAllParams struct {
	Schema string
}

type JobDeadLetterGetByIDParams

type JobDeadLetterGetByIDParams struct {
	ID     int64
	Schema string
}

type JobDeadLetterMoveByIDParams

type JobDeadLetterMoveByIDParams struct {
	ID     int64
	Schema string
}

type JobDeadLetterMoveDiscardedParams

type JobDeadLetterMoveDiscardedParams struct {
	DiscardedFinalizedAtHorizon time.Time
	ExcludeWorkflowJobs         bool
	Max                         int
	Schema                      string
}

type JobDeleteByIDManyParams

type JobDeleteByIDManyParams struct {
	ID     []int64
	Schema string
}

type JobDeleteNonWorkflowBeforeParams

type JobDeleteNonWorkflowBeforeParams struct {
	CancelledDoDelete           bool
	CancelledFinalizedAtHorizon time.Time
	CompletedDoDelete           bool
	CompletedFinalizedAtHorizon time.Time
	DiscardedDoDelete           bool
	DiscardedFinalizedAtHorizon time.Time
	Max                         int
	QueuesExcluded              []string
	QueuesIncluded              []string
	Schema                      string
}

type JobGetAvailableForBatchParams

type JobGetAvailableForBatchParams struct {
	AttemptedBy      string
	BatchID          string
	BatchKey         string
	BatchLeaderJobID int64
	Kind             string
	Max              int32
	Queue            string
	Schema           string
}

type JobGetAvailableLimitedParams

type JobGetAvailableLimitedParams struct {
	*riverdriver.JobGetAvailableParams

	AvailablePartitionKeys                []string
	CurrentProducerPartitionKeys          []string
	CurrentProducerPartitionRunningCounts []int32
	GlobalLimit                           int32
	LocalLimit                            int32
	PartitionByArgs                       []string
	PartitionByKind                       bool
}

type JobGetAvailablePartitionKeysParams

type JobGetAvailablePartitionKeysParams struct {
	Queue  string
	Schema string
}

type PeriodicJob

type PeriodicJob struct {
	ID        string
	CreatedAt time.Time
	NextRunAt time.Time
	UpdatedAt time.Time
}

type PeriodicJobGetAllParams

type PeriodicJobGetAllParams struct {
	Max                   int
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type PeriodicJobGetByIDParams

type PeriodicJobGetByIDParams struct {
	ID     string
	Schema string
}

type PeriodicJobInsertParams

type PeriodicJobInsertParams struct {
	ID        string
	NextRunAt time.Time
	Schema    string
	UpdatedAt *time.Time
}

type PeriodicJobKeepAliveAndReapParams

type PeriodicJobKeepAliveAndReapParams struct {
	ID                    []string
	Now                   *time.Time
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type PeriodicJobUpsertManyParams

type PeriodicJobUpsertManyParams struct {
	Jobs   []*PeriodicJobUpsertParams
	Schema string
}

type PeriodicJobUpsertParams

type PeriodicJobUpsertParams struct {
	ID        string
	NextRunAt time.Time
	UpdatedAt time.Time
}

type ProDriver

type ProDriver[TTx any] interface {
	riverdriver.Driver[TTx]

	GetProExecutor() ProExecutor
	ProConfigInit(pilot riverpilot.Pilot)
	UnwrapProExecutor(tx TTx) ProExecutorTx
}

type ProExecutor

type ProExecutor interface {
	riverdriver.Executor

	// BeginPro begins a new subtransaction returning a ProExecutorTx.
	// ErrSubTxNotSupported may be returned if the executor is a transaction and
	// the driver doesn't support subtransactions (like
	// riverdriver/riverdatabasesql for database/sql).
	BeginPro(ctx context.Context) (ProExecutorTx, error)

	JobDeadLetterDeleteByID(ctx context.Context, params *JobDeadLetterDeleteByIDParams) (*rivertype.JobRow, error)
	JobDeadLetterGetAll(ctx context.Context, params *JobDeadLetterGetAllParams) ([]*rivertype.JobRow, error)
	JobDeadLetterGetByID(ctx context.Context, params *JobDeadLetterGetByIDParams) (*rivertype.JobRow, error)
	JobDeadLetterMoveByID(ctx context.Context, params *JobDeadLetterMoveByIDParams) (*rivertype.JobRow, error)
	JobDeadLetterMoveDiscarded(ctx context.Context, params *JobDeadLetterMoveDiscardedParams) ([]*rivertype.JobRow, error)
	JobDeleteByIDMany(ctx context.Context, params *JobDeleteByIDManyParams) ([]*rivertype.JobRow, error)
	JobDeleteNonWorkflowBefore(ctx context.Context, params *JobDeleteNonWorkflowBeforeParams) (int, error)
	JobGetAvailableForBatch(ctx context.Context, params *JobGetAvailableForBatchParams) ([]*rivertype.JobRow, error)
	JobGetAvailableLimited(ctx context.Context, params *JobGetAvailableLimitedParams) ([]*rivertype.JobRow, error)
	JobGetAvailablePartitionKeys(ctx context.Context, params *JobGetAvailablePartitionKeysParams) ([]string, error)
	PGTryAdvisoryXactLock(ctx context.Context, key int64) (bool, error)
	PeriodicJobGetAll(ctx context.Context, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error)
	PeriodicJobGetByID(ctx context.Context, params *PeriodicJobGetByIDParams) (*PeriodicJob, error)
	PeriodicJobInsert(ctx context.Context, params *PeriodicJobInsertParams) (*PeriodicJob, error)
	PeriodicJobKeepAliveAndReap(ctx context.Context, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error)
	PeriodicJobUpsertMany(ctx context.Context, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error)
	ProducerDelete(ctx context.Context, params *ProducerDeleteParams) error
	ProducerGetByID(ctx context.Context, params *ProducerGetByIDParams) (*Producer, error)
	QueueGetMetadataForInsert(ctx context.Context, params *QueueGetMetadataForInsertParams) ([]*QueueGetMetadataForInsertResult, error)
	ProducerInsertOrUpdate(ctx context.Context, params *ProducerInsertOrUpdateParams) (*Producer, error)
	ProducerKeepAlive(ctx context.Context, params *ProducerKeepAliveParams) (*Producer, error)
	ProducerListByQueue(ctx context.Context, params *ProducerListByQueueParams) ([]*ProducerListByQueueResult, error)
	ProducerUpdate(ctx context.Context, params *ProducerUpdateParams) (*Producer, error)
	SequenceAppendMany(ctx context.Context, params *SequenceAppendManyParams) (int, error)
	SequenceList(ctx context.Context, params *SequenceListParams) ([]*Sequence, error)
	// SequencePromote promotes the next pending head for the given keys using
	// SKIP LOCKED to avoid blocking on still-locked head rows. Returns promoted
	// and skipped keys; callers must INSERT skipped keys into
	// river_job_sequence to create a durable retry signal.
	SequencePromote(ctx context.Context, params *SequencePromoteParams) (*SequencePromoteResult, error)
	// SequencePromoteFromTable promotes sequences from the river_job_sequence
	// table and returns aggregate information about the promotion work done.
	SequencePromoteFromTable(ctx context.Context, params *SequencePromoteFromTableParams) (*SequencePromoteFromTableResult, error)
	SequenceScanAndPromoteStalled(ctx context.Context, params *SequenceScanAndPromoteStalledParams) (*SequenceScanAndPromoteStalledResult, error)

	TimeNow(ctx context.Context, params *TimeNowParams) (time.Time, error)

	WorkflowAttemptInsert(ctx context.Context, params *WorkflowAttemptInsertParams) (*WorkflowAttempt, error)
	WorkflowAttemptListByWorkflowID(ctx context.Context, params *WorkflowAttemptListByWorkflowIDParams) ([]*WorkflowAttempt, error)
	WorkflowAttemptTaskInsert(ctx context.Context, params *WorkflowAttemptTaskInsertParams) (*WorkflowAttemptTask, error)
	WorkflowAttemptTaskListByWorkflowID(ctx context.Context, params *WorkflowAttemptTaskListByWorkflowIDParams) ([]*WorkflowAttemptTask, error)
	WorkflowCancel(ctx context.Context, params *WorkflowCancelParams) ([]*rivertype.JobRow, error)
	WorkflowCancelWithDeletedDepsMany(ctx context.Context, params *WorkflowCancelWithDeletedDepsManyParams) (int64, error)
	WorkflowCancelWithFailedDepsMany(ctx context.Context, params *WorkflowCancelWithFailedDepsManyParams) (int64, error)
	WorkflowCleanupDeleteAttemptsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupDeleteAttemptTasksByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupDeleteDeadLetterJobsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupDeleteJobsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupDeleteSignalsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupDeleteTimersByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupDeleteWorkflowsByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteWorkflowsByWorkflowIDsParams) error
	WorkflowCleanupDeleteWorklistByWorkflowIDs(ctx context.Context, params *WorkflowCleanupDeleteByWorkflowIDsParams) error
	WorkflowCleanupListFinalizedIDs(ctx context.Context, params *WorkflowCleanupListFinalizedIDsParams) ([]string, error)
	WorkflowCleanupListFinalizedIDsWithoutJobs(ctx context.Context, params *WorkflowCleanupListFinalizedIDsWithoutJobsParams) ([]string, error)
	WorkflowCountIncompleteJobs(ctx context.Context, params *WorkflowCountIncompleteJobsParams) (int64, error)
	WorkflowFinalizeIfCompleteMany(ctx context.Context, params *WorkflowFinalizeIfCompleteManyParams) ([]string, error)
	WorkflowGetByID(ctx context.Context, params *WorkflowGetByIDParams) (*Workflow, error)
	WorkflowGetFinalizationCandidates(ctx context.Context, params *WorkflowGetFinalizationCandidatesParams) ([]string, error)
	WorkflowGetLegacyBackfillIDs(ctx context.Context, params *WorkflowGetLegacyBackfillIDsParams) ([]string, error)
	WorkflowHasWaitTasksMany(ctx context.Context, params *WorkflowHasWaitTasksManyParams) ([]string, error)
	WorkflowInitFromJobs(ctx context.Context, params *WorkflowInitFromJobsParams) ([]string, error)
	WorkflowInsertMany(ctx context.Context, params *WorkflowInsertManyParams) error
	WorkflowJobGetByTaskName(ctx context.Context, params *WorkflowJobGetByTaskNameParams) (*rivertype.JobRow, error)
	WorkflowJobList(ctx context.Context, params *WorkflowJobListParams) ([]*WorkflowTaskWithJob, error)
	WorkflowListActive(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
	WorkflowListAll(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
	WorkflowListByIDs(ctx context.Context, params *WorkflowListByIDsParams) ([]string, error)
	WorkflowListByIDsForWaitEval(ctx context.Context, params *WorkflowListByIDsForWaitEvalParams) ([]*WorkflowWaitWorkflow, error)
	WorkflowListInactive(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
	WorkflowLoadDepTasksAndIDs(ctx context.Context, params *WorkflowLoadDepTasksAndIDsParams) (map[string]*int64, error)
	WorkflowLoadJobsWithDeps(ctx context.Context, params *WorkflowLoadJobsWithDepsParams) ([]*WorkflowTaskWithJob, error)
	WorkflowLoadTaskWithDeps(ctx context.Context, params *WorkflowLoadTaskWithDepsParams) (*WorkflowTaskWithJob, error)
	WorkflowLoadTaskNamesByWorkflowID(ctx context.Context, params *WorkflowLoadTaskNamesByWorkflowIDParams) ([]string, error)
	WorkflowLoadTasksByNames(ctx context.Context, params *WorkflowLoadTasksByNamesParams) ([]*WorkflowTask, error)
	WorkflowLockByIDsSkipLocked(ctx context.Context, params *WorkflowLockByIDsSkipLockedParams) ([]string, error)
	WorkflowReadyTaskIDsByWorkflowIDs(ctx context.Context, params *WorkflowReadyTaskIDsByWorkflowIDsParams) ([]*WorkflowReadyTaskIDsByWorkflowIDsRow, error)
	WorkflowRetry(ctx context.Context, params *WorkflowRetryParams) ([]*rivertype.JobRow, error)
	WorkflowRetryLockAndCheckRunning(ctx context.Context, params *WorkflowRetryLockAndCheckRunningParams) (*WorkflowRetryLockAndCheckRunningResult, error)
	WorkflowSignalInsert(ctx context.Context, params *WorkflowSignalInsertParams) (*WorkflowSignalInsertResult, error)
	WorkflowSignalList(ctx context.Context, params *WorkflowSignalListParams) ([]*WorkflowSignal, error)
	WorkflowSignalListByEvidence(ctx context.Context, params *WorkflowSignalListByEvidenceParams) ([]*WorkflowSignal, error)
	WorkflowSignalListByKeys(ctx context.Context, params *WorkflowSignalListByKeysParams) ([]*WorkflowSignal, error)
	WorkflowSignalListByWorkflowIDs(ctx context.Context, params *WorkflowSignalListByWorkflowIDsParams) ([]*WorkflowSignal, error)
	WorkflowSignalStatsByWorkflowIDs(ctx context.Context, params *WorkflowSignalStatsByWorkflowIDsParams) ([]*WorkflowSignalStat, error)
	// WorkflowStageJobsByIDMany promotes a set of candidate workflow task jobs by job ID.
	//
	// Important contract:
	// - This method DOES NOT evaluate dependencies. Callers must first compute which
	//   task jobs are eligible to be staged (e.g. via WorkflowReadyTaskIDsByWorkflowIDs).
	// - This method enforces waits: if a job has
	//   `river:workflow_wait` metadata, it will only be promoted
	//   after
	//   `river:workflow_wait_state.resolved_at` is set. Callers should generally:
	//   1) call WorkflowWaitActivatableTaskIDsByWorkflowIDs to find waiting
	//      pending tasks whose deps are satisfied but whose wait state
	//      is not active,
	//   2) call WorkflowWaitActivateByJobIDMany on those IDs to set `started_at`
	//      and timer `fire_at`, then
	//   3) call WorkflowStageJobsByIDMany with ready IDs to promote only tasks
	//      with no wait or tasks whose wait is already resolved.
	WorkflowStageJobsByIDMany(ctx context.Context, params *WorkflowStageJobsByIDManyParams) ([]*rivertype.JobRow, error)
	WorkflowTimerConsumeDue(ctx context.Context, params *WorkflowTimerConsumeDueParams) ([]*WorkflowTimer, error)
	WorkflowTimerDeleteByWorkflowIDs(ctx context.Context, params *WorkflowTimerDeleteByWorkflowIDsParams) error
	WorkflowTimerGetByWorkflowID(ctx context.Context, params *WorkflowTimerGetByWorkflowIDParams) (*WorkflowTimer, error)
	WorkflowTimerNextFireAtByWorkflowIDs(ctx context.Context, params *WorkflowTimerNextFireAtByWorkflowIDsParams) ([]*WorkflowTimerNextFireAtByWorkflowIDsRow, error)
	WorkflowTimerUpsertMany(ctx context.Context, params *WorkflowTimerUpsertManyParams) error
	WorkflowUnfinalizeIfActiveJobsMany(ctx context.Context, params *WorkflowUnfinalizeIfActiveJobsManyParams) ([]string, error)
	WorkflowWaitActivatableTaskIDsByWorkflowIDs(ctx context.Context, params *WorkflowWaitActivatableTaskIDsByWorkflowIDsParams) ([]*WorkflowWaitActivatableTaskIDsByWorkflowIDsRow, error)
	WorkflowWaitActivateByJobIDMany(ctx context.Context, params *WorkflowWaitActivateByJobIDManyParams) ([]int64, error)
	WorkflowWaitActiveTaskListByWorkflowIDs(ctx context.Context, params *WorkflowWaitActiveTaskListByWorkflowIDsParams) ([]*WorkflowWaitActiveTask, error)
	WorkflowWaitDepOutputListByWorkflowTaskPairs(ctx context.Context, params *WorkflowWaitDepOutputListByWorkflowTaskPairsParams) ([]*WorkflowWaitDepOutput, error)
	WorkflowWaitEvalCursorUpdateByWorkflowIDMany(ctx context.Context, params *WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams) error
	WorkflowWaitUpdateMetadataByJobIDMany(ctx context.Context, params *WorkflowWaitUpdateMetadataByJobIDManyParams) error
	WorkflowWorklistDeleteByWorkflowIDsReturningReasons(ctx context.Context, params *WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams) ([]*WorkflowWorklistDeleteByWorkflowIDsReturningReasonsRow, error)
	WorkflowWorklistInsertMany(ctx context.Context, params *WorkflowWorklistInsertManyParams) error
	WorkflowWorklistListIDs(ctx context.Context, params *WorkflowWorklistListParams) ([]*WorkflowWorklistIDItem, error)
	WorkflowWorklistList(ctx context.Context, params *WorkflowWorklistListParams) ([]*WorkflowWorklistItem, error)
}

type ProExecutorTx

type ProExecutorTx interface {
	ProExecutor
	riverdriver.ExecutorTx
}

type Producer

type Producer struct {
	ID         int64
	ClientID   string
	QueueName  string
	MaxWorkers int32
	Metadata   []byte
	PausedAt   *time.Time
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

type ProducerDeleteParams

type ProducerDeleteParams struct {
	ID     int64
	Schema string
}

type ProducerGetByIDParams

type ProducerGetByIDParams struct {
	ID     int64
	Schema string
}

type ProducerInsertOrUpdateParams

type ProducerInsertOrUpdateParams struct {
	ID         int64
	ClientID   string
	CreatedAt  *time.Time
	MaxWorkers int32
	Metadata   []byte
	PausedAt   *time.Time
	QueueName  string
	Schema     string
	UpdatedAt  *time.Time
}

type ProducerKeepAliveParams

type ProducerKeepAliveParams struct {
	ID                    int64
	QueueName             string
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type ProducerListByQueueParams

type ProducerListByQueueParams struct {
	QueueName string
	Schema    string
}

type ProducerListByQueueResult

type ProducerListByQueueResult struct {
	Producer *Producer
	Running  int32
}

type ProducerUpdateParams

type ProducerUpdateParams struct {
	ID                 int64
	MaxWorkers         int32
	MaxWorkersDoUpdate bool
	Metadata           []byte
	MetadataDoUpdate   bool
	PausedAt           *time.Time
	PausedAtDoUpdate   bool
	Schema             string
	UpdatedAt          *time.Time
}

type QueueGetMetadataForInsertParams

type QueueGetMetadataForInsertParams struct {
	Names  []string
	Schema string
}

type QueueGetMetadataForInsertResult

type QueueGetMetadataForInsertResult struct {
	Name        string
	Concurrency []byte
}

type Sequence

type Sequence struct {
	ID        int64
	Key       string
	CreatedAt time.Time
}

type SequenceAppendManyParams

type SequenceAppendManyParams struct {
	Schema  string
	SeqKeys []string
}

type SequenceListParams

type SequenceListParams struct {
	MaxCount int
	Schema   string
}

type SequencePromoteFromTableParams

type SequencePromoteFromTableParams struct {
	GracePeriod time.Duration
	Max         int
	Now         *time.Time
	Schema      string
}

type SequencePromoteFromTableResult

type SequencePromoteFromTableResult struct {
	Continue    bool
	NumDeleted  int
	NumPromoted int
}

type SequencePromoteParams

type SequencePromoteParams struct {
	GracePeriod time.Duration
	Keys        []string
	Now         *time.Time
	Schema      string
}

type SequencePromoteResult

type SequencePromoteResult struct {
	PromotedKeys []string
	SkippedKeys  []string
}

type SequenceScanAndPromoteStalledParams

type SequenceScanAndPromoteStalledParams struct {
	GracePeriod     time.Duration
	LastSequenceKey string
	Max             int
	Now             *time.Time
	Schema          string
}

type SequenceScanAndPromoteStalledResult

type SequenceScanAndPromoteStalledResult struct {
	Continue       bool
	LastSeqKey     string
	SkippedSeqKeys []string
}

type StatementTimeoutError

type StatementTimeoutError struct {
	Err error
}

StatementTimeoutError wraps a driver-specific statement timeout error.

func (*StatementTimeoutError) Error

func (e *StatementTimeoutError) Error() string

func (*StatementTimeoutError) Is

func (e *StatementTimeoutError) Is(target error) bool

func (*StatementTimeoutError) Unwrap

func (e *StatementTimeoutError) Unwrap() error

type TimeNowParams

type TimeNowParams struct {
	Now *time.Time
}

type UniqueViolationError

type UniqueViolationError struct {
	ConstraintName string
	Detail         string
	Err            error
	KeyValues      map[string]string
	SQLState       string
}

UniqueViolationError wraps a driver-specific unique-violation error.

func (*UniqueViolationError) Error

func (e *UniqueViolationError) Error() string

func (*UniqueViolationError) Is

func (e *UniqueViolationError) Is(target error) bool

func (*UniqueViolationError) Unwrap

func (e *UniqueViolationError) Unwrap() error

type Workflow

type Workflow struct {
	CreatedAt           time.Time
	CurrentAttempt      int
	FinalizedAt         *time.Time
	ID                  string
	Metadata            []byte
	Name                *string
	State               string
	UpdatedAt           time.Time
	WaitEvalCursorJobID *int64
}

Workflow is a row from river_workflow.

type WorkflowAttempt

type WorkflowAttempt struct {
	Attempt      int
	CreatedAt    time.Time
	ResetHistory bool
	RetryMode    string
	TriggeredBy  []byte
	WorkflowID   string
}

type WorkflowAttemptInsertParams

type WorkflowAttemptInsertParams struct {
	Attempt      int
	ResetHistory bool
	RetryMode    string
	Schema       string
	TriggeredBy  []byte
	WorkflowID   string
}

type WorkflowAttemptListByWorkflowIDParams

type WorkflowAttemptListByWorkflowIDParams struct {
	Schema     string
	WorkflowID string
}

type WorkflowAttemptTask

type WorkflowAttemptTask struct {
	Attempt      int
	AttemptCount int
	Errors       []rivertype.AttemptError
	FinalizedAt  *time.Time
	JobID        int64
	Metadata     []byte
	State        string
	Task         string
	WorkflowID   string
}

type WorkflowAttemptTaskInsertParams

type WorkflowAttemptTaskInsertParams struct {
	Attempt      int
	AttemptCount int
	Errors       [][]byte
	FinalizedAt  *time.Time
	JobID        int64
	Metadata     []byte
	Schema       string
	State        string
	Task         string
	WorkflowID   string
}

type WorkflowAttemptTaskListByWorkflowIDParams

type WorkflowAttemptTaskListByWorkflowIDParams struct {
	Attempt    int
	Schema     string
	WorkflowID string
}

type WorkflowCancelParams

type WorkflowCancelParams struct {
	CancelAttemptedAt time.Time
	ControlTopic      string
	Schema            string
	WorkflowID        string
}

type WorkflowCancelWithDeletedDepsManyParams

type WorkflowCancelWithDeletedDepsManyParams struct {
	Schema               string
	WorkflowDepsFailedAt time.Time
	WorkflowIDs          []string
}

type WorkflowCancelWithFailedDepsManyParams

type WorkflowCancelWithFailedDepsManyParams struct {
	Schema               string
	WorkflowDepsFailedAt time.Time
	WorkflowIDs          []string
}

type WorkflowCleanupDeleteByWorkflowIDsParams

type WorkflowCleanupDeleteByWorkflowIDsParams struct {
	Schema      string
	WorkflowIDs []string
}

type WorkflowCleanupDeleteWorkflowsByWorkflowIDsParams

type WorkflowCleanupDeleteWorkflowsByWorkflowIDsParams struct {
	Schema      string
	State       string
	WorkflowIDs []string
}

type WorkflowCleanupListFinalizedIDsParams

type WorkflowCleanupListFinalizedIDsParams struct {
	FinalizedBefore time.Time
	LimitCount      int
	Schema          string
	State           string
}

type WorkflowCleanupListFinalizedIDsWithoutJobsParams

type WorkflowCleanupListFinalizedIDsWithoutJobsParams struct {
	LimitCount int
	Schema     string
	State      string
}

type WorkflowCountIncompleteJobsParams

type WorkflowCountIncompleteJobsParams struct {
	Schema          string
	SupervisorJobID int64
	WorkflowID      string
}

type WorkflowFinalizeIfCompleteManyParams

type WorkflowFinalizeIfCompleteManyParams struct {
	Now         time.Time
	Schema      string
	WorkflowIDs []string
}

type WorkflowGetByIDParams

type WorkflowGetByIDParams struct {
	Schema     string
	WorkflowID string
}

WorkflowGetByIDParams selects a workflow row by ID.

type WorkflowGetFinalizationCandidatesParams

type WorkflowGetFinalizationCandidatesParams struct {
	AfterWorkflowID string
	LimitCount      int32
	Schema          string
}

type WorkflowGetLegacyBackfillIDsParams

type WorkflowGetLegacyBackfillIDsParams struct {
	AfterWorkflowID string
	LimitCount      int32
	Schema          string
}

type WorkflowHasWaitTasksManyParams

type WorkflowHasWaitTasksManyParams struct {
	Schema      string
	WorkflowIDs []string
}

WorkflowHasWaitTasksManyParams checks whether workflows have pending waiting tasks.

type WorkflowInitFromJobsParams

type WorkflowInitFromJobsParams struct {
	Schema      string
	WorkflowIDs []string
}

WorkflowInitFromJobsParams inserts workflow rows based on existing job metadata.

type WorkflowInsertManyParams

type WorkflowInsertManyParams struct {
	IDs    []string
	Names  []string
	Schema string
}

WorkflowInsertManyParams inserts workflow rows if they do not already exist.

type WorkflowJobGetByTaskNameParams

type WorkflowJobGetByTaskNameParams struct {
	Schema     string
	TaskName   string
	WorkflowID string
}

type WorkflowJobListParams

type WorkflowJobListParams struct {
	PaginationLimit  int
	PaginationOffset int
	Schema           string
	WorkflowID       string
}

type WorkflowListByIDsForWaitEvalParams

type WorkflowListByIDsForWaitEvalParams struct {
	Schema      string
	WorkflowIDs []string
}

WorkflowListByIDsForWaitEvalParams lists workflow rows needed for wait evaluation.

type WorkflowListByIDsParams

type WorkflowListByIDsParams struct {
	Schema      string
	WorkflowIDs []string
}

WorkflowListByIDsParams lists workflow rows by ID.

type WorkflowListItem

type WorkflowListItem struct {
	CountAvailable  int
	CountCancelled  int
	CountCompleted  int
	CountDiscarded  int
	CountFailedDeps int
	CountPending    int
	CountRetryable  int
	CountRunning    int
	CountScheduled  int
	CreatedAt       time.Time
	ID              string
	Name            *string
}

type WorkflowListParams

type WorkflowListParams struct {
	After           string
	Before          string
	PaginationLimit int
	Schema          string
}

type WorkflowLoadDepTasksAndIDsParams

type WorkflowLoadDepTasksAndIDsParams struct {
	Recursive  bool
	Schema     string
	Task       string
	WorkflowID string
}

type WorkflowLoadJobsWithDepsParams

type WorkflowLoadJobsWithDepsParams struct {
	JobIds []int64
	Schema string
}

type WorkflowLoadTaskNamesByWorkflowIDParams

type WorkflowLoadTaskNamesByWorkflowIDParams struct {
	Schema     string
	WorkflowID string
}

type WorkflowLoadTaskWithDepsParams

type WorkflowLoadTaskWithDepsParams struct {
	Schema     string
	Task       string
	WorkflowID string
}

type WorkflowLoadTasksByNamesParams

type WorkflowLoadTasksByNamesParams struct {
	Schema     string
	TaskNames  []string
	WorkflowID string
}

type WorkflowLockByIDsSkipLockedParams

type WorkflowLockByIDsSkipLockedParams struct {
	LimitCount  int
	Schema      string
	WorkflowIDs []string
}

WorkflowLockByIDsSkipLockedParams locks workflow rows if available.

WorkflowIDs are attempted in input order. LimitCount optionally caps how many rows are locked; values <= 0 mean "no cap".

type WorkflowReadyTaskIDsByWorkflowIDsParams

type WorkflowReadyTaskIDsByWorkflowIDsParams struct {
	LimitCount  int
	Schema      string
	WorkflowIDs []string
}

type WorkflowReadyTaskIDsByWorkflowIDsRow

type WorkflowReadyTaskIDsByWorkflowIDsRow struct {
	ID         int64
	TotalCount int64
	WorkflowID string
}

type WorkflowRetryLockAndCheckRunningParams

type WorkflowRetryLockAndCheckRunningParams struct {
	Schema     string
	WorkflowID string
}

type WorkflowRetryLockAndCheckRunningResult

type WorkflowRetryLockAndCheckRunningResult struct {
	WorkflowIsActive bool
}

type WorkflowRetryMode

type WorkflowRetryMode string
const (
	WorkflowRetryModeAll                 WorkflowRetryMode = "all"
	WorkflowRetryModeFailedOnly          WorkflowRetryMode = "failed_only"
	WorkflowRetryModeFailedAndDownstream WorkflowRetryMode = "failed_and_downstream"
)

type WorkflowRetryParams

type WorkflowRetryParams struct {
	Mode         WorkflowRetryMode
	Now          time.Time
	ResetHistory bool
	Schema       string
	WorkflowID   string
}

type WorkflowSignal

type WorkflowSignal struct {
	Attempt        int
	CreatedAt      time.Time
	ID             int64
	IdempotencyKey string
	Key            string
	Metadata       []byte
	Payload        []byte
	Source         []byte
	WorkflowID     string
}

WorkflowSignal is a signal sent to a workflow.

type WorkflowSignalInsertParams

type WorkflowSignalInsertParams struct {
	IdempotencyKey   string
	Key              string
	Metadata         []byte
	Payload          []byte
	RequestedAttempt *int
	Schema           string
	Source           []byte
	WorkflowID       string
}

WorkflowSignalInsertParams inserts a signal for a workflow.

type WorkflowSignalInsertResult

type WorkflowSignalInsertResult struct {
	WorkflowSignal

	// CurrentAttempt is the workflow's observed current attempt when the query
	// found a workflow row.
	CurrentAttempt int

	// PayloadSemanticEqual indicates whether the returned row payload is
	// semantically equal (Postgres jsonb equality) to the request payload.
	PayloadSemanticEqual bool

	// SignalPresent reports whether the insert query returned a real signal row.
	SignalPresent bool

	// SkippedAsDuplicate is true when an idempotent emit reused an existing
	// signal row instead of inserting a new one.
	SkippedAsDuplicate bool
}

WorkflowSignalInsertResult is the result of WorkflowSignalInsert.

type WorkflowSignalListByEvidenceParams

type WorkflowSignalListByEvidenceParams struct {
	Attempt               int
	CursorID              *int64
	Desc                  bool
	Keys                  []string
	LastIncludedSignalIDs []int64
	LimitCount            int
	Schema                string
	WorkflowID            string
}

WorkflowSignalListByEvidenceParams fetches signal rows bounded by persisted task evidence.

type WorkflowSignalListByKeysParams

type WorkflowSignalListByKeysParams struct {
	Attempt    *int
	CursorID   *int64
	Desc       bool
	Keys       []string
	LimitCount int
	Schema     string
	WorkflowID string
}

WorkflowSignalListByKeysParams fetches workflow signals by declared keys.

type WorkflowSignalListByWorkflowIDsParams

type WorkflowSignalListByWorkflowIDsParams struct {
	Attempt     int
	Keys        []string
	Schema      string
	WorkflowIDs []string
}

WorkflowSignalListByWorkflowIDsParams fetches only the signal rows relevant to wait evaluation for the given workflows and current attempt.

type WorkflowSignalListParams

type WorkflowSignalListParams struct {
	Attempt    *int
	CursorID   *int64
	Desc       bool
	Key        *string
	LimitCount int
	Schema     string
	WorkflowID string
}

WorkflowSignalListParams fetches signals for a workflow with pagination.

type WorkflowSignalStat

type WorkflowSignalStat struct {
	Key          string
	LastSignalID int64
	SignalCount  int64
	WorkflowID   string
}

WorkflowSignalStat summarizes signals for one workflow/key pair.

type WorkflowSignalStatsByWorkflowIDsParams

type WorkflowSignalStatsByWorkflowIDsParams struct {
	Attempt     int
	Keys        []string
	Schema      string
	WorkflowIDs []string
}

WorkflowSignalStatsByWorkflowIDsParams fetches signal aggregates relevant to wait evaluation for the given workflows and current attempt.

type WorkflowStageJobsByIDManyParams

type WorkflowStageJobsByIDManyParams struct {
	JobIDs           []int64
	Schema           string
	WorkflowStagedAt time.Time
}

type WorkflowTask

type WorkflowTask struct {
	Deps       []string
	ID         int64
	State      rivertype.JobState
	Task       string
	WorkflowID string
}

type WorkflowTaskWithJob

type WorkflowTaskWithJob struct {
	Deps                []string
	IgnoreCancelledDeps bool
	IgnoreDeletedDeps   bool
	IgnoreDiscardedDeps bool
	Job                 *rivertype.JobRow
	Task                string
	WorkflowID          string
}

type WorkflowTimer

type WorkflowTimer struct {
	NextFireAt time.Time
	WorkflowID string
}

WorkflowTimer is a row from river_workflow_timer.

type WorkflowTimerConsumeDueParams

type WorkflowTimerConsumeDueParams struct {
	AsOf       time.Time
	LimitCount int
	Schema     string
}

WorkflowTimerConsumeDueParams atomically consumes due workflow timer rows.

type WorkflowTimerDeleteByWorkflowIDsParams

type WorkflowTimerDeleteByWorkflowIDsParams struct {
	Schema      string
	WorkflowIDs []string
}

WorkflowTimerDeleteByWorkflowIDsParams deletes workflow timer rows.

type WorkflowTimerGetByWorkflowIDParams

type WorkflowTimerGetByWorkflowIDParams struct {
	Schema     string
	WorkflowID string
}

WorkflowTimerGetByWorkflowIDParams gets a workflow timer row by workflow ID.

type WorkflowTimerNextFireAtByWorkflowIDsParams

type WorkflowTimerNextFireAtByWorkflowIDsParams struct {
	Now         time.Time
	Schema      string
	WorkflowIDs []string
}

WorkflowTimerNextFireAtByWorkflowIDsParams finds next pending timer deadlines for workflows.

type WorkflowTimerNextFireAtByWorkflowIDsRow

type WorkflowTimerNextFireAtByWorkflowIDsRow struct {
	NextFireAt time.Time
	WorkflowID string
}

type WorkflowTimerUpsertManyParams

type WorkflowTimerUpsertManyParams struct {
	NextFireAts []time.Time
	Schema      string
	WorkflowIDs []string
}

WorkflowTimerUpsertManyParams creates or updates workflow timers.

type WorkflowUnfinalizeIfActiveJobsManyParams

type WorkflowUnfinalizeIfActiveJobsManyParams struct {
	Now         time.Time
	Schema      string
	WorkflowIDs []string
}

WorkflowUnfinalizeIfActiveJobsManyParams reopens finalized workflows with active jobs.

type WorkflowWaitActivatableTaskIDsByWorkflowIDsParams

type WorkflowWaitActivatableTaskIDsByWorkflowIDsParams struct {
	LimitCount  int
	Schema      string
	WorkflowIDs []string
}

type WorkflowWaitActivatableTaskIDsByWorkflowIDsRow

type WorkflowWaitActivatableTaskIDsByWorkflowIDsRow struct {
	ID         int64
	TotalCount int64
	WorkflowID string
}

type WorkflowWaitActivateByJobIDManyParams

type WorkflowWaitActivateByJobIDManyParams struct {
	JobIDs []int64
	Now    time.Time
	Schema string
}

WorkflowWaitActivateByJobIDManyParams activates waiting tasks by job IDs.

type WorkflowWaitActiveTask

type WorkflowWaitActiveTask struct {
	ID         int64
	Metadata   []byte
	TotalCount int64
	WorkflowID string
}

WorkflowWaitActiveTask is an active waiting-task row for evaluation.

type WorkflowWaitActiveTaskListByWorkflowIDsParams

type WorkflowWaitActiveTaskListByWorkflowIDsParams struct {
	LimitCount  int
	Schema      string
	WorkflowIDs []string
}

WorkflowWaitActiveTaskListByWorkflowIDsParams lists active waiting tasks.

type WorkflowWaitDepOutput

type WorkflowWaitDepOutput struct {
	FinalizedAt *time.Time
	Output      []byte
	State       *rivertype.JobState
	Task        string
	WorkflowID  string
}

WorkflowWaitDepOutput is a dep output row for wait evaluation.

type WorkflowWaitDepOutputListByWorkflowTaskPairsParams

type WorkflowWaitDepOutputListByWorkflowTaskPairsParams struct {
	Schema      string
	Tasks       []string
	WorkflowIDs []string
}

WorkflowWaitDepOutputListByWorkflowTaskPairsParams lists dep outputs.

type WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams

type WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams struct {
	CursorJobIDs []int64
	Schema       string
	WorkflowIDs  []string
}

WorkflowWaitEvalCursorUpdateByWorkflowIDManyParams updates wait eval cursors.

type WorkflowWaitUpdateMetadataByJobIDManyParams

type WorkflowWaitUpdateMetadataByJobIDManyParams struct {
	JobIDs     []int64
	Schema     string
	WaitStates [][]byte
}

WorkflowWaitUpdateMetadataByJobIDManyParams updates wait state on jobs using `jsonb_set` to surgically update only the `river:workflow_wait_state` key, preserving all other metadata.

type WorkflowWaitWorkflow

type WorkflowWaitWorkflow struct {
	CreatedAt      time.Time
	CurrentAttempt int
	ID             string
	Metadata       []byte
}

WorkflowWaitWorkflow is workflow metadata used during wait evaluation.

type WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams

type WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams struct {
	Schema      string
	WorkflowIDs []string
}

WorkflowWorklistDeleteByWorkflowIDsReturningReasonsParams deletes worklist rows and returns reasons.

type WorkflowWorklistDeleteByWorkflowIDsReturningReasonsRow

type WorkflowWorklistDeleteByWorkflowIDsReturningReasonsRow struct {
	Reason     int16
	WorkflowID string
}

type WorkflowWorklistIDItem

type WorkflowWorklistIDItem struct {
	ID         int64
	WorkflowID string
}

WorkflowWorklistIDItem is a lightweight worklist row used for prefix scans.

type WorkflowWorklistInsertManyParams

type WorkflowWorklistInsertManyParams struct {
	Reason      int16
	Schema      string
	WorkflowIDs []string
}

WorkflowWorklistInsertManyParams inserts worklist rows for workflows.

type WorkflowWorklistItem

type WorkflowWorklistItem struct {
	CreatedAt  time.Time
	ID         int64
	Reason     int16
	WorkflowID string
}

WorkflowWorklistItem is a row from river_workflow_worklist.

type WorkflowWorklistListParams

type WorkflowWorklistListParams struct {
	AfterID    int64
	LimitCount int
	Schema     string
}

WorkflowWorklistListParams fetches a batch of worklist rows.

Directories

riverprodatabasesql module
riverprodrivertest module
riverpropgxv5 module