Backtick demonstrates a pattern that enables you to schedule asynchronous background tasks through HTTP calls. You can choose to execute the task immediately or schedule it for a future timestamp. Once scheduled, a worker process will pick up the task and execute it in the background. Additionally, you have the option to cancel a scheduled task by calling another endpoint.
While working on a Django project at my workplace, we needed a way to call asynchronous
tasks at future datetimes. We didn't need any periodic scheduling or cron support.
Naturally, we went for Celery's task.apply_async(at=<datetime>)
function but
that suffers from one major gotcha: it keeps the schedule logs in memory and loses the
scheduled tasks whenever the workers are restarted. This also causes a situation where
future task cancellation doesn't work if the associated workers lose their working
memory.
To avoid this, Celery doc recommends creating a persistent worker that'll save the worker state in a file on the disk. This whole setup feels janky and goes against the philosophy of keeping the workers stateless and being able to redeploy them without losing any task.
So this prototype demonstrates a service that allows you to register any background
task, schedule and cancel it with HTTP calls, and it'll work reliably even if you have to
restart the workers for deployment. For simplicity's sake, backtick
keeps the
scheduling logs in the Redis broker.
To take the app for a spin:
-
Make sure you have Docker and Docker Compose V2 installed on your system.
-
Clone the repo and head over to the root directory.
-
Run
make up
to start the containers. -
Go to
localhost:5000/docs
to view the API docs. This will take you to an interactive OpenAPI compliant doc that looks like this:
You can use the POST /schedule
endpoint to schedule a pre-registered task. Click on
the schedule bar and it'll allow you to send a POST request. The app comes with a
few registered tasks that you can execute. Paste the payload below to the request
section and click on the Execute button:
{
"task_name": "do_something",
"datetimes": [],
"kwargs": {"how_long": 5}
}
Here, do_something
is a registered task that takes the {"how_long": 5}
keyworded
argument. The task just waits for how_long
seconds and returns a message. Once you've
made the POST request, you'll get a response that returns the scheduled task id. In this
case, since the datetimes
field is empty, the task will be scheduled for immediate
execution:
Now you can check the container logs to see that the scheduled tasks have been executed
by a worker. The task id returned by the response of the /schedule
endpoint should
match that of the worker logs. Run:
docker compose logs -f worker
backtick-worker-1 | INFO:rq.worker:default: backtick.tasks.do_something(how_long=5) (9777414c-f42a-42f6-a013-2d0eeb3ab1e9)
backtick-worker-1 | INFO:root:Starting to do something
backtick-worker-1 | INFO:root:Finished doing something
backtick-worker-1 | INFO:rq.worker:default: Job OK (9777414c-f42a-42f6-a013-2d0eeb3ab1e9)
backtick-worker-1 | INFO:rq.worker:Result is kept for 60 seconds
backtick-worker-1 | INFO:rq.worker:default: backtick.tasks.do_something(how_long=5) (422f32a4-bdf4-4499-99d7-5d9991b54a96)
Future tasks can be scheduled by sending valid datetime strings to the datetimes
field. The datetimes must be valid UTC strings in YYYY-MM-DDTHH:MM:SS+00:00
or
YYYY-MM-DDTHH:MM:SSZ
format. Multiple datetime strings can be sent to schedule
mutliple executions. The following request will launch to tasks at future datetimes
with 1 minute intervals between them.
Before making the request make sure to change the datetime strings so that they're in the future relative to the time when you're running the command.
curl -X 'POST' \
'http://localhost:5000/schedule' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"task_name": "do_something",
"datetimes": ["2023-04-20T21:04:03.843Z", "2023-04-20T21:05:04.843Z"],
"kwargs": {"how_long": 5}
}'
This will return:
{
"task_ids": [
"8c0ba34a-3e35-4826-ad94-0dec86d392d7",
"9a44357f-da5a-454e-b028-e671a68c2b77"
],
"message": "Tasks scheduled successfully"
}
Check the worker logs to ensure that the tasks get run successfully.
Use the POST /unschedule
endpoint to cancel scheduled tasks.
curl -X 'POST' \
'http://localhost:5000/unschedule' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"task_ids": [
"9fb6ff54-d758-4cd1-9adb-d074604b788c",
"3691e144-fd9c-4893-809b-55199fb804ff"
],
"enqueue_dependents": true
}'
The enqueue_dependents
flag instructs the system whether to cancel the dependent tasks
while canceling the primary one or not. You'll see shortly how to write and schedule
dependent tasks. Calling the endpoint will give you the following response:
{
"task_ids": [
"9fb6ff54-d758-4cd1-9adb-d074604b788c",
"3691e144-fd9c-4893-809b-55199fb804ff"
],
"message": "Tasks unscheduled successfully"
}
So far, we've only seen how to invoke and cancel pre-registered tasks but this section
will talk about how you can register and run your own tasks. Tasks are any Python
callable that's decorated with the @utils.tasks
decorator. In the backtick.tasks
module, you'll be able to see the do_something
task that we've seen so far:
# backtick/tasks.py
import logging
import time
from backtick import utils
@utils.task(
queue=settings.BACKTICK_QUEUES["default"],
connection=utils.get_redis(),
timeout=60,
result_ttl=60,
)
def do_something(*, how_long: int) -> None:
"""Do something for a while.
Args:
how_long (int): How long to do something for.
Returns:
None
"""
logging.info("Starting to do something")
time.sleep(how_long)
logging.info("Finished doing something")
The utils.task
decorator accepts all the arguments accepted by the
rq.decorators.job decorator. Once the task has been defined, it
needs to be included to the BACKTICK_TASKS
dict on the backtick.settings
module.
# backtick/settings.py
BACKTICK_TASKS = {
"do_something": "backtick.tasks.do_something" # fully qualified task name
}
On the POST /schedule
endpoint, the task_name
field will refer to a key in this
task mapping.
You can retry tasks upon failure by taking advantage of rq's Retry
option. To do so,
a task has to be defined like this:
# backtick/tasks.py
from rq import Retry
from backtick import settings, utils
@utils.task(
queue=settings.BACKTICK_QUEUES["default"],
connection=utils.get_redis(),
retry=Retry(max=3, interval=2),
timeout=60,
result_ttl=60,
)
def raise_exception() -> None:
"""Raise an exception.
Args:
None
Returns:
None
"""
# This just raises an exception to trigger retry logic.
raise ValueError("This is an exception")
You'll have to register the task before you can call the /schedule
endpoint:
# backtick/settings.py
BACKTICK_TASKS = {"raise_exception": "backtick.tasks.raise_exception"}
Now, you can make a request to the endpoint to schedule an immediate or a future task; in either case, the underlying task will raise a value error and rq will retry it 3 times with 2 seconds of interval in between each call.
curl -X 'POST' \
'http://localhost:5000/schedule' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"task_name": "raise_exception",
"datetimes": [],
"kwargs": {}
}'
If you check the worker logs, you'll see that the task has been retried 3 times after the first failed call with 2 seconds of interval between them.
Worker log
backtick-web-1 | INFO:root:Task 35bfdfb4-a6ff-41db-8420-3e672b81c046 scheduled
backtick-worker-1 | INFO:rq.worker:default: backtick.tasks.raise_exception() (35bfdfb4-a6ff-41db-8420-3e672b81c046)
backtick-worker-1 | ERROR:rq.worker:[Job 35bfdfb4-a6ff-41db-8420-3e672b81c046]:
exception raised while executing (backtick.tasks.raise_exception)
backtick-worker-1 | Traceback (most recent call last):
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/worker.py",
line 1359, in perform_job
backtick-worker-1 | rv = job.perform()
backtick-worker-1 | ^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1178, in perform
backtick-worker-1 | self._result = self._execute()
backtick-worker-1 | ^^^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1215, in _execute
backtick-worker-1 | result = self.func(*self.args, **self.kwargs)
backtick-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backtick-worker-1 | File "/code/backtick/tasks.py", line 48, in raise_exception
backtick-worker-1 | raise ValueError("This is an exception")
backtick-worker-1 | ValueError: This is an exception
backtick-worker-1 |
backtick-worker-1 | INFO:rq.worker:default: backtick.tasks.raise_exception() (35bfdfb4-a6ff-41db-8420-3e672b81c046)
backtick-worker-1 | ERROR:rq.worker:[Job 35bfdfb4-a6ff-41db-8420-3e672b81c046]:
exception raised while executing (backtick.tasks.raise_exception)
backtick-worker-1 | Traceback (most recent call last):
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/worker.py",
line 1359, in perform_job
backtick-worker-1 | rv = job.perform()
backtick-worker-1 | ^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1178, in perform
backtick-worker-1 | self._result = self._execute()
backtick-worker-1 | ^^^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1215, in _execute
backtick-worker-1 | result = self.func(*self.args, **self.kwargs)
backtick-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backtick-worker-1 | File "/code/backtick/tasks.py", line 48, in raise_exception
backtick-worker-1 | raise ValueError("This is an exception")
backtick-worker-1 | ValueError: This is an exception
backtick-worker-1 |
backtick-worker-1 | INFO:rq.worker:default: backtick.tasks.raise_exception() (35bfdfb4-a6ff-41db-8420-3e672b81c046)
backtick-worker-1 | ERROR:rq.worker:[Job 35bfdfb4-a6ff-41db-8420-3e672b81c046]:
exception raised while executing (backtick.tasks.raise_exception)
backtick-worker-1 | Traceback (most recent call last):
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/worker.py",
line 1359, in perform_job
backtick-worker-1 | rv = job.perform()
backtick-worker-1 | ^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1178, in perform
backtick-worker-1 | self._result = self._execute()
backtick-worker-1 | ^^^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1215, in _execute
backtick-worker-1 | result = self.func(*self.args, **self.kwargs)
backtick-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backtick-worker-1 | File "/code/backtick/tasks.py", line 48, in raise_exception
backtick-worker-1 | raise ValueError("This is an exception")
backtick-worker-1 | ValueError: This is an exception
backtick-worker-1 |
backtick-worker-1 | INFO:rq.worker:default: backtick.tasks.raise_exception() (35bfdfb4-a6ff-41db-8420-3e672b81c046)
backtick-worker-1 | ERROR:rq.worker:[Job 35bfdfb4-a6ff-41db-8420-3e672b81c046]:
exception raised while executing (backtick.tasks.raise_exception)
backtick-worker-1 | Traceback (most recent call last):
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/worker.py",
line 1359, in perform_job
backtick-worker-1 | rv = job.perform()
backtick-worker-1 | ^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1178, in perform
backtick-worker-1 | self._result = self._execute()
backtick-worker-1 | ^^^^^^^^^^^^^^^
backtick-worker-1 | File "/usr/local/lib/python3.11/site-packages/rq/job.py",
line 1215, in _execute
backtick-worker-1 | result = self.func(*self.args, **self.kwargs)
backtick-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backtick-worker-1 | File "/code/backtick/tasks.py", line 48, in raise_exception
backtick-worker-1 | raise ValueError("This is an exception")
backtick-worker-1 | ValueError: This is an exception
backtick-worker-1 |
You can define a task as follows to employ exponential backoff in your retry logic:
# backtick/tasks.py
from rq import Retry
from backtick import settings, utils
# This will retry the raise_exception function in 2^1, 2^2, 2^3 seconds.
interval_with_backoff = [2**i for i in range(1, 4)]
@utils.task(
queue=settings.BACKTICK_QUEUES["default"],
connection=utils.get_redis(),
retry=Retry(max=len(interval_with_backoff), interval=interval_with_backoff),
timeout=60,
result_ttl=60,
)
def raise_exception() -> None:
"""Raise an exception. The task will be retried with exponential backoff.
Args:
None
Returns:
None
"""
raise ValueError("This is an exception")
If you need to attach rq's on_success
or on_failure
callbacks, you can do that like
this:
# black/tasks.py
from rq import Retry
from backtick import settings, utils
interval_with_backoff = [2**i for i in range(3)]
def on_success_callback(*args: Any, **kwargs: Any) -> None:
logging.info("From on_success callback!")
def on_failure_callback(*args: Any, **kwargs: Any) -> None:
logging.error("From on_failure callback!")
@utils.task(
queue=settings.BACKTICK_QUEUES["default"],
connection=utils.get_redis(),
retry=Retry(max=len(interval_with_backoff), interval=interval_with_backoff),
timeout=60,
result_ttl=60,
on_success=on_success_callback,
on_failure=on_failure_callback,
)
def raise_exception() -> None:
"""Raise an exception. Here, on_failure will be called
Args:
None
Returns:
None
"""
raise ValueError("This is an exception")
Just make sure that the callbacks aren't lambda functions since rq
doesn't support
lambda callbacks.
Backtick provides a management script that allows you to gracefully shut down all the workers. Running the script will make the workers wait until the currently running task is finished, and then the associated worker processes will be cleaned up. Here's the command to stop the workers:
make stop-workers
To cancel the currently, running tasks, execute:
make cancel-running-tasks
Run make cancel-scheduled-tasks
to cancel all the future scheduled tasks.
The tests are run inside a separate docker container.
-
To execute the unit tests, run:
make test-up && make test-unit && make test-down
This will spin up the test container, run the tests, and shut it down.
-
Similarly, you can run the integration tests with the following command. The
sleep
is required to give the database enough time to be ready.make test-up && sleep 5 && make test-integration && make test-down
-
To run all the tests, use this command:
make test-up && sleep 5 && make test && make test-down
-
Backtick currently doesn't support cron based periodic task scheduling. I had a hard time building reliable cron scheduling with
rq-scheduler
. This is something I'm still exploring. -
It doesn't support dependent task scheduling. I'm still working on a suitable way to build that feature so that it works with the existing
/schedule
endpoint.