Subscribing to a stream cancels older subscriptions
Closed this issue ยท 4 comments
Currently, at the C++ level, streams like position_async()
register one unique callback. This means that one cannot register two callbacks: the second call will erase the previous one.
I believe that this is not the behavior we want in Python. Already in Swift and Java, a stream is shared between the consumers at the language binding level (by using Rx mechanics).
I made a proof of concept for position()
(thanks to Samyash for the help), essentially trying to call _stub.SubscribePosition(request)
only once and sharing that with all the consumers.
I looks like below, exposing position()
as before and instantiating the factory in a lazy way (only when the first consumer registers).
position_factory = None
...
def position(self):
if not self.position_factory:
self.position_factory = self._position_factory()
return self.position_factory()
def _position_factory(self):
import collections
request = telemetry_pb2.SubscribePositionRequest()
position_stream = self._stub.SubscribePosition(request)
deques = []
already_gone = []
def new_position_generator():
new_deque = collections.deque()
new_deque.extend(already_gone)
deques.append(new_deque)
async def gen(my_deque):
while True:
if not my_deque:
rpc_pos = await position_stream.__anext__()
new_pos = Position.translate_from_rpc(rpc_pos.position)
for d in deques:
d.append(new_pos)
yield my_deque.popleft()
return gen(new_deque)
return new_position_generator
Still to be investigated:
- This is currently caching all the events, but a new consumer is only interested in the current state. That can be improved thanks to Samyash suggestion here.
- What happens when a consumer wants to unregister? It should get erased from the queues, probably.
- Can we stop the gRPC connection when all the consumers unregister?
I agree it would be nice to be able to register multiple callbacks for the wrappers as well as for the C++ API. Now as you asked the question is how to unregister. I don't know what pattern or mechanism is suited for this.
I have the same challenge with various methods that are registered by the plugins against SystemImpl
. I use a "cookie" which is basically the this
pointer of the object registering something.
@JonasVautherin @julianoes Any updates on this? I also encountered this issue while using the Python wrapper, currently avoiding the problem by splitting my program into multiple scripts with separate MAVSDK connections ... this is not ideal ๐
No update as far as I know, so it's still in the state described above. I would love it if an advanced python developer could give an opinion and help finishing it ๐.
That will be fixed with MAVSDK v2.