
Changing job args safely

It's occasionally necessary to make significant changes to existing job args structs as code is refactored or job behavior is amended. Like when renaming jobs, some care must be taken while doing so to make sure already inserted jobs are still workable when new versions of code are deployed.


Breaking change example

Let's see what a breaking change looks like in action. Take an original job args that contains a field called Name:

// job's previous version
type HelloJobArgs struct {
	Name string `json:"name"`
}

On a subsequent revision of the struct, Name is changed to FullName, and its JSON annotation is updated accordingly:

// job's new version after field rename
type HelloJobArgs struct {
	FullName string `json:"full_name"`
}

func (w *HelloWorker) Work(ctx context.Context, job *river.Job[HelloJobArgs]) error {
	// no error, but previously inserted jobs with only `name` print an empty name!
	fmt.Printf("Hello %s\n", job.Args.FullName)
	return nil
}

Newly inserted jobs will work fine because they'll have a FullName attribute that matches what the worker is trying to process. But jobs that were inserted before the change are unmarshaled with an empty FullName (they only have Name). When the worker runs one of these old jobs it won't error, but will silently produce the wrong result.
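The silent failure can be reproduced without River at all, since it comes down to how encoding/json treats unknown and missing keys. The sketch below is a minimal illustration under that assumption; decodeArgs is a hypothetical helper and the payloads are made-up sample data:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HelloJobArgs is the renamed version of the args from the example above.
type HelloJobArgs struct {
	FullName string `json:"full_name"`
}

// decodeArgs (hypothetical helper) unmarshals a stored JSON payload into
// the new struct, the way a job's args would be decoded before Work runs.
func decodeArgs(payload string) (HelloJobArgs, error) {
	var args HelloJobArgs
	err := json.Unmarshal([]byte(payload), &args)
	return args, err
}

func main() {
	// Shape of a job inserted before the rename: the unknown "name" key is
	// ignored and FullName is left at its zero value, with no error.
	oldArgs, err := decodeArgs(`{"name": "Alice"}`)
	fmt.Printf("old: FullName=%q err=%v\n", oldArgs.FullName, err)

	// Shape of a job inserted after the rename.
	newArgs, err := decodeArgs(`{"full_name": "Alice Smith"}`)
	fmt.Printf("new: FullName=%q err=%v\n", newArgs.FullName, err)
}
```

Because unmarshaling succeeds in both cases, nothing in the worker signals that the old payload lost its data.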


Approach 1: Multiple sets of fields on one struct

There are a couple of strategies for making changes to job args safely. The simplest is to reuse a single job args struct by keeping multiple sets of fields on it, one for the old version of the job and one for the new, then conditionally handling one or the other in Work.

For example, start with an original job representing an outdated mode of transport:

type VehicleJobArgs struct {
	NumHorses int `json:"num_horses"`
}

func (VehicleJobArgs) Kind() string { return "vehicle" }

The program is transitioning from carriages to motor cars, so a new set of fields is added to represent the latter:

type VehicleJobArgs struct {
	// job's previous fields
	NumHorses int `json:"num_horses"`

	// job's new fields
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

The corresponding worker knows to be on the lookout for either version of the job, and processes it conditionally one way or the other:

type VehicleWorker struct {
	river.WorkerDefaults[VehicleJobArgs]
}

func (w *VehicleWorker) Work(ctx context.Context, job *river.Job[VehicleJobArgs]) error {
	switch {
	case job.Args.NumHorses != 0:
		return w.handleCarriage(job) // handle "old" style of job
	case job.Args.EngineMaker != "":
		return w.handleMotorCar(job) // handle "new" style of job
	}
	return errors.New("job doesn't look like old or new version")
}

func (w *VehicleWorker) handleCarriage(job *river.Job[VehicleJobArgs]) error {
	fmt.Printf("Handled carriage with %d horse(s)\n", job.Args.NumHorses)
	return nil
}

func (w *VehicleWorker) handleMotorCar(job *river.Job[VehicleJobArgs]) error {
	fmt.Printf("Handled motor car with %d horsepower made by %s\n", job.Args.Horsepower, job.Args.EngineMaker)
	return nil
}
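To see how the switch tells the two payload shapes apart, the same check can be run against raw JSON outside of River. This is a sketch only: classify is a hypothetical helper that mirrors the switch in Work, and the payloads are illustrative sample data:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// VehicleJobArgs carries both the old and the new set of fields.
type VehicleJobArgs struct {
	NumHorses   int    `json:"num_horses"`
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

// classify (hypothetical helper) decodes a payload and applies the same
// zero-value checks as the worker's switch statement.
func classify(payload string) (string, error) {
	var args VehicleJobArgs
	if err := json.Unmarshal([]byte(payload), &args); err != nil {
		return "", err
	}
	switch {
	case args.NumHorses != 0:
		return "carriage", nil
	case args.EngineMaker != "":
		return "motor car", nil
	}
	return "", errors.New("job doesn't look like old or new version")
}

func main() {
	for _, payload := range []string{
		`{"num_horses": 2}`,
		`{"engine_maker": "Ford", "horsepower": 100}`,
		`{}`,
	} {
		version, err := classify(payload)
		fmt.Printf("%s -> %q (err: %v)\n", payload, version, err)
	}
}
```

One consequence of checking zero values is that a field explicitly set to its zero value (for example "num_horses": 0) is indistinguishable from a missing field, so the discriminating fields should be ones that are never legitimately zero or empty.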

Post-iteration cleanup

After insertions of jobs with the original group of args have stopped and a reasonable time to work them through has passed, use a database query to check whether any are still eligible to be worked:

SELECT count(*)
FROM river_job
WHERE kind = 'vehicle'
    AND args ? 'num_horses'
    AND finalized_at IS NULL;

If not (i.e. the query above returns zero), it's safe to remove the original group of properties, simplifying the args:

type VehicleJobArgs struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

Notably, a chronically failing job using the default retry policy and default max attempts (25) will take about three weeks to exhaust its retries, so without special intervention, that's how long it might take before it's fully safe to clean up the old version of a job. With no failing jobs, you can do it much sooner.

Approach 2: Versioned job args

A safer-but-noisier alternative approach is to version job args instead. Starting with an original form of the args:

type VehicleJobArgs struct {
	NumHorses int `json:"num_horses"`
}

func (VehicleJobArgs) Kind() string { return "vehicle" }

type VehicleWorker struct {
	river.WorkerDefaults[VehicleJobArgs]
}

func (w *VehicleWorker) Work(ctx context.Context, job *river.Job[VehicleJobArgs]) error {
	fmt.Printf("Handled carriage with %d horse(s)\n", job.Args.NumHorses)
	return nil
}

Instead of adding new fields to the existing job args, add a completely new struct and worker named with an explicit "V2":

// job's new version
type VehicleJobArgsV2 struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

func (VehicleJobArgsV2) Kind() string { return "vehicle_v2" }

type VehicleWorkerV2 struct {
	river.WorkerDefaults[VehicleJobArgsV2]
}

func (w *VehicleWorkerV2) Work(ctx context.Context, job *river.Job[VehicleJobArgsV2]) error {
	fmt.Printf("Handled motor car with %d horsepower made by %s\n", job.Args.Horsepower, job.Args.EngineMaker)
	return nil
}

Make sure that both versions are registered with the client's worker bundle:

workers := river.NewWorkers()
river.AddWorker(workers, &VehicleWorker{})
river.AddWorker(workers, &VehicleWorkerV2{})

A program transitioning from one version to the other will stop inserting the original VehicleJobArgs and move to inserting only VehicleJobArgsV2:

_, err = riverClient.Insert(ctx, VehicleJobArgsV2{
	EngineMaker: "Ford",
	Horsepower:  100,
}, nil)
if err != nil {
	panic(err)
}

Post-iteration cleanup

After insertions of VehicleJobArgs have stopped and a reasonable time to work them through has passed, use a database query to check whether any are still eligible to be worked:

SELECT count(*)
FROM river_job
WHERE kind = 'vehicle' -- the V1 kind
    AND finalized_at IS NULL;

If not (i.e. the query above returns zero), the original (non-V2) VehicleJobArgs and VehicleWorker can safely be dropped.

Reclaiming names and kinds

After VehicleJobArgs has been removed, it might be desirable to reclaim its name so there isn't an unsightly V2 attached to it. Renaming the Go struct is easy, but as described in renaming jobs, some care must be taken to rename a job's Kind() so that jobs that are already queued aren't accidentally orphaned.

Renaming safely is possible by changing Kind() to the desired name and moving the original to KindAliases(). New jobs are inserted as vehicle, but in case one is found with vehicle_v2, it's handled by the same worker:

type VehicleJobArgs struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

func (VehicleJobArgs) Kind() string          { return "vehicle" }
func (VehicleJobArgs) KindAliases() []string { return []string{"vehicle_v2"} }
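One way to picture the effect of an alias is as a lookup table in which both the primary kind and each alias point at the same job type. The sketch below is a toy model for illustration only, not River's actual implementation; the kinded interface and buildLookup helper are hypothetical names:

```go
package main

import "fmt"

// kinded (hypothetical interface) captures the two methods from the
// example above.
type kinded interface {
	Kind() string
	KindAliases() []string
}

type VehicleJobArgs struct {
	EngineMaker string `json:"engine_maker"`
	Horsepower  int    `json:"horsepower"`
}

func (VehicleJobArgs) Kind() string          { return "vehicle" }
func (VehicleJobArgs) KindAliases() []string { return []string{"vehicle_v2"} }

// buildLookup (hypothetical helper) maps every kind and alias to the
// canonical kind of the args type that declares it.
func buildLookup(args ...kinded) map[string]string {
	lookup := make(map[string]string)
	for _, a := range args {
		lookup[a.Kind()] = a.Kind()
		for _, alias := range a.KindAliases() {
			lookup[alias] = a.Kind()
		}
	}
	return lookup
}

func main() {
	lookup := buildLookup(VehicleJobArgs{})
	for _, stored := range []string{"vehicle", "vehicle_v2", "carriage"} {
		canonical, ok := lookup[stored]
		fmt.Printf("%-10s -> %q (registered: %t)\n", stored, canonical, ok)
	}
}
```

In this model a stored job of kind vehicle_v2 resolves to the same type as vehicle, while a kind that was never declared has no match, which is the situation where an already queued job would be orphaned.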