confluentinc/kafka-streams-examples

Case: 100+ KafkaStreams threads on 3.5k+ topics/partitions with exactly_once_v2 guarantee

fu22ybear opened this issue · 8 comments

hi folks!

currently I'm working on a Kafka Streams based app. the idea is to stream from one source topic to 3.5k+ topics using processing.guarantee: exactly_once_v2.

the source topic is an endpoint and the single source of truth for all incoming messages. messages differ in content, and the goal is to split them by content and write each one to a specific topic. for instance, type1 messages go to type1-topic, type2 to type2-topic and so forth.

                   / ----------- type1-topic
                  / ------------ type2-topic
source-topic ---- ...
                  \ ------------ typeN-topic
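
A minimal sketch of the routing rule in the diagram, in plain Java. The `typeN-topic` naming follows the examples in the post; how the type is read from a message is not shown in this thread, so `messageType` is a hypothetical input and `TopicRouter` is an illustrative name.

```java
import java.util.Objects;

/** Hypothetical helper: maps a message type to its destination topic name. */
final class TopicRouter {
    private TopicRouter() {}

    /** Follows the naming used in the post: "type1" -> "type1-topic". */
    static String topicFor(String messageType) {
        Objects.requireNonNull(messageType, "messageType");
        if (messageType.isEmpty()) {
            throw new IllegalArgumentException("message type must not be empty");
        }
        return messageType + "-topic";
    }

    public static void main(String[] args) {
        System.out.println(topicFor("type1"));
        System.out.println(topicFor("typeN"));
    }
}
```

In Kafka Streams a function like this would typically be called from a `TopicNameExtractor` passed to `KStream#to`, which picks the destination topic per record. Topics chosen that way are not created automatically, so they would need to exist beforehand.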

details

  • 5 brokers;
  • source topic has 630 partitions;
  • at the moment there are 3,500 types of messages and the number will grow;
  • the total flow is about 2.5-3 million messages per second; the average message size is 860 bytes (max size is up to 250 KB), which gives 2-2.4 GB per second;
  • the frequency of messages is unevenly distributed: some types occur very frequently, some rarely;
  • destination topics are mostly single partition, but for high-flow messages topics have 10 partitions.
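
A quick back-of-the-envelope check of the load figures above, as a sketch. The per-partition number assumes an even spread over the 630 source partitions, which is an assumption made here and not something the post states. The 2-2.4 figure matches when the byte rate is expressed in GiB (2^30 bytes).

```java
/** Back-of-the-envelope load estimate from the figures in the post. */
final class ThroughputEstimate {
    private static final double GIB = 1024.0 * 1024.0 * 1024.0;

    private ThroughputEstimate() {}

    static long bytesPerSecond(long messagesPerSecond, long avgMessageBytes) {
        return messagesPerSecond * avgMessageBytes;
    }

    static double toGiB(long bytes) {
        return bytes / GIB;
    }

    /** Average rate per source partition, assuming an even spread (an assumption). */
    static long messagesPerPartition(long messagesPerSecond, int partitions) {
        return messagesPerSecond / partitions;
    }

    public static void main(String[] args) {
        long[] rates = {2_500_000L, 3_000_000L};
        for (long rate : rates) {
            long bytes = bytesPerSecond(rate, 860L);
            System.out.printf("%d msg/s -> %d B/s (%.2f GiB/s), ~%d msg/s per source partition%n",
                    rate, bytes, toGiB(bytes), messagesPerPartition(rate, 630));
        }
    }
}
```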

questions

  1. is this even a good idea?
    I mean, I like the idea because of its simplicity and elegance. but I also feel that the implementation cannot be reliable and stable.

  2. how does Kafka handle multiple concurrent transactions on one specific partition?
    I found a lot of articles explaining how Kafka transactions work, but all of them cover a single-threaded stream with one ongoing transaction at a time. I have no idea how the TransactionCoordinator works when a bunch of threads run transactions on the same set of topics/partitions. for instance, thread-1 performs a transaction on type1_topic-0, type2_topic-0, type3_topic-0, and at the same time a bunch of other threads perform transactions on the same partitions type1_topic-0, type2_topic-0, type3_topic-0.
    are the partitions blocked for writing until the transaction ends? are other transactions waiting in line?

  3. is it even possible to make this solution solid and durable?

also, I'll describe the current state separately.

configs

### KAFKA STREAMS SECTION ### ------------------------------------------------------------------------------------------
application.id: kafka-splitter
bootstrap.servers: kafka1.host:9092,kafka2.host:9092,kafka3.host:9092,kafka4.host:9092,kafka5.host:9092
# tried up to 16
num.stream.threads: 4

processing.guarantee: exactly_once_v2

num.standby.replicas: 1

topology.optimization: none
auto.offset.reset: latest

# 2gb
statestore.cache.max.bytes: 2147483647
# 7sec, tried 100ms-60000ms
commit.interval.ms: 7000


### KAFKA CONSUMER SECTION ### -----------------------------------------------------------------------------------------
poll.ms: 60000
max.poll.interval.ms: 900000
max.poll.records: 50000000

# 2gb
fetch.max.bytes: 2147483647
fetch.max.wait.ms: 10000
# 100mb
fetch.min.bytes: 104857600
# 5mb
max.partition.fetch.bytes: 5242880


### KAFKA PRODUCER SECTION ### -----------------------------------------------------------------------------------------
compression.type: zstd

# 5min, tried to reduce and increase
linger.ms: 300000
# 2mb, tried to reduce and increase
batch.size: 2097152

# 20min, tried to reduce and increase
max.block.ms: 1200000
# 2gb
buffer.memory: 2147483647

# 2gb
max.request.size: 2147483647

# 30min, tried to reduce and increase
delivery.timeout.ms: 1800000
# 15min, tried to reduce and increase
transaction.timeout.ms: 900000
# 15min, tried to reduce and increase
producer.transaction.timeout.ms: 900000

# 20min, tried to reduce and increase
task.timeout.ms: 1200000
# 20min, tried to reduce and increase
request.timeout.ms: 1200000

the logic I mostly follow when selecting configs: https://stackoverflow.com/a/74518032

error handlers

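import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// treats every produce error as fatal (FAIL) instead of skipping the record
public class KafkaSplitterProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        ...
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

// asks Kafka Streams to replace a StreamThread that died from an uncaught exception
public class KafkaSplitterStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        ...
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}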

errors

this happens randomly and the producer cannot recover

[] ERROR 2023-09-18 20:48:14,680 [processor.internals.RecordCollectorImpl, kafka-producer-network-thread | kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-1-producer] : stream-thread [kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-1] stream-task [0_337] Error encountered sending record to topic *TOPIC* for task 0_337 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

also, partition leaders sometimes change on the broker side

[] WARN  2023-09-18 20:48:12,756 [producer.internals.Sender, kafka-producer-network-thread | kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer] : [Producer clientId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer, transactionalId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-2] Got error produce response with correlation id 13414 on topic-partition *TOPIC-0*, retrying (2147483645 attempts left). Error: NOT_LEADER_OR_FOLLOWER
[] WARN  2023-09-18 20:48:12,756 [producer.internals.Sender, kafka-producer-network-thread | kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer] : [Producer clientId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer, transactionalId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-2] Received invalid metadata error in produce request on partition *TOPIC-0* due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now

mjsax commented

> is this even a good idea?
> I mean, I like the idea because of its simplicity and elegance. but I also feel that the implementation cannot be reliable and stable.

It does sound like a large-scale deployment (and EOS will make it a little bit more tricky), however, given the simplicity of the application, I am quite confident that it's possible and would not call it a "bad idea".

> how does Kafka handle multiple concurrent transactions on one specific partition?

That's not an issue. Kafka TX are designed for this pattern.

> are the partitions blocked for writing until the transaction ends? are other transactions waiting in line?

No, all TX will happen in parallel, and as a matter of fact, the writes of different TX are still interleaved.

> is it even possible to make this solution solid and durable?

I believe yes. (cf my first reply) -- Might require some fine tuning of course (but not sure what your requirements are, so hard to advise.)

About the "errors" -- for the first one, your exception handler should restart the StreamThread and thus you should get recovery? Given your 7 sec commit interval, it does not sound too bad with regard to retrying a bunch of data. Or is this overall behavior not good enough? -- Btw: what version are you using (both broker / KS): newer versions got significant fixes to make EOS more robust.

For the warn logs: I am not a broker expert, so not sure why the leader moved, but in general it's a "normal" thing and it seems the client did handle it well, refreshing its metadata. It's no error but a warn log, so wondering why it bothers you?

@mjsax thank you for your reply!

> your exception handler should restart the StreamThread and thus you should get recovery?

I expect so, because the handler returns StreamThreadExceptionResponse.REPLACE_THREAD. or should I do something else to recover the failed thread? I can also see that the consumer for the failed thread is working (not sure whether it was recreated in a new thread or simply never failed), but the producer is not.

> Given your 7 sec commit interval, it does not sound too bad with regard to retrying a bunch of data. Or is this overall behavior not good enough?

it performs well until an error occurs. after the error, the producer is not able to continue producing. and the error "The producer attempted to use a producer id which is not currently assigned to its transactional id" sounds to me like the TX is already finished (no matter whether it was committed or aborted), but the producer is still trying to run a TX with the same ID.

> Btw: what version are you using (both broker / KS): newer versions got significant fixes to make EOS more robust.

we use version 3.5 for both the Kafka brokers (still on ZooKeeper) and Kafka Streams.

> but in general it's a "normal" thing and it seems the client did handle it well, refreshing its metadata

yeah, I also understand that it should be a normal thing, but there were a lot of errors and long retries, which I believe is because of the long delivery.timeout.ms: 1800000. there was also severe degradation on the broker side. unfortunately, I've already lost the logs, but for a bunch of topics the leaders were skewed and data was under-replicated. maybe that happened because of the long retries...

> but not sure what your requirements are, so hard to advise

what specific information would you need to be able to advise?

mjsax commented

Thanks. -- When a thread is replaced, we literally start a new one, which creates a new producer and calls initTransactions(), which should put everything into a clean state. There is one known issue (https://issues.apache.org/jira/browse/KAFKA-14567) which results in fencing, but it seems to be different from your report.

Guess we would need detailed Kafka Streams, producer, and broker logs to figure out what is happening.

Not sure if the high TX-timeout you configured has anything to do with it 🤔. On the other hand, even with the large timeout, calling initTransactions() on the new producer (if we re-use a TX-id as mentioned on the ticket above) should abort any pending TX for this TID and we should get a clean producer state. -- We might need to get some input from EOS experts. \cc @jolshan
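
The expected recovery path described in this comment can be sketched with a toy model. This is not the real coordinator logic: the epoch counter is added here for illustration (Kafka does use a producer epoch to fence older producers that share a transactional id, but the details are heavily simplified), and `FencingSketch` and the id `example-app-1` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-transactional-id epochs; not the real coordinator logic. */
final class FencingSketch {
    private final Map<String, Integer> currentEpoch = new HashMap<>();
    private final Map<String, Boolean> openTransaction = new HashMap<>();

    /** Models initTransactions(): aborts any pending transaction and bumps the epoch. */
    int initTransactions(String transactionalId) {
        openTransaction.put(transactionalId, false);
        int next = currentEpoch.containsKey(transactionalId)
                ? currentEpoch.get(transactionalId) + 1
                : 0;
        currentEpoch.put(transactionalId, next);
        return next;
    }

    /** Only the producer holding the current epoch may start a transaction. */
    void beginTransaction(String transactionalId, int epoch) {
        requireCurrentEpoch(transactionalId, epoch);
        openTransaction.put(transactionalId, true);
    }

    boolean hasOpenTransaction(String transactionalId) {
        return openTransaction.getOrDefault(transactionalId, false);
    }

    private void requireCurrentEpoch(String transactionalId, int epoch) {
        Integer current = currentEpoch.get(transactionalId);
        if (current == null || current != epoch) {
            throw new IllegalStateException(
                    "fenced: epoch " + epoch + " is not current for " + transactionalId);
        }
    }

    public static void main(String[] args) {
        FencingSketch coordinator = new FencingSketch();
        String tid = "example-app-1"; // hypothetical transactional id

        int oldEpoch = coordinator.initTransactions(tid);
        coordinator.beginTransaction(tid, oldEpoch); // old producer leaves a TX open

        int newEpoch = coordinator.initTransactions(tid); // replacement producer re-uses the id
        System.out.println("pending TX after re-init: " + coordinator.hasOpenTransaction(tid));
        coordinator.beginTransaction(tid, newEpoch); // new producer can proceed

        try {
            coordinator.beginTransaction(tid, oldEpoch); // old producer is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```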

hey @mjsax @jolshan.
sorry for the delayed response.

here are application logs from all instances and also some metrics.
app was running on 6 servers with 6 threads on each, so 36 threads in total.
after ~2 hours something went wrong and 4 producers (one on each of hosts 1, 2, 5 and 6) could no longer send messages.

app_config.txt

app_logs_hosts1,2.zip
app_logs_hosts5,6.zip
app-logs_hosts3,4.zip

[attached metric charts: avg_number_of_records, avg_number_of_requests, avg_records_count, avg_request_size, failed_rebalances_per_hour, rebalances_per_hour, tnx_aborting]

Hey sorry for the delay. Catching up on the situation.

I will try to answer the questions posed to me, but let me know if I missed something.

  1. (is this even a good idea?)
    In theory I think it is doable, but it could be tricky depending on the load for specific partitions and configurations.

  2. (how does Kafka handle multiple concurrent transactions on one specific partition?)
    So a single producer can only work on a single transaction at a time. This transaction can contain multiple partitions. Other producers can also concurrently write to the same partitions as part of separate transactions. So I'm not exactly sure how your Thread maps to producer instances, but each transaction needs its own producer.

As for writing, writes to the same partition are not blocked. We do block read_committed reads. Each partition has something called the LSO (last stable offset). This is the first offset in the log that belongs to a still-open (uncommitted) transaction. Read-committed consumers cannot read past the LSO, so if you have transaction A and transaction B in the log like this:

(A message 1) (B message 1) (B message 2) (B message 3) (B commit)

The LSO would be at A message 1 and even though B is committed, you would not be able to read it until A is also committed.

(A message 1) (B message 1) (B message 2) (B message 3) (B commit) (C message 1) (A commit)

At this point LSO is at C message 1 and A and B transactions are readable.

  3. See 1. :)
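
The LSO example under question 2 can be replayed with a small toy model. This is a sketch under simplifying assumptions: the log is a plain list, only commit markers are modelled (aborts are left out), and `LsoSketch` and its helpers are illustrative names, not Kafka classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy log model for the LSO example; aborts are not modelled. */
final class LsoSketch {
    enum Kind { MESSAGE, COMMIT }

    static final class Entry {
        final String transaction;
        final Kind kind;

        Entry(String transaction, Kind kind) {
            this.transaction = transaction;
            this.kind = kind;
        }
    }

    static Entry message(String transaction) { return new Entry(transaction, Kind.MESSAGE); }

    static Entry commit(String transaction) { return new Entry(transaction, Kind.COMMIT); }

    /** First offset belonging to a still-open transaction, or the log end if none is open. */
    static int lastStableOffset(List<Entry> log) {
        Map<String, Integer> firstOffsetOfOpenTxn = new HashMap<>();
        for (int offset = 0; offset < log.size(); offset++) {
            Entry entry = log.get(offset);
            if (entry.kind == Kind.MESSAGE) {
                firstOffsetOfOpenTxn.putIfAbsent(entry.transaction, offset);
            } else {
                firstOffsetOfOpenTxn.remove(entry.transaction);
            }
        }
        int lso = log.size();
        for (int first : firstOffsetOfOpenTxn.values()) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    public static void main(String[] args) {
        List<Entry> beforeACommits = List.of(
                message("A"), message("B"), message("B"), message("B"), commit("B"));
        List<Entry> afterACommits = List.of(
                message("A"), message("B"), message("B"), message("B"), commit("B"),
                message("C"), commit("A"));
        System.out.println(lastStableOffset(beforeACommits)); // 0: stuck at A message 1
        System.out.println(lastStableOffset(afterACommits));  // 5: at C message 1
    }
}
```

In the first log a read_committed consumer sees nothing, even though B has committed; in the second it can read offsets 0 through 4.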

As for some of the followups I saw:
The leadership errors should be retriable on metadata refresh and you shouldn't have to worry about that.
InvalidPidMapping happens when the transactional id expires. This happens when a transactional producer is left idle for the expiration period (transactional.id.expiration.ms) and the default is 7 days. (Note: the time starts when the transaction is idle, and we will never expire when the transaction is ongoing.)
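
A minimal sketch of the expiration rule as described above: an idle transactional id expires after transactional.id.expiration.ms (7 days by default), and never while a transaction is ongoing. `TransactionalIdExpiry` is a hypothetical name, and the real broker bookkeeping is more involved than a single idle-time comparison.

```java
import java.time.Duration;

/** Sketch of the idle-expiration rule for transactional ids. */
final class TransactionalIdExpiry {
    /** Default of transactional.id.expiration.ms: 7 days. */
    static final long DEFAULT_EXPIRATION_MS = Duration.ofDays(7).toMillis();

    private TransactionalIdExpiry() {}

    /** An id only expires when it has been idle longer than the limit and no TX is ongoing. */
    static boolean isExpired(long idleMs, boolean transactionOngoing, long expirationMs) {
        return !transactionOngoing && idleMs > expirationMs;
    }

    public static void main(String[] args) {
        long twoHours = Duration.ofHours(2).toMillis();
        long eightDays = Duration.ofDays(8).toMillis();
        System.out.println(isExpired(twoHours, false, DEFAULT_EXPIRATION_MS));
        System.out.println(isExpired(eightDays, false, DEFAULT_EXPIRATION_MS));
        System.out.println(isExpired(eightDays, true, DEFAULT_EXPIRATION_MS));
    }
}
```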

As for the timeouts, I don't think it is good for your transactional timeout to be less than your delivery timeout.
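
The timeout relationship can be checked mechanically. The sketch below applies two rules to the values from the posted config: the advice above (transaction timeout not below delivery timeout), and the producer documentation's requirement that delivery.timeout.ms be at least linger.ms + request.timeout.ms. `TimeoutSanityCheck` is a hypothetical helper, not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check for producer timeout settings. */
final class TimeoutSanityCheck {
    private TimeoutSanityCheck() {}

    static List<String> check(long lingerMs, long requestTimeoutMs,
                              long deliveryTimeoutMs, long transactionTimeoutMs) {
        List<String> warnings = new ArrayList<>();
        // Rule from the producer config docs.
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            warnings.add("delivery.timeout.ms is below linger.ms + request.timeout.ms");
        }
        // Rule of thumb from this thread.
        if (transactionTimeoutMs < deliveryTimeoutMs) {
            warnings.add("transaction.timeout.ms is below delivery.timeout.ms");
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Values from the config in the original post.
        List<String> warnings = check(300_000L, 1_200_000L, 1_800_000L, 900_000L);
        warnings.forEach(System.out::println);
    }
}
```

With the posted values only the second rule triggers: the transaction timeout is 15 minutes while the delivery timeout is 30 minutes.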

mjsax commented

Thanks @jolshan

> So I'm not exactly sure how your Thread maps to producer instances, but each transaction needs its own producer.

With EOSv1 (deprecated) there would be a producer per task, and a StreamThread might have multiple assigned tasks. For EOSv2, there is one producer per StreamThread.
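
To make the difference concrete, here is a rough count for the setup in this thread. It assumes a single sub-topology with one task per source partition (630), which is an assumption about this app, and uses the 6 servers x 6 threads from the later run. `ProducerCount` is an illustrative name.

```java
/** Rough producer counts under the two EOS modes. */
final class ProducerCount {
    private ProducerCount() {}

    /** EOSv1: one producer per task. */
    static int eosV1Producers(int tasks) {
        return tasks;
    }

    /** EOSv2: one producer per StreamThread. */
    static int eosV2Producers(int instances, int threadsPerInstance) {
        return instances * threadsPerInstance;
    }

    public static void main(String[] args) {
        int tasks = 630; // assumed: one task per source partition
        System.out.println("EOSv1 producers: " + eosV1Producers(tasks));
        System.out.println("EOSv2 producers: " + eosV2Producers(6, 6));
    }
}
```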

thank you for your replies @jolshan @mjsax!

@jolshan as for transactional.id.expiration.ms, we use the default value.
also, regarding the transaction timeout, it was higher than the delivery timeout in my last run (details are here).
and still there were a lot of InvalidPidMappingException errors (for instance, on host2; logs are provided above), as well as ProducerFencedException errors (for instance, on host1). for some reason the producers were not able to recover.

as far as I understand, ProducerFencedException is something that can happen from time to time, but InvalidPidMappingException doesn't seem to be. InvalidPidMappingException might be the root of the problem, but that's only a guess.