River

driver

package module
v0.19.0 Go to latest
Published: Oct 8, 2025 License: Proprietary

Index

Constants

// Names of the migration lines declared by this package. Two of the three
// are deprecated in favor of MigrationLinePro.
const (
	// MigrationLinePro is the migration line for River Pro features.
	MigrationLinePro = "pro"

	// MigrationLineSequence is the deprecated migration line for the sequence
	// feature. Users should migrate to use the unified pro line instead.
	//
	// Deprecated: Use MigrationLinePro instead.
	MigrationLineSequence = "sequence"

	// MigrationLineWorkflow is the deprecated migration line for the workflow
	// feature. Users should migrate to use the unified pro line instead.
	//
	// Deprecated: Use MigrationLinePro instead.
	MigrationLineWorkflow = "workflow"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type GetPendingWorkflowIDsParams

// GetPendingWorkflowIDsParams are the parameters accepted by
// ProExecutor.GetPendingWorkflowIDs.
//
// NOTE(review): LastWorkflowID and LimitCount look like a pagination cursor
// and page size respectively; confirm against the driver implementations.
type GetPendingWorkflowIDsParams struct {
	LastWorkflowID string
	LimitCount     int32
	Schema         string
}

type GetPendingWorkflowIDsRow

// GetPendingWorkflowIDsRow is the element type of the slice returned by
// ProExecutor.GetPendingWorkflowIDs. Each row pairs a job ID with a workflow
// ID.
type GetPendingWorkflowIDsRow struct {
	JobID      int64
	WorkflowID string
}

type JobDeadLetterDeleteByIDParams

// JobDeadLetterDeleteByIDParams are the parameters accepted by
// ProExecutor.JobDeadLetterDeleteByID.
type JobDeadLetterDeleteByIDParams struct {
	ID     int64
	Schema string
}

type JobDeadLetterGetAllParams

// JobDeadLetterGetAllParams are the parameters accepted by
// ProExecutor.JobDeadLetterGetAll. Schema is the only input.
type JobDeadLetterGetAllParams struct {
	Schema string
}

type JobDeadLetterGetByIDParams

// JobDeadLetterGetByIDParams are the parameters accepted by
// ProExecutor.JobDeadLetterGetByID.
type JobDeadLetterGetByIDParams struct {
	ID     int64
	Schema string
}

type JobDeadLetterMoveByIDParams

// JobDeadLetterMoveByIDParams are the parameters accepted by
// ProExecutor.JobDeadLetterMoveByID.
type JobDeadLetterMoveByIDParams struct {
	ID     int64
	Schema string
}

type JobDeadLetterMoveDiscardedParams

// JobDeadLetterMoveDiscardedParams are the parameters accepted by
// ProExecutor.JobDeadLetterMoveDiscarded.
//
// NOTE(review): DiscardedFinalizedAtHorizon is presumably a cutoff on a
// job's finalized-at time and Max a cap on the number of jobs handled per
// call; confirm against the driver implementations.
type JobDeadLetterMoveDiscardedParams struct {
	DiscardedFinalizedAtHorizon time.Time
	Max                         int
	Schema                      string
}

type JobDeleteByIDManyParams

// JobDeleteByIDManyParams are the parameters accepted by
// ProExecutor.JobDeleteByIDMany. ID is a slice, so a single call can name
// multiple job IDs.
type JobDeleteByIDManyParams struct {
	ID     []int64
	Schema string
}

type JobGetAvailableForBatchParams

// JobGetAvailableForBatchParams are the parameters accepted by
// ProExecutor.JobGetAvailableForBatch.
//
// NOTE(review): the Batch* fields presumably identify the batch being
// assembled and its leader job, and Max presumably caps the number of jobs
// returned; confirm against the driver implementations.
type JobGetAvailableForBatchParams struct {
	AttemptedBy      string
	BatchID          string
	BatchKey         string
	BatchLeaderJobID int64
	Kind             string
	Max              int32
	Queue            string
	Schema           string
}

type JobGetAvailableLimitedParams

// JobGetAvailableLimitedParams are the parameters accepted by
// ProExecutor.JobGetAvailableLimited. It embeds a pointer to
// riverdriver.JobGetAvailableParams, so that type's fields are promoted onto
// this struct, and adds limit- and partition-related fields.
//
// NOTE(review): because the embedded value is a pointer, accessing promoted
// fields on a value whose embedded pointer is nil will panic.
type JobGetAvailableLimitedParams struct {
	*riverdriver.JobGetAvailableParams

	AvailablePartitionKeys                []string
	CurrentProducerPartitionKeys          []string
	CurrentProducerPartitionRunningCounts []int32
	GlobalLimit                           int32
	LocalLimit                            int32
	PartitionByArgs                       []string
	PartitionByKind                       bool
}

type JobGetAvailablePartitionKeysParams

// JobGetAvailablePartitionKeysParams are the parameters accepted by
// ProExecutor.JobGetAvailablePartitionKeys.
type JobGetAvailablePartitionKeysParams struct {
	Queue  string
	Schema string
}

type JobGetByWorkflowIDAndTaskNameParams

// JobGetByWorkflowIDAndTaskNameParams are the parameters accepted by
// ProExecutor.JobGetByWorkflowIDAndTaskName.
type JobGetByWorkflowIDAndTaskNameParams struct {
	Schema     string
	TaskName   string
	WorkflowID string
}

type PeriodicJob

// PeriodicJob is the record type returned by the PeriodicJob* methods of
// ProExecutor (PeriodicJobGetAll, PeriodicJobGetByID, PeriodicJobInsert,
// PeriodicJobKeepAliveAndReap and PeriodicJobUpsertMany). It carries a string
// ID plus creation, next-run and update timestamps.
type PeriodicJob struct {
	ID        string
	CreatedAt time.Time
	NextRunAt time.Time
	UpdatedAt time.Time
}

type PeriodicJobGetAllParams

// PeriodicJobGetAllParams are the parameters accepted by
// ProExecutor.PeriodicJobGetAll.
//
// NOTE(review): StaleUpdatedAtHorizon is presumably a cutoff on UpdatedAt
// used to tell stale records apart, and Max a cap on rows returned; confirm
// against the driver implementations.
type PeriodicJobGetAllParams struct {
	Max                   int
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type PeriodicJobGetByIDParams

// PeriodicJobGetByIDParams are the parameters accepted by
// ProExecutor.PeriodicJobGetByID.
type PeriodicJobGetByIDParams struct {
	ID     string
	Schema string
}

type PeriodicJobInsertParams

// PeriodicJobInsertParams are the parameters accepted by
// ProExecutor.PeriodicJobInsert.
//
// NOTE(review): UpdatedAt is a pointer and may therefore be nil; presumably
// nil lets the driver or database choose the timestamp. Confirm against the
// driver implementations.
type PeriodicJobInsertParams struct {
	ID        string
	NextRunAt time.Time
	Schema    string
	UpdatedAt *time.Time
}

type PeriodicJobKeepAliveAndReapParams

// PeriodicJobKeepAliveAndReapParams are the parameters accepted by
// ProExecutor.PeriodicJobKeepAliveAndReap. ID is a slice, so a single call
// can name multiple periodic job IDs.
//
// NOTE(review): Now is a pointer and may be nil; presumably nil means "use
// the current time". StaleUpdatedAtHorizon is presumably the cutoff used
// when reaping. Confirm both against the driver implementations.
type PeriodicJobKeepAliveAndReapParams struct {
	ID                    []string
	Now                   *time.Time
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type PeriodicJobUpsertManyParams

// PeriodicJobUpsertManyParams are the parameters accepted by
// ProExecutor.PeriodicJobUpsertMany. Jobs holds one PeriodicJobUpsertParams
// per periodic job; Schema is specified once for the whole call.
type PeriodicJobUpsertManyParams struct {
	Jobs   []*PeriodicJobUpsertParams
	Schema string
}

type PeriodicJobUpsertParams

// PeriodicJobUpsertParams describes a single periodic job within
// PeriodicJobUpsertManyParams.Jobs.
type PeriodicJobUpsertParams struct {
	ID        string
	NextRunAt time.Time
	UpdatedAt time.Time
}

type ProDriver

// ProDriver extends riverdriver.Driver with access to the Pro executor
// interfaces declared in this package. TTx is the driver's transaction type.
type ProDriver[TTx any] interface {
	riverdriver.Driver[TTx]

	// GetProExecutor returns a ProExecutor.
	GetProExecutor() ProExecutor
	// ProConfigInit receives a riverpilot.Pilot.
	// NOTE(review): presumably called once during client setup; confirm.
	ProConfigInit(pilot riverpilot.Pilot)
	// UnwrapProExecutor takes a transaction of type TTx and returns a
	// ProExecutorTx for it.
	UnwrapProExecutor(tx TTx) ProExecutorTx
}

type ProExecutor

// ProExecutor extends riverdriver.Executor with the operations used by River
// Pro features. The methods fall into groups by name prefix: dead letter jobs
// (JobDeadLetter*), job fetching (JobGet*), periodic jobs (PeriodicJob*),
// producers (Producer*), queues (Queue*), sequences (Sequence*) and workflows
// (Workflow*, StageWorkflowJobsMany, GetPendingWorkflowIDs).
//
// Every method takes a context, and all but BeginPro and
// PGTryAdvisoryXactLock take a pointer to a dedicated params struct declared
// in this package.
type ProExecutor interface {
	riverdriver.Executor

	// BeginPro begins a new subtransaction returning a ProExecutorTx.
	// ErrSubTxNotSupported may be returned if the executor is a transaction and
	// the driver doesn't support subtransactions (like
	// riverdriver/riverdatabasesql for database/sql).
	BeginPro(ctx context.Context) (ProExecutorTx, error)

	GetPendingWorkflowIDs(ctx context.Context, params *GetPendingWorkflowIDsParams) ([]*GetPendingWorkflowIDsRow, error)
	JobDeadLetterDeleteByID(ctx context.Context, params *JobDeadLetterDeleteByIDParams) (*rivertype.JobRow, error)
	JobDeadLetterGetAll(ctx context.Context, params *JobDeadLetterGetAllParams) ([]*rivertype.JobRow, error)
	JobDeadLetterGetByID(ctx context.Context, params *JobDeadLetterGetByIDParams) (*rivertype.JobRow, error)
	JobDeadLetterMoveByID(ctx context.Context, params *JobDeadLetterMoveByIDParams) (*rivertype.JobRow, error)
	JobDeadLetterMoveDiscarded(ctx context.Context, params *JobDeadLetterMoveDiscardedParams) ([]*rivertype.JobRow, error)
	JobDeleteByIDMany(ctx context.Context, params *JobDeleteByIDManyParams) ([]*rivertype.JobRow, error)
	JobGetAvailableForBatch(ctx context.Context, params *JobGetAvailableForBatchParams) ([]*rivertype.JobRow, error)
	JobGetAvailableLimited(ctx context.Context, params *JobGetAvailableLimitedParams) ([]*rivertype.JobRow, error)
	JobGetAvailablePartitionKeys(ctx context.Context, params *JobGetAvailablePartitionKeysParams) ([]string, error)
	JobGetByWorkflowIDAndTaskName(ctx context.Context, params *JobGetByWorkflowIDAndTaskNameParams) (*rivertype.JobRow, error)
	// PGTryAdvisoryXactLock takes an int64 key and reports a boolean result.
	// NOTE(review): the name suggests a Postgres transaction-level advisory
	// lock attempt, with the bool indicating whether the lock was acquired;
	// confirm against the driver implementations.
	PGTryAdvisoryXactLock(ctx context.Context, key int64) (bool, error)
	PeriodicJobGetAll(ctx context.Context, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error)
	PeriodicJobGetByID(ctx context.Context, params *PeriodicJobGetByIDParams) (*PeriodicJob, error)
	PeriodicJobInsert(ctx context.Context, params *PeriodicJobInsertParams) (*PeriodicJob, error)
	PeriodicJobKeepAliveAndReap(ctx context.Context, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error)
	PeriodicJobUpsertMany(ctx context.Context, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error)
	ProducerDelete(ctx context.Context, params *ProducerDeleteParams) error
	ProducerGetByID(ctx context.Context, params *ProducerGetByIDParams) (*Producer, error)
	QueueGetMetadataForInsert(ctx context.Context, params *QueueGetMetadataForInsertParams) ([]*QueueGetMetadataForInsertResult, error)
	ProducerInsertOrUpdate(ctx context.Context, params *ProducerInsertOrUpdateParams) (*Producer, error)
	ProducerKeepAlive(ctx context.Context, params *ProducerKeepAliveParams) (*Producer, error)
	ProducerListByQueue(ctx context.Context, params *ProducerListByQueueParams) ([]*ProducerListByQueueResult, error)
	ProducerUpdate(ctx context.Context, params *ProducerUpdateParams) (*Producer, error)
	// SequenceAppendMany takes a set of sequence keys and returns an int.
	// NOTE(review): the int is presumably a count of affected rows; confirm.
	SequenceAppendMany(ctx context.Context, params *SequenceAppendManyParams) (int, error)
	SequenceList(ctx context.Context, params *SequenceListParams) ([]*Sequence, error)
	// SequencePromote promotes the sequences with the given keys. It returns the
	// keys of the sequences which were actually promoted.
	SequencePromote(ctx context.Context, params *SequencePromoteParams) ([]string, error)
	// SequencePromoteFromTable promotes sequences from the river_job_sequence
	// table. It returns a SequencePromoteFromTableResult rather than a bare
	// slice of keys; the keys of the sequences which were actually promoted
	// are presumably carried in its PromotedKeys field.
	SequencePromoteFromTable(ctx context.Context, params *SequencePromoteFromTableParams) (*SequencePromoteFromTableResult, error)
	SequenceScanAndPromoteStalled(ctx context.Context, params *SequenceScanAndPromoteStalledParams) (*SequenceScanAndPromoteStalledResult, error)
	StageWorkflowJobsMany(ctx context.Context, params *StageWorkflowJobsManyParams) ([]*rivertype.JobRow, error)
	WorkflowCancel(ctx context.Context, params *WorkflowCancelParams) ([]*rivertype.JobRow, error)
	WorkflowCancelWithFailedDepsMany(ctx context.Context, params *WorkflowCancelWithFailedDepsManyParams) ([]*rivertype.JobRow, error)
	WorkflowJobList(ctx context.Context, params *WorkflowJobListParams) ([]*rivertype.JobRow, error)
	// WorkflowListActive, WorkflowListAll and WorkflowListInactive share the
	// same parameter type (WorkflowListParams) and result element type
	// (WorkflowListItem).
	WorkflowListActive(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
	WorkflowListAll(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
	WorkflowListInactive(ctx context.Context, params *WorkflowListParams) ([]*WorkflowListItem, error)
	// WorkflowLoadDepTasksAndIDs returns a map keyed by string with *int64
	// values, so an entry's value may be nil.
	// NOTE(review): presumably task name -> job ID, nil when no job exists for
	// the task; confirm against the driver implementations.
	WorkflowLoadDepTasksAndIDs(ctx context.Context, params *WorkflowLoadDepTasksAndIDsParams) (map[string]*int64, error)
	WorkflowLoadJobsWithDeps(ctx context.Context, params *WorkflowLoadJobsWithDepsParams) ([]*WorkflowTaskWithJob, error)
	WorkflowLoadTaskWithDeps(ctx context.Context, params *WorkflowLoadTaskWithDepsParams) (*WorkflowTaskWithJob, error)
	WorkflowLoadTasksByNames(ctx context.Context, params *WorkflowLoadTasksByNamesParams) ([]*WorkflowTask, error)
	WorkflowRetry(ctx context.Context, params *WorkflowRetryParams) ([]*rivertype.JobRow, error)
	WorkflowRetryLockAndCheckRunning(ctx context.Context, params *WorkflowRetryLockAndCheckRunningParams) (*WorkflowRetryLockAndCheckRunningResult, error)
}

type ProExecutorTx

// ProExecutorTx is the transactional counterpart of ProExecutor: it combines
// the ProExecutor method set with riverdriver.ExecutorTx. It is the type
// returned by ProExecutor.BeginPro and ProDriver.UnwrapProExecutor.
type ProExecutorTx interface {
	ProExecutor
	riverdriver.ExecutorTx
}

type Producer

// Producer is the record type returned by the Producer* methods of
// ProExecutor (ProducerGetByID, ProducerInsertOrUpdate, ProducerKeepAlive and
// ProducerUpdate) and wrapped by ProducerListByQueueResult.
type Producer struct {
	ID         int64
	ClientID   string
	QueueName  string
	MaxWorkers int32
	// Metadata is an opaque byte slice.
	// NOTE(review): presumably encoded JSON; confirm.
	Metadata []byte
	// PausedAt is a pointer and may be nil.
	// NOTE(review): presumably nil means the producer is not paused; confirm.
	PausedAt   *time.Time
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

type ProducerDeleteParams

// ProducerDeleteParams are the parameters accepted by
// ProExecutor.ProducerDelete.
type ProducerDeleteParams struct {
	ID     int64
	Schema string
}

type ProducerGetByIDParams

// ProducerGetByIDParams are the parameters accepted by
// ProExecutor.ProducerGetByID.
type ProducerGetByIDParams struct {
	ID     int64
	Schema string
}

type ProducerInsertOrUpdateParams

// ProducerInsertOrUpdateParams are the parameters accepted by
// ProExecutor.ProducerInsertOrUpdate. Apart from Schema, its fields mirror
// those of Producer.
//
// NOTE(review): CreatedAt and UpdatedAt are pointers here (unlike on
// Producer) and may be nil; presumably nil lets the driver or database choose
// the timestamp. Confirm against the driver implementations.
type ProducerInsertOrUpdateParams struct {
	ID         int64
	ClientID   string
	CreatedAt  *time.Time
	MaxWorkers int32
	Metadata   []byte
	PausedAt   *time.Time
	QueueName  string
	Schema     string
	UpdatedAt  *time.Time
}

type ProducerKeepAliveParams

// ProducerKeepAliveParams are the parameters accepted by
// ProExecutor.ProducerKeepAlive.
//
// NOTE(review): StaleUpdatedAtHorizon is presumably a cutoff on UpdatedAt
// used to identify stale producers; confirm against the driver
// implementations.
type ProducerKeepAliveParams struct {
	ID                    int64
	QueueName             string
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type ProducerListByQueueParams

// ProducerListByQueueParams are the parameters accepted by
// ProExecutor.ProducerListByQueue.
type ProducerListByQueueParams struct {
	QueueName string
	Schema    string
}

type ProducerListByQueueResult

// ProducerListByQueueResult is the element type of the slice returned by
// ProExecutor.ProducerListByQueue. It pairs a Producer with a Running count.
//
// NOTE(review): Running is presumably the number of jobs the producer is
// currently running; confirm against the driver implementations.
type ProducerListByQueueResult struct {
	Producer *Producer
	Running  int32
}

type ProducerUpdateParams

// ProducerUpdateParams are the parameters accepted by
// ProExecutor.ProducerUpdate.
//
// MaxWorkers, Metadata and PausedAt each have a companion boolean named
// <Field>DoUpdate.
// NOTE(review): presumably a field is only written when its DoUpdate flag is
// true, which would allow PausedAt to be explicitly set to nil; confirm
// against the driver implementations.
type ProducerUpdateParams struct {
	ID                 int64
	MaxWorkers         int32
	MaxWorkersDoUpdate bool
	Metadata           []byte
	MetadataDoUpdate   bool
	PausedAt           *time.Time
	PausedAtDoUpdate   bool
	Schema             string
	UpdatedAt          *time.Time
}

type QueueGetMetadataForInsertParams

// QueueGetMetadataForInsertParams are the parameters accepted by
// ProExecutor.QueueGetMetadataForInsert. Names is a slice, so a single call
// can name multiple queues.
type QueueGetMetadataForInsertParams struct {
	Names  []string
	Schema string
}

type QueueGetMetadataForInsertResult

// QueueGetMetadataForInsertResult is the element type of the slice returned
// by ProExecutor.QueueGetMetadataForInsert.
type QueueGetMetadataForInsertResult struct {
	Name string
	// Concurrency is an opaque byte slice.
	// NOTE(review): presumably the queue's encoded concurrency
	// configuration; confirm the encoding against the driver implementations.
	Concurrency []byte
}

type Sequence

// Sequence is the element type of the slice returned by
// ProExecutor.SequenceList. It carries a numeric ID, a string key and a
// creation timestamp.
type Sequence struct {
	ID        int64
	Key       string
	CreatedAt time.Time
}

type SequenceAppendManyParams

// SequenceAppendManyParams are the parameters accepted by
// ProExecutor.SequenceAppendMany. SeqKeys is a slice, so a single call can
// name multiple sequence keys.
type SequenceAppendManyParams struct {
	Schema  string
	SeqKeys []string
}

type SequenceListParams

// SequenceListParams are the parameters accepted by ProExecutor.SequenceList.
//
// NOTE(review): MaxCount is presumably a cap on the number of sequences
// returned; confirm against the driver implementations.
type SequenceListParams struct {
	MaxCount int
	Schema   string
}

type SequencePromoteFromTableParams

// SequencePromoteFromTableParams are the parameters accepted by
// ProExecutor.SequencePromoteFromTable.
//
// NOTE(review): Now is a pointer and may be nil; presumably nil means "use
// the current time". Max presumably caps the work done per call. Confirm
// both against the driver implementations.
type SequencePromoteFromTableParams struct {
	GracePeriod time.Duration
	Max         int
	Now         *time.Time
	Schema      string
}

type SequencePromoteFromTableResult

// SequencePromoteFromTableResult is the result returned by
// ProExecutor.SequencePromoteFromTable.
//
// NOTE(review): Continue presumably signals that more work remains and the
// caller should invoke the method again; NumDeleted and PromotedKeys
// presumably report rows deleted and sequence keys promoted by the call.
// Confirm against the driver implementations.
type SequencePromoteFromTableResult struct {
	Continue     bool
	NumDeleted   int
	PromotedKeys []string
}

type SequencePromoteParams

// SequencePromoteParams are the parameters accepted by
// ProExecutor.SequencePromote. Keys names the sequences to promote.
//
// NOTE(review): Now is a pointer and may be nil; presumably nil means "use
// the current time". Confirm against the driver implementations.
type SequencePromoteParams struct {
	GracePeriod time.Duration
	Keys        []string
	Now         *time.Time
	Schema      string
}

type SequenceScanAndPromoteStalledParams

// SequenceScanAndPromoteStalledParams are the parameters accepted by
// ProExecutor.SequenceScanAndPromoteStalled.
//
// NOTE(review): LastSequenceKey looks like a scan cursor that pairs with
// SequenceScanAndPromoteStalledResult.LastSeqKey, and Max like a per-call
// cap; confirm against the driver implementations.
type SequenceScanAndPromoteStalledParams struct {
	GracePeriod     time.Duration
	LastSequenceKey string
	Max             int
	Now             *time.Time
	Schema          string
}

type SequenceScanAndPromoteStalledResult

// SequenceScanAndPromoteStalledResult is the result returned by
// ProExecutor.SequenceScanAndPromoteStalled.
//
// NOTE(review): Continue presumably signals that more sequences remain to be
// scanned, with LastSeqKey being the key to resume from; confirm against the
// driver implementations.
type SequenceScanAndPromoteStalledResult struct {
	Continue   bool
	LastSeqKey string
}

type StageWorkflowJobsManyParams

// StageWorkflowJobsManyParams are the parameters accepted by
// ProExecutor.StageWorkflowJobsMany. WorkflowIDs is a slice, so a single call
// can name multiple workflows.
type StageWorkflowJobsManyParams struct {
	Schema           string
	WorkflowIDs      []string
	WorkflowStagedAt time.Time
}

type WorkflowCancelParams

// WorkflowCancelParams are the parameters accepted by
// ProExecutor.WorkflowCancel.
//
// NOTE(review): ControlTopic is presumably the notification topic used to
// signal cancellation to running jobs; confirm against the driver
// implementations.
type WorkflowCancelParams struct {
	CancelAttemptedAt time.Time
	ControlTopic      string
	Schema            string
	WorkflowID        string
}

type WorkflowCancelWithFailedDepsManyParams

// WorkflowCancelWithFailedDepsManyParams are the parameters accepted by
// ProExecutor.WorkflowCancelWithFailedDepsMany. WorkflowIDs is a slice, so a
// single call can name multiple workflows.
type WorkflowCancelWithFailedDepsManyParams struct {
	Schema               string
	WorkflowDepsFailedAt time.Time
	WorkflowIDs          []string
}

type WorkflowJobListParams

// WorkflowJobListParams are the parameters accepted by
// ProExecutor.WorkflowJobList. It identifies a workflow and carries a
// pagination limit and offset.
type WorkflowJobListParams struct {
	PaginationLimit  int
	PaginationOffset int
	Schema           string
	WorkflowID       string
}

type WorkflowListItem

// WorkflowListItem is the element type of the slices returned by
// ProExecutor.WorkflowListActive, WorkflowListAll and WorkflowListInactive.
// It carries one Count* field per category alongside the workflow's ID,
// creation time and optional name.
//
// NOTE(review): the Count* fields are presumably the number of the
// workflow's jobs in each state (plus those with failed dependencies);
// confirm against the driver implementations.
type WorkflowListItem struct {
	CountAvailable  int
	CountCancelled  int
	CountCompleted  int
	CountDiscarded  int
	CountFailedDeps int
	CountPending    int
	CountRetryable  int
	CountRunning    int
	CountScheduled  int
	CreatedAt       time.Time
	ID              string
	// Name is a pointer and may be nil.
	Name *string
}

type WorkflowListParams

// WorkflowListParams are the parameters shared by
// ProExecutor.WorkflowListActive, WorkflowListAll and WorkflowListInactive.
//
// NOTE(review): After is presumably a pagination cursor (list items that
// sort after this value); confirm against the driver implementations.
type WorkflowListParams struct {
	After           string
	PaginationLimit int
	Schema          string
}

type WorkflowLoadDepTasksAndIDsParams

// WorkflowLoadDepTasksAndIDsParams are the parameters accepted by
// ProExecutor.WorkflowLoadDepTasksAndIDs.
//
// NOTE(review): Recursive presumably selects transitive rather than only
// direct dependencies of Task; confirm against the driver implementations.
type WorkflowLoadDepTasksAndIDsParams struct {
	Recursive  bool
	Schema     string
	Task       string
	WorkflowID string
}

type WorkflowLoadJobsWithDepsParams

// WorkflowLoadJobsWithDepsParams are the parameters accepted by
// ProExecutor.WorkflowLoadJobsWithDeps. JobIds is a slice, so a single call
// can name multiple jobs.
type WorkflowLoadJobsWithDepsParams struct {
	JobIds []int64
	Schema string
}

type WorkflowLoadTaskWithDepsParams

// WorkflowLoadTaskWithDepsParams are the parameters accepted by
// ProExecutor.WorkflowLoadTaskWithDeps. It identifies a single task within a
// workflow.
type WorkflowLoadTaskWithDepsParams struct {
	Schema     string
	Task       string
	WorkflowID string
}

type WorkflowLoadTasksByNamesParams

// WorkflowLoadTasksByNamesParams are the parameters accepted by
// ProExecutor.WorkflowLoadTasksByNames. TaskNames is a slice, so a single
// call can name multiple tasks within the workflow.
type WorkflowLoadTasksByNamesParams struct {
	Schema     string
	TaskNames  []string
	WorkflowID string
}

type WorkflowRetryLockAndCheckRunningParams

// WorkflowRetryLockAndCheckRunningParams are the parameters accepted by
// ProExecutor.WorkflowRetryLockAndCheckRunning.
type WorkflowRetryLockAndCheckRunningParams struct {
	Schema     string
	WorkflowID string
}

type WorkflowRetryLockAndCheckRunningResult

// WorkflowRetryLockAndCheckRunningResult is the result returned by
// ProExecutor.WorkflowRetryLockAndCheckRunning. Its single field reports
// whether the workflow is active.
type WorkflowRetryLockAndCheckRunningResult struct {
	WorkflowIsActive bool
}

type WorkflowRetryMode

// WorkflowRetryMode is a string-backed mode selector. It is the type of
// WorkflowRetryParams.Mode.
type WorkflowRetryMode string

// Declared WorkflowRetryMode values.
//
// NOTE(review): the descriptions below are inferred from the constant names;
// confirm the exact job selection against the driver implementations.
const (
	// WorkflowRetryModeAll presumably selects every job in the workflow.
	WorkflowRetryModeAll                 WorkflowRetryMode = "all"
	// WorkflowRetryModeFailedOnly presumably selects only failed jobs.
	WorkflowRetryModeFailedOnly          WorkflowRetryMode = "failed_only"
	// WorkflowRetryModeFailedAndDownstream presumably selects failed jobs
	// plus the jobs that depend on them.
	WorkflowRetryModeFailedAndDownstream WorkflowRetryMode = "failed_and_downstream"
)

type WorkflowRetryParams

// WorkflowRetryParams are the parameters accepted by
// ProExecutor.WorkflowRetry. Mode is one of the WorkflowRetryMode values.
//
// NOTE(review): ResetHistory presumably controls whether prior attempt
// history on the retried jobs is cleared; confirm against the driver
// implementations.
type WorkflowRetryParams struct {
	Mode         WorkflowRetryMode
	Now          time.Time
	ResetHistory bool
	Schema       string
	WorkflowID   string
}

type WorkflowTask

// WorkflowTask is the element type of the slice returned by
// ProExecutor.WorkflowLoadTasksByNames. It carries the task's name, its
// workflow ID, a job ID and state, and a list of dependencies.
//
// NOTE(review): Deps presumably holds the names of the tasks this task
// depends on; confirm against the driver implementations.
type WorkflowTask struct {
	Deps       []string
	ID         int64
	State      rivertype.JobState
	Task       string
	WorkflowID string
}

type WorkflowTaskWithJob

// WorkflowTaskWithJob is the type returned by
// ProExecutor.WorkflowLoadJobsWithDeps (as slice elements) and
// ProExecutor.WorkflowLoadTaskWithDeps. Unlike WorkflowTask, it carries the
// full rivertype.JobRow rather than only a job ID and state.
//
// NOTE(review): Deps presumably holds the names of the tasks this task
// depends on; confirm against the driver implementations.
type WorkflowTaskWithJob struct {
	Deps       []string
	Job        *rivertype.JobRow
	Task       string
	WorkflowID string
}

Directories

riverprodatabasesql module
riverprodrivertest module
riverpropgxv5 module