River

riverencrypt

package
v0.12.0 Latest
Published: Apr 9, 2025 License: Proprietary
Example (EncryptHook)

Example_encryptHook demonstrates the use of encryption hook that encrypts and decrypts job args.

package main

import (
	"context"
	"encoding/base64"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivertype"

	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
	"riverqueue.com/riverpro/internal/internaltest"
	"riverqueue.com/riverpro/riverencrypt"
	"riverqueue.com/riverpro/riverencrypt/riversecretbox"
)

type EncryptArgs struct {
	// Message is a secret message encrypted in the database, but available to
	// workers.
	Message string `json:"message"`
}

func (EncryptArgs) Kind() string { return "encrypt" }

type EncryptWorker struct {
	river.WorkerDefaults[EncryptArgs]
}

func (w *EncryptWorker) Work(ctx context.Context, job *river.Job[EncryptArgs]) error {
	fmt.Printf("Secret message: %+v\n", job.Args.Message)
	return nil
}

// Example_encryptHook demonstrates the use of encryption hook that encrypts and
// decrypts job args.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, internaltest.DatabaseConfig(""))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := internaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &EncryptWorker{})

	// Generate your own key with something like:
	//
	// 	var key [32]byte
	// 	if _, err := rand.Reader.Read(key[:]); err != nil {
	// 		panic(err)
	// 	}
	// 	encodedKey := base64.StdEncoding.EncodeToString(key[:])
	// 	fmt.Printf("encoded key: %s\n", encodedKey)
	//
	key := mustDecodeBase64EncodedKey("iRmwTuVGl2BAwTUPRTJbP/iA2EKpTrzXpEcNIXG2BI0=")

	riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
		Config: river.Config{
			Hooks: []rivertype.Hook{
				riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(key)),
			},
			Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
			Queues: map[string]river.QueueConfig{
				river.QueueDefault: {MaxWorkers: 100},
			},
			TestOnly: true, // suitable only for use in tests; remove for live environments
			Workers:  workers,
		},
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	_, err = riverClient.Insert(ctx, EncryptArgs{
		Message: "This message is encrypted in the database, but plaintext in workers.",
	}, nil)
	if err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

// Decodes a base64-encoded key to a 32-byte array for use with riverencrypt.
// This exactly methodology isn't necessary, but it demonstrates how a key might
// be easily stored to an env var or vault.
func mustDecodeBase64EncodedKey(encodedKey string) [32]byte {
	decodedBytes, err := base64.StdEncoding.DecodeString(encodedKey)
	if err != nil {
		panic(err)
	}

	var decodedKey [32]byte
	if copy(decodedKey[:], decodedBytes) != 32 {
		panic("expected to copy exactly 32 bytes")
	}

	return decodedKey
}

Output:

Secret message: This message is encrypted in the database, but plaintext in workers.
Example (KeyRotation)

Example_keyRotation demonstrates how keys can be rotated with riverencrypt.

var (
	keyNew = mustDecodeBase64EncodedKey("T8sUAPOQNSDDMAiMyfrK8EaLOlY/cJ21PPNn1InCqIQ=")
	keyOld = mustDecodeBase64EncodedKey("fdnQ7+v/5Pb28rYqpynRSdzWfqs1gD6/J/0I9IUh65s=")
)

// Step 0: Client starts with the old key (i.e. the one used before starting
// the key rotation).
{
	_, err := riverpro.NewClient(riverpropgxv5.New(nil), &riverpro.Config{
		Config: river.Config{
			Hooks: []rivertype.Hook{
				riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(
					keyOld,
				)),
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

// Step 1: Generate a new key and add it to the encryptor (in this case
// using NaCl NewEncryptor).
//
// The new key should be in the first key position so that all new jobs will
// be encrypted with it.
{
	_, err := riverpro.NewClient(riverpropgxv5.New(nil), &riverpro.Config{
		Config: river.Config{
			Hooks: []rivertype.Hook{
				riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(
					keyNew,
					keyOld,
				)),
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

// Step 2: After all jobs using the original key have rotated through the
// queue, remove the old key.
//
// Remember that there may be jobs encrypted using the original key that
// errored and are queued for future retry. The original key shouldn't be
// removed until all those jobs have gotten all the way to their last retry.
{
	_, err := riverpro.NewClient(riverpropgxv5.New(nil), &riverpro.Config{
		Config: river.Config{
			Hooks: []rivertype.Hook{
				riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(
					keyNew,
				)),
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

// Output:
Example (SpecificJobKinds)

Example_specificJobKinds demonstrates how to encrypt only specific job kinds using EncryptHookConfig.JobKind.

key := mustDecodeBase64EncodedKey("iRmwTuVGl2BAwTUPRTJbP/iA2EKpTrzXpEcNIXG2BI0=")

// Demonstrates EncryptHook.JobKindsExclude.
{
	_, err := riverpro.NewClient(riverpropgxv5.New(nil), &riverpro.Config{
		Config: river.Config{
			Hooks: []rivertype.Hook{
				riverencrypt.NewEncryptHookConfig(&riverencrypt.EncryptHookConfig{
					Encryptor: riversecretbox.NewEncryptor(key),
					// Encrypt/decrypt all job args except those in this list.
					JobKindsExclude: []string{
						(EncryptArgs{}).Kind(),
					},
				}),
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

// Demonstrates EncryptHook.JobKindsInclude.
{
	_, err := riverpro.NewClient(riverpropgxv5.New(nil), &riverpro.Config{
		Config: river.Config{
			Hooks: []rivertype.Hook{
				riverencrypt.NewEncryptHookConfig(&riverencrypt.EncryptHookConfig{
					Encryptor: riversecretbox.NewEncryptor(key),
					// Only encrypt/decrypt job args included in this list.
					JobKindsInclude: []string{
						(EncryptArgs{}).Kind(),
					},
				}),
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

// Output:

Index

Examples

Constants

This section is empty.

Variables

var ErrNoKeyDecrypted = errors.New("no key successfully decrypted ciphertext")

ErrNoKeyDecrypted is returned by Encryptor.Decrypt in case no suitable keys were available to decrypt given ciphertext.

Functions

This section is empty.

Types

type EncryptHook

type EncryptHook struct {
	river.HookDefaults
	// contains filtered or unexported fields
}

EncryptHook encrypts and decrypts job args. It can be installed globally to a client's Hooks configuration, or to specific job args by implementing the river.JobArgsWithHooks interface.

Notably, encryption using this struct should probably not be necessary for the majority of River users. It's common practice for hosted Postgres provides to encrypt data at rest, which will cover many cases where encryption is important (e.g. the server itself is stolen). This additional encryption protects some additional cases like if the online database itself is exfiltrated through a psql shell or like. Encrypting job args does have the downside in that it makes operational insight more difficult (a human cannot easily inspect what's inside the job row from psql or other admin interfaces).

func NewEncryptHook

func NewEncryptHook(encryptor Encryptor) *EncryptHook

NewEncryptHook creates a new EncryptHook using the given encryptor where no more elaborate configuration is required.

func NewEncryptHookConfig

func NewEncryptHookConfig(config *EncryptHookConfig) *EncryptHook

NewEncryptHookConfig creates a new EncryptHook using the given config.

func (*EncryptHook) InsertBegin

func (m *EncryptHook) InsertBegin(ctx context.Context, params *rivertype.JobInsertParams) error

func (*EncryptHook) WorkBegin

func (m *EncryptHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error

type EncryptHookConfig

type EncryptHookConfig struct {
	// DecryptOnly tells EncryptHook to decrypt jobs being worked, but not
	// encrypt new ones being inserted. This can be used to phase out encryption
	// that was previously desirable, but which it's now desired be turned off.
	// Encryption is removed from the database incrementally as existing
	// encrypted jobs are completed and new jobs are inserted without
	// encryption. After all encrypted jobs have succeeded or been discarded
	// after their final retry, EncryptHook can be removed from the client
	// completely.
	DecryptOnly bool

	// Encryptor to use for encryption and decryption.
	//
	//  Encryptors define the specific crypto to be used. See for example
	//  riverencrypt/riversecretbox for one making use of NaCl Secretbox which
	//  should be suitable for most applications.
	Encryptor Encryptor

	// JobKindsInclude can be configured to encrypt or decrypt all job args
	// except for the specific job kinds in this list.
	//
	// Only configure one of JobKindsExclude or JobKindsInclude.
	JobKindsExclude []string

	// JobKindsInclude can be configured to only encrypt or decrypt for the
	// specific job kinds in this list.
	//
	// This option and an EncryptHook being returned from a specific job arg's
	// Hooks implementation are two ways of accomplishing encryption for only
	// some job args. This method might be preferable so that encrypt hooks can
	// be initialized outside of a static context (so for example, parsing
	// encryption keys could return errors from a main run function instead of
	// having to panic).
	//
	// Only configure one of JobKindsExclude or JobKindsInclude.
	JobKindsInclude []string
}

EncryptHookConfig are configuration options for use with NewEncryptHook.

type Encryptor

type Encryptor interface {
	// Decrypt decrypts plaintext to ciphertext. It should try all available
	// keys before failing, and return the specific ErrNoKeyDecrypted error in
	// case of failure due to no suitable decryption keys.
	Decrypt(cipher []byte) ([]byte, error)

	// Encrypt encrypts plaintext to ciphertext.
	Encrypt(plain []byte) []byte
}

Encryptor is an interface that can be used to implement a specific encryption algorithm for use with EncryptHook.

It's abstracted so that River encryption isn't tied to a single specific algorithm since some users may have specific requirements for cryptography. If you don't, try riverencrypt/riversecretbox.Encryptor which uses NaCL SecretBox, widely respected cryptography that's suitable for most uses.

Directories

riversecretbox Package riversecretbox provides a riverencrypt.Encryptor implementation that uses NaCl Secretbox, a good default encryption choice using widely respected cryptography.