Kafka "Local: Message timed out"
alexec opened this issue · 5 comments
alexec commented
msg="connecting sink" sink="{\"name\":\"default\",\"kafka\":{\"name\":\"default\",\"brokers\":[\"eventbus-kafka.pl-data-lake-e2e.a.intuit.com:19701\",\"eventbus-kafka.pl-data-lake-e2e.a.intuit.com:19801\",\"eventbus-kafka.pl-data-lake-e2e.a.intuit.com:19901\"],\"net\":{\"tls\":{}},\"topic\":\"ip-apigw-argo2-druid\",\"async\":true,\"batchSize\":\"100Ki\",\"linger\":\"1s\",\"compressionType\":\"lz4\",\"acks\":\"all\",\"enableIdempotence\":true}}"
time="2021-11-19T19:20:00Z" level=info msg="kafka config" config="{\"acks\":\"all\",\"batch.size\":102400,\"bootstrap.servers\":\"eventbus-***.com:19701,eventbus-***.com:19801,eventbus-***.com:19901\",\"compression.type\":\"lz4\",\"enable.idempotence\":true,\"go.logs.channel.enable\":true,\"linger.ms\":1000,\"retries\":2147483647,\"security.protocol\":\"ssl\"}" sink=default
alexec commented
Because librdkafka has put your message on its outbound queue and will try to recover from the error for message.timeout.ms before reporting failure. Delivery failures are reported asynchronously; meanwhile your program can be notified (also asynchronously) about connection issues if it registers an error callback.
alexec commented
I think the problem is this:
- Messages are sent to Kafka async.
- They time out after 30s (message.timeout.ms=30000).
This leads to two problems:
- We ack messages before we know they are sunk.
- We don't allow enough time for sinking.
This is related to the historical assumption that sinking would be synchronous. Instead:
- Enable infinite retry for Kafka.
- We should give the sink an AckCallback, so that messages are acked only once we know they are sunk.