
Scheduling recurring tasks in Django
7 Nov 2020 #python #django #django-admin
Some of the tasks for my website are recurring and need to be executed on a daily or hourly basis. This can be done with a crontab, but each task then requires its own configuration, and I prefer to keep the cron definitions as part of the code.
Therefore, I settled on a different approach using apscheduler.
I start by defining the jobs in a jobs module under each application package:
blog/jobs.py
from django.core import management


def daily_db_backup():
    # Run the dbbackup management command from code
    print('---> Making database backup')
    management.call_command('dbbackup')
    print('---> Finished database backup')


def hourly_project_refresh():
    # refresh the project list
    ...
Each application can easily have its own jobs module.
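If several applications end up with a jobs module, the job functions could be gathered by naming convention instead of being imported one by one. The following is only a sketch: collect_jobs is a hypothetical helper, not part of Django or apscheduler, and it assumes that job functions start with hourly_ or daily_ as in blog/jobs.py.

```python
import importlib
import inspect


def collect_jobs(module_names, prefixes=("hourly_", "daily_")):
    """Map 'module.function' names to job functions found in the given modules.

    Hypothetical helper: the prefixes mirror the naming used in blog/jobs.py.
    """
    jobs = {}
    for module_name in module_names:
        try:
            module = importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            # Skip applications without a jobs module, but do not hide
            # a missing dependency imported inside an existing jobs module.
            missing = exc.name or ""
            if module_name == missing or module_name.startswith(missing + "."):
                continue
            raise
        for name, func in inspect.getmembers(module, inspect.isfunction):
            # Ignore functions that were merely imported into the module.
            defined_here = func.__module__ == module.__name__
            if defined_here and name.startswith(prefixes):
                jobs[f"{module_name}.{name}"] = func
    return jobs
```

Calling collect_jobs(["blog.jobs"]) inside a configured Django project would return a dictionary keyed by names such as "blog.jobs.daily_db_backup". The schedule for each job still has to be decided separately.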
Then, under the main application, I create a command which can be used to run them:
core/management/commands/run_cronjobs.py
from django.core.management.base import BaseCommand
from apscheduler.schedulers.blocking import BlockingScheduler

import blog.jobs as blog_jobs


class Command(BaseCommand):
    help = 'Runs the cronjobs'

    def handle(self, *args, **options):
        self._log_info("Configuring jobs")
        scheduler = BlockingScheduler()
        # Every hour, on the hour
        scheduler.add_job(blog_jobs.hourly_project_refresh, 'cron',
                          minute='0', hour='*', day='*', week='*', month='*')
        # Every day at 01:00 (in the scheduler's timezone)
        scheduler.add_job(blog_jobs.daily_db_backup, 'cron',
                          minute='0', hour='1', day='*', week='*', month='*')

        self._log_info("Running scheduler")
        try:
            # start() blocks until the process is interrupted
            scheduler.start()
        except KeyboardInterrupt:
            return

    def _log_info(self, *msg):
        full_msg = ' '.join(msg)
        self.stdout.write(self.style.SUCCESS(full_msg))
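As the number of jobs grows, the repeated add_job calls could be driven from a small table. This is a sketch of one possible layout, not the setup used above: JOB_SCHEDULE and register_jobs are hypothetical names, and the two functions are stand-ins for the real ones in blog/jobs.py so that the snippet runs on its own.

```python
def hourly_project_refresh():
    """Stand-in for blog.jobs.hourly_project_refresh."""


def daily_db_backup():
    """Stand-in for blog.jobs.daily_db_backup."""


# Each entry pairs a job with the cron fields passed to add_job.
# The field values are the ones used in the command above.
JOB_SCHEDULE = [
    (hourly_project_refresh,
     {"minute": "0", "hour": "*", "day": "*", "week": "*", "month": "*"}),
    (daily_db_backup,
     {"minute": "0", "hour": "1", "day": "*", "week": "*", "month": "*"}),
]


def register_jobs(scheduler, schedule=JOB_SCHEDULE):
    """Add every (function, cron fields) pair to the scheduler as a cron job."""
    for func, cron_fields in schedule:
        scheduler.add_job(func, "cron", **cron_fields)
    return scheduler
```

In the command, handle() would then call register_jobs(BlockingScheduler()) before starting the scheduler. Any object with a compatible add_job method works, which also makes the schedule easy to check in a test without starting a real scheduler.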
You can then run the following command to have the cronjobs processed (in my case, I'm running them with supervisord):
$ ./manage.py run_cronjobs