MagicStack/asyncpg

add listener callback not getting executed in asyncio pytest framework

sdalmia11 opened this issue · 0 comments

def _notification_callback(self, _, pid, channel, payload):
        LOG.info("_notification_callback invoked", channel=channel, pid=pid, payload_preview=str(payload)[:100])
        print(f"Notification callback invoked for channel: {channel}, pid: {pid}, payload: {payload}")

        # Create a task to process this notification asynchronously
        # This avoids blocking the callback which would block the connection
        try:
            asyncio.create_task(
                self._process_notification_callback(payload, channel, pid)
            )
        except RuntimeError as error:
            self._log_error(
                "Failed to create notification processing task",
                pid=pid,
                error=str(error),
            )
async def _process_notification_callback(self, payload, channel, pid):
        """
        Process a notification received via callback.
        This runs in a separate task to avoid blocking the connection.
        """
        try:
            notification = json.loads(payload)
            LOG.info("Raw payload received", notification=notification, channel=channel)
            await self.distribute_notification(notification)
        except asyncio.CancelledError:
            LOG.info("Notification processing cancelled", channel=channel, pid=pid)
            raise
        except json.JSONDecodeError as error:
            self._log_error(
                "Failed to decode JSON payload from notification",
                pid=pid,
                error=str(error),
                payload=payload,
            )
        except Exception as error:
            self._log_error(
                "Error processing notification from callback",
                pid=pid,
                error=str(error),
            )
async def _listen_to_channel(self):
        """
        Background task that listens to the database notification channel.
        Uses db_backoff decorator to handle reconnection logic.
        """
        LOG.info("Connecting to database for notifications", channel=self.channel)
        conn = await asyncpg.connect(
            user=SETTINGS.db_user,
            password=SETTINGS.db_password,
            host=SETTINGS.db_host,
            port=SETTINGS.db_port,
            database=SETTINGS.database,
        )
        LOG.info("Database connection established", channel=self.channel)
        try:
            # Listen to the channel
            await conn.add_listener(self.channel, self._notification_callback)
            LOG.info("Listening on channel", channel=self.channel)
            # Main notification loop
            while self.running:
                try:
                    # Just keep the connection alive
                    await asyncio.sleep(1)
                except asyncio.TimeoutError:
                    # Timeout is expected, just continue the loop
                    continue
                except asyncio.CancelledError:
                    LOG.info(
                        "Notification listener cancelled received from the running task",
                        channel=self.channel,
                    )
                    break
                except Exception as error:
                    self._log_error(
                        "Error processing notification",
                        error=str(error),
                    )
        finally:
            # Clean up listener and close connection
            try:
                await conn.remove_listener(self.channel, self._notification_callback)
                await conn.close()
                LOG.info("Database connection closed", channel=self.channel)
            except Exception as error:
                # Handle any errors during cleanup
                self._log_error(
                    "Error closing database connection",
                    error=str(error),
                )

Below is the tests setup

@pytest_asyncio.fixture(scope="session")
async def pg_conn_fixture():
    """PG Connection fixture"""
    with patch(
        "dlnspublisher.core.notification_manager.DalEngine.get_connection"
    ) as mock_get_connection:
        conn = await asyncpg.connect(
            dsn="postgres://postgres:postgres@dlnsdb:5432/postgres"
        )
        mock_get_connection.return_value = conn
        yield
        await conn.close()


@pytest_asyncio.fixture(scope="session")
async def fixture_app(pg_conn_fixture):
    yield create_app()


@pytest.fixture(scope="session")
async def fixture_async_client(fixture_app) -> AsyncGenerator[AsyncClient, None]:
    async with LifespanManager(fixture_app):
        async with AsyncClient(
            transport=ASGITransport(app=fixture_app), base_url="http://test"
        ) as async_client:
            yield async_client

@pytest.mark.asyncio
async def test_sse_event_queue_receives_inserted_notification(
    fixture_async_client: AsyncClient,
    fixture_app,
    insert_distribution_notification,
):
    """
    tests /eap/notifications/sse returns OK when valid JWT is passed.
    """
    insert_distribution_notification()
    async with fixture_async_client.stream(
        "GET",
        "/eap/notifications/sse",
        headers={HTTP_HEADER_CIS_JWT: VALID_JWT},
    ) as response:
        try:
            chunk = await asyncio.wait_for(
                response.aiter_bytes().__anext__(), timeout=0.1
            )
            # SSE events are typically in the format: b'data: ...\n\n'
            assert chunk
            # Optionally, parse the event data
            if chunk.startswith(b"data:"):
                data = chunk[len(b"data:") :].strip()
                # Try to decode JSON if possible
                try:
                    payload = json.loads(data)
                    assert payload.get("payload") == "example"
                except Exception:
                    # If not JSON, just check content
                    assert b"example" in data
        except asyncio.TimeoutError:
            assert False, "No event received from SSE endpoint after DB insert"

When I run the test case, fixture_app runs the fastAPI app and starts listening to the channel and then insert_distribution_notification inserts some data into a table which triggers the NOTIFY channel, however the callback for add_listener nevers gets executed, as I have a pdb debugger in that function but it never executes the call back and the test case gets stuck indefinitely.

I could see in my pg_stats_activity that the LISTEN channel process through fixture_app is successful, however callback nevers executed even though the NOTIFY channel name is triggered through insert_distribution_notification fixture