River Python: Insert in Python, work in Go

Python joins Ruby as River's second officially supported language for cross-language insertion. The riverqueue package, available on PyPI (and over on GitHub), inserts jobs from a Python program so they can be worked in Go.

Why Python? A programming language's popularity ranking varies widely by index (measuring popularity is an uncertain science), but Python ranks as the most popular on most of them, and if anything has gained prominence over the last few years as it's become the de facto language for ML and AI training. As much as we like Go, there's no question that Python is, and will continue to be, an important cornerstone language for our industry, and more broadly, for the very heart of civilization itself.

Why would you want Go in your Python stack? It's not appropriate in all situations, but it can be useful where both languages are already in use and it's desirable to communicate from one to the other. Go tends to vastly outperform dynamic languages in both speed and memory economy, so it might also be advantageous to use River to run a select subset of jobs that are performance sensitive.

The insert API

The insert API is similar to Go's. A client is initialized with a driver, and jobs are inserted through it:

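import json
from dataclasses import dataclass

import riverqueue
import sqlalchemy
from riverqueue.driver import riversqlalchemy

@dataclass
class SortArgs:
    strings: list[str]

    # Job kind, used to match the job to a worker on the Go side.
    kind: str = "sort"

    def to_json(self) -> str:
        return json.dumps({"strings": self.strings})

engine = sqlalchemy.create_engine("postgresql://...")
client = riverqueue.Client(riversqlalchemy.Driver(engine))

insert_res = client.insert(
    SortArgs(strings=["whale", "tiger", "bear"]),
)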

As shown above, we recommend using dataclasses for River job args. Job args should ideally be a bag of JSON properties without any fanfare, and Python dataclasses are perfect for defining them in a way that's terse and type safe.
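For args with more than a couple of fields, writing each one out by hand in to_json gets repetitive. Here's a minimal sketch of one way around that, using only the standard library: it serializes every dataclass field except kind. Filtering out kind is our assumption about what the payload should contain, made to match the hand-written example above; it isn't something the library requires.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SortArgs:
    strings: list[str]

    kind: str = "sort"

    def to_json(self) -> str:
        # Serialize every dataclass field except `kind`, which names the job
        # type rather than being part of its payload (an assumption that
        # mirrors the hand-written to_json in the example above).
        payload = {k: v for k, v in asdict(self).items() if k != "kind"}
        return json.dumps(payload)


args = SortArgs(strings=["whale", "tiger", "bear"])
encoded = args.to_json()
print(args.kind, encoded)
```

New fields added to the dataclass then show up in the JSON without any further changes to to_json.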

SQLAlchemy is quite dominant as the preferred ORM in the Python ecosystem, so for the time being it's the only supported driver, but others are possible if there's demand.

Transactions, of course

As in Go, transactional enqueuing is supported by way of Postgres transactions, giving robust consistency guarantees: a job becomes visible only if the transaction that inserted it commits.

engine = sqlalchemy.create_engine("postgresql://...")
client = riverqueue.Client(riversqlalchemy.Driver(engine))

with engine.begin() as session:
    insert_res = client.insert_tx(
        session,
        SortArgs(strings=["whale", "tiger", "bear"]),
    )

asyncio

Asynchronous I/O is supported out of the box with AsyncClient, for users interested in maximizing insert performance:

engine = sqlalchemy.ext.asyncio.create_async_engine("postgresql+asyncpg://...")
client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine))

insert_res = await client.insert(
    SortArgs(strings=["whale", "tiger", "bear"]),
)

And of course, the library ships with type hints for its entire API that can be checked with mypy, enabling the detection of many problems through static analysis without even leaving your IDE.

Acknowledgements, more to come

See the River Python documentation for more detailed usage information.

We'd like to give a major thank you to Eric Hauser who contributed the original code. Without him, this project wouldn't have happened.

River Python's our second major ship in the last few weeks (check out Announcing River UI if you missed it), and we've got more to come. Check out our mailing list (which we send to only very occasionally) to stay apprised of new features.