Case: 100+ KafkaStreams threads on 3.5k+ topics/partitions with exactly_once_v2 guarantee
fu22ybear opened this issue · 8 comments
hi folks!
currently I'm working on a KafkaStreams-based app. the idea is to stream from one source topic to 3.5k+ destination topics using processing.guarantee: exactly_once_v2 mode.
the source topic is the endpoint and the single source of truth for all incoming messages. the messages differ in content, and the goal is to split them by content and write each to a specific topic: for instance, type1 messages go to type1-topic, type2 to type2-topic, and so forth.
                 / ----------- type1-topic
                / ------------ type2-topic
source-topic ---- ...
                \ ------------ typeN-topic
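for illustration, a minimal sketch of how such a splitter can be expressed with Kafka Streams' dynamic topic routing (the class name, the typeOf() helper, and the broker address below are made up for the example, not the real app code):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaSplitterSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-splitter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.host:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("source-topic", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
               // dynamic routing: derive the destination topic from the message content
               .to((key, value, recordContext) -> typeOf(value) + "-topic",
                   Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // made-up helper: extracts the type ("type1", "type2", ...) from the payload
    private static String typeOf(byte[] value) {
        return "type1"; // placeholder
    }
}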
details
- 5 brokers;
- source topic has 630 partitions;
- at the moment there are 3,500 types of messages and the number will grow;
- the total flow of messages is about 2.5-3 million messages per second, average message size is 860b (max size is up to 250kb), which gives 2-2.4gb per second;
- the frequency of messages is unevenly distributed: some types occur very frequently, some rarely;
- destination topics are mostly single-partition, but topics for high-flow messages have 10 partitions.
questions
1. is this even a good idea?
I mean, I like the idea because of its simplicity and elegance, but I also feel that the implementation cannot be reliable and stable.
2. how does Kafka handle multiple ongoing transactions at a time on one specific partition?
I found a lot of articles explaining how Kafka transactions work, but all of them are about a single-threaded stream with one ongoing transaction at a time. I have no idea how the TransactionCoordinator works when a bunch of threads run transactions on the same set of topics/partitions. for instance, thread-1 performs a transaction on type1_topic-0, type2_topic-0, type3_topic-0 while at the same time a bunch of other threads perform transactions on the same topics type1_topic-0, type2_topic-0, type3_topic-0.
are the partitions blocked for writing until the transaction ends? are other transactions waiting in line?
3. is it even possible to make this solution solid and durable?
also, I will separately describe the current state.
configs
### KAFKA STREAMS SECTION ### ------------------------------------------------------------------------------------------
application.id: kafka-splitter
bootstrap.servers: kafka1.host:9092,kafka2.host:9092,kafka3.host:9092,kafka4.host:9092,kafka5.host:9092
# tried up to 16
num.stream.threads: 4
processing.guarantee: exactly_once_v2
num.standby.replicas: 1
topology.optimization: none
auto.offset.reset: latest
# 2gb
statestore.cache.max.bytes: 2147483647
# 7sec, tried 100ms-60000ms
commit.interval.ms: 7000
### KAFKA CONSUMER SECTION ### -----------------------------------------------------------------------------------------
poll.ms: 60000
max.poll.interval.ms: 900000
max.poll.records: 50000000
# 2gb
fetch.max.bytes: 2147483647
fetch.max.wait.ms: 10000
# 100mb
fetch.min.bytes: 104857600
# 5mb
max.partition.fetch.bytes: 5242880
### KAFKA PRODUCER SECTION ### -----------------------------------------------------------------------------------------
compression.type: zstd
# 5min, tried to reduce and increase
linger.ms: 300000
# 2mb, tried to reduce and increase
batch.size: 2097152
# 20min, tried to reduce and increase
max.block.ms: 1200000
# 2gb
buffer.memory: 2147483647
# 2gb
max.request.size: 2147483647
# 30min, tried to reduce and increase
delivery.timeout.ms: 1800000
# 15min, tried to reduce and increase
transaction.timeout.ms: 900000
# 15min, tried to reduce and increase
producer.transaction.timeout.ms: 900000
# 20min, tried to reduce and increase
task.timeout.ms: 1200000
# 20min, tried to reduce and increase
request.timeout.ms: 1200000
the logic that I mostly followed when selecting configs: https://stackoverflow.com/a/74518032
error handlers
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class KafkaSplitterProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        ...
        // FAIL stops processing when a record cannot be produced
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

public class KafkaSplitterStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        ...
        // REPLACE_THREAD shuts the failed StreamThread down and starts a new one
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
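for context, a minimal sketch of how these two handlers are typically wired into a Kafka Streams app (class name, placeholder topology, and broker address are made up): the production handler goes in via config, the uncaught-exception handler is set programmatically.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaSplitterWiring {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-splitter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.host:9092");
        // the production exception handler is registered via config
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                KafkaSplitterProductionExceptionHandler.class);

        // placeholder topology so the example runs; the real one routes by message type
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("source-topic", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
               .to("typeN-topic", Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // REPLACE_THREAD makes Streams shut the failed StreamThread down and start a new one
        streams.setUncaughtExceptionHandler(new KafkaSplitterStreamsUncaughtExceptionHandler());
        streams.start();
    }
}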
errors
this happens randomly and the producer cannot recover:
[] ERROR 2023-09-18 20:48:14,680 [processor.internals.RecordCollectorImpl, kafka-producer-network-thread | kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-1-producer] : stream-thread [kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-1] stream-task [0_337] Error encountered sending record to topic *TOPIC* for task 0_337 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
also, sometimes partition leaders change on the broker side:
[] WARN 2023-09-18 20:48:12,756 [producer.internals.Sender, kafka-producer-network-thread | kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer] : [Producer clientId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer, transactionalId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-2] Got error produce response with correlation id 13414 on topic-partition *TOPIC-0*, retrying (2147483645 attempts left). Error: NOT_LEADER_OR_FOLLOWER
[] WARN 2023-09-18 20:48:12,756 [producer.internals.Sender, kafka-producer-network-thread | kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer] : [Producer clientId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-StreamThread-2-producer, transactionalId=kafka-splitter-b4222843-7a39-4a6a-83e4-e422217ac6cb-2] Received invalid metadata error in produce request on partition *TOPIC-0* due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
is this even a good idea?
I mean, I like the idea because of its simplicity and elegance, but I also feel that the implementation cannot be reliable and stable.
It does sound like a large scale deployment (and EOS will make it a little bit more tricky), however, given the simplicity of the application, I am quite confident that it's possible and would not call it a "bad idea".
how does Kafka handle multiple ongoing transactions at a time on one specific partition?
That's not an issue. Kafka TX are designed for this pattern.
are the partitions blocked for writing until the transaction ends? are other transactions waiting in line?
No, all TX will happen in parallel, and as a matter of fact, the writes of different TX are still interleaved.
is it even possible to make this solution solid and durable?
I believe yes (cf. my first reply). -- Might require some fine tuning of course (but not sure what your requirements are, so hard to advise).
About the "errors" -- for the first one, your exception handler should restart the StreamThread and thus you should get recovery? Given your 7 sec commit interval, it does not sound too bad with regard to retrying a bunch of data. Or is this overall behavior not good enough? -- Btw: what version are you using (both broker / KS)? Newer versions got significant fixes to make EOS more robust.
For the warn logs: I am not a broker expert, so not sure why the leader moved, but in general it's a "normal" thing and it seems the client did handle it well, refreshing its metadata. It's a warn log, not an error, so wondering why it bothers you?
@mjsax thank you for your reply!
your exception handler should restart the StreamThread and thus you should get recovery?
I expect yes, because of return StreamThreadExceptionResponse.REPLACE_THREAD;. or should I do something else to recover a failed thread? I can also see that the consumer of the failed thread is working (not sure if it was recovered in a new thread or simply never failed), but the producer is not.
Given your 7 sec commit interval, it does not sound too bad with regard to retrying a bunch of data. Or is this overall behavior not good enough?
it performs well until an error occurs, and after the error the producer is not able to continue producing. the error The producer attempted to use a producer id which is not currently assigned to its transactional id sounds to me like the TX is already done (doesn't matter whether it was committed or aborted), but the producer is still trying to run a TX with the same ID.
Btw: what version are you using (both broker / KS)? Newer versions got significant fixes to make EOS more robust.
we use Kafka 3.5 brokers (still on ZooKeeper) and Kafka Streams
but in general it's a "normal" thing and it seems the client did handle it well, refreshing its metadata
yeah, I also understand that it should be a normal thing, but there were a lot of errors and long retries, which I believe is because of the long delivery.timeout.ms: 1800000. there was also severe degradation on the broker side; unfortunately, I've already lost the logs, but for a bunch of topics the leaders were skewed and data was under-replicated. maybe it happened because of the long retries...
but not sure what your requirements are, so hard to advise
what specific information would you like to know to be able to advise something?
Thanks. -- When a thread is replaced, we literally start a new one, which creates a new producer and calls initTransactions(), which should put everything into a clean state. There is one known issue (https://issues.apache.org/jira/browse/KAFKA-14567) which results in fencing, but it seems to be different from your report.
Guess we would need detailed Kafka Streams, producer, and broker logs to figure out what is happening.
Not sure if the high TX-timeout you configured has anything to do with it 🤔. On the other hand, even with the large timeout, calling initTransactions() on the new producer (if we re-use a TX-id, as mentioned on the ticket above) should abort any pending TX for this TID and we should get a clean producer state. -- We might need to get some input from EOS experts. \cc @jolshan
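for reference, a minimal stand-alone sketch of that recovery step at the plain producer level (the transactional id and broker address are made up; Kafka Streams manages its own producers and transactional ids internally):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class InitTransactionsSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.host:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // made-up transactional id; Streams derives its own ids per thread
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-transactional-id");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // initTransactions() registers the transactional id with the coordinator,
            // fences any older producer using the same id, and aborts any transaction
            // that producer left open, so the new producer starts from a clean state
            producer.initTransactions();
        }
    }
}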
hey @mjsax @jolshan.
sorry for the delayed response.
here are application logs from all instances and also some metrics.
app was running on 6 servers with 6 threads on each, so 36 threads in total.
after ~2 hours something went wrong and 4 producers (one each on hosts 1, 2, 5, and 6) could no longer send messages.
app_logs_hosts1,2.zip
app_logs_hosts5,6.zip
app-logs_hosts3,4.zip
Hey sorry for the delay. Catching up on the situation.
I will try to answer the questions posed to me, but let me know if I missed something.
1. (is this even a good idea?) In theory I think it is doable, but it could be tricky depending on the load for specific partitions and configurations.
2. (how does Kafka handle multiple ongoing transactions at a time on one specific partition?) A single producer can only work on a single transaction at a time. This transaction can contain multiple partitions. Other producers can also concurrently write to the same partitions as part of separate transactions. So I'm not exactly sure how your Thread maps to producer instances, but each transaction needs its own producer.
As for writing, there is no blocking of writes on the same partition. We do block read_committed reads. Each partition has something called the LSO. This is the first offset in the log that contains an uncommitted transaction. Read-committed consumers cannot read past the LSO, so if you have transaction A and transaction B in the log like this:
(A message 1) (B message 1) (B message 2) (B message 3) (B commit)
the LSO would be at A message 1, and even though B is committed, you would not be able to read it until A is also committed.
(A message 1) (B message 1) (B message 2) (B message 3) (B commit) (C message 1) (A commit)
At this point the LSO is at C message 1 and the A and B transactions are readable. (A minimal read_committed consumer sketch follows this list.)
3. (is it even possible to make this solution solid and durable?) See 1. :)
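for illustration, a minimal sketch of a read_committed consumer, which only sees records below the LSO (topic, group id, and broker address are made up for the example):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ReadCommittedSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.host:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "type1-topic-reader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // read_committed: poll() only returns records from committed transactions,
        // i.e. nothing at or beyond the partition's LSO, and aborted data is filtered out
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("type1-topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("committed records visible: " + records.count());
        }
    }
}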
As for some of the followups I saw:
The leadership errors should be retriable on metadata refresh and you shouldn't have to worry about that.
InvalidPidMapping happens when the transactional id expires. This happens when a transactional producer is left idle for the expiration period (transactional.id.expiration.ms) and the default is 7 days. (Note: the time starts when the transaction is idle, and we will never expire when the transaction is ongoing.)
As for the timeouts, I don't think it is good for your transactional timeout to be less than your delivery timeout.
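to illustrate that relationship with example values (the numbers below are mine, not a recommendation from this thread), the producer config would keep the delivery timeout at or below the transaction timeout:
# 15min
delivery.timeout.ms: 900000
# 20min
transaction.timeout.ms: 1200000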
Thanks @jolshan
So I'm not exactly sure how your Thread maps to producer instances, but each transaction needs its own producer.
With EOSv1 (deprecated) there would be a producer per task, and a StreamThread might have multiple assigned tasks. For EOSv2, there is one producer per StreamThread.
thank you for your replies @jolshan @mjsax!
@jolshan as for transactional.id.expiration.ms, we have the default value for it.
also, regarding the transaction timeout, it was higher than the delivery timeout for my last run (details are here).
and still there were a lot of InvalidPidMappingException's (for instance, on host2, logs are provided above), as well as ProducerFencedException's (for instance, on host1). for some reason the producers were not able to recover.
as far as I understand, ProducerFencedException is something that can happen from time to time, but InvalidPidMappingException doesn't seem so. InvalidPidMappingException might be the root of the problem, but it's only a guess.