River supports inserting jobs from Ruby and have them worked in Go, a feature that may be desirable in performance sensitive cases so that jobs can take advantage of Go's considerably faster runtime speed.
Insertion is supported through Rails' ActiveRecord and Sequel.
Basic usage
Your project's Gemfile
should contain the riverqueue
gem and a driver like riverqueue-sequel
(see drivers):
gem "riverqueue"gem "riverqueue-activerecord"
Initialize a client with:
require "riverqueue"require "riverqueue-activerecord"
ActiveRecord::Base.establish_connection("postgres://...")client = River::Client.new(River::Driver::ActiveRecord.new)
Define a job and insert it:
class SortArgs attr_accessor :strings
def initialize(strings:) self.strings = strings end
def kind = "sort"
def to_json = JSON.dump({strings: strings})end
insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))insert_res.job # inserted job row
Job args should:
- Respond to
#kind
with a unique string that identifies them in the database, and which a Go worker will recognize. - Response to
#to_json
with a JSON serialization that'll be parseable as an object in Go.
They may also respond to #insert_opts
with an instance of InsertOpts
to define insertion options that'll be used for all jobs of the kind.
Insertion options
Inserts take an insert_opts
parameter to customize features of the inserted job:
insert_res = client.insert( SimpleArgs.new(strings: ["whale", "tiger", "bear"]), insert_opts: River::InsertOpts.new( max_attempts: 17, priority: 3, queue: "my_queue", tags: ["custom"] ))
Inserting unique jobs
Unique jobs are supported through InsertOpts#unique_opts
, and can be made unique by args, period, queue, and state. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned.
insert_res = client.insert(args, insert_opts: River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_args: true, by_period: 15 * 60, by_queue: true, by_state: [River::JOB_STATE_AVAILABLE] ))
# contains either a newly inserted job, or an existing one if insertion was skippedinsert_res.job
# true if insertion was skippedinsert_res.unique_skipped_as_duplicated
Inserting jobs in bulk
Use #insert_many
to bulk insert jobs as a single operation for improved efficiency:
results = client.insert_many([ SimpleArgs.new(job_num: 1), SimpleArgs.new(job_num: 2)])
Or with InsertManyParams
, which may include insertion options:
results = client.insert_many([ River::InsertManyParams.new( SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(max_attempts: 5) ), River::InsertManyParams.new( SimpleArgs.new(job_num: 2), insert_opts: River:InsertOpts.new(queue: "high_priority") )])
Inserting in a transaction
No extra code is needed to insert jobs from inside a transaction. Just make sure that one is open from your ORM of choice, call the normal #insert
or #insert_many
methods, and insertions will take part in it.
ActiveRecord::Base.transaction do client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))end
DB.transaction do client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))end
Inserting with a Ruby hash
JobArgsHash
can be used to insert with a kind and JSON hash so that it's not necessary to define a class:
insert_res = client.insert(River::JobArgsHash.new("hash_kind", { job_num: 1}))
RBS and type checking
The gem bundles RBS files containing type annotations for its API to support type checking in Ruby through a tool like Sorbet or Steep.
Drivers
ActiveRecord
Use River with Rails' ActiveRecord by putting the riverqueue-activerecord
driver in your Gemfile
:
gem "riverqueue"gem "riverqueue-activerecord"
Then initialize driver and client:
ActiveRecord::Base.establish_connection("postgres://...")client = River::Client.new(River::Driver::ActiveRecord.new)
Sequel
Use River with Sequel by putting the riverqueue-sequel
driver in your Gemfile
:
gem "riverqueue"gem "riverqueue-sequel"
Then initialize driver and client:
DB = Sequel.connect("postgres://...")client = River::Client.new(River::Driver::Sequel.new(DB))