KafkaProducer's exception handling makes the operator lose tuples when in CR
The producer works asynchronously. Tuple processing invokes producer.send(...)
and hands the producer a callback, which is invoked asynchronously when the PRODUCE request is acknowledged by the server. When the PRODUCE request fails, an exception is handed over to the callback function.
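To illustrate this asynchronous contract, here is a minimal standalone sketch against the plain Kafka producer API; the topic name and broker address are placeholders, not taken from the toolkit:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns immediately; the callback runs later on the
            // producer's I/O thread, once the PRODUCE request completes.
            producer.send(new ProducerRecord<>("events", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        // the PRODUCE request failed; the exception arrives here,
                        // not on the thread that called send()
                        exception.printStackTrace();
                    }
                });
        }
    }
}
```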
The current implementation stores such an exception and throws it during tuple processing of the next tuple, so that the runtime restarts the PE, as long as the exception is not caught (@catch
in SPL, or in tuple logic when tightly fused with an upstream operator).
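The deferred-throw pattern described here looks roughly like the following sketch; field and method names are illustrative, not the toolkit's actual code:

```java
// Hypothetical sketch of the problematic pattern: the callback thread
// parks the exception, and the tuple thread throws it against an
// unrelated, perfectly valid tuple.
public class DeferredThrowSketch {
    // written by the producer's callback thread, read by the tuple thread
    private volatile Exception sendException = null;

    // invoked from the producer callback on failure
    void onSendFailure(Exception e) {
        sendException = e;
    }

    // invoked for each incoming tuple
    void processTuple(Object tuple) throws Exception {
        Exception e = sendException;
        if (e != null) {
            sendException = null;
            // uncaught, this makes the runtime restart the PE - but the
            // tuple being processed right now is not the one that failed
            throw e;
        }
        // ... submit the tuple via producer.send(...) ...
    }
}
```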
When there is no next tuple, but drain() and checkpoint() instead, the region is successfully made consistent. Later, tuple processing hits the stored exception, with a reset of the region and a replay of the tuple that hit the exception - but never of the tuple that actually failed.
In general, it is not a good idea to propagate exceptions from callbacks to input port tuple processing, as it suggests that something is wrong with the current tuple - but it isn't.
Resolution: When the ProducerCallback invokes the client's handleSendException() method, the consistent region is reset immediately.
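A sketch of such a callback, assuming the operator holds a ConsistentRegionContext from the Streams Java Operator API (all other names are illustrative):

```java
import java.io.IOException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import com.ibm.streams.operator.state.ConsistentRegionContext;

// On a failed PRODUCE request, trigger an immediate reset of the
// consistent region instead of deferring the exception to the next tuple.
public class ResettingCallback implements Callback {
    private final ConsistentRegionContext crContext;

    public ResettingCallback(ConsistentRegionContext crContext) {
        this.crContext = crContext;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            try {
                // reset the region right away, from the callback's thread
                crContext.reset();
            } catch (IOException e) {
                // the reset could not be initiated; escalate instead
                throw new RuntimeException(e);
            }
        }
    }
}
```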
On drain of the CR, the producer is flushed, so that all outstanding batches are sent. When a callback is invoked with an exception during drain():

a) the region is reset from the callback's thread, and
b) the exception is stored and thrown after producer.flush() completes, within the state handler thread that invoked drain(), so that the operator restarts and the region is reset.
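In sketch form (field names are illustrative), the drain logic then behaves like this:

```java
import org.apache.kafka.clients.producer.KafkaProducer;

public class DrainSketch {
    private final KafkaProducer<?, ?> producer;
    // set by the producer callback when a PRODUCE request fails
    private volatile Exception sendException = null;

    public DrainSketch(KafkaProducer<?, ?> producer) {
        this.producer = producer;
    }

    // invoked by the state handler thread when the region drains
    public void drain() throws Exception {
        // flush() blocks until all outstanding batches are sent and
        // every callback has run
        producer.flush();
        Exception e = sendException;
        if (e != null) {
            sendException = null;
            // (a) the callback has already initiated the region reset;
            // (b) re-throwing here makes the operator restart
            throw e;
        }
    }
}
```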
Change when not in a consistent region: On an exception received in the callback, the PE is terminated instead of propagating the exception to the input port, where it used to be thrown on the next tuple.
Terminating and restarting the PE is the current recovery strategy in an autonomous region. This strategy can no longer be foiled by catching exceptions at the input port.
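A sketch of that behavior; terminating the PE via System.exit() is one illustrative way to do it, and the toolkit's actual mechanism may differ:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Autonomous-region behavior: a send failure brings the whole PE down
// from the callback thread, so the Streams runtime restarts it. Catching
// exceptions at the input port can no longer mask the failure.
public class TerminatingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("PRODUCE request failed; terminating PE: " + exception);
            // a non-zero exit makes the runtime treat the PE as crashed
            // and apply its restart policy
            System.exit(1);
        }
    }
}
```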
Resolved with v2.1.0.