How to mock the self. update in unit test
umaparvat opened this issue · 1 comments
umaparvat commented
how to write a unit test for this .
@celery.task(bind=True)
def long_task(self):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
When i write a unit test, it fails at unable to open the database file. fails at update_state line.
def test_long_task(self, mocker):
s = mocker.patch('service.random.randint')
s.return_value = 5
res = service.long_task()
assert res == {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
PS: i've used the same code and must to write a unit test for sonarqube coverage and for my CI/CD pipeline. There it fails with error as unable to connect with database. i've used sqlite for celery backend instead of redis.
miguelgrinberg commented
What's the error that you are getting? Copy the entire thing, including the stack trace please.