miguelgrinberg/microblog

Unable to load celery application. 'str' object has no attribute 'level' when Integrating Celery with Flask?

JTP-123 opened this issue · 17 comments

To run long-running tasks such as deleting posts in a background job, I decided to integrate Celery with Flask rather than rq, because I am on Windows. I am new to this, and I get the following error:

[2023-10-22 18:15:47,668] INFO in __init__: Microblog startup
  [2023-10-22 18:15:47,684] INFO in __init__: Microblog startup
  Usage: celery [OPTIONS] COMMAND [ARGS]...
  Try 'celery --help' for help.

  Error: Invalid value for '-A' / '--app':
  Unable to load celery application.
  'str' object has no attribute 'level'

The file structure is as follows:

--app
    --__init__.py
  --make_celery.py

The code is as follows:

# app/__init__.py
from celery import Celery, Task
from flask import Flask


def celery_init_app(app: Flask) -> Celery:
    # Run every task inside the Flask application context
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config['CELERY'])
    celery_app.set_default()
    app.extensions['celery'] = celery_app
    return celery_app


# Call celery_init_app in the factory function
def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="pyamqp://guest@localhost//",
            result_backend="redis://localhost",
            task_serializer='json',
            result_serializer='json',
            accept_content=['json'],
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()

    # ... other configuration code
    celery_init_app(app)
    return app

The make_celery.py code is:

from app import create_app 

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

I do not know why it shows only a single message rather than a full traceback. I deployed it on Windows and I don't know whether that is related. I would really appreciate any help.
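
One way to look for the missing traceback is to import the module that `-A` points at directly, outside the Celery command line. The sketch below assumes the failure happens while that module is being imported, which the thread does not confirm; `import_with_traceback` is a hypothetical helper name, not part of Celery or Flask.

```python
import importlib
import traceback


def import_with_traceback(module_name: str) -> bool:
    """Import a module by name and print the full traceback if that fails."""
    try:
        importlib.import_module(module_name)
    except Exception:
        # Print the complete stack instead of a one-line summary
        traceback.print_exc()
        return False
    return True


# Hypothetical usage, run from the project root so that make_celery is importable:
# import_with_traceback("make_celery")
```

If the import succeeds here, the error is raised somewhere other than module import and this check will not help.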

This app is a few years old, but in it I show how to integrate Flask with Celery: https://github.com/miguelgrinberg/flasky-with-celery

I have read your article, but I still don't know how to use Celery in microblog to run time-consuming tasks such as deleting posts. To be precise, I wrote the configuration code along the lines of the Flask-with-Celery example and applied the @shared_task decorator to the function that deletes a post, but it still does not seem to work.

Unfortunately the example I have shared above is the only example I have. If that is not what you need, then you will need to find a more recent example from someone else.

The only reason I want to use Celery is to fix the hiccup that occurs when a post is submitted or deleted while Elasticsearch is not running. Is there another solution? Anyway, thank you for your help.

I do not know what hiccup you are experiencing.

When I submit a post without Elasticsearch running, the application reports elastic_transport.ConnectionError, but if I reload the index page, the post has in fact been submitted successfully. When Elasticsearch is running, the connection error does not happen.
So I want to hand this time-consuming background job over to Celery to avoid the error.

This affects post deletion as well as post submission.

The indexing functions in search.py silently return when elasticsearch isn't running, so you would not see any errors. See https://github.com/miguelgrinberg/microblog/blob/main/app/search.py#L5
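
The guard referred to above can be sketched as follows. This is a simplified, self-contained version, not the file at the linked URL: the `app` argument stands in for Flask's `current_app`, and `Post`, `FakeClient` and the boolean return value are hypothetical additions for illustration only.

```python
from types import SimpleNamespace


class Post:
    # Fields that should be copied into the search index
    __searchable__ = ["body"]

    def __init__(self, id, body):
        self.id = id
        self.body = body


class FakeClient:
    """Hypothetical stand-in for an Elasticsearch client; it only records calls."""

    def __init__(self):
        self.calls = []

    def index(self, **kwargs):
        self.calls.append(kwargs)


def add_to_index(app, index, model):
    # No client configured means search is disabled: return without doing anything
    if not app.elasticsearch:
        return False
    payload = {field: getattr(model, field) for field in model.__searchable__}
    app.elasticsearch.index(index=index, id=model.id, body=payload)
    return True


disabled = SimpleNamespace(elasticsearch=None)
enabled = SimpleNamespace(elasticsearch=FakeClient())
add_to_index(disabled, "post", Post(1, "hello"))  # skipped, no client is contacted
add_to_index(enabled, "post", Post(1, "hello"))   # one call recorded on the fake client
```

The guard only checks whether a client object exists. It does not check whether the service behind that client is reachable, so a configured client pointing at a stopped service would still raise a connection error.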

Sorry, I do not understand. I already have that code in search.py, but the error still happens.

Show me the complete error please

Here is the full error:

Traceback (most recent call last):
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 2548, in __call__
    return self.wsgi_app(environ, start_response)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 2528, in wsgi_app
    response = self.handle_exception(e)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 2525, in wsgi_app
    response = self.full_dispatch_request()
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 1822, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 1820, in full_dispatch_request
    rv = self.dispatch_request()
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 1796, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask_login\utils.py", line 290, in decorated_view
    return current_app.ensure_sync(func)(*args, **kwargs)
  File "C:\Users\sky\python_work\microblog\app\main\routes.py", line 37, in index
    db.session.commit()
  File "<string>", line 2, in commit
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1451, in commit
    self._transaction.commit(_to_root=self.future)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\sqlalchemy\orm\session.py", line 839, in commit
    self.session.dispatch.after_commit(self.session)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\sqlalchemy\event\attr.py", line 247, in __call__
    fn(*args, **kw)
  File "C:\Users\sky\python_work\microblog\app\models.py", line 48, in after_commit
    add_to_index(obj.__tablename__, obj)
  File "C:\Users\sky\python_work\microblog\app\search.py", line 9, in add_to_index
    current_app.elasticsearch.index(index=index, id=model.id, body=payload) # store index
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elasticsearch\_sync\client\utils.py", line 414, in wrapped
    return api(*args, **kwargs)
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elasticsearch\_sync\client\__init__.py", line 2265, in index
    return self.perform_request(  # type: ignore[return-value]
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elasticsearch\_sync\client\_base.py", line 286, in perform_request
    meta, resp_body = self.transport.perform_request(
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elastic_transport\_transport.py", line 329, in perform_request
    meta, raw_data = node.perform_request(
  File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elastic_transport\_node\_http_urllib3.py", line 199, in perform_request
    raise err from None
elastic_transport.ConnectionError: Connection error caused by: ConnectionError(Connection error caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x00000209B71DAC70>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it.))

The problem is that app.elasticsearch is not None in your situation. Make sure you don't have the ELASTICSEARCH_URL environment variable set. The application only attempts to talk to Elasticsearch when this environment variable is set.

Thank you so much. I had indeed set the ELASTICSEARCH_URL environment variable, and it works now that I have deleted it. May I ask why the application tries to talk to Elasticsearch when the ELASTICSEARCH_URL environment variable is set?

I tried reading the source code in elastic_transport\_node\_http_urllib3.py to understand why it fails, but I am not yet experienced enough to work out the reason from the error report.

The value of the ELASTICSEARCH_URL variable is used to determine if the Elasticsearch service should be used or not. It has nothing to do with Elasticsearch, this is done in this application. If the variable is set, the application assumes you want to use Elasticsearch. If the variable is not set, then it assumes you do not want to use Elasticsearch.
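
A minimal sketch of that decision, assuming the application simply checks whether the variable has a non-empty value at startup. `make_search_client` and `client_factory` are hypothetical names used here so the example runs without Elasticsearch installed; in a real application the factory would be the Elasticsearch client class.

```python
import os


def make_search_client(environ, client_factory):
    """Return a search client if ELASTICSEARCH_URL is set, otherwise None."""
    url = environ.get("ELASTICSEARCH_URL")
    # An unset or empty variable means "do not use Elasticsearch"
    return client_factory(url) if url else None


# Hypothetical usage: a tuple stands in for a real client object
search_client = make_search_client(os.environ, lambda url: ("client", url))
```

Note that nothing in this check contacts the service. Setting the variable only tells the application where to look, so the first connection attempt happens later, when a post is indexed.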

So does that mean that if I do not want this error to happen, that is, if I want posts to be submitted or deleted without delay, I have to disable Elasticsearch by setting the ELASTICSEARCH_URL variable to None? And if I want to use Elasticsearch, the error will happen?

I don't really understand what you are saying. If you want to use Elasticsearch you have to run the Elasticsearch service, and set ELASTICSEARCH_URL to point to the running service. Then everything will work. Setting the variable when you do not have a running Elasticsearch service makes no sense, the variable is there to tell the application where the service can be reached.