WebSocket в Django с Django Channels

Django Channels добавляет поддержку WebSocket для real-time функциональности, позволяя создавать интерактивные приложения с двусторонней связью между клиентом и сервером.

Установка и настройка

Сначала установи необходимые пакеты:

1pip install channels channels-redis daphne

Или через poetry:

1poetry add channels channels-redis daphne

Настройка в settings.py

Добавь Channels в настройки Django:

 1# settings.py
 2INSTALLED_APPS = [
 3    # ... другие приложения
 4    'channels',
 5    'channels_redis',
 6]
 7
 8# Настройка ASGI
 9ASGI_APPLICATION = 'myproject.asgi.application'
10
11# Channel Layer для Redis
12CHANNEL_LAYERS = {
13    'default': {
14        'BACKEND': 'channels_redis.core.RedisChannelLayer',
15        'CONFIG': {
16            "hosts": [('127.0.0.1', 6379)],
17            "capacity": 1500,  # Максимальное количество сообщений в очереди
18            "expiry": 10,      # Время жизни сообщений в секундах
19        },
20    },
21}
22
23# Для production используй переменные окружения
24import os
25CHANNEL_LAYERS = {
26    'default': {
27        'BACKEND': 'channels_redis.core.RedisChannelLayer',
28        'CONFIG': {
29            "hosts": [os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379')],
30            "capacity": 1500,
31            "expiry": 10,
32        },
33    },
34}

Настройка ASGI

Создай или обнови asgi.py для поддержки WebSocket:

 1# myproject/asgi.py
 2import os
 3from django.core.asgi import get_asgi_application
 4from channels.routing import ProtocolTypeRouter, URLRouter
 5from channels.auth import AuthMiddlewareStack
 6from channels.security.websocket import AllowedHostsOriginValidator
 7from .routing import websocket_urlpatterns
 8
 9os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
10
11# ASGI приложение с поддержкой HTTP и WebSocket
12application = ProtocolTypeRouter({
13    "http": get_asgi_application(),
14    "websocket": AllowedHostsOriginValidator(
15        AuthMiddlewareStack(
16            URLRouter(websocket_urlpatterns)
17        )
18    ),
19})
20
21# Альтернативная настройка с кастомным middleware
22from .middleware import TokenAuthMiddleware
23
24application = ProtocolTypeRouter({
25    "http": get_asgi_application(),
26    "websocket": AllowedHostsOriginValidator(
27        TokenAuthMiddleware(
28            URLRouter(websocket_urlpatterns)
29        )
30    ),
31})

Создание routing

Настрой маршрутизацию для WebSocket соединений:

 1# myproject/routing.py
 2from django.urls import re_path
 3from .consumers import (
 4    ChatConsumer,
 5    NotificationConsumer,
 6    GameConsumer,
 7    DashboardConsumer
 8)
 9
10websocket_urlpatterns = [
11    # Чат комнаты
12    re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
13
14    # Уведомления пользователя
15    re_path(r'ws/notifications/(?P<user_id>\d+)/$', NotificationConsumer.as_asgi()),
16
17    # Игровые сессии
18    re_path(r'ws/game/(?P<game_id>\d+)/$', GameConsumer.as_asgi()),
19
20    # Дашборд в реальном времени
21    re_path(r'ws/dashboard/(?P<dashboard_type>\w+)/$', DashboardConsumer.as_asgi()),
22]
23
24# Альтернативная маршрутизация с namespace
25from django.urls import path, include
26
27websocket_urlpatterns = [
28    path('ws/', include([
29        path('chat/', include('chat.routing')),
30        path('notifications/', include('notifications.routing')),
31        path('games/', include('games.routing')),
32    ])),
33]

Создание consumers

Создавай consumers для обработки WebSocket соединений:

  1# chat/consumers.py
  2import json
  3from channels.generic.websocket import WebsocketConsumer
  4from channels.db import database_sync_to_async
  5from asgiref.sync import async_to_sync
  6from .models import ChatRoom, Message
  7
  8class ChatConsumer(WebsocketConsumer):
  9    def connect(self):
 10        # Получаем параметры из URL
 11        self.room_name = self.scope['url_route']['kwargs']['room_name']
 12        self.room_group_name = f'chat_{self.room_name}'
 13        self.user = self.scope['user']
 14
 15        # Проверяем доступ к комнате
 16        if self.can_join_room():
 17            # Присоединяемся к группе комнаты
 18            async_to_sync(self.channel_layer.group_add)(
 19                self.room_group_name,
 20                self.channel_name
 21            )
 22
 23            # Принимаем соединение
 24            self.accept()
 25
 26            # Отправляем сообщение о подключении
 27            async_to_sync(self.channel_layer.group_send)(
 28                self.room_group_name,
 29                {
 30                    'type': 'chat_message',
 31                    'message': f'{self.user.username} присоединился к чату',
 32                    'user': 'System',
 33                    'timestamp': self.get_timestamp()
 34                }
 35            )
 36
 37            # Отправляем историю сообщений
 38            self.send_chat_history()
 39        else:
 40            self.close(code=4001)  # Доступ запрещен
 41
 42    def disconnect(self, close_code):
 43        if hasattr(self, 'room_group_name'):
 44            # Покидаем группу комнаты
 45            async_to_sync(self.channel_layer.group_discard)(
 46                self.room_group_name,
 47                self.channel_name
 48            )
 49
 50            # Отправляем сообщение об отключении
 51            async_to_sync(self.channel_layer.group_send)(
 52                self.room_group_name,
 53                {
 54                    'type': 'chat_message',
 55                    'message': f'{self.user.username} покинул чат',
 56                    'user': 'System',
 57                    'timestamp': self.get_timestamp()
 58                }
 59            )
 60
 61    def receive(self, text_data):
 62        try:
 63            text_data_json = json.loads(text_data)
 64            message_type = text_data_json.get('type', 'message')
 65
 66            if message_type == 'message':
 67                message = text_data_json.get('message', '')
 68
 69                # Валидация сообщения
 70                if self.validate_message(message):
 71                    # Сохраняем сообщение в базе
 72                    saved_message = self.save_message(message)
 73
 74                    # Отправляем сообщение в группу
 75                    async_to_sync(self.channel_layer.group_send)(
 76                        self.room_group_name,
 77                        {
 78                            'type': 'chat_message',
 79                            'message': message,
 80                            'user': self.user.username,
 81                            'timestamp': self.get_timestamp(),
 82                            'message_id': saved_message.id
 83                        }
 84                    )
 85                else:
 86                    self.send(text_data=json.dumps({
 87                        'type': 'error',
 88                        'message': 'Сообщение не прошло валидацию'
 89                    }))
 90
 91            elif message_type == 'typing':
 92                # Отправляем уведомление о печати
 93                async_to_sync(self.channel_layer.group_send)(
 94                    self.room_group_name,
 95                    {
 96                        'type': 'user_typing',
 97                        'user': self.user.username
 98                    }
 99                )
100
101            elif message_type == 'ping':
102                # Отвечаем на ping
103                self.send(text_data=json.dumps({'type': 'pong'}))
104
105        except json.JSONDecodeError:
106            self.send(text_data=json.dumps({
107                'type': 'error',
108                'message': 'Неверный формат JSON'
109            }))
110        except Exception as e:
111            self.send(text_data=json.dumps({
112                'type': 'error',
113                'message': 'Внутренняя ошибка сервера'
114            }))
115
116    def chat_message(self, event):
117        # Отправляем сообщение в WebSocket
118        self.send(text_data=json.dumps({
119            'type': 'chat',
120            'message': event['message'],
121            'user': event['user'],
122            'timestamp': event['timestamp'],
123            'message_id': event.get('message_id')
124        }))
125
126    def user_typing(self, event):
127        # Отправляем уведомление о печати
128        self.send(text_data=json.dumps({
129            'type': 'typing',
130            'user': event['user']
131        }))
132
133    @database_sync_to_async
134    def can_join_room(self):
135        # Проверка доступа к комнате
136        try:
137            room = ChatRoom.objects.get(name=self.room_name)
138            return room.is_public or self.user in room.members.all()
139        except ChatRoom.DoesNotExist:
140            return False
141
142    @database_sync_to_async
143    def save_message(self, message):
144        # Сохранение сообщения в базе
145        return Message.objects.create(
146            room_name=self.room_name,
147            user=self.user,
148            content=message
149        )
150
151    def send_chat_history(self):
152        # Отправка истории сообщений
153        messages = Message.objects.filter(
154            room_name=self.room_name
155        ).order_by('-timestamp')[:50]
156
157        for msg in reversed(messages):
158            self.send(text_data=json.dumps({
159                'type': 'history',
160                'message': msg.content,
161                'user': msg.user.username,
162                'timestamp': msg.timestamp.isoformat()
163            }))
164
165    def validate_message(self, message):
166        # Валидация сообщения
167        if not message or len(message.strip()) == 0:
168            return False
169        if len(message) > 1000:
170            return False
171        return True
172
173    def get_timestamp(self):
174        from django.utils import timezone
175        return timezone.now().isoformat()

Асинхронные consumers

Используй AsyncWebsocketConsumer для лучшей производительности:

  1# notifications/consumers.py
  2import json
  3from channels.generic.websocket import AsyncWebsocketConsumer
  4from channels.db import database_sync_to_async
  5from .models import Notification, User
  6
  7class NotificationConsumer(AsyncWebsocketConsumer):
  8    async def connect(self):
  9        self.user_id = self.scope['url_route']['kwargs']['user_id']
 10        self.user_group_name = f'user_{self.user_id}'
 11
 12        # Проверяем аутентификацию
 13        if await self.is_authenticated():
 14            await self.channel_layer.group_add(
 15                self.user_group_name,
 16                self.channel_name
 17            )
 18            await self.accept()
 19
 20            # Отправляем непрочитанные уведомления
 21            await self.send_unread_notifications()
 22
 23            # Отправляем статистику
 24            await self.send_notification_stats()
 25        else:
 26            await self.close(code=4001)
 27
 28    async def disconnect(self, close_code):
 29        await self.channel_layer.group_discard(
 30            self.user_group_name,
 31            self.channel_name
 32        )
 33
 34    async def receive(self, text_data):
 35        try:
 36            text_data_json = json.loads(text_data)
 37            action = text_data_json.get('action')
 38
 39            if action == 'mark_read':
 40                notification_id = text_data_json.get('notification_id')
 41                await self.mark_notification_read(notification_id)
 42
 43            elif action == 'mark_all_read':
 44                await self.mark_all_notifications_read()
 45
 46            elif action == 'delete':
 47                notification_id = text_data_json.get('notification_id')
 48                await self.delete_notification(notification_id)
 49
 50            elif action == 'ping':
 51                await self.send(text_data=json.dumps({'type': 'pong'}))
 52
 53        except Exception as e:
 54            await self.send(text_data=json.dumps({
 55                'type': 'error',
 56                'message': str(e)
 57            }))
 58
 59    async def notification_message(self, event):
 60        # Отправляем уведомление пользователю
 61        await self.send(text_data=json.dumps({
 62            'type': 'notification',
 63            'id': event['id'],
 64            'title': event['title'],
 65            'message': event['message'],
 66            'level': event.get('level', 'info'),
 67            'timestamp': event['timestamp']
 68        }))
 69
 70    async def notification_stats(self, event):
 71        # Отправляем статистику уведомлений
 72        await self.send(text_data=json.dumps({
 73            'type': 'stats',
 74            'unread_count': event['unread_count'],
 75            'total_count': event['total_count']
 76        }))
 77
 78    @database_sync_to_async
 79    def is_authenticated(self):
 80        try:
 81            user = User.objects.get(id=self.user_id)
 82            return user.is_active
 83        except User.DoesNotExist:
 84            return False
 85
 86    @database_sync_to_async
 87    def send_unread_notifications(self):
 88        notifications = Notification.objects.filter(
 89            user_id=self.user_id,
 90            is_read=False
 91        ).order_by('-created_at')[:10]
 92
 93        for notification in notifications:
 94            self.send(text_data=json.dumps({
 95                'type': 'notification',
 96                'id': notification.id,
 97                'title': notification.title,
 98                'message': notification.message,
 99                'timestamp': notification.created_at.isoformat()
100            }))
101
102    @database_sync_to_async
103    def send_notification_stats(self):
104        unread_count = Notification.objects.filter(
105            user_id=self.user_id,
106            is_read=False
107        ).count()
108
109        total_count = Notification.objects.filter(
110            user_id=self.user_id
111        ).count()
112
113        self.send(text_data=json.dumps({
114            'type': 'stats',
115            'unread_count': unread_count,
116            'total_count': total_count
117        }))
118
119    @database_sync_to_async
120    def mark_notification_read(self, notification_id):
121        Notification.objects.filter(
122            id=notification_id,
123            user_id=self.user_id
124        ).update(is_read=True)
125
126    @database_sync_to_async
127    def mark_all_notifications_read(self):
128        Notification.objects.filter(
129            user_id=self.user_id,
130            is_read=False
131        ).update(is_read=True)
132
133    @database_sync_to_async
134    def delete_notification(self, notification_id):
135        Notification.objects.filter(
136            id=notification_id,
137            user_id=self.user_id
138        ).delete()

Middleware для аутентификации

Создавай кастомные middleware для WebSocket:

 1# middleware.py
 2from channels.middleware import BaseMiddleware
 3from channels.db import database_sync_to_async
 4from django.contrib.auth.models import AnonymousUser
 5from django.contrib.auth import get_user
 6import jwt
 7from django.conf import settings
 8
 9class TokenAuthMiddleware(BaseMiddleware):
10    async def __call__(self, scope, receive, send):
11        # Получаем токен из query параметров или заголовков
12        token = self.get_token(scope)
13
14        if token:
15            scope['user'] = await self.get_user_from_token(token)
16        else:
17            scope['user'] = AnonymousUser()
18
19        return await super().__call__(scope, receive, send)
20
21    def get_token(self, scope):
22        # Из query параметров
23        query_string = scope.get('query_string', b'').decode()
24        query_params = dict(x.split('=') for x in query_string.split('&') if x)
25        token = query_params.get('token', '')
26
27        if not token:
28            # Из заголовков
29            headers = dict(scope['headers'])
30            token = headers.get(b'authorization', b'').decode()
31            if token.startswith('Bearer '):
32                token = token[7:]
33
34        return token
35
36    @database_sync_to_async
37    def get_user_from_token(self, token):
38        try:
39            # Декодируем JWT токен
40            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
41            user_id = payload['user_id']
42
43            from django.contrib.auth import get_user_model
44            User = get_user_model()
45            user = User.objects.get(id=user_id)
46
47            if user.is_active:
48                return user
49            else:
50                return AnonymousUser()
51
52        except (jwt.InvalidTokenError, KeyError, User.DoesNotExist):
53            return AnonymousUser()
54
55class RateLimitMiddleware(BaseMiddleware):
56    def __init__(self, app, rate_limit=100):
57        super().__init__(app)
58        self.rate_limit = rate_limit
59        self.connections = {}
60
61    async def __call__(self, scope, receive, send):
62        # Проверяем rate limiting
63        client_ip = self.get_client_ip(scope)
64
65        if self.is_rate_limited(client_ip):
66            # Отклоняем соединение
67            return
68
69        # Добавляем соединение
70        self.add_connection(client_ip)
71
72        try:
73            return await super().__call__(scope, receive, send)
74        finally:
75            # Убираем соединение
76            self.remove_connection(client_ip)
77
78    def get_client_ip(self, scope):
79        headers = dict(scope['headers'])
80        return headers.get(b'x-forwarded-for', b'127.0.0.1').decode()
81
82    def is_rate_limited(self, client_ip):
83        return self.connections.get(client_ip, 0) >= self.rate_limit
84
85    def add_connection(self, client_ip):
86        self.connections[client_ip] = self.connections.get(client_ip, 0) + 1
87
88    def remove_connection(self, client_ip):
89        if client_ip in self.connections:
90            self.connections[client_ip] = max(0, self.connections[client_ip] - 1)

Работа с группами и broadcast

Отправляй сообщения множеству пользователей:

 1# services.py
 2from channels.layers import get_channel_layer
 3from asgiref.sync import async_to_sync
 4import json
 5
 6class WebSocketService:
 7    @staticmethod
 8    def send_notification_to_user(user_id, notification_data):
 9        channel_layer = get_channel_layer()
10        async_to_sync(channel_layer.group_send)(
11            f'user_{user_id}',
12            {
13                'type': 'notification_message',
14                **notification_data
15            }
16        )
17
18    @staticmethod
19    def send_notification_to_group(group_name, notification_data):
20        channel_layer = get_channel_layer()
21        async_to_sync(channel_layer.group_send)(
22            group_name,
23            {
24                'type': 'notification_message',
25                **notification_data
26            }
27        )
28
29    @staticmethod
30    def broadcast_to_all(message_data):
31        channel_layer = get_channel_layer()
32        async_to_sync(channel_layer.group_send)(
33            'all_users',
34            {
35                'type': 'broadcast_message',
36                **message_data
37            }
38        )
39
40    @staticmethod
41    def send_to_channel(channel_name, message_data):
42        channel_layer = get_channel_layer()
43        async_to_sync(channel_layer.send)(
44            channel_name,
45            {
46                'type': 'direct_message',
47                **message_data
48            }
49        )
50
51# Использование в Django views или tasks
52def notify_user_about_new_message(request, user_id):
53    WebSocketService.send_notification_to_user(
54        user_id,
55        {
56            'title': 'Новое сообщение',
57            'message': 'У вас новое сообщение в чате',
58            'level': 'info'
59        }
60    )
61
62# В Celery task
63@shared_task
64def send_bulk_notifications(user_ids, message):
65    for user_id in user_ids:
66        WebSocketService.send_notification_to_user(
67            user_id,
68            {
69                'title': 'Массовое уведомление',
70                'message': message,
71                'level': 'warning'
72            }
73        )

JavaScript клиент

Создавай клиентский код для подключения к WebSocket:

  1// static/js/websocket-client.js
  2class WebSocketClient {
  3    constructor(url, options = {}) {
  4        this.url = url;
  5        this.options = {
  6            reconnectAttempts: 5,
  7            reconnectDelay: 1000,
  8            heartbeatInterval: 30000,
  9            ...options
 10        };
 11
 12        this.ws = null;
 13        this.reconnectAttempts = 0;
 14        this.heartbeatTimer = null;
 15        this.listeners = new Map();
 16        this.isConnecting = false;
 17    }
 18
 19    connect() {
 20        if (this.isConnecting || this.ws?.readyState === WebSocket.OPEN) {
 21            return;
 22        }
 23
 24        this.isConnecting = true;
 25
 26        try {
 27            this.ws = new WebSocket(this.url);
 28
 29            this.ws.onopen = () => {
 30                console.log('WebSocket соединение установлено');
 31                this.isConnecting = false;
 32                this.reconnectAttempts = 0;
 33                this.startHeartbeat();
 34                this.emit('connected');
 35            };
 36
 37            this.ws.onmessage = (event) => {
 38                try {
 39                    const data = JSON.parse(event.data);
 40                    this.handleMessage(data);
 41                } catch (e) {
 42                    console.error('Ошибка парсинга сообщения:', e);
 43                }
 44            };
 45
 46            this.ws.onclose = (event) => {
 47                console.log('WebSocket соединение закрыто:', event.code, event.reason);
 48                this.isConnecting = false;
 49                this.stopHeartbeat();
 50                this.emit('disconnected', event);
 51
 52                // Попытка переподключения
 53                if (this.reconnectAttempts < this.options.reconnectAttempts) {
 54                    setTimeout(() => {
 55                        this.reconnectAttempts++;
 56                        this.connect();
 57                    }, this.options.reconnectDelay * this.reconnectAttempts);
 58                }
 59            };
 60
 61            this.ws.onerror = (error) => {
 62                console.error('WebSocket ошибка:', error);
 63                this.isConnecting = false;
 64                this.emit('error', error);
 65            };
 66
 67        } catch (error) {
 68            console.error('Ошибка создания WebSocket:', error);
 69            this.isConnecting = false;
 70        }
 71    }
 72
 73    disconnect() {
 74        if (this.ws) {
 75            this.ws.close();
 76        }
 77    }
 78
 79    send(data) {
 80        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
 81            this.ws.send(JSON.stringify(data));
 82        } else {
 83            console.error('WebSocket не подключен');
 84        }
 85    }
 86
 87    handleMessage(data) {
 88        const messageType = data.type;
 89
 90        switch (messageType) {
 91            case 'chat':
 92                this.emit('chat_message', data);
 93                break;
 94            case 'notification':
 95                this.emit('notification', data);
 96                break;
 97            case 'typing':
 98                this.emit('user_typing', data);
 99                break;
100            case 'error':
101                this.emit('error', data);
102                break;
103            case 'pong':
104                // Ответ на heartbeat
105                break;
106            default:
107                this.emit('message', data);
108        }
109    }
110
111    startHeartbeat() {
112        this.heartbeatTimer = setInterval(() => {
113            this.send({ type: 'ping' });
114        }, this.options.heartbeatInterval);
115    }
116
117    stopHeartbeat() {
118        if (this.heartbeatTimer) {
119            clearInterval(this.heartbeatTimer);
120            this.heartbeatTimer = null;
121        }
122    }
123
124    on(event, callback) {
125        if (!this.listeners.has(event)) {
126            this.listeners.set(event, []);
127        }
128        this.listeners.get(event).push(callback);
129    }
130
131    emit(event, data) {
132        if (this.listeners.has(event)) {
133            this.listeners.get(event).forEach(callback => callback(data));
134        }
135    }
136}
137
138// Использование
139const chatClient = new WebSocketClient('ws://localhost:8000/ws/chat/room1/');
140
141chatClient.on('connected', () => {
142    console.log('Подключились к чату');
143    showConnectionStatus('Подключено');
144});
145
146chatClient.on('chat_message', (data) => {
147    displayMessage(data.message, data.user, data.timestamp);
148});
149
150chatClient.on('user_typing', (data) => {
151    showTypingIndicator(data.user);
152});
153
154chatClient.on('notification', (data) => {
155    showNotification(data.title, data.message, data.level);
156});
157
158chatClient.on('disconnected', () => {
159    console.log('Отключились от чата');
160    showConnectionStatus('Отключено');
161});
162
163chatClient.on('error', (data) => {
164    showError(data.message);
165});
166
167chatClient.connect();
168
169// Отправка сообщения
170function sendMessage(message) {
171    chatClient.send({
172        type: 'message',
173        message: message
174    });
175}
176
177// Уведомление о печати
178function sendTypingNotification() {
179    chatClient.send({
180        type: 'typing'
181    });
182}

Тестирование WebSocket

Создавай тесты для проверки WebSocket функциональности:

 1# tests/test_websocket.py
 2from channels.testing import WebsocketCommunicator
 3from channels.routing import URLRouter
 4from django.test import TestCase
 5from django.urls import re_path
 6from .consumers import ChatConsumer, NotificationConsumer
 7
 8class WebSocketTestCase(TestCase):
 9    async def test_chat_consumer_connect(self):
10        # Создаем тестовое приложение
11        application = URLRouter([
12            re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
13        ])
14
15        # Создаем коммуникатор
16        communicator = WebsocketCommunicator(
17            application,
18            "ws/chat/testroom/"
19        )
20
21        # Подключаемся
22        connected, _ = await communicator.connect()
23        self.assertTrue(connected)
24
25        # Отправляем сообщение
26        await communicator.send_json_to({
27            'type': 'message',
28            'message': 'Hello, World!'
29        })
30
31        # Получаем ответ
32        response = await communicator.receive_json_from()
33        self.assertEqual(response['type'], 'chat')
34        self.assertEqual(response['message'], 'Hello, World!')
35
36        # Отключаемся
37        await communicator.disconnect()
38
39    async def test_chat_consumer_group_messages(self):
40        application = URLRouter([
41            re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
42        ])
43
44        # Создаем два коммуникатора для одной комнаты
45        communicator1 = WebsocketCommunicator(
46            application,
47            "ws/chat/testroom/"
48        )
49        communicator2 = WebsocketCommunicator(
50            application,
51            "ws/chat/testroom/"
52        )
53
54        # Подключаем оба
55        connected1, _ = await communicator1.connect()
56        connected2, _ = await communicator2.connect()
57        self.assertTrue(connected1)
58        self.assertTrue(connected2)
59
60        # Отправляем сообщение от первого пользователя
61        await communicator1.send_json_to({
62            'type': 'message',
63            'message': 'Hello from user 1!'
64        })
65
66        # Проверяем, что оба получили сообщение
67        response1 = await communicator1.receive_json_from()
68        response2 = await communicator2.receive_json_from()
69
70        self.assertEqual(response1['message'], 'Hello from user 1!')
71        self.assertEqual(response2['message'], 'Hello from user 1!')
72
73        # Отключаем оба
74        await communicator1.disconnect()
75        await communicator2.disconnect()
76
77    async def test_notification_consumer(self):
78        application = URLRouter([
79            re_path(r'ws/notifications/(?P<user_id>\d+)/$', NotificationConsumer.as_asgi()),
80        ])
81
82        communicator = WebsocketCommunicator(
83            application,
84            "ws/notifications/1/"
85        )
86
87        connected, _ = await communicator.connect()
88        self.assertTrue(connected)
89
90        # Тестируем ping/pong
91        await communicator.send_json_to({'action': 'ping'})
92        response = await communicator.receive_json_from()
93        self.assertEqual(response['type'], 'pong')
94
95        await communicator.disconnect()

Лучшие практики

  • Всегда используй обработку ошибок в consumers
  • Логируй важные события WebSocket соединений
  • Используй группы для эффективной отправки сообщений
  • Реализуй механизм переподключения на клиенте
  • Валидируйте данные на сервере
  • Используй асинхронные consumers для лучшей производительности
  • Настраивай аутентификацию для защищенных WebSocket
  • Тестируй WebSocket функциональность
  • Мониторь количество активных соединений
  • Используй Redis для production deployments
  • Реализуй heartbeat для проверки соединения
  • Ограничивай количество соединений с одного IP

FAQ

Q: Когда использовать WebSocket?
A: Для real-time обновлений: чаты, уведомления, live данные, интерактивные приложения.

Q: Нужен ли Redis для Channels?
A: Да, для production рекомендуется использовать Redis как channel layer.

Q: Как обрабатывать большое количество WebSocket соединений?
A: Используй Redis cluster, настройте connection pooling и мониторьте производительность.

Q: Можно ли использовать Channels без WebSocket?
A: Да, Channels поддерживает HTTP long polling, Server-Sent Events и другие протоколы.

Q: Как обеспечить безопасность WebSocket соединений?
A: Используй аутентификацию, валидацию данных и HTTPS/WSS в production.

Q: Можно ли использовать Channels с Django REST Framework?
A: Да, они отлично работают вместе. DRF для HTTP API, Channels для real-time.

Q: Как тестировать WebSocket в Django?
A: Используй channels.testing.WebsocketCommunicator для асинхронных тестов.