River on Rails: Pushing select, performance-sensitive tasks from Ruby to Go
Ruby is a classic programming language developed in the mid-90s in Japan by Yukihiro "Matz" Matsumoto, famous for its expressiveness and dynamism. Despite having lost a little of its luster since its heyday in the early 2010s, it still powers some of the world's largest tech companies, including Shopify, GitHub, Stripe, and Airbnb, and has a healthy community pushing forward on advancements. This year, for example, it'll get a new JIT compiler, ZJIT.
For all the things that it gets right, Ruby is not renowned for its runtime speed or concurrent capability. It's never been a particularly fast language (the benchmarks game, while not a perfect measurement, has it ~50x slower than the fastest measured languages), and the GVL (Global VM Lock) has kept the dream of true parallel code execution a fleeting one.
A powerful pattern to get the best of both worlds, performance along with the productivity gleaned from terse, expressive syntax, is to bring two great languages together: use Ruby for the majority of domain logic and day-to-day operation, while offloading a select subset of tasks to a fast language like Go, where they benefit from its blazing fast runtime speed and/or prodigious concurrent capacity.
Here, we'll demonstrate how to build a basic Rails app that inserts jobs into a Postgres backend shared with River. The job in question will be to send a webhook, a task where the lion's share of time is spent waiting on other servers, and which can be run in very large parallel batches when going through lightweight goroutines. A key property of webhooks is that transient failures come with the territory, so we'll also lean on River to handle job timeouts and retries.
Rails installation and scaffolding
To start, we'll run through the routine of getting a basic Rails app in place. This is fairly standard, so we'll only go through each step in brief, but we'll end up with a basic CRUD app built around a "payments" resource, as if we're building a much abbreviated version of a Stripe-like API. We're using Rails here because it's a great framework and far and away Ruby's most popular, but the same techniques would work just as well on alternative stacks like Sequel plus Roda.
If you're already well versed in Rails and don't need to see the basics, skip ahead to the next section, inserting webhook jobs.
Install Rails via Ruby's gem command:
gem install rails
Start a new Postgres-based Rails project:
rails new webhooks -d=postgresql
cd webhooks
Generate scaffolding
Add scaffolding, including a database migration, model, and controller, for a payments resource that we'll use in this sample program. In reality, of course, this could be anything.
bin/rails generate scaffold payment
      invoke  active_record
      create    db/migrate/20250622175042_create_payments.rb
      create    app/models/payment.rb
      invoke    test_unit
      create      test/models/payment_test.rb
      create      test/fixtures/payments.yml
      invoke  resource_route
       route    resources :payments
      invoke  scaffold_controller
      create    app/controllers/payments_controller.rb
      invoke    erb
      create      app/views/payments
      create      app/views/payments/index.html.erb
      create      app/views/payments/edit.html.erb
      create      app/views/payments/show.html.erb
      create      app/views/payments/new.html.erb
      create      app/views/payments/_form.html.erb
      create      app/views/payments/_payment.html.erb
      invoke    resource_route
      invoke    test_unit
      create      test/controllers/payments_controller_test.rb
      create      test/system/payments_test.rb
      invoke    helper
      create      app/helpers/payments_helper.rb
      invoke      test_unit
      invoke    jbuilder
      create      app/views/payments/index.json.jbuilder
      create      app/views/payments/show.json.jbuilder
      create      app/views/payments/_payment.json.jbuilder
Add River dependencies
Open Gemfile, add River's main gem along with its ActiveRecord driver, and install the new dependencies:
gem "riverqueue"gem "riverqueue-activerecord"
bundle install
Create databases
Have Rails raise development and test databases:
bin/rails db:create
Created database 'webhooks_development'
Created database 'webhooks_test'
Migrate databases
Migrate Rails schema, which consists of our payments scaffold:
bin/rails db:migrate
== 20250618084713 CreatePayments: migrating ===================================
-- create_table(:payments)
   -> 0.0101s
== 20250618084713 CreatePayments: migrated (0.0101s) ==========================
Migrate River's schema, which adds jobs table and supporting infrastructure:
$ go run github.com/riverqueue/river/cmd/river@latest migrate-up --database-url postgres://localhost:5432/webhooks_test --schema public
applied migration 001 [up] create river migration [5.61ms]
applied migration 002 [up] initial schema [14.33ms]
applied migration 003 [up] river job tags non null [1.5ms]
applied migration 004 [up] pending and more [5.56ms]
applied migration 005 [up] migration unique client [9.73ms]
applied migration 006 [up] bulk unique [1.57ms]
Run tests to verify
Run tests to make sure it all works:
bin/rails test
Inserting webhook jobs
The Rails app acts as the program's API. It'll receive payment create requests and react by inserting a River job to send a webhook for each one. We'll use ActiveRecord callbacks to accomplish this because they're a convenient way to make sure that an operation always runs on specific changes, like when a model's created, regardless of where that change was initiated.
In app/models/payment.rb, add an after_create hook, along with a pretend account ID and a class-level River client (so it's reused across different model instances):
class Payment < ApplicationRecord
  after_create do |payment|
    insert_res = self.class.river_client.insert(River::JobArgsHash.new("webhook_send", {
      account_id: authenticated_account_id,
      payload: { id: payment.id },
      webhook_kind: "payment.created"
    }))
    logger.info "inserted job: #{insert_res.job.id}"
  end

  def authenticated_account_id
    123 # replace with something real
  end

  def self.river_client
    @river_client ||= River::Client.new(River::Driver::ActiveRecord.new)
  end
end
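With the hook in place, a quick sanity check is to create a payment from a Rails console and watch for the log line emitted by the callback (the job ID below is illustrative and will vary):

# In bin/rails console:
payment = Payment.create!
# logs: inserted job: 1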
We should be able to rerun our tests and see that they're all still passing:
bin/rails test
Running 7 tests in a single process (parallelization threshold is 50)
Run options: --seed 54478
# Running:
.......
Finished in 0.193850s, 36.1104 runs/s, 56.7449 assertions/s.
7 runs, 11 assertions, 0 failures, 0 errors, 0 skips
Controlling retries via insert opts
An aspect of a more mature webhooks system that ends up being quite important is the guarantees it makes to customers: in case of a failure, whether the webhook send will be retried, how often, and on what schedule.
River defaults to 25 retries using the exponential backoff algorithm attempts ^ 4 + rand(±10%). It's a pretty reasonable default for many use cases, but if desired, the number of retries can be increased or reduced using insert-time options on the job args:
insert_res = self.class.river_client.insert(
  River::JobArgsHash.new("webhook_send", {
    account_id: authenticated_account_id,
    payload: { id: payment.id },
    webhook_kind: "payment.created"
  }),
  insert_opts: River::InsertOpts.new(
    max_attempts: 10
  )
)
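To get a feel for what that backoff formula works out to in practice, here's a quick plain-Ruby sketch of the approximate delay before each of the first few retries (jitter omitted):

# Approximate delay in seconds before each retry, per the attempt ** 4
# backoff described above (the ±10% jitter is left out for clarity).
(1..5).each do |attempt|
  puts "retry #{attempt}: ~#{attempt**4}s"
end
# retry 1: ~1s
# retry 2: ~16s
# retry 3: ~81s
# retry 4: ~256s
# retry 5: ~625s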
Recall that in a real-world manifestation of Hyrum's Law, even a retry policy and its schedule become an implicit part of your API contract, so providers should think carefully about the sorts of redelivery guarantees they want to make before finalizing their webhook promises.
Transactional enqueuing
A major benefit of using River and Postgres for your job queue is transactional enqueuing. By wrapping a request in a database transaction, if any part of it fails, the job is never inserted at all, so we don't end up doing work that shouldn't be done.
When Rails scaffolded our payments API for us, it generated this code in app/controllers/payments_controller.rb:
# POST /payments or /payments.json
def create
  @payment = Payment.new(payment_params)

  respond_to do |format|
    if @payment.save
      format.html { redirect_to @payment, notice: "Payment was successfully created." }
      format.json { render :show, status: :created, location: @payment }
    else
      format.html { render :new, status: :unprocessable_entity }
      format.json { render json: @payment.errors, status: :unprocessable_entity }
    end
  end
end
Let's wrap the whole thing with an ActiveRecord::Base.transaction block:
# POST /payments or /payments.json
def create
  ActiveRecord::Base.transaction do
    @payment = Payment.new(payment_params)

    respond_to do |format|
      if @payment.save
        format.html { redirect_to @payment, notice: "Payment was successfully created." }
        format.json { render :show, status: :created, location: @payment }
      else
        format.html { render :new, status: :unprocessable_entity }
        format.json { render json: @payment.errors, status: :unprocessable_entity }
      end
    end
  end
end
Not only does this put a transaction around saving the payment model and producing a response, it also wraps all model operations. So the INSERT generated by @payment.save runs in the same transaction as the River job insert in our after_create hook. If the payment saves successfully but we weren't able to insert a River job for it, the payment is rolled back, keeping data consistent.
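As a sanity check on that behavior, here's a minimal sketch of a test asserting the rollback. It swaps the class-level river_client for a failing stand-in using Minitest's Object#stub; the stand-in client and the test itself are illustrative, not part of the scaffolded suite:

require "minitest/mock"

class PaymentRollbackTest < ActiveSupport::TestCase
  test "payment is rolled back when the job insert fails" do
    failing_client = Object.new
    def failing_client.insert(*)
      raise "job insert failed"
    end

    # With the failing client stubbed in, the after_create hook raises, which
    # rolls back the surrounding transaction and discards the payment.
    Payment.stub(:river_client, failing_client) do
      assert_no_difference -> { Payment.count } do
        assert_raises(RuntimeError) { Payment.create! }
      end
    end
  end
end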
Job args conventions
In the code above, we used JobArgsHash as a quick and dirty way to create a River job. It gets a job kind (which maps a job to the worker that will run it) from its first argument, and uses a free-form hash for the rest of the job args:
insert_res = self.class.river_client.insert(River::JobArgsHash.new("webhook_send", {
  account_id: authenticated_account_id,
  payload: { id: payment.id },
  webhook_kind: "payment.created"
}))
A mature application might want to switch to a more structured way of defining jobs so that if multiple sites are inserting the same job, they do so consistently. Here's an alternative version as a class defined in lib/job_args/webhook_send_args.rb:
module JobArgs
  class WebhookSendArgs
    attr_accessor :account_id
    attr_accessor :payload
    attr_accessor :webhook_kind

    def initialize(account_id:, payload:, webhook_kind:)
      self.account_id = account_id
      self.payload = payload
      self.webhook_kind = webhook_kind
    end

    def kind = "webhook_send"

    def insert_opts = River::InsertOpts.new(
      max_attempts: 10
    )

    def to_json = JSON.dump({
      account_id: account_id,
      payload: payload,
      webhook_kind: webhook_kind
    })
  end
end
Now when inserting the job (say from the after_create hook), it looks like this:
insert_res = self.class.river_client.insert(JobArgs::WebhookSendArgs.new(
  account_id: authenticated_account_id,
  payload: { id: payment.id },
  webhook_kind: "payment.created"
))
logger.info "inserted job: #{insert_res.job.id}"
We'd recommend that you come up with your own local conventions for defining job args so that they all look like each other. It might even be advisable to create a miniature framework or DSL to make jobs succinct and easy to define, and safe to invoke.
A more terse variant with Struct
The alternative above swings all the way to the other end of the spectrum, going from a free-form JobArgsHash to a plain old Ruby class with no magic whatsoever. This is to illustrate that there's nothing special about a River job args class, and by implication, callers have nearly limitless options for how they want to define one.
An argument could be made that it's preferable to sacrifice some explicitness for a more succinct implementation. Here's a second alternative that uses Ruby's built-in Struct type, knocking the definition down to about half the LOCs:
module JobArgs
  WebhookSendArgs = Struct.new(:account_id, :payload, :webhook_kind) do
    def kind = "webhook_send"

    def insert_opts = River::InsertOpts.new(
      max_attempts: 10
    )

    def to_json = to_h.to_json
  end
end
It'd be inserted like:
insert_res = self.class.river_client.insert(JobArgs::WebhookSendArgs.new(
  authenticated_account_id,
  { id: payment.id },
  "payment.created"
))
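One caveat with positional Struct members is that they're easy to pass in the wrong order. If that's a concern, Struct also accepts keyword_init: true, which keeps the terse definition while retaining keyword arguments; a quick sketch:

module JobArgs
  WebhookSendArgs = Struct.new(:account_id, :payload, :webhook_kind, keyword_init: true) do
    def kind = "webhook_send"

    def to_json = to_h.to_json
  end
end

# Built with keywords, so argument order no longer matters:
JobArgs::WebhookSendArgs.new(account_id: 123, payload: { id: 1 }, webhook_kind: "payment.created")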
The Go client and worker
While this article's mainly meant to cover what insertion looks like from Ruby/Rails, the app above will obviously need a Go counterpart that'll work the inserted jobs. Here's a rough sketch of what one would look like.
First, the worker that intercepts jobs of kind webhook_send and sends requests out to remote webhook URLs via Go's built-in net/http client:
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/webhooksender/dbsqlc"
)

type WebhookSendArgs struct {
	AccountID   int             `json:"account_id"`
	Payload     json.RawMessage `json:"payload"`
	WebhookKind string          `json:"webhook_kind"`
}

func (WebhookSendArgs) Kind() string { return "webhook_send" }

type WebhookSendWorker struct {
	river.WorkerDefaults[WebhookSendArgs]

	dbPool *pgxpool.Pool
}

func (w *WebhookSendWorker) Work(ctx context.Context, job *river.Job[WebhookSendArgs]) error {
	url, err := dbsqlc.New().WebhookEndpointGetURLByAccountID(ctx, w.dbPool, int64(job.Args.AccountID))
	if err != nil {
		return err
	}

	// Forward the job's raw JSON payload to the account's webhook endpoint.
	resp, err := http.Post(url, "application/json", bytes.NewReader(job.Args.Payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return fmt.Errorf("got unexpected status code: %d", resp.StatusCode)
	}

	return nil
}
Notably, the structure of WebhookSendArgs mirrors what's defined in Ruby exactly, and the job args kind webhook_send is the same as what's inserted from Ruby.
River's default job timeout is 1 minute. For webhooks, it'd be wise to tighten that up so we don't spend too much time waiting on non-responsive servers. The timeout for the webhook send job can be customized specifically by writing a Timeout override on its worker:
func (w *WebhookSendWorker) Timeout(*river.Job[WebhookSendArgs]) time.Duration {
	// Cap each webhook send at 10 seconds (requires "time" in the import list).
	return 10 * time.Second
}
The SQL query to grab a webhook endpoint is based on sqlc. Here's a minimal definition of a theoretical webhook_endpoint table (which tracks endpoints per account ID) and a query to retrieve a URL:
CREATE TABLE webhook_endpoint (
    id bigserial PRIMARY KEY,
    account_id bigint NOT NULL REFERENCES account (id),
    url text NOT NULL
);
-- name: WebhookEndpointGetURLByAccountID :one
SELECT url
FROM webhook_endpoint
WHERE account_id = @account_id;
Next, a main implementation that initializes a client, registers the webhook worker, and starts it up, working jobs until it receives an interrupt or SIGTERM telling it to stop:
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, "postgres://localhost:5432/webhooks_development")
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &WebhookSendWorker{dbPool: dbPool})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {
				MaxWorkers: 1_000,
			},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
	defer func() {
		fmt.Printf("Stopping River client\n")
		if err := riverClient.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	fmt.Printf("River client up and running; quit with ^C\n")

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	<-sigChan
}
Summary and future improvements
We've built a basic Rails app to insert River jobs and a companion program in Go to work them. Not all that many lines of code were needed to get it done, but what we assembled would be a major boon to concurrent efficiency: webhooks are sent in lightweight goroutines with huge capacity for parallel expansion, instead of Ruby threads (at best), or separate Ruby processes per webhook send (very wasteful when it comes to memory, though still the fastest Ruby option because it ensures the application isn't slowed down by the GVL).
And although our little demo is functional, getting webhooks right is a big undertaking. Here's a shopping list of possible improvements that we might want to make:
- An in-app mini-framework for defining River jobs so they're easy to define, safe to use, and enforce strong conventions across callers.
- A well-designed retry policy including number of retries, time between retries, and a well-documented time until last retry.
- Multiple webhook endpoint URLs per account.
- Webhook signatures (using Go's great built-in crypto libraries!) so that webhook receivers can verify the authenticity of incoming webhooks.
- Implementing security gates like an egress proxy to make sure webhooks can't be used to attack internal services.
We think that Go and Ruby make a great combination. Ruby's concise syntax allows business logic to be captured easily and written quickly, Rails institutes strong conventions and makes implementing everything considerably faster, and meanwhile certain performance- or parallelism-sensitive tasks can be selectively offloaded to Go via background jobs. The result: quick iteration, a high degree of developer happiness, and peak performance.