Building an idempotent email API with River unique jobs
We use an email service to send out various River-related notifications. Despite our best efforts setting up SPF, DKIM, and DMARC, we were still getting reports of mail going to spam, so we started looking into alternate mailing providers to see how they’d fare by comparison. Something we noticed while researching is that a shockingly low number of them have API facilities to guarantee idempotency, which is a surprise because email is such a sensitive topic. Not quite as high stakes as charging a credit card maybe, but if a user didn’t receive an important email they were expecting, or accidentally received two of them, it’d be annoying at best.
In case of a communication error between two services (say River talking to its mail service to send an email), the client can’t be sure whether its request was successfully transmitted, so it must retry to be sure the operation succeeded. This is best achieved with an API that supports idempotency so that requests after the first don’t perform the original action twice.
Here we’ll demonstrate the use of River to provide a toy version of a mailing API that’s idempotent-safe. We’ve chosen mail as our test subject, but the same techniques apply anywhere good idempotent behavior is an asset: confirming a payment, registering a domain name, submitting an AI prompt, or any number of other uses.
What’s an idempotency key?
Some HTTP verbs like GET, PUT, and DELETE are intended to be inherently idempotent, and repeated invocations should have no additional side effects. These verbs involve resource reads, replacement, or deletion, and with a little care an idempotent implementation is often fairly simple for these types of CRUD operations:
    # reads are usually idempotent with no additional work
    GET /contacts/123

    # deletes may return an error on repeated calls, but generally have no
    # additional side effects, and are therefore considered idempotent
    DELETE /contacts/123
Mutations like PUT or DELETE take more care than reads, but especially when using an ACID database like Postgres, idempotency is usually very achievable as long as all changes happen in one transaction.
But not every operation is so simple. When a request has a side effect that can’t be constrained to a transaction, then an API needs to provide some other mechanism to guarantee idempotency. Our toy email service will try to send mail via SMTP, and there’s nothing transactional about that.
APIs like Stripe pioneered a concept called an “idempotency key”, which is a unique value transmitted along with the request:
    curl -X POST https://api.stripe.com/v1/customers \
        -H "Idempotency-Key: KG5LxwFBepaKHyUD"
In case of an indeterminate result, a client retries with the same idempotency key it used the first time around. If the server sees a key for the first time, the request is executed normally. If the key’s been seen before, the request responds with a valid result saying so, letting the client know that it can stop retrying.
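To make the client side of this concrete, here's a minimal sketch of a retry loop that generates one key up front and reuses it on every attempt. The names `sendWithRetry`, `newIdempotencyKey`, and `errTransient` are hypothetical, and the `send` callback stands in for a real HTTP request. A real client would also wait between attempts, which is left out here for brevity.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
)

// errTransient simulates an indeterminate failure like a dropped connection.
var errTransient = errors.New("transient failure")

// newIdempotencyKey returns 16 random bytes encoded as 32 hex characters.
func newIdempotencyKey() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// sendWithRetry generates a single key and passes that same key to send on
// every attempt. It returns the key, the number of attempts made, and the
// last error if every attempt failed.
func sendWithRetry(maxAttempts int, send func(key string) error) (string, int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	key, err := newIdempotencyKey()
	if err != nil {
		return "", 0, err
	}

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = send(key); lastErr == nil {
			return key, attempt, nil
		}
	}
	return key, maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	calls := 0
	key, attempts, err := sendWithRetry(5, func(key string) error {
		calls++
		if calls < 3 {
			return errTransient // simulate two failed requests
		}
		return nil
	})
	fmt.Printf("key=%s attempts=%d err=%v\n", key, attempts, err)
}
```

The important property is that the key is created outside the loop. Generating a fresh key per attempt would defeat the purpose, because the server would treat every retry as a brand new request.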
Defining a River job
Let’s write a River job to send an email. Sending email is complicated in real life, with the sender having to worry about SPF, DKIM, DMARC, filtering, source IP, and reputation, but for the purposes of this demo we’re going to show the simplest possible SMTP send. It’s the use of idempotency that we’re trying to show off, and that’d stay the same regardless of how sophisticated our mailing code got.
Define job args and worker:
    type SendEmailArgs struct {
        AccountID      uuid.UUID `json:"account_id" river:"unique"`
        Body           string    `json:"body" river:"-"`
        EmailRecipient string    `json:"email_recipient" river:"-"`
        EmailSender    string    `json:"email_sender" river:"-"`
        IdempotencyKey uuid.UUID `json:"idempotency_key" river:"unique"`
        Subject        string    `json:"subject" river:"-"`
    }

    func (SendEmailArgs) Kind() string { return "send_email" }

    func (SendEmailArgs) InsertOpts() river.InsertOpts {
        return river.InsertOpts{
            UniqueOpts: river.UniqueOpts{
                ByArgs: true,
            },
        }
    }

    type SendEmailWorker struct {
        river.WorkerDefaults[SendEmailArgs]
        smtpHost, smtpPass, smtpUser string
    }

    func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
        // This is probably too simple to work in reality, but is here to
        // demonstrate the basic shape of what sending an email would look like.
        var (
            auth    = smtp.PlainAuth("", w.smtpUser, w.smtpPass, w.smtpHost)
            message = []byte(fmt.Sprintf("To: %s\r\n"+
                "Subject: %s\r\n"+
                "\r\n"+
                "%s\r\n",
                job.Args.EmailRecipient,
                job.Args.Subject,
                job.Args.Body,
            ))
        )

        return smtp.SendMail(w.smtpHost, auth, job.Args.EmailSender, []string{job.Args.EmailRecipient}, message)
    }
Job args specify how a job is serialized to the database along with meta insert behavior like its unique treatment. Workers dictate what a job does after it’s dequeued by implementing a Work function.
Idempotency by unique arg
There are a number of ways we could implement idempotency, but for the sake of expediency, we’re going to leverage River’s built-in system for unique jobs. Its inner machinations are fairly elaborate, but at the most basic level it operates by building a unique key from a job’s unique properties and attempting to upsert the job with a unique index in Postgres:
    CREATE UNIQUE INDEX river_job_kind_unique_key_idx
        ON river_job (kind, unique_key)
        WHERE unique_key IS NOT NULL;
In the case of the job above, uniqueness is based on job kind (only other SendEmailArgs are eligible for comparison), the ID of the authenticated account (AccountID), and the idempotency key (IdempotencyKey). Other job properties like EmailRecipient don’t affect uniqueness, which is a safety feature that we’ll get to later.
Sharding by account
It’s a tempting mistake to base uniqueness only on the incoming idempotency key, but we have to remember that an idempotency key is user-specified, so there could be collisions between accounts. Hopefully all responsible users will be generating V4 UUIDs that will statistically never collide with anything else, but bugs or malicious behavior could lead to duplicated keys, at which point scoping them to specific accounts becomes important. That’s why our uniqueness is based on AccountID + IdempotencyKey.
    type SendEmailArgs struct {
        AccountID      uuid.UUID `json:"account_id" river:"unique"`
        Body           string    `json:"body" river:"-"`
        EmailRecipient string    `json:"email_recipient" river:"-"`
        EmailSender    string    `json:"email_sender" river:"-"`
        IdempotencyKey uuid.UUID `json:"idempotency_key" river:"unique"`
        Subject        string    `json:"subject" river:"-"`
    }
See the river:"unique" annotations on AccountID and IdempotencyKey, which tell River to consider only those two fields for uniqueness.
Unique keys are generated by hashing incoming unique properties and args, so the input length is almost inconsequential and won’t have any measurable performance impact.
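As a rough illustration of the idea (and explicitly not River's actual algorithm), the sketch below hashes a job kind together with only the fields marked unique. The `emailArgs` type and `uniqueKey` function are hypothetical, and folding the kind into the hash is a simplification made for this example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// emailArgs is a trimmed-down stand-in for the job args shown above.
type emailArgs struct {
	AccountID      string
	Body           string
	IdempotencyKey string
}

// uniqueKey hashes the job kind along with only the fields that participate
// in uniqueness. Body is deliberately left out, so it has no effect on the
// resulting key. The output is always 64 hex characters regardless of how
// long the inputs are.
func uniqueKey(kind string, args emailArgs) (string, error) {
	uniqueFields := struct {
		AccountID      string `json:"account_id"`
		IdempotencyKey string `json:"idempotency_key"`
	}{
		AccountID:      args.AccountID,
		IdempotencyKey: args.IdempotencyKey,
	}

	encoded, err := json.Marshal(uniqueFields)
	if err != nil {
		return "", err
	}

	sum := sha256.Sum256(append([]byte(kind+"&"), encoded...))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	first, _ := uniqueKey("send_email", emailArgs{AccountID: "acct-1", IdempotencyKey: "key-1", Body: "Hello."})
	retry, _ := uniqueKey("send_email", emailArgs{AccountID: "acct-1", IdempotencyKey: "key-1", Body: "Changed body."})
	other, _ := uniqueKey("send_email", emailArgs{AccountID: "acct-2", IdempotencyKey: "key-1", Body: "Hello."})

	fmt.Println("same account, same key:     ", first == retry)
	fmt.Println("different account, same key:", first == other)
}
```

Two accounts that happen to send the same idempotency key end up with different hashes, which is the scoping behavior described in this section.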
Filling in the rest of the API
Next, fill in an HTTP handler that’ll act as our API endpoint. You can find fully working code in the project’s GitHub repo.
Define an APIService which will contain a River client and API service handlers. It gets a transaction begin reference so that tests can easily inject a test transaction, and it can mount itself onto an http.ServeMux:
    type APIService struct {
        begin       func(ctx context.Context) (pgx.Tx, error)
        riverClient *river.Client[pgx.Tx]
    }

    func (s *APIService) ServeMux() *http.ServeMux {
        mux := http.NewServeMux()
        mux.Handle("POST /emails", MakeHandler(s.EmailCreate))
        return mux
    }
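MakeHandler isn't shown in this post. The sketch below is one guess at its shape, assuming it's a generic adapter that decodes a JSON body, calls a typed service function, and encodes the result. Everything here is hypothetical, including the `APIError` method set, `writeJSON`, and the `echo` demo handler. Struct validation is omitted.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// APIError carries a user-facing message and the HTTP status to respond with.
type APIError struct {
	Message    string `json:"message"`
	StatusCode int    `json:"-"`
}

func (e *APIError) Error() string { return e.Message }

// MakeHandler adapts a typed service function into an http.Handler.
func MakeHandler[TReq, TResp any](fn func(context.Context, *TReq) (*TResp, error)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var req TReq
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			writeJSON(w, http.StatusBadRequest, &APIError{Message: "Invalid JSON request body."})
			return
		}

		resp, err := fn(r.Context(), &req)
		if err != nil {
			var apiErr *APIError
			if errors.As(err, &apiErr) && apiErr.StatusCode != 0 {
				writeJSON(w, apiErr.StatusCode, apiErr)
				return
			}
			writeJSON(w, http.StatusInternalServerError, &APIError{Message: "Internal server error."})
			return
		}
		writeJSON(w, http.StatusOK, resp)
	})
}

// writeJSON marshals v and writes it with the given status. No Content-Type
// is set, which matches the text/plain responses in this post's sample
// output. A real API would likely set application/json.
func writeJSON(w http.ResponseWriter, status int, v any) {
	data, err := json.Marshal(v)
	if err != nil {
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(status)
	_, _ = w.Write(data)
}

// echoRequest, echoResponse, and echo exist only to exercise the adapter.
type echoRequest struct {
	Name string `json:"name"`
}

type echoResponse struct {
	Message string `json:"message"`
}

func echo(_ context.Context, req *echoRequest) (*echoResponse, error) {
	if req.Name == "" {
		return nil, &APIError{Message: "Name is required.", StatusCode: http.StatusBadRequest}
	}
	return &echoResponse{Message: "Hello, " + req.Name + "."}, nil
}

// call runs one in-memory request through h and returns status and body.
func call(h http.Handler, body string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/echo", strings.NewReader(body)))
	return rec.Code, rec.Body.String()
}

func main() {
	h := MakeHandler(echo)
	fmt.Println(call(h, `{"name":"River"}`))
	fmt.Println(call(h, `{}`))
}
```

Keeping service functions typed like this means they can be tested by calling them directly, without going through HTTP at all.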
A “create email” service handler takes input parameters and queues a job:
    type HandleEmailCreateRequest struct {
        AccountID      uuid.UUID `json:"account_id" validate:"required"`
        Body           string    `json:"body" validate:"required"`
        EmailRecipient string    `json:"email_recipient" validate:"required"`
        EmailSender    string    `json:"email_sender" validate:"required"`
        IdempotencyKey uuid.UUID `json:"idempotency_key" validate:"required"`
        Subject        string    `json:"subject" validate:"required"`
    }

    type HandleEmailCreateResponse struct {
        Message string `json:"message"`
    }

    func (s *APIService) EmailCreate(ctx context.Context, req *HandleEmailCreateRequest) (*HandleEmailCreateResponse, error) {
        tx, err := s.begin(ctx)
        if err != nil {
            return nil, err
        }
        defer func() { _ = tx.Rollback(ctx) }()

        insertRes, err := s.riverClient.InsertTx(ctx, tx, SendEmailArgs{
            AccountID:      req.AccountID,
            Body:           req.Body,
            EmailRecipient: req.EmailRecipient,
            EmailSender:    req.EmailSender,
            IdempotencyKey: req.IdempotencyKey,
            Subject:        req.Subject,
        }, nil)
        if err != nil {
            return nil, err
        }

        if err := tx.Commit(ctx); err != nil {
            return nil, err
        }

        if insertRes.UniqueSkippedAsDuplicate {
            // see the "Reacting to duplicates" section below
        }

        return &HandleEmailCreateResponse{Message: "Email has been queued for sending."}, nil
    }
A run function initializes a River client and mounts APIService onto a net/http.Server:
    dbPool, err := pgxpool.New(ctx, config.DatabaseURL)
    if err != nil {
        return err
    }

    riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        Workers: makeWorkers(&config),
    })
    if err != nil {
        return err
    }

    server := &http.Server{
        Addr: ":8080",
        Handler: (&APIService{
            begin:       dbPool.Begin,
            riverClient: riverClient,
        }).ServeMux(),
    }
    fmt.Printf("Listening on %s\n", server.Addr)
    if err := server.ListenAndServe(); err != nil {
        return err
    }
Reacting to duplicates
River’s API tells us when an insert has been skipped because a job with the same unique key is already present. Write the API implementation to be aware of these potential no-ops, and return a more specific message when they occur:
    if insertRes.UniqueSkippedAsDuplicate {
        if insertRes.Job.State == rivertype.JobStateCompleted {
            return &HandleEmailCreateResponse{Message: "Email has been sent."}, nil
        }

        return &HandleEmailCreateResponse{Message: "Email was already queued and is pending send."}, nil
    }
See rivertype.JobInsertResult.
This is the only idempotency logic that we’ll need to write because River automatically does most of the heavy lifting behind the scenes.
Sample idempotent invocations
Start the API server and use cURL to send an email:
    curl -i -X POST http://localhost:8080/emails -d '{
        "account_id":"bb381da5-8275-41f2-9238-4afaf9f8e359",
        "body":"Hello from email demo.",
        "email_recipient":"receiver@example.com",
        "email_sender":"sender@example.com",
        "idempotency_key":"d8923851-4bc5-45ba-a9fa-077ed8755ef1",
        "subject":"Hello."
    }'

    HTTP/1.1 200 OK
    Date: Sat, 22 Mar 2025 06:41:22 GMT
    Content-Length: 48
    Content-Type: text/plain; charset=utf-8

    {"message":"Email has been queued for sending."}
The background worker hasn’t been started so the job won’t get worked right away, but subsequent requests will still detect its presence. Try the request again and see the API notice that a job’s already queued:
    curl -i -X POST http://localhost:8080/emails -d '{
        "account_id":"bb381da5-8275-41f2-9238-4afaf9f8e359",
        "body":"Hello from email demo.",
        "email_recipient":"receiver@example.com",
        "email_sender":"sender@example.com",
        "idempotency_key":"d8923851-4bc5-45ba-a9fa-077ed8755ef1",
        "subject":"Hello."
    }'

    HTTP/1.1 200 OK
    Date: Sat, 22 Mar 2025 06:41:29 GMT
    Content-Length: 59
    Content-Type: text/plain; charset=utf-8

    {"message":"Email was already queued and is pending send."}
Idempotency key reuse and expiry
River keeps completed jobs around for some time so they’re available for analytics or inspection. After 24 hours (configurable) they’re reaped by a job cleaner maintenance service running on the cluster’s leader.
By default, job uniqueness is checked within this (configurable) set of states:
    []rivertype.JobState{
        rivertype.JobStateAvailable,
        rivertype.JobStateCompleted,
        rivertype.JobStatePending,
        rivertype.JobStateRunning,
        rivertype.JobStateRetryable,
        rivertype.JobStateScheduled,
    }
Even after a mail job runs and is set to completed, it’ll still be observed when a new mail job with the same idempotency key is inserted. After 24 hours the completed record will be removed, and a new job with the same key can be inserted.
24 hours is consistent with the duration that idempotency keys from companies like Stripe stay valid. That’s plenty of time for a failed intermittent request to retry many times, but not so long that integrations start relying on idempotency keys being permanently available.
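The reuse-after-expiry behavior can be modeled with a small in-memory store. This is only a sketch under simplifying assumptions: `keyStore` and `Claim` are hypothetical names, nothing is persisted, and the window is measured from when a key was first seen rather than from when a completed job is reaped. The clock is passed in so the behavior is easy to demonstrate.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// retention mirrors the 24 hour window discussed above.
const retention = 24 * time.Hour

// start is an arbitrary fixed instant used for the demonstration.
var start = time.Date(2025, 3, 22, 6, 41, 22, 0, time.UTC)

// keyStore remembers when each idempotency key was claimed.
type keyStore struct {
	mu        sync.Mutex
	retention time.Duration
	claimedAt map[string]time.Time
}

func newKeyStore(retention time.Duration) *keyStore {
	return &keyStore{retention: retention, claimedAt: make(map[string]time.Time)}
}

// Claim reports whether key may be used at time now. It returns false for a
// key claimed less than the retention period ago (a duplicate), and true for
// a new key or one whose earlier claim has expired.
func (s *keyStore) Claim(key string, now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	if claimed, ok := s.claimedAt[key]; ok && now.Sub(claimed) < s.retention {
		return false
	}
	s.claimedAt[key] = now
	return true
}

func main() {
	store := newKeyStore(retention)
	key := "d8923851-4bc5-45ba-a9fa-077ed8755ef1"

	fmt.Println("first request:   ", store.Claim(key, start))
	fmt.Println("retry after 7s:  ", store.Claim(key, start.Add(7*time.Second)))
	fmt.Println("reuse after 25h: ", store.Claim(key, start.Add(25*time.Hour)))
}
```

A retry a few seconds later is rejected as a duplicate, while the same key presented after the window has passed is treated as new.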
Parameter match safety
As an additional safety feature, the API returns an error when incoming parameters don’t match those that created an existing job. A parameter mismatch likely means that an idempotency key was reused, which is probably a bug in the calling integration that it should know about:
    if insertRes.UniqueSkippedAsDuplicate {
        var existingArgs SendEmailArgs
        if err := json.Unmarshal(insertRes.Job.EncodedArgs, &existingArgs); err != nil {
            return nil, err
        }

        // If incoming parameters don't match those of an already queued job,
        // tell the user about it. There's probably a bug in the caller.
        if req.Body != existingArgs.Body ||
            req.EmailRecipient != existingArgs.EmailRecipient ||
            req.EmailSender != existingArgs.EmailSender ||
            req.Subject != existingArgs.Subject {
            return nil, &APIError{
                Message:    "Incoming parameters don't match those of queued email. You may have a bug.",
                StatusCode: http.StatusBadRequest,
            }
        }

        if insertRes.Job.State == rivertype.JobStateCompleted {
            return &HandleEmailCreateResponse{Message: "Email has been sent."}, nil
        }

        return &HandleEmailCreateResponse{Message: "Email was already queued and is pending send."}, nil
    }
Going back to cURL, we can demonstrate how using the same parameters as before, but with a new idempotency_key value, bypasses the unique check and queues a new email:
    # same parameters as before, but with a new idempotency_key
    curl -i -X POST http://localhost:8080/emails -d '{
        "account_id":"bb381da5-8275-41f2-9238-4afaf9f8e359",
        "body":"Hello from email demo.",
        "email_recipient":"receiver@example.com",
        "email_sender":"sender@example.com",
        "idempotency_key":"668298b1-b59b-405d-894f-1dde8847e66e",
        "subject":"Hello."
    }'

    HTTP/1.1 200 OK
    Date: Sat, 22 Mar 2025 07:01:17 GMT
    Content-Length: 48
    Content-Type: text/plain; charset=utf-8

    {"message":"Email has been queued for sending."}
Simplified, but production grade
The API we’ve built has obviously been dumbed down for demonstration purposes, but even so, its idempotency logic is already more sophisticated than some of the largest mailing APIs on the market that’ve existed for years. In ~300 lines of code we’ve built an API that:
- Accepts a new message and queues it up for fast, robust out-of-band sending.
- Recognizes a duplicate request and tells the caller about it with a user-friendly message.
- Detects likely misuse of idempotency keys and warns about a likely bug.
The lesson to take away is that providing idempotency does take some thought, but it’s well within grasp, and using an off-the-shelf product like River can even make it easy. Your API can (and should!) have it too.