Question: How do I check whether a field exists and give it a default value?
keyboardfann opened this issue · 6 comments
Sorry if this is a basic question. I have two sites; some events have the field "site" and some don't. I want the following behaviour: if the site field exists, leave the event unchanged; if the site field does not exist, add a site field with the value "b". Could anyone tell me how to modify the config below?
raw log:
INFO [2018-06-05 14:16:08,153] defaultEventExecutorGroup-2-2 - riemann.config - #riemann.codec.Event{:host couchcache, :service couchspring/vpb_idempotency_cache/metrics_sent_buckets, :state nil, :description nil, :metric 58.0, :tags [couchbase], :time 1.528179368153E9, :ttl nil, :instancename vpb_idempotency_cache, :site a}
INFO [2018-06-05 14:16:08,165] defaultEventExecutorGroup-2-2 - riemann.config - #riemann.codec.Event{:host couchcache, :service node/10.94.232.64/swap_used_percent, :state nil, :description nil, :metric 0.0, :tags [couchbase], :time 1.528179368164E9, :ttl nil, :instancename 10.94.232.64 }
This configuration works fine:
;; Baseline pipeline for couchbase-tagged events:
;;   1. log the event as received,
;;   2. drop alerting/job bookkeeping fields,
;;   3. when :instancename is present, remove the "<instancename>/" fragment
;;      from :service,
;;   4. forward the result to InfluxDB.
(streams
  (where (tagged-any ["couchbase"])
    ;; Log every matching event before any transformation.
    #(info %)
    ;; Remove fields that should not be forwarded downstream.
    (smap (fn [ev]
            (dissoc ev
                    :okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
                    :alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
                    :description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner))
      (by [:host :service]
        (where (not (nil? (:instancename event)))
          ;; e.g. "bucket/accountapi_cache/curr_items" with :instancename
          ;; "accountapi_cache" becomes "bucket/curr_items".
          (smap (fn [ev]
                  (let [prefix (str (:instancename ev) "/")]
                    (assoc ev :service (clojure.string/replace (:service ev) prefix ""))))
            (where (tagged "couchbase") send_to_influx_db_couchbase))
          ;; Events without :instancename are forwarded with :service untouched.
          (else
            (where (tagged "couchbase") send_to_influx_db_couchbase)))))))
This configuration throws an exception:
;; NOTE: this variant is the one that throws the ClassCastException shown in the
;; riemann.log excerpt below; it is kept as posted to reproduce the problem.
(streams
(where (tagged-any ["couchbase"])
#(info %)
(smap (fn [events]
(dissoc
;; BUG: `where` is a Riemann stream macro. Used here it builds a stream function
;; instead of returning the event map, so dissoc receives a function as its
;; first argument and fails with "cannot be cast to clojure.lang.IPersistentMap".
;; A plain conditional such as
;;   (if (nil? (:site events)) (assoc events :site "b") events)
;; would yield a map; the `default` stream achieves the same at stream level.
(where (nil? (:site events))
(assoc events :site "b" ))
:okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
:alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
:description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner
))
(by [:host :service]
(where (not (nil?(:instancename event)))
(smap (fn [events]
(let [ instance_name (:instancename events)
service_string (:service events)
instance_name_modified (clojure.string/join "" [instance_name "/"])]
(assoc events :service (clojure.string/replace service_string instance_name_modified ""))
))
(where (tagged "couchbase") send_to_influx_db_couchbase )
)
(else
(where (tagged "couchbase") send_to_influx_db_couchbase )
))
))))
Exception in riemann.log:
INFO [2018-06-05 14:15:14,768] defaultEventExecutorGroup-2-2 - riemann.config - #riemann.codec.Event{:host couchcache, :service bucket/accountapi_cache/curr_items, :state nil, :description nil, :metric 0.0, :tags [couchbase], :time 1.528179314768E9, :ttl nil, :instancename accountapi_cache, :site a}
WARN [2018-06-05 14:15:14,769] defaultEventExecutorGroup-2-2 - riemann.config - riemann.streams$smap$stream__6900@4627ecac threw
java.lang.ClassCastException: riemann.config$eval278$fn__1304$stream__1305 cannot be cast to clojure.lang.IPersistentMap
at clojure.lang.RT.dissoc(RT.java:848)
at clojure.core$dissoc.invokeStatic(core.clj:1462)
at clojure.core$dissoc.invokeStatic(core.clj:1455)
at clojure.core$dissoc.doInvoke(core.clj:1455)
at clojure.lang.RestFn.invoke(RestFn.java:3913)
at riemann.config$eval278$fn__1304.invoke(riemann.config:182)
at riemann.streams$smap$stream__6900.invoke(streams.clj:161)
at riemann.config$eval278$stream__1490$fn__1495.invoke(riemann.config:179)
at riemann.config$eval278$stream__1490.invoke(riemann.config:179)
at riemann.core$stream_BANG_$fn__8891.invoke(core.clj:19)
at riemann.core$stream_BANG_.invokeStatic(core.clj:18)
at riemann.core$stream_BANG_.invoke(core.clj:14)
at riemann.transport$handle.invokeStatic(transport.clj:171)
at riemann.transport$handle.invoke(transport.clj:165)
at riemann.transport.tcp$tcp_handler.invokeStatic(tcp.clj:95)
at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:88)
at riemann.transport.tcp$gen_tcp_handler$fn__11921.invoke(tcp.clj:66)
at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:363)
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Hi,
You can use the `default` stream to add default values to events:
(streams
;; Streams placed where the "..." is are the children of `default`; they receive
;; events in which a missing :site has been filled in with "some-site".
(default :site "some-site"
...))
More documentation can be found at: http://riemann.io/api/riemann.streams.html#var-default
@pyr Thank you for the reply, but how can I keep the original value if the site field already exists?
This is exactly what `default` does: if the field exists it is left as-is; otherwise the default value is filled in.
Hi @pyr,
I added `default` to the config, but there is still no site field in the Riemann log. Is anything wrong?
;; NOTE: this variant is kept as posted; it reproduces the "no :site in the log"
;; symptom shown in the log line that follows it.
(streams
(where (tagged-any ["couchbase"])
;; `default` passes the defaulted event only to its own children (here: index).
;; The #(info %) logger and the smap below are siblings of `default` under
;; `where`, so they still receive the original event without :site.
(default :site "b" index)
#(info %)
(smap (fn [events]
(dissoc
events
:okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
:alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
:description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner
))
(by [:host :service]
(where (not (nil?(:instancename event)))
(smap (fn [events]
(let [ instance_name (:instancename events)
service_string (:service events)
instance_name_modified (clojure.string/join "" [instance_name "/"])]
(assoc events :service (clojure.string/replace service_string instance_name_modified ""))
))
(where (tagged "couchbase") send_to_influx_db_couchbase )
)
(else
(where (tagged "couchbase") send_to_influx_db_couchbase )
))
))))
INFO [2018-06-05 15:43:09,658] defaultEventExecutorGroup-2-5 - riemann.config - #riemann.codec.Event{:host couchcache, :service couchspring/10.94.232.67/metrics_sent_nodes, :state nil, :description nil, :metric 27.0, :tags [couchbase], :time 1.528184589657E9, :ttl nil, :instancename 10.94.232.67}
@mcorbin and @pyr, I followed your advice and it seems to work.
;; Pipeline for couchbase-tagged events:
;;   1. fill in :site with "b" when the event has no :site (existing values are kept),
;;   2. index and log the defaulted event,
;;   3. drop alerting/job bookkeeping fields,
;;   4. when :instancename is present, remove the "<instancename>/" fragment
;;      from :service,
;;   5. forward the result to InfluxDB.
(streams
  (where (tagged-any ["couchbase"])
    ;; Everything that must see the defaulted :site has to be a CHILD of
    ;; `default`. Streams placed next to it under `where` receive the original
    ;; event, so the InfluxDB pipeline is nested here rather than left as a
    ;; sibling (otherwise only the index and the log would get :site "b").
    (default :site "b"
      index
      #(info %)
      ;; Remove fields that should not be forwarded downstream.
      (smap (fn [events]
              (dissoc events
                      :okthreshold :criticalthreshold :departmentkey :alertformat :alerter :alertstate
                      :alertdescription :failure_message :step_message :sqlinstance_owner :is_job_running :original_host :original_service
                      :description :job_category :job_id :job_owner :failure_time :next_runtime :frequency :step_id :sqlinstanceowner))
        (by [:host :service]
          (where (not (nil? (:instancename event)))
            ;; e.g. "bucket/upmonitor/vb_pending_queue_age" with :instancename
            ;; "upmonitor" becomes "bucket/vb_pending_queue_age".
            (smap (fn [events]
                    (let [instance_name (:instancename events)
                          service_string (:service events)
                          instance_name_modified (clojure.string/join "" [instance_name "/"])]
                      (assoc events :service (clojure.string/replace service_string instance_name_modified ""))))
              (where (tagged "couchbase") send_to_influx_db_couchbase))
            ;; Events without :instancename are forwarded with :service untouched.
            (else
              (where (tagged "couchbase") send_to_influx_db_couchbase))))))))
result:
INFO [2018-06-05 16:35:33,984] defaultEventExecutorGroup-2-5 - riemann.config - #riemann.codec.Event{:host couchuss, :service bucket/upmonitor/vb_pending_queue_age, :state nil, :description nil, :metric 0.0, :tags [couchbase], :time 1.528187733984E9, :ttl nil, :instancename upmonitor, :site b}