centreon/centreon-stream-connector-scripts

(stream) add InfluxDB 2 support

Opened this issue · 12 comments

Hi,

Is there a plan to add support for InfluxDB 2?
The v1 API still works, but InfluxDB asks users to switch to v2.

Thanks.

Hi, yes, why not. Do you have any pointer to the InfluxDB v2 documentation, please?

Hello,
We already talked about it on Slack ...
I'm trying to work on InfluxDB v2.

Using the libraries (sc_event) and an event (category=3 and element=1), I'm able to retrieve the information.
But the cache is empty (even if the script has called the function is_valid_event()).

I'm pretty sure that I need to do something else.

Here is a sample of an event:

{ ["sc_logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,["params"] = { ["host_severity_operator"] = >=,["accepted_authors"] = ,["__internal_ts_last_flush"] = 1624450359,["accepted_servicegroups"] = ,["max_buffer_size"] = 1,["ack_service_status"] = 0,1,2,3,["influxdb2_config"] = { } ,["skip_anon_events"] = 1,["state_type_mapping"] = { [0] = SOFT,[1] = HARD,} ,["service_status"] = 0,1,2,3,["hard_only"] = 1,["category_mapping"] = { ["correlation"] = 4,["bbdo"] = 2,["extcmd"] = 7,["bam"] = 6,["neb"] = 1,["dumper"] = 5,["storage"] = 3,} ,["local_time_diff_from_utc"] = 3600.0,["host_status"] = 0,1,2,["accepted_bvs"] = ,["element_mapping"] = { [1] = { ["log_entry"] = 17,["host_dependency"] = 9,["host"] = 12,["host_check"] = 8,["instance_configuration"] = 25,["service_group"] = 21,["flapping_status"] = 7,["comment"] = 2,["host_parent"] = 13,["service_status"] = 24,["acknowledgement"] = 1,["service"] = 23,["service_group_member"] = 22,["service_dependency"] = 20,["event_handler"] = 6,["service_check"] = 19,["custom_variable_status"] = 4,["host_group_member"] = 11,["custom_variable"] = 3,["host_status"] = 14,["module"] = 18,["host_group"] = 10,["instance"] = 15,["instance_status"] = 16,["downtime"] = 5,} ,[6] = { ["dimension_ba_timeperiod_relation"] = 14,["meta_service_status"] = 3,["dimension_timeperiod"] = 13,["ba_status"] = 1,["dimension_bv_event"] = 10,["dimension_truncate_table_signal"] = 11,["bam_rebuild"] = 12,["ba_event"] = 4,["inherited_downtime"] = 17,["dimension_timeperiod_exclusion"] = 16,["dimension_ba_event"] = 7,["dimension_kpi_event"] = 8,["kpi_status"] = 2,["ba_duration_event"] = 6,["dimension_timeperiod_exception"] = 15,["kpi_event"] = 5,["dimension_ba_bv_relation_event"] = 9,} ,[3] = { ["remove_graph"] = 3,["metric"] = 1,["rebuild"] = 2,["metric_mapping"] = 6,["status"] = 4,["index_mapping"] = 5,} ,} ,["timestamp_conversion_format"] = %Y-%m-%d %X,["influx_org"] = Serca,["ba_status"] = 0,1,2,["dt_host_status"] = 0,1,2,["acknowledged"] = 0,["influx_token"] = EXRksyBOp4HrpPqekVleVNPZTk80MaEN2gwe2sED2vzIwoYiJNVr5jcdSl8H11YEEUo0p8dQdVrSZZ0EqFb7mg==,["influx_bucket"] = Initial_Bucket,["skip_nil_id"] = 1,["accepted_pollers"] = ,["validatedEvents"] = { } ,["status_mapping"] = { [1] = { [24] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,[3] = UNKNOWN,} ,[5] = { [2] = { [0] = UP,[1] = DOWN,[2] = UNREACHABLE,} ,[1] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,[3] = UNKNOWN,} ,} ,[14] = { [0] = UP,[1] = DOWN,[2] = UNREACHABLE,} ,} ,[6] = { [1] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,} ,} ,[3] = { } ,} ,["max_buffer_age"] = 5,["ack_host_status"] = 0,1,2,["enable_service_status_dedup"] = 0,["enable_host_status_dedup"] = 0,["influx_host"] = localhost,["accepted_categories"] = storage,["accepted_elements"] = metric,["in_downtime"] = 0,["service_severity_operator"] = >=,["max_stored_events"] = 10,["accepted_hostgroups"] = ,["dt_service_status"] = 0,1,2,3,} ,["sc_broker"] = { ["logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,} ,["event"] = { ["rrd_len"] = 15552000,["interval"] = 120,["ctime"] = 1624450376,["element"] = 1,["category"] = 3,["service_id"] = 202,["value_type"] = 0,["name"] = Failures,["host_id"] = 27,["cache"] = { } ,["is_for_rebuild"] = false,["value"] = 0.0,["_type"] = 196609,["metric_id"] = 752,} ,["sc_common"] = { ["logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,} ,}

Hello,

here is some information you need to know.

You are talking about category 3 and element 1. This means that you want to send a metric event from the storage category. Here is the bad news: this is deprecated (and to show what deprecated means, here is the code in question):

--- is_valid_storage: DEPRECATED method, use NEB category to get metric data instead
-- @return true (boolean)
function ScEvent:is_valid_storage_event()
  return true
end

If you want to get your metrics, you'll need to use category 1, element 24 (the service_status event from the neb category).

In this neb event you have the perfdata information (here is all the information that is in a service_status event).
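
For example, in a stream connector's write() function, the raw perfdata string is available on the event itself. A minimal sketch (the perfdata field name is the one used by the script further down):

  function write(event)
    -- category 1 = neb, element 24 = service_status
    if event.category == 1 and event.element == 24 then
      -- raw perfdata string, e.g. "pl=45%;40;80;0;100"
      local raw_perfdata = event.perfdata
    end
    return true
  end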

Knowing that, you can use the broker.parse_perfdata() method documented here:

  local perf, err_str = broker.parse_perfdata("pl=45%;40;80;0;100", true)

  if perf then
    print("Content of 'pl'")
    for i,v in pairs(perf['pl']) do
      print(i .. " => " .. tostring(v))
    end
  else
    print("parse_perfdata error: " .. err_str)
  end

will output

  Content of 'pl'
  value => 45
  uom => %
  min => 0
  max => 100
  warning_low => 0
  warning_high => 40
  warning_mode => false
  critical_low => 0
  critical_high => 80
  critical_mode => false

Based on all the above information, you can now create your stream connector like an event stream connector. You just need to format your data so that it sends the metric data instead of the status information.
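
For instance, pushing metrics to InfluxDB 2 typically means converting the parsed perfdata into line protocol (measurement,tag_set field_set timestamp). A minimal sketch, assuming a metrics table shaped like the parse_perfdata() output above (the function name and tags are illustrative):

  -- build one line-protocol entry per metric
  local function to_line_protocol(host, service, metrics)
    local lines = {}
    local ts = os.time() .. "000000000"  -- seconds -> nanoseconds
    for metric_name, metric in pairs(metrics) do
      lines[#lines + 1] = "centreon_metrics,host=" .. host
        .. ",service=" .. service
        .. " " .. metric_name .. "=" .. metric.value .. " " .. ts
    end
    -- InfluxDB accepts several newline-separated points in a single write
    return table.concat(lines, "\n")
  end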

Have a nice day

Working Lua script

#!/usr/bin/lua

local sc_common = require("centreon-stream-connectors-lib.sc_common")
local sc_logger = require("centreon-stream-connectors-lib.sc_logger")
local sc_broker = require("centreon-stream-connectors-lib.sc_broker")
local sc_event = require("centreon-stream-connectors-lib.sc_event")
local sc_params = require("centreon-stream-connectors-lib.sc_params")

local EventQueue = {}

-- recursively serialize a value (table or scalar) to a string, for debug logging
function dump(o)
  if type(o) == 'table' then
    local s = '{ '
    for k, v in pairs(o) do
      if type(k) ~= 'number' then k = '"' .. k .. '"' end
      s = s .. '[' .. k .. '] = ' .. dump(v) .. ','
    end
    return s .. '} '
  else
    return tostring(o)
  end
end


function EventQueue.new(params)
  local self = {}

  -- initiate EventQueue variables
  self.events = {}
  self.fail = false

  -- set up log configuration
  local logfile = params.logfile or "/var/log/centreon-broker/influxdb-metrics-apiv2.log"
  local log_level = params.log_level or 3

  -- initiate mandatory objects
  self.sc_logger = sc_logger.new(logfile, log_level)
  self.sc_common = sc_common.new(self.sc_logger)
  self.sc_broker = sc_broker.new(self.sc_logger)
  self.sc_params = sc_params.new(self.sc_common, self.sc_logger)

  -- initiate parameters dedicated to this stream connector
  self.sc_params.params.influx_host = params.influx_host
  self.sc_params.params.influx_org = params.influx_org
  self.sc_params.params.influx_bucket = params.influx_bucket
  self.sc_params.params.influx_token = params.influx_token

  -- overriding default parameters for this stream connector
  params.accepted_categories = "neb"
  params.accepted_elements = "service_status"

  -- checking mandatory parameters and setting a fail flag
  if not params.influx_host then
    self.sc_logger:error("influx_host is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_org then
    self.sc_logger:error("influx_org is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_bucket then
    self.sc_logger:error("influx_bucket is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_token then
    self.sc_logger:error("influx_token is a mandatory parameter")
    self.fail = true
  end

  -- apply users params and check syntax of standard ones
  self.sc_params:param_override(params)
  self.sc_params:check_params()

  -- return EventQueue object
  setmetatable(self, { __index = EventQueue })
  return self
end

--------------------------------------------------------------------------------
-- EventQueue:send_data, send data to external tool
-- @return (boolean)
--------------------------------------------------------------------------------
function EventQueue:send_data ()

  local http = require("socket.http")
  local ltn12 = require("ltn12")
  local respbody = {}
  -- note: only the formated event of the current event is sent here, not the
  -- whole self.events queue (fine as long as max_buffer_size stays at 1)
  local data = self.sc_event.event.formated_event
  local temp_reqbody
  local influx_url = "http://" .. self.sc_params.params.influx_host .. ":8086/api/v2/write?org="
    .. self.sc_params.params.influx_org .. "&bucket=" .. self.sc_params.params.influx_bucket

  -- self.sc_logger:debug("EventQueue:send_data: Write to InfluxDB " .. tostring(influx_url))

  -- escape spaces in the description with a backslash, otherwise InfluxDB
  -- will fail to insert the point
  local reqbody = "centreon_metrics,host=" .. data.my_host
    .. ",description=" .. data.my_description:gsub(" ", "\\ ")

  self.sc_logger:debug("EventQueue:send_data: reqbody " .. tostring(reqbody))

  for k, v in pairs(data.my_perf) do
    -- one line-protocol entry per metric: measurement,tags field=value timestamp(ns)
    temp_reqbody = reqbody .. " " .. k .. "="
    for i, j in pairs(v) do
      if i == "value" then
        temp_reqbody = temp_reqbody .. j .. " " .. os.time() .. "000000000 "
      end
    end
    self.sc_logger:debug("EventQueue:send_data: perf " .. tostring(dump(k)) .. " " .. tostring(temp_reqbody))
    local result, respcode, respheaders, respstatus = http.request {
      method = "POST",
      url = influx_url,
      source = ltn12.source.string(temp_reqbody),
      headers = {
        ["Authorization"] = "Token " .. self.sc_params.params.influx_token,
        ["Content-Type"] = "text/plain; charset=utf-8",
        ["content-length"] = tostring(#temp_reqbody)
      },
      sink = ltn12.sink.table(respbody)
    }
    -- get the body as a string by concatenating the table filled by the sink
    -- respbody = table.concat(respbody)
    self.sc_logger:debug("EventQueue:send_data: url result " .. tostring(dump(result)))
  end
  return true
end


--------------------------------------------------------------------------------
-- EventQueue:flush, flush stored events
-- Called when the max number of events or the max age are reached
-- @return (boolean)
--------------------------------------------------------------------------------
function EventQueue:flush ()
  self.sc_logger:debug("EventQueue:flush: Concatenating all the events as one string")

  -- send stored events
  local retval = self:send_data()

  -- reset stored events list
  self.events = {}

  -- and update the timestamp
  self.sc_params.params.__internal_ts_last_flush = os.time()

  return retval
end

--------------------------------------------------------------------------------
-- EventQueue:format_event, build your own table with the desired information
-- @return true (boolean)
--------------------------------------------------------------------------------
function EventQueue:format_event()
  -- starting to handle shared information between host and service
  self.sc_event.event.formated_event = {
    -- the host name has been stored in a cache table when calling is_valid_event()
    my_host = self.sc_event.event.cache.host.name,
    my_description = self.sc_event.event.cache.service.description,
    my_perf = {}
  }
  local perf, err_str = broker.parse_perfdata(self.sc_event.event.perfdata, true)
  if perf then
    self.sc_event.event.formated_event.my_perf = perf
  else
    self.sc_logger:error("EventQueue:format_event: parse_perfdata error: " .. tostring(err_str))
  end

  self:add()

  return true
end

--------------------------------------------------------------------------------
-- EventQueue:add, add an event to the sending queue
--------------------------------------------------------------------------------
function EventQueue:add ()
  -- store event in self.events list
  self.events[#self.events + 1] = self.sc_event.event.formated_event
end





local queue

function init(params)
  queue = EventQueue.new(params)
end

function write(event)
  -- skip event if a mandatory parameter is missing
  if queue.fail then
    queue.sc_logger:error("Skipping event because a mandatory parameter is not set")
    return true
  end
  
  -- initiate event object
  queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker)

  -- drop event if wrong category
  if not queue.sc_event:is_valid_category() then
    return true
  end

  -- drop event if wrong element
  if not queue.sc_event:is_valid_element() then
    return true
  end

  -- First, are there some old events waiting in the flush queue ?
  if (#queue.events > 0 and os.time() - queue.sc_params.params.__internal_ts_last_flush > queue.sc_params.params.max_buffer_age) then
    queue.sc_logger:debug("write: Queue max age (" .. os.time() - queue.sc_params.params.__internal_ts_last_flush .. "/" .. queue.sc_params.params.max_buffer_age .. ") is reached, flushing data")
    queue:flush()
  end

  -- Then we check that the event queue is not already full
  if (#queue.events >= queue.sc_params.params.max_buffer_size) then
    queue.sc_logger:debug("write: Queue max size (" .. #queue.events .. "/" .. queue.sc_params.params.max_buffer_size .. ") is reached BEFORE APPENDING AN EVENT, trying to flush data before appending more events, after 1 second pause.")
    os.execute("sleep 1")
    queue:flush()
  end


  -- drop event if it is not validated
  if queue.sc_event:is_valid_event() then
     queue:format_event()
  else
    return true
  end

  -- Then we check whether it is time to send the events to the receiver and flush
  if (#queue.events >= queue.sc_params.params.max_buffer_size) then
    queue.sc_logger:debug("write: Queue max size (" .. #queue.events .. "/" .. queue.sc_params.params.max_buffer_size .. ") is reached, flushing data")
    queue:flush()
  end

  return true
end
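
For reference, these are the parameters the script expects to receive from the Centreon Broker lua output (the names are taken from EventQueue.new above; the values here are placeholders):

local example_params = {
  influx_host   = "localhost",
  influx_org    = "my-org",
  influx_bucket = "my-bucket",
  influx_token  = "xxxxxxxxxx",
  logfile       = "/var/log/centreon-broker/influxdb-metrics-apiv2.log",
  log_level     = 3,
}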

Hello,

thanks for that. I've edited your comment to put it in a code section and I've removed the token that was in the code. In the coming days or months I'll put it in our official repo, with some changes to make it use our latest features.

Is there a company that must be credited in addition to you?

:) No, put uakm2201 and I'll be happy

Here are the 2 new scripts:
scripts.zip
Scripts with InfluxDB2 parameters for metrics and status.
Some mistakes in the first script have been fixed (the authentication was wrong, and when the buffer size was more than 1, the queue was not processed).
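
For reference, draining the whole queue in send_data could look like the sketch below (based on the first script above, where only the current event was sent, so a buffer larger than 1 was never emptied):

function EventQueue:send_data ()
  local http = require("socket.http")
  local ltn12 = require("ltn12")
  local respbody = {}
  local lines = {}
  local ts = os.time() .. "000000000"  -- seconds -> nanoseconds

  -- build one line-protocol entry per metric, for every queued event
  for _, event in ipairs(self.events) do
    local prefix = "centreon_metrics,host=" .. event.my_host
      .. ",description=" .. event.my_description:gsub(" ", "\\ ")
    for metric_name, metric in pairs(event.my_perf) do
      lines[#lines + 1] = prefix .. " " .. metric_name .. "=" .. metric.value .. " " .. ts
    end
  end

  -- the line protocol accepts newline-separated points in one request body
  local reqbody = table.concat(lines, "\n")
  local influx_url = "http://" .. self.sc_params.params.influx_host .. ":8086/api/v2/write?org="
    .. self.sc_params.params.influx_org .. "&bucket=" .. self.sc_params.params.influx_bucket

  local result = http.request {
    method = "POST",
    url = influx_url,
    source = ltn12.source.string(reqbody),
    headers = {
      ["Authorization"] = "Token " .. self.sc_params.params.influx_token,
      ["Content-Type"] = "text/plain; charset=utf-8",
      ["content-length"] = tostring(#reqbody)
    },
    sink = ltn12.sink.table(respbody)
  }
  return result ~= nil
end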

UrBnW commented

As stated elsewhere, it would be nice to keep the various upcoming InfluxDB stream connectors compatible with the current one.
It has various specificities in terms of (perf)data processing which we must keep supporting, to avoid breaking things.
Many thx 👍

The current one is not working with InfluxDB v2, that's why I pushed this one ...

I've pushed, sorry

gqs commented

Here are the 2 new scripts (scripts.zip) with InfluxDB2 parameters for metrics and status. Some mistakes in the first script have been fixed (the authentication was wrong, and when the buffer size was more than 1, the queue was not processed).

Thanks for the files.
Can you describe how you configure Centreon to use the Lua files with InfluxDB2?
I get the following error:

[lua] [error] lua: error running function `write' /usr/share/centreon-broker/lua/influxdb-metrics-apiv2.lua:148: attempt to call a nil value (global 'dump')

Thanks in advance.