PgBouncer

River supports PgBouncer, but clients that insert jobs need it configured for at least transaction pooling. Clients acting as work coordinators use LISTEN/NOTIFY, so they need either PgBouncer in session pooling mode or a direct Postgres connection with no PgBouncer.

A good compromise is to use PgBouncer with transaction pooling for all job insertions and workers, but give work coordinators a raw Postgres pool. See "A realistic scenario" below.

PgBouncer modes

As background, PgBouncer supports three modes:

  • Session pooling: Pooling occurs at the session level. When a client requests a session, it's assigned a "real" Postgres connection, and is allowed to keep it until explicitly released.

  • Transaction pooling: Pooling occurs at the transaction level. When a client starts a transaction, it's affixed to a single Postgres connection until the transaction commits or is rolled back.

  • Statement pooling: Pooling occurs at the statement level. Clients only stay affixed to a Postgres connection for as long as it takes to execute a single statement. Multi-statement transactions aren't allowed.

Ways to use a River client

A River Client can broadly be used in two ways:

  • Insert-only: The client is initialized, but Start is never called. A client in such a state supports inserting jobs, but will never work them or participate in the leader election process.

  • Work coordinator: The client is initialized and Start is invoked. This client supports inserting jobs and working jobs, and will participate in leader election. LISTEN/NOTIFY is required both for listening for new jobs and for listening for leadership demotions. The client needs a minimum of two connections that either bypass PgBouncer or go through PgBouncer configured for session pooling, and will fluctuate between roughly two and five connections (usually closer to two) to fetch jobs and do maintenance work.

The work coordinator runs user-defined Worker implementations, and those may need connections of their own, but that's entirely up to their authors. We recommend that most work occurs transactionally, but workers support whichever PgBouncer modes they're written to support.

                           Session pooling   Transaction pooling   Statement pooling
Insert-only client         Yes               Yes                   No
Work coordinator client    Yes               No                    No
Workers                    Implementation dependent in every mode

As shown in the matrix above, River clients need PgBouncer to be configured with at least transaction pooling, and if acting as a coordinator, PgBouncer configured for session pooling, or no PgBouncer at all.

Even an insert-only client doesn't support statement pooling because certain features like unique jobs require the use of a transaction.
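
The rules above can be restated as a small lookup, which may be handy as a startup sanity check. This is only a sketch using the Go standard library: the PoolMode and Role types and the Supported function are hypothetical names invented for illustration and are not part of River's API.

```go
package main

import "fmt"

// PoolMode is how a client reaches Postgres. (Hypothetical type, not part of River.)
type PoolMode int

const (
	Direct      PoolMode = iota // no PgBouncer in between
	Session                     // PgBouncer session pooling
	Transaction                 // PgBouncer transaction pooling
	Statement                   // PgBouncer statement pooling
)

// Role is how a River client is used. (Hypothetical type, not part of River.)
type Role int

const (
	InsertOnly      Role = iota // Start is never called
	WorkCoordinator             // Start is called
)

// Supported reports whether a client in the given role can run over the
// given pool mode, following the rules described in the text.
func Supported(role Role, mode PoolMode) bool {
	switch role {
	case InsertOnly:
		// Inserts may need a transaction (e.g. unique jobs), which rules
		// out statement pooling only.
		return mode != Statement
	case WorkCoordinator:
		// LISTEN/NOTIFY needs a connection held for the whole session.
		return mode == Direct || mode == Session
	default:
		return false
	}
}

func main() {
	fmt.Println(Supported(InsertOnly, Transaction))      // true
	fmt.Println(Supported(InsertOnly, Statement))        // false
	fmt.Println(Supported(WorkCoordinator, Transaction)) // false
	fmt.Println(Supported(WorkCoordinator, Session))     // true
}
```

Workers are left out on purpose, since what they support depends on how each one is written.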

A realistic scenario

A plausible scenario where River and PgBouncer are used together is to configure PgBouncer with transaction pooling, and use it for inserting all jobs and performing all work, but giving a raw Postgres connection pool to work coordinators.
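
One way to wire this split into an application is to pick the connection string by process role. The sketch below assumes the two environment variable names used in the examples later in this section (PGBOUNCER_DATABASE_URL and DATABASE_URL); the databaseURLFor helper is a hypothetical name and not something River provides.

```go
package main

import (
	"fmt"
	"os"
)

// databaseURLFor returns the connection string a process should use.
// Work coordinators get the raw Postgres URL so LISTEN/NOTIFY works;
// everything else (insert-only clients, workers) goes through PgBouncer.
// getenv is injected so the function is easy to test.
func databaseURLFor(coordinator bool, getenv func(string) string) (string, error) {
	key := "PGBOUNCER_DATABASE_URL"
	if coordinator {
		key = "DATABASE_URL"
	}

	url := getenv(key)
	if url == "" {
		return "", fmt.Errorf("%s is not set", key)
	}
	return url, nil
}

func main() {
	if _, err := databaseURLFor(true, os.Getenv); err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("coordinator database URL resolved")
}
```

The returned string would then be passed to pgxpool.New as in the examples that follow.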

This should work well in most circumstances because many clients are expected to be inserting jobs and many workers working them, but comparatively few work coordinators are needed because each one can boot hundreds of goroutines to handle work. Each work coordinator requires a fixed minimum number of connections: one for LISTEN/NOTIFY and one that fetches jobs quite frequently, plus a few more during a leadership election or while performing maintenance work. Even so, high levels of concurrency can be achieved with a modest number of processes running work coordinators, each configured with a high MaxWorkers for its queues.
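
To get a feel for the numbers, the following back-of-the-envelope sketch estimates how many raw Postgres connections a fleet of work coordinators might hold. It takes the "roughly two to five connections" figure from earlier in this page as an assumption rather than a guarantee, and the Budget type and Estimate function are hypothetical names. Connections that workers open through PgBouncer are not counted.

```go
package main

import "fmt"

// Approximate figures taken from the text: a coordinator holds about two
// connections at rest and may briefly use around five.
const (
	minConnsPerCoordinator  = 2
	peakConnsPerCoordinator = 5
)

// Budget is a rough estimate for a fleet of work coordinators.
type Budget struct {
	MinConns          int // raw Postgres connections held at rest
	PeakConns         int // raw Postgres connections at peak
	MaxConcurrentJobs int // upper bound on jobs worked at once
}

// Estimate multiplies the per-coordinator figures by the fleet size.
// maxWorkers is the total MaxWorkers across a coordinator's queues.
func Estimate(coordinators, maxWorkers int) Budget {
	return Budget{
		MinConns:          coordinators * minConnsPerCoordinator,
		PeakConns:         coordinators * peakConnsPerCoordinator,
		MaxConcurrentJobs: coordinators * maxWorkers,
	}
}

func main() {
	b := Estimate(4, 1000)
	fmt.Printf("raw connections: %d to %d, concurrent jobs: up to %d\n",
		b.MinConns, b.PeakConns, b.MaxConcurrentJobs)
}
```

Under these assumptions, four coordinators with 1,000 workers each would allow up to 4,000 concurrent jobs while holding somewhere between 8 and 20 raw Postgres connections.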

Insert clients

To make this concrete, here's an insert-only client like one that might be used in a web or API process, configured to use PgBouncer as its pool:

dbPool, err := pgxpool.New(ctx, os.Getenv("PGBOUNCER_DATABASE_URL"))
if err != nil {
    // handle error
}

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
if err != nil {
    // handle error
}

// client NOT started
_, err = riverClient.Insert(ctx, DatabaseArgs{SQL: "SELECT 1"}, nil)
if err != nil {
    // handle error
}

Workers

Individual workers also use PgBouncer. They're worked by a work coordinator, but don't need to use the same database pool configuration as their progenitor:

type DatabaseArgs struct {
    SQL string `json:"sql"`
}

func (DatabaseArgs) Kind() string { return "database" }

type DatabaseWorker struct {
    river.WorkerDefaults[DatabaseArgs]
    dbPool *pgxpool.Pool
}

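func (w *DatabaseWorker) Work(ctx context.Context, job *river.Job[DatabaseArgs]) error {
    // Run the job's SQL on the worker's own (PgBouncer-backed) pool.
    _, err := w.dbPool.Exec(ctx, job.Args.SQL)
    if err != nil {
        return err
    }

    return nil
}

// The worker's pool goes through PgBouncer, independent of the coordinator's pool.
dbPool, err := pgxpool.New(ctx, os.Getenv("PGBOUNCER_DATABASE_URL"))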
if err != nil {
    // handle error
}

workers := river.NewWorkers()
river.AddWorker(workers, &DatabaseWorker{dbPool: dbPool})

Work coordinator

The work coordinator needs a real Postgres database pool so it can use LISTEN/NOTIFY, but is configured with high concurrency so relatively few total coordinators are required:

dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) // NOT PgBouncer
if err != nil {
    // handle error
}

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 1_000},
    },
    Workers: workers,
})
if err != nil {
    // handle error
}

if err := riverClient.Start(ctx); err != nil {
    // handle error
}