dcrf only responds to 1 request, ignores the others (or worse breaks them)
zackkh opened this issue · 0 comments
Greetings!
After successfully integrating dcrf-client
and djangochannelsrestframework
, I have run into another problem. (Please disregard my last issue; things simply stopped working.)
I have rewritten my app the same way the test integration
does, to the T.
Since this morning, I have been testing with 3 browsers (Firefox, Opera, Chromium) to watch real-time data transfer, and I have noticed that djangochannelsrestframework
does not respond well to multiple connections to the same stream: only one browser receives the updates, and the other connections are ignored or broken.
Here is the output from logging
:
HTTP b'DELETE' request for ['127.0.0.1', 46992]
Using selector: EpollSelector
Creating tcp connection to ('127.0.0.1', 6379)
// group is added here (Opera receives the updates, the others are ignored)
Wocket Message: {'type': 'on.department.activity', 'body': {'id': 23, 'name': 'rWtni6KCMgAm'}, 'action': 'delete', 'group': 'DCRF-48fb53c7613a6679f841b954b7054ded0bf71d637bac6962cb8980d74bb2d2f0'}
Sent WebSocket packet to client for ['127.0.0.1', 34502]
// group missing here
Wocket Message: {'type': 'on.department.activity', 'body': {'id': 23, 'name': 'rWtni6KCMgAm'}, 'action': 'delete'}
// also here
Wocket Message: {'type': 'on.department.activity', 'body': {'id': 23, 'name': 'rWtni6KCMgAm'}, 'action': 'delete'}
Waiter future is already done <Future cancelled> ()
Closed 1 connection(s)
Closed 1 connection(s)
Exception inside application: 'group'
Traceback (most recent call last):
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/staticfiles.py", line 44, in __call__
return await self.application(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 71, in __call__
return await application(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 47, in __call__
return await self.inner(dict(scope, cookies=cookies), receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 263, in __call__
return await self.inner(wrapper.scope, receive, wrapper.send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/auth.py", line 185, in __call__
return await super().__call__(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/middleware.py", line 26, in __call__
return await self.inner(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 150, in __call__
return await application(
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
return await consumer(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channelsmultiplexer/demultiplexer.py", line 61, in __call__
await future
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
return await consumer(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
await await_many_dispatch(
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
await handler(message)
File "/home/john/Desktop/folder/dcrf/observers.py", line 21, in __call__
group = message.pop('group')
KeyError: 'group'
Exception inside application: 'group'
Traceback (most recent call last):
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/staticfiles.py", line 44, in __call__
return await self.application(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 71, in __call__
return await application(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 47, in __call__
return await self.inner(dict(scope, cookies=cookies), receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 263, in __call__
return await self.inner(wrapper.scope, receive, wrapper.send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/auth.py", line 185, in __call__
return await super().__call__(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/middleware.py", line 26, in __call__
return await self.inner(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 150, in __call__
return await application(
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
return await consumer(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channelsmultiplexer/demultiplexer.py", line 61, in __call__
await future
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
return await consumer(scope, receive, send)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
await await_many_dispatch(
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
await handler(message)
File "/home/john/Desktop/folder/dcrf/observers.py", line 21, in __call__
group = message.pop('group')
KeyError: 'group'
WebSocket closed for ['127.0.0.1', 34960]
WebSocket DISCONNECT /cache/ [127.0.0.1:34960]
WebSocket DISCONNECT /cache/ [127.0.0.1:34960]
WebSocket closed for ['127.0.0.1', 34964]
WebSocket DISCONNECT /cache/ [127.0.0.1:34964]
WebSocket DISCONNECT /cache/ [127.0.0.1:34964]
HTTP 204 response started for ['127.0.0.1', 46992]
HTTP close for ['127.0.0.1', 46992]
HTTP response complete for ['127.0.0.1', 46992]
HTTP DELETE /api/department/23/ 204 [0.83, 127.0.0.1:46992]
HTTP DELETE /api/department/23/ 204 [0.83, 127.0.0.1:46992]
WebSocket HANDSHAKING /cache/ [127.0.0.1:47038]
WebSocket HANDSHAKING /cache/ [127.0.0.1:47038]
Upgraded connection ['127.0.0.1', 47038] to WebSocket
WebSocket HANDSHAKING /cache/ [127.0.0.1:47042]
WebSocket HANDSHAKING /cache/ [127.0.0.1:47042]
Upgraded connection ['127.0.0.1', 47042] to WebSocket
WebSocket ['127.0.0.1', 47038] open and established
WebSocket CONNECT /cache/ [127.0.0.1:47038]
WebSocket CONNECT /cache/ [127.0.0.1:47038]
WebSocket ['127.0.0.1', 47038] accepted by application
WebSocket ['127.0.0.1', 47042] open and established
WebSocket CONNECT /cache/ [127.0.0.1:47042]
WebSocket CONNECT /cache/ [127.0.0.1:47042]
WebSocket ['127.0.0.1', 47042] accepted by application
WebSocket incoming frame on ['127.0.0.1', 47042]
WebSocket incoming frame on ['127.0.0.1', 47038]
Creating tcp connection to ('127.0.0.1', 6379)
Sent WebSocket packet to client for ['127.0.0.1', 47042]
Sent WebSocket packet to client for ['127.0.0.1', 47038]
I have followed the integration to the T (again), and now I am getting this behaviour.
Is this a problem related to djangochannelsrestframework
or dcrf-client
?
Thank you!
-- packages pip
channels 3.0.4
channels-redis 3.3.1
channelsmultiplexer 0.0.3
djangochannelsrestframework 0.3.0
-- packages npm
├─┬ dcrf-client@1.1.0
// consumers.py
import logging
from functools import partial
from typing import Iterable, Union
from dcrf.observers import model_observer
from djangochannelsrestframework.decorators import action
from djangochannelsrestframework.generics import GenericAsyncAPIConsumer
from djangochannelsrestframework.mixins import (CreateModelMixin,
DeleteModelMixin,
ListModelMixin,
PatchModelMixin,
UpdateModelMixin)
from djangochannelsrestframework.observer.generics import \
ObserverModelInstanceMixin
from office.groups.models import Department
from rest_framework import status
from rest_framework.exceptions import NotFound
from .serializers import DepartmentSerializer
logger = logging.getLogger(__name__)
class DepartmentConsumer(
    ListModelMixin,
    CreateModelMixin,
    UpdateModelMixin,
    PatchModelMixin,
    DeleteModelMixin,
    ObserverModelInstanceMixin,
    GenericAsyncAPIConsumer,
):
    """Websocket consumer exposing CRUD actions and change subscriptions for ``Department``.

    The CRUD actions come from the model mixins; this class adds a model
    observer (``on_department_activity``) plus the ``subscribe_many`` /
    ``unsubscribe_many`` actions that attach a request to it.
    """

    queryset = Department.objects.all()
    serializer_class = DepartmentSerializer
    lookup_field = 'id'

    def _unsubscribe(self, request_id: str):
        """Remove ``request_id`` from every group in ``self.subscribed_requests``.

        Groups left without any request id are dropped from the mapping.
        NOTE(review): presumably called by the observer mixin's
        ``unsubscribe_instance`` -- confirm against the installed version.

        Raises:
            KeyError: if ``request_id`` is not registered under any group.
        """
        request_id_found = False
        # Collected first so the dict is not resized while it is being iterated.
        to_remove = []
        for group, request_ids in self.subscribed_requests.items():
            if request_id in request_ids:
                request_id_found = True
                request_ids.remove(request_id)
            if not request_ids:
                to_remove.append(group)

        if not request_id_found:
            raise KeyError(request_id)

        for group in to_remove:
            del self.subscribed_requests[group]

    @action()
    async def unsubscribe_instance(self, request_id=None, **kwargs):
        """Delegate to the parent implementation, mapping ``KeyError`` to ``NotFound``.

        Raises:
            NotFound: if the parent raises ``KeyError`` (unknown subscription).
        """
        try:
            return await super().unsubscribe_instance(request_id=request_id, **kwargs)
        except KeyError as exc:
            # Chain the original error so the missing key stays visible in logs.
            raise NotFound(detail='Subscription not found') from exc

    @model_observer(Department)
    async def on_department_activity(
        self, message, observer=None, action: str = None, request_id: str = None, **kwargs
    ):
        """Forward an observed ``Department`` change to the subscribed client.

        For ``action == 'delete'`` the observer payload is replied as-is with
        status 204. For any other action the instance is fetched through
        ``self.retrieve`` (the payload fields are passed as keyword arguments)
        and the result is replied. Any exception is handed to
        ``self.handle_exception`` instead of propagating.
        """
        try:
            reply = partial(self.reply, action=action, request_id=request_id)

            if action == 'delete':
                # send the delete
                await reply(data=message, status=204)
                return

            # the @action decorator will wrap non-async action into async ones.
            response = await self.retrieve(
                request_id=request_id, action=action, **message
            )

            # ``status_code`` (not ``status``) so the imported
            # ``rest_framework.status`` module is not shadowed in this scope.
            if isinstance(response, tuple):
                data, status_code = response
            else:
                data, status_code = response, 200
            await reply(data=data, status=status_code)
        except Exception as exc:
            await self.handle_exception(exc, action=action, request_id=request_id)

    @on_department_activity.groups_for_signal
    def on_department_activity(self, instance: Department, **kwargs):
        """Yield the group names a change to ``instance`` is published to."""
        yield f'-pk__{instance.pk}'
        yield '-all'

    @on_department_activity.groups_for_consumer
    def on_department_activity(self, departments: Iterable[Union[Department, int]] = None, **kwargs):
        """Yield the group names a subscription covers.

        ``departments=None`` yields the catch-all ``-all`` group; otherwise one
        per-primary-key group is yielded for each entry, which may be either a
        ``Department`` instance or a bare primary key.
        """
        if departments is None:
            yield '-all'
        else:
            for department in departments:
                department_id = department.pk if isinstance(
                    department, Department) else department
                yield f'-pk__{department_id}'

    @on_department_activity.serializer
    def on_department_activity(self, instance: Department, action: str, **kwargs):
        """Return the ``DepartmentSerializer`` data for ``instance``."""
        return DepartmentSerializer(instance).data

    @action()
    async def subscribe_many(self, request_id: str = None, departments: Iterable[int] = None, **kwargs):
        """Subscribe ``request_id`` to department activity and reply 201."""
        await self.on_department_activity.subscribe(request_id=request_id, departments=departments)
        return None, status.HTTP_201_CREATED

    @action()
    async def unsubscribe_many(self, request_id: str = None, departments: Iterable[int] = None, **kwargs):
        """Unsubscribe ``request_id`` from department activity and reply 204."""
        await self.on_department_activity.unsubscribe(request_id=request_id, departments=departments)
        return None, status.HTTP_204_NO_CONTENT
// routing.py
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from dcrf.demultiplexer import AsyncJsonWebsocketDemultiplexer
from django.core.asgi import get_asgi_application
from django.urls import path
from office.groups.asgi.consumers import DepartmentConsumer
# Default the settings module if the environment has not set one already.
# NOTE(review): this runs after the imports above, including the consumers
# module that imports models -- confirm the settings are configured early
# enough for every way this module gets loaded.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
# ASGI entry point: websocket traffic goes through the auth middleware and the
# URL router to the demultiplexer on "cache/"; plain HTTP goes to Django.
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
path("cache/", AsyncJsonWebsocketDemultiplexer(
# NOTE(review): this passes one DepartmentConsumer *instance*, created at
# import time. Channels 3 documents routing consumers via `.as_asgi()` so each
# connection gets its own instance; a shared instance may explain only one
# client being served. Confirm against dcrf.demultiplexer before changing.
departments=DepartmentConsumer()
)),
])
),
"http": get_asgi_application(),
})
The browser closes the WebSocket connection with status code 1011.