Testing your app that uses SAQ
grigi opened this issue · 4 comments
I find I need to do a lot of mocking to unit test my app that uses SAQ.
It would be great to have a test double of Queue that basically does NOPs.
What would be needed to make this comprehensive enough to be included in the library as a test helper?
imo this is out of scope and it's better to just add redis + saq to your unit tests
the alternative would be you'd have to basically implement redis
i'm not sure what the state of this is yet https://pypi.org/project/fakeredis/
but you could look into it
I am specifically talking of a lightweight test double, not something comprehensive.
e.g:
- confirming that a task got queued
- running a task
The latter is easier as you can just call it, but generating a valid ctx is something that, if needed, can be done transparently.
The former really just needs to replace the Queue.enqueue() method.
It's not for testing that saq itself works right, so the assumption is that retries and delayed execution all work just fine.
It could probably be done with no replacement of redis itself.
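As a rough illustration of the "running a task" case, here is a minimal sketch that calls a task directly with a hand-built ctx. The `add` task and the `make_ctx` helper are hypothetical, and the ctx keys are only an assumption about what a task might read; nothing here imports SAQ.

```python
import asyncio
from typing import Any


# Hypothetical task, named after the 'add' example in this thread.
# It takes a context first and keyword-only arguments after it.
async def add(ctx: dict[str, Any], *, val1: int, val2: int) -> int:
    return val1 + val2


def make_ctx(**extra: Any) -> dict[str, Any]:
    """Build a minimal stand-in context (hypothetical helper, not part of SAQ)."""
    # Assumed keys; extend with whatever the task under test actually reads.
    ctx: dict[str, Any] = {"job": None, "worker": None}
    ctx.update(extra)
    return ctx


# Call the task like any other coroutine, no queue or worker involved.
result = asyncio.run(add(make_ctx(), val1=3, val2=5))
print(result)
```

If a task only reads a couple of keys from ctx, a plain dict like this is usually enough for a unit test.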
is using unittest mock not enough?
It's hard. I need the job object back when queueing, so basically I need to implement all the logic for that in the mocks.
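To make the problem concrete, here is a sketch of the plain unittest.mock approach. `schedule_add` stands in for application code and `fake_enqueue` is a hand-rolled stand-in for job creation; both are hypothetical. Every attribute the app reads from the job has to be rebuilt by hand in the side effect.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


# Hypothetical application code under test: it enqueues a job and reads it back.
async def schedule_add(queue) -> str:
    job = await queue.enqueue("add", val1=3, val2=5, timeout=10)
    return f"{job.function}:{job.timeout}"


def fake_enqueue(function, **kwargs):
    # Hand-rolled stand-in for a job; every attribute the app reads must be set here.
    timeout = kwargs.pop("timeout", None)
    return SimpleNamespace(function=function, timeout=timeout, kwargs=kwargs)


queue = AsyncMock()
# The value returned by the side effect becomes the awaited result of enqueue().
queue.enqueue.side_effect = fake_enqueue

summary = asyncio.run(schedule_add(queue))
queue.enqueue.assert_awaited_once_with("add", val1=3, val2=5, timeout=10)
print(summary)
```

This works for one call site, but the stand-in grows with every job attribute the app touches, which is the logic that ends up duplicated in the mocks.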
I have a small prototype:
from unittest import IsolatedAsyncioTestCase
from unittest.mock import patch

from saq.job import Job, Status

# `queue` below is assumed to refer to the patched Config.queue.
@patch('magneto.config.Config.queue', new=TestQueue())
class TestSaq(IsolatedAsyncioTestCase):
    async def test_queue(self):
        job = await queue.enqueue('add', val1=3, val2=5, timeout=10)
        self.assertIsInstance(job, Job)
        self.assertEqual(job.function, 'add')
        self.assertEqual(job.timeout, 10)
        self.assertEqual(job.status, Status.QUEUED)
        self.assertEqual(job.kwargs, {'val1': 3, 'val2': 5})
And a simple double:
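import typing as t
from unittest.mock import AsyncMock

from redis.asyncio import Redis  # adjust to the Redis client your SAQ version uses
from saq.job import Job, Status
from saq.queue import Queue
from saq.utils import now

class TestQueue(Queue):
    def __init__(self) -> None:
        # No real Redis connection; the client is replaced by a mock.
        super().__init__(redis=AsyncMock(spec=Redis))

    async def enqueue(self, job_or_func: str | Job, **kwargs: t.Any) -> Job:
        # Split keyword arguments into Job fields and task kwargs.
        job_kwargs: dict[str, t.Any] = {}
        for k, v in kwargs.items():
            if k in Job.__dataclass_fields__:  # pylint: disable=no-member
                job_kwargs[k] = v
            else:
                job_kwargs.setdefault("kwargs", {})[k] = v
        if isinstance(job_or_func, str):
            job = Job(function=job_or_func, **job_kwargs)
        else:
            job = job_or_func
            for k, v in job_kwargs.items():
                setattr(job, k, v)
        # Mark the job as queued without touching Redis.
        job.queue = self
        job.queued = now()
        job.status = Status.QUEUED
        return job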
With some small refactoring in the saq.queue.Queue class, TestQueue could be made significantly smaller.
If this is expanded to also make calling tasks a little easier (e.g. generating a valid ctx), and to handle queue.apply()/queue.map(), it would probably suit the needs of 80% of users.
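One possible shape for that expansion, as a self-contained sketch: a double that records enqueued calls and runs tasks in-process for apply() and map(). `InMemoryQueue`, its `functions` registry, and the ctx contents are all hypothetical; it does not subclass or import SAQ, and the real methods may accept options (such as timeouts) that this sketch ignores.

```python
import asyncio
from typing import Any, Awaitable, Callable

Task = Callable[..., Awaitable[Any]]


class InMemoryQueue:
    """Hypothetical stand-alone double; it does not subclass or import SAQ."""

    def __init__(self, functions: dict[str, Task]) -> None:
        self.functions = functions
        self.enqueued: list[tuple[str, dict[str, Any]]] = []

    async def enqueue(self, function: str, **kwargs: Any) -> None:
        # Record the call so a test can assert that the task was queued.
        self.enqueued.append((function, kwargs))

    async def apply(self, function: str, **kwargs: Any) -> Any:
        # Run the task in-process with a minimal ctx instead of going through a worker.
        await self.enqueue(function, **kwargs)
        ctx = {"queue": self}
        return await self.functions[function](ctx, **kwargs)

    async def map(self, function: str, iter_kwargs: list[dict[str, Any]]) -> list[Any]:
        # Apply the task once per kwargs dict, sequentially, and collect the results.
        return [await self.apply(function, **kw) for kw in iter_kwargs]


async def add(ctx: dict[str, Any], *, val1: int, val2: int) -> int:
    return val1 + val2


async def main() -> tuple[Any, list[Any], int]:
    queue = InMemoryQueue({"add": add})
    single = await queue.apply("add", val1=3, val2=5)
    many = await queue.map("add", [{"val1": 1, "val2": 2}, {"val1": 10, "val2": 20}])
    return single, many, len(queue.enqueued)


single, many, queued_count = asyncio.run(main())
print(single, many, queued_count)
```

Keeping the task registry on the double is a design choice made for this sketch so that tests can run tasks without starting a worker.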