riemann/riemann

Question: How to determine fields exist and give default value?

keyboardfann opened this issue · 6 comments

Sorry if this is a basic question. I have two sites. Some events have the field "site" and some do not. I want this behavior: if the site field exists, leave the event unchanged; if the site field does not exist, set it to the value "b". Could anyone tell me how to modify the code?

raw log:
INFO [2018-06-05 14:16:08,153] defaultEventExecutorGroup-2-2 - riemann.config - #riemann.codec.Event{:host couchcache, :service couchspring/vpb_idempotency_cache/metrics_sent_buckets, :state nil, :description nil, :metric 58.0, :tags [couchbase], :time 1.528179368153E9, :ttl nil, :instancename vpb_idempotency_cache, :site a}
INFO [2018-06-05 14:16:08,165] defaultEventExecutorGroup-2-2 - riemann.config - #riemann.codec.Event{:host couchcache, :service node/10.94.232.64/swap_used_percent, :state nil, :description nil, :metric 0.0, :tags [couchbase], :time 1.528179368164E9, :ttl nil, :instancename 10.94.232.64 }

works fine:

(streams
  (where (tagged-any ["couchbase"])
    #(info %)
    (smap (fn [events]
            (dissoc
              events
              :okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
              :alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
              :description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner))
      (by [:host :service]
        (where (not (nil? (:instancename event)))
          (smap (fn [events]
                  (let [instance_name (:instancename events)
                        service_string (:service events)
                        instance_name_modified (clojure.string/join "" [instance_name "/"])]
                    (assoc events :service (clojure.string/replace service_string instance_name_modified ""))))
            (where (tagged "couchbase") send_to_influx_db_couchbase))
          (else
            (where (tagged "couchbase") send_to_influx_db_couchbase)))))))

This version throws an exception:

(streams
  (where (tagged-any ["couchbase"])
    #(info %)
    (smap (fn [events]
            (dissoc
              (where (nil? (:site events))
                (assoc events :site "b"))
              :okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
              :alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
              :description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner))
      (by [:host :service]
        (where (not (nil? (:instancename event)))
          (smap (fn [events]
                  (let [instance_name (:instancename events)
                        service_string (:service events)
                        instance_name_modified (clojure.string/join "" [instance_name "/"])]
                    (assoc events :service (clojure.string/replace service_string instance_name_modified ""))))
            (where (tagged "couchbase") send_to_influx_db_couchbase))
          (else
            (where (tagged "couchbase") send_to_influx_db_couchbase)))))))

exception in riemann.log

INFO [2018-06-05 14:15:14,768] defaultEventExecutorGroup-2-2 - riemann.config - #riemann.codec.Event{:host couchcache, :service bucket/accountapi_cache/curr_items, :state nil, :description nil, :metric 0.0, :tags [couchbase], :time 1.528179314768E9, :ttl nil, :instancename accountapi_cache, :site a}
WARN [2018-06-05 14:15:14,769] defaultEventExecutorGroup-2-2 - riemann.config - riemann.streams$smap$stream__6900@4627ecac threw
java.lang.ClassCastException: riemann.config$eval278$fn__1304$stream__1305 cannot be cast to clojure.lang.IPersistentMap
	at clojure.lang.RT.dissoc(RT.java:848)
	at clojure.core$dissoc.invokeStatic(core.clj:1462)
	at clojure.core$dissoc.invokeStatic(core.clj:1455)
	at clojure.core$dissoc.doInvoke(core.clj:1455)
	at clojure.lang.RestFn.invoke(RestFn.java:3913)
	at riemann.config$eval278$fn__1304.invoke(riemann.config:182)
	at riemann.streams$smap$stream__6900.invoke(streams.clj:161)
	at riemann.config$eval278$stream__1490$fn__1495.invoke(riemann.config:179)
	at riemann.config$eval278$stream__1490.invoke(riemann.config:179)
	at riemann.core$stream_BANG_$fn__8891.invoke(core.clj:19)
	at riemann.core$stream_BANG_.invokeStatic(core.clj:18)
	at riemann.core$stream_BANG_.invoke(core.clj:14)
	at riemann.transport$handle.invokeStatic(transport.clj:171)
	at riemann.transport$handle.invoke(transport.clj:165)
	at riemann.transport.tcp$tcp_handler.invokeStatic(tcp.clj:95)
	at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:88)
	at riemann.transport.tcp$gen_tcp_handler$fn__11921.invoke(tcp.clj:66)
	at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:363)
	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:748)
pyr commented

Hi,

You can use the default stream to add default values to events:

(streams
   (default :site "some-site"
      ...))

More documentation can be found at: http://riemann.io/api/riemann.streams.html#var-default

@pyr Thank you for the reply, but how can I keep the original value if the site field already exists?

pyr commented

This is what default will do. If the field exists, it will be left as-is; otherwise the default value will be filled in.
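
For illustration, the per-event rule described here can be sketched in plain Clojure. `with-default` is a hypothetical helper, not part of Riemann; it only mirrors the rule that an existing value is kept and a missing (nil) one is replaced.

```clojure
;; Hypothetical helper (not a Riemann function): keep the field if it is
;; already set, otherwise fill in the given default value.
(defn with-default
  [event field value]
  (if (nil? (get event field))
    (assoc event field value)
    event))

;; An event that already has :site is returned unchanged.
(def site-a (with-default {:host "couchcache" :site "a"} :site "b"))

;; An event without :site gets the default.
(def site-b (with-default {:host "couchcache"} :site "b"))

(println (:site site-a) (:site site-b))
;; prints: a b
```

A plain `if` like this would also be valid inside the `smap` function from the question. The `where` form used there builds a stream (a function) instead of returning an event, which is what the ClassCastException raised from `dissoc` points at.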

Hi @pyr,
I added default to the config, but there is no site field in the Riemann log. Is anything wrong?

(streams
  (where (tagged-any ["couchbase"])
    (default :site "b" index)
    #(info %)
    (smap (fn [events]
            (dissoc
              events
              :okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
              :alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
              :description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner))
      (by [:host :service]
        (where (not (nil? (:instancename event)))
          (smap (fn [events]
                  (let [instance_name (:instancename events)
                        service_string (:service events)
                        instance_name_modified (clojure.string/join "" [instance_name "/"])]
                    (assoc events :service (clojure.string/replace service_string instance_name_modified ""))))
            (where (tagged "couchbase") send_to_influx_db_couchbase))
          (else
            (where (tagged "couchbase") send_to_influx_db_couchbase)))))))

INFO [2018-06-05 15:43:09,658] defaultEventExecutorGroup-2-5 - riemann.config - #riemann.codec.Event{:host couchcache, :service couchspring/10.94.232.67/metrics_sent_nodes, :state nil, :description nil, :metric 27.0, :tags [couchbase], :time 1.528184589657E9, :ttl nil, :instancename 10.94.232.67}
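
A likely reason the log line above has no :site: streams listed side by side under `where` each receive the same incoming event, and `default` passes its modified copy only to its own children. Events are immutable maps, so a sibling stream never sees the change. A minimal sketch of the difference, assuming the usual riemann.config environment where `info` is available as in the configs above:

```clojure
(streams
  (where (tagged-any ["couchbase"])
    ;; Sibling of default: receives the original event, so :site is
    ;; only present if the sender set it.
    #(info %)
    (default :site "b"
      ;; Child of default: receives the event after :site was filled in.
      #(info %))))
```

With this layout each event is logged twice, once as received and once with the default applied, which makes the difference easy to see in riemann.log.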

@mcorbin and @pyr, I followed your advice and it seems to work.

(streams
  (where (tagged-any ["couchbase"])
    (default :site "b" index #(info %))
    (smap (fn [events]
            (dissoc
              events
              :okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
              :alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
              :description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner))
      (by [:host :service]
        (where (not (nil? (:instancename event)))
          (smap (fn [events]
                  (let [instance_name (:instancename events)
                        service_string (:service events)
                        instance_name_modified (clojure.string/join "" [instance_name "/"])]
                    (assoc events :service (clojure.string/replace service_string instance_name_modified ""))))
            (where (tagged "couchbase") send_to_influx_db_couchbase))
          (else
            (where (tagged "couchbase") send_to_influx_db_couchbase)))))))

result:

INFO [2018-06-05 16:35:33,984] defaultEventExecutorGroup-2-5 - riemann.config - #riemann.codec.Event{:host couchuss, :service bucket/upmonitor/vb_pending_queue_age, :state nil, :description nil, :metric 0.0, :tags [couchbase], :time 1.528187733984E9, :ttl nil, :instancename upmonitor, :site b}
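
One caveat, going by the config as posted: `smap` and everything under it still sit next to `default` rather than inside it, so the events forwarded to `send_to_influx_db_couchbase` would presumably still lack :site whenever the sender did not set it. If the default is also wanted downstream, one option is to nest the rest of the pipeline inside `default`. The following is only a sketch: the dissoc key list is shortened for brevity, the `by`/`where` branch is left out, and `send_to_influx_db_couchbase` is assumed to be defined elsewhere in the config, as in the thread.

```clojure
(streams
  (where (tagged-any ["couchbase"])
    (default :site "b"
      ;; Everything below is a child of default, so it sees :site.
      #(info %)
      (smap (fn [event]
              ;; Shortened key list; use the full list from the config above.
              (dissoc event :okthreshold :criticalthreshold))
        send_to_influx_db_couchbase))))
```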