Since the question seems to be quite generic, I believe this is the right place for a quick cheat sheet about "how to use django-background-tasks" based on my personal experience. Hopefully I won't be the only one to use it :)
Environment
Installation
I like pipenv so:
> cd [my-django-project root directory]
> pipenv install django-background-tasks
Now add 'background_task' to INSTALLED_APPS in settings.py:
INSTALLED_APPS = (
    # ...
    'background_task',
    # ...
)
and perform database migrations to ensure the django-background-tasks schema is in place:
> pipenv shell
(my-django-project) bash-3.2$ python manage.py migrate
Creating and registering a Task
Any Python function can be a task: we simply need to apply the @background decorator to register it as such:
from background_task import background


# schedule=10: run the task about 10 seconds after it has been scheduled
@background(schedule=10)
def do_something(p1: str, p2: str) -> None:
    """
    Does something that takes a long time
    :param p1: first parameter
    :param p2: second parameter
    :return: None
    """
    pass
Now we can call the function as usual in our project:
do_something("first parameter", "second parameter")
It is important to note that calling the function does not actually execute its code; instead, the "django-background-tasks" module stores a Task record in the database, more precisely in the "background_task" table. For this reason, writing a task function that returns something is of little use: the task is executed in the background at a later moment, so whatever the function would return is not available to the caller at the time it is invoked. The only use case I see for a return value is testing, see the Testing a Task section below.
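To picture the "store now, run later" behaviour, here is a toy model in plain Python. It is not how the library is implemented: toy_background, pending, registry and process_pending are names made up for this sketch, and an in-memory queue stands in for the "background_task" table. The call is stored as JSON here because, as far as I know, the library also serialises task arguments as JSON, so it is safest to pass only simple values such as strings, numbers and lists.

```python
import json
from collections import deque

# Toy stand-ins, NOT the library's real storage or API.
pending = deque()   # plays the role of the "background_task" table
registry = {}       # maps a task name to the function to run later


def toy_background(func):
    """Hypothetical decorator that defers calls instead of running them."""
    registry[func.__name__] = func

    def enqueue(*args, **kwargs):
        # Store the call as a serialised record; the function body does not run.
        record = json.dumps({"name": func.__name__, "args": args, "kwargs": kwargs})
        pending.append(record)
        return record  # the caller gets the record, not the function's result

    return enqueue


def process_pending():
    """Toy worker: run every stored call and discard the return values."""
    executed = 0
    while pending:
        record = json.loads(pending.popleft())
        registry[record["name"]](*record["args"], **record["kwargs"])
        executed += 1
    return executed


results = []


@toy_background
def do_something(p1, p2):
    results.append(f"{p1}/{p2}")
    return "ignored"  # never reaches the code that scheduled the task


do_something("first parameter", "second parameter")
assert results == []  # nothing has been executed yet
process_pending()
assert results == ["first parameter/second parameter"]
```

The point of the sketch is only the ordering: the side effect shows up after the worker has run, and the value returned by the task body is lost.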
Processing Tasks
In order to actually run a registered task we have to employ the following management command:
> python manage.py process_tasks
Please refer to the module's documentation for a description of the command options.
As other users have already pointed out, it is common to wrap this command in a cron job to make sure tasks are processed periodically. In this case, the duration option might turn out to be useful: it is the number of seconds the process_tasks command is kept running. By default the duration is 0, which means "run forever", but this is quite risky in my view: if the command crashes or is interrupted for some reason, your tasks won't be processed anymore and a long time might pass before you notice.
A better way is to set the duration to a well defined time, for example 15 minutes, and then configure a cron job to run every 15 minutes to restart the processing command. This way if the command crashes it will get restarted by the cron job later anyway.
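The relation between the cron interval and the duration is simple arithmetic (minutes versus seconds), and a small helper can make it explicit. This is only a sketch: crontab_entry is a hypothetical function, the project path is a placeholder, and the "python" executable is assumed to be the one from your virtual environment (with pipenv you would probably point it at the interpreter inside the virtualenv).

```python
import shlex


def crontab_entry(project_dir, interval_minutes=15, python="python"):
    """Build a crontab line that restarts process_tasks every interval."""
    if not 1 <= interval_minutes <= 59:
        raise ValueError("interval_minutes must be between 1 and 59")
    duration = interval_minutes * 60  # --duration is expressed in seconds
    command = (
        f"cd {shlex.quote(project_dir)} && "
        f"{python} manage.py process_tasks --duration {duration}"
    )
    return f"*/{interval_minutes} * * * * {command}"


print(crontab_entry("/path/to/my-django-project"))
# prints: */15 * * * * cd /path/to/my-django-project && python manage.py process_tasks --duration 900
```

A task that is still running when the duration expires may keep the old process alive a little longer, so a short overlap between two processing commands is possible; whether that matters depends on your tasks.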
Testing a Task
Testing a task via the "process_tasks" management command is awful; we should stick to the Python unittest module for that, which is also the "Django way".
I am not going to discuss unittest in this post, of course. I only want to point out that during a unit test you want to execute the function synchronously, just like a normal Python function. The syntax for that is as follows:
do_something.now("first parameter", "second parameter")
The "now" modifier runs the function and waits for it to terminate. This is the only use case where a return value is useful in my view. With a return value at hand you can use the full power of the "assert*" methods provided by unittest.
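A test built around "now" could look roughly like the sketch below. To keep it runnable outside a Django project, it defines a tiny stand-in for the @background decorator that only offers the synchronous now() entry point; in a real project you would delete the stand-in and import background from background_task instead. The count_words task and the test names are hypothetical.

```python
import unittest


def background(schedule=0):
    """Stand-in for background_task.background so the sketch runs anywhere.

    It only provides the synchronous .now() entry point used in tests.
    """
    def decorator(func):
        def deferred(*args, **kwargs):
            raise RuntimeError("the stand-in does not queue tasks")
        deferred.now = func  # run the original function directly
        return deferred
    return decorator


@background(schedule=10)
def count_words(text):
    """Hypothetical task that returns a value so it can be asserted on."""
    return len(text.split())


class CountWordsTest(unittest.TestCase):
    def test_runs_synchronously_and_returns_value(self):
        self.assertEqual(count_words.now("first parameter"), 2)

    def test_empty_input(self):
        self.assertEqual(count_words.now(""), 0)
```

In a Django project the test case would normally live in the app's tests module and be picked up by "python manage.py test".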
Checking if a Task is already running
Sometimes you don't want the same task to run multiple times. For example, I frequently use background tasks for training Machine Learning models, which takes a lot of time. To keep my data from getting messed up, I prefer to make sure that another training task on the same model cannot be started before the previous one is complete.
For this to work, I have to check whether the task is already running before starting a new one, but how can a task be uniquely identified? For me the simplest way is to assign a "verbose_name" to the task, which can be done at the time the task is scheduled:
do_something("first parameter", "second parameter", verbose_name="my_task_verbose_name")
Now, if I want to check whether this task is already running or not, I can simply read the background_task table and verify there is no task with the same "verbose name" therein. This can very easily be done by leveraging the Task model provided by "django-background-tasks" itself:
from background_task.models import Task

tasks = Task.objects.filter(verbose_name="my_task_verbose_name")
if not tasks.exists():
    # no task running with this name, go ahead!
    pass
else:
    # task already running
    pass
Needless to say, we have to make sure the verbose names assigned to our tasks are unique.
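The check and the scheduling call can be wrapped into one small helper. This is a sketch under a few assumptions: schedule_unless_pending and training_task_name are hypothetical names, and the model manager is passed in as a parameter (in a project it would be Task.objects) so that the helper itself does not need a configured Django environment.

```python
def schedule_unless_pending(task_fn, task_manager, verbose_name, *args, **kwargs):
    """Schedule task_fn only if no task with this verbose_name is stored.

    Returns whatever task_fn returns, or None when scheduling was skipped.
    """
    if task_manager.filter(verbose_name=verbose_name).exists():
        return None
    return task_fn(*args, verbose_name=verbose_name, **kwargs)


def training_task_name(model_id):
    """Hypothetical naming scheme: one unique verbose name per model."""
    return f"train_model_{model_id}"


# In a project this would be called roughly as:
#   from background_task.models import Task
#   schedule_unless_pending(
#       do_something, Task.objects, training_task_name(42),
#       "first parameter", "second parameter",
#   )
```

Note that the check and the scheduling are two separate database operations, so two callers arriving at the very same moment could still both schedule the task; for my use case this has been acceptable, but it is not a real lock.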
Further Reading
Django Background Tasks documentation