XKNX/xknx

Group values sent with delay are received at the same time

tfassbender opened this issue · 2 comments

Description of problem:

I'm trying to enhance my KNX switches by using the xknx library on a local server that is connected to the KNX network. What I'm trying to achieve is: When a button on the switch is pressed I want the LED on that button to blink (turn on for two seconds, then turn off again).

The problem I'm facing is that the telegrams seem to be sent with the expected delay of 2 seconds (I can see that in the logs), but they seem to be received at the same time (about the time the second telegram is sent). At least that's what I see in the log output of a telegram monitor (and it would explain why the LED is not blinking).


Code and logs:

The code I'm using to (try to) get the LED to blink is the following (shortened to the important parts):

async def react_to_all_lights_off(telegram: Telegram):
    if isinstance(telegram.payload, GroupValueWrite):
        value = telegram.payload.value.value
        if value == 0: # "off" command for all lights
            append_log("received telegram to turn off all lights -> blinking control light on the clicked button")
            control_light = Light(xknx,
                name='control_light__all_lights_off',
                group_address_switch='13/0/1')

            append_log("sending light on")
            await control_light.set_on() # turn the LED on
            await asyncio.sleep(2) # wait for 2 seconds before turning the control light off again
            append_log("sending light off")
            await control_light.set_off() # turn the LED off

            # TODO the telegrams are sent at different times, but seem to be received at the same time (in the telegram monitor)

# ...

async def telegram_received_cb(telegram: Telegram):
    destination = str(telegram.destination_address)

    if destination in invocations_per_destination:
        for callback in invocations_per_destination[destination]:
            await callback(telegram) # will call the react_to_all_lights_off function

async def main():
    global xknx
    xknx = XKNX(telegram_received_cb=telegram_received_cb, daemon_mode=True)
    await xknx.start()
    await xknx.stop() # won't stop till ended with Ctrl+C because of the daemon mode

asyncio.run(main())

I'm using a telegram monitor script to check the time the telegrams are sent:

def append_log(log_data):
    current_date = datetime.datetime.now().strftime("%Y_%m_%d")
    log_directory = "log"
    filename = f"{current_date}__telegram_monotoring.log"
    filepath = os.path.join(log_directory, filename)

    if not os.path.exists(log_directory):
        os.makedirs(log_directory)
    
    with open(filepath, "a+") as file:
        file.write("{0} --- {1}".format(datetime.datetime.now(), log_data) + "\n")

async def telegram_received_cb(telegram: Telegram):
    append_log(telegram)

async def main():
    xknx = XKNX(telegram_received_cb=telegram_received_cb, daemon_mode=True)
    await xknx.start()
    await xknx.stop()

asyncio.run(main())

In the logs from the first script (to enhance the KNX button press) everything looks fine. There are about 2 seconds between the logs of "sending light on" and "sending light off":

2023-08-30 16:15:12.134687 --- received telegram to turn off all lights -> blinking control light on the clicked button
2023-08-30 16:15:12.166698 --- sending light on
2023-08-30 16:15:14.178454 --- sending light off

But somehow the telegram monitor receives the telegrams that are sent from this script at the same time (without the expected two seconds delay between the telegrams). Therefore the LED is turned on and off in milliseconds and won't blink.
The first telegram in the log is the button press that activates the callback function. The second and third telegrams are from the group_write function in the callback script (you can see that because of the source and destination addresses):

2023-08-30 16:15:12.348366 --- <Telegram direction="Incoming" source_address="1.1.16" destination_address="13/0/0" payload="<GroupValueWrite value="<DPTBinary value="0" />" />" />
2023-08-30 16:15:14.401976 --- <Telegram direction="Incoming" source_address="1.1.1" destination_address="13/0/1" payload="<GroupValueWrite value="<DPTBinary value="1" />" />" />
2023-08-30 16:15:14.425259 --- <Telegram direction="Incoming" source_address="1.1.1" destination_address="13/0/1" payload="<GroupValueWrite value="<DPTBinary value="0" />" />" />
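
To put numbers on it, the gaps can be computed from the timestamps in the two logs above. This is just a quick sketch; the helper name `gap_seconds` is made up for illustration:

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # format written by datetime.datetime.now() in append_log


def gap_seconds(earlier: str, later: str) -> float:
    """Return the difference between two log timestamps in seconds."""
    return (datetime.strptime(later, FMT) - datetime.strptime(earlier, FMT)).total_seconds()


# "sending light on" -> "sending light off" in the first script's log
sent_gap = gap_seconds("2023-08-30 16:15:12.166698", "2023-08-30 16:15:14.178454")
# the two telegrams to 13/0/1 in the telegram monitor log
received_gap = gap_seconds("2023-08-30 16:15:14.401976", "2023-08-30 16:15:14.425259")

print(f"sent {sent_gap:.3f} s apart, received {received_gap:.3f} s apart")
# prints: sent 2.012 s apart, received 0.023 s apart
```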

Question:

When I try to run this without the callback (like in the example_light_switch.py script) everything is working like expected, so I think this might be a problem in the daemon / callback mode? Or maybe I'm just missing something?

Thanks in advance :)


  • using xknx standalone
  • using Home-Assistant knx integration

Version information:

  • xknx / Home-Assistant release with the issue: xknx 2.11.2
  • last working xknx / Home-Assistant release (if known): unknown
  • python: 3.11.5

KNX installation:

  • IP Interface: MDT IP Interface SCN-IP000.03
  • Switch: MDT BE-TA55P8.01

Problem-relevant configuration.yaml entries (fill out even if it seems unimportant):
pure xknx script - no home assistant configuration.yaml involved

Diagnostic data of the config entry (only when Home Assistant is used)
no Home Assistant used

Traceback (if applicable):
no traceback

farmio commented

Hi 👋!
These telegram callbacks are invoked in the TelegramQueue class. While they are called, the queue is not processed further. See

await asyncio.gather(*callbacks)

So if you call await asyncio.sleep(2) in such a callback, you are essentially pausing the heart of xknx. Both your telegrams - on and off - are put into the queue, but since the Queue waits until the callbacks have returned, they are not processed (sent) immediately (incoming and outgoing telegrams are processed in the same queue).
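
The effect can be reproduced with plain asyncio. The following is a simplified model under my own assumptions, not the actual TelegramQueue code: one loop consumes a single queue and awaits the callback before taking the next item. The names `run_demo`, `blink` and `DELAY` are hypothetical, and the delay is shortened so the demo runs quickly.

```python
import asyncio
import time

DELAY = 0.2  # stands in for the 2 seconds from the issue


async def run_demo(callback_in_background: bool) -> dict[str, float]:
    """Return the time (seconds since start) at which each item was processed."""
    queue: asyncio.Queue = asyncio.Queue()
    processed: dict[str, float] = {}
    tasks: set = set()
    start = time.monotonic()

    async def blink() -> None:
        # Queue "on", wait, then queue "off", like set_on() / sleep / set_off().
        await queue.put("on")
        await asyncio.sleep(DELAY)
        await queue.put("off")

    async def callback() -> None:
        if callback_in_background:
            task = asyncio.create_task(blink())
            tasks.add(task)  # keep a reference so the task is not garbage-collected
            task.add_done_callback(tasks.discard)
        else:
            await blink()  # blocks the consumer loop below for DELAY seconds

    await queue.put("button")  # the incoming telegram that triggers the callback
    while len(processed) < 3:
        item = await queue.get()
        processed[item] = time.monotonic() - start
        if item == "button":
            await callback()  # the loop waits here until the callback returns
    return processed


if __name__ == "__main__":
    for background in (False, True):
        times = asyncio.run(run_demo(background))
        gap = times["off"] - times["on"]
        print(f"background={background}: on/off processed {gap:.2f} s apart")
```

With the callback awaited directly, "on" and "off" should both be processed only after the sleep has finished, almost at the same moment. With the background task, they should be processed roughly `DELAY` seconds apart.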

Here are some remarks for your code:

  • You are creating a new Light instance on every callback. I'd suggest creating it once and reusing it in the callback, or just sending a telegram without a Device abstraction using
    async def group_value_write(
  • Generally, don't call os.makedirs, open(), file.write(), etc. (blocking IO calls) in an async context. It may not be a problem in your example code, but it stalls the whole asyncio loop. Use async-compatible functions or an executor for this, or, in this special case, I'd strongly recommend having a look at the logging module - this will handle things properly.
  • Have a look into Device callbacks
    device_updated_cb: DeviceCallbackType[BinarySensor] | None = None,

    You can use this to trigger a callback for a specific GA (your 13/0/0), easily compare the state of the device instead of extracting telegram.payload.value.value and call your blink-function.
  • Use background tasks (asyncio.Task) for your delay-function. Always save a reference to that task - so it won't be garbage-collected and you can cancel it if it is running (sleeping) while you would invoke it again (because of 2 Telegrams with "0" to 13/0/0 in less than 2 seconds). This task reference could be saved as an attribute of the BinarySensor device that has invoked the callback.
    See also https://docs.python.org/3/library/asyncio-task.html#creating-tasks
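
A minimal sketch of the last point, using plain asyncio only. The `Blinker` class and its parameter names are hypothetical and not part of xknx; it only assumes that the "on" and "off" actions are async callables.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class Blinker:
    """Hypothetical helper: runs on -> wait -> off as a cancellable background task."""

    def __init__(
        self,
        turn_on: Callable[[], Awaitable[None]],
        turn_off: Callable[[], Awaitable[None]],
        delay: float,
    ) -> None:
        self._turn_on = turn_on
        self._turn_off = turn_off
        self._delay = delay
        # Saved reference: prevents garbage collection and allows cancelling.
        self._task: Optional[asyncio.Task] = None

    def trigger(self) -> None:
        """Start a blink; if one is still sleeping, cancel it and start over."""
        if self._task is not None and not self._task.done():
            self._task.cancel()
        self._task = asyncio.create_task(self._blink())

    async def _blink(self) -> None:
        await self._turn_on()
        await asyncio.sleep(self._delay)
        await self._turn_off()
```

With the Light from the issue, this might be created once as `Blinker(control_light.set_on, control_light.set_off, delay=2)`, and the telegram or device callback would only call `trigger()` and return immediately. A second trigger within the delay sends "on" again and restarts the wait, so only one "off" is sent at the end.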

tfassbender commented

Hi 👋

Thanks for the fast reply and the perfect explanation.
Using a background task to turn the light off again works like a charm! 😃

Also thank you for the remarks. I'll try to add them soon 😄.

If someone is interested in it, the working solution looks like this:

"""
    Additional functionality that cannot be configured in ETS
"""

import os
import datetime

import asyncio
from xknx import XKNX
from xknx.telegram import Telegram
from xknx.telegram.apci import GroupValueWrite
from xknx.tools import group_value_write
from xknx.devices import Light

# set for background tasks, so they are not garbage collected
background_tasks = set()

def init_devices():
    global control_light
    control_light = Light(xknx, name='control_light__all_lights_off', group_address_switch='13/0/1')

# TODO: blocking IO operations should not be used in callbacks - use logging module instead
def append_log(log_data):
    current_date = datetime.datetime.now().strftime("%Y_%m_%d")
    log_directory = "log"
    filename = f"{current_date}__additional_functionality.log"
    filepath = os.path.join(log_directory, filename)

    if not os.path.exists(log_directory):
        os.makedirs(log_directory)
    
    with open(filepath, "a+") as file:
        file.write("{0} --- {1}".format(datetime.datetime.now(), log_data) + "\n")


# 13/0/0
async def react_to_all_lights_off(telegram: Telegram):
    if isinstance(telegram.payload, GroupValueWrite):
        value = telegram.payload.value.value
        if value == 0: # "off" command for all lights
            append_log("received telegram to turn off all lights -> blinking control light on the clicked button")
            
            # turn the control light on and off in a separate task, so it doesn't block xknx
            task = asyncio.create_task(blink_control_light())
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

async def blink_control_light():
    await control_light.set_on()
    await asyncio.sleep(2) # wait for 2 seconds before turning the control light off again
    await control_light.set_off()


# core logic that passes telegrams to the configured callback methods depending on their source / destination

invocations_per_destination = {
        # enter destination groups here and assign them to a list of async functions (with a telegram parameter)
        "13/0/0": [react_to_all_lights_off],
    }

async def telegram_received_cb(telegram: Telegram):
    """
        This function will be invoked for every telegram that is sent over the knx bus
    """
    destination = str(telegram.destination_address)

    # TODO: use device callbacks instead
    if destination in invocations_per_destination:
        for callback in invocations_per_destination[destination]:
            await callback(telegram)

async def main():
    global xknx
    xknx = XKNX(telegram_received_cb=telegram_received_cb, daemon_mode=True)
    
    init_devices()
    
    await xknx.start()
    await xknx.stop() # won't stop till ended with Ctrl+C because of the daemon mode

asyncio.run(main())
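
The TODO about blocking IO in `append_log` could be addressed with the standard logging module. The following is only a sketch under some assumptions: the function name `setup_logging`, the logger name and the fixed filename are made up for illustration. It uses `QueueHandler` and `QueueListener` so that the file is written in a separate thread, and callbacks only put a record on an in-memory queue.

```python
import logging
import logging.handlers
import os
import queue


def setup_logging(log_directory: str = "log", filename: str = "additional_functionality.log"):
    """Return (logger, listener); file writes happen in the listener's thread."""
    os.makedirs(log_directory, exist_ok=True)  # done once at startup, not per message

    file_handler = logging.FileHandler(os.path.join(log_directory, filename))
    # Same "<timestamp> --- <message>" layout as append_log
    file_handler.setFormatter(logging.Formatter("%(asctime)s --- %(message)s"))

    log_queue = queue.Queue(-1)  # unbounded queue between callbacks and the writer thread
    listener = logging.handlers.QueueListener(log_queue, file_handler)
    listener.start()

    logger = logging.getLogger("additional_functionality")
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    return logger, listener
```

In the script above, `setup_logging()` would be called once in `main()` before `xknx.start()`, the `append_log(...)` calls would become `logger.info(...)`, and `listener.stop()` on shutdown flushes the remaining records. For one file per day, as in the original filename scheme, `logging.handlers.TimedRotatingFileHandler` with `when="midnight"` could replace the plain `FileHandler`.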