WebSocket в Django с Django Channels
Django Channels добавляет поддержку WebSocket для real-time функциональности, позволяя создавать интерактивные приложения с двусторонней связью между клиентом и сервером.
Установка и настройка
Сначала установи необходимые пакеты:
1pip install channels channels-redis daphne
Или через poetry:
1poetry add channels channels-redis daphne
Настройка в settings.py
Добавь Channels в настройки Django:
1# settings.py
2INSTALLED_APPS = [
3 # ... другие приложения
4 'channels',
5 'channels_redis',
6]
7
8# Настройка ASGI
9ASGI_APPLICATION = 'myproject.asgi.application'
10
11# Channel Layer для Redis
12CHANNEL_LAYERS = {
13 'default': {
14 'BACKEND': 'channels_redis.core.RedisChannelLayer',
15 'CONFIG': {
16 "hosts": [('127.0.0.1', 6379)],
17 "capacity": 1500, # Максимальное количество сообщений в очереди
18 "expiry": 10, # Время жизни сообщений в секундах
19 },
20 },
21}
22
23# Для production используй переменные окружения
24import os
25CHANNEL_LAYERS = {
26 'default': {
27 'BACKEND': 'channels_redis.core.RedisChannelLayer',
28 'CONFIG': {
29 "hosts": [os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379')],
30 "capacity": 1500,
31 "expiry": 10,
32 },
33 },
34}
Настройка ASGI
Создай или обнови asgi.py для поддержки WebSocket:
1# myproject/asgi.py
2import os
3from django.core.asgi import get_asgi_application
4from channels.routing import ProtocolTypeRouter, URLRouter
5from channels.auth import AuthMiddlewareStack
6from channels.security.websocket import AllowedHostsOriginValidator
7from .routing import websocket_urlpatterns
8
9os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
10
11# ASGI приложение с поддержкой HTTP и WebSocket
12application = ProtocolTypeRouter({
13 "http": get_asgi_application(),
14 "websocket": AllowedHostsOriginValidator(
15 AuthMiddlewareStack(
16 URLRouter(websocket_urlpatterns)
17 )
18 ),
19})
20
21# Альтернативная настройка с кастомным middleware
22from .middleware import TokenAuthMiddleware
23
24application = ProtocolTypeRouter({
25 "http": get_asgi_application(),
26 "websocket": AllowedHostsOriginValidator(
27 TokenAuthMiddleware(
28 URLRouter(websocket_urlpatterns)
29 )
30 ),
31})
Создание routing
Настрой маршрутизацию для WebSocket соединений:
1# myproject/routing.py
2from django.urls import re_path
3from .consumers import (
4 ChatConsumer,
5 NotificationConsumer,
6 GameConsumer,
7 DashboardConsumer
8)
9
10websocket_urlpatterns = [
11 # Чат комнаты
12 re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
13
14 # Уведомления пользователя
15 re_path(r'ws/notifications/(?P<user_id>\d+)/$', NotificationConsumer.as_asgi()),
16
17 # Игровые сессии
18 re_path(r'ws/game/(?P<game_id>\d+)/$', GameConsumer.as_asgi()),
19
20 # Дашборд в реальном времени
21 re_path(r'ws/dashboard/(?P<dashboard_type>\w+)/$', DashboardConsumer.as_asgi()),
22]
23
24# Альтернативная маршрутизация с namespace
25from django.urls import path, include
26
27websocket_urlpatterns = [
28 path('ws/', include([
29 path('chat/', include('chat.routing')),
30 path('notifications/', include('notifications.routing')),
31 path('games/', include('games.routing')),
32 ])),
33]
Создание consumers
Создавай consumers для обработки WebSocket соединений:
1# chat/consumers.py
2import json
3from channels.generic.websocket import WebsocketConsumer
4from channels.db import database_sync_to_async
5from asgiref.sync import async_to_sync
6from .models import ChatRoom, Message
7
8class ChatConsumer(WebsocketConsumer):
9 def connect(self):
10 # Получаем параметры из URL
11 self.room_name = self.scope['url_route']['kwargs']['room_name']
12 self.room_group_name = f'chat_{self.room_name}'
13 self.user = self.scope['user']
14
15 # Проверяем доступ к комнате
16 if self.can_join_room():
17 # Присоединяемся к группе комнаты
18 async_to_sync(self.channel_layer.group_add)(
19 self.room_group_name,
20 self.channel_name
21 )
22
23 # Принимаем соединение
24 self.accept()
25
26 # Отправляем сообщение о подключении
27 async_to_sync(self.channel_layer.group_send)(
28 self.room_group_name,
29 {
30 'type': 'chat_message',
31 'message': f'{self.user.username} присоединился к чату',
32 'user': 'System',
33 'timestamp': self.get_timestamp()
34 }
35 )
36
37 # Отправляем историю сообщений
38 self.send_chat_history()
39 else:
40 self.close(code=4001) # Доступ запрещен
41
42 def disconnect(self, close_code):
43 if hasattr(self, 'room_group_name'):
44 # Покидаем группу комнаты
45 async_to_sync(self.channel_layer.group_discard)(
46 self.room_group_name,
47 self.channel_name
48 )
49
50 # Отправляем сообщение об отключении
51 async_to_sync(self.channel_layer.group_send)(
52 self.room_group_name,
53 {
54 'type': 'chat_message',
55 'message': f'{self.user.username} покинул чат',
56 'user': 'System',
57 'timestamp': self.get_timestamp()
58 }
59 )
60
61 def receive(self, text_data):
62 try:
63 text_data_json = json.loads(text_data)
64 message_type = text_data_json.get('type', 'message')
65
66 if message_type == 'message':
67 message = text_data_json.get('message', '')
68
69 # Валидация сообщения
70 if self.validate_message(message):
71 # Сохраняем сообщение в базе
72 saved_message = self.save_message(message)
73
74 # Отправляем сообщение в группу
75 async_to_sync(self.channel_layer.group_send)(
76 self.room_group_name,
77 {
78 'type': 'chat_message',
79 'message': message,
80 'user': self.user.username,
81 'timestamp': self.get_timestamp(),
82 'message_id': saved_message.id
83 }
84 )
85 else:
86 self.send(text_data=json.dumps({
87 'type': 'error',
88 'message': 'Сообщение не прошло валидацию'
89 }))
90
91 elif message_type == 'typing':
92 # Отправляем уведомление о печати
93 async_to_sync(self.channel_layer.group_send)(
94 self.room_group_name,
95 {
96 'type': 'user_typing',
97 'user': self.user.username
98 }
99 )
100
101 elif message_type == 'ping':
102 # Отвечаем на ping
103 self.send(text_data=json.dumps({'type': 'pong'}))
104
105 except json.JSONDecodeError:
106 self.send(text_data=json.dumps({
107 'type': 'error',
108 'message': 'Неверный формат JSON'
109 }))
110 except Exception as e:
111 self.send(text_data=json.dumps({
112 'type': 'error',
113 'message': 'Внутренняя ошибка сервера'
114 }))
115
116 def chat_message(self, event):
117 # Отправляем сообщение в WebSocket
118 self.send(text_data=json.dumps({
119 'type': 'chat',
120 'message': event['message'],
121 'user': event['user'],
122 'timestamp': event['timestamp'],
123 'message_id': event.get('message_id')
124 }))
125
126 def user_typing(self, event):
127 # Отправляем уведомление о печати
128 self.send(text_data=json.dumps({
129 'type': 'typing',
130 'user': event['user']
131 }))
132
133 @database_sync_to_async
134 def can_join_room(self):
135 # Проверка доступа к комнате
136 try:
137 room = ChatRoom.objects.get(name=self.room_name)
138 return room.is_public or self.user in room.members.all()
139 except ChatRoom.DoesNotExist:
140 return False
141
142 @database_sync_to_async
143 def save_message(self, message):
144 # Сохранение сообщения в базе
145 return Message.objects.create(
146 room_name=self.room_name,
147 user=self.user,
148 content=message
149 )
150
151 def send_chat_history(self):
152 # Отправка истории сообщений
153 messages = Message.objects.filter(
154 room_name=self.room_name
155 ).order_by('-timestamp')[:50]
156
157 for msg in reversed(messages):
158 self.send(text_data=json.dumps({
159 'type': 'history',
160 'message': msg.content,
161 'user': msg.user.username,
162 'timestamp': msg.timestamp.isoformat()
163 }))
164
165 def validate_message(self, message):
166 # Валидация сообщения
167 if not message or len(message.strip()) == 0:
168 return False
169 if len(message) > 1000:
170 return False
171 return True
172
173 def get_timestamp(self):
174 from django.utils import timezone
175 return timezone.now().isoformat()
Асинхронные consumers
Используй AsyncWebsocketConsumer для лучшей производительности:
1# notifications/consumers.py
2import json
3from channels.generic.websocket import AsyncWebsocketConsumer
4from channels.db import database_sync_to_async
5from .models import Notification, User
6
7class NotificationConsumer(AsyncWebsocketConsumer):
8 async def connect(self):
9 self.user_id = self.scope['url_route']['kwargs']['user_id']
10 self.user_group_name = f'user_{self.user_id}'
11
12 # Проверяем аутентификацию
13 if await self.is_authenticated():
14 await self.channel_layer.group_add(
15 self.user_group_name,
16 self.channel_name
17 )
18 await self.accept()
19
20 # Отправляем непрочитанные уведомления
21 await self.send_unread_notifications()
22
23 # Отправляем статистику
24 await self.send_notification_stats()
25 else:
26 await self.close(code=4001)
27
28 async def disconnect(self, close_code):
29 await self.channel_layer.group_discard(
30 self.user_group_name,
31 self.channel_name
32 )
33
34 async def receive(self, text_data):
35 try:
36 text_data_json = json.loads(text_data)
37 action = text_data_json.get('action')
38
39 if action == 'mark_read':
40 notification_id = text_data_json.get('notification_id')
41 await self.mark_notification_read(notification_id)
42
43 elif action == 'mark_all_read':
44 await self.mark_all_notifications_read()
45
46 elif action == 'delete':
47 notification_id = text_data_json.get('notification_id')
48 await self.delete_notification(notification_id)
49
50 elif action == 'ping':
51 await self.send(text_data=json.dumps({'type': 'pong'}))
52
53 except Exception as e:
54 await self.send(text_data=json.dumps({
55 'type': 'error',
56 'message': str(e)
57 }))
58
59 async def notification_message(self, event):
60 # Отправляем уведомление пользователю
61 await self.send(text_data=json.dumps({
62 'type': 'notification',
63 'id': event['id'],
64 'title': event['title'],
65 'message': event['message'],
66 'level': event.get('level', 'info'),
67 'timestamp': event['timestamp']
68 }))
69
70 async def notification_stats(self, event):
71 # Отправляем статистику уведомлений
72 await self.send(text_data=json.dumps({
73 'type': 'stats',
74 'unread_count': event['unread_count'],
75 'total_count': event['total_count']
76 }))
77
78 @database_sync_to_async
79 def is_authenticated(self):
80 try:
81 user = User.objects.get(id=self.user_id)
82 return user.is_active
83 except User.DoesNotExist:
84 return False
85
86 @database_sync_to_async
87 def send_unread_notifications(self):
88 notifications = Notification.objects.filter(
89 user_id=self.user_id,
90 is_read=False
91 ).order_by('-created_at')[:10]
92
93 for notification in notifications:
94 self.send(text_data=json.dumps({
95 'type': 'notification',
96 'id': notification.id,
97 'title': notification.title,
98 'message': notification.message,
99 'timestamp': notification.created_at.isoformat()
100 }))
101
102 @database_sync_to_async
103 def send_notification_stats(self):
104 unread_count = Notification.objects.filter(
105 user_id=self.user_id,
106 is_read=False
107 ).count()
108
109 total_count = Notification.objects.filter(
110 user_id=self.user_id
111 ).count()
112
113 self.send(text_data=json.dumps({
114 'type': 'stats',
115 'unread_count': unread_count,
116 'total_count': total_count
117 }))
118
119 @database_sync_to_async
120 def mark_notification_read(self, notification_id):
121 Notification.objects.filter(
122 id=notification_id,
123 user_id=self.user_id
124 ).update(is_read=True)
125
126 @database_sync_to_async
127 def mark_all_notifications_read(self):
128 Notification.objects.filter(
129 user_id=self.user_id,
130 is_read=False
131 ).update(is_read=True)
132
133 @database_sync_to_async
134 def delete_notification(self, notification_id):
135 Notification.objects.filter(
136 id=notification_id,
137 user_id=self.user_id
138 ).delete()
Middleware для аутентификации
Создавай кастомные middleware для WebSocket:
1# middleware.py
2from channels.middleware import BaseMiddleware
3from channels.db import database_sync_to_async
4from django.contrib.auth.models import AnonymousUser
5from django.contrib.auth import get_user
6import jwt
7from django.conf import settings
8
9class TokenAuthMiddleware(BaseMiddleware):
10 async def __call__(self, scope, receive, send):
11 # Получаем токен из query параметров или заголовков
12 token = self.get_token(scope)
13
14 if token:
15 scope['user'] = await self.get_user_from_token(token)
16 else:
17 scope['user'] = AnonymousUser()
18
19 return await super().__call__(scope, receive, send)
20
21 def get_token(self, scope):
22 # Из query параметров
23 query_string = scope.get('query_string', b'').decode()
24 query_params = dict(x.split('=') for x in query_string.split('&') if x)
25 token = query_params.get('token', '')
26
27 if not token:
28 # Из заголовков
29 headers = dict(scope['headers'])
30 token = headers.get(b'authorization', b'').decode()
31 if token.startswith('Bearer '):
32 token = token[7:]
33
34 return token
35
36 @database_sync_to_async
37 def get_user_from_token(self, token):
38 try:
39 # Декодируем JWT токен
40 payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
41 user_id = payload['user_id']
42
43 from django.contrib.auth import get_user_model
44 User = get_user_model()
45 user = User.objects.get(id=user_id)
46
47 if user.is_active:
48 return user
49 else:
50 return AnonymousUser()
51
52 except (jwt.InvalidTokenError, KeyError, User.DoesNotExist):
53 return AnonymousUser()
54
55class RateLimitMiddleware(BaseMiddleware):
56 def __init__(self, app, rate_limit=100):
57 super().__init__(app)
58 self.rate_limit = rate_limit
59 self.connections = {}
60
61 async def __call__(self, scope, receive, send):
62 # Проверяем rate limiting
63 client_ip = self.get_client_ip(scope)
64
65 if self.is_rate_limited(client_ip):
66 # Отклоняем соединение
67 return
68
69 # Добавляем соединение
70 self.add_connection(client_ip)
71
72 try:
73 return await super().__call__(scope, receive, send)
74 finally:
75 # Убираем соединение
76 self.remove_connection(client_ip)
77
78 def get_client_ip(self, scope):
79 headers = dict(scope['headers'])
80 return headers.get(b'x-forwarded-for', b'127.0.0.1').decode()
81
82 def is_rate_limited(self, client_ip):
83 return self.connections.get(client_ip, 0) >= self.rate_limit
84
85 def add_connection(self, client_ip):
86 self.connections[client_ip] = self.connections.get(client_ip, 0) + 1
87
88 def remove_connection(self, client_ip):
89 if client_ip in self.connections:
90 self.connections[client_ip] = max(0, self.connections[client_ip] - 1)
Работа с группами и broadcast
Отправляй сообщения множеству пользователей:
1# services.py
2from channels.layers import get_channel_layer
3from asgiref.sync import async_to_sync
4import json
5
6class WebSocketService:
7 @staticmethod
8 def send_notification_to_user(user_id, notification_data):
9 channel_layer = get_channel_layer()
10 async_to_sync(channel_layer.group_send)(
11 f'user_{user_id}',
12 {
13 'type': 'notification_message',
14 **notification_data
15 }
16 )
17
18 @staticmethod
19 def send_notification_to_group(group_name, notification_data):
20 channel_layer = get_channel_layer()
21 async_to_sync(channel_layer.group_send)(
22 group_name,
23 {
24 'type': 'notification_message',
25 **notification_data
26 }
27 )
28
29 @staticmethod
30 def broadcast_to_all(message_data):
31 channel_layer = get_channel_layer()
32 async_to_sync(channel_layer.group_send)(
33 'all_users',
34 {
35 'type': 'broadcast_message',
36 **message_data
37 }
38 )
39
40 @staticmethod
41 def send_to_channel(channel_name, message_data):
42 channel_layer = get_channel_layer()
43 async_to_sync(channel_layer.send)(
44 channel_name,
45 {
46 'type': 'direct_message',
47 **message_data
48 }
49 )
50
51# Использование в Django views или tasks
52def notify_user_about_new_message(request, user_id):
53 WebSocketService.send_notification_to_user(
54 user_id,
55 {
56 'title': 'Новое сообщение',
57 'message': 'У вас новое сообщение в чате',
58 'level': 'info'
59 }
60 )
61
62# В Celery task
63@shared_task
64def send_bulk_notifications(user_ids, message):
65 for user_id in user_ids:
66 WebSocketService.send_notification_to_user(
67 user_id,
68 {
69 'title': 'Массовое уведомление',
70 'message': message,
71 'level': 'warning'
72 }
73 )
JavaScript клиент
Создавай клиентский код для подключения к WebSocket:
1// static/js/websocket-client.js
2class WebSocketClient {
3 constructor(url, options = {}) {
4 this.url = url;
5 this.options = {
6 reconnectAttempts: 5,
7 reconnectDelay: 1000,
8 heartbeatInterval: 30000,
9 ...options
10 };
11
12 this.ws = null;
13 this.reconnectAttempts = 0;
14 this.heartbeatTimer = null;
15 this.listeners = new Map();
16 this.isConnecting = false;
17 }
18
19 connect() {
20 if (this.isConnecting || this.ws?.readyState === WebSocket.OPEN) {
21 return;
22 }
23
24 this.isConnecting = true;
25
26 try {
27 this.ws = new WebSocket(this.url);
28
29 this.ws.onopen = () => {
30 console.log('WebSocket соединение установлено');
31 this.isConnecting = false;
32 this.reconnectAttempts = 0;
33 this.startHeartbeat();
34 this.emit('connected');
35 };
36
37 this.ws.onmessage = (event) => {
38 try {
39 const data = JSON.parse(event.data);
40 this.handleMessage(data);
41 } catch (e) {
42 console.error('Ошибка парсинга сообщения:', e);
43 }
44 };
45
46 this.ws.onclose = (event) => {
47 console.log('WebSocket соединение закрыто:', event.code, event.reason);
48 this.isConnecting = false;
49 this.stopHeartbeat();
50 this.emit('disconnected', event);
51
52 // Попытка переподключения
53 if (this.reconnectAttempts < this.options.reconnectAttempts) {
54 setTimeout(() => {
55 this.reconnectAttempts++;
56 this.connect();
57 }, this.options.reconnectDelay * this.reconnectAttempts);
58 }
59 };
60
61 this.ws.onerror = (error) => {
62 console.error('WebSocket ошибка:', error);
63 this.isConnecting = false;
64 this.emit('error', error);
65 };
66
67 } catch (error) {
68 console.error('Ошибка создания WebSocket:', error);
69 this.isConnecting = false;
70 }
71 }
72
73 disconnect() {
74 if (this.ws) {
75 this.ws.close();
76 }
77 }
78
79 send(data) {
80 if (this.ws && this.ws.readyState === WebSocket.OPEN) {
81 this.ws.send(JSON.stringify(data));
82 } else {
83 console.error('WebSocket не подключен');
84 }
85 }
86
87 handleMessage(data) {
88 const messageType = data.type;
89
90 switch (messageType) {
91 case 'chat':
92 this.emit('chat_message', data);
93 break;
94 case 'notification':
95 this.emit('notification', data);
96 break;
97 case 'typing':
98 this.emit('user_typing', data);
99 break;
100 case 'error':
101 this.emit('error', data);
102 break;
103 case 'pong':
104 // Ответ на heartbeat
105 break;
106 default:
107 this.emit('message', data);
108 }
109 }
110
111 startHeartbeat() {
112 this.heartbeatTimer = setInterval(() => {
113 this.send({ type: 'ping' });
114 }, this.options.heartbeatInterval);
115 }
116
117 stopHeartbeat() {
118 if (this.heartbeatTimer) {
119 clearInterval(this.heartbeatTimer);
120 this.heartbeatTimer = null;
121 }
122 }
123
124 on(event, callback) {
125 if (!this.listeners.has(event)) {
126 this.listeners.set(event, []);
127 }
128 this.listeners.get(event).push(callback);
129 }
130
131 emit(event, data) {
132 if (this.listeners.has(event)) {
133 this.listeners.get(event).forEach(callback => callback(data));
134 }
135 }
136}
137
138// Использование
139const chatClient = new WebSocketClient('ws://localhost:8000/ws/chat/room1/');
140
141chatClient.on('connected', () => {
142 console.log('Подключились к чату');
143 showConnectionStatus('Подключено');
144});
145
146chatClient.on('chat_message', (data) => {
147 displayMessage(data.message, data.user, data.timestamp);
148});
149
150chatClient.on('user_typing', (data) => {
151 showTypingIndicator(data.user);
152});
153
154chatClient.on('notification', (data) => {
155 showNotification(data.title, data.message, data.level);
156});
157
158chatClient.on('disconnected', () => {
159 console.log('Отключились от чата');
160 showConnectionStatus('Отключено');
161});
162
163chatClient.on('error', (data) => {
164 showError(data.message);
165});
166
167chatClient.connect();
168
169// Отправка сообщения
170function sendMessage(message) {
171 chatClient.send({
172 type: 'message',
173 message: message
174 });
175}
176
177// Уведомление о печати
178function sendTypingNotification() {
179 chatClient.send({
180 type: 'typing'
181 });
182}
Тестирование WebSocket
Создавай тесты для проверки WebSocket функциональности:
1# tests/test_websocket.py
2from channels.testing import WebsocketCommunicator
3from channels.routing import URLRouter
4from django.test import TestCase
5from django.urls import re_path
6from .consumers import ChatConsumer, NotificationConsumer
7
8class WebSocketTestCase(TestCase):
9 async def test_chat_consumer_connect(self):
10 # Создаем тестовое приложение
11 application = URLRouter([
12 re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
13 ])
14
15 # Создаем коммуникатор
16 communicator = WebsocketCommunicator(
17 application,
18 "ws/chat/testroom/"
19 )
20
21 # Подключаемся
22 connected, _ = await communicator.connect()
23 self.assertTrue(connected)
24
25 # Отправляем сообщение
26 await communicator.send_json_to({
27 'type': 'message',
28 'message': 'Hello, World!'
29 })
30
31 # Получаем ответ
32 response = await communicator.receive_json_from()
33 self.assertEqual(response['type'], 'chat')
34 self.assertEqual(response['message'], 'Hello, World!')
35
36 # Отключаемся
37 await communicator.disconnect()
38
39 async def test_chat_consumer_group_messages(self):
40 application = URLRouter([
41 re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
42 ])
43
44 # Создаем два коммуникатора для одной комнаты
45 communicator1 = WebsocketCommunicator(
46 application,
47 "ws/chat/testroom/"
48 )
49 communicator2 = WebsocketCommunicator(
50 application,
51 "ws/chat/testroom/"
52 )
53
54 # Подключаем оба
55 connected1, _ = await communicator1.connect()
56 connected2, _ = await communicator2.connect()
57 self.assertTrue(connected1)
58 self.assertTrue(connected2)
59
60 # Отправляем сообщение от первого пользователя
61 await communicator1.send_json_to({
62 'type': 'message',
63 'message': 'Hello from user 1!'
64 })
65
66 # Проверяем, что оба получили сообщение
67 response1 = await communicator1.receive_json_from()
68 response2 = await communicator2.receive_json_from()
69
70 self.assertEqual(response1['message'], 'Hello from user 1!')
71 self.assertEqual(response2['message'], 'Hello from user 1!')
72
73 # Отключаем оба
74 await communicator1.disconnect()
75 await communicator2.disconnect()
76
77 async def test_notification_consumer(self):
78 application = URLRouter([
79 re_path(r'ws/notifications/(?P<user_id>\d+)/$', NotificationConsumer.as_asgi()),
80 ])
81
82 communicator = WebsocketCommunicator(
83 application,
84 "ws/notifications/1/"
85 )
86
87 connected, _ = await communicator.connect()
88 self.assertTrue(connected)
89
90 # Тестируем ping/pong
91 await communicator.send_json_to({'action': 'ping'})
92 response = await communicator.receive_json_from()
93 self.assertEqual(response['type'], 'pong')
94
95 await communicator.disconnect()
Лучшие практики
- Всегда используй обработку ошибок в consumers
- Логируй важные события WebSocket соединений
- Используй группы для эффективной отправки сообщений
- Реализуй механизм переподключения на клиенте
- Валидируйте данные на сервере
- Используй асинхронные consumers для лучшей производительности
- Настраивай аутентификацию для защищенных WebSocket
- Тестируй WebSocket функциональность
- Мониторь количество активных соединений
- Используй Redis для production deployments
- Реализуй heartbeat для проверки соединения
- Ограничивай количество соединений с одного IP
FAQ
Q: Когда использовать WebSocket?
A: Для real-time обновлений: чаты, уведомления, live данные, интерактивные приложения.
Q: Нужен ли Redis для Channels?
A: Да, для production рекомендуется использовать Redis как channel layer.
Q: Как обрабатывать большое количество WebSocket соединений?
A: Используй Redis cluster, настройте connection pooling и мониторьте производительность.
Q: Можно ли использовать Channels без WebSocket?
A: Да, Channels поддерживает HTTP long polling, Server-Sent Events и другие протоколы.
Q: Как обеспечить безопасность WebSocket соединений?
A: Используй аутентификацию, валидацию данных и HTTPS/WSS в production.
Q: Можно ли использовать Channels с Django REST Framework?
A: Да, они отлично работают вместе. DRF для HTTP API, Channels для real-time.
Q: Как тестировать WebSocket в Django?
A: Используй channels.testing.WebsocketCommunicator для асинхронных тестов.