IBMStreams/streamsx.messagehub

MessageHubProducer: enable Kafka transactions when Message Hub upgrades the Kafka version

Opened this issue · 9 comments

Message Hub is at Kafka 0.10.2, which has no support for transactions in the broker. Once it is upgraded to a newer version, the transactional producer should be enabled by adding the operator parameter consistentRegionPolicy with the values {AtLeastOnce, Transactional}.

The Enterprise plan of the Event Streams service is at Kafka 1.1, which supports transactions.

https://console.bluemix.net/docs/services/EventStreams/messagehub085.html#plan_choose

Planning support of transactional producer for the 1.5.0 toolkit release.

What would the semantics of consistentRegionPolicy with the values {AtLeastOnce, Transactional} be?

From the Kafka toolkit documentation (where the feature would be inherited from):

Consistent Region Strategy

The operator can participate in a consistent region. The operator cannot be the start of a consistent region. The operator supports at least once (default behavior) and exactly once delivery semantics. The delivery semantics can be controlled by the consistentRegionPolicy parameter.

At least once delivery

If the operator crashes or is reset while in a consistent region, the operator writes all replayed tuples again. This ensures that every tuple sent to the operator will be written to the topic(s). However, at least once semantics implies that duplicate messages may be written to the topic(s).

Exactly once delivery

Messages are always inserted into a topic within the context of a transaction. Transactions are committed when the operator checkpoints. If the operator crashes or is reset while in a consistent region, the operator will abort the ongoing transaction and write all tuples replayed within a new transaction. External consumers configured with isolation.level=read_committed will not read the duplicates from the aborted transactions. Consumers that use a different isolation level will read duplicate messages similar to at least once delivery.
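For reference, a minimal Java sketch of the Kafka transactional-producer pattern this behavior builds on (standard kafka-clients API; the bootstrap server, topic, and transactional ID are placeholders). Tuples arriving between two checkpoints would be sent inside one transaction, which is committed on checkpoint and aborted on reset:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");                // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // A stable, unique transactional ID turns the producer into a transactional one.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-operator-tx-id");           // placeholder

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();

            // One transaction per consistent-region cycle: tuples arriving between two
            // checkpoints are written within the same transaction.
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("myTopic", "key", "value"));           // placeholder topic
                // ... more tuples ...
                producer.commitTransaction();   // on checkpoint: the batch becomes visible to read_committed consumers
            } catch (Exception e) {
                producer.abortTransaction();    // on reset: discard the open batch; tuples are replayed later
            }
        }
    }
}
```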

To ensure that replayed tuples are not committed multiple times when the consistent region fails between commit and the point when the entire region has reached a consistent state, the operator writes the current checkpoint sequence number as commit-ID together with its unique transactional ID into the control topic __streams_control_topic as part of the committed transaction. The checkpoint sequence number is part of the operator's checkpointed state and represents the last successfully committed ID.

When the consistent region fails after commit, the last successfully committed ID is reset to the previous value and tuples are replayed. When the operator checkpoints next time, it detects the difference between the committed ID within the control topic and the restored ID from the checkpoint data and aborts the Kafka transaction rather than committing the replayed data again.
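Illustrative only, not the toolkit's actual code: a sketch of the commit-ID check described above. The method arguments and the layout of the control-topic record are assumptions.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CommitIdCheckSketch {

    /**
     * Called at checkpoint time, after the replayed tuples have been sent within
     * the currently open transaction.
     *
     * restoredId     - commit-ID restored from the operator's checkpoint data
     * controlTopicId - newest commit-ID found in __streams_control_topic for this
     *                  producer's transactional ID (read elsewhere)
     * currentSeqId   - checkpoint sequence number of the current cut
     */
    static void commitOrAbort(KafkaProducer<String, String> producer,
                              long restoredId, long controlTopicId, long currentSeqId) {
        if (controlTopicId > restoredId) {
            // The previous transaction was committed to Kafka, but the consistent region
            // was reset before it reached a consistent state: the replayed data is already
            // in the topic, so abort instead of committing it a second time.
            producer.abortTransaction();
        } else {
            // Write the new commit-ID into the control topic as part of the same transaction,
            // keyed by the producer's transactional ID (record layout is an assumption).
            producer.send(new ProducerRecord<>("__streams_control_topic",
                    "my-operator-tx-id", Long.toString(currentSeqId)));
            producer.commitTransaction();
        }
    }
}
```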

NOTE 1: Transactions in Kafka have an inactivity timeout with a default value of 60 seconds. If the consistent region triggers less frequently and you expect a low message rate, consider increasing the timeout by setting the producer property transaction.timeout.ms to a higher value, for example 120000 (milliseconds). The maximum value of this property is limited by the server property transaction.max.timeout.ms, which has a default value of 900000.
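For example, if the producer properties are built programmatically, raising the timeout to 120 seconds could look like this (equivalent to transaction.timeout.ms=120000 in a producer properties file):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class TransactionTimeoutConfig {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        // Raise the transaction inactivity timeout from the 60 s default to 120 s.
        // The value must not exceed the broker's transaction.max.timeout.ms (default 900000 ms).
        producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "120000");
        System.out.println(producerProps);
    }
}
```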

NOTE 2: In Kafka environments that have disabled automatic topic creation (broker config auto.create.topics.enable=false), the control topic with the name __streams_control_topic must be created manually before consistentRegionPolicy can be used with Transactional.
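One way to create the control topic up front is the Kafka AdminClient; a minimal sketch (bootstrap server is a placeholder, and the partition count and replication factor are example values, choose what fits your cluster):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateControlTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");   // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 3 are example values.
            NewTopic controlTopic = new NewTopic("__streams_control_topic", 1, (short) 3);
            admin.createTopics(Collections.singleton(controlTopic)).all().get();
        }
    }
}
```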

NOTE 3: For exactly once delivery semantics, the Kafka broker must have version 0.11 or higher because older brokers do not support transactions.

Hope this helps.

Need to think about the implementation more, but it jumps out that one value for the parameter (AtLeastOnce) describes behaviour while the other (Transactional) describes an implementation detail.

Why not make both values behavioural, which is what an app developer is trying to achieve, i.e. AtLeastOnce and ExactlyOnce?

When the consistent region fails after commit, the last successfully committed ID is reset to the previous value and tuples are replayed. When the operator checkpoints next time, it detects the difference between the committed ID within the control topic and the restored ID from the checkpoint data and aborts the Kafka transaction rather than committing the replayed data again.

Minor correction in the first sentence: reset to the previous value should be reset to a previous value. Not sure if that affects the algorithm.

However, I don't understand how this works. Is there an assumption that a cut will contain the same tuples as a previous run? Otherwise I don't see how this is exactly once?

For example, say the tuples are a simple sequence of numbers: 1,2,3,...

The first checkpoint with sequence 1 contains 1,2,3

The next with sequence 2 contains 4,5,6,7 but is reset after the kafka transaction commit. So Kafka contains 1,2,3,4,5,6,7 but Streams app thinks only 1,2,3 have been processed (assuming a perfect replayable source).

The next with sequence 3 contains 4,5,6,7,8,9,10.
According to the description the kafka transaction will be aborted, but it contains 8,9,10 which have never been written to kafka???

There may be a similar issue when there are multiple resets between successful commits, e.g. if there are multiple resets after sequence 1 was completed above, then the sequence id I think may be 5 for the next cut that reaches the checkpoint.

Maybe I'm missing something, as it's stated that the unique [Kafka] transactional ID is stored in the control topic, but it's not mentioned as being used?

When the consistent region fails after commit, the last successfully committed ID is reset to the previous value and tuples are replayed. When the operator checkpoints next time, it detects the difference between the committed ID within the control topic and the restored ID from the checkpoint data and aborts the Kafka transaction rather than committing the replayed data again.

Minor correction in the first sentence: reset to the previous value should be reset to a previous value. Not sure if that affects the algorithm.

I will correct the doc. It does not affect the algorithm.

However, I don't understand how this works. Is there an assumption that a cut will contain the same tuples as a previous run? Otherwise I don't see how this is exactly once?

For example, say the tuples are a simple sequence of numbers: 1,2,3,...

The first checkpoint with sequence 1 contains 1,2,3

The next with sequence 2 contains 4,5,6,7 but is reset after the kafka transaction commit. So Kafka contains 1,2,3,4,5,6,7 but Streams app thinks only 1,2,3 have been processed (assuming a perfect replayable source).

The next with sequence 3 contains 4,5,6,7,8,9,10.
According to the description the kafka transaction will be aborted, but it contains 8,9,10 which have never been written to kafka???

Yes, you are right: 8, 9, 10 would be missing in the end. This 'exactly once' has the precondition that each replayed cut produces exactly the same data. Otherwise data may be missing(!!) or duplicated.
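A small plain-Java trace of that scenario, just to make the loss explicit (the variable names and commit-ID bookkeeping are assumptions for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

public class LostTuplesTrace {
    public static void main(String[] args) {
        List<Integer> topic = new ArrayList<>();   // messages visible in Kafka after commits
        long controlTopicId;                       // commit-ID stored in __streams_control_topic
        long restoredId;                           // commit-ID restored from the checkpoint

        // Cut 1 (sequence 1): tuples 1,2,3 are committed normally.
        topic.addAll(List.of(1, 2, 3));
        controlTopicId = 1;
        restoredId = 1;

        // Cut 2 (sequence 2): tuples 4,5,6,7 are committed in Kafka, but the region is
        // reset before it reaches a consistent state.
        topic.addAll(List.of(4, 5, 6, 7));
        controlTopicId = 2;     // the control topic already advanced ...
        // restoredId stays 1   // ... while the operator state is rolled back to sequence 1

        // Cut 3 (sequence 3): the replay produces 4,5,6,7 plus the NEW tuples 8,9,10.
        // Because controlTopicId (2) > restoredId (1), the whole transaction is aborted,
        // so 8,9,10 are never written, although they were never in Kafka before.
        System.out.println("controlTopicId=" + controlTopicId + ", restoredId=" + restoredId);
        System.out.println("Kafka topic contains: " + topic);   // [1, 2, 3, 4, 5, 6, 7]
    }
}
```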

Now, when I reflect on the exactly-once feature again, I vote for reducing the transactional delivery from exactly once to a better at-least-once, and I would remove the weird detection algorithm using the control topic, because:

  1. The precondition that a replayed cut produces exactly the same data as the previously failed one is so restrictive that most applications (99%?) will not achieve it.
  2. The applications that do not meet the precondition (99%?) may even lose data - they do not even have at-least-once delivery.

There may be a similar issue when there are multiple resets between successful commits, e.g. if there are multiple resets after sequence 1 was completed above, then the sequence id I think may be 5 for the next cut that reaches the checkpoint.

For the algorithm to work, the sequence IDs must be ascending, but they need not increase by exactly 1.

Maybe I'm missing something, as it's stated that the unique [Kafka] transactional ID is stored in the control topic, but it's not mentioned as being used?

In the algorithm, the transactional ID is used to identify the producer in the __streams_control_topic (every producer has its own transactional ID); its function in the Kafka broker doesn't matter here.

I vote for reducing the transactional delivery from exactly once to a better at-least-once and I would remove the weird detection algorithm using the control topic

+1 - assuming this means transactions are only aborted during reset processing.

I think there is then a logical reason for choosing transactions or not:

  • No transactions: Messages appear as soon as they are produced, with the downside that a reset will most likely lead to duplicate messages
  • Transactions: Duplicate messages are minimized (though not eliminated) upon reset, with the downside that produced messages become visible only at the checkpoint interval (see the consumer sketch below). Possibly also the downside of processing overhead?
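To illustrate the visibility point: a consumer configured with isolation.level=read_committed only sees the transactional output once the transaction commits, i.e. roughly once per checkpoint interval of the producing region. A minimal sketch with a recent kafka-clients version (bootstrap server, group ID, and topic are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");    // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only messages from committed transactions become visible; messages from
        // aborted transactions are filtered out.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("myTopic"));                           // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}
```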

As this feature is completely inherited from the Kafka toolkit, I created an issue there:

IBMStreams/streamsx.kafka#116

Feature released with v1.5.0.