theY4Kman/dcrf-client

dcrf only responds to 1 request and ignores the others (or worse, breaks them)

zackkh opened this issue · 0 comments

Greetings!

After successfully integrating dcrf-client and djangochannelsrestframework, I have run into another problem. (Disregard my last issue; things just decided to stop working.)

I have rewritten my app the same way the test integration does, to the T.

Since this morning I have been testing with 3 browsers (Firefox, Opera, Chromium) to watch the real-time data transfer, and I have noticed that djangochannelsrestframework does not handle multiple connections to the same stream well.

Here's a log from logging:

HTTP b'DELETE' request for ['127.0.0.1', 46992]
Using selector: EpollSelector
Creating tcp connection to ('127.0.0.1', 6379)

// group is added here (Opera receives the updates, the others are ignored)
Wocket Message:  {'type': 'on.department.activity', 'body': {'id': 23, 'name': 'rWtni6KCMgAm'}, 'action': 'delete', 'group': 'DCRF-48fb53c7613a6679f841b954b7054ded0bf71d637bac6962cb8980d74bb2d2f0'}
Sent WebSocket packet to client for ['127.0.0.1', 34502]

// group missing here
Wocket Message:  {'type': 'on.department.activity', 'body': {'id': 23, 'name': 'rWtni6KCMgAm'}, 'action': 'delete'}
// also here
Wocket Message:  {'type': 'on.department.activity', 'body': {'id': 23, 'name': 'rWtni6KCMgAm'}, 'action': 'delete'}

Waiter future is already done <Future cancelled> ()
Closed 1 connection(s)
Closed 1 connection(s)
Exception inside application: 'group'
Traceback (most recent call last):
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/staticfiles.py", line 44, in __call__
    return await self.application(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 71, in __call__
    return await application(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 263, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/auth.py", line 185, in __call__
    return await super().__call__(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/middleware.py", line 26, in __call__
    return await self.inner(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 150, in __call__
    return await application(
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channelsmultiplexer/demultiplexer.py", line 61, in __call__
    await future
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/home/john/Desktop/folder/dcrf/observers.py", line 21, in __call__
    group = message.pop('group')
KeyError: 'group'
Exception inside application: 'group'
Traceback (most recent call last):
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/staticfiles.py", line 44, in __call__
    return await self.application(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 71, in __call__
    return await application(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/sessions.py", line 263, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/auth.py", line 185, in __call__
    return await super().__call__(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/middleware.py", line 26, in __call__
    return await self.inner(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/routing.py", line 150, in __call__
    return await application(
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channelsmultiplexer/demultiplexer.py", line 61, in __call__
    await future
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/home/john/Desktop/folder/.env/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/home/john/Desktop/folder/dcrf/observers.py", line 21, in __call__
    group = message.pop('group')
KeyError: 'group'
WebSocket closed for ['127.0.0.1', 34960]
WebSocket DISCONNECT /cache/ [127.0.0.1:34960]
WebSocket DISCONNECT /cache/ [127.0.0.1:34960]
WebSocket closed for ['127.0.0.1', 34964]
WebSocket DISCONNECT /cache/ [127.0.0.1:34964]
WebSocket DISCONNECT /cache/ [127.0.0.1:34964]
HTTP 204 response started for ['127.0.0.1', 46992]
HTTP close for ['127.0.0.1', 46992]
HTTP response complete for ['127.0.0.1', 46992]
HTTP DELETE /api/department/23/ 204 [0.83, 127.0.0.1:46992]
HTTP DELETE /api/department/23/ 204 [0.83, 127.0.0.1:46992]
WebSocket HANDSHAKING /cache/ [127.0.0.1:47038]
WebSocket HANDSHAKING /cache/ [127.0.0.1:47038]
Upgraded connection ['127.0.0.1', 47038] to WebSocket
WebSocket HANDSHAKING /cache/ [127.0.0.1:47042]
WebSocket HANDSHAKING /cache/ [127.0.0.1:47042]
Upgraded connection ['127.0.0.1', 47042] to WebSocket
WebSocket ['127.0.0.1', 47038] open and established
WebSocket CONNECT /cache/ [127.0.0.1:47038]
WebSocket CONNECT /cache/ [127.0.0.1:47038]
WebSocket ['127.0.0.1', 47038] accepted by application
WebSocket ['127.0.0.1', 47042] open and established
WebSocket CONNECT /cache/ [127.0.0.1:47042]
WebSocket CONNECT /cache/ [127.0.0.1:47042]
WebSocket ['127.0.0.1', 47042] accepted by application
WebSocket incoming frame on ['127.0.0.1', 47042]
WebSocket incoming frame on ['127.0.0.1', 47038]
Creating tcp connection to ('127.0.0.1', 6379)
Sent WebSocket packet to client for ['127.0.0.1', 47042]
Sent WebSocket packet to client for ['127.0.0.1', 47038]

I have followed the integration to the T (again), and now I'm getting this behaviour.
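Reduced to plain dictionaries, the failing step in the traceback can be reproduced in isolation. This is only a minimal sketch: the three messages are copied from the log above, while `handle` is a hypothetical stand-in that assumes the handler does nothing more than the `message.pop('group')` call shown in the traceback. It is not the real code in dcrf/observers.py.

```python
# Messages copied from the log above; only the first one carries 'group'.
messages = [
    {'type': 'on.department.activity',
     'body': {'id': 23, 'name': 'rWtni6KCMgAm'},
     'action': 'delete',
     'group': 'DCRF-48fb53c7613a6679f841b954b7054ded0bf71d637bac6962cb8980d74bb2d2f0'},
    {'type': 'on.department.activity',
     'body': {'id': 23, 'name': 'rWtni6KCMgAm'},
     'action': 'delete'},
    {'type': 'on.department.activity',
     'body': {'id': 23, 'name': 'rWtni6KCMgAm'},
     'action': 'delete'},
]


def handle(message: dict) -> str:
    """Hypothetical stand-in for the handler named in the traceback."""
    message = dict(message)       # work on a copy so the inputs stay intact
    return message.pop('group')   # same call as observers.py line 21


results = []
for message in messages:
    try:
        results.append(('ok', handle(message)))
    except KeyError as exc:
        # A message without 'group' fails exactly like the two tracebacks.
        results.append(('KeyError', exc.args[0]))

for outcome, detail in results:
    print(outcome, detail)
```

The first message is handled and the other two raise `KeyError: 'group'`, which matches one delivered packet and two exceptions in the log. Why the second and third messages arrive without the key is the open question.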

Is this a problem with djangochannelsrestframework or with dcrf-client?

Thank You!

-- packages pip
channels 3.0.4
channels-redis 3.3.1
channelsmultiplexer 0.0.3
djangochannelsrestframework 0.3.0

-- packages npm
├─┬ dcrf-client@1.1.0

// consumers.py

import logging
from functools import partial
from typing import Iterable, Union

from dcrf.observers import model_observer
from djangochannelsrestframework.decorators import action
from djangochannelsrestframework.generics import GenericAsyncAPIConsumer
from djangochannelsrestframework.mixins import (CreateModelMixin,
                                                DeleteModelMixin,
                                                ListModelMixin,
                                                PatchModelMixin,
                                                UpdateModelMixin)
from djangochannelsrestframework.observer.generics import \
    ObserverModelInstanceMixin
from office.groups.models import Department
from rest_framework import status
from rest_framework.exceptions import NotFound

from .serializers import DepartmentSerializer

logger = logging.getLogger(__name__)


class DepartmentConsumer(
    ListModelMixin,
    CreateModelMixin,
    UpdateModelMixin,
    PatchModelMixin,
    DeleteModelMixin,
    ObserverModelInstanceMixin,
    GenericAsyncAPIConsumer,
):
    queryset = Department.objects.all()
    serializer_class = DepartmentSerializer
    lookup_field = 'id'

    def _unsubscribe(self, request_id: str):
        request_id_found = False
        to_remove = []
        for group, request_ids in self.subscribed_requests.items():
            if request_id in request_ids:
                request_id_found = True
                request_ids.remove(request_id)
            if not request_ids:
                to_remove.append(group)

        if not request_id_found:
            raise KeyError(request_id)

        for group in to_remove:
            del self.subscribed_requests[group]

    @action()
    async def unsubscribe_instance(self, request_id=None, **kwargs):
        try:
            return await super().unsubscribe_instance(request_id=request_id, **kwargs)
        except KeyError:
            raise NotFound(detail='Subscription not found')

    @model_observer(Department)
    async def on_department_activity(
        self, message, observer=None, action: str = None, request_id: str = None, **kwargs
    ):
        try:
            reply = partial(self.reply, action=action, request_id=request_id)

            if action == 'delete':
                await reply(data=message, status=204)
                # send the delete
                return

            # the @action decorator will wrap non-async action into async ones.
            response = await self.retrieve(
                request_id=request_id, action=action, **message
            )

            if isinstance(response, tuple):
                data, status = response
            else:
                data, status = response, 200
            await reply(data=data, status=status)
        except Exception as exc:
            await self.handle_exception(exc, action=action, request_id=request_id)

    @on_department_activity.groups_for_signal
    def on_department_activity(self, instance: Department, **kwargs):
        yield f'-pk__{instance.pk}'
        yield '-all'

    @on_department_activity.groups_for_consumer
    def on_department_activity(self, departments: Iterable[Union[Department, int]] = None, **kwargs):
        if departments is None:
            yield '-all'
        else:
            for department in departments:
                department_id = department.pk if isinstance(
                    department, Department) else department
                yield f'-pk__{department_id}'

    @on_department_activity.serializer
    def on_department_activity(self, instance: Department, action: str, **kwargs):
        return DepartmentSerializer(instance).data

    @action()
    async def subscribe_many(self, request_id: str = None, departments: Iterable[int] = None, **kwargs):
        await self.on_department_activity.subscribe(request_id=request_id, departments=departments)
        return None, status.HTTP_201_CREATED

    @action()
    async def unsubscribe_many(self, request_id: str = None, departments: Iterable[int] = None, **kwargs):
        await self.on_department_activity.unsubscribe(request_id=request_id, departments=departments)
        return None, status.HTTP_204_NO_CONTENT
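
To make the intended routing explicit, here is a small sketch of how the group names from the two generators above are meant to line up. It assumes the raw names are matched as-is; the `DCRF-...` groups in the log suggest the real names are hashed somewhere, which this sketch ignores. The function names are hypothetical and plain integers stand in for Department instances.

```python
from typing import Iterable, Iterator, Optional


def groups_for_signal(pk: int) -> Iterator[str]:
    """Groups a change to department `pk` is published to."""
    yield f'-pk__{pk}'
    yield '-all'


def groups_for_consumer(
    department_ids: Optional[Iterable[int]] = None,
) -> Iterator[str]:
    """Groups a consumer joins when it subscribes."""
    if department_ids is None:
        yield '-all'
    else:
        for department_id in department_ids:
            yield f'-pk__{department_id}'


# Deleting department 23 publishes to these groups.
signal_groups = set(groups_for_signal(23))

# Three hypothetical subscribers: everything, ids 23 and 42, id 7 only.
all_subscriber = set(groups_for_consumer())
pk_subscriber = set(groups_for_consumer([23, 42]))
other_subscriber = set(groups_for_consumer([7]))

# A subscriber should be notified when the two sets intersect.
print(signal_groups & all_subscriber)
print(signal_groups & pk_subscriber)
print(signal_groups & other_subscriber)
```

Under these assumptions, every browser subscribed without a `departments` argument shares the `-all` group and should receive the delete, so the group names themselves do not explain why only one connection is served.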

// routing.py

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from dcrf.demultiplexer import AsyncJsonWebsocketDemultiplexer
from django.core.asgi import get_asgi_application
from django.urls import path
from office.groups.asgi.consumers import DepartmentConsumer

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')


application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            path("cache/", AsyncJsonWebsocketDemultiplexer(
                departments=DepartmentConsumer()
            )),
        ])
    ),
    "http": get_asgi_application(),
})

The browser then closes the WebSocket connection with status code 1011 (internal error).