Django + Celery

Celery позволяет выполнять асинхронные и периодические задачи в Django. Это мощный инструмент для обработки фоновых задач, отправки email, обработки файлов и многого другого.

Установка и настройка

Сначала установи необходимые пакеты:

1poetry add celery redis django-celery-results
2poetry add flower  # для мониторинга (опционально)

Настройка Celery в Django проекте

1. Создание файла celery.py

 1# project/celery.py
 2  from celery import Celery
 3  import os
 4
 5  # Устанавливаем переменную окружения для настроек Django
 6  os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
 7
 8  # Создаем экземпляр приложения Celery
 9  app = Celery('project')
10
11  # Загружаем настройки из Django settings
12  app.config_from_object('django.conf:settings', namespace='CELERY')
13
14  # Автоматически обнаруживаем задачи в приложениях Django
15  app.autodiscover_tasks()
16
17  @app.task(bind=True)
18  def debug_task(self):
19      print(f'Request: {self.request!r}')

2. Настройка в settings.py

 1# settings.py
 2
 3  # Celery Configuration
 4  CELERY_BROKER_URL = 'redis://localhost:6379/0'
 5  CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
 6
 7  # Настройки Redis
 8  CELERY_REDIS_HOST = 'localhost'
 9  CELERY_REDIS_PORT = 6379
10  CELERY_REDIS_DB = 0
11
12  # Настройки задач
13  CELERY_TASK_SERIALIZER = 'json'
14  CELERY_RESULT_SERIALIZER = 'json'
15  CELERY_ACCEPT_CONTENT = ['json']
16  CELERY_TIMEZONE = 'Europe/Moscow'
17  CELERY_ENABLE_UTC = True
18
19  # Настройки worker'ов
20  CELERY_WORKER_CONCURRENCY = 4
21  CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
22  CELERY_WORKER_DISABLE_RATE_LIMITS = False
23
24  # Настройки результатов
25  CELERY_RESULT_EXPIRES = 3600  # 1 час
26  CELERY_TASK_ALWAYS_EAGER = False  # False для продакшена
27
28  # Настройки периодических задач
29  CELERY_BEAT_SCHEDULE = {
30      'send-daily-report': {
31          'task': 'apps.users.tasks.send_daily_report',
32          'schedule': crontab(hour=9, minute=0),  # Каждый день в 9:00
33      },
34      'cleanup-old-files': {
35          'task': 'apps.common.tasks.cleanup_old_files',
36          'schedule': crontab(hour=2, minute=0),  # Каждый день в 2:00
37      },
38  }

3. Импорт в __init__.py

1# project/__init__.py
2  from .celery import app as celery_app
3
4  __all__ = ('celery_app',)

Создание и использование задач

1. Простые задачи

 1# apps/users/tasks.py
 2  from celery import shared_task
 3  from django.core.mail import send_mail
 4  from django.conf import settings
 5  import time
 6
 7  @shared_task
 8  def send_welcome_email(user_email, username):
 9      """Отправка приветственного email"""
10      subject = f'Добро пожаловать, {username}!'
11      message = f'''
12      Привет, {username}!
13
14      Спасибо за регистрацию на нашем сайте.
15      Мы рады видеть тебя в нашей команде!
16
17      С уважением,
18      Команда поддержки
19      '''
20
21      send_mail(
22          subject=subject,
23          message=message,
24          from_email=settings.DEFAULT_FROM_EMAIL,
25          recipient_list=[user_email],
26          fail_silently=False,
27      )
28
29      return f'Email отправлен на {user_email}'
30
31  @shared_task
32  def long_running_task():
33      """Долго выполняющаяся задача"""
34      time.sleep(10)  # Имитируем долгую работу
35      return 'Задача завершена успешно'
36
37  @shared_task(bind=True)
38  def task_with_progress(self):
39      """Задача с отслеживанием прогресса"""
40      total_steps = 100
41
42      for i in range(total_steps):
43          # Обновляем прогресс
44          self.update_state(
45              state='PROGRESS',
46              meta={'current': i, 'total': total_steps}
47          )
48          time.sleep(0.1)  # Имитируем работу
49
50      return {'current': total_steps, 'total': total_steps, 'status': 'Завершено'}

