doujiang24/lua-resty-kafka

Some messages are lost with the error "buffered messages send to kafka err: not found topic"

dean-river opened this issue · 0 comments

I use lua-resty-kafka to receive HTTP requests and upload the messages to Kafka. I can receive some of the messages in Kafka, but not all of them; some are lost. The nginx error.log reports errors like the one below (I changed the nginx error log level to info). When I use the logstash http input plugin with the kafka output plugin instead, there is no message loss. I don't know why!

2021/05/13 16:59:27 [error] 41485#0: *134288 [lua] producer.lua:272: buffered messages send to kafka err: not found topic, retryable: true, topic: dp_live_push_origin_quality_raw, partition_id: -1, length: 1, context: ngx.timer, client: 10.71.52.217, server: 0.0.0.0:8888

This is how I use lua-resty-kafka: every request calls send_msg. The dp_live_mcu_push_origin_quality_raw and dp_live_push_origin_quality_raw topics each have 300 brokers.

`
-- Module table holding the Kafka routing configuration used by send_msg.
local _M = {
    -- Broker list handed to the Kafka producer, keyed by topic name.
    topics_broker_list = {
        test = {
            { host = "127.0.0.1", port = 9092 },
        },

        dp_live_mcu_push_origin_quality_raw = {
            { host = "dataarch-kafka-h1.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h2.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h3.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h4.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h5.idczw.hb1.kwaidc.com", port = 9092 },
        },

        dp_live_push_origin_quality_raw = {
            { host = "dataarch-kafka-h1.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h2.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h3.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h4.idczw.hb1.kwaidc.com", port = 9092 },
            { host = "dataarch-kafka-h5.idczw.hb1.kwaidc.com", port = 9092 },
        },
    },

    -- Value of the record's "@log_type" field -> Kafka topic name.
    log_type_2_topic = {
        test = "test",
        mcu = "dp_live_mcu_push_origin_quality_raw",
        srs = "dp_live_push_origin_quality_raw",
    },

    -- Log type -> log id. Not read by any code visible in this file;
    -- presumably consumed by callers -- TODO confirm.
    log_type_2_log_id = {
        test = "test",
        mcu = "logUploadKafkaTime",
        srs = "logUploadKafkaTime",
    },
}

--- Encode a log record as JSON and queue it on the Kafka topic configured
-- for the record's "@log_type".
--
-- @param json_data table carrying an "@log_type" field whose value is a key
--        of _M.log_type_2_topic
-- @return 0, "ok"       the message was accepted by the producer
-- @return 1, message    data error (bad input, unknown @log_type, or the
--                       record cannot be JSON-encoded)
-- @return 2, message    server error (no brokers configured for the topic,
--                       or the producer rejected the message)
function _M.send_msg(json_data)
    local cjson = require "cjson"
    local producer = require "resty.kafka.producer"

    -- Indexing a non-table would raise; report it as a data error instead.
    if type(json_data) ~= "table" then
        return 1, "json_data is not a table"
    end

    local log_type = json_data["@log_type"]
    if not log_type then
        return 1, "@log_type not exits"
    end

    local topic = _M.log_type_2_topic[log_type]
    if not topic then
        return 1, "@log_type not config"
    end

    local broker_list = _M.topics_broker_list[topic]
    if not broker_list then
        ngx.log(ngx.ERR, "no broker list configured for topic=", topic)
        return 2, "topic brokers not config"
    end

    -- cjson.encode raises on values it cannot serialise, so guard it.
    local encoded, json_str = pcall(cjson.encode, json_data)
    if not encoded then
        ngx.log(ngx.ERR, "json encode err=", json_str)
        return 1, "json encode error"
    end

    -- lua-resty-kafka caches async producers per cluster name (default 1).
    -- Without an explicit name, the first broker list used in this worker
    -- would be reused for every topic, so the local "test" brokers and the
    -- production brokers would share one producer. Keying the cache by topic
    -- gives each configured broker list its own producer.
    local bp = producer:new(broker_list, { producer_type = "async" }, topic)
    local ok, err = bp:send(topic, nil, json_str)
    if not ok then
        ngx.log(ngx.ERR, "send err=", err, "msg=", json_str)
        return 2, "send kafka error"
    end

    return 0, "ok"
end

`