Parsely
Parsely is a framework for enqueuing messages via HTTP request for celery.
Example
tasks.py:
from typing import Dict
import parsely
app = parsely.Parsely('example', 'redis://localhost:6379/0')
def two_times(text: str) -> Dict[str, str]:
return {
'text': text * 2
}
@app.task(two_times)
def echo(text: str) -> None:
print(text)
worker = p.celery
producer = p.make_producer()
two_times
works as pre processor. It works before enqueing. It means it can return BadRequest to your client. Parsely validate message with typehint. Also you can have extra validation and any other process here.
You can run tasks.py as Celery worker: celery -A tasks worker --loglevel=info
Also it runs as WSGI application. This is an example run it with uWSGI uwsgi --http :8080 --wsgi-file tasks.py --callable producer --enable-threads --thunder-lock --master
Send Test Message!
curl -X POST -d '{"message":{"text": "Hello"}}' http://localhost:8080/example/echo
- producer receives your request
- producer validates your message having text and the type is str or not. text`(str) is from typehint of
two_times
- producer validates two_times returned value having text and the type is str or not. text`(str) is from typehint of
echo
- producer put message
{"message":{"text":"HelloHello"}}
into queue. - curl receives response.
- Celery calls
echo
Extra
Preprocessor
preprocessor is optional. if you don't need it, you can:
@app.task()
def echo(text: str) -> None:
print(text)
Also you can give multiple preprocessor:
@app.task(two_times, two_times)
def echo(text: str) -> None:
print(text)
logging
Parsely uses logger witch named "parsely", so you can set log level like:
import logging
parsely_logger = logging.getLogger('parsely')
parsely_logger.setLevel(logging.DEBUG)