mavlink/MAVSDK-Python

Subscribing to a stream cancels older subscriptions

Closed this issue · 4 comments

Currently, at the C++ level, streams like position_async() register one unique callback. This means that one cannot register two callbacks: the second call will erase the previous one.
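
The overwrite behavior described above can be illustrated with a toy Python model. This is only a sketch of the single-callback pattern, not MAVSDK code: the class `SingleSlotTelemetry` and the `_on_position` helper are hypothetical names invented for the illustration.

```python
class SingleSlotTelemetry:
    """Toy stand-in for a plugin that stores exactly one callback per stream."""

    def __init__(self):
        self._position_callback = None

    def position_async(self, callback):
        # Registering again replaces the stored callback.
        self._position_callback = callback

    def _on_position(self, position):
        # Simulates the arrival of a new position (hypothetical helper).
        if self._position_callback is not None:
            self._position_callback(position)


telemetry = SingleSlotTelemetry()
first, second = [], []
telemetry.position_async(first.append)
telemetry.position_async(second.append)  # silently replaces the first callback
telemetry._on_position("fix-1")
```

After the simulated update, only `second` has received the value; the first consumer never hears about it.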

I believe that this is not the behavior we want in Python. Already in Swift and Java, a stream is shared between the consumers at the language binding level (by using Rx mechanics).

I made a proof of concept for position() (thanks to Samyash for the help), essentially trying to call _stub.SubscribePosition(request) only once and sharing that with all the consumers.

It looks like the code below, which exposes position() as before and instantiates the factory lazily (only when the first consumer registers).

import collections

position_factory = None

...

def position(self):
    # Build the shared factory lazily, when the first consumer registers.
    if not self.position_factory:
        self.position_factory = self._position_factory()

    return self.position_factory()

def _position_factory(self):
    # Open the gRPC stream once; every consumer shares it.
    request = telemetry_pb2.SubscribePositionRequest()
    position_stream = self._stub.SubscribePosition(request)

    deques = []        # one queue of pending events per consumer
    already_gone = []  # events to replay to late consumers (nothing appends to it in this snippet)

    def new_position_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        async def gen(my_deque):
            while True:
                if not my_deque:
                    # Own queue is empty: pull the next event and fan it out to every consumer.
                    rpc_pos = await position_stream.__anext__()
                    new_pos = Position.translate_from_rpc(rpc_pos.position)

                    for d in deques:
                        d.append(new_pos)
                yield my_deque.popleft()

        return gen(new_deque)

    return new_position_generator

Still to be investigated:

  • This currently caches all the events, but a new consumer is only interested in the current state. That can be improved thanks to Samyash's suggestion here.
  • What happens when a consumer wants to unregister? It should get erased from the queues, probably.
  • Can we stop the gRPC connection when all the consumers unregister?
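
One possible way to approach the three open points above is sketched here in plain asyncio. It is a minimal sketch under stated assumptions, not the MAVSDK-Python implementation: `SharedStream`, `fake_positions` and `demo` are hypothetical names, and `fake_positions` stands in for the gRPC stream. A single pump task reads the upstream, a new consumer is given only the latest value, a consumer unregisters by closing its generator, and the pump is cancelled when the last consumer leaves. The queues are unbounded and the end of the upstream is not propagated, so this is illustrative only.

```python
import asyncio


class SharedStream:
    """Fan one async source out to many consumers (hypothetical helper)."""

    def __init__(self, source_factory):
        self._source_factory = source_factory  # callable that opens the upstream
        self._queues = set()
        self._latest = None
        self._has_latest = False
        self._pump_task = None

    async def _pump(self):
        # Single reader: only this task ever iterates the upstream stream.
        async for item in self._source_factory():
            self._latest, self._has_latest = item, True
            for queue in self._queues:
                queue.put_nowait(item)

    async def subscribe(self):
        queue = asyncio.Queue()
        if self._has_latest:
            queue.put_nowait(self._latest)  # replay the current state only
        self._queues.add(queue)
        if self._pump_task is None:
            self._pump_task = asyncio.create_task(self._pump())
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the consumer closes its generator (unregisters).
            self._queues.discard(queue)
            if not self._queues and self._pump_task is not None:
                self._pump_task.cancel()  # last consumer gone: stop the upstream
                self._pump_task = None
                self._has_latest = False


async def fake_positions():
    # Stand-in for the gRPC stream: yields increasing integers forever.
    n = 0
    while True:
        yield n
        n += 1
        await asyncio.sleep(0)


async def demo():
    shared = SharedStream(fake_positions)
    a = shared.subscribe()
    b = shared.subscribe()
    got_a = [await a.__anext__() for _ in range(3)]
    got_b = [await b.__anext__() for _ in range(3)]  # starts at the latest value
    await a.aclose()
    await b.aclose()
    return got_a, got_b, shared._pump_task


got_a, got_b, pump_task = asyncio.run(demo())
```

Here the late consumer `b` starts from the most recent value instead of replaying the full history, and the pump task is gone once both generators have been closed. Closing explicitly with `aclose()` matters: a consumer that merely stops iterating is only cleaned up when the generator is finalized.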

I agree it would be nice to be able to register multiple callbacks for the wrappers as well as for the C++ API. Now, as you asked, the question is how to unregister. I don't know what pattern or mechanism is suited for this.

I have the same challenge with various methods that are registered by the plugins against SystemImpl. I use a "cookie" which is basically the this pointer of the object registering something.

https://github.com/Dronecode/DronecodeSDK/blob/aeed8a846ce0b7918eb56a55037e508e3aa1cbd8/core/system_impl.cpp#L71-L73
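
The "cookie" idea can be sketched in Python as a registry keyed by the registering object. This is a rough illustration of the pattern only and does not reproduce the linked SystemImpl code: `CallbackRegistry`, `register`, `unregister_all`, `dispatch` and `Logger` are hypothetical names, and the registering object itself plays the role of the this pointer.

```python
class CallbackRegistry:
    """Hypothetical multi-callback registry keyed by a caller-supplied cookie."""

    def __init__(self):
        self._callbacks = {}  # cookie -> list of callbacks

    def register(self, callback, cookie):
        self._callbacks.setdefault(cookie, []).append(callback)

    def unregister_all(self, cookie):
        # Drop every callback that was registered under this cookie.
        self._callbacks.pop(cookie, None)

    def dispatch(self, value):
        # Iterate over copies so a callback may unregister during dispatch.
        for callbacks in list(self._callbacks.values()):
            for callback in list(callbacks):
                callback(value)


class Logger:
    """Example owner that uses itself as the cookie."""

    def __init__(self, registry):
        self.seen = []
        registry.register(self.seen.append, cookie=self)


registry = CallbackRegistry()
first, second = Logger(registry), Logger(registry)
registry.dispatch("a")
registry.unregister_all(cookie=first)
registry.dispatch("b")
```

With a cookie, the caller does not need to keep a handle for each registration: one call removes everything it registered, and other owners are unaffected.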

@JonasVautherin @julianoes Any updates on this? I also encountered this issue while using the Python wrapper, currently avoiding the problem by splitting my program into multiple scripts with separate MAVSDK connections ... this is not ideal 😄

No update as far as I know, so it's still in the state described above. I would love it if an advanced Python developer could give an opinion and help finish it 👍.

That will be fixed with MAVSDK v2.