2. Задачи с параметрами и retry

 1@shared_task(bind=True, max_retries=3, default_retry_delay=60)
 2  def process_file(self, file_path, user_id):
 3      """Обработка файла с возможностью повторных попыток"""
 4      try:
 5          # Имитируем обработку файла
 6          if not os.path.exists(file_path):
 7              raise FileNotFoundError(f"Файл {file_path} не найден")
 8
 9          # Логика обработки файла
10          result = process_file_content(file_path)
11
12          # Обновляем статус в базе данных
13          update_processing_status(user_id, 'completed', result)
14
15          return result
16
17      except FileNotFoundError as exc:
18          # Повторяем попытку через 60 секунд
19          raise self.retry(exc=exc, countdown=60)
20      except Exception as exc:
21          # Логируем ошибку и повторяем попытку
22          logger.error(f"Ошибка обработки файла {file_path}: {exc}")
23          raise self.retry(exc=exc, countdown=120)
24
25  @shared_task(bind=True, rate_limit='10/m')  # Максимум 10 задач в минуту
26  def send_notification(self, user_id, message):
27      """Отправка уведомления с ограничением скорости"""
28      try:
29          user = User.objects.get(id=user_id)
30          send_push_notification(user, message)
31          return f'Уведомление отправлено пользователю {user.username}'
32      except User.DoesNotExist:
33          logger.error(f"Пользователь {user_id} не найден")
34          return None

3. Группы и цепочки задач

 1from celery import group, chain, chord
 2
 3  @shared_task
 4  def process_user_data(user_id):
 5      """Обработка данных пользователя"""
 6      user = User.objects.get(id=user_id)
 7      # Обработка данных
 8      return {'user_id': user_id, 'status': 'processed'}
 9
10  @shared_task
11  def send_user_report(user_data):
12      """Отправка отчета пользователю"""
13      user_id = user_data['user_id']
14      # Отправка отчета
15      return f'Отчет отправлен пользователю {user_id}'
16
17  @shared_task
18  def cleanup_temp_data(user_data):
19      """Очистка временных данных"""
20      user_id = user_data['user_id']
21      # Очистка
22      return f'Временные данные очищены для пользователя {user_id}'
23
24  def process_multiple_users(user_ids):
25      """Обработка нескольких пользователей параллельно"""
26      # Создаем группу задач для параллельного выполнения
27      job = group(process_user_data.s(user_id) for user_id in user_ids)
28      return job.apply_async()
29
30  def process_user_with_report(user_id):
31      """Обработка пользователя с отправкой отчета"""
32      # Создаем цепочку задач
33      job = chain(
34          process_user_data.s(user_id),
35          send_user_report.s(),
36          cleanup_temp_data.s()
37      )
38      return job.apply_async()
39
40  def process_users_with_callback(user_ids):
41      """Обработка пользователей с callback после завершения всех"""
42      # Создаем chord (группа + callback)
43      job = chord(
44          group(process_user_data.s(user_id) for user_id in user_ids),
45          send_summary_report.s()
46      )
47      return job.apply_async()

Периодические задачи (Celery Beat)

1. Настройка расписания

 1# settings.py
 2  from celery.schedules import crontab, timedelta
 3
 4  CELERY_BEAT_SCHEDULE = {
 5      # Каждый день в 9:00
 6      'daily-report': {
 7          'task': 'apps.users.tasks.send_daily_report',
 8          'schedule': crontab(hour=9, minute=0),
 9      },
10
11      # Каждый понедельник в 8:00
12      'weekly-summary': {
13          'task': 'apps.analytics.tasks.generate_weekly_summary',
14          'schedule': crontab(day_of_week=1, hour=8, minute=0),
15      },
16
17      # Каждые 30 минут
18      'check-system-health': {
19          'task': 'apps.monitoring.tasks.check_system_health',
20          'schedule': timedelta(minutes=30),
21      },
22
23      # Каждый час
24      'backup-database': {
25          'task': 'apps.maintenance.tasks.backup_database',
26          'schedule': crontab(minute=0),
27      },
28
29      # Каждые 15 минут в рабочее время
30      'process-queue': {
31          'task': 'apps.queue.tasks.process_pending_items',
32          'schedule': crontab(minute='*/15', hour='9-18'),
33      },
34  }

2. Периодические задачи с параметрами

 1@shared_task
 2def send_daily_report():
 3    """Отправка ежедневного отчета"""
 4    today = timezone.now().date()
 5    users = User.objects.filter(is_active=True)
 6
 7    for user in users:
 8        # Генерируем отчет для каждого пользователя
 9        report_data = generate_user_report(user, today)
10        send_email_report.delay(user.email, report_data)
11
12    return f'Отправлено {users.count()} ежедневных отчетов'
13
14@shared_task
15def cleanup_old_files():
16    """Очистка старых файлов"""
17    cutoff_date = timezone.now() - timedelta(days=30)
18    old_files = FileUpload.objects.filter(
19        created_at__lt=cutoff_date,
20        is_temporary=True
21    )
22
23    deleted_count = 0
24    for file_obj in old_files:
25        try:
26            # Удаляем файл с диска
27            if os.path.exists(file_obj.file.path):
28                os.remove(file_obj.file.path)
29
30            # Удаляем запись из базы
31            file_obj.delete()
32            deleted_count += 1
33
34        except Exception as e:
35            logger.error(f"Ошибка удаления файла {file_obj.id}: {e}")
36
37    return f'Удалено {deleted_count} старых файлов'

Мониторинг и управление задачами

1. Настройка Flower для мониторинга

1# settings.py
2CELERY_FLOWER_URL = 'http://localhost:5555'
3CELERY_FLOWER_BASIC_AUTH = ['admin:password']  # Опционально
4
5# Запуск Flower
6# celery -A project flower --port=5555

2. Отслеживание статуса задач

 1# views.py
 2from django.http import JsonResponse
 3from celery.result import AsyncResult
 4
 5def check_task_status(request, task_id):
 6    """Проверка статуса задачи"""
 7    task_result = AsyncResult(task_id)
 8
 9    if task_result.ready():
10        if task_result.successful():
11            return JsonResponse({
12                'status': 'completed',
13                'result': task_result.result
14            })
15        else:
16            return JsonResponse({
17                'status': 'failed',
18                'error': str(task_result.info)
19            })
20    else:
21        return JsonResponse({
22            'status': 'running',
23            'progress': task_result.info.get('current', 0) if task_result.info else 0
24        })
25
26def cancel_task(request, task_id):
27    """Отмена задачи"""
28    task_result = AsyncResult(task_id)
29    if not task_result.ready():
30        task_result.revoke(terminate=True)
31        return JsonResponse({'status': 'cancelled'})
32    else:
33        return JsonResponse({'status': 'already_completed'})

3. Логирование и мониторинг

 1# tasks.py
 2import logging
 3from celery.utils.log import get_task_logger
 4
 5logger = get_task_logger(__name__)
 6
 7@shared_task(bind=True)
 8def monitored_task(self):
 9    """Задача с подробным логированием"""
10    logger.info(f"Начало выполнения задачи {self.request.id}")
11
12    try:
13        # Выполнение задачи
14        result = perform_complex_operation()
15
16        logger.info(f"Задача {self.request.id} завершена успешно")
17        return result
18
19    except Exception as exc:
20        logger.error(f"Ошибка в задаче {self.request.id}: {exc}")
21        raise
22
23# settings.py для логирования Celery
24LOGGING = {
25    'version': 1,
26    'disable_existing_loggers': False,
27    'formatters': {
28        'verbose': {
29            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
30            'style': '{',
31        },
32    },
33    'handlers': {
34        'celery_file': {
35            'level': 'INFO',
36            'class': 'logging.FileHandler',
37            'filename': 'logs/celery.log',
38            'formatter': 'verbose',
39        },
40    },
41    'loggers': {
42        'celery': {
43            'handlers': ['celery_file'],
44            'level': 'INFO',
45            'propagate': False,
46        },
47    },
48}

Запуск и управление

1. Запуск worker'ов

 1# Запуск worker'а с логированием
 2celery -A project worker -l info
 3
 4# Запуск с определенным количеством процессов
 5celery -A project worker --concurrency=4 -l info
 6
 7# Запуск worker'а для определенной очереди
 8celery -A project worker -Q high,default -l info
 9
10# Запуск worker'а в фоновом режиме
11celery -A project worker -l info --detach
12
13# Запуск с пулом gevent для I/O задач
14celery -A project worker --pool=gevent --concurrency=100 -l info

2. Запуск Celery Beat

1# Запуск планировщика периодических задач
2celery -A project beat -l info
3
4# Запуск с сохранением расписания в файл
5celery -A project beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
6
7# Запуск в фоновом режиме
8celery -A project beat -l info --detach

3. Мониторинг через командную строку

 1# Проверка статуса worker'ов
 2celery -A project inspect active
 3
 4# Проверка статистики
 5celery -A project inspect stats
 6
 7# Проверка зарегистрированных задач
 8celery -A project inspect registered
 9
10# Очистка очередей
11celery -A project purge
12
13# Проверка результатов задач
14celery -A project result <task_id>

Тестирование задач

 1# tests.py
 2from django.test import TestCase
 3from django.contrib.auth.models import User
 4from unittest.mock import patch
 5from apps.users.tasks import send_welcome_email
 6
 7class CeleryTasksTestCase(TestCase):
 8    def setUp(self):
 9        self.user = User.objects.create_user(
10            username='testuser',
11            email='test@example.com',
12            password='testpass123'
13        )
14
15    @patch('apps.users.tasks.send_mail')
16    def test_send_welcome_email(self, mock_send_mail):
17        """Тестирование отправки приветственного email"""
18        # Выполняем задачу синхронно
19        result = send_welcome_email.delay(
20            self.user.email,
21            self.user.username
22        )
23
24        # Проверяем результат
25        self.assertEqual(result.get(), f'Email отправлен на {self.user.email}')
26
27        # Проверяем, что send_mail был вызван
28        mock_send_mail.assert_called_once()
29
30    def test_task_registration(self):
31        """Проверка регистрации задач"""
32        from project.celery import app
33
34        # Проверяем, что задачи зарегистрированы
35        registered_tasks = app.tasks.keys()
36        self.assertIn('apps.users.tasks.send_welcome_email', registered_tasks)

Производительность и оптимизация

1. Настройка пулов worker'ов

 1# settings.py
 2# Для CPU-интенсивных задач
 3CELERY_WORKER_POOL = 'prefork'
 4CELERY_WORKER_CONCURRENCY = 4
 5
 6# Для I/O-интенсивных задач (HTTP запросы, работа с файлами)
 7CELERY_WORKER_POOL = 'gevent'
 8CELERY_WORKER_CONCURRENCY = 100
 9
10# Для задач с долгими операциями
11CELERY_WORKER_POOL = 'solo'
12CELERY_WORKER_CONCURRENCY = 1

2. Очереди и приоритеты

 1# tasks.py
 2@shared_task(queue='high')
 3def urgent_task():
 4    """Срочная задача с высоким приоритетом"""
 5    return 'Срочная задача выполнена'
 6
 7@shared_task(queue='default')
 8def normal_task():
 9    """Обычная задача"""
10    return 'Обычная задача выполнена'
11
12@shared_task(queue='low')
13def background_task():
14    """Фоновая задача с низким приоритетом"""
15    return 'Фоновая задача выполнена'
16
17# settings.py
18CELERY_TASK_DEFAULT_QUEUE = 'default'
19CELERY_TASK_QUEUES = {
20    'high': {
21        'exchange': 'high',
22        'routing_key': 'high',
23    },
24    'default': {
25        'exchange': 'default',
26        'routing_key': 'default',
27    },
28    'low': {
29        'exchange': 'low',
30        'routing_key': 'low',
31    },
32}

FAQ

Q: Как запустить Celery worker?
A: Используй команду celery -A project worker -l info в отдельном терминале. Для продакшена используй supervisor или systemd.

Q: Как отладить задачи Celery?
A: Используй Flower для мониторинга, логирование с get_task_logger, и проверяй статус задач через AsyncResult.

Q: Можно ли использовать Celery без Redis?
A: Да, можно использовать другие брокеры: RabbitMQ, Amazon SQS, или встроенный SQLite для разработки.

Q: Как обработать ошибки в задачах?
A: Используй try-except блоки, декоратор max_retries, и логируй ошибки. Настройте обработчики ошибок через on_failure.

Q: Как масштабировать Celery?
A: Запускай несколько worker'ов, используй разные очереди, настройте балансировку нагрузки и мониторинг.

Q: Как тестировать задачи Celery?
A: Используй CELERY_TASK_ALWAYS_EAGER=True для синхронного выполнения в тестах, мокай внешние зависимости.

Q: Как настроить периодические задачи?
A: Используй CELERY_BEAT_SCHEDULE в settings.py, запускай celery beat, или используй django-celery-beat для динамического расписания.

Q: Как мониторить производительность Celery?
A: Используй Flower, Prometheus метрики, логирование, и отслеживай время выполнения задач и размер очередей.