Unable to load celery application. 'str' object has no attribute 'level' when Integrating Celery with Flask?
JTP-123 opened this issue · 17 comments
I want to run long-running tasks, such as deleting posts, as background jobs. Because I am on Windows, I decided as a beginner to integrate Celery with Flask rather than RQ, but I get this error:
[2023-10-22 18:15:47,668] INFO in __init__: Microblog startup
[2023-10-22 18:15:47,684] INFO in __init__: Microblog startup
Usage: celery [OPTIONS] COMMAND [ARGS]...
Try 'celery --help' for help.

Error: Invalid value for '-A' / '--app':
Unable to load celery application.
'str' object has no attribute 'level'
The file structure is as follows:

app/
    __init__.py
make_celery.py
The relevant code is:

# app/__init__.py
from celery import Celery, Task
from flask import Flask

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config['CELERY'])
    celery_app.set_default()
    app.extensions['celery'] = celery_app
    return celery_app
# celery_init_app is called from the factory function
def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="pyamqp://guest@localhost//",
            result_backend="redis://localhost",
            task_serializer='json',
            result_serializer='json',
            accept_content=['json'],
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    # ... other configuration code
    celery_init_app(app)
    return app
The make_celery.py code is:

# make_celery.py
from app import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]
I do not know why it shows only a single message instead of a full traceback. I deployed it on Windows and don't know whether that is related. I would really appreciate any help.
This app is a few years old, but in it I show how to integrate Flask with Celery: https://github.com/miguelgrinberg/flasky-with-celery
I have read your article, but I still don't know how to use Celery in microblog to run time-consuming tasks like post deletion. To be precise, what I did was model my configuration code on flasky-with-celery and apply @shared_task to the post-deletion function, but it still does not seem to work.
Unfortunately the example I have shared above is the only example I have. If that is not what you need, then you will need to find a more recent example from someone else.
The sole reason I want to use Celery is to avoid the hiccup caused by post submission or deletion when Elasticsearch is not running. Is there another solution? Anyway, thank you for your help.
I do not know what hiccup you are experiencing.
When I submit a post without Elasticsearch running, it reports elastic_transport.ConnectionError, but if I reload the index page the post has actually been submitted successfully. When Elasticsearch is running, the connection error does not happen.
So I want to hand this time-consuming background job to Celery to avoid that.
This involves not only post submission but also post deletion.
The indexing functions in search.py silently return when elasticsearch isn't running, so you would not see any errors. See https://github.com/miguelgrinberg/microblog/blob/main/app/search.py#L5
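A simplified sketch of the guard pattern referenced above; the __searchable__ attribute follows microblog's convention, and the exact code is at the linked URL:

```python
from flask import Flask, current_app


def add_to_index(index, model):
    # When the app was created without ELASTICSEARCH_URL, the client
    # attribute is None and this function silently does nothing.
    if not current_app.elasticsearch:
        return
    payload = {field: getattr(model, field) for field in model.__searchable__}
    current_app.elasticsearch.index(index=index, id=model.id, body=payload)
```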
Sorry, I do not understand. I actually already have that code in search.py, but the error still happens.
Show me the complete error please
Here is the full error:
Traceback (most recent call last):
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 2548, in __call__
return self.wsgi_app(environ, start_response)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 2528, in wsgi_app
response = self.handle_exception(e)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 2525, in wsgi_app
response = self.full_dispatch_request()
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 1822, in full_dispatch_request
rv = self.handle_user_exception(e)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 1820, in full_dispatch_request
rv = self.dispatch_request()
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask\app.py", line 1796, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\flask_login\utils.py", line 290, in decorated_view
return current_app.ensure_sync(func)(*args, **kwargs)
File "C:\Users\sky\python_work\microblog\app\main\routes.py", line 37, in index
db.session.commit()
File "<string>", line 2, in commit
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1451, in commit
self._transaction.commit(_to_root=self.future)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\sqlalchemy\orm\session.py", line 839, in commit
self.session.dispatch.after_commit(self.session)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\sqlalchemy\event\attr.py", line 247, in __call__
fn(*args, **kw)
File "C:\Users\sky\python_work\microblog\app\models.py", line 48, in after_commit
add_to_index(obj.__tablename__, obj)
File "C:\Users\sky\python_work\microblog\app\search.py", line 9, in add_to_index
current_app.elasticsearch.index(index=index, id=model.id, body=payload) # store index
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elasticsearch\_sync\client\utils.py", line 414, in wrapped
return api(*args, **kwargs)
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elasticsearch\_sync\client\__init__.py", line 2265, in index
return self.perform_request( # type: ignore[return-value]
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elasticsearch\_sync\client\_base.py", line 286, in perform_request
meta, resp_body = self.transport.perform_request(
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elastic_transport\_transport.py", line 329, in perform_request
meta, raw_data = node.perform_request(
File "c:\users\sky\python_work\microblog\venv\lib\site-packages\elastic_transport\_node\_http_urllib3.py", line 199, in perform_request
raise err from None
elastic_transport.ConnectionError: Connection error caused by: ConnectionError(Connection error caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x00000209B71DAC70>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it.))
The problem is that app.elasticsearch is not None in your situation. Make sure you don't have the ELASTICSEARCH_URL environment variable set. The application only attempts to talk to Elasticsearch when this environment variable is set.
Thank you so much. I did have the ELASTICSEARCH_URL environment variable set, and it works now that I have deleted it. Can I ask why the application tries to talk to Elasticsearch when the ELASTICSEARCH_URL environment variable is set?
I tried to read the source code in elastic_transport\_node\_http_urllib3.py to understand why it fails, but I could not figure out the reason from the error report.
The value of the ELASTICSEARCH_URL variable is used to determine whether the Elasticsearch service should be used. This has nothing to do with Elasticsearch itself; it is done by this application. If the variable is set, the application assumes you want to use Elasticsearch. If it is not set, then it assumes you do not want to use Elasticsearch.
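This logic can be sketched as follows, assuming a simplified factory function; the real microblog code differs in its details:

```python
import os

from flask import Flask


def create_app():
    app = Flask(__name__)
    # The client is only created when ELASTICSEARCH_URL is set in the
    # environment; otherwise app.elasticsearch stays None and the
    # indexing helpers in search.py return without doing anything.
    url = os.environ.get('ELASTICSEARCH_URL')
    if url:
        from elasticsearch import Elasticsearch
        app.elasticsearch = Elasticsearch(url)
    else:
        app.elasticsearch = None
    return app
```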
So you mean that if I do not want the above error to happen, i.e. if posts should be submitted or deleted without delay, I have to disable the Elasticsearch integration by leaving the ELASTICSEARCH_URL variable unset. And if I want to use Elasticsearch, the above error will happen?
I don't really understand what you are saying. If you want to use Elasticsearch, you have to run the Elasticsearch service and set ELASTICSEARCH_URL to point to the running service. Then everything will work. Setting the variable when you do not have a running Elasticsearch service makes no sense; the variable is there to tell the application where the service can be reached.