River supports inserting jobs from TypeScript and having them worked in Go, a feature that may be desirable for projects with a Node.js backend that also use River's Go-based job processing.
Insertion is supported through node-postgres (pg) and Prisma.
Basic usage
Install the core package along with the driver for your database library:
# Using node-postgres (pg)pnpm add riverqueue @riverqueue/driver-pg pg
# Using Prismapnpm add riverqueue @riverqueue/driver-prismaInitialize a client with:
import { Pool } from "pg";import { Client } from "riverqueue";import { PgDriver } from "@riverqueue/driver-pg";
const pool = new Pool({ connectionString: "postgres://localhost/mydb" });const client = new Client(new PgDriver(pool));Define job args and insert a job:
import type { JobArgs } from "riverqueue";
class SortArgs implements JobArgs { kind = "sort";
constructor(public strings: string[]) {}
toJSON() { return { strings: this.strings }; }}
const result = await client.insert(new SortArgs(["whale", "tiger", "bear"]));result.job; // inserted job rowJob args should:
- Implement the
JobArgsinterface with akindstring that uniquely identifies them in the database, and which a Go worker will recognize. - Implement
toJSON()to control which fields are serialized as the job's args.
For quick one-off jobs, JobArgsObject can be used to insert with a kind and JSON object so that it's not necessary to define a class:
import { JobArgsObject } from "riverqueue";
const result = await client.insert( new JobArgsObject("sort", { strings: ["whale", "tiger", "bear"] }));Insertion options
Inserts take an options parameter to customize features of the inserted job:
const result = await client.insert( new SortArgs(["whale", "tiger", "bear"]), { queue: "high_priority", priority: 2, maxAttempts: 5, });Inserting unique jobs
Unique jobs are supported through uniqueOpts, and can be made unique by args, period, and queue. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned.
const result = await client.insert( new SortArgs(["whale", "tiger", "bear"]), { uniqueOpts: { byArgs: true, byQueue: true, byPeriod: 900, // unique within 15-minute windows }, });
// contains either a newly inserted job, or an existing oneresult.job;Inserting jobs in bulk
Use insertMany to bulk insert jobs as a single operation for improved efficiency:
const results = await client.insertMany([ new SortArgs(["whale", "tiger"]), new SortArgs(["bear", "fox"]),]);Or with InsertManyParams, which may include insertion options:
import { InsertManyParams } from "riverqueue";
const results = await client.insertMany([ new InsertManyParams(new SortArgs(["whale", "tiger"]), { maxAttempts: 5, }), new InsertManyParams(new SortArgs(["bear", "fox"]), { queue: "high_priority", }),]);Inserting in a transaction
Pass a transaction handle as the tx option to insert or insertMany. There's no need to create a separate client for the transaction.
With node-postgres
node-postgres explicitly doesn't define any high-level transaction constructs, so usage looks like manually issuing BEGIN, COMMIT, and ROLLBACK queries on a client checked out from the pool:
Using a client checked out from the pool:
const pool = new Pool({ connectionString: "postgres://localhost/mydb" });const poolClient = await pool.connect();try { await poolClient.query("BEGIN");
await client.insert(new SortArgs(["whale"]), { tx: poolClient }); await client.insert(new SortArgs(["tiger"]), { tx: poolClient });
await poolClient.query("COMMIT");} catch (e) { await poolClient.query("ROLLBACK"); throw e;} finally { poolClient.release();}With Prisma
await prisma.$transaction(async (tx) => { await client.insert(new SortArgs(["whale"]), { tx }); await client.insert(new SortArgs(["tiger"]), { tx });});Drivers
node-postgres
Use River with node-postgres (pg):
pnpm add riverqueue @riverqueue/driver-pg pgThen initialize driver and client:
import { Pool } from "pg";import { Client } from "riverqueue";import { PgDriver } from "@riverqueue/driver-pg";
const pool = new Pool({ connectionString: "postgres://localhost/mydb" });const client = new Client(new PgDriver(pool));Prisma
Use River with Prisma:
pnpm add riverqueue @riverqueue/driver-prismaThen initialize driver and client:
import { PrismaClient } from "@prisma/client";import { Client } from "riverqueue";import { PrismaDriver } from "@riverqueue/driver-prisma";
const prisma = new PrismaClient();const client = new Client(new PrismaDriver(prisma));