faust-streaming/faust

Can't use agent.test_context() for sink-less agents

dario-collavini opened this issue · 3 comments

This is cloned from the original Faust repository: robinhood/faust#657

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Trying to build a very simple unit test, for an Agent without any sink (and thus not yielding).

import faust

app=faust.App("app-id")
input_channel = app.channel()

@app.agent(input_channel)
async def handle_events(events_stream):
  async for event in events_stream.events():
    print(event)

async def test_handle_events():
    # start and stop the agent in this block
    async with handle_events.test_context() as agent:
        # sent order to the test agents local channel, and wait
        # the agent to process it.
        await agent.put("useless")

async def run_tests():
    await test_handle_events()

if __name__ == '__main__':
    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_tests())

See: https://repl.it/@agrenott/faustbugtestcontext

Expected behavior

No exception :)

Actual behavior

faust.exceptions.ImproperlyConfigured: Agent must yield to use sinks is raised.
This is because test_context() creates an AgentTestWrapper, which unconditionally adds a sink to the agent (see https://github.com/robinhood/faust/blob/master/faust/agents/agent.py#L1059).
This changes the behavior of the tested agent, which is now expected to yield something.

Full traceback

Traceback (most recent call last):
  File "main.py", line 24, in <module>
    loop.run_until_complete(run_tests())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "main.py", line 19, in run_tests
    await test_handle_events()
  File "main.py", line 13, in test_handle_events
    async with handle_events.test_context() as agent:
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/mode/services.py", line 119, in __aenter__
    await self.start()
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
    await self._on_start_supervisor()
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
    res = await self._start_one(
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
    return await self._start_task(
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/faust/agents/agent.py", line 623, in _start_task
    return await self._prepare_actor(
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/faust/agents/agent.py", line 633, in _prepare_actor
    raise ImproperlyConfigured('Agent must yield to use sinks')
faust.exceptions.ImproperlyConfigured: Agent must yield to use sinks

Versions

  • Python version
  • Faust version
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

You can add a yield at the end of the agent yield to test it with test_context

@app.agent(input_channel)
async def handle_events(events_stream):
  async for event in events_stream.events():
    print(event)
    yield event

Thus you can process some assertions on the yield results.

def test_handle_events():
    async with handle_events.test_context() as agent:
        event = await agent.put(...)
        result = agent.results[event.message.offset]

The thing is that test_context() wants to add a sink and this would trigger the check that we need to yield in the agent for a sink to be valid. Maybe we can check in test_context if it is already yielding and if not wrap it in some helper to yield. This might be a graceful solution...

A note about tests context and yielding

It's currently impossible to test a function that yields "sometimes", as the .put() keeps waiting forever even the async for has completed an iteration.

For agents that yields multiple elements for a single message, the test context returns after the first one, so you need to manually wait in your test until the expected the number of messages has been yielded.

Maybe if the test context was written around the consumption of the event events_stream it would be a better experience.=, but maybe that's very hard to do, just sharing my experience.