Django Channels для WebSocket
Channels расширяет Django для поддержки WebSocket и других асинхронных протоколов. Это позволяет создавать real-time приложения с двусторонней связью между клиентом и сервером.
Установка и настройка
Сначала установи необходимые пакеты:
1pip install channels channels-redis daphne
Или через poetry:
1poetry add channels channels-redis daphne
Настройка в settings.py
Добавь Channels в настройки Django:
1INSTALLED_APPS = [
2 # ... другие приложения
3 'channels',
4 'channels_redis',
5]
6
7# Настройка ASGI
8ASGI_APPLICATION = 'project.asgi.application'
9
10# Channel Layer для Redis
11CHANNEL_LAYERS = {
12 'default': {
13 'BACKEND': 'channels_redis.core.RedisChannelLayer',
14 'CONFIG': {
15 "hosts": [('127.0.0.1', 6379)],
16 "capacity": 1500, # Максимальное количество сообщений в очереди
17 "expiry": 10, # Время жизни сообщений в секундах
18 },
19 },
20}
21
22# Для production используй переменные окружения
23CHANNEL_LAYERS = {
24 'default': {
25 'BACKEND': 'channels_redis.core.RedisChannelLayer',
26 'CONFIG': {
27 "hosts": [os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379')],
28 "capacity": 1500,
29 "expiry": 10,
30 },
31 },
32}
Настройка ASGI
Создай или обнови asgi.py:
1# project/asgi.py
2import os
3from django.core.asgi import get_asgi_application
4from channels.routing import ProtocolTypeRouter, URLRouter
5from channels.auth import AuthMiddlewareStack
6from channels.security.websocket import AllowedHostsOriginValidator
7from .routing import websocket_urlpatterns
8
9os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
10
11application = ProtocolTypeRouter({
12 "http": get_asgi_application(),
13 "websocket": AllowedHostsOriginValidator(
14 AuthMiddlewareStack(
15 URLRouter(websocket_urlpatterns)
16 )
17 ),
18})
Создание routing
Настрой маршрутизацию для WebSocket соединений:
1# project/routing.py
2from django.urls import re_path
3from .consumers import ChatConsumer, NotificationConsumer, GameConsumer
4
5websocket_urlpatterns = [
6 re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
7 re_path(r'ws/notifications/(?P<user_id>\d+)/$', NotificationConsumer.as_asgi()),
8 re_path(r'ws/game/(?P<game_id>\d+)/$', GameConsumer.as_asgi()),
9]
Базовые consumers
Создавай consumers для обработки WebSocket соединений:
1import json
2from channels.generic.websocket import WebsocketConsumer
3from channels.db import database_sync_to_async
4from .models import ChatRoom, Message
5
6class ChatConsumer(WebsocketConsumer):
7 def connect(self):
8 # Получаем имя комнаты из URL
9 self.room_name = self.scope['url_route']['kwargs']['room_name']
10 self.room_group_name = f'chat_{self.room_name}'
11
12 # Присоединяемся к группе комнаты
13 async_to_sync(self.channel_layer.group_add)(
14 self.room_group_name,
15 self.channel_name
16 )
17
18 # Принимаем соединение
19 self.accept()
20
21 # Отправляем сообщение о подключении
22 async_to_sync(self.channel_layer.group_send)(
23 self.room_group_name,
24 {
25 'type': 'chat_message',
26 'message': f'Пользователь присоединился к чату {self.room_name}'
27 }
28 )
29
30 def disconnect(self, close_code):
31 # Покидаем группу комнаты
32 async_to_sync(self.channel_layer.group_discard)(
33 self.room_group_name,
34 self.channel_name
35 )
36
37 # Отправляем сообщение об отключении
38 async_to_sync(self.channel_layer.group_send)(
39 self.room_group_name,
40 {
41 'type': 'chat_message',
42 'message': f'Пользователь покинул чат {self.room_name}'
43 }
44 )
45
46 def receive(self, text_data):
47 text_data_json = json.loads(text_data)
48 message = text_data_json['message']
49 user = text_data_json.get('user', 'Anonymous')
50
51 # Отправляем сообщение в группу
52 async_to_sync(self.channel_layer.group_send)(
53 self.room_group_name,
54 {
55 'type': 'chat_message',
56 'message': message,
57 'user': user
58 }
59 )
60
61 def chat_message(self, event):
62 message = event['message']
63 user = event.get('user', 'System')
64
65 # Отправляем сообщение в WebSocket
66 self.send(text_data=json.dumps({
67 'message': message,
68 'user': user,
69 'type': 'chat'
70 }))
Асинхронные consumers
Используй AsyncWebsocketConsumer для лучшей производительности:
1import json
2from channels.generic.websocket import AsyncWebsocketConsumer
3from channels.db import database_sync_to_async
4from .models import Notification, User
5
6class NotificationConsumer(AsyncWebsocketConsumer):
7 async def connect(self):
8 self.user_id = self.scope['url_route']['kwargs']['user_id']
9 self.user_group_name = f'user_{self.user_id}'
10
11 # Проверяем аутентификацию
12 if await self.is_authenticated():
13 await self.channel_layer.group_add(
14 self.user_group_name,
15 self.channel_name
16 )
17 await self.accept()
18
19 # Отправляем непрочитанные уведомления
20 notifications = await self.get_unread_notifications()
21 for notification in notifications:
22 await self.send(text_data=json.dumps({
23 'type': 'notification',
24 'message': notification.message,
25 'id': notification.id
26 }))
27 else:
28 await self.close()
29
30 async def disconnect(self, close_code):
31 await self.channel_layer.group_discard(
32 self.user_group_name,
33 self.channel_name
34 )
35
36 async def receive(self, text_data):
37 text_data_json = json.loads(text_data)
38 message_type = text_data_json.get('type')
39
40 if message_type == 'mark_read':
41 notification_id = text_data_json.get('notification_id')
42 await self.mark_notification_read(notification_id)
43 elif message_type == 'ping':
44 await self.send(text_data=json.dumps({'type': 'pong'}))
45
46 async def notification_message(self, event):
47 # Отправляем уведомление пользователю
48 await self.send(text_data=json.dumps({
49 'type': 'notification',
50 'message': event['message'],
51 'id': event['id']
52 }))
53
54 @database_sync_to_async
55 def is_authenticated(self):
56 # Проверка аутентификации пользователя
57 user = User.objects.filter(id=self.user_id).first()
58 return user is not None
59
60 @database_sync_to_async
61 def get_unread_notifications(self):
62 return list(Notification.objects.filter(
63 user_id=self.user_id,
64 is_read=False
65 )[:10])
66
67 @database_sync_to_async
68 def mark_notification_read(self, notification_id):
69 Notification.objects.filter(
70 id=notification_id,
71 user_id=self.user_id
72 ).update(is_read=True)
Работа с группами
Используй группы для отправки сообщений множеству пользователей:
1from channels.layers import get_channel_layer
2from asgiref.sync import async_to_sync
3
4class NotificationService:
5 @staticmethod
6 def send_notification_to_user(user_id, message):
7 channel_layer = get_channel_layer()
8 async_to_sync(channel_layer.group_send)(
9 f'user_{user_id}',
10 {
11 'type': 'notification_message',
12 'message': message,
13 'id': generate_notification_id()
14 }
15 )
16
17 @staticmethod
18 def send_notification_to_group(group_name, message):
19 channel_layer = get_channel_layer()
20 async_to_sync(channel_layer.group_send)(
21 group_name,
22 {
23 'type': 'notification_message',
24 'message': message,
25 'id': generate_notification_id()
26 }
27 )
28
29 @staticmethod
30 def send_notification_to_all(message):
31 channel_layer = get_channel_layer()
32 async_to_sync(channel_layer.group_send)(
33 'all_users',
34 {
35 'type': 'notification_message',
36 'message': message,
37 'id': generate_notification_id()
38 }
39 )
40
41# Использование в Django views или tasks
42def notify_user_about_new_message(request, user_id):
43 NotificationService.send_notification_to_user(
44 user_id,
45 'У вас новое сообщение!'
46 )
47
48# В Celery task
49@shared_task
50def send_bulk_notifications(user_ids, message):
51 for user_id in user_ids:
52 NotificationService.send_notification_to_user(user_id, message)
Аутентификация в WebSocket
Настрой аутентификацию для WebSocket соединений:
1# middleware.py
2from channels.middleware import BaseMiddleware
3from channels.db import database_sync_to_async
4from django.contrib.auth.models import AnonymousUser
5from django.contrib.auth import get_user
6from django.contrib.sessions.models import Session
7
8class TokenAuthMiddleware(BaseMiddleware):
9 async def __call__(self, scope, receive, send):
10 # Получаем токен из query параметров
11 query_string = scope.get('query_string', b'').decode()
12 query_params = dict(x.split('=') for x in query_string.split('&') if x)
13 token = query_params.get('token', '')
14
15 if token:
16 scope['user'] = await self.get_user_from_token(token)
17 else:
18 scope['user'] = AnonymousUser()
19
20 return await super().__call__(scope, receive, send)
21
22 @database_sync_to_async
23 def get_user_from_token(self, token):
24 # Здесь должна быть логика проверки токена
25 # Например, проверка JWT токена или токена сессии
26 try:
27 # Пример с JWT
28 from rest_framework_simplejwt.tokens import AccessToken
29 from django.contrib.auth import get_user_model
30
31 User = get_user_model()
32 decoded_token = AccessToken(token)
33 user_id = decoded_token['user_id']
34 return User.objects.get(id=user_id)
35 except Exception:
36 return AnonymousUser()
37
38# Обнови asgi.py
39from .middleware import TokenAuthMiddleware
40
41application = ProtocolTypeRouter({
42 "http": get_asgi_application(),
43 "websocket": AllowedHostsOriginValidator(
44 TokenAuthMiddleware(
45 URLRouter(websocket_urlpatterns)
46 )
47 ),
48})
Обработка ошибок
Добавляй обработку ошибок в consumers:
1import logging
2from channels.generic.websocket import AsyncWebsocketConsumer
3
4logger = logging.getLogger(__name__)
5
6class RobustChatConsumer(AsyncWebsocketConsumer):
7 async def connect(self):
8 try:
9 self.room_name = self.scope['url_route']['kwargs']['room_name']
10 self.room_group_name = f'chat_{self.room_name}'
11
12 # Проверяем существование комнаты
13 if not await self.room_exists():
14 await self.close(code=4001) # Комната не существует
15 return
16
17 await self.channel_layer.group_add(
18 self.room_group_name,
19 self.channel_name
20 )
21 await self.accept()
22
23 logger.info(f'Пользователь подключился к комнате {self.room_name}')
24
25 except Exception as e:
26 logger.error(f'Ошибка подключения: {e}')
27 await self.close(code=4000) # Внутренняя ошибка сервера
28
29 async def receive(self, text_data):
30 try:
31 text_data_json = json.loads(text_data)
32 message = text_data_json.get('message', '')
33
34 # Валидация сообщения
35 if not message or len(message.strip()) == 0:
36 await self.send(text_data=json.dumps({
37 'type': 'error',
38 'message': 'Сообщение не может быть пустым'
39 }))
40 return
41
42 if len(message) > 1000:
43 await self.send(text_data=json.dumps({
44 'type': 'error',
45 'message': 'Сообщение слишком длинное'
46 }))
47 return
48
49 # Отправляем сообщение в группу
50 await self.channel_layer.group_send(
51 self.room_group_name,
52 {
53 'type': 'chat_message',
54 'message': message,
55 'user': self.scope['user'].username if self.scope['user'].is_authenticated else 'Anonymous'
56 }
57 )
58
59 except json.JSONDecodeError:
60 await self.send(text_data=json.dumps({
61 'type': 'error',
62 'message': 'Неверный формат JSON'
63 }))
64 except Exception as e:
65 logger.error(f'Ошибка обработки сообщения: {e}')
66 await self.send(text_data=json.dumps({
67 'type': 'error',
68 'message': 'Внутренняя ошибка сервера'
69 }))
70
71 async def disconnect(self, close_code):
72 try:
73 await self.channel_layer.group_discard(
74 self.room_group_name,
75 self.channel_name
76 )
77 logger.info(f'Пользователь отключился от комнаты {self.room_name}')
78 except Exception as e:
79 logger.error(f'Ошибка отключения: {e}')
80
81 @database_sync_to_async
82 def room_exists(self):
83 from .models import ChatRoom
84 return ChatRoom.objects.filter(name=self.room_name).exists()
Тестирование WebSocket
Создавай тесты для WebSocket consumers:
1from channels.testing import WebsocketCommunicator
2from channels.routing import URLRouter
3from django.test import TestCase
4from django.urls import re_path
5from .consumers import ChatConsumer
6
7class WebSocketTestCase(TestCase):
8 async def test_chat_consumer_connect(self):
9 # Создаем тестовое приложение
10 application = URLRouter([
11 re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
12 ])
13
14 # Создаем коммуникатор
15 communicator = WebsocketCommunicator(
16 application,
17 "ws/chat/testroom/"
18 )
19
20 # Подключаемся
21 connected, _ = await communicator.connect()
22 self.assertTrue(connected)
23
24 # Отправляем сообщение
25 await communicator.send_json_to({
26 'message': 'Hello, World!'
27 })
28
29 # Получаем ответ
30 response = await communicator.receive_json_from()
31 self.assertEqual(response['message'], 'Hello, World!')
32
33 # Отключаемся
34 await communicator.disconnect()
35
36 async def test_chat_consumer_group_messages(self):
37 application = URLRouter([
38 re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
39 ])
40
41 # Создаем два коммуникатора для одной комнаты
42 communicator1 = WebsocketCommunicator(
43 application,
44 "ws/chat/testroom/"
45 )
46 communicator2 = WebsocketCommunicator(
47 application,
48 "ws/chat/testroom/"
49 )
50
51 # Подключаем оба
52 connected1, _ = await communicator1.connect()
53 connected2, _ = await communicator2.connect()
54 self.assertTrue(connected1)
55 self.assertTrue(connected2)
56
57 # Отправляем сообщение от первого пользователя
58 await communicator1.send_json_to({
59 'message': 'Hello from user 1!'
60 })
61
62 # Проверяем, что оба получили сообщение
63 response1 = await communicator1.receive_json_from()
64 response2 = await communicator2.receive_json_from()
65
66 self.assertEqual(response1['message'], 'Hello from user 1!')
67 self.assertEqual(response2['message'], 'Hello from user 1!')
68
69 # Отключаем оба
70 await communicator1.disconnect()
71 await communicator2.disconnect()
JavaScript клиент
Пример клиентского кода для подключения к WebSocket:
1class WebSocketClient {
2 constructor(url) {
3 this.url = url;
4 this.ws = null;
5 this.reconnectAttempts = 0;
6 this.maxReconnectAttempts = 5;
7 this.reconnectDelay = 1000;
8 this.listeners = new Map();
9 }
10
11 connect() {
12 try {
13 this.ws = new WebSocket(this.url);
14
15 this.ws.onopen = () => {
16 console.log('WebSocket соединение установлено');
17 this.reconnectAttempts = 0;
18 this.emit('connected');
19 };
20
21 this.ws.onmessage = (event) => {
22 try {
23 const data = JSON.parse(event.data);
24 this.emit('message', data);
25 } catch (e) {
26 console.error('Ошибка парсинга сообщения:', e);
27 }
28 };
29
30 this.ws.onclose = (event) => {
31 console.log('WebSocket соединение закрыто:', event.code, event.reason);
32 this.emit('disconnected', event);
33
34 // Попытка переподключения
35 if (this.reconnectAttempts < this.maxReconnectAttempts) {
36 setTimeout(() => {
37 this.reconnectAttempts++;
38 this.connect();
39 }, this.reconnectDelay * this.reconnectAttempts);
40 }
41 };
42
43 this.ws.onerror = (error) => {
44 console.error('WebSocket ошибка:', error);
45 this.emit('error', error);
46 };
47
48 } catch (error) {
49 console.error('Ошибка создания WebSocket:', error);
50 }
51 }
52
53 send(data) {
54 if (this.ws && this.ws.readyState === WebSocket.OPEN) {
55 this.ws.send(JSON.stringify(data));
56 } else {
57 console.error('WebSocket не подключен');
58 }
59 }
60
61 disconnect() {
62 if (this.ws) {
63 this.ws.close();
64 }
65 }
66
67 on(event, callback) {
68 if (!this.listeners.has(event)) {
69 this.listeners.set(event, []);
70 }
71 this.listeners.get(event).push(callback);
72 }
73
74 emit(event, data) {
75 if (this.listeners.has(event)) {
76 this.listeners.get(event).forEach(callback => callback(data));
77 }
78 }
79}
80
81// Использование
82const chatClient = new WebSocketClient('ws://localhost:8000/ws/chat/room1/');
83
84chatClient.on('connected', () => {
85 console.log('Подключились к чату');
86});
87
88chatClient.on('message', (data) => {
89 if (data.type === 'chat') {
90 displayMessage(data.message, data.user);
91 } else if (data.type === 'error') {
92 showError(data.message);
93 }
94});
95
96chatClient.on('disconnected', () => {
97 console.log('Отключились от чата');
98});
99
100chatClient.connect();
101
102// Отправка сообщения
103function sendMessage(message) {
104 chatClient.send({
105 message: message
106 });
107}
Лучшие практики
- Всегда используй обработку ошибок в consumers
- Логируй важные события WebSocket соединений
- Используй группы для эффективной отправки сообщений
- Реализуй механизм переподключения на клиенте
- Валидируйте данные на сервере
- Используй асинхронные consumers для лучшей производительности
- Настраивай аутентификацию для защищенных WebSocket
- Тестируй WebSocket функциональность
- Мониторь количество активных соединений
- Используй Redis для production deployments
FAQ
Q: Нужен ли Redis для Channels?
A: Да, для production рекомендуется использовать Redis как channel layer.
Q: Как обрабатывать большое количество WebSocket соединений?
A: Используй Redis cluster, настройте connection pooling и мониторьте производительность.
Q: Можно ли использовать Channels без WebSocket?
A: Да, Channels поддерживает HTTP long polling, Server-Sent Events и другие протоколы.
Q: Как обеспечить безопасность WebSocket соединений?
A: Используй аутентификацию, валидацию данных и HTTPS/WSS в production.
Q: Можно ли использовать Channels с Django REST Framework?
A: Да, они отлично работают вместе. DRF для HTTP API, Channels для real-time.
Q: Как тестировать WebSocket в Django?
A: Используй channels.testing.WebsocketCommunicator для асинхронных тестов.