[Help] How to initialize Tortoise ORM in multiple worker processes?
Abeautifulsnow opened this issue · 2 comments
As the title says, I have a FastAPI application that uses the `concurrent.futures.ProcessPoolExecutor`
module to perform CPU-intensive tasks and save the results to MySQL. However, I run into errors when performing database queries inside the worker processes.
- main.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """FastAPI lifespan hook: configure logging, register Tortoise ORM, tear down.

    On startup this builds the MySQL URL from ``configuration.mysql`` and keeps
    ``RegisterTortoise`` entered for the lifetime of the app. On shutdown it
    calls ``shutdown_executor()``.

    Note: the connections opened here are bound to this process's event loop.
    Code running in ``ProcessPoolExecutor`` workers cannot reuse them (doing so
    raises "attached to a different loop"). Workers must initialise Tortoise
    themselves.
    """
    set_loguru()
    disable_installed_extensions_check()
    from manager.config import configuration

    # NOTE(review): if the configuration repr includes the MySQL password, this
    # line leaks it into the logs as well; confirm and mask if necessary.
    logger.info(f"configuration => {configuration}")
    mysql_conf = configuration.mysql
    # Percent-encode the credentials so characters such as '#', '@', ':' or '/'
    # cannot break URL parsing. ``safe=""`` is required: by default quote()
    # leaves '/' unescaped.
    encoded_username = urllib.parse.quote(mysql_conf.username, safe="")
    encoded_password = urllib.parse.quote(mysql_conf.password, safe="")
    url_tail = (
        f"{mysql_conf.host}:{mysql_conf.port}/{mysql_conf.database}"
        f"?charset={mysql_conf.charset}&maxsize=10"
    )
    mysql_db_url = f"mysql://{encoded_username}:{encoded_password}@{url_tail}"
    # Log the URL with the password masked; never write credentials to logs.
    logger.debug(f"mysql db url => mysql://{encoded_username}:***@{url_tail}")
    # app startup
    async with RegisterTortoise(
        app,
        db_url=mysql_db_url,
        modules={"models": ["manager.govern.models"]},
        generate_schemas=True,
        add_exception_handlers=True,
        use_tz=False,
        timezone="Asia/Shanghai",
    ):
        connection_name = next(iter(connections.db_config.keys()))
        connection = connections.get(connection_name)
        # Only the connection names are logged: db_config holds the db_url
        # above, which contains the password.
        logger.info(
            f"connections: {list(connections.db_config)} | connection_name: `{connection_name}` | connection: {connection}"
        )
        # db connected
        yield
        logger.info("Start to shut down executor")
        shutdown_executor()
This produces the following errors:
# error.log
File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/mysql/client.py", line 199, in execute_query
await cursor.execute(query, values)
│ │ │ └ None
│ │ └ "SELECT `server_host`,`source_name`,`create_time`,`db_name`,`server_port`,`project_id`,`source_type`,`id`,`password`,`update_...
│ └ <cyfunction Cursor.execute at 0x7f431f0a4790>
└ <asyncmy.cursors.Cursor object at 0x7f431dce5240>
File "asyncmy/cursors.pyx", line 179, in execute
result = await self._query(query)
File "asyncmy/cursors.pyx", line 364, in _query
await conn.query(q)
File "asyncmy/connection.pyx", line 494, in query
await self._read_query_result(unbuffered=unbuffered)
File "asyncmy/connection.pyx", line 682, in _read_query_result
await result.read()
File "asyncmy/connection.pyx", line 1069, in read
first_packet = await self.connection.read_packet()
File "asyncmy/connection.pyx", line 617, in read_packet
packet_header = await self._read_bytes(4)
File "asyncmy/connection.pyx", line 656, in _read_bytes
data = await self._reader.readexactly(num_bytes)
File "/usr/lib/python3.10/asyncio/streams.py", line 708, in readexactly
await self._wait_for_data('readexactly')
│ └ <function StreamReader._wait_for_data at 0x7f4356e21090>
└ <StreamReader transport=<TCPTransport closed=False reading=True 0x7fffc2b4dea0>>
File "/usr/lib/python3.10/asyncio/streams.py", line 501, in _wait_for_data
await self._waiter
│ └ None
└ <StreamReader transport=<TCPTransport closed=False reading=True 0x7fffc2b4dea0>>
RuntimeError: Task <Task pending name='Task-9' coro=<instance_taos() running at /home/runstone/work/project/data-govern-manager/src/manager/govern/db.py:132> cb=[_LRUCacheWrapper._task_done_callback(<Future pendi...tasks.py:847]>, '3')()]> got Future <Future pending> attached to a different loop
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/runstone/work/project/data-govern-manager/src/manager/govern/combine.py", line 216, in do_govern_entry
await self.set_taos()
│ └ <function BackgroundGovernCombine.set_taos at 0x7f431f596d40>
└ <manager.govern.combine.BackgroundGovernCombine object at 0x7f431f0fe0b0>
File "/home/runstone/work/project/data-govern-manager/src/manager/govern/combine.py", line 205, in set_taos
self.taos_db = await instance_taos(self.govern_params.common.project_id)
│ │ │ │ │ │ └ '3'
│ │ │ │ │ └ CommonSchema(global_id='85dc9a3c-fb93-41b3-bf21-1b57b802a205', project_id='3', devcode_name='T101002', devproperty_name='...
│ │ │ │ └ GovernanceSystemPageSchema(common=CommonSchema(global_id='85dc9a3c-fb93-41b3-bf21-1b57b802a205', project_id='3', devcode_name...
│ │ │ └ <manager.govern.combine.BackgroundGovernCombine object at 0x7f431f0fe0b0>
│ │ └ <async_lru._LRUCacheWrapper object at 0x7f431f59c3a0>
│ └ None
└ <manager.govern.combine.BackgroundGovernCombine object at 0x7f431f0fe0b0>
File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/async_lru/__init__.py", line 227, in __call__
return await asyncio.shield(fut)
│ │ └ <Future finished exception=RuntimeError("Task <Task pending name='Task-9' coro=<instance_taos() running at /home/runstone/wor...
│ └ <function shield at 0x7f4357004280>
└ <module 'asyncio' from '/usr/lib/python3.10/asyncio/__init__.py'>
File "/home/runstone/work/project/data-govern-manager/src/manager/govern/db.py", line 132, in instance_taos
project = await GovernanceDatasourceModel.get_or_none(project_id=project_id)
│ │ └ '3'
│ └ <classmethod(<function Model.get_or_none at 0x7f43550f4c10>)>
└ <class 'manager.govern.models.GovernanceDatasourceModel'>
File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/queryset.py", line 1059, in _execute
instance_list = await self._db.executor_class(
│ └ <member '_db' of 'QuerySet' objects>
└ <tortoise.queryset.QuerySet object at 0x7f431f2c8f20>
File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/base/executor.py", line 131, in execute_select
_, raw_results = await self.db.execute_query(query.get_sql())
│ │ │ │ └ <function MySQLQueryBuilder.get_sql at 0x7f43551f1090>
│ │ │ └ SELECT `server_host`,`source_name`,`create_time`,`db_name`,`server_port`,`project_id`,`source_type`,`id`,`password`,`update_t...
│ │ └ <function MySQLClient.execute_query at 0x7f431f0cc3a0>
│ └ <tortoise.backends.mysql.client.MySQLClient object at 0x7f431f030ac0>
└ <tortoise.backends.mysql.executor.MySQLExecutor object at 0x7f431f0fe230>
File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/mysql/client.py", line 44, in translate_exceptions_
return await func(self, *args)
│ │ └ ("SELECT `server_host`,`source_name`,`create_time`,`db_name`,`server_port`,`project_id`,`source_type`,`id`,`password`,`update...
│ └ <tortoise.backends.mysql.client.MySQLClient object at 0x7f431f030ac0>
└ <function MySQLClient.execute_query at 0x7f431f0cc310>
File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/mysql/client.py", line 196, in execute_query
async with self.acquire_connection() as connection:
│ │ └ <asyncmy.connection.Connection object at 0x7f431f0afe20>
│ └ <function MySQLClient.acquire_connection at 0x7f431f03ff40>
└ <tortoise.backends.mysql.client.MySQLClient object at 0x7f431f030ac0>
RuntimeError: Task <Task pending name='Task-9' coro=<instance_taos() running at /home/runstone/work/project/data-govern-manager/src/manager/govern/db.py:132> cb=[_LRUCacheWrapper._task_done_callback(<Future pendi...tasks.py:847]>, '3')()]> got Future <Task pending name='Task-10' coro=<Pool._wakeup() running at asyncmy/pool.pyx:164>> attached to a different loop
Then I realized that each worker process has its own memory and its own event loop, so the Tortoise connections created in the parent process cannot be used there. I therefore initialized Tortoise separately inside the worker process, and everything worked fine.
- task_in_another_process.py
async def initialize_tortoise():
    """Initialise Tortoise ORM inside the current (worker) process.

    Connections registered by the parent process are bound to the parent's
    event loop and fail with "attached to a different loop" when used here.
    Each worker event loop must therefore call this before issuing queries.
    It should close the connections again (``Tortoise.close_connections()``)
    before that loop ends.
    """
    mysql_conf = configuration.mysql
    # Percent-encode the credentials so characters such as '#', '@', ':' or '/'
    # cannot break URL parsing. ``safe=""`` is required: by default quote()
    # leaves '/' unescaped.
    encoded_username = urllib.parse.quote(mysql_conf.username, safe="")
    encoded_password = urllib.parse.quote(mysql_conf.password, safe="")
    url_tail = (
        f"{mysql_conf.host}:{mysql_conf.port}/{mysql_conf.database}"
        f"?charset={mysql_conf.charset}"
    )
    mysql_db_url = f"mysql://{encoded_username}:{encoded_password}@{url_tail}"
    # Log the URL with the password masked; never write credentials to logs.
    logger.debug(f"mysql db url => mysql://{encoded_username}:***@{url_tail}")
    await Tortoise.init(
        db_url=mysql_db_url,
        modules={"models": ["manager.govern.models"]},
        use_tz=False,
        timezone="Asia/Shanghai",
    )
    connection_name = next(iter(connections.db_config.keys()))
    connection = connections.get(connection_name)
    # Only the connection names are logged: db_config holds the db_url above,
    # which contains the password.
    logger.info(
        f"connections: {list(connections.db_config)} | connection_name: `{connection_name}` | connection: {connection}"
    )
    logger.success("Initializing Tortoise ORM")
async def do_govern_entry():
    """Worker-process entry point: initialise Tortoise, run the task, clean up.

    This coroutine is submitted to the process pool and driven by
    ``asyncio.run``, so every call runs in a brand-new event loop. The
    connections opened by ``initialize_tortoise`` are closed in ``finally``
    so they do not outlive that loop or accumulate across calls.
    """
    try:
        # initialize tortoise orm
        await initialize_tortoise()
        # do other cpu tasks and save result to mysql.
    except Exception:
        # Report the failure instead of silently discarding it.
        logger.exception("do_govern_entry failed")
    finally:
        await Tortoise.close_connections()
- api.py
import asyncio
from fastapi import APIRouter
from concurrent.futures import ProcessPoolExecutor
from .task_in_another_process import do_govern_entry
# Process pool for CPU-bound work; ``set_loguru`` runs once in each worker
# process when it starts.
executor = ProcessPoolExecutor(
    max_workers=configuration.concurrency_nums,
    initializer=set_loguru,
)
# Router holding this module's endpoints.
router = APIRouter()
def do_async(func, *args, **kwargs):
    """Run the coroutine ``func(*args, **kwargs)`` to completion in a new event loop.

    Intended as the picklable, synchronous callable submitted to a
    ``ProcessPoolExecutor``. The coroutine's result is returned, so the
    executor Future resolves to it. Exceptions propagate to the Future.
    """
    return asyncio.run(func(*args, **kwargs))
@router.post("/demo")
async def demo():
    """Submit ``do_govern_entry`` to the process pool and return immediately.

    The Future returned by ``executor.submit`` is never awaited. A
    done-callback is therefore attached to report failures that would
    otherwise be dropped silently, e.g. an exception escaping the worker or
    a broken process pool.
    """
    import logging

    # other task
    future = executor.submit(do_async, do_govern_entry)

    def _log_failure(fut):
        # Runs in the executor's management thread once the task finishes.
        if fut.cancelled():
            return
        exc = fut.exception()
        if exc is not None:
            logging.getLogger(__name__).error(
                "Background task do_govern_entry failed", exc_info=exc
            )

    future.add_done_callback(_log_failure)
    return {"msg": "task submitted."}
However, I feel this approach is not quite right, so I would like to ask whether there is a better way to handle this.
How about this:
#!/usr/bin/env python
from datetime import datetime
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator
import uvicorn
from faker import Faker
from fastapi import FastAPI
from tortoise import Model
from tortoise.contrib.fastapi import RegisterTortoise
from tortoise.fields import IntField, TextField
# One pool worker per CPU on this machine.
concurrency_nums: int = multiprocessing.cpu_count()
# Title of the real server app. lifespan() compares app.title against it to
# decide whether to shut down the shared executor.
API_TITLE: str = "main"
def set_loguru() -> None:
    """Placeholder logging setup used as the pool-worker initializer; a no-op here."""
# Shared process pool; ``set_loguru`` runs once in every worker as it starts.
executor = ProcessPoolExecutor(
    max_workers=concurrency_nums,
    initializer=set_loguru,
)
# Demo Tortoise model: an integer primary key plus a free-text name.
class Group(Model):
    # Integer primary key.
    id = IntField(primary_key=True)
    # Seeded with Faker names by _initial_groups(); do_govern_entry()
    # overwrites it with a timestamp string.
    name = TextField()
async def _initial_groups() -> None:
    """Seed the ``Group`` table with ten rows carrying fake person names."""
    fake = Faker()
    new_groups = [Group(name=fake.name()) for _ in range(10)]
    await Group.bulk_create(new_groups)
async def do_govern_entry() -> None:
    """Worker-side task: register Tortoise via ``lifespan``, then update group 1.

    A throwaway ``FastAPI()`` app is handed to ``lifespan``, so Tortoise is
    registered (and later closed) inside this process's own event loop.
    That app is created without a title. Assuming FastAPI's default title is
    not ``API_TITLE``, leaving the context does not shut down the executor.
    """
    print(f"Enter do_govern_entry@{datetime.now()}")
    worker_app = FastAPI()
    async with lifespan(worker_app):
        group = await Group.get(id=1)
        print(f"I'm do_govern_entry: {dict(group)}")
        stamp = str(datetime.now())
        group.name = stamp
        await group.save()
        print(f"{group.id=} updated: {dict(group)}")
def do_async(func, *args, **kwargs) -> None:
    """Run ``func(*args, **kwargs)`` in a new event loop, printing any traceback.

    Ordinary exceptions are printed to stderr instead of being propagated.
    The executor Future therefore completes normally even when the task fails.
    """
    import traceback

    try:
        asyncio.run(func(*args, **kwargs))
    except Exception:
        traceback.print_exc()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Register Tortoise (SQLite) for ``app``, seed demo data, and clean up.

    Used both as the server app's lifespan and, with a throwaway app, inside
    worker processes. Only the app whose title equals ``API_TITLE`` shuts
    down the shared executor when the context exits.
    """
    tortoise_ctx = RegisterTortoise(
        app,
        db_url="sqlite://db_one.sqlite3",
        modules={"models": [__name__]},
        generate_schemas=True,
    )
    async with tortoise_ctx:
        existing = await Group.all().count()
        if not existing:
            await _initial_groups()
        yield
        is_main_app = app.title == API_TITLE
        if is_main_app:
            executor.shutdown(wait=False, cancel_futures=True)
# The server application. Its title marks it as the main app, so its
# lifespan shuts down the shared executor on exit.
app = FastAPI(
    title=API_TITLE,
    lifespan=lifespan,
)
@app.get("/groups")
async def group_list() -> list[dict]:
    """Return every group as a dict and start a background ``do_govern_entry`` run."""
    groups = await Group.all()
    # Fire-and-forget: do_async prints worker failures itself, so the Future
    # is not kept.
    executor.submit(do_async, do_govern_entry)
    return list(map(dict, groups))
if __name__ == "__main__":
    # Serve "<this module>:app" with auto-reload enabled.
    app_import_path = f"{Path(__file__).stem}:app"
    uvicorn.run(app_import_path, reload=True)
Thanks for your response and the code demo.
However, I see that Tortoise is still registered more than once: once in the main process and again in each worker process. 🤔 In a sense, it is similar to what I did, but yours is simpler to use.
To be frank, I was actually going to ask whether there is something like a shared connection pool 😂 instead of doing it the way you and I are doing.