(stream) add InfluxDB 2 support
Hi,
Is there a plan to add support for InfluxDB 2?
API v1 still works, but InfluxDB asks to switch to v2.
Thanks.
Hi, yes, why not. Do you have a pointer to the InfluxDB v2 documentation, please?
Hello,
We have already talked on Slack...
I'm trying to work on InfluxDB v2.
Using the libraries (sc_events) and an event (category=3 and element=1), I'm able to retrieve the information.
But the cache is empty (even if the script has called the is_valid_event() function).
I'm pretty sure that I need to do something else.
Here is a sample event:
{ ["sc_logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,["params"] = { ["host_severity_operator"] = >=,["accepted_authors"] = ,["__internal_ts_last_flush"] = 1624450359,["accepted_servicegroups"] = ,["max_buffer_size"] = 1,["ack_service_status"] = 0,1,2,3,["influxdb2_config"] = { } ,["skip_anon_events"] = 1,["state_type_mapping"] = { [0] = SOFT,[1] = HARD,} ,["service_status"] = 0,1,2,3,["hard_only"] = 1,["category_mapping"] = { ["correlation"] = 4,["bbdo"] = 2,["extcmd"] = 7,["bam"] = 6,["neb"] = 1,["dumper"] = 5,["storage"] = 3,} ,["local_time_diff_from_utc"] = 3600.0,["host_status"] = 0,1,2,["accepted_bvs"] = ,["element_mapping"] = { [1] = { ["log_entry"] = 17,["host_dependency"] = 9,["host"] = 12,["host_check"] = 8,["instance_configuration"] = 25,["service_group"] = 21,["flapping_status"] = 7,["comment"] = 2,["host_parent"] = 13,["service_status"] = 24,["acknowledgement"] = 1,["service"] = 23,["service_group_member"] = 22,["service_dependency"] = 20,["event_handler"] = 6,["service_check"] = 19,["custom_variable_status"] = 4,["host_group_member"] = 11,["custom_variable"] = 3,["host_status"] = 14,["module"] = 18,["host_group"] = 10,["instance"] = 15,["instance_status"] = 16,["downtime"] = 5,} ,[6] = { ["dimension_ba_timeperiod_relation"] = 14,["meta_service_status"] = 3,["dimension_timeperiod"] = 13,["ba_status"] = 1,["dimension_bv_event"] = 10,["dimension_truncate_table_signal"] = 11,["bam_rebuild"] = 12,["ba_event"] = 4,["inherited_downtime"] = 17,["dimension_timeperiod_exclusion"] = 16,["dimension_ba_event"] = 7,["dimension_kpi_event"] = 8,["kpi_status"] = 2,["ba_duration_event"] = 6,["dimension_timeperiod_exception"] = 15,["kpi_event"] = 5,["dimension_ba_bv_relation_event"] = 9,} ,[3] = { ["remove_graph"] = 3,["metric"] = 1,["rebuild"] = 2,["metric_mapping"] = 6,["status"] = 4,["index_mapping"] = 5,} ,} ,["timestamp_conversion_format"] = %Y-%m-%d %X,["influx_org"] = Serca,["ba_status"] = 0,1,2,["dt_host_status"] = 0,1,2,["acknowledged"] = 0,["influx_token"] = EXRksyBOp4HrpPqekVleVNPZTk80MaEN2gwe2sED2vzIwoYiJNVr5jcdSl8H11YEEUo0p8dQdVrSZZ0EqFb7mg==,["influx_bucket"] = Initial_Bucket,["skip_nil_id"] = 1,["accepted_pollers"] = ,["validatedEvents"] = { } ,["status_mapping"] = { [1] = { [24] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,[3] = UNKNOWN,} ,[5] = { [2] = { [0] = UP,[1] = DOWN,[2] = UNREACHABLE,} ,[1] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,[3] = UNKNOWN,} ,} ,[14] = { [0] = UP,[1] = DOWN,[2] = UNREACHABLE,} ,} ,[6] = { [1] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,} ,} ,[3] = { } ,} ,["max_buffer_age"] = 5,["ack_host_status"] = 0,1,2,["enable_service_status_dedup"] = 0,["enable_host_status_dedup"] = 0,["influx_host"] = localhost,["accepted_categories"] = storage,["accepted_elements"] = metric,["in_downtime"] = 0,["service_severity_operator"] = >=,["max_stored_events"] = 10,["accepted_hostgroups"] = ,["dt_service_status"] = 0,1,2,3,} ,["sc_broker"] = { ["logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,} ,["event"] = { ["rrd_len"] = 15552000,["interval"] = 120,["ctime"] = 1624450376,["element"] = 1,["category"] = 3,["service_id"] = 202,["value_type"] = 0,["name"] = Failures,["host_id"] = 27,["cache"] = { } ,["is_for_rebuild"] = false,["value"] = 0.0,["_type"] = 196609,["metric_id"] = 752,} ,["sc_common"] = { ["logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,} ,}
Hello,
here is some information you need to know.
You are talking about category 3 and element 1. This means that you want to send a metric event from the storage category. Here is the bad news: this is deprecated (and here is the code that says so):
--- is_valid_storage: DEPRECATED method, use NEB category to get metric data instead
-- @return true (boolean)
function ScEvent:is_valid_storage_event()
  return true
end
If you want to get your metrics you'll need to use category 1, element 24 (the service_status event from the neb category).
In this neb event you have the perfdata information (here is all the information that is in a service_status event).
Knowing that, you can use the broker.parse_perfdata() method documented here:
local perf, err_str = broker.parse_perfdata("pl=45%;40;80;0;100", true)
if perf then
  print("Content of 'pl'")
  for i,v in pairs(perf['pl']) do
    print(i .. " => " .. tostring(v))
  end
else
  print("parse_perfdata error: " .. err_str)
end
This will output:
Content of 'pl'
value => 45
uom => %
min => 0
max => 100
warning_low => 0
warning_high => 40
warning_mode => false
critical_low => 0
critical_high => 80
critical_mode => false
Based on all the above information, you can now create your stream connector like an event stream connector. You just need to format your data so that it sends the metric data instead of the status information.
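For instance, here is a minimal sketch of that formatting step, assuming the parse_perfdata() result shown above (to_line_protocol is a hypothetical helper, not part of the lib):
-- hypothetical helper: turn a parse_perfdata() result into InfluxDB v2
-- line protocol ("measurement,tag=... field=value timestamp_in_ns")
local function to_line_protocol(host, service, perf)
  local lines = {}
  for metric_name, metric in pairs(perf) do
    lines[#lines + 1] = "centreon_metrics,host=" .. host
      .. ",description=" .. service
      .. " " .. metric_name .. "=" .. metric.value
      .. " " .. os.time() .. "000000000"
  end
  -- one point per metric, ready for the /api/v2/write endpoint
  return table.concat(lines, "\n")
end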
Have a nice day
Working Lua script:
#!/usr/bin/lua
local sc_common = require("centreon-stream-connectors-lib.sc_common")
local sc_logger = require("centreon-stream-connectors-lib.sc_logger")
local sc_broker = require("centreon-stream-connectors-lib.sc_broker")
local sc_event = require("centreon-stream-connectors-lib.sc_event")
local sc_params = require("centreon-stream-connectors-lib.sc_params")
local EventQueue = {}
function dump(o)
  if type(o) == 'table' then
    local s = '{ '
    for k,v in pairs(o) do
      if type(k) ~= 'number' then k = '"'..k..'"' end
      s = s .. '['..k..'] = ' .. dump(v) .. ','
    end
    return s .. '} '
  else
    return tostring(o)
  end
end
function EventQueue.new(params)
  local self = {}
  -- initiate EventQueue variables
  self.events = {}
  self.fail = false
  -- set up log configuration
  local logfile = params.logfile or "/var/log/centreon-broker/influxdb-metrics-apiv2.log"
  local log_level = params.log_level or 3
  -- initiate mandatory objects
  self.sc_logger = sc_logger.new(logfile, log_level)
  self.sc_common = sc_common.new(self.sc_logger)
  self.sc_broker = sc_broker.new(self.sc_logger)
  self.sc_params = sc_params.new(self.sc_common, self.sc_logger)
  -- initiate parameters dedicated to this stream connector
  self.sc_params.params.influx_host = params.influx_host
  self.sc_params.params.influx_org = params.influx_org
  self.sc_params.params.influx_bucket = params.influx_bucket
  self.sc_params.params.influx_token = params.influx_token
  -- overriding default parameters for this stream connector
  params.accepted_categories = "neb"
  params.accepted_elements = "service_status"
  -- checking mandatory parameters and setting a fail flag
  if not params.influx_host then
    self.sc_logger:error("influx_host is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_org then
    self.sc_logger:error("influx_org is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_bucket then
    self.sc_logger:error("influx_bucket is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_token then
    self.sc_logger:error("influx_token is a mandatory parameter")
    self.fail = true
  end
  -- apply user params and check the syntax of standard ones
  self.sc_params:param_override(params)
  self.sc_params:check_params()
  -- return EventQueue object
  setmetatable(self, { __index = EventQueue })
  return self
end
--------------------------------------------------------------------------------
-- EventQueue:send_data, send data to external tool
-- @return (boolean)
--------------------------------------------------------------------------------
function EventQueue:send_data ()
  local http = require("socket.http")
  local ltn12 = require("ltn12")
  local respbody = {}
  local data = self.sc_event.event.formated_event
  local reqbody = "centreon_metrics,host="
  local temp_reqbody
  local influx_url = "http://" .. self.sc_params.params.influx_host .. ":8086/api/v2/write?org="
  influx_url = influx_url .. self.sc_params.params.influx_org .. "&bucket=" .. self.sc_params.params.influx_bucket
  local auth = "Token " .. self.sc_params.params.influx_token
  --self.sc_logger:debug("EventQueue:send_data: Write to InfluxDB " .. tostring(influx_url))
  --self.sc_logger:debug("EventQueue:send_data: Write to InfluxDB " .. tostring(auth))
  -- warning: if the description contains a space, it must be escaped with a
  -- backslash (gsub(" ", "\\ ")), otherwise InfluxDB will fail to insert it
  reqbody = reqbody .. data.my_host .. ",description=" .. data.my_description:gsub(" ", "\\ ")
  self.sc_logger:debug("EventQueue:send_data: reqbody " .. tostring(reqbody))
  -- self.sc_logger:debug("EventQueue:send_data: perf " .. tostring(dump(data.my_perf)))
  for k,v in pairs(data.my_perf) do
    temp_reqbody = reqbody .. " " .. k .. "="
    for i,j in pairs(v) do
      if i == "value" then
        temp_reqbody = temp_reqbody .. j .. " " .. os.time() .. "000000000 "
      end
    end
    self.sc_logger:debug("EventQueue:send_data: perf " .. tostring(dump(k)) .. " " .. tostring(temp_reqbody))
    local result, respcode, respheaders, respstatus = http.request {
      method = "POST",
      url = influx_url,
      source = ltn12.source.string(temp_reqbody),
      headers = {
        ["Authorization"] = auth,
        ["Content-Type"] = "text/plain; charset=utf-8",
        ["content-length"] = tostring(#temp_reqbody)
      },
      sink = ltn12.sink.table(respbody)
    }
    -- get the body as a string by concatenating the table filled by the sink
    -- self.sc_logger:debug("EventQueue:send_data: url result " .. tostring(dump(respbody)))
    self.sc_logger:debug("EventQueue:send_data: url result " .. tostring(dump(result)))
    -- respbody = table.concat(respbody)
  end
  return true
end
--------------------------------------------------------------------------------
-- EventQueue:flush, flush stored events
-- Called when the max number of events or the max age are reached
-- @return (boolean)
--------------------------------------------------------------------------------
function EventQueue:flush ()
  self.sc_logger:debug("EventQueue:flush: Concatenating all the events as one string")
  -- send stored events
  local retval = self:send_data()
  -- reset stored events list
  self.events = {}
  -- and update the timestamp
  self.sc_params.params.__internal_ts_last_flush = os.time()
  return retval
end
--------------------------------------------------------------------------------
-- EventQueue:format_event, build your own table with the desired information
-- @return true (boolean)
--------------------------------------------------------------------------------
function EventQueue:format_event()
  -- starting to handle shared information between host and service
  self.sc_event.event.formated_event = {
    -- the host name has been stored in a cache table when calling is_valid_event()
    my_host = self.sc_event.event.cache.host.name,
    my_description = self.sc_event.event.cache.service.description,
    my_perf = {}
  }
  local perf, err_str = broker.parse_perfdata(self.sc_event.event.perfdata, true)
  if perf then
    self.sc_event.event.formated_event.my_perf = perf
  end
  self:add()
  return true
end
--------------------------------------------------------------------------------
-- EventQueue:add, add an event to the sending queue
--------------------------------------------------------------------------------
function EventQueue:add ()
  -- store event in self.events list
  self.events[#self.events + 1] = self.sc_event.event.formated_event
end
local queue
function init(params)
  queue = EventQueue.new(params)
end
function write(event)
  -- skip event if a mandatory parameter is missing
  if queue.fail then
    queue.sc_logger:error("Skipping event because a mandatory parameter is not set")
    return true
  end
  -- initiate event object
  queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker)
  -- drop event if wrong category
  if not queue.sc_event:is_valid_category() then
    return true
  end
  -- drop event if wrong element
  if not queue.sc_event:is_valid_element() then
    return true
  end
  -- First, are there some old events waiting in the flush queue?
  if (#queue.events > 0 and os.time() - queue.sc_params.params.__internal_ts_last_flush > queue.sc_params.params.max_buffer_age) then
    queue.sc_logger:debug("write: Queue max age (" .. os.time() - queue.sc_params.params.__internal_ts_last_flush .. "/" .. queue.sc_params.params.max_buffer_age .. ") is reached, flushing data")
    queue:flush()
  end
  -- Then we check that the event queue is not already full
  if (#queue.events >= queue.sc_params.params.max_buffer_size) then
    queue.sc_logger:debug("write: Queue max size (" .. #queue.events .. "/" .. queue.sc_params.params.max_buffer_size .. ") is reached BEFORE APPENDING AN EVENT, trying to flush data before appending more events, after 1 second pause.")
    os.execute("sleep " .. tonumber(1))
    queue:flush()
  end
  -- drop event if it is not validated
  if queue.sc_event:is_valid_event() then
    queue:format_event()
  else
    return true
  end
  -- Then we check whether it is time to send the events to the receiver and flush
  if (#queue.events >= queue.sc_params.params.max_buffer_size) then
    queue.sc_logger:debug("write: Queue max size (" .. #queue.events .. "/" .. queue.sc_params.params.max_buffer_size .. ") is reached, flushing data")
    queue:flush()
  end
  return true
end
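For reference, here is a sketch of the parameters this script expects; the names come from the checks in EventQueue.new(), the values are placeholders. In Centreon they are declared as lua parameter entries on the broker output rather than in a Lua file:
-- placeholder values (assumptions), shown as a Lua table for readability
local params = {
  influx_host   = "localhost",     -- InfluxDB v2 server
  influx_org    = "my_org",        -- organization name
  influx_bucket = "my_bucket",     -- target bucket
  influx_token  = "xxxxxxxxxx",    -- API token (keep it secret)
  max_buffer_size = 1,             -- flush after this many events
  max_buffer_age  = 5              -- or after this many seconds
}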
Hello,
thanks for that. I've edited your comment to put it in a code section and I've removed the token that was in the code. In the coming days or months I'll put it in our official repo, with some changes to make it use our latest features.
Is there a company that must be credited in addition to you?
:) No, put uakm2201 and I'll be happy
Here are the 2 new
scripts.zip
scripts, with InfluxDB 2 parameters for metrics and status.
There were some mistakes in the first script (the authentication was wrong, and when the buffer size was more than 1 the queue was not processed).
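For context, the buffer problem is visible in the script above: EventQueue:send_data() reads only self.sc_event.event.formated_event (the current event) instead of walking the self.events queue, so with max_buffer_size greater than 1 the queued events were never sent. A sketch of the likely shape of the fix, as an assumption (the zip contents are not reproduced here):
-- sketch (assumption): send every queued event, not just the current one
function EventQueue:send_data ()
  for _, data in ipairs(self.events) do
    -- build and POST one line-protocol payload per queued event,
    -- exactly as the original loop over data.my_perf does
    self:send_event(data) -- hypothetical helper wrapping the HTTP POST
  end
  return true
end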
As stated elsewhere, it would be nice to keep the various upcoming InfluxDB stream connectors compatible with the current one.
It has various specificities in terms of (perf)data processing which we must keep supporting, to avoid breaking things.
Many thx 👍
The current one is not working with InfluxDB v2, that's why I pushed this one...
I've pushed it, sorry.
Thanks for the files.
Can you describe how you configured Centreon to use the Lua files with InfluxDB 2?
I get the following error:
[lua] [error] lua: error running function `write' /usr/share/centreon-broker/lua/influxdb-metrics-apiv2.lua:148: attempt to call a nil value (global 'dump')
Thanks in advance.
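That error means the dump() helper (defined near the top of the script posted above) is not defined in the file from the zip, so the debug call at line 148 fails. A minimal workaround sketch, assuming nothing else is missing: copy the dump() function from the script above, or define it on top of broker.json_encode(), which the Broker Lua API provides:
-- minimal workaround sketch: define the missing helper on top of the
-- Broker Lua API's JSON serializer
function dump(o)
  return broker.json_encode(o)
end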