Django + Celery
Celery позволяет выполнять асинхронные и периодические задачи в Django. Это мощный инструмент для обработки фоновых задач, отправки email, обработки файлов и многого другого.
Установка и настройка
Сначала установи необходимые пакеты:
Настройка Celery в Django проекте
1. Создание файла celery.py
1# project/celery.py
2 from celery import Celery
3 import os
4
5 # Устанавливаем переменную окружения для настроек Django
6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
7
8 # Создаем экземпляр приложения Celery
9 app = Celery('project')
10
11 # Загружаем настройки из Django settings
12 app.config_from_object('django.conf:settings', namespace='CELERY')
13
14 # Автоматически обнаруживаем задачи в приложениях Django
15 app.autodiscover_tasks()
16
17 @app.task(bind=True)
18 def debug_task(self):
19 print(f'Request: {self.request!r}')
2. Настройка в settings.py
1# settings.py
2
3 # Celery Configuration
4 CELERY_BROKER_URL = 'redis://localhost:6379/0'
5 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
6
7 # Настройки Redis
8 CELERY_REDIS_HOST = 'localhost'
9 CELERY_REDIS_PORT = 6379
10 CELERY_REDIS_DB = 0
11
12 # Настройки задач
13 CELERY_TASK_SERIALIZER = 'json'
14 CELERY_RESULT_SERIALIZER = 'json'
15 CELERY_ACCEPT_CONTENT = ['json']
16 CELERY_TIMEZONE = 'Europe/Moscow'
17 CELERY_ENABLE_UTC = True
18
19 # Настройки worker'ов
20 CELERY_WORKER_CONCURRENCY = 4
21 CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
22 CELERY_WORKER_DISABLE_RATE_LIMITS = False
23
24 # Настройки результатов
25 CELERY_RESULT_EXPIRES = 3600 # 1 час
26 CELERY_TASK_ALWAYS_EAGER = False # False для продакшена
27
28 # Настройки периодических задач
29 CELERY_BEAT_SCHEDULE = {
30 'send-daily-report': {
31 'task': 'apps.users.tasks.send_daily_report',
32 'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00
33 },
34 'cleanup-old-files': {
35 'task': 'apps.common.tasks.cleanup_old_files',
36 'schedule': crontab(hour=2, minute=0), # Каждый день в 2:00
37 },
38 }
3. Импорт в __init__.py
Создание и использование задач
1. Простые задачи
1# apps/users/tasks.py
2 from celery import shared_task
3 from django.core.mail import send_mail
4 from django.conf import settings
5 import time
6
7 @shared_task
8 def send_welcome_email(user_email, username):
9 """Отправка приветственного email"""
10 subject = f'Добро пожаловать, {username}!'
11 message = f'''
12 Привет, {username}!
13
14 Спасибо за регистрацию на нашем сайте.
15 Мы рады видеть тебя в нашей команде!
16
17 С уважением,
18 Команда поддержки
19 '''
20
21 send_mail(
22 subject=subject,
23 message=message,
24 from_email=settings.DEFAULT_FROM_EMAIL,
25 recipient_list=[user_email],
26 fail_silently=False,
27 )
28
29 return f'Email отправлен на {user_email}'
30
31 @shared_task
32 def long_running_task():
33 """Долго выполняющаяся задача"""
34 time.sleep(10) # Имитируем долгую работу
35 return 'Задача завершена успешно'
36
37 @shared_task(bind=True)
38 def task_with_progress(self):
39 """Задача с отслеживанием прогресса"""
40 total_steps = 100
41
42 for i in range(total_steps):
43 # Обновляем прогресс
44 self.update_state(
45 state='PROGRESS',
46 meta={'current': i, 'total': total_steps}
47 )
48 time.sleep(0.1) # Имитируем работу
49
50 return {'current': total_steps, 'total': total_steps, 'status': 'Завершено'}
2. Задачи с параметрами и retry
1@shared_task(bind=True, max_retries=3, default_retry_delay=60)
2 def process_file(self, file_path, user_id):
3 """Обработка файла с возможностью повторных попыток"""
4 try:
5 # Имитируем обработку файла
6 if not os.path.exists(file_path):
7 raise FileNotFoundError(f"Файл {file_path} не найден")
8
9 # Логика обработки файла
10 result = process_file_content(file_path)
11
12 # Обновляем статус в базе данных
13 update_processing_status(user_id, 'completed', result)
14
15 return result
16
17 except FileNotFoundError as exc:
18 # Повторяем попытку через 60 секунд
19 raise self.retry(exc=exc, countdown=60)
20 except Exception as exc:
21 # Логируем ошибку и повторяем попытку
22 logger.error(f"Ошибка обработки файла {file_path}: {exc}")
23 raise self.retry(exc=exc, countdown=120)
24
25 @shared_task(bind=True, rate_limit='10/m') # Максимум 10 задач в минуту
26 def send_notification(self, user_id, message):
27 """Отправка уведомления с ограничением скорости"""
28 try:
29 user = User.objects.get(id=user_id)
30 send_push_notification(user, message)
31 return f'Уведомление отправлено пользователю {user.username}'
32 except User.DoesNotExist:
33 logger.error(f"Пользователь {user_id} не найден")
34 return None
3. Группы и цепочки задач
1from celery import group, chain, chord
2
3 @shared_task
4 def process_user_data(user_id):
5 """Обработка данных пользователя"""
6 user = User.objects.get(id=user_id)
7 # Обработка данных
8 return {'user_id': user_id, 'status': 'processed'}
9
10 @shared_task
11 def send_user_report(user_data):
12 """Отправка отчета пользователю"""
13 user_id = user_data['user_id']
14 # Отправка отчета
15 return f'Отчет отправлен пользователю {user_id}'
16
17 @shared_task
18 def cleanup_temp_data(user_data):
19 """Очистка временных данных"""
20 user_id = user_data['user_id']
21 # Очистка
22 return f'Временные данные очищены для пользователя {user_id}'
23
24 def process_multiple_users(user_ids):
25 """Обработка нескольких пользователей параллельно"""
26 # Создаем группу задач для параллельного выполнения
27 job = group(process_user_data.s(user_id) for user_id in user_ids)
28 return job.apply_async()
29
30 def process_user_with_report(user_id):
31 """Обработка пользователя с отправкой отчета"""
32 # Создаем цепочку задач
33 job = chain(
34 process_user_data.s(user_id),
35 send_user_report.s(),
36 cleanup_temp_data.s()
37 )
38 return job.apply_async()
39
40 def process_users_with_callback(user_ids):
41 """Обработка пользователей с callback после завершения всех"""
42 # Создаем chord (группа + callback)
43 job = chord(
44 group(process_user_data.s(user_id) for user_id in user_ids),
45 send_summary_report.s()
46 )
47 return job.apply_async()
Периодические задачи (Celery Beat)
1. Настройка расписания
1# settings.py
2 from celery.schedules import crontab, timedelta
3
4 CELERY_BEAT_SCHEDULE = {
5 # Каждый день в 9:00
6 'daily-report': {
7 'task': 'apps.users.tasks.send_daily_report',
8 'schedule': crontab(hour=9, minute=0),
9 },
10
11 # Каждый понедельник в 8:00
12 'weekly-summary': {
13 'task': 'apps.analytics.tasks.generate_weekly_summary',
14 'schedule': crontab(day_of_week=1, hour=8, minute=0),
15 },
16
17 # Каждые 30 минут
18 'check-system-health': {
19 'task': 'apps.monitoring.tasks.check_system_health',
20 'schedule': timedelta(minutes=30),
21 },
22
23 # Каждый час
24 'backup-database': {
25 'task': 'apps.maintenance.tasks.backup_database',
26 'schedule': crontab(minute=0),
27 },
28
29 # Каждые 15 минут в рабочее время
30 'process-queue': {
31 'task': 'apps.queue.tasks.process_pending_items',
32 'schedule': crontab(minute='*/15', hour='9-18'),
33 },
34 }
2. Периодические задачи с параметрами
1@shared_task
2def send_daily_report():
3 """Отправка ежедневного отчета"""
4 today = timezone.now().date()
5 users = User.objects.filter(is_active=True)
6
7 for user in users:
8 # Генерируем отчет для каждого пользователя
9 report_data = generate_user_report(user, today)
10 send_email_report.delay(user.email, report_data)
11
12 return f'Отправлено {users.count()} ежедневных отчетов'
13
14@shared_task
15def cleanup_old_files():
16 """Очистка старых файлов"""
17 cutoff_date = timezone.now() - timedelta(days=30)
18 old_files = FileUpload.objects.filter(
19 created_at__lt=cutoff_date,
20 is_temporary=True
21 )
22
23 deleted_count = 0
24 for file_obj in old_files:
25 try:
26 # Удаляем файл с диска
27 if os.path.exists(file_obj.file.path):
28 os.remove(file_obj.file.path)
29
30 # Удаляем запись из базы
31 file_obj.delete()
32 deleted_count += 1
33
34 except Exception as e:
35 logger.error(f"Ошибка удаления файла {file_obj.id}: {e}")
36
37 return f'Удалено {deleted_count} старых файлов'
Мониторинг и управление задачами
1. Настройка Flower для мониторинга
2. Отслеживание статуса задач
1# views.py
2from django.http import JsonResponse
3from celery.result import AsyncResult
4
5def check_task_status(request, task_id):
6 """Проверка статуса задачи"""
7 task_result = AsyncResult(task_id)
8
9 if task_result.ready():
10 if task_result.successful():
11 return JsonResponse({
12 'status': 'completed',
13 'result': task_result.result
14 })
15 else:
16 return JsonResponse({
17 'status': 'failed',
18 'error': str(task_result.info)
19 })
20 else:
21 return JsonResponse({
22 'status': 'running',
23 'progress': task_result.info.get('current', 0) if task_result.info else 0
24 })
25
26def cancel_task(request, task_id):
27 """Отмена задачи"""
28 task_result = AsyncResult(task_id)
29 if not task_result.ready():
30 task_result.revoke(terminate=True)
31 return JsonResponse({'status': 'cancelled'})
32 else:
33 return JsonResponse({'status': 'already_completed'})
3. Логирование и мониторинг
1# tasks.py
2import logging
3from celery.utils.log import get_task_logger
4
5logger = get_task_logger(__name__)
6
7@shared_task(bind=True)
8def monitored_task(self):
9 """Задача с подробным логированием"""
10 logger.info(f"Начало выполнения задачи {self.request.id}")
11
12 try:
13 # Выполнение задачи
14 result = perform_complex_operation()
15
16 logger.info(f"Задача {self.request.id} завершена успешно")
17 return result
18
19 except Exception as exc:
20 logger.error(f"Ошибка в задаче {self.request.id}: {exc}")
21 raise
22
23# settings.py для логирования Celery
24LOGGING = {
25 'version': 1,
26 'disable_existing_loggers': False,
27 'formatters': {
28 'verbose': {
29 'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
30 'style': '{',
31 },
32 },
33 'handlers': {
34 'celery_file': {
35 'level': 'INFO',
36 'class': 'logging.FileHandler',
37 'filename': 'logs/celery.log',
38 'formatter': 'verbose',
39 },
40 },
41 'loggers': {
42 'celery': {
43 'handlers': ['celery_file'],
44 'level': 'INFO',
45 'propagate': False,
46 },
47 },
48}
Запуск и управление
1. Запуск worker'ов
1# Запуск worker'а с логированием
2celery -A project worker -l info
3
4# Запуск с определенным количеством процессов
5celery -A project worker --concurrency=4 -l info
6
7# Запуск worker'а для определенной очереди
8celery -A project worker -Q high,default -l info
9
10# Запуск worker'а в фоновом режиме
11celery -A project worker -l info --detach
12
13# Запуск с пулом gevent для I/O задач
14celery -A project worker --pool=gevent --concurrency=100 -l info
2. Запуск Celery Beat
3. Мониторинг через командную строку
1# Проверка статуса worker'ов
2celery -A project inspect active
3
4# Проверка статистики
5celery -A project inspect stats
6
7# Проверка зарегистрированных задач
8celery -A project inspect registered
9
10# Очистка очередей
11celery -A project purge
12
13# Проверка результатов задач
14celery -A project result <task_id>
Тестирование задач
1# tests.py
2from django.test import TestCase
3from django.contrib.auth.models import User
4from unittest.mock import patch
5from apps.users.tasks import send_welcome_email
6
7class CeleryTasksTestCase(TestCase):
8 def setUp(self):
9 self.user = User.objects.create_user(
10 username='testuser',
11 email='test@example.com',
12 password='testpass123'
13 )
14
15 @patch('apps.users.tasks.send_mail')
16 def test_send_welcome_email(self, mock_send_mail):
17 """Тестирование отправки приветственного email"""
18 # Выполняем задачу синхронно
19 result = send_welcome_email.delay(
20 self.user.email,
21 self.user.username
22 )
23
24 # Проверяем результат
25 self.assertEqual(result.get(), f'Email отправлен на {self.user.email}')
26
27 # Проверяем, что send_mail был вызван
28 mock_send_mail.assert_called_once()
29
30 def test_task_registration(self):
31 """Проверка регистрации задач"""
32 from project.celery import app
33
34 # Проверяем, что задачи зарегистрированы
35 registered_tasks = app.tasks.keys()
36 self.assertIn('apps.users.tasks.send_welcome_email', registered_tasks)
Производительность и оптимизация
1. Настройка пулов worker'ов
1# settings.py
2# Для CPU-интенсивных задач
3CELERY_WORKER_POOL = 'prefork'
4CELERY_WORKER_CONCURRENCY = 4
5
6# Для I/O-интенсивных задач (HTTP запросы, работа с файлами)
7CELERY_WORKER_POOL = 'gevent'
8CELERY_WORKER_CONCURRENCY = 100
9
10# Для задач с долгими операциями
11CELERY_WORKER_POOL = 'solo'
12CELERY_WORKER_CONCURRENCY = 1
2. Очереди и приоритеты
1# tasks.py
2@shared_task(queue='high')
3def urgent_task():
4 """Срочная задача с высоким приоритетом"""
5 return 'Срочная задача выполнена'
6
7@shared_task(queue='default')
8def normal_task():
9 """Обычная задача"""
10 return 'Обычная задача выполнена'
11
12@shared_task(queue='low')
13def background_task():
14 """Фоновая задача с низким приоритетом"""
15 return 'Фоновая задача выполнена'
16
17# settings.py
18CELERY_TASK_DEFAULT_QUEUE = 'default'
19CELERY_TASK_QUEUES = {
20 'high': {
21 'exchange': 'high',
22 'routing_key': 'high',
23 },
24 'default': {
25 'exchange': 'default',
26 'routing_key': 'default',
27 },
28 'low': {
29 'exchange': 'low',
30 'routing_key': 'low',
31 },
32}
FAQ
Q: Как запустить Celery worker?
A: Используй команду celery -A project worker -l info в отдельном терминале. Для продакшена используй supervisor или systemd.
Q: Как отладить задачи Celery?
A: Используй Flower для мониторинга, логирование с get_task_logger, и проверяй статус задач через AsyncResult.
Q: Можно ли использовать Celery без Redis?
A: Да, можно использовать другие брокеры: RabbitMQ, Amazon SQS, или встроенный SQLite для разработки.
Q: Как обработать ошибки в задачах?
A: Используй try-except блоки, декоратор max_retries, и логируй ошибки. Настройте обработчики ошибок через on_failure.
Q: Как масштабировать Celery?
A: Запускай несколько worker'ов, используй разные очереди, настройте балансировку нагрузки и мониторинг.
Q: Как тестировать задачи Celery?
A: Используй CELERY_TASK_ALWAYS_EAGER=True для синхронного выполнения в тестах, мокай внешние зависимости.
Q: Как настроить периодические задачи?
A: Используй CELERY_BEAT_SCHEDULE в settings.py, запускай celery beat, или используй django-celery-beat для динамического расписания.
Q: Как мониторить производительность Celery?
A: Используй Flower, Prometheus метрики, логирование, и отслеживай время выполнения задач и размер очередей.