GoogleCloudPlatform/cloud-sql-python-connector

Support multiple event loops with a given `Connector`

colonelpanic8 opened this issue · 9 comments

Bug Description

This can occur when sqlalchemy attempts to create a new connection in the context of an event loop that is not the one cloud-sql-python-connector is using.

The connector has a member _client:

The code is careful to instantiate this client in the correct async context (on the running event loop). This has to be done lazily only because the init of CloudSQLClient initializes an aiohttp.ClientSession:

ClientSession must be instantiated on the event loop it will be used on because of this:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/client.py#L257
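The loop-binding behavior described above can be illustrated without aiohttp at all. The following is a toy stand-in (the class name and its internals are illustrative, not aiohttp's actual implementation) that captures the running loop at construction time and then, like aiohttp's timeout machinery, fails when used from a different loop:

```python
import asyncio


class LoopBoundClient:
    """Toy stand-in for aiohttp.ClientSession: it captures the loop it
    was created on and assumes later operations run on that same loop."""

    def __init__(self) -> None:
        # aiohttp's ClientSession similarly records the current loop in __init__.
        self._loop = asyncio.get_event_loop()

    async def request(self) -> str:
        # aiohttp's TimeoutHandle/TimerContext effectively performs this
        # check: looking up the current task relative to self._loop fails
        # when we are actually running on a different loop.
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError(
                "Timeout context manager should be used inside a task"
            )
        return "ok"


async def make_client() -> LoopBoundClient:
    return LoopBoundClient()


loop_a = asyncio.new_event_loop()
client = loop_a.run_until_complete(make_client())

# Same loop: works fine.
assert loop_a.run_until_complete(client.request()) == "ok"

# Different loop: fails, mirroring the stacktrace in this issue.
loop_b = asyncio.new_event_loop()
error = ""
try:
    loop_b.run_until_complete(client.request())
except RuntimeError as exc:
    error = str(exc)

loop_a.close()
loop_b.close()
```

This is why the bug is latent: everything works as long as all requests happen to run on the loop the session was created on, and only the cross-loop case blows up.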

Now, whenever a request is made with this client session, a TimeoutHandle is created, parameterized by self._loop:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/client.py#L443

This propagates to a TimerContext here:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/helpers.py#L651

which we enter here:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/client.py#L474

Now, when we enter this timer context, we try to find the current task on the right loop:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/helpers.py#L697

If we have executed on a different loop, this raises an exception.

One way to fix this would be to make sure that we are running on the right event loop in _perform_refresh:

async def _perform_refresh(self) -> ConnectionInfo:

Alternatively, we could try to schedule it properly here:

scheduled_task = asyncio.create_task(_refresh_task(self, delay))

You could also argue that connectors should be 1:1 with the event loops they are used on, but at the very least, I think we could add some type of check that we are on the correct event loop in _perform_refresh, especially given that things can work perfectly well at first and only fail later when multiple event loops are used.
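A sketch of the kind of check proposed above, using a minimal hypothetical class rather than the real Connector API (the attribute name `_loop` and the error message are assumptions for illustration):

```python
import asyncio


class LoopCheckedConnector:
    """Hypothetical sketch, not the real Connector: remember the loop we
    were created for and verify it before performing a refresh."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop

    async def _perform_refresh(self) -> str:
        # The proposed guard: fail fast with an actionable message
        # instead of a confusing aiohttp RuntimeError much later.
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError(
                "Connector is bound to a different event loop; "
                "create one Connector per event loop."
            )
        return "connection-info"


loop_a = asyncio.new_event_loop()
connector = LoopCheckedConnector(loop_a)

# Correct loop: the refresh proceeds.
result = loop_a.run_until_complete(connector._perform_refresh())

# Wrong loop: a clear error is raised immediately.
loop_b = asyncio.new_event_loop()
wrong_loop_error = None
try:
    loop_b.run_until_complete(connector._perform_refresh())
except RuntimeError as exc:
    wrong_loop_error = str(exc)

loop_a.close()
loop_b.close()
```

The guard itself is only a couple of lines; the value is in surfacing the misuse at refresh time with a message that names the actual problem.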

Furthermore, the connector class currently builds a separate event loop if you do not provide one on which it should execute, which strongly suggests that there is no expectation that you must inject the loop.

I'm happy to implement a fix and I have several ideas about how to do so.

Example code (or command)

No response

Stacktrace

Here is an example stacktrace:

File "/nix/store/ixbvdm0yi27i0wbbpa8fpz854rsdyy5r-python3.11-railbird-0.1.0/lib/python3.11/site-packages/railbird/util/sqlalchemy.py", line 97, in execute_async
result = await session.execute(query) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 455, in execute
result = await greenlet_spawn(
^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
result = context.throw(*sys.exc_info())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2308, in execute
return self._execute_internal(
^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2180, in _execute_internal
conn = self._connection_for_bind(bind)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2047, in _connection_for_bind
return trans._connection_for_bind(engine, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "", line 2, in _connection_for_bind
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1143, in _connection_for_bind
conn = bind.connect()
^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3268, in connect
return self._connection_cls(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 145, in init
self._dbapi_connection = engine.raw_connection()
^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3292, in raw_connection
return self.pool.connect()
^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 452, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1269, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 716, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 169, in _do_get
with util.safe_reraise():
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in exit
raise exc_value.with_traceback(exc_tb)
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 167, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 393, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 678, in init
self.__connect()
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 902, in __connect
with util.safe_reraise():
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in exit
raise exc_value.with_traceback(exc_tb)
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 898, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 365, in
return lambda rec: creator_fn()
^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 112, in creator
return sync_engine.dialect.dbapi.connect( # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 936, in connect
await_only(creator_fn(*arg, **kw)),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/nix/store/ixbvdm0yi27i0wbbpa8fpz854rsdyy5r-python3.11-railbird-0.1.0/lib/python3.11/site-packages/railbird/util/gcp.py", line 144, in getconn_async
return await connector.connect_async(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/8gjdx398fq4ss9a74ydppacmdzspnybl-python3.11-cloud-sql-python-connector-1.9.0/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 66, in _is_valid
metadata = await task
^^^^^^^^^^
RuntimeError: Timeout context manager should be used inside a task


### Steps to reproduce?

It's a bit hard to reproduce the issue because you need to trigger the need to connect in sqlalchemy.

### Environment

1. OS type and version:
2. Python version:
3. Cloud SQL Python Connector version:

None of these are particularly relevant. I fully understand the cause of the issue.

### Additional Details

_No response_

Hi @colonelpanic8 😄 Thanks for the detailed explanation of the issue you are currently seeing.

It's a bit hard to reproduce the issue because you need to trigger the need to connect in sqlalchemy.

I'm trying to understand the use-case here, as I've never run into this error myself. Do you mind sharing the piece of your application code where you create the Connector, as well as how you create the sqlalchemy connection pool?

The above info would help me wrap my head around whether this is a config issue or indeed a bug. Thanks in advance 😄

The above info would help me wrap my head around whether this is a config issue or indeed a bug. Thanks in advance 😄

The way our application is configured is fairly complicated, so I can't just copy-paste it in a way that will be easy for you to follow.

I'm confident in my understanding of the underlying issue, though. It will occur whenever multiple event loops are used with a single connector.

You could certainly make the argument that the Async Driver Usage section in the readme sort of suggests that you need to be on the event loop that you want to use when you initialize things:

The Cloud SQL Connector has a helper create_async_connector function that is recommended for asyncio database connections. It returns a Connector object that uses the current thread's running event loop.

the thing is that it goes on to say:

This is different than Connector() which by default initializes a new event loop in a background thread.

which doesn't really make it clear that a connector created that way can't ever be used in an async context.

In the case of my application, we actually use multiple different event loops for different things. I think that if this use case is not supported it should be made clear that:

  • A new connector needs to be instantiated for each event loop in which there will be async usage
  • connectors not instantiated either with create_async_connector or with a loop explicitly passed in can't be used on a different event loop.

I also think it would be relatively easy to make this sort of failure impossible by requiring that connection instantiation always happen on the correct event loop.

On the other hand, I do think it may actually be possible to make a single connector support multiple event loops, but perhaps that is not the best thing to do. It's not totally clear to me to what extent this is even supported by sqlalchemy's async facilities.

Also, one detail that I should make clear here is the following:

Everything works fine as currently configured the vast majority of the time.

It is only when we encounter an intermittent error with connection creation that we encounter this issue.

Thanks @colonelpanic8 again for the additional details.

In the case of my application, we actually use multiple different event loops for different things. I think that if this use case is not supported it should be made clear that:

  • A new connector needs to be instantiated for each event loop in which there will be async usage
  • Connectors not instantiated either with create_async_connector or with a loop explicitly passed in can't be used on a different event loop.

I think those two statements are good summaries of the issue at hand. Today we do not support connections across multiple event loops for a single Connector. Whichever loop the Connector is initialized with is the event loop connect_async must be called from. We can probably update our README to make this a bit more clear.

@colonelpanic8 in your use case are you initializing a Connector and then submitting the coro for connector.connect_async to different event loops? I think that is my understanding of it but please correct me if I am wrong.

I'm trying to think if we want the Connector to support multiple event loops... a Connector shares resources that require an event loop such as the aiohttp.ClientSession and its generated client keys across connections. So that you can have connections to several Cloud SQL instances all using the same Connector.

I guess we could have connect_async check whether its running loop matches the loop the Connector was initialized with and add some additional logic. It might get pretty messy, though; curious if you have any thoughts?

This may be related to #969 or even potentially a duplicate, as essentially gevent or eventlet also runs into a similar issue when attempting to use a Connector across multiple event loops.

@colonelpanic8 in your use case are you initializing a Connector and then submitting the coro for connector.connect_async to different event loops? I think that is my understanding of it but please correct me if I am wrong.

No.

I'm initializing an async_engine factory with a link to the connector on a dependency injection object. Then different objects use this engine factory to produce engines, possibly on different event loops.

I guess we could have connect_async check whether its running loop matches the loop the Connector was initialized with and add some additional logic. It might get pretty messy, though; curious if you have any thoughts?

I don't think this is very messy at all; it's basically just a couple of lines. That check is the sort of thing I have in mind.

I still actually think there is another option, which is to simply use asyncio.run_coroutine_threadsafe to run the connect_async on the CORRECT event loop and then await that future. This would make it perfectly fine to use a single connector across multiple threads.

That does open a whole can of worms though, and it would require careful thinking about the details of the implementation.
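A rough sketch of the run_coroutine_threadsafe idea, using a stand-in coroutine instead of the real connect_async (the function names here are illustrative, not the library's API). The connector's loop runs in a background thread, as Connector() already arranges by default; the caller's coroutine marshals the work onto that loop and awaits the result from its own loop:

```python
import asyncio
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Run an event loop in a daemon thread, similar to what Connector()
    does by default when no loop is passed in."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def _do_connect() -> asyncio.AbstractEventLoop:
    # Stand-in for the real connect_async work; it reports which loop
    # it actually ran on so we can verify the scheduling.
    return asyncio.get_running_loop()


async def connect_from_any_loop(connector_loop: asyncio.AbstractEventLoop):
    # Marshal the coroutine onto the connector's own loop, then await
    # the resulting future from whatever loop the caller is on.
    future = asyncio.run_coroutine_threadsafe(_do_connect(), connector_loop)
    return await asyncio.wrap_future(future)


connector_loop = start_background_loop()
caller_loop = asyncio.new_event_loop()

# The caller runs on its own loop, but the work executes on connector_loop.
used_loop = caller_loop.run_until_complete(connect_from_any_loop(connector_loop))

connector_loop.call_soon_threadsafe(connector_loop.stop)
caller_loop.close()
```

With this pattern the ClientSession only ever runs on the loop it was created on, which is what makes cross-thread use safe; the cans of worms are around shutdown ordering and error propagation across the two loops.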

a Connector shares resources that require an event loop such as the aiohttp.ClientSession and its generated client keys across connections.

As far as I can tell, this is the only real reason this is a requirement, and I think that kind of key sharing could fairly easily be accomplished without sharing the ClientSession object.

Hi @colonelpanic8 I've changed this into a FR and not a bug as this is a feature we don't support currently (I'll put up a PR with small comment/docs fix making this more clear).

For now I would recommend creating a Connector per event loop as a workaround.
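The recommended workaround can be sketched as a small per-loop cache. This is illustrative: `make_connector` is a stand-in for the real `google.cloud.sql.connector.Connector()` constructor, and the cache keyed by running loop guarantees each loop only ever touches a Connector bound to it:

```python
import asyncio
from typing import Dict


def make_connector() -> object:
    """Stand-in for google.cloud.sql.connector.Connector(); substitute
    the real constructor in actual application code."""
    return object()


_connectors: Dict[asyncio.AbstractEventLoop, object] = {}


async def connector_for_current_loop() -> object:
    # Lazily create (and cache) one Connector per running event loop,
    # so connect_async is always called on the loop its Connector uses.
    loop = asyncio.get_running_loop()
    if loop not in _connectors:
        _connectors[loop] = make_connector()
    return _connectors[loop]


loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()

a_first = loop_a.run_until_complete(connector_for_current_loop())
a_second = loop_a.run_until_complete(connector_for_current_loop())  # cached
b_only = loop_b.run_until_complete(connector_for_current_loop())    # new one

loop_a.close()
loop_b.close()
```

In a real application you would also want to close each cached Connector when its loop shuts down.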

I'm initializing an async_engine factory with a link to the connector on a dependency injection object. Then different objects use this engine factory to produce engines, possibly on different event loops.

Are the different event loops running in different threads? I assume so but just wanted to make sure. Otherwise I would suggest just using a single event loop for your use-case.

Thanks again for bringing this to our attention 😄

Are the different event loops running in different threads?

Yes.

I still think there is a real "bug" here, because it's very easy to use things in a way that will cause issues when the connections are later refreshed. Doc fixes are great, but it's also very easy to add a simple check that asks whether we are on the right thread/loop and raises a more comprehensible error.

it's very easy to use things in a way that will cause issues when the connections are later refreshed. Doc fixes are great, but it's also very easy to add a simple check that asks whether we are on the right thread/loop and raises a more comprehensible error.

@colonelpanic8 I put up a WIP PR to showcase the kind of error we could raise. Let me know if you think it looks like an actionable error message 😄