MagicStack/asyncpg

error in pytest sqlalchemy.exc.InterfaceError

zabit923 opened this issue · 1 comments

FAILED tests/test_users.py::test_get_all_users - sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another ope...
FAILED tests/test_users.py::test_get_user_by_id - sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another ope...

db.py

from typing import AsyncGenerator

from sqlalchemy import NullPool
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from config import settings

engine = create_async_engine(settings.db.url)
async_session_maker = async_sessionmaker(
    bind=engine, class_=AsyncSession, expire_on_commit=False
)


async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        yield session


test_engine = create_async_engine(settings.db.test_url, poolclass=NullPool)
test_async_session_maker = async_sessionmaker(
    bind=test_engine, class_=AsyncSession, expire_on_commit=False
)


async def get_test_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with test_async_session_maker() as session:
        yield session

conftest.py

import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app import app
from core.database import get_async_session, get_test_async_session, test_engine
from core.database.models import Base


@pytest_asyncio.fixture(scope="function")
async def init_db():
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


@pytest_asyncio.fixture(scope="function")
async def session(init_db) -> AsyncSession:
    async for session in get_test_async_session():
        try:
            yield session
        except IntegrityError:
            await session.rollback()
            raise
        finally:
            await session.rollback()
            await session.close()


@pytest_asyncio.fixture(scope="function")
async def client(init_db) -> AsyncClient:
    async def override_get_async_session():
        async for session in get_test_async_session():
            yield session

    app.dependency_overrides[get_async_session] = override_get_async_session

    transport = ASGITransport(app=app)
    client = AsyncClient(transport=transport, base_url="http://testserver/api/v1")
    yield client
    await client.aclose()


async def user_authentication_headers(client: AsyncClient, username: str, password: str):
    json = {"username": username, "password": password}
    response = await client.post("/users/login", json=json)
    data = response.json()
    auth_token = data["access_token"]
    headers = {"Authorization": f"Bearer {auth_token}"}
    return headers

test_users.py

@pytest.mark.asyncio
async def test_get_all_users(client: AsyncClient, session: AsyncSession):
    UserFactory._meta.sqlalchemy_session = session
    superuser = UserFactory(is_superuser=True)
    session.add(superuser)
    await session.commit()

    headers = await user_authentication_headers(client, superuser.username, "password123")
    response = await client.get(
        "/users",
        headers=headers
    )
    assert response.status_code == 200
    data = response.json()
    assert isinstance(data, list)


@pytest.mark.asyncio
async def test_get_user_by_id(client: AsyncClient, session: AsyncSession):
    UserFactory._meta.sqlalchemy_session = session
    user = UserFactory(username="singleuser", is_active=True)
    session.add(user)
    await session.commit()

    headers = await user_authentication_headers(client, user.username, "password123")
    response = await client.get(
        f"/users/{user.id}",
        headers=headers
    )
    assert response.status_code == 200
    data = response.json()
    assert data["username"] == "singleuser"

It's hard to diagnose what's going on here, but the error suggests that you're somehow using the same connection concurrently, which is not supported.