airtai/fastkafka

Enable FastAPI + FastKafka testing

Gerleff opened this issue · 3 comments

@pytest.fixture
async def app(fastkafka_tester: Tester):
    app = prepare_fastapi_app()
    async with fastkafka_tester.fastapi_lifespan(settings.KAFKA.NAME)(app):
        yield app

If I use this fixture to test a FastAPI application on a code path that calls a producer, I get:

>       fut = await producer.send(
            topic, encoder_fn(wrapped_val.message), key=wrapped_val.key
        )
E       AttributeError: 'NoneType' object has no attribute 'send'

This happens because the producers were not instantiated properly.
Am I doing something wrong?

Also this works:

@pytest.fixture
async def app(fastkafka_app: FastKafka):
    app = prepare_fastapi_app()
    async with fastkafka_app.fastapi_lifespan(settings.KAFKA.NAME)(app):
        yield app

Hi,

The FastKafka app needs to be started so that its producers and consumers are instantiated.
You can do that in either of two ways:

  1. Use the app as a context manager and yield it.
  2. When testing, pass the app to the Tester and then do step 1 with the Tester instance.
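
To illustrate why the producer is `None` until the app is started, here is a minimal toy sketch in plain Python. `ToyKafkaApp` and `ToyProducer` are hypothetical stand-ins, not FastKafka classes. The sketch only assumes what is described above: the clients are created when the app's async context manager is entered.

```python
import asyncio


class ToyProducer:
    """Hypothetical stand-in for a Kafka producer; it only records messages."""

    def __init__(self):
        self.sent = []

    async def send(self, topic, value):
        self.sent.append((topic, value))


class ToyKafkaApp:
    """Hypothetical app whose producer exists only while the app is started."""

    def __init__(self):
        self.producer = None  # not instantiated until the app is started

    async def __aenter__(self):
        # Starting the app creates the producer
        self.producer = ToyProducer()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Stopping the app drops the producer again
        self.producer = None
        return False

    async def produce(self, topic, value):
        # Raises AttributeError on None if the app was never started
        await self.producer.send(topic, value)


async def demo():
    app = ToyKafkaApp()

    # Calling a producer before startup fails, as in the traceback above
    try:
        await app.produce("predictions", "hello")
        failed_before_start = False
    except AttributeError:
        failed_before_start = True

    # Entering the context manager starts the app, so producing works
    async with app:
        await app.produce("predictions", "hello")
        sent = list(app.producer.sent)

    return failed_before_start, sent


result = asyncio.run(demo())
```

In a pytest fixture the same idea applies: enter the context manager first, then yield, so the test body runs while the app is started.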

I see you are doing something similar, but in the first example you are calling fastapi_lifespan, which returns a different context manager than the application itself. This should work:

@pytest.fixture
async def app(fastkafka_tester: Tester):
    app = prepare_fastapi_app()
    # You can set the apps on the tester like this, although it is not really intended to be used this way
    fastkafka_tester.apps = [app]
    # omit fastapi_lifespan
    async with fastkafka_tester:
        yield app  # maybe yield the tester object as well?

Can you provide a complete example of what you are trying to implement?

Updated working example

The provided solution throws "TypeError: 'Tester' object is not callable"

Hi, here is an example of pytest fixtures and testing FastKafka alongside FastAPI.
Let me know if this is what you are trying to implement: testing FastAPI together with FastKafka using fixtures.

import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel, Field

from fastkafka import FastKafka
from fastkafka.testing import Tester


# Define test msg
class Msg(BaseModel):
    msg: str = Field(...)

# Fixture for FastKafka application
@pytest.fixture
def fastapi_kafka_app():
    fastapi_kafka_app = FastKafka(
        kafka_brokers=dict(localhost=dict(url="localhost", port=9092)),
        group_id="app_for_fastapi_group",
    )

    @fastapi_kafka_app.produces(topic="predictions")
    async def to_app_for_fastapi_predictions(prediction: Msg) -> Msg:
        print("sending predictions on app_for_fastapi")
        return prediction

    return fastapi_kafka_app

# Fixture for FastAPI application
@pytest.fixture
def fastapi_app(fastapi_kafka_app):
    # Connect FastKafka to FastAPI app
    fastapi_app = FastAPI(lifespan=fastapi_kafka_app.fastapi_lifespan("localhost"))

    @fastapi_app.get("/predict")
    async def predict():
        # Return hello response and produce hello
        await fastapi_kafka_app.to_app_for_fastapi_predictions(Msg(msg="hello"))
        return {"result": "hello"}

    return fastapi_app


@pytest.mark.asyncio
async def test_pytest(fastapi_kafka_app, fastapi_app):
    # Start tester with FastKafka app
    async with Tester(fastapi_kafka_app) as tester:
        # Prepare TestClient with FastAPI app
        client = TestClient(fastapi_app)
        client.get("/predict")
        # Check that our app sent a msg into prediction topic after calling GET /predict
        await tester.on_predictions.assert_awaited_with(Msg(msg="hello"), timeout=5)
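
The last assertion waits up to a timeout because the message arrives asynchronously. As a rough illustration of that idea only, here is a sketch built on the standard library's `AsyncMock`. The helper `assert_awaited_with_timeout` and its `interval` parameter are hypothetical and are not part of FastKafka; how the Tester implements its `timeout` argument is an assumption here.

```python
import asyncio
from unittest.mock import AsyncMock


async def assert_awaited_with_timeout(mock, *args, timeout=1.0, interval=0.01, **kwargs):
    """Hypothetical helper: retry assert_awaited_with until it passes or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            mock.assert_awaited_with(*args, **kwargs)
            return
        except AssertionError:
            # Give up and re-raise once the deadline has passed
            if loop.time() >= deadline:
                raise
            await asyncio.sleep(interval)


async def demo():
    on_predictions = AsyncMock()

    async def late_message():
        # Simulate a message that is delivered a little later
        await asyncio.sleep(0.05)
        await on_predictions("hello")

    task = asyncio.create_task(late_message())
    # Passes once the mock has been awaited with the expected argument
    await assert_awaited_with_timeout(on_predictions, "hello", timeout=1.0)
    await task
    return on_predictions.await_count


await_count = asyncio.run(demo())
```

Without the retry loop, the assertion would run before the message was delivered and fail immediately.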