some msg lose with error buffered messages send to kafka err: not found topic
dean-river opened this issue · 0 comments
i use-resty-kafka-lua two recive http request and upload msg to kafka, i can recive some msg in kafka, but not all. some is lost. nginx error.log report some error like bellow (i change nginx error log level to info) , when i use logstash http input plugin and kafka output plugin there is no msg loss. i don't know why!
2021/05/13 16:59:27 [error] 41485#0: *134288 [lua] producer.lua:272: buffered messages send to kafka err: not found topic, retryable: true, topic: dp_live_push_origin_quality_raw, partition_id: -1, length: 1, context: ngx.timer, client: 10.71.52.217, server: 0.0.0.0:8888
this is the way i use lua-resty-kafka, every request will use send_msg, dp_live_mcu_push_origin_quality_raw and dp_live_push_origin_quality_raw topic all have 300 brokers
`
local _M = {}
_M.topics_broker_list = {
test = {
{ host = "127.0.0.1", port = 9092 },
},
dp_live_mcu_push_origin_quality_raw = {
{ host = "dataarch-kafka-h1.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h2.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h3.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h4.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h5.idczw.hb1.kwaidc.com", port = 9092},
},
dp_live_push_origin_quality_raw = {
{ host = "dataarch-kafka-h1.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h2.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h3.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h4.idczw.hb1.kwaidc.com", port = 9092},
{ host = "dataarch-kafka-h5.idczw.hb1.kwaidc.com", port = 9092},
},
}
_M.log_type_2_topic = {
test = "test",
mcu = "dp_live_mcu_push_origin_quality_raw",
srs = "dp_live_push_origin_quality_raw",
}
_M.log_type_2_log_id = {
test = "test",
mcu = "logUploadKafkaTime",
srs = "logUploadKafkaTime",
}
-- return 0 ok
-- return 1 data error
-- return 2 server error
function _M.send_msg(json_data)
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local log_type = json_data["@log_type"]
if not log_type then
return 1, "@log_type not exits"
end
local topic = _M.log_type_2_topic[log_type]
if not topic then
return 1, "@log_type not config"
end
local broker_list = _M.topics_broker_list[topic]
local bp = producer:new(broker_list, { producer_type = "async" })
local json_str = cjson.encode(json_data)
local ok, err = bp:send(topic, nil, json_str)
if not ok then
ngx.log(ngx.ERR, "send err=", err, "msg=", json_str)
return 2, "send kafka error"
end
return 0, "ok"
end
`