BatchEventProcessor can try to dequeue an event with a negative timeout.
jkolenofferup opened this issue.
The following code can request an item from event_queue with a negative timeout:
class BatchEventProcessor(BaseEventProcessor):
    def _run(self):
        try:
            while True:
                if self._get_time() >= self.flushing_interval_deadline:
                    self._flush_batch()
                    self.flushing_interval_deadline = self._get_time() + \
                        self._get_time(self.flush_interval.total_seconds())
                    self.logger.debug('Flush interval deadline. Flushed batch.')
                try:
                    interval = self.flushing_interval_deadline - self._get_time()
                    item = self.event_queue.get(True, interval)  # interval can be negative
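If flushing_interval_deadline passes between the first and the second call to _get_time(), interval will be negative: the first call still sees the deadline as in the future, so no flush happens, but the second call returns a time later than the deadline.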
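A minimal sketch of the race, assuming a hypothetical scripted clock (`scripted_clock`) in place of `_get_time()` and a hypothetical helper `two_read_interval` that mirrors the two clock reads in the loop above. Neither is SDK code; they only reproduce the arithmetic.

```python
from itertools import chain, repeat


def scripted_clock(*readings):
    """Hypothetical fake clock: returns the readings in order, then repeats the last one."""
    it = chain(readings, repeat(readings[-1]))
    return lambda: next(it)


def two_read_interval(get_time, deadline):
    """Hypothetical mirror of the original pattern: one read for the check, another for the interval."""
    if get_time() >= deadline:
        return None  # the real loop would flush here instead
    return deadline - get_time()


T, eps = 100.0, 0.001
clock = scripted_clock(T - eps, T + eps)  # deadline passes between the two reads
print(two_read_interval(clock, T))  # negative, about -0.001
```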
Proposed fix:
class BatchEventProcessor(BaseEventProcessor):
    def _run(self):
        try:
            while True:
                loop_time = self._get_time()  # call _get_time() only once per loop iteration
                if loop_time >= self.flushing_interval_deadline:
                    self._flush_batch()
                    self.flushing_interval_deadline = self._get_time() + \
                        self._get_time(self.flush_interval.total_seconds())
                    self.logger.debug('Flush interval deadline. Flushed batch.')
                try:
                    interval = self.flushing_interval_deadline - loop_time
                    item = self.event_queue.get(True, interval)
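A sketch of why the single read closes the gap, again assuming a hypothetical scripted clock and a hypothetical helper `single_read_interval` (not SDK code). When the loop does not flush, `loop_time < deadline`, so `deadline - loop_time` is strictly positive; after a flush, the new deadline is at least `loop_time + flush_interval`, so the interval is positive there too, assuming a positive flush interval.

```python
from itertools import chain, repeat


def scripted_clock(*readings):
    """Hypothetical fake clock: returns the readings in order, then repeats the last one."""
    it = chain(readings, repeat(readings[-1]))
    return lambda: next(it)


def single_read_interval(get_time, deadline, flush_interval):
    """Hypothetical mirror of the proposed fix; returns (deadline, interval)."""
    loop_time = get_time()  # read the clock once per iteration
    if loop_time >= deadline:
        # stand-in for _flush_batch(); the new deadline comes from a fresh read
        deadline = get_time() + flush_interval
    return deadline, deadline - loop_time


T, eps = 100.0, 0.001
deadline, interval = single_read_interval(scripted_clock(T - eps, T + eps), T, 10.0)
print(deadline, interval)  # 100.0 and a small positive interval (about 0.001)
```

An alternative not proposed in the thread would be to clamp the value, e.g. `max(0.0, interval)`; that also avoids the error but keeps two clock reads per iteration, so the single read arguably fixes the cause rather than the symptom.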
Thanks @jkolenofferup. Can you provide a case in which you were able to produce a negative interval?
It's sporadic. We have four jobs that each send about 500k events in a batch setting (batch size 100, flush interval 10s). Once we had dealt with the event_queue dropping problem, we got about 30 to 40 error messages of the form "{event_processor.py:212} ERROR - Uncaught exception processing buffer. Error: 'timeout' must be a non-negative number".
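The error text matches the `ValueError` that Python's standard-library `queue.Queue.get` raises when `block` is true and `timeout` is negative, which suggests (the thread does not confirm it) that event_queue is a standard `queue.Queue`. A quick check:

```python
import queue

q = queue.Queue()
q.put("event")  # the error is raised even though an item is available

message = None
try:
    q.get(True, -0.001)  # block=True with a negative timeout
except ValueError as exc:
    message = str(exc)

print(message)  # 'timeout' must be a non-negative number
```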
If you are trying to reproduce this in a unit test:
- Set flushing_interval_deadline to T
- Let the first call to _get_time() return T - epsilon
- Let the second call to _get_time() return T + epsilon
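A minimal sketch of that recipe as a standard-library `unittest` test. `LoopStep` is a hypothetical stand-in that copies the original two-read loop body (with `flush_interval` simplified to a float number of seconds); it is not the SDK class. `_get_time` is patched with `unittest.mock` to return T - eps and then T + eps.

```python
import queue
import unittest
from unittest import mock


class LoopStep:
    """Hypothetical stand-in for one iteration of the original _run loop (two clock reads)."""

    def __init__(self, deadline, flush_interval=10.0):
        self.flushing_interval_deadline = deadline
        self.flush_interval = flush_interval  # seconds as a float, not a timedelta
        self.event_queue = queue.Queue()

    def _get_time(self):
        raise NotImplementedError  # patched in the test

    def _flush_batch(self):
        pass

    def step(self):
        if self._get_time() >= self.flushing_interval_deadline:
            self._flush_batch()
            self.flushing_interval_deadline = self._get_time() + self.flush_interval
        interval = self.flushing_interval_deadline - self._get_time()
        return self.event_queue.get(True, interval)


class NegativeIntervalTest(unittest.TestCase):
    def test_deadline_passing_between_reads_gives_negative_timeout(self):
        T, eps = 100.0, 0.001
        proc = LoopStep(deadline=T)
        # first read passes the deadline check, second read lands past the deadline
        with mock.patch.object(proc, "_get_time", side_effect=[T - eps, T + eps]):
            with self.assertRaisesRegex(ValueError, "non-negative"):
                proc.step()
```

Run it with `python -m unittest`. A test against the real class would patch `_get_time` the same way, bearing in mind that the real loop also calls `_get_time` with an argument when it recomputes the deadline, which consumes an extra `side_effect` value.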
Either way, reading the current time twice in a situation like this (which implicitly assumes both _get_time() calls return the same value) is fragile and should be fixed.
@jkolenofferup I'm curious: do you still get the 30 to 40 error messages after replacing the two _get_time() calls with a single one (loop_time)?
Does your suggestion fix it?
I wasn't able to reproduce the sporadic error messages in my tests, but I was able to trigger the error by following your suggestion (which is different from generating 500k events with four jobs).
I'm not sure this setup is realistic; it seems to simply force the error:
- Set flushing_interval_deadline to T
- Let the first call to _get_time() return T - epsilon
- Let the second call to _get_time() return T + epsilon
There must be a negative interval at some point to trigger the errors, but doesn't a unit test that forces interval to be negative and then asserts on the error message differ from what actually happens in your case?
I'm just trying to write unit tests that capture why the interval would become negative, but that isn't easy when the failure is sporadic.
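One way to get closer to the production case than a single hand-picked clock, sketched here as an assumption rather than anything the thread settled on, is to drive both loop variants with many random, monotonically increasing clock readings and count how often each would pass a negative timeout to event_queue.get. The jitter model (small random gaps between reads) and the helpers `original_interval`, `fixed_interval`, and `count_negative` are hypothetical.

```python
import random


def original_interval(readings, deadline, flush_interval):
    """Hypothetical mirror of the original loop: up to three clock reads."""
    it = iter(readings)
    if next(it) >= deadline:
        deadline = next(it) + flush_interval
    return deadline - next(it)


def fixed_interval(readings, deadline, flush_interval):
    """Hypothetical mirror of the proposed fix: the check and the interval share loop_time."""
    it = iter(readings)
    loop_time = next(it)
    if loop_time >= deadline:
        deadline = next(it) + flush_interval
    return deadline - loop_time


def count_negative(variant, trials=10_000, seed=0):
    rng = random.Random(seed)
    negatives = 0
    for _ in range(trials):
        deadline = 100.0
        start = deadline - rng.uniform(0.0, 0.01)  # first read lands just before the deadline
        readings = [start]
        for _ in range(2):
            # small random gap between reads (hypothetical scheduling jitter)
            readings.append(readings[-1] + rng.uniform(0.0, 0.01))
        if variant(readings, deadline, flush_interval=10.0) < 0:
            negatives += 1
    return negatives


print(count_negative(original_interval), count_negative(fixed_interval))  # first > 0, second 0
```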
What do you mean by "context switch" in this sentence: "The two calls to _get_time() easily break if there is a context switch between the first and second call."?
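For reference, a context switch here presumably means the processor thread being suspended (by the OS scheduler, or by CPython handing the GIL to another thread) between the two _get_time() calls; wall-clock time keeps advancing while it is suspended, so the second read can land past the deadline. A sketch that simulates such a pause with `time.sleep`, using `time.monotonic` as the clock and a hypothetical `pause_between_reads` hook (not SDK code):

```python
import time


def two_read_interval(deadline, pause_between_reads):
    """Original two-read pattern, with a hook standing in for the thread being descheduled."""
    if time.monotonic() >= deadline:
        return None  # the real loop would flush here
    pause_between_reads()  # e.g. the scheduler runs other threads at this point
    return deadline - time.monotonic()


deadline = time.monotonic() + 0.05  # deadline 50 ms from now
interval = two_read_interval(deadline, lambda: time.sleep(0.2))
print(interval)  # negative (about -0.15 on an idle machine)
```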