miguelgrinberg/flask-celery-example

How to mock self.update_state in a unit test

umaparvat opened this issue · 1 comments

How do I write a unit test for this task?

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

When I write a unit test, it fails with an "unable to open the database file" error. The failure happens at the update_state line.

def test_long_task(self, mocker):
    s = mocker.patch('service.random.randint')
    s.return_value = 5
    res = service.long_task()
    assert res == {'current': 100, 'total': 100, 'status': 'Task completed!',
                   'result': 42}

PS: I've used the same code and have to write a unit test for SonarQube coverage and for my CI/CD pipeline. There it fails with an error saying it is unable to connect to the database. I've used SQLite for the Celery backend instead of Redis.

What's the error that you are getting? Copy the entire thing, including the stack trace please.
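
While the full traceback is pending, one way to keep the result backend out of a unit test is to replace `update_state` on the task object itself with a mock. The sketch below uses only the standard library and rests on some assumptions: `StandInTask` is a hypothetical stand-in for the bound Celery task (it is not Celery's API), its `update_state` raises to imitate an unreachable backend, and the status message is simplified to a fixed string.

```python
import random
import time
from unittest import mock


class StandInTask:
    """Hypothetical stand-in for a bound Celery task (not Celery's API)."""

    def update_state(self, state=None, meta=None):
        # Imitates a result backend that cannot be reached in CI.
        raise RuntimeError('result backend unavailable')

    def __call__(self):
        # Same loop shape as long_task, with a fixed status message.
        total = random.randint(10, 50)
        for i in range(total):
            self.update_state(state='PROGRESS',
                              meta={'current': i, 'total': total,
                                    'status': 'working'})
            time.sleep(1)
        return {'current': 100, 'total': 100, 'status': 'Task completed!',
                'result': 42}


# Plays the role of service.long_task in this sketch.
long_task = StandInTask()


def test_long_task():
    # Patch update_state on the task object, fix the loop length,
    # and skip the one-second sleeps so the test runs instantly.
    with mock.patch.object(long_task, 'update_state') as update_state, \
            mock.patch('random.randint', return_value=5), \
            mock.patch('time.sleep'):
        res = long_task()

    assert res == {'current': 100, 'total': 100,
                   'status': 'Task completed!', 'result': 42}
    # One progress report per iteration, the last one for i == 4.
    assert update_state.call_count == 5
    update_state.assert_called_with(
        state='PROGRESS',
        meta={'current': 4, 'total': 5, 'status': 'working'})
    return res
```

With the real task, the equivalent patch would presumably be `mocker.patch.object(service.long_task, 'update_state')`, since a task declared with `bind=True` receives the task object as `self`. Patching `time.sleep` as well keeps the test from waiting one second per iteration.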