Welcome to asyncpgsa’s documentation

A Python wrapper library around asyncpg for use with sqlalchemy.

sqlalchemy ORM

Currently this repo does not support SA ORM, only SA Core.

We at Canopy do not use the ORM, so if you would like ORM support, feel free to open a PR. You would need to create an “engine” interface, and that should be it. You could then bind your sessions to the engine.

sqlalchemy Core

This repo supports sqlalchemy core.

Install

Examples

There are two ways to use this library. The first is to establish a pool and use it directly. The second is a singleton called PG (imported as pg), which manages a pool for you and makes it easy to call from module to module without having to pass anything around.

Query Object

All of these examples use the variable query, which is a query object. It can be either a string or a sqlalchemy core statement. Here are some examples:

# string
query = 'select * from sqrt(16)'

# sqlalchemy statement with table object
import sqlalchemy as sa
pg_tables = sa.Table(
    'pg_tables', sa.MetaData(),
    sa.Column('schemaname'),
    sa.Column('tablename'),
    sa.Column('tableowner'),
    sa.Column('tablespace'),
    sa.Column('hasindexes')
)
query = pg_tables.select().where(pg_tables.c.schemaname == 'pg_catalog')

# sqlalchemy statement with parameters
query = sa.select('*') \
    .select_from(sa.text('sqrt(:num) as a')) \
    .select_from(sa.text('sqrt(:a2) as b')) \
    .select_from(sa.text('sqrt(:z3) as c')) \
    .params(num=16, a2=36, z3=25)

PG Singleton

If you want the highest level of abstraction, you can use the singleton object. This will create a pool for you.

Init

Before you can run any queries, you first have to initialize the pool.

from asyncpgsa import pg

await pg.init(
    host=HOST,
    port=PORT,
    database=DB_NAME,
    user=USER,
    # loop=loop,
    password=PASS,
    min_size=5,
    max_size=10
)
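The "initialize first, then query from anywhere" pattern can be sketched with the standard library alone. The class and names below (LazyPool, pg_like) are hypothetical stand-ins for illustration, not asyncpgsa's implementation, and no database is contacted.

```python
import asyncio


class LazyPool:
    """Hypothetical module-level singleton; not asyncpgsa's actual code."""

    def __init__(self):
        self._pool = None

    async def init(self, **connect_kwargs):
        # A real implementation would open database connections here.
        self._pool = {"settings": connect_kwargs}

    async def fetch(self, query):
        # Refuse to run anything until init() has been awaited.
        if self._pool is None:
            raise RuntimeError("call init() before running queries")
        return [{"query": query}]


pg_like = LazyPool()  # one shared instance, imported wherever it is needed


async def demo():
    try:
        await pg_like.fetch("select 1")
    except RuntimeError as exc:
        before = str(exc)
    await pg_like.init(host="localhost", min_size=5, max_size=10)
    after = await pg_like.fetch("select 1")
    return before, after


before_init, after_init = asyncio.run(demo())
```

Because the instance lives at module level, every module that imports it shares the same pool, which is why nothing has to be passed around.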

Query

Query is for making read-only select statements. It creates a prepared statement for you and returns a cursor object that fetches a few rows at a time from the database, which is great for select statements with lots of results. You can also use query without a cursor or transaction by awaiting it directly, which returns all results at once.

from asyncpgsa import pg

# using a cursor and transaction isolation
async with pg.query(select_statement) as cursor:
    async for row in cursor:
        a = row['col_name']

# no cursor or isolation, all results at once
results = await pg.query(select_statement)
for row in results:
    a = row['col_name']
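To illustrate the difference between iterating a cursor and awaiting all results, here is a small standard-library sketch. The iter_rows helper and its batch_size parameter are assumptions made for the example; the in-memory list stands in for a database table.

```python
import asyncio


async def iter_rows(rows, batch_size=2):
    """Yield rows one at a time, pulling them from the source in small batches."""
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]  # stands in for one round trip
        await asyncio.sleep(0)                  # give other tasks a chance to run
        for row in batch:
            yield row


async def demo():
    table = [{"col_name": n} for n in range(5)]

    # Cursor style: only one batch is held at a time.
    streamed = []
    async for row in iter_rows(table):
        streamed.append(row["col_name"])

    # All-at-once style: the whole result is already a list.
    all_at_once = [row["col_name"] for row in table]
    return streamed, all_at_once


streamed, all_at_once = asyncio.run(demo())
```

Both styles see the same rows in the same order; the cursor style simply avoids holding a large result set in memory.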

fetch

Want to run a simple statement and get the results as a list? Fetch is for you.

from asyncpgsa import pg

for row in await pg.fetch(query):
    a = row['col_name']

fetchrow

This is just like fetch, but only returns a single row. Good for insert/update/delete calls.

from asyncpgsa import pg

row = await pg.fetchrow(query)
a = row['col_name']

fetchval

Like fetchrow, but returns only a single column. Don't bother getting the whole row when you only need a single value.

The column argument is a zero-based index.

from asyncpgsa import pg

value = await pg.fetchval(query, column=0)
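The zero-based column index can be pictured with plain tuples. The first_row_value function below is a hypothetical helper written for this illustration, not part of asyncpgsa or asyncpg.

```python
def first_row_value(rows, column=0):
    """Return one column of the first row, or None when there are no rows."""
    if not rows:
        return None
    return rows[0][column]


rows = [(42, "answer"), (7, "other")]

first_column = first_row_value(rows)             # column 0 of the first row
second_column = first_row_value(rows, column=1)  # column 1 of the first row
no_rows = first_row_value([])
```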

Transaction

Everything is wrapped in a transaction for you, but if you need to do multiple things in a single transaction, then establish a transaction using an async with block. Commits and rollbacks will be handled for you.

from asyncpgsa import pg

async with pg.transaction() as conn:
    for row in await conn.fetch(query):
        a = row['col_name']

    await conn.fetchval(update_query)
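The "commits and rollbacks will be handled for you" behaviour follows a common context-manager pattern: commit if the block finishes normally, roll back if it raises. The sketch below shows that pattern with a hypothetical FakeConnection that only records statements; it is not asyncpgsa's implementation.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical connection that only records what was executed."""

    def __init__(self):
        self.log = []

    async def execute(self, statement):
        self.log.append(statement)


@asynccontextmanager
async def transaction(conn):
    await conn.execute("BEGIN")
    try:
        yield conn
    except Exception:
        # The block raised: undo everything, then re-raise for the caller.
        await conn.execute("ROLLBACK")
        raise
    else:
        # The block finished normally: make the changes permanent.
        await conn.execute("COMMIT")


async def demo():
    ok = FakeConnection()
    async with transaction(ok) as conn:
        await conn.execute("UPDATE 1")

    failed = FakeConnection()
    try:
        async with transaction(failed) as conn:
            await conn.execute("UPDATE 2")
            raise ValueError("boom")
    except ValueError:
        pass
    return ok.log, failed.log


ok_log, failed_log = asyncio.run(demo())
```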

Begin

Begin is the same as transaction; you just get to choose which word you like best.

from asyncpgsa import pg

async with pg.begin() as conn:
    for row in await conn.fetch(query):
        a = row['col_name']

    await conn.fetchval(update_query)

Pool

If you don't mind passing around the pool object, you can use a pool directly. With the pool object, you currently have to wrap everything in a transaction.

Creating the pool

import asyncpgsa
pool = await asyncpgsa.create_pool(
    host=HOST,
    port=PORT,
    database=DATABASE,
    user=USER,
    # loop=event_loop,
    password=PASS,
    min_size=5,
    max_size=10
)

Transaction

The transaction context manager will establish a connection and start a transaction all at once. It returns the connection object. Commits and rollbacks will be handled for you.

async with pool.transaction() as conn:
    # do something with conn
    # when your code block is done, rollback/commit will happen automatically
    pass

fetch

Want to run a simple statement and get the results as a list? Fetch is for you.

# no transaction
async with pool.acquire() as conn:
    for row in await conn.fetch(query):
        a = row['col_name']

# with transaction
async with pool.transaction() as conn:
    result = await conn.fetch(query)

for row in result:
    a = row['col_name']

fetchrow

This is just like fetch, but only returns a single row. Good for insert/update/delete calls.

async with pool.transaction() as conn:
    row = await conn.fetchrow(query)

a = row['col_name']

fetchval

Like fetchrow, but returns only a single column. Don't bother getting the whole row when you only need a single value.

The column argument is a zero-based index.

async with pool.transaction() as conn:
    value = await conn.fetchval(query, column=0)

json

import json
import ujson
from asyncpgsa import pg
# NOTE: ujson is very fast but ujson.dumps is not safe.

async def main():
    async def set_json_charset(connection):
        await connection.set_type_codec(
            'json',
            encoder=json.dumps,
            decoder=ujson.loads,
            schema='pg_catalog'
        )

    await pg.init("postgresql://127.0.0.1/template0", init=set_json_charset)

    ...
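The encoder and decoder are ordinary callables: the encoder turns a Python object into JSON text and the decoder does the reverse. As a sketch of what a custom pair might look like, the example below uses only the standard json module; the default=str fallback for dates is an assumption chosen for illustration, not something the library requires.

```python
import json
from datetime import date
from functools import partial

# Encoder: Python object -> JSON text. default=str is an illustrative choice
# that serializes otherwise unsupported values (such as dates) as strings.
encode = partial(json.dumps, default=str)

# Decoder: JSON text -> Python object.
decode = json.loads

payload = {"id": 1, "created": date(2020, 1, 2)}
text = encode(payload)
round_tripped = decode(text)
```

Note that the round trip is not symmetric here: the date comes back as a string, so any richer decoding has to be done in the decoder.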

As another option, you can initialize the PostgreSQL dialect with a custom JSON serializer and deserializer and pass it into pg.init:

from asyncpgsa.connection import get_dialect

async def main():
    dialect = get_dialect(
        json_serializer=json.dumps,
        json_deserializer=ujson.loads
    )

    await pg.init("postgresql://127.0.0.1/template0", dialect=dialect)

    ...

You can also initialize a pool with a custom dialect:

import asyncpgsa
from asyncpgsa.connection import get_dialect

async def main():
    dialect = get_dialect(
        json_serializer=json.dumps,
        json_deserializer=ujson.loads
    )

    await asyncpgsa.create_pool(
        dialect=dialect,
        ...
    )

    ...

Compile

If you just want to roll your own everything and use raw asyncpg without all these wrappers, you can probably do it using just the compile_query function in this repo.

import asyncpgsa
import sqlalchemy as sa

query = sa.select('*').select_from(sa.text('mt_table'))
query_string, params = asyncpgsa.compile_query(query)
# Now you have the raw query string ready for asyncpg, and the ordered parameters.
# asyncpg takes query arguments positionally, so unpack the parameters.
results = await asyncpg_connection.fetch(query_string, *params)
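To make "raw query string and ordered parameters" concrete: asyncpg expects numbered placeholders ($1, $2, ...) with the values passed in the same order. The sketch below converts named :param placeholders by hand. The to_positional function is hypothetical and deliberately naive (it does not skip string literals); it is not how compile_query is implemented.

```python
import re


def to_positional(sql, named_params):
    """Rewrite :name placeholders as $1, $2, ... and return ordered values."""
    order = []

    def replace(match):
        name = match.group(1)
        if name not in order:
            order.append(name)
        return f"${order.index(name) + 1}"

    # The lookbehind skips PostgreSQL casts such as age::int.
    text = re.sub(r"(?<!:):([A-Za-z_]\w*)", replace, sql)
    return text, [named_params[name] for name in order]


query_string, params = to_positional(
    "select * from users where id = :id and age::int > :min_age",
    {"min_age": 18, "id": 7},
)
```

The values come back in placeholder order rather than dictionary order, which is what allows them to be unpacked positionally into a fetch call.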

Testing

The library also includes a testing module to make testing in your app easier. It mocks out all the database calls and never actually hits a database.

Setting up

To set up a mock, all you need to do is use the MockPG class from asyncpgsa.testing. Then set the responses you are expecting by calling mock_pg.set_database_results(), where every argument is a list of dictionaries.

Example test

Here is an example test.

from asyncpgsa.testing import MockPG
from mypackage import mymodule

async def test_run_query(monkeypatch):
    pg = MockPG()
    pg.set_database_results([{'id': 1}, {'id': 2}])
    # replace the pg singleton used by the module under test
    monkeypatch.setattr('mypackage.mymodule.pg', pg)
    results = await mymodule.run_query()

    assert results[0].id == 1
    assert results[1].id == 2

And another where there are multiple queries:

from asyncpgsa.testing import MockPG
from mypackage import mymodule

async def test_run_multiple_queries(monkeypatch):
    pg = MockPG()
    # one list of dictionaries per expected query
    pg.set_database_results([{'id': 1}, {'id': 2}],
                            [{'id': 28, 'name': 'bob'}])
    monkeypatch.setattr('mypackage.mymodule.pg', pg)
    results = await mymodule.run_multiple_queries()

    assert results[0].id == 1
    assert results[1].id == 2
    assert results[2].name == 'bob'
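The idea of queuing one result set per expected query can be sketched without the library. The QueuedResults class below is a hypothetical illustration that hands out result sets in the order they were set; it is an assumption about the general approach, not MockPG's actual implementation.

```python
import asyncio
from collections import deque


class QueuedResults:
    """Hypothetical mock: each fetch() call returns the next queued result set."""

    def __init__(self):
        self._results = deque()

    def set_database_results(self, *result_sets):
        # Every argument is a list of dictionaries, one list per expected query.
        self._results = deque(result_sets)

    async def fetch(self, query):
        # The query text is ignored; no database is ever contacted.
        return self._results.popleft()


async def demo():
    mock = QueuedResults()
    mock.set_database_results([{"id": 1}, {"id": 2}],
                              [{"id": 28, "name": "bob"}])
    first = await mock.fetch("select id from a")
    second = await mock.fetch("select id, name from b")
    return first, second


first_result, second_result = asyncio.run(demo())
```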