
Concurrency limits

Concurrency limits give precise control over how many jobs are worked at once. Limits can be set globally (across all processes) and locally (per process), and can also be partitioned based on job attributes.

Concurrency limits are a feature of River Pro ✨. If you haven't yet, install River Pro and run the pro migration line.

Added in River Pro v0.12.0.


Basic usage

Concurrency limits require an up-to-date database schema with the pro migration line:

riverpro migrate-up --database-url "$DATABASE_URL" --line pro

Concurrency limits are configured per queue by using the ProQueues field on your riverpro.Config:

&riverpro.Config{
	Config: river.Config{
		Queues: map[string]river.QueueConfig{
			"default": {MaxWorkers: 100},
		},
		Workers: riverdemo.Workers(dbPool),
	},
	ProQueues: map[string]riverpro.QueueConfig{
		"external_api_provider": {
			Concurrency: riverpro.ConcurrencyConfig{
				GlobalLimit: 20,
				LocalLimit:  10,
			},
			MaxWorkers: 20,
		},
	},
}

A queue may be configured in either ProQueues or Queues, but not both. No concurrency limits are applied by default; they take effect only when set in Concurrency using a riverpro.ConcurrencyConfig.

Concurrency limits build on River's normal job prioritization, meaning that jobs will still be fetched in order of priority and scheduled_at. However, any job that would be fetched based on normal prioritization will be skipped over if working it would exceed a concurrency limit.

Global limits

Global limits ensure that no more than the specified number of jobs are worked at once globally across all clients working jobs in that queue (and database / schema). They are configured with the GlobalLimit field on riverpro.ConcurrencyConfig.

If no partitioning is enabled, the global limit applies to all jobs in the queue. With partitioning, the global limit is enforced independently per partition.

Local limits

Local limits ensure that no more than the specified number of jobs are worked at once locally within a single client. They are configured with the LocalLimit field on riverpro.ConcurrencyConfig.

If no partitioning is enabled, the local limit applies to all jobs in the queue. With partitioning, the local limit is enforced independently per partition.

Partitioning

By default, concurrency limits are enforced across all jobs in a queue. With partitioning, limits are enforced separately for different subsets of jobs in the queue.

Partitioning can be based on job arguments, job kind, or both.

Partitioning by arguments

Partitioning by job arguments allows limits to be applied independently based on the argument values. A common use case is to partition by customer or tenant, allowing each to have an independent concurrency limit:

&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		"expensive_actions": {
			Concurrency: riverpro.ConcurrencyConfig{
				GlobalLimit: 2,
				Partition: riverpro.PartitionConfig{
					ByArgs: []string{"customer_id"},
				},
			},
			MaxWorkers: 10,
		},
	},
}

Here, a maximum of 2 concurrent jobs per customer run globally, while each client can handle jobs for multiple customers concurrently, up to MaxWorkers.

The ByArgs option must use JSON keys, not Go struct field names. For example:

type MyJobArgs struct {
	CustomerID int `json:"customer_id"`
}

To partition by customer, you must specify "customer_id" in ByArgs, not "CustomerID". If a specified field is missing in the job arguments, River will use an empty value; in the example above, all jobs without a customer_id would be partitioned together.
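The difference between JSON keys and Go field names can be illustrated with a small sketch. The partitionKey helper below is hypothetical and is not River's implementation or API; it only assumes that partition values are looked up by key in the JSON-encoded args and that a missing key contributes an empty value, as described above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// MyJobArgs mirrors the example args struct above.
type MyJobArgs struct {
	CustomerID int `json:"customer_id"`
}

// partitionKey is a hypothetical helper (not part of River) that builds a
// partition key from JSON-encoded job args using the listed JSON keys.
func partitionKey(encodedArgs []byte, byArgs []string) (string, error) {
	var args map[string]json.RawMessage
	if err := json.Unmarshal(encodedArgs, &args); err != nil {
		return "", err
	}
	parts := make([]string, 0, len(byArgs))
	for _, key := range byArgs {
		// A key missing from the args yields a nil RawMessage, which
		// renders as an empty string.
		parts = append(parts, key+"="+string(args[key]))
	}
	return strings.Join(parts, ","), nil
}

func main() {
	encoded, err := json.Marshal(MyJobArgs{CustomerID: 123})
	if err != nil {
		panic(err)
	}

	byJSONKey, _ := partitionKey(encoded, []string{"customer_id"})
	byFieldName, _ := partitionKey(encoded, []string{"CustomerID"})

	fmt.Println(byJSONKey)   // customer_id=123
	fmt.Println(byFieldName) // CustomerID=
}
```

In this sketch, using the Go field name finds no matching key, so every job would collapse into the same empty-valued partition.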

ByArgs with all arguments

If ByArgs is specified as an empty slice []string{}, all arguments will be used for partitioning. This is generally not desirable, as it may lead to high cardinality and ineffective limits.

For example, if your jobs include unique values like timestamps, each job would be treated as a separate partition.

Most use cases will instead want to partition by one or more specific arguments.
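To see why partitioning on all arguments can make limits ineffective, the following sketch counts the distinct partitions produced by two key choices. The job type, sample data, and key functions are hypothetical and exist only for illustration; River derives partitions internally.

```go
package main

import (
	"fmt"
	"strconv"
)

// job is a hypothetical, simplified stand-in for a job's arguments.
type job struct {
	CustomerID  int
	RequestedAt string // unique per job in this example
}

// countPartitions returns how many distinct partition keys the jobs produce.
func countPartitions(jobs []job, key func(job) string) int {
	seen := make(map[string]struct{})
	for _, j := range jobs {
		seen[key(j)] = struct{}{}
	}
	return len(seen)
}

// allArgsKey models partitioning on every argument.
func allArgsKey(j job) string {
	return strconv.Itoa(j.CustomerID) + "|" + j.RequestedAt
}

// customerKey models partitioning on customer_id only.
func customerKey(j job) string {
	return strconv.Itoa(j.CustomerID)
}

// sampleJobs returns four jobs for two customers with unique timestamps.
func sampleJobs() []job {
	return []job{
		{CustomerID: 1, RequestedAt: "2024-01-01T00:00:01Z"},
		{CustomerID: 1, RequestedAt: "2024-01-01T00:00:02Z"},
		{CustomerID: 2, RequestedAt: "2024-01-01T00:00:03Z"},
		{CustomerID: 2, RequestedAt: "2024-01-01T00:00:04Z"},
	}
}

func main() {
	jobs := sampleJobs()
	fmt.Println(countPartitions(jobs, allArgsKey))  // 4: one partition per job
	fmt.Println(countPartitions(jobs, customerKey)) // 2: one partition per customer
}
```

With one partition per job, a per-partition limit never constrains anything, which is why a small set of specific keys is usually the better choice.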

Partitioning by kind

Partitioning can also be based on job kind. For example, rather than a single global limit of 10 across all job kinds, the following configuration allows up to 10 jobs concurrently for each job kind:

&riverpro.Config{
	ProQueues: map[string]riverpro.QueueConfig{
		"expensive_actions": {
			Concurrency: riverpro.ConcurrencyConfig{
				GlobalLimit: 10,
				Partition:   riverpro.PartitionConfig{ByKind: true},
			},
			MaxWorkers: 100,
		},
	},
}

In this example, no more than 10 jobs of any single kind will be worked concurrently across all clients. However, a single client may work up to 100 jobs at once from a mix of kinds.
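The way a per-kind limit combines with priority ordering can be sketched in memory. The selectJobs function below is a hypothetical model, not River's fetch logic (which runs against the database): it walks candidates in their existing priority order, skips any job whose kind is already at its limit, and stops at MaxWorkers. A limit of 2 is used instead of 10 to keep the example short.

```go
package main

import "fmt"

// job is a hypothetical, simplified stand-in for a queued job.
type job struct {
	ID   int
	Kind string
}

// selectJobs picks up to maxWorkers jobs from candidates, which are assumed
// to already be sorted by priority. A job is skipped when its kind already
// has perKindLimit jobs running or selected.
func selectJobs(candidates []job, running map[string]int, perKindLimit, maxWorkers int) []job {
	// Copy the running counts so the caller's map is not modified.
	counts := make(map[string]int, len(running))
	for kind, n := range running {
		counts[kind] = n
	}

	var selected []job
	for _, j := range candidates {
		if len(selected) >= maxWorkers {
			break
		}
		if counts[j.Kind] >= perKindLimit {
			continue // this partition is full; keep scanning lower-priority jobs
		}
		counts[j.Kind]++
		selected = append(selected, j)
	}
	return selected
}

func main() {
	candidates := []job{
		{ID: 1, Kind: "email"},
		{ID: 2, Kind: "email"},
		{ID: 3, Kind: "email"},
		{ID: 4, Kind: "report"},
	}
	running := map[string]int{"email": 1} // one email job is already running

	for _, j := range selectJobs(candidates, running, 2, 100) {
		fmt.Println(j.ID, j.Kind)
	}
	// Prints:
	// 1 email
	// 4 report
}
```

Jobs 2 and 3 are passed over because the "email" partition is full, while the lower-priority "report" job is still selected.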

Adjusting in the UI

The concurrency limit can be overridden in the UI on the queue page. Limit overrides are persisted until removed, or until the queue has had no active clients for a long time.

An example of this can be viewed in the demo app.

FAQ

How do limits interact?

The local limit takes precedence over the global limit. For example, with a global limit of 20 and a local limit of 5, if 10 jobs are already running globally, the client will not fetch more than 5 additional jobs (per partition, if partitioning is enabled).

Each client will also respect its configured MaxWorkers. Without partitioning, the smallest of MaxWorkers, the local limit, and remaining global capacity determines how many jobs will be worked. With partitioning, the client may work up to MaxWorkers jobs across partitions, but never more than the local or global limits for a single partition.
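For the unpartitioned case, the "smallest of" rule can be written out as a short calculation. The fetchCapacity function below is a hypothetical sketch, not River's code; it also assumes that a limit of zero means the limit is unset, which is an assumption made for this example.

```go
package main

import "fmt"

// fetchCapacity returns how many more jobs a client could start in an
// unpartitioned queue: the smallest of its free worker slots, its remaining
// local capacity, and the remaining global capacity.
// Assumption: a limit of 0 is treated as "no limit".
func fetchCapacity(maxWorkers, localLimit, globalLimit, localRunning, globalRunning int) int {
	capacity := maxWorkers - localRunning
	if localLimit > 0 && localLimit-localRunning < capacity {
		capacity = localLimit - localRunning
	}
	if globalLimit > 0 && globalLimit-globalRunning < capacity {
		capacity = globalLimit - globalRunning
	}
	if capacity < 0 {
		return 0
	}
	return capacity
}

func main() {
	// Global limit 20, local limit 5, 10 jobs running on other clients:
	// the local limit is the binding constraint.
	fmt.Println(fetchCapacity(20, 5, 20, 0, 10)) // 5

	// Same limits, but 18 jobs are running globally:
	// only 2 global slots remain.
	fmt.Println(fetchCapacity(20, 5, 20, 0, 18)) // 2
}
```

With partitioning, the same calculation would apply to the local and global limits within each partition, while MaxWorkers bounds the total across partitions.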

Why might limits appear exceeded?

Global concurrency limits are strictly enforced during job fetching. Jobs may appear to exceed limits in the database due to asynchronous completion updates, at least when viewed from the jobs table (or job UI in River UI).

River ensures limits are respected at execution time, despite temporary visibility differences. The queue page in River UI reflects the actual running job counts.

Compatibility with other features

Concurrency limits are compatible with all other River and River Pro features like workflows and sequences.