metricq/metricq-source-bacnet

Drain result queue on stop

Opened this issue · 2 comments

The stop method is waiting with result_queue.join(). So we need to empty all results from queue before stop

For normal stops, this actually prevents data loss. The workflow is
(https://github.com/metricq/metricq-source-bacnet/blob/master/metricq_source_bacnet/source.py#L197-L218)

  1. the stop function is called
  2. all worker tasks are stopped
  3. _bacnet_reader is stopped
    1. _result_queue.join() in the stop function
    2. main task empties _result_queue as usual
  4. _result_queue.join() returns
  5. main task is stopped with _main_task_stop_future
  6. call super.stop(...)

If stop is called because of an exception in the main task, _result_queue.join() may block. So maybe we should call join with a timeout if exception is not None

We need a timeout for result_queue.join in case the data connection died