A `database/sql` escape hatch for broader ecosystem interoperability

We're on record as claiming that Pgx is the best implementation of the Postgres client protocol available for Go, and we recommend River's riverpgxv5 driver in most situations.

River has had a riverdatabasesql driver for some time that lets its internal migration framework be used in conjunction with Goose's Go migration API, but the driver's functionality was limited to migrations. It was never capable of inserting a job or doing anything more sophisticated.

As of River v0.10.0, that's no longer the case. riverdatabasesql has been expanded to support the full spectrum of driver functionality, including job insertion, job list/lookup, cancellation, queue pause/resume, and anything else a River client can do.

The most immediate benefit is that River becomes interoperable with ORMs (Object Relational Mappers) available in the broader Go ecosystem. Most packages that do database work make use of the *sql.DB and *sql.Tx primitives, even if they're hidden behind a layer of abstraction, and it's usually possible to get at them for use with riverdatabasesql.

In short, River picks up support for Bun and GORM!

Bun

Bun and GORM are the two main packages that we've had requests for River to interoperate with, so we'll take a look at them specifically, but similar techniques should be broadly applicable elsewhere.

Insert jobs on a transaction started by Bun by using the *sql.Tx embedded on Bun's Tx struct:

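tx, err := bunDB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
    return nil, err
}
// Roll back on any early return. After a successful Commit this is a no-op
// that returns sql.ErrTxDone, which is safe to ignore.
defer tx.Rollback()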

_, err = riverClient.InsertTx(ctx, tx.Tx, SortArgs{ // tx.Tx is *sql.Tx
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)
if err != nil {
    return nil, err
}

if err := tx.Commit(); err != nil {
    return nil, err
}

See inserting jobs from Bun for detail.
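
For readers less familiar with Go struct embedding, here's a minimal, standard-library-only sketch of why tx.Tx above yields a plain *sql.Tx. The ormTx type and the acceptsStdTx function are hypothetical stand-ins made up for illustration. They aren't Bun's or River's real definitions, and the zero-value sql.Tx is never run against a database.

```go
package main

import (
	"database/sql"
	"fmt"
)

// ormTx is a hypothetical stand-in for an ORM transaction type. Like the Bun
// Tx struct described above, it embeds *sql.Tx alongside its own state.
type ormTx struct {
	*sql.Tx
	label string // placeholder for ORM-specific fields
}

// acceptsStdTx is a hypothetical stand-in for any API that takes a plain
// *sql.Tx, such as the InsertTx call shown above.
func acceptsStdTx(tx *sql.Tx) bool {
	return tx != nil
}

// demoEmbedding wraps a zero-value sql.Tx (never used against a database) and
// passes the embedded pointer along. An embedded field is named after its
// type, so it's reachable as wrapped.Tx.
func demoEmbedding() bool {
	wrapped := ormTx{Tx: &sql.Tx{}, label: "example"}
	return acceptsStdTx(wrapped.Tx)
}

func main() {
	fmt.Println(demoEmbedding())
}
```

The same idea should apply to any wrapper that embeds or otherwise exposes its underlying *sql.Tx.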

GORM

GORM's layer of abstraction makes sharing a transaction less obvious, but possible. When inside a transaction, GORM's Statement.ConnPool is an *sql.Tx which can be extracted with a type assertion and fed into River:

tx := gormDB.Begin()
if err := tx.Error; err != nil {
    return nil, err
}

// If in a transaction, ConnPool can be converted to an *sql.Tx so operations
// from GORM and River occur on the same transaction.
sqlTx := tx.Statement.ConnPool.(*sql.Tx)

_, err = riverClient.InsertTx(ctx, sqlTx, SortArgs{
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)
if err != nil {
    return nil, err
}

if err := tx.Commit().Error; err != nil {
    return nil, err
}

See inserting jobs from GORM for detail.
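
A single-value type assertion like the one above panics if the value isn't the expected type. Here's a minimal sketch of a more defensive variant using Go's two-value ("comma-ok") form. The asSQLTx helper and errNotInTx error are hypothetical names introduced for illustration, and we're assuming that outside a transaction the connection pool holds some other type, such as a plain *sql.DB. It uses only the standard library, so zero-value structs stand in for GORM's real values.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// errNotInTx is a hypothetical sentinel error for this sketch.
var errNotInTx = errors.New("connection pool is not an *sql.Tx")

// asSQLTx extracts an *sql.Tx from a connection pool value using the two-value
// form of a type assertion, returning an error instead of panicking when the
// value holds anything else.
func asSQLTx(connPool any) (*sql.Tx, error) {
	tx, ok := connPool.(*sql.Tx)
	if !ok || tx == nil {
		return nil, fmt.Errorf("%w (got %T)", errNotInTx, connPool)
	}
	return tx, nil
}

// accepts reports whether asSQLTx succeeds for the given value.
func accepts(connPool any) bool {
	_, err := asSQLTx(connPool)
	return err == nil
}

// demo checks a stand-in for a pool inside a transaction (*sql.Tx) and a
// stand-in for one outside a transaction (*sql.DB, an assumption).
func demo() (inTx, outsideTx bool) {
	return accepts(&sql.Tx{}), accepts(&sql.DB{})
}

func main() {
	inTx, outsideTx := demo()
	fmt.Println(inTx, outsideTx)
}
```

With a helper along these lines, the call site would pass tx.Statement.ConnPool and return the error rather than crash if the code ever runs outside a transaction.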

Listen/notify

River makes use of Postgres' listen/notify to find out when new jobs have been inserted, and as part of leadership election. Listening for notifications isn't part of the normal SQL interface and therefore needs a separate, Postgres-specific API for it to work, the likes of which database/sql doesn't include.

River can still use riverdatabasesql without listen/notify, but it'll find out about new jobs or resigned leaders more slowly than it otherwise would, since these events have to be polled for rather than delivered as immediate notifications. We recommend using riverdatabasesql for job insertions only, and maintaining a separate client using riverpgxv5, with listen/notify support, that runs work.

A client with Queues configured, which is started so that it executes work:

workClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 100},
    },
    Workers: workers,
})
if err != nil {
    // handle error
}

if err := workClient.Start(ctx); err != nil {
    // handle error
}

A client used only for job insertions:

insertOnlyClient, err := river.NewClient(riverdatabasesql.New(sqlDB), &river.Config{
    Workers: workers,
})
if err != nil {
    // handle error
}

// Do insertions, but don't start the client for work.
_, err = insertOnlyClient.InsertTx(ctx, tx, MyJobArgs{}, nil)
if err != nil {
    // handle error
}
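
To make the latency difference between notification and polling described earlier in this section concrete, here's a minimal sketch of a wait step that wakes on whichever comes first: a notification or a poll timer. It's an illustration under our own assumptions, not River's actual implementation. The waitForWork function, the wakeReason type, and the intervals are all hypothetical. A nil channel models a driver without listen/notify, since receiving from a nil channel blocks forever and only the timer can fire.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// wakeReason records why the wait step returned.
type wakeReason string

const (
	wokeByNotify wakeReason = "notified"
	wokeByPoll   wakeReason = "polled"
	wokeByCancel wakeReason = "cancelled"
)

// waitForWork blocks until a notification arrives, the poll interval elapses,
// or ctx is cancelled. Pass a nil notify channel to model a driver that has
// no listen/notify support.
func waitForWork(ctx context.Context, notify <-chan struct{}, pollInterval time.Duration) wakeReason {
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()

	select {
	case <-notify:
		return wokeByNotify
	case <-timer.C:
		return wokeByPoll
	case <-ctx.Done():
		return wokeByCancel
	}
}

// runDemo exercises both paths: a pending notification with a long poll
// interval, then no notification channel at all with a short one.
func runDemo() []wakeReason {
	ctx := context.Background()

	notify := make(chan struct{}, 1)
	notify <- struct{}{} // simulate a notification that a job was inserted

	return []wakeReason{
		waitForWork(ctx, notify, time.Hour),
		waitForWork(ctx, nil, 10*time.Millisecond),
	}
}

func main() {
	for _, reason := range runDemo() {
		fmt.Println(reason)
	}
}
```

In the first call the pending notification wins immediately despite the hour-long poll interval. In the second, nothing can arrive on the nil channel, so the caller waits out the full poll interval before checking for work.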

Big changes to come

See the inserting jobs from Bun and from GORM documentation for more detailed usage information on each.

In case you missed it last week, check out River Python: Insert in Python, work in Go, on River's new Python job insertion package. We'd appreciate it if you signed up for our mailing list, which we send to sparingly, with new blog posts and the occasional announcement.