MagicStack/asyncpg

GCP CloudSQL Connector with asyncpg Pool

d1manson opened this issue · 1 comments

Not sure if i'm better asking this on the cloud sql python connector repo or here, but I'm trying to understand how to effectively call asyncpg.create_pool(...) but make it work with the cloudsql connector.

I can create a single asyncpg-style connection using this:

from google.cloud.sql.connector import Connector

async def init():  
    connector = Connector(enable_iam_auth=True, loop=asyncio.get_event_loop())
    conn =  await connector.connect_async(
                instance_connection_string="my-project:region:db-name",
                driver="asyncpg",
                db="postgres",
                user="something@my-project.iam.gserviceaccount.com")
    # do init stuff...
    await conn.set_type_codec("json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog")
    return conn

but i'm not sure how to do a complete drop in replacement for async.create_pool, which is what we have in the existing codebase; I'm hoping to retain the pool object as is so that the rest of the codebase doesn't need to be modified in light of switching to CloudSQL.

Thanks!

In case anyone finds it useful, i did actually come back to this and got it working as follows:

from asyncpg import Pool as Pool_original, create_pool as create_pool_original
import inspect
from google.cloud.sql.connector import Connector
import asyncio

async def connect(dsn, **kwargs):
    # The dsn should be a cloudsql instance_connection_string, not a full dsn; we use the name 'dsn' for consistency
    # with the original asyncpg.connect function arguments.

    # you may want to make this Connector a singleton rather than create a new one scoped to this function call
    connector = Connector(enable_iam_auth=True, loop=loop) 

    return await connector.connect_async(
        dsn,
        driver="asyncpg",
        user="SOME_USER_HERE",
        **kwargs
    )


class Pool(Pool_original):

    async def _get_new_connection(self):
        # this function body is copy-pasted from the base class, with just the first line modified
        con = await connect(*self._connect_args, loop=self._loop,
                            connection_class=self._connection_class,
                            record_class=self._record_class,
                            **self._connect_kwargs)

        if self._init is not None:
            try:
                await self._init(con)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `init` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    # Use `close()` to close the connection gracefully.
                    # An exception in `init` isn't necessarily caused
                    # by an IO or a protocol error.  close() will
                    # do the necessary cleanup via _release_on_close().
                    await con.close()
                finally:
                    raise ex

        return con


def _borrow_default_kwargs(func, kwargs):
    signature = inspect.signature(func)
    return {
        **{
            k: v.default
            for k, v in signature.parameters.items()
            if v.default is not inspect.Parameter.empty and v.kind == v.KEYWORD_ONLY
        },
        **kwargs,
    }


def create_pool(dsn=None, **kwargs):
    return Pool(dsn, **_borrow_default_kwargs(create_pool_original, kwargs))

Note this would be somewhat simpler if it was possible to provide a custom .connect method for the _get_new_connection method to use - here - rather than having it be hardcoded to use the default connection.connect.