riemann/riemann

"Throttle" is not working as expected


Describe the bug
As per the Riemann documentation, "Throttle" allows the first n events through, then ignores all the rest for the defined duration.
I used throttle to send an email for the first 3 events and skip the rest for an hour. This worked when I implemented it, but I recently noticed that instead of sending one email for each of the first 3 events, Riemann sends 3 emails for the first event alone and skips the remaining events for the hour.

To Reproduce
Riemann config:
(streams
  (by [:host, :service]
    (where (state "critical")
      (where (tagged "dummy_alert")
        (throttle 3 3600
          (email "my_email_id@email.com"))))))

Event data:
{ host: "TESTING", service: "TESTING", state: 'critical', tags: ["dummy_alert"], ttl: 3600 }

Expected behavior
Emails should be sent for the first n events, and the remaining events should be skipped for the defined duration.
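
The expected behavior can be modelled in a few lines of Ruby. This is only a simplified sketch, not Riemann's implementation: `ThrottleModel` and `pass?` are hypothetical names, the window is assumed to start at the first event seen for a given host/service pair, and the current time is passed in explicitly so the example is deterministic.

```ruby
# Simplified model of "(by [:host :service] (throttle n dt ...))".
# Hypothetical helper for illustration; not Riemann's actual code.
class ThrottleModel
  def initialize(limit, window_seconds)
    @limit = limit
    @window_seconds = window_seconds
    @windows = {} # [host, service] => [window_start, events_seen]
  end

  # Returns true if the event would be passed on, false if it would be dropped.
  def pass?(event, now)
    key = [event[:host], event[:service]]
    start, count = @windows[key]
    # Open a new window on the first event, or once the old window has elapsed.
    start, count = now, 0 if start.nil? || now - start >= @window_seconds
    count += 1
    @windows[key] = [start, count]
    count <= @limit
  end
end

throttle = ThrottleModel.new(3, 3600)
passed = []
%w[n1 n2 n3].each do |service|
  10.times do |n|
    event = { host: "TESTING", service: service, state: "critical", description: n }
    passed << event if throttle.pass?(event, 0)
  end
end
```

Under this model each incoming event is passed on at most once, so a single event should never produce 3 emails; only events 0, 1 and 2 of each service end up in `passed`.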

Background (please complete the following information):

  • OS: Linux
  • openjdk version "1.8.0_332"
  • OpenJDK Runtime Environment (build 1.8.0_332-8u332-ga-1~deb9u1-b09)
  • OpenJDK 64-Bit Server VM (build 25.332-b09, mixed mode)
  • Riemann::Client is used to push data to Riemann (For Ruby)

Is it related to your (email ...)?

In order to test your code, I replaced it with #(info %):

(logging/init {:file "/var/log/riemann/riemann.log"})

(let [host "127.0.0.1"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server  {:host host}))

; Expire old events from the index every 5 seconds.
(periodically-expire 5)

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
    (default :ttl 60
      ; Index all events immediately.
      ;index

      (by [:host :service]
          (where (state "critical")
                 (where (tagged "dummy_alert")
                        (throttle 3 30
                                  #(info %)))))

      ; Log expired events.
      (expired
        (fn [event] (info "expired" event))))))

Then send some events:

r = Riemann::Client.new.tcp
['n1', 'n2', 'n3'].each do |service|
  10.times do |n|
    r << {
           host: "TESTING",
           service: service,
           state: 'critical',
           tags: ["dummy_alert"],
           ttl: 3600,
           description: n,
         }
  end
end

and got the expected outcome in the log file:

INFO [2023-04-06 16:36:58,520] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n1, :state critical, :description 0, :metric nil, :tags [dummy_alert], :time 1.680835018520078E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,522] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n1, :state critical, :description 1, :metric nil, :tags [dummy_alert], :time 1.680835018521604E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,524] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n1, :state critical, :description 2, :metric nil, :tags [dummy_alert], :time 1.680835018523878E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,531] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n2, :state critical, :description 0, :metric nil, :tags [dummy_alert], :time 1.680835018531222E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,532] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n2, :state critical, :description 1, :metric nil, :tags [dummy_alert], :time 1.680835018532433E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,533] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n2, :state critical, :description 2, :metric nil, :tags [dummy_alert], :time 1.680835018533418E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,542] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n3, :state critical, :description 0, :metric nil, :tags [dummy_alert], :time 1.680835018541492E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,543] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n3, :state critical, :description 1, :metric nil, :tags [dummy_alert], :time 1.680835018543208E9, :ttl 3600.0}
INFO [2023-04-06 16:36:58,545] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n3, :state critical, :description 2, :metric nil, :tags [dummy_alert], :time 1.680835018544439E9, :ttl 3600.0}
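
To check log output like the above without counting by eye, the lines can be tallied per service. The following is a minimal sketch; `count_by_service` is a hypothetical helper, and the regular expression assumes the `:service <name>,` layout seen in the log lines above.

```ruby
# Hypothetical helper: counts logged events per :service value.
def count_by_service(lines)
  counts = Hash.new(0)
  lines.each do |line|
    match = line.match(/:service ([^,}]+)/)
    counts[match[1]] += 1 if match
  end
  counts
end

# Three of the log lines from this thread, copied verbatim.
sample = [
  "INFO [2023-04-06 16:36:58,520] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n1, :state critical, :description 0, :metric nil, :tags [dummy_alert], :time 1.680835018520078E9, :ttl 3600.0}",
  "INFO [2023-04-06 16:36:58,522] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n1, :state critical, :description 1, :metric nil, :tags [dummy_alert], :time 1.680835018521604E9, :ttl 3600.0}",
  "INFO [2023-04-06 16:36:58,531] defaultEventExecutorGroup-2-3 - riemann.config - #riemann.codec.Event{:host TESTING, :service n2, :state critical, :description 0, :metric nil, :tags [dummy_alert], :time 1.680835018531222E9, :ttl 3600.0}"
]

counts = count_by_service(sample)
```

Against the real log, the same helper could be fed `File.readlines("/var/log/riemann/riemann.log")`; with `(throttle 3 30 ...)` no service should show more than 3 entries per 30-second window.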

Hi @smortex, thank you, your observation is very insightful. Since email is a built-in function, I don't think it could be the issue. Are you able to reproduce the issue when using email?

Hum 🤔 I can't find any email function in the API

I adjusted my test example like this, defining my own email:

--- /usr/local/etc/riemann/riemann.config.orig	2023-04-10 10:45:38.625538000 -1000
+++ /usr/local/etc/riemann/riemann.config	2023-04-10 10:46:30.465567000 -1000
@@ -13,7 +13,9 @@
 ; Expire old events from the index every 5 seconds.
 (periodically-expire 5)
 
-(let [index (index)]
+
+(let [index (index)
+      email (mailer)]
   ; Inbound events will be passed to these streams:
   (streams
     (default :ttl 60
@@ -24,6 +26,7 @@
           (where (state "critical")
                  (where (tagged "dummy_alert")
                         (throttle 3 30
+                                  (email "romain@localhost")
                                   #(info %)))))
 
       ; Log expired events.

I received the 9 expected messages: the first 3 for each service. Maybe you can post your email definition?

Apologies for the delayed response. The email function is defined as follows:

(def email
  (mailer {:from "dummy@dummy.com"
           :host "email_api_service_provider.com"
           :user "dummy@dummy.com"
           :pass "password"
           :port port_number
           :ssl  :yes
           :body (fn [events] (apply prn-str events))}))

@Magesh-B, can you create a simple riemann.config that uses #(info %) instead of (email) and try to reproduce this?