php-mqtt/client

async responses

guestisp opened this issue · 5 comments

Let's assume a json-rpc request over MQTT.
The reply from the server could arrive immediately or after hours (for example, when you send a json-rpc request to start a long-running task that takes hours to complete).

Is it possible to "map" the request to the response and trigger some action during the subscribe event?

Should I invent something, or is this already supported in some way (events, hooks, ...)?

I'm not sure I can follow you, so I'll try to phrase it in my own words. Please let me know if I understood you correctly.

  • You have two clients, let's call them A and B.
  • A randomly generates a response topic and subscribes to it.
  • A publishes a json-rpc request on a topic where B is listening. The json-rpc request contains the response topic subscribed by A in the previous step.
  • B receives the json-rpc request and executes whatever the request was asking for. This takes some time (minutes, hours, ...). B then responds on the response topic from the json-rpc request with the response it generated from the task (or whatever).
  • A receives the response.
  • A maps the response to the original request.

Is this what you are looking for?

Not exactly.
I have a client and a server, talking jsonrpc via mqtt.

The client sends out a standard jsonrpc request:

{"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1}

The server gets the request and, after some time (a lot of time with some commands), replies to the client with a jsonrpc response:

{"jsonrpc": "2.0", "result": 19, "id": 1}

If responses were sent back in a very short time, it would be easy: just subscribe for a fixed amount of time and exit when we get the response.

But some commands take hours to complete, so I can't "wait" for the response. I need something to map the response to the original request (jsonrpc has the id field shared between request and response exactly for this) and then execute some action.

Example:

CLIENT TO SERVER:
{"jsonrpc": "2.0", "method": "wait.30.minutes", "id": "c32e7e4d-8d8a-41ea-9dc8-508b3a0ed914"}

... after 30 minutes ...
SERVER TO CLIENT:
{"jsonrpc": "2.0", "result": "done", "id": "c32e7e4d-8d8a-41ea-9dc8-508b3a0ed914"}
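The id-based matching in the example above could be sketched in plain PHP roughly as follows. This is only an illustration: makeRequest and responseMatches are hypothetical helper names, not part of php-mqtt/client, and the random hex id simply stands in for the UUID used in the example.

```php
<?php

// Hypothetical helper: builds a json-rpc 2.0 request with a random correlation id.
function makeRequest(string $method, array $params = []): array
{
    $request = [
        'jsonrpc' => '2.0',
        'method' => $method,
        'id' => bin2hex(random_bytes(16)), // 32 hex characters; a UUID would work as well
    ];

    // "params" is optional in json-rpc 2.0, so it is only added when given.
    if ($params !== []) {
        $request['params'] = $params;
    }

    return $request;
}

// Hypothetical helper: a response belongs to a request if both carry the same id.
function responseMatches(array $request, array $response): bool
{
    return array_key_exists('id', $response) && $response['id'] === $request['id'];
}
```

The request would be passed through json_encode() before publishing, and the decoded response would be compared against the stored request whenever it arrives, no matter how much later that is.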

When the client gets back the response, I need something that is automatically triggered with the response json.

I've thought about using the Laravel Cache, setting a key with the json rpc id and the callback to call as the cache value.
Then, in the subscribe loop (which has to run forever), I can check whether the id coming from the response is cached; if it is, I call the function with the response json as its argument.

This should work, but maybe there is a better (or native) way to do that.

(Requests and responses are NOT sequential: I can send a long-running request, then 4 short requests, and get back the 4 short responses before the long-running response.)
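A minimal sketch of that idea, with a plain array standing in for the Laravel Cache, might look like this. PendingRequests, expect and resolve are hypothetical names. Note one assumption that differs from the description above: PHP closures cannot be serialized natively, so the sketch stores the name of a handler under the id rather than the callback itself.

```php
<?php

// Hypothetical registry of in-flight requests: request id => handler name.
// A plain array stands in for the cache; only strings are stored per id.
final class PendingRequests
{
    /** @var array<string, string> */
    private array $pending = [];

    /** @param array<string, callable> $handlers handler name => callable */
    public function __construct(private array $handlers)
    {
    }

    // Remember which handler should run once the response for $id arrives.
    public function expect(string $id, string $handlerName): void
    {
        if (!isset($this->handlers[$handlerName])) {
            throw new InvalidArgumentException("Unknown handler: {$handlerName}");
        }

        $this->pending[$id] = $handlerName;
    }

    // Called for every decoded response; returns false for unexpected ids.
    public function resolve(array $response): bool
    {
        if (!isset($response['id']) || !is_scalar($response['id'])) {
            return false;
        }

        $id = (string) $response['id'];
        if (!isset($this->pending[$id])) {
            return false;
        }

        $handlerName = $this->pending[$id];
        unset($this->pending[$id]); // each request is resolved at most once

        ($this->handlers[$handlerName])($response);

        return true;
    }
}
```

Because the lookup is keyed by id, the order in which responses arrive does not matter: the long-running response can be resolved after any number of short ones.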

Ok, I understand. Let me begin with some general words:
If your application is horizontally scalable (multiple worker processes on each end of the RPC pipeline), you'll need some kind of storage for in-flight requests. Whether Redis or any other type of cache is suitable for the job really depends on your needs regarding scalability, persistence, etc. This library does not make these opinionated decisions for you. There is no built-in support for json-rpc or similar.

When looking at the json-rpc client only, I would personally write an Artisan command which subscribes to the response topic (the one the json-rpc server sends its answers to). The subscription callback would then parse the message and validate it. For valid messages, a queue job with the id, result/error and any meta information (like a timestamp) would be created. This is to avoid blocking the MQTT loop (which, under high load, could lead to dropped messages on the broker side).
In the queue job, we would then try to match the response to the previously stored request. Since I have no information what you are going to do with the response (i.e. store it in the database; update some view of the user; or trigger some other processes), this part is a bit abstract.

Where the sent requests are stored and how they are sent is really out of scope for this question though. And in my opinion, it doesn't matter anyway. You can easily send json-rpc requests from HTTP requests while the responses are consumed by a long-running background process.

I could imagine code like this to do what you want (just off the top of my head, totally untested):

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Bus;
use PhpMqtt\Client\Facades\MQTT; // facade from php-mqtt/laravel-client

// Command to subscribe to json-rpc responses.
class ReceiveJsonRpcResponses extends Command
{
    protected $signature = 'jsonrpc:receive';
    protected $description = 'Receives json-rpc responses and dispatches job to process them';

    public function handle()
    {
        $mqtt = MQTT::connection();
        $mqtt->subscribe('some/response/topic', function (string $topic, string $message) {
            if (! self::isValidJsonRpcResponse($message)) {
                return;
            }

            $response = json_decode($message, true);

            // Hand the response to the queue so the MQTT loop is not blocked.
            Bus::dispatch(new ProcessJsonRpcResponse(
                id: (string) $response['id'], // json-rpc ids may also be numbers
                result: $response['result'] ?? null,
                error: $response['error'] ?? null
            ));
        }, 2); // subscribe with QoS 2

        $mqtt->loop(true);
    }

    private static function isValidJsonRpcResponse(string $message): bool
    {
        $json = json_decode($message, true);

        // A response needs an id and exactly one of result/error.
        // array_key_exists (unlike isset) also accepts a null result.
        return is_array($json)
            && isset($json['id'])
            && (array_key_exists('result', $json) xor array_key_exists('error', $json));
    }
}

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\SerializesModels;

// Job to process the json-rpc responses (by linking them to requests).
class ProcessJsonRpcResponse implements ShouldQueue
{
    use Dispatchable, Queueable, SerializesModels;

    private string $id;
    private mixed $result;
    private mixed $error;

    public function __construct(string $id, mixed $result = null, mixed $error = null)
    {
        $this->id = $id;
        $this->result = $result;
        $this->error = $error;
    }

    // RpcJobRepository is a placeholder for wherever the sent requests are stored.
    public function handle(RpcJobRepository $rpcJobRepository)
    {
        $rpcJob = $rpcJobRepository->findById($this->id);
        if ($rpcJob === null) {
            return; // Unexpected RPC response
        }

        // Attach the outcome to the stored request and persist it.
        $rpcJob->result = $this->result;
        $rpcJob->error = $this->error;

        $rpcJobRepository->update($rpcJob);
    }
}
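
The RpcJobRepository used in the job above is not defined anywhere in this thread. Purely as an illustration, an in-memory stand-in could look like the sketch below; RpcJob, InMemoryRpcJobRepository and the add method are hypothetical names, and a real application would back this with a database or cache instead of an array.

```php
<?php

// Hypothetical record of a sent json-rpc request and its eventual outcome.
final class RpcJob
{
    public mixed $result = null;
    public mixed $error = null;

    public function __construct(public string $id, public string $method)
    {
    }
}

// Hypothetical in-memory repository exposing the two methods the job relies on.
final class InMemoryRpcJobRepository
{
    /** @var array<string, RpcJob> */
    private array $jobs = [];

    // Called when the request is published, before any response can arrive.
    public function add(RpcJob $job): void
    {
        $this->jobs[$job->id] = clone $job;
    }

    // Returns a copy, so changes only become visible after update().
    public function findById(string $id): ?RpcJob
    {
        return isset($this->jobs[$id]) ? clone $this->jobs[$id] : null;
    }

    public function update(RpcJob $job): void
    {
        if (!isset($this->jobs[$job->id])) {
            throw new RuntimeException("Unknown RPC job: {$job->id}");
        }

        $this->jobs[$job->id] = clone $job;
    }
}
```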

This is clear, but let's assume a web interface where you have a button that sends out a jsonrpc command via mqtt to a remote device. When you press the button, you start a spinner to tell users that something is going on. Then you wait for a reply.
I have to match the response to the single request (the one that started the spinner) and stop the spinner.

It's not only about matching a type of response (like the response to the long-running command), but about matching the response to the long-running command triggered by that single request.

I can use commands or the DB, but the only solution that comes to my mind is to save my single request to the DB, then continuously check for updates to the DB (coming from the subscriber command that is always running), something like a change of an "isRunning" flag from true to false.

This is probably not related to the php-mqtt client, but I was wondering whether something already existing, like the handlers, could be used for this.

You are already on the right track and you should definitely not block the HTTP request by trying to wait for the json-rpc response. An HTTP request should always be handled in less than one second under normal circumstances.

Technically speaking, you really only have two (or maybe three) options:

  • Let the client poll for status updates. You can use caching on the server-side, but it may still be intense for your database depending on the number and frequency of requests.
  • Let the server push status updates to the client, which is possible if the client opens a WebSocket connection.
  • A combination of the above, where the client will only poll for status changes once it receives an event via WebSocket. This makes sense because you need the same logic in place for when the user refreshes their browser.
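
For the polling option, the status endpoint could return something along these lines. This is only a sketch under assumptions: rpcStatusPayload is a hypothetical function, and the job record is assumed to be an array with the keys completed, result and error, as written by the background process that consumes the responses.

```php
<?php

// Hypothetical helper: turns a stored job record (or null if the id is unknown)
// into the payload a polling client would receive.
function rpcStatusPayload(?array $job): array
{
    if ($job === null) {
        return ['status' => 'unknown'];
    }

    if (($job['error'] ?? null) !== null) {
        return ['status' => 'failed', 'error' => $job['error']];
    }

    if ($job['completed'] ?? false) {
        return ['status' => 'done', 'result' => $job['result'] ?? null];
    }

    // No response has been recorded yet, so the spinner keeps running.
    return ['status' => 'pending'];
}
```

The same payload can be returned on the first page load after a browser refresh, which is why the combined option reuses the polling logic.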