Reader.consume won't time out
Closed this issue · 1 comment
I have a test that writes messages to a Kafka topic, waits for each message to be processed and forwarded to another Kafka topic, and then tries to consume from that second topic. When consuming the new message, the reader.consume function never times out; after an hour, the scenario timeout kills it.
This is how I am setting up the connection, writer, and reader objects:
export function setupKafka(inputTopic, outputTopic, kafka_url, group_ID) {
  return [
    new Connection({ address: kafka_url, tls: tlsConfig }),
    new Writer({ brokers: [kafka_url], topic: inputTopic, tls: tlsConfig }),
    new Reader({
      brokers: [kafka_url],
      tls: tlsConfig,
      groupID: group_ID,
      groupTopics: [outputTopic],
      connectLogger: true,
      heartbeatInterval: 200,
      readBatchTimeout: 30000000000,
      maxAttempt: 2,
      maxWait: 30000000000,
      sessionTimeout: 30000000000,
    }),
  ];
}
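For reference, if those numeric duration fields follow Go's time.Duration convention of integer nanoseconds (an assumption on my part; I have not confirmed this in the extension's docs), the large values above work out to 30 seconds:

```javascript
// Assumption: duration fields are integer nanoseconds (Go time.Duration).
const NS_PER_SECOND = 1e9;
const maxWait = 30000000000; // value used in the config above

console.log(maxWait / NS_PER_SECOND); // 30 (seconds)
```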
To verify the error case, I intentionally read from an empty topic. The console output is an endless stream of the following message:
INFO[0041] no messages received from kafka within the allocated time for partition 0 of event-topic at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
Reading through the documentation, I was hoping that sessionTimeout, maxWait, or maxAttempts would do the trick, but I had no success.
I also tried using only the brokers, groupID, groupTopics, and connectLogger arguments, with the same result:
export function setupKafka(inputTopic, outputTopic, kafka_url, group_ID) {
  return [
    new Connection({ address: kafka_url, tls: tlsConfig }),
    new Writer({ brokers: [kafka_url], topic: inputTopic, tls: tlsConfig }),
    new Reader({
      brokers: [kafka_url],
      tls: tlsConfig,
      groupID: group_ID,
      groupTopics: [outputTopic],
      connectLogger: true,
    }),
  ];
}
Extension version: 0.19.0
What am I missing here? Any pointers would be helpful.
Thanks.