Skip to content

Inserting jobs from TypeScript

River supports inserting jobs from TypeScript and having them worked in Go, a feature that may be desirable for projects with a Node.js backend that also use River's Go-based job processing.

Insertion is supported through node-postgres (pg) and Prisma.


Basic usage

Install the core package along with the driver for your database library:

Terminal window
# Using node-postgres (pg)
pnpm add riverqueue @riverqueue/driver-pg pg
# Using Prisma
pnpm add riverqueue @riverqueue/driver-prisma

Initialize a client with:

import { Pool } from "pg";
import { Client } from "riverqueue";
import { PgDriver } from "@riverqueue/driver-pg";
const pool = new Pool({ connectionString: "postgres://localhost/mydb" });
const client = new Client(new PgDriver(pool));

Define job args and insert a job:

import type { JobArgs } from "riverqueue";
class SortArgs implements JobArgs {
kind = "sort";
constructor(public strings: string[]) {}
toJSON() {
return { strings: this.strings };
}
}
const result = await client.insert(new SortArgs(["whale", "tiger", "bear"]));
result.job; // inserted job row

Job args should:

  • Implement the JobArgs interface with a kind string that uniquely identifies them in the database, and which a Go worker will recognize.
  • Implement toJSON() to control which fields are serialized as the job's args.

For quick one-off jobs, JobArgsObject can be used to insert with a kind and JSON object so that it's not necessary to define a class:

import { JobArgsObject } from "riverqueue";
const result = await client.insert(
new JobArgsObject("sort", { strings: ["whale", "tiger", "bear"] })
);

Insertion options

Inserts take an options parameter to customize features of the inserted job:

const result = await client.insert(
new SortArgs(["whale", "tiger", "bear"]),
{
queue: "high_priority",
priority: 2,
maxAttempts: 5,
}
);

Inserting unique jobs

Unique jobs are supported through uniqueOpts, and can be made unique by args, period, and queue. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned.

const result = await client.insert(
new SortArgs(["whale", "tiger", "bear"]),
{
uniqueOpts: {
byArgs: true,
byQueue: true,
byPeriod: 900, // unique within 15-minute windows
},
}
);
// contains either a newly inserted job, or an existing one
result.job;

Inserting jobs in bulk

Use insertMany to bulk insert jobs as a single operation for improved efficiency:

const results = await client.insertMany([
new SortArgs(["whale", "tiger"]),
new SortArgs(["bear", "fox"]),
]);

Or with InsertManyParams, which may include insertion options:

import { InsertManyParams } from "riverqueue";
const results = await client.insertMany([
new InsertManyParams(new SortArgs(["whale", "tiger"]), {
maxAttempts: 5,
}),
new InsertManyParams(new SortArgs(["bear", "fox"]), {
queue: "high_priority",
}),
]);

Inserting in a transaction

Pass a transaction handle as the tx option to insert or insertMany. There's no need to create a separate client for the transaction.

With node-postgres

node-postgres explicitly doesn't define any high-level transaction constructs, so usage looks like manually issuing BEGIN, COMMIT, and ROLLBACK queries on a client checked out from the pool:

Using a client checked out from the pool:

const pool = new Pool({ connectionString: "postgres://localhost/mydb" });
const poolClient = await pool.connect();
try {
await poolClient.query("BEGIN");
await client.insert(new SortArgs(["whale"]), { tx: poolClient });
await client.insert(new SortArgs(["tiger"]), { tx: poolClient });
await poolClient.query("COMMIT");
} catch (e) {
await poolClient.query("ROLLBACK");
throw e;
} finally {
poolClient.release();
}

With Prisma

await prisma.$transaction(async (tx) => {
await client.insert(new SortArgs(["whale"]), { tx });
await client.insert(new SortArgs(["tiger"]), { tx });
});

Drivers

node-postgres

Use River with node-postgres (pg):

Terminal window
pnpm add riverqueue @riverqueue/driver-pg pg

Then initialize driver and client:

import { Pool } from "pg";
import { Client } from "riverqueue";
import { PgDriver } from "@riverqueue/driver-pg";
const pool = new Pool({ connectionString: "postgres://localhost/mydb" });
const client = new Client(new PgDriver(pool));

Prisma

Use River with Prisma:

Terminal window
pnpm add riverqueue @riverqueue/driver-prisma

Then initialize driver and client:

import { PrismaClient } from "@prisma/client";
import { Client } from "riverqueue";
import { PrismaDriver } from "@riverqueue/driver-prisma";
const prisma = new PrismaClient();
const client = new Client(new PrismaDriver(prisma));