chrysn/aiocoap

How to simultaneously observe several resources?


I have several CoAP servers with several resources that I wish to handle with a single CoAP client; how would I go about it? I got the observation part working, but the problem is that the client blocks on each CoAP server.

Thanks in advance.

Many resources can be observed in parallel; you might be writing in sequential style, which then "blocks" (really, suspends) the task that's observing. In general, when waiting for multiple things in asyncio, your choices are either to have a separate task per observation (similar to what you may be accustomed to from threads, though tasks are cheaper in terms of memory, it's still not optimal), or to use something like gather to wait until any of your observations yields data. If you use the generator interface to observations, the aiostream library might offer useful tools, but I have not tested that.
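A minimal sketch of the one-awaitable-per-observation approach, using the async-iteration interface to observations; the addresses and paths are placeholders:

    import asyncio
    import aiocoap

    # Placeholder servers and paths; substitute your own observable resources.
    URIS = [
        'coap://[2001:db8::1]/sensors/temp',
        'coap://[2001:db8::2]/sensors/temp',
    ]

    async def observe_one(protocol, uri):
        # One awaitable per observation: this suspends on its own URI
        # without blocking the other observations.
        msg = aiocoap.Message(code=aiocoap.GET, uri=uri, observe=0)
        request = protocol.request(msg)
        first = await request.response          # initial response; errors surface here
        print(uri, first.payload)
        async for notification in request.observation:
            print(uri, notification.payload)

    async def main():
        protocol = await aiocoap.Context.create_client_context()
        # gather drives all observations concurrently from a single entry point
        await asyncio.gather(*(observe_one(protocol, u) for u in URIS))

    asyncio.run(main())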

Do you have a minimal example of what you're trying to do?

I actually got the thing working but I'm quite sure it's not optimal (I don't know much about async python): link
Basically I create a bunch of requests:

    requests = [aiocoap.Message(code=aiocoap.Code.GET,
                                uri=f'coap://[{node}]/{uri}',
                                observe=0)
                for uri in uri_all for node in nodes]

Then I create a task for every request

    loop = asyncio.get_event_loop()
    asyncio.ensure_future(send_requests(requests, nodes))
    loop.run_forever()

and register a callback for each request:

    messages = [protocol.request(request) for request in requests]
    for msg in messages:
        msg.observation.register_callback(observation_cb)

This works, but I don't know if it's the best way to go about it.

As you use register_callback, you can do that in a single task. You will still need to either

  • wait for the first response of each request to catch error messages if something goes wrong (as you do now, which is good),
  • use "gather" to do the above in parallel in a single task, or
  • start a task for waiting for each of the initial responses.

Either way, once the callbacks are registered, everything is handled in the aiocoap-managed task(s), so there's no need to keep tasks around. You don't need to (and probably shouldn't) loop over the last msg's observation -- you can just return from there.

What's still missing in the linked code is error handling -- if any resource is not observable or the server returns an error after some time, there's no way to know unless you also register_errback.
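A rough sketch of that callback-based setup with register_errback added; observation_cb, observation_eb and the URI list are just illustrative names:

    import asyncio
    import aiocoap

    URIS = ['coap://[2001:db8::1]/sensors/temp',
            'coap://[2001:db8::2]/sensors/temp']   # placeholder URIs

    def observation_cb(response):
        print('notification:', response.payload)

    def observation_eb(exception):
        print('observation ended with an error:', exception)

    async def main():
        protocol = await aiocoap.Context.create_client_context()
        request_messages = [aiocoap.Message(code=aiocoap.GET, uri=uri, observe=0)
                            for uri in URIS]
        requests = [protocol.request(m) for m in request_messages]
        for req in requests:
            req.observation.register_callback(observation_cb)
            req.observation.register_errback(observation_eb)
        # Wait for all initial responses in parallel so early errors surface here.
        for response in await asyncio.gather(*(req.response for req in requests)):
            print('observation established:', response.code)
        # From here on, notifications are delivered by aiocoap's own machinery;
        # just keep the loop alive.
        await asyncio.get_running_loop().create_future()

    asyncio.run(main())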

Keeping this open as it'd make a good example to add to the guided tour documentation; feel free to ask further if you think your questions were not answered yet.

(Terminology and usage details: what you call uri in your variables is the path, and what you call "requests" and "messages" would probably be better named "request_messages" and "requests", respectively -- I know aiocoap terminology is not ideal there. For observation_cb, I suggest you rather use m.get_request_uri(), as str(m) can change between versions without notice.)
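For example, such a callback might look like this sketch:

    def observation_cb(m):
        # get_request_uri() identifies which observation fired and is stable
        # across versions, unlike the str() representation of the message.
        print(m.get_request_uri(), m.payload)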

Thanks for the suggestions; as you can probably tell, I don't have a lot of experience with programming in general. Basically, I just tried a couple of methods until something worked.

I'm going to investigate the workflow you suggest, but I agree that it could be useful for newcomers to have documentation for this use case.

I also have a problem with observing multiple resources simultaneously. I have an LwM2M client (which runs on a Raspberry Pi 3) with two temperature sensors attached to it. I want to observe data from these sensors on the Leshan demo LwM2M server.
My approach is that whenever I receive an observation request from the LwM2M server, I create a separate task for that observation. This coroutine ("do_notify()") calls the "updated_state()" function after updating the new value:

    def _notifier():
        # Encoder.encode() encodes the sensor's data as the payload
        # that is sent to the LwM2M server in a CoAP message
        ObservableResource.updated_state(response=Encoder.encode())

    async def do_notify(model, notifier):
        await asyncio.sleep(2)
        # Update resource /3303/0/5700
        model.set_resource('3303', '0', '5700', update(sensor_1))
        _notifier()
        # Reschedule itself
        asyncio.create_task(do_notify(model, notifier))

    asyncio.create_task(do_notify(model, notifier))

This method works well for one resource, but when I try to create one more coroutine (do_notify_1(), for example) to observe data from another sensor, I can only send data from both sensors for a short period of time (10-15 seconds) before it runs into the error "Duplicate NON, ACK and RST" and stops sending data. Could you please help me with this problem?

Any solution to this yet?

So far, this is waiting for feedback from the original issue creator on investigating the suggested workflow. If there are more observations than a developer is happy to have tasks around for, chances are the observation should be on a batch resource rather than on individual resources -- so the recommendation is still to have a task (or at least an awaitable) per observation.
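As an illustration of the batch-resource idea on the server side, a rough sketch (the sensor-reading functions and the resource path are placeholders):

    import asyncio
    import json

    import aiocoap
    import aiocoap.resource as resource

    def read_sensor_1():
        return 21.5   # stand-in for the real sensor read

    def read_sensor_2():
        return 22.0   # stand-in for the real sensor read

    class BatchSensorResource(resource.ObservableResource):
        """One observable resource aggregating several readings, so a client
        needs a single observation instead of one per sensor."""

        def __init__(self):
            super().__init__()
            self.state = {}

        async def poll(self):
            while True:
                await asyncio.sleep(2)
                self.state = {'sensor_1': read_sensor_1(),
                              'sensor_2': read_sensor_2()}
                self.updated_state()   # notify all observers of this one resource

        async def render_get(self, request):
            return aiocoap.Message(payload=json.dumps(self.state).encode())

    async def main():
        batch = BatchSensorResource()
        root = resource.Site()
        root.add_resource(['sensors', 'all'], batch)
        await aiocoap.Context.create_server_context(root)
        poll_task = asyncio.create_task(batch.poll())
        await asyncio.get_running_loop().create_future()   # serve forever

    asyncio.run(main())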

@WattageGuy, please open a new issue. (Same goes for @datle1112 -- that post seems to relate to the server side).

Okay @chrysn, please take a look at #343. I don't know how to describe the problem, but I hope you understand.

Sorry, I didn't get that I was supposed to do anything here. To be honest, I haven't done anything with CoAP since I opened this issue. Will close now.