tortoise/tortoise-orm

[Help]How to initialize tortoise orm in multiple workers?

Abeautifulsnow opened this issue · 2 comments

As described above, I have a FastAPI application that uses theconcurrent.futures.ProcessPoolExecutor module to perform CPU-intensive tasks and save the results to MySQL. However, it seems that I encounter some errors when performing database query operations within this process.

  • main.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    set_loguru()
    disable_installed_extensions_check()

    from manager.config import configuration

    logger.info(f"configuration => {configuration}")

    mysql_conf = configuration.mysql

    # Handle password which includes special characters. Such as #.
    encoded_password = urllib.parse.quote(mysql_conf.password)
    mysql_db_url = f"mysql://{mysql_conf.username}:{encoded_password}@{mysql_conf.host}:{mysql_conf.port}/{mysql_conf.database}?charset={mysql_conf.charset}&maxsize=10"
    logger.debug(f"mysql db url => {mysql_db_url}")

    # app startup
    async with RegisterTortoise(
        app,
        db_url=mysql_db_url,
        modules={"models": ["manager.govern.models"]},
        generate_schemas=True,
        add_exception_handlers=True,
        use_tz=False,
        timezone="Asia/Shanghai",
    ):
        connection_name = next(iter(connections.db_config.keys()))
        connection = connections.get(connection_name)
        logger.info(
            f"connections db_config: {connections.db_config} | connection_name: `{connection_name}` | connection: {connection}"
        )
        # db connected
        yield

        logger.info("Start to shut down executor")
        shutdown_executor()

Encounter errors...

# error.log

File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/mysql/client.py", line 199, in execute_query
    await cursor.execute(query, values)
          │      │       │      └ None
          │      │       └ "SELECT `server_host`,`source_name`,`create_time`,`db_name`,`server_port`,`project_id`,`source_type`,`id`,`password`,`update_...
          │      └ <cyfunction Cursor.execute at 0x7f431f0a4790>
<asyncmy.cursors.Cursor object at 0x7f431dce5240>
  File "asyncmy/cursors.pyx", line 179, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 364, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 494, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 682, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1069, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 617, in read_packet
    packet_header = await self._read_bytes(4)
  File "asyncmy/connection.pyx", line 656, in _read_bytes
    data = await self._reader.readexactly(num_bytes)
  File "/usr/lib/python3.10/asyncio/streams.py", line 708, in readexactly
    await self._wait_for_data('readexactly')
          │    └ <function StreamReader._wait_for_data at 0x7f4356e21090>
<StreamReader transport=<TCPTransport closed=False reading=True 0x7fffc2b4dea0>>
  File "/usr/lib/python3.10/asyncio/streams.py", line 501, in _wait_for_data
    await self._waiter
          │    └ None
<StreamReader transport=<TCPTransport closed=False reading=True 0x7fffc2b4dea0>>

RuntimeError: Task <Task pending name='Task-9' coro=<instance_taos() running at /home/runstone/work/project/data-govern-manager/src/manager/govern/db.py:132> cb=[_LRUCacheWrapper._task_done_callback(<Future pendi...tasks.py:847]>, '3')()]> got Future <Future pending> attached to a different loop


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/home/runstone/work/project/data-govern-manager/src/manager/govern/combine.py", line 216, in do_govern_entry
    await self.set_taos()
          │    └ <function BackgroundGovernCombine.set_taos at 0x7f431f596d40>
<manager.govern.combine.BackgroundGovernCombine object at 0x7f431f0fe0b0>

  File "/home/runstone/work/project/data-govern-manager/src/manager/govern/combine.py", line 205, in set_taos
    self.taos_db = await instance_taos(self.govern_params.common.project_id)
    │    │               │             │    │             │      └ '3'
    │    │               │             │    │             └ CommonSchema(global_id='85dc9a3c-fb93-41b3-bf21-1b57b802a205', project_id='3', devcode_name='T101002', devproperty_name='...
    │    │               │             │    └ GovernanceSystemPageSchema(common=CommonSchema(global_id='85dc9a3c-fb93-41b3-bf21-1b57b802a205', project_id='3', devcode_name...
    │    │               │             └ <manager.govern.combine.BackgroundGovernCombine object at 0x7f431f0fe0b0>
    │    │               └ <async_lru._LRUCacheWrapper object at 0x7f431f59c3a0>
    │    └ None
    └ <manager.govern.combine.BackgroundGovernCombine object at 0x7f431f0fe0b0>

  File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/async_lru/__init__.py", line 227, in __call__
    return await asyncio.shield(fut)
                 │       │      └ <Future finished exception=RuntimeError("Task <Task pending name='Task-9' coro=<instance_taos() running at /home/runstone/wor...
                 │       └ <function shield at 0x7f4357004280>
                 └ <module 'asyncio' from '/usr/lib/python3.10/asyncio/__init__.py'>

  File "/home/runstone/work/project/data-govern-manager/src/manager/govern/db.py", line 132, in instance_taos
    project = await GovernanceDatasourceModel.get_or_none(project_id=project_id)
                    │                         │                      └ '3'
                    │                         └ <classmethod(<function Model.get_or_none at 0x7f43550f4c10>)>
                    └ <class 'manager.govern.models.GovernanceDatasourceModel'>

  File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/queryset.py", line 1059, in _execute
    instance_list = await self._db.executor_class(
                          │    └ <member '_db' of 'QuerySet' objects>
                          └ <tortoise.queryset.QuerySet object at 0x7f431f2c8f20>
  File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/base/executor.py", line 131, in execute_select
    _, raw_results = await self.db.execute_query(query.get_sql())
                           │    │  │             │     └ <function MySQLQueryBuilder.get_sql at 0x7f43551f1090>
                           │    │  │             └ SELECT `server_host`,`source_name`,`create_time`,`db_name`,`server_port`,`project_id`,`source_type`,`id`,`password`,`update_t...
                           │    │  └ <function MySQLClient.execute_query at 0x7f431f0cc3a0>
                           │    └ <tortoise.backends.mysql.client.MySQLClient object at 0x7f431f030ac0>
                           └ <tortoise.backends.mysql.executor.MySQLExecutor object at 0x7f431f0fe230>
  File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/mysql/client.py", line 44, in translate_exceptions_
    return await func(self, *args)
                 │    │      └ ("SELECT `server_host`,`source_name`,`create_time`,`db_name`,`server_port`,`project_id`,`source_type`,`id`,`password`,`update...
                 │    └ <tortoise.backends.mysql.client.MySQLClient object at 0x7f431f030ac0>
                 └ <function MySQLClient.execute_query at 0x7f431f0cc310>
  File "/home/runstone/work/project/data-govern-manager/.venv/lib/python3.10/site-packages/tortoise/backends/mysql/client.py", line 196, in execute_query
    async with self.acquire_connection() as connection:
               │    │                       └ <asyncmy.connection.Connection object at 0x7f431f0afe20>
               │    └ <function MySQLClient.acquire_connection at 0x7f431f03ff40>
               └ <tortoise.backends.mysql.client.MySQLClient object at 0x7f431f030ac0>

RuntimeError: Task <Task pending name='Task-9' coro=<instance_taos() running at /home/runstone/work/project/data-govern-manager/src/manager/govern/db.py:132> cb=[_LRUCacheWrapper._task_done_callback(<Future pendi...tasks.py:847]>, '3')()]> got Future <Task pending name='Task-10' coro=<Pool._wakeup() running at asyncmy/pool.pyx:164>> attached to a different loop

Then I realized that Python's multiprocessing involves independent resources, so I separately initialized Tortoise within this process, and everything worked fine.

  • task_in_another_process.py
async def initialize_tortoise():    # How to handle this in a separate process?
    mysql_conf = configuration.mysql

    # Handle password which includes special characters. Such as #.
    encoded_password = urllib.parse.quote(mysql_conf.password)
    mysql_db_url = f"mysql://{mysql_conf.username}:{encoded_password}@{mysql_conf.host}:{mysql_conf.port}/{mysql_conf.database}?charset={mysql_conf.charset}"
    logger.debug(f"mysql db url => {mysql_db_url}")

    async def init_tortoise():
        await Tortoise.init(
            db_url=mysql_db_url,
            modules={"models": ["manager.govern.models"]},
            use_tz=False,
            timezone="Asia/Shanghai",
        )

    await init_tortoise()

    connection_name = next(iter(connections.db_config.keys()))
    connection = connections.get(connection_name)
    logger.info(
        f"connections db_config: {connections.db_config} | connection_name: `{connection_name}` | connection: {connection}"
    )
    logger.success("Initializing Tortoise ORM")


async def do_govern_entry():
        try:
            # initialize tortoise orm
            await initialize_tortoise()
            # do other cpu tasks and save result to mysql.
        except Exception as e:
            ...
  • api.py
import asyncio
from fastapi import APIRouter
from concurrent.futures import ProcessPoolExecutor

from .task_in_another_process import do_govern_entry

executor = ProcessPoolExecutor(max_workers=configuration.concurrency_nums, initializer=set_loguru)
router = APIRouter()

def do_async(func, *args, **kwargs):
    asyncio.run(func(*args, **kwargs))

@router.post("/demo")
async def demo():
    # other task
    executor.submit(do_async, do_govern_entry)
    return {"msg": "task submitted."}

However, I feel that this approach is not quite appropriate, so I wanted to ask if there is a better way to handle this.

How about this:

#!/usr/bin/env python
from datetime import datetime
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator

import uvicorn
from faker import Faker
from fastapi import FastAPI
from tortoise import Model
from tortoise.contrib.fastapi import RegisterTortoise
from tortoise.fields import IntField, TextField

concurrency_nums = multiprocessing.cpu_count()
API_TITLE = "main"


def set_loguru() -> None:
    pass


executor = ProcessPoolExecutor(max_workers=concurrency_nums, initializer=set_loguru)


class Group(Model):
    id = IntField(primary_key=True)
    name = TextField()


async def _initial_groups() -> None:
    faker = Faker()
    await Group.bulk_create([Group(name=faker.name()) for _ in range(10)])


async def do_govern_entry() -> None:
    print(f"Enter do_govern_entry@{datetime.now()}")
    async with lifespan(FastAPI()):
        group = await Group.get(id=1)
        print(f"I'm do_govern_entry: {dict(group)}")
        group.name = str(datetime.now())
        await group.save()
        print(f"{group.id=} updated: {dict(group)}")


def do_async(func, *args, **kwargs) -> None:
    try:
        asyncio.run(func(*args, **kwargs))
    except Exception:
        import traceback

        traceback.print_exc()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    async with RegisterTortoise(
        app,
        db_url="sqlite://db_one.sqlite3",
        modules={"models": [__name__]},
        generate_schemas=True,
    ):
        if not await Group.all().count():
            await _initial_groups()
        yield
    if app.title == API_TITLE:
        executor.shutdown(wait=False, cancel_futures=True)


app = FastAPI(title=API_TITLE, lifespan=lifespan)


@app.get("/groups")
async def group_list() -> list[dict]:
    objs = await Group.all()
    executor.submit(do_async, do_govern_entry)
    return [dict(i) for i in objs]


if __name__ == "__main__":
    uvicorn.run(f"{Path(__file__).stem}:app", reload=True)

Thanks for your reponse and the code demo.

However, I see that Tortoise will also be registered more than once, once when running in the main thread and another time in a separate process. 🤔In one sense, it seems similar to what I did but yours are simpler to use.

To be frank, I was actually going to ask if there was something like a connection pool 😂 instead of doing it the way you and I are doing.