theY4Kman/dcrf-client

๐Ÿ†˜ Unable to catch subscription payload

zackkh opened this issue ยท 6 comments

Greeting !

i am trying to log my stream subscription payload to the console but i can't seem to make it work,

class CacheSocket {
    static client: DCRFClient;

    static getClient(path?: string) {
            if (!path) {
                path = 'ws://localhost:8000/cache/';
            }
            if (!CacheSocket.client) {
                CacheSocket.client = dcrf.connect(path, {
                    websocket: { minReconnectionDelay: 2000 }
                });
            }
    
            return CacheSocket.client;
        }
    }
}

export default class CacheComponent extends React.Component {

    loadSocket = async () => {
        const _client = CacheSocket.getClient();
        const subscription = _client.subscribe(
            'agent',
            {
                // Additional arguments may be passed to action
            },
            (payload, action) => console.log({ payload, action }), // Not Working ...
            {
                requestId: String(uuidv4()),
                includeCreateEvents: true,
                subscribeAction: 'subscribe_to_agent_activity',
                unsubscribeAction: 'unsubscribe_to_agent_activity',
            },
        );

    };

    componentDidMount() {
        this.loadSocket();
    };

    render() {
        return <>{this.props.children}</>;
    }
}

Everything is working perfectly, i can see the stream response in the browser in the Network tab, but i can't log the payload onto the console ...

Hmmm, lemme think. What does "everything is working perfectly" mean, precisely? Like, you see websocket messages for the stream whenever you create/update/delete an "agent"?

OH! Try logging the passed requestId, and making sure it matches the one in the websocket messages.

I recall there being some issue with DCRF, where it wasn't returning the same request ID in the subscription event messages, so dcrf-client couldn't dispatch them properly. In the integration tests for dcrf-client, I believe I had to use a custom @model_observer โ€” see test/integration/dcrf_client_test/observers.py (and its usage in test/integration/dcrf_client_test/consumers.py).

Oh, hmm, maybe I'm thinking of #8, where multiple subscriptions to a single stream would use only the last subscription's request ID โ€” and which was reported as dcrf#44 (and resolved in #46).

Well, in any case, your code seems fine, so I'm hoping with some more info, I might be able to nail the issue down.

Firefox

    Out: {"stream":"agent","payload":{"action":"subscribe_to_agent_activity","request_id":"1cea510a-fb97-43a8-9fd8-1254ac29c480"}} 
    
    14:49:02.145

    In: {"stream": "agent", "payload": {"id": 1, "name": "Amanda Sr. Aguirre"}} // db update
    
    14:49:21.548

Opera

    Out: {"stream":"agent","payload":{"action":"subscribe_to_agent_activity","request_id":"6aff00bf-f680-4dba-9494-e4c0ba9446ce"}} 
    
    14:48:59.823

    In: {"stream": "agent", "payload": {"id": 1, "name": "Amanda Sr. Aguirre"}} // opera receives server updates
    
    14:49:21.548

As you can see, the stream is sending and receiving the server updates, but my client can't seem to log them.

server code [djangochannelrestframework + channelmultiplexer]

    class AgentConsumer(ListModelMixin, ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
      serializer_class = serializers.AgentSerializer
      model = serializer_class.Meta.model
      queryset = model.objects.all()
    
      @model_observer(Agent)
      async def agent_activity(self, message, observer=None, **kwargs):
          await self.send_json(message)
    
      @agent_activity.serializer
      def agent_activity(self, instance, action, *args, **kwargs):
          '''This will return the agent serializer'''
          return serializers.AgentSerializer(instance).data
    
      @action()
      async def subscribe_to_agent_activity(self, **kwargs):
          await self.agent_activity.subscribe()
    
      @action()
      async def unsubscribe_to_agent_activity(self, **kwargs):
          await self.agent_activity.unsubscribe()

I have followed your integration method and now it's working as intended (a pain in the a** i must say)

I had to tweak your integration a little bit

routing.py

    application = ProtocolTypeRouter({
        "http": get_asgi_application(),
        "websocket": AuthMiddlewareStack(
            URLRouter([
                # i had to add as_asgi() to make it work ...
                path("cache/", AsyncJsonWebsocketDemultiplexer.as_asgi(
                    agents=AgentConsumer().as_asgi(),
                    agents_with_id=AgentWithIdConsumer.as_asgi()
                )),
            ])
        )
    })

Thank You !

a pain in the a** i must say

Agreed! I want it to be upstream. But I never opened the upstream PR for it, like I told myself I would :P Well, I'll close this for now; holler if you have any other issues.

Out of curiosity, what version of channels and dcrf are you using?

Sorry for the late response;

// package.json
dcrf-client@1.1.0

//requirements.txt
channels 3.0.4
channels-redis 3.3.1
channelsmultiplexer 0.0.3
djangochannelsrestframework 0.3.0

Update

For some weird reason, everything stopped working and the issue was from .as_asgi() part, i had to remove it from every consumer ...