It's occasionally necessary to make significant changes to existing job args structs as code is refactored or job behavior is amended. As with renaming jobs, some care must be taken to make sure that already-inserted jobs are still workable when new versions of code are deployed.
Breaking change example
Let's see what a breaking change looks like in action. Take an original job args that contains a field called Name:
```go
// job's previous version
type HelloJobArgs struct {
	Name string `json:"name"`
}
```
On a subsequent revision of the struct, Name is changed to FullName, and its JSON annotation is updated accordingly:
```go
// job's new version after field rename
type HelloJobArgs struct {
	FullName string `json:"full_name"`
}

func (w *HelloWorker) Work(ctx context.Context, job *river.Job[HelloJobArgs]) error {
	// FullName is empty for previously inserted jobs that only have `name`!
	fmt.Printf("Hello %s\n", job.Args.FullName)
	return nil
}
```
Newly inserted jobs will work fine because they'll have a FullName attribute that matches what the worker is trying to process. But jobs that were inserted before the change are unmarshaled with an empty FullName (they only have Name). When the worker runs one of these old jobs, it won't return an error; it will silently produce the wrong result.
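The silent failure can be reproduced with nothing but encoding/json. This sketch only models the JSON round trip (it doesn't involve River): HelloJobArgsV1 and decodeOld are hypothetical names standing in for the old struct and for a worker decoding an already-inserted job.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HelloJobArgsV1 is a hypothetical copy of the job's previous version.
type HelloJobArgsV1 struct {
	Name string `json:"name"`
}

// HelloJobArgs is the job's new version after the field rename.
type HelloJobArgs struct {
	FullName string `json:"full_name"`
}

// decodeOld encodes args with the old struct (as an already-inserted job
// was stored) and decodes them into the new struct (as the updated worker would).
func decodeOld(name string) (HelloJobArgs, error) {
	raw, err := json.Marshal(HelloJobArgsV1{Name: name})
	if err != nil {
		return HelloJobArgs{}, err
	}
	var args HelloJobArgs
	// encoding/json ignores the unknown "name" key, so no error is reported.
	err = json.Unmarshal(raw, &args)
	return args, err
}

func main() {
	args, err := decodeOld("Jane Doe")
	fmt.Printf("err=%v full_name=%q\n", err, args.FullName)
	fmt.Printf("Hello %s\n", args.FullName)
}
```

Decoding succeeds and FullName is simply the empty string, which is why nothing in the worker flags the problem.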
Approach 1: Multiple sets of fields on one struct
There are a couple of strategies for making changes to job args safely. The simplest is to reuse a single job args struct by keeping multiple sets of fields on it, one for the old version of the job and one for the new, then conditionally handling one or the other in Work.
For example, start with an original job representing an outdated mode of transport:
```go
type VehicleJobArgs struct {
	NumHorses int `json:"num_horses"`
}

func (VehicleJobArgs) Kind() string { return "vehicle" }
```
The program is transitioning from carriages to motor cars, so a new set of fields is added to represent the latter:
```go
type VehicleJobArgs struct {
	// job's previous fields
	NumHorses int `json:"num_horses"`

	// job's new fields
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}
```
The corresponding worker knows to be on the lookout for either version of the job, and processes it one way or the other:
```go
type VehicleWorker struct {
	river.WorkerDefaults[VehicleJobArgs]
}

func (w *VehicleWorker) Work(ctx context.Context, job *river.Job[VehicleJobArgs]) error {
	switch {
	case job.Args.NumHorses != 0:
		return w.handleCarriage(job) // handle "old" style of job
	case job.Args.EngineMaker != "":
		return w.handleMotorCar(job) // handle "new" style of job
	}

	return errors.New("job doesn't look like old or new version")
}

func (w *VehicleWorker) handleCarriage(job *river.Job[VehicleJobArgs]) error {
	fmt.Printf("Handled carriage with %d horse(s)\n", job.Args.NumHorses)
	return nil
}

func (w *VehicleWorker) handleMotorCar(job *river.Job[VehicleJobArgs]) error {
	fmt.Printf("Handled motor car with %d horsepower made by %s\n", job.Args.Horsepower, job.Args.EngineMaker)
	return nil
}
```
Post-iteration cleanup
After insertions of jobs with the original group of args have stopped and a reasonable time to work them through has passed, use a database query to check whether any old-style jobs are still eligible to be worked:
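```sql
SELECT count(*)
FROM river_job
WHERE kind = 'vehicle'
    -- num_horses is also present (as 0) on new-style jobs, so test its value, not just the key
    AND (args->>'num_horses')::int <> 0
    AND finalized_at IS NULL;
```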
If not (i.e. the query above returns zero), it's safe to remove the original group of properties, simplifying the args:
```go
type VehicleJobArgs struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}
```
Notably, a chronically failing job using the default retry policy and default max attempts (25) will take about three weeks to exhaust its retries, so without special intervention, that's how long it might take before it's fully safe to clean up the old version of a job. With no failing jobs, you can do it much sooner.
Approach 2: Versioned job args
A safer-but-noisier alternative approach is to version job args instead. Starting with an original form of the args:
```go
type VehicleJobArgs struct {
	NumHorses int `json:"num_horses"`
}

func (VehicleJobArgs) Kind() string { return "vehicle" }

type VehicleWorker struct {
	river.WorkerDefaults[VehicleJobArgs]
}

func (w *VehicleWorker) Work(ctx context.Context, job *river.Job[VehicleJobArgs]) error {
	fmt.Printf("Handled carriage with %d horse(s)\n", job.Args.NumHorses)
	return nil
}
```
Instead of adding new fields to the existing job args, add a completely new struct and worker named with an explicit "V2":
```go
// job's new version
type VehicleJobArgsV2 struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

func (VehicleJobArgsV2) Kind() string { return "vehicle_v2" }

type VehicleWorkerV2 struct {
	river.WorkerDefaults[VehicleJobArgsV2]
}

func (w *VehicleWorkerV2) Work(ctx context.Context, job *river.Job[VehicleJobArgsV2]) error {
	fmt.Printf("Handled motor car with %d horsepower made by %s\n", job.Args.Horsepower, job.Args.EngineMaker)
	return nil
}
```
Make sure that both versions are registered with the client's worker bundle:
```go
workers := river.NewWorkers()
river.AddWorker(workers, &VehicleWorker{})
river.AddWorker(workers, &VehicleWorkerV2{})
```
A program transitioning from one version to the other will stop inserting the original VehicleJobArgs and insert only VehicleJobArgsV2:
```go
_, err = riverClient.Insert(ctx, VehicleJobArgsV2{
	EngineMaker: "Ford",
	Horsepower:  100,
}, nil)
if err != nil {
	panic(err)
}
```
Post-iteration cleanup
After insertions of VehicleJobArgs have stopped and a reasonable time to work them through has passed, use a database query to check whether there are still any eligible to be worked:
```sql
SELECT count(*)
FROM river_job
WHERE kind = 'vehicle' -- the V1 kind
    AND finalized_at IS NULL;
```
If not (i.e. the query above returns zero), the original (non-V2) VehicleJobArgs and VehicleWorker can safely be dropped.
Reclaiming names and kinds
After VehicleJobArgs has been removed, it might be desirable to reclaim its name so there isn't an unsightly V2 attached to it. Renaming the Go struct is easy, but as described in renaming jobs, some care must be taken to rename a job's Kind() so that jobs that are already queued aren't accidentally orphaned.
Renaming safely is possible by changing Kind() to the desired name and moving the original to KindAliases(). New jobs are inserted as vehicle, but any queued job still carrying vehicle_v2 is handled by the same worker:
```go
type VehicleJobArgs struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

func (VehicleJobArgs) Kind() string          { return "vehicle" }
func (VehicleJobArgs) KindAliases() []string { return []string{"vehicle_v2"} }
```
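The effect of an alias can be pictured with a small model that assumes only what's described above: a primary Kind() plus an optional KindAliases(). The kindNamer and kindAliaser interfaces, register, and the route map are hypothetical and not River's internals; the sketch just routes the primary kind and every alias to the same worker name.

```go
package main

import "fmt"

// kindNamer mirrors the Kind method that job args implement.
type kindNamer interface{ Kind() string }

// kindAliaser mirrors the optional KindAliases method.
type kindAliaser interface{ KindAliases() []string }

type VehicleJobArgs struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

func (VehicleJobArgs) Kind() string          { return "vehicle" }
func (VehicleJobArgs) KindAliases() []string { return []string{"vehicle_v2"} }

// register routes the primary kind and every alias to the same worker.
func register(routes map[string]string, args kindNamer, worker string) {
	routes[args.Kind()] = worker
	if a, ok := args.(kindAliaser); ok {
		for _, alias := range a.KindAliases() {
			routes[alias] = worker
		}
	}
}

func main() {
	routes := map[string]string{}
	register(routes, VehicleJobArgs{}, "VehicleWorker")
	// New jobs use "vehicle"; older queued jobs may still say "vehicle_v2".
	for _, kind := range []string{"vehicle", "vehicle_v2"} {
		fmt.Printf("%s -> %s\n", kind, routes[kind])
	}
}
```

Once no jobs with the vehicle_v2 kind remain, the alias can be removed in the same way as the earlier cleanup steps.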