Drain result queue on stop
Opened this issue · 2 comments
kinnarr commented
The stop method is waiting with result_queue.join()
. So we need to empty all results from queue before stop
kinnarr commented
For normal stops, this actually prevents data loss. The workflow is
(https://github.com/metricq/metricq-source-bacnet/blob/master/metricq_source_bacnet/source.py#L197-L218)
- the
stop
function is called - all worker tasks are stopped
_bacnet_reader
is stopped-
_result_queue.join()
in thestop
function- main task empties
_result_queue
as usual
_result_queue.join()
returns- main task is stopped with
_main_task_stop_future
- call
super.stop(...)
If stop is called because of an exception in the main task, _result_queue.join()
may block. So maybe we should call join with a timeout if exception is not None
kinnarr commented
We need a timeout for result_queue.join
in case the data connection died