Concurrency limiting gives precise control over how many jobs can be worked at once. Limits can be applied globally (across all processes) or locally (per process), and can also be partitioned based on job attributes.
Concurrency limits are a feature of River Pro ✨. If you haven't yet, install River Pro and run the pro migration line.
Added in River Pro v0.12.0.
Basic usage
Concurrency limits require an up-to-date database schema with the pro migration line:
```sh
riverpro migrate-up --database-url "$DATABASE_URL" --line pro
```
Concurrency limits are configured per queue using the ProQueues field on your riverpro.Config:
```go
&riverpro.Config{
    Config: river.Config{
        Queues: map[string]river.QueueConfig{
            "default": {MaxWorkers: 100},
        },
        Workers: riverdemo.Workers(dbPool),
    },
    ProQueues: map[string]riverpro.QueueConfig{
        "external_api_provider": {
            Concurrency: riverpro.ConcurrencyConfig{
                GlobalLimit: 20,
                LocalLimit:  10,
            },
            MaxWorkers: 20,
        },
    },
}
```
A single queue may only be configured in either ProQueues or Queues, but not both. By default, no concurrency limits are applied unless specified in Concurrency using a riverpro.ConcurrencyConfig.
Concurrency limits build on River's normal job prioritization, meaning that jobs will still be fetched in order of priority and scheduled_at. However, any job that would be fetched based on normal prioritization will be skipped over if working it would exceed a concurrency limit.
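Insertion is unaffected by limits: a job is enqueued normally, and limits are checked only when clients fetch work. A minimal sketch, assuming an already-constructed River client and a hypothetical ExternalAPIArgs job type:

```go
// ExternalAPIArgs is a hypothetical job type for illustration. Limits
// play no role at insertion time; they apply when clients fetch jobs.
_, err := client.Insert(ctx, ExternalAPIArgs{ProviderID: 42}, &river.InsertOpts{
    Queue:    "external_api_provider",
    Priority: 2, // lower numbers are fetched first, as in standard River
})
if err != nil {
    // handle the insertion error
}
```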
Global limits
Global limits ensure that no more than the specified number of jobs are worked at once globally, across all clients working jobs in that queue (and database/schema). They are configured with the GlobalLimit field on riverpro.ConcurrencyConfig.
If no partitioning is enabled, the global limit applies to all jobs in the queue. With partitioning, the global limit is enforced independently per partition.
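As a sketch with a hypothetical queue name, a global-only limit means that no matter how many clients are running, at most 20 of these jobs execute at once:

```go
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "reports": {
            Concurrency: riverpro.ConcurrencyConfig{
                // At most 20 jobs from this queue run at once across
                // every client; no per-client (local) limit is set.
                GlobalLimit: 20,
            },
            MaxWorkers: 50,
        },
    },
}
```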
Local limits
Local limits ensure that no more than the specified number of jobs are worked at once locally, within a single client. They are configured with the LocalLimit field on riverpro.ConcurrencyConfig.
If no partitioning is enabled, the local limit applies to all jobs in the queue. With partitioning, the local limit is enforced independently per partition.
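As a sketch (hypothetical queue name), a local-only limit caps each client independently: three clients running this configuration could work up to 15 of these jobs in total, since no global limit is set.

```go
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "media_transcoding": {
            Concurrency: riverpro.ConcurrencyConfig{
                // Each client works at most 5 of these jobs at once;
                // total concurrency scales with the number of clients.
                LocalLimit: 5,
            },
            MaxWorkers: 20,
        },
    },
}
```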
Partitioning
By default, concurrency limits are enforced across all jobs in a queue. With partitioning, limits are enforced separately for different subsets of jobs in the queue.
Partitioning can be based on job arguments, job kind, or both.
Partitioning by arguments
Partitioning by job arguments allows limits to be applied independently based on the argument values. A common use case is to partition by customer or tenant, allowing each to have an independent concurrency limit:
```go
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "expensive_actions": {
            Concurrency: riverpro.ConcurrencyConfig{
                GlobalLimit: 2,
                Partition: riverpro.PartitionConfig{
                    ByArgs: []string{"customer_id"},
                },
            },
            MaxWorkers: 10,
        },
    },
}
```
Here, a maximum of 2 concurrent jobs per customer run globally, while each client can handle jobs for multiple customers concurrently, up to MaxWorkers.
The ByArgs option must use JSON keys, not Go struct field names. For example:
```go
type MyJobArgs struct {
    CustomerID int `json:"customer_id"`
}
```
To partition by customer, you must specify "customer_id" in ByArgs, not "CustomerID". If a specified field is missing from the job arguments, River will use an empty value; in the example above, all jobs without a customer_id would be partitioned together.
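For reference, MyJobArgs would also need a Kind method to satisfy river.JobArgs; the kind name below is arbitrary:

```go
// Kind satisfies river.JobArgs. Partitioning matches on the JSON key
// "customer_id" in the encoded arguments, not the Go field name.
func (MyJobArgs) Kind() string { return "my_job" }
```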
ByArgs with all arguments
If ByArgs is specified as an empty slice ([]string{}), all arguments will be used for partitioning. This is generally not desirable, as it may lead to high cardinality and ineffective limits. For example, if your jobs include unique values like timestamps, each job would be treated as a separate partition. Most use cases will instead want to partition by one or more specific arguments.
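To illustrate the pitfall, consider this configuration sketch: with ByArgs empty, any job whose arguments include a unique value forms its own partition, so the limit of 3 effectively never binds.

```go
Concurrency: riverpro.ConcurrencyConfig{
    GlobalLimit: 3,
    Partition: riverpro.PartitionConfig{
        // An empty slice partitions on *all* arguments. Jobs with any
        // unique argument value (e.g. a timestamp) each become their
        // own partition, making the limit ineffective.
        ByArgs: []string{},
    },
},
```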
Partitioning by kind
Partitioning can also be based on job kind. For example, rather than a single global limit of 10 across all job kinds, the following configuration allows up to 10 jobs concurrently for each job kind:
```go
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "expensive_actions": {
            Concurrency: riverpro.ConcurrencyConfig{
                GlobalLimit: 10,
                Partition:   riverpro.PartitionConfig{ByKind: true},
            },
            MaxWorkers: 100,
        },
    },
}
```
In this example, no more than 10 jobs of any single kind will be worked concurrently across all clients. However, a single client may work up to 100 jobs at once from a mix of kinds.
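Kind and argument partitioning can also be combined. As a sketch (hypothetical queue name), the following enforces a global limit of 5 for each unique combination of job kind and customer_id:

```go
&riverpro.Config{
    ProQueues: map[string]riverpro.QueueConfig{
        "per_customer_actions": {
            Concurrency: riverpro.ConcurrencyConfig{
                GlobalLimit: 5,
                Partition: riverpro.PartitionConfig{
                    // Jobs are partitioned by (kind, customer_id) pairs.
                    ByArgs: []string{"customer_id"},
                    ByKind: true,
                },
            },
            MaxWorkers: 100,
        },
    },
}
```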
Adjusting in the UI
The concurrency limit can be overridden in the UI on the queue page. Limit overrides persist until removed, or until the queue has had no active clients for an extended period.
An example of this can be viewed in the demo app.
FAQ
How do limits interact?
The local limit takes precedence over the global limit. For example, with a global limit of 20 and a local limit of 5, if 10 jobs are already running globally, the client will not fetch more than 5 additional jobs (per partition, if partitioning is enabled).
Each client will also respect its configured MaxWorkers. Without partitioning, the smallest of MaxWorkers, the local limit, and the remaining global capacity determines how many jobs will be worked. With partitioning, the client may work up to MaxWorkers jobs across partitions, but never more than the local or global limit for any single partition.
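Putting those rules together, a sketch with concrete numbers (the comments walk through the arithmetic):

```go
Concurrency: riverpro.ConcurrencyConfig{
    GlobalLimit: 20, // across all clients (per partition, if partitioned)
    LocalLimit:  5,  // within one client (per partition, if partitioned)
},
MaxWorkers: 8, // hard cap on jobs worked by this client at once
// Without partitioning: the client works min(8, 5, remaining global
// capacity) jobs, e.g. min(8, 5, 20-10) = 5 when 10 jobs run globally.
// With partitioning: up to 8 jobs spread across partitions, but never
// more than 5 per partition locally or 20 per partition globally.
```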
Why might limits appear exceeded?
Global concurrency limits are strictly enforced during job fetching. However, because completion updates are written asynchronously, jobs may briefly appear to exceed limits when viewed in the jobs table (or the job list in River UI). River ensures limits are respected at execution time despite these temporary visibility differences; the queue page in River UI reflects the actual running job counts.
Compatibility with other features
Concurrency limits are compatible with all other River and River Pro features, such as workflows and sequences.