/django-eb-sqs

SQS background worker for Django

Primary LanguagePythonMIT LicenseMIT

Django EB SQS - Background Tasks for Elastic Beanstalk and Amazon SQS

django-eb-sqs is a simple task manager for the Elastic Beanstalk Worker Tier. It uses SQS and the boto3 library.

Installation

Install the module with pip install git+git://github.com/cuda-networks/django-eb-sqs.git or add it to your requirements.txt.

Don't forget to add django-eb-sqs app to your Django INSTALLED_APPS settings:

INSTALLED_APPS = (
    ...,
    'eb_sqs',
)

Usage

Creating Tasks

Adding a task to a queue is simple.

from eb_sqs.decorators import task

@task(queue_name='test')
def echo(message):
    print(message)

echo.delay(message='Hello World!')

NOTE: This assumes that you have your AWS keys in the appropriate environment variables, or are using IAM roles. Consult the boto3 documentation for further info.

If you don't pass a queue name, the EB_SQS_DEFAULT_QUEUE setting is used. If not set, the queue name is default.

Additionally the task decorator supports max_retries (default 0) and use_pickle (default False) attributes for advanced control task execution.

You can also delay the execution of a task by specifying the delay time in seconds.

echo.delay(message='Hello World!', delay=60)

During development it is sometimes useful to execute a task immediately without using SQS. This is possible with the execute_inline argument.

echo.delay(message='Hello World!', execute_inline=True)

NOTE: delay is not applied when execute_inline is set to True.

Failed tasks can be retried by using the retry method. See the following example:

from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def upload_file(message):
    print('# of retries: {}'.format(upload_file.retry_num))
    try:
        # upload ...
    except ConnectionException:
        upload_file.retry()

The retry call supports the delay and execute_inline arguments in order to delay the retry or execute it inline. If the retry shall not be counted for the max retry limit set count_retries to false. Use 'retry_num' to get the number of retries for the current task.

NOTE: retry() throws a MaxRetriesReachedException exception if the maximum number of retries is reached.

Executing Tasks

The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifying the url mapping in your urls.py file.

urlpatterns = [
    ...
    url(r'^worker/', include('eb_sqs.urls', namespace='eb_sqs'))
]

In that case the relative endpoint url would be: worker/process

Set this url in the Elastic Beanstalk Worker settings prior to deployment.

During development you can use the included Django command to execute a small script which retrieves messages from SQS and posts them to this endpoint.

python manage.py run_eb_sqs_worker --url <absoulte endpoint url> --queue <queue-name>

For example:

python manage.py run_eb_sqs_worker --url http://127.0.0.1:80/worker/process --queue default

Executing Tasks without Elastic Beanstalk

Another way of executing tasks is to use the Django command process_queue. This command can work with one or more queues, reading from the queues infinitely and executing tasks as they come-in.

python manage.py process_queue --queues <comma-delimited list of queue names>

This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker.

You can either use full queue names, or queue prefix using prefix:*my_example_prefix* notation.

Examples:

python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'

Use the signals MESSAGES_RECEIVED, MESSAGES_PROCESSED, MESSAGES_DELETED of the WorkerService to get informed about the current SQS batch being processed by the management command.

Group Tasks

Multiple tasks can be grouped by specifying the group_id argument when calling delay on a task. If all tasks of a specific group are executed then the group callback task specified by EB_SQS_GROUP_CALLBACK_TASK is executed.

Example calls:

echo.delay(message='Hello World!', group_id='1')
echo.delay(message='Hallo Welt!', group_id='1')
echo.delay(message='Hola mundo!', group_id='1')

Example callback which is executed when all three tasks are finished:

from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def group_finished(group_id):
    pass

Auto Tasks

This is a helper tool for the case you wish to define one of your class method as a task, and make it seamless to all callers. This makes the code much simpler, and allows using classes to invoke your method directly without considering whether it's invoked async or not.

This is how you would define your class:

class MyService:
    def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
        self._auto_task_service = auto_task_service or AutoTaskService()

        self._auto_task_service.register_task(self.my_task_method)
    
    def my_task_method(self, *args, **kwargs):
        ...

Notice the following:

  1. Your class needs to have defaults for all parameters in the c'tor
  2. The c'tor must have a parameter named auto_task_service
  3. The method shouldn't have any return value (as it's invoked async)

In case you want your method to retry certain cases, you need to raise RetryableTaskException. You can provide on optional delay time for the retry, set count_retries=False in case you don't want to limit retries, or use max_retries_func to specify a function which will be invoked when the defined maximum number of retries is exhausted.

Settings

The following settings can be used to fine tune django-eb-sqs. Copy them into your Django settings.py file.

  • EB_AWS_REGION (us-east-1): The AWS region to use when working with SQS.
  • EB_SQS_MAX_NUMBER_OF_MESSAGES (10): The maximum number of messages to read in a single call from SQS (<= 10).
  • EB_SQS_WAIT_TIME_S (2): The time to wait (seconds) when receiving messages from SQS.
  • NO_QUEUES_WAIT_TIME_S (5): The time a workers waits if there are no SQS queues available to process.
  • EB_SQS_AUTO_ADD_QUEUE (False): If queues should be added automatically to AWS if they don't exist.
  • EB_SQS_QUEUE_MESSAGE_RETENTION (1209600): The value (in seconds) to be passed to MessageRetentionPeriod parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True).
  • EB_SQS_QUEUE_VISIBILITY_TIMEOUT (300): The value (in seconds) to be passed to VisibilityTimeout parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True).
  • EB_SQS_DEAD_LETTER_MODE (False): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed but group callback is.
  • EB_SQS_DEFAULT_DELAY (0): Default task delay time in seconds.
  • EB_SQS_DEFAULT_MAX_RETRIES (0): Default retry limit for all tasks.
  • EB_SQS_DEFAULT_COUNT_RETRIES (True): Count retry calls. Needed if max retries check shall be executed.
  • EB_SQS_DEFAULT_QUEUE (default): Default queue name if none is specified when creating a task.
  • EB_SQS_EXECUTE_INLINE (False): Execute tasks immediately without using SQS. Useful during development. Global setting True will override setting it on a task level.
  • EB_SQS_FORCE_SERIALIZATION (False): Forces serialization of tasks when executed inline. This setting is helpful during development to see if all arguments are serialized and deserialized properly.
  • EB_SQS_GROUP_CALLBACK_TASK (None): Group callback (String or Function). Must be a valid task.
  • EB_SQS_QUEUE_PREFIX (eb-sqs-): Prefix to use for the queues. The prefix is added to the queue name.
  • EB_SQS_REDIS_CLIENT (None): Set the Redis connection client (StrictRedis)
  • EB_SQS_REDIS_EXPIRY (604800): Default expiry time in seconds until a group is removed
  • EB_SQS_REDIS_KEY_PREFIX (eb-sqs-): Prefix used for all Redis keys
  • EB_SQS_USE_PICKLE (False): Enable to use pickle to serialize task parameters. Uses json as default.
  • EB_SQS_AWS_MAX_RETRIES (30): Default retry limit on a boto3 call to AWS SQS.
  • EB_SQS_REFRESH_PREFIX_QUEUES_S (10): Minimal number of seconds to wait between refreshing queue list, in case prefix is used
  • EB_SQS_ENDPOINT_URL (None): Endpoint url use to contact AWS SQS service. Usefull during develoment in order to use a mock queue

Development

Make sure to install the development dependencies from development.txt.

Tests

The build in tests can be executed with the Django test runner.

python -m django test --settings=eb_sqs.test_settings