¶ ↑
fluent-plugin-zmq-pub¶ ↑
OverviewFluentd plugin to publish records to ZeroMQ.
¶ ↑
Why this plugin was created?Sometimes I wanted to ‘sniff’ fluentd stream – running my own programs to the stream without changing fluentd configuration and restarting fluentd. With this plugin, fluentd records are always published to ZeroMQ regardless of the existance of subscriber. After that I can start and stop my subcriber programs at any time.
¶ ↑
DependenceThis plugin use ffi-rzmq to support ZMQ, and need v3.2 or greater version of ZMQ library installed in your system.
¶ ↑
InstallationYou need to install ZeroMQ libraries before installing this plugin
## (RedHat/CentOS) # yum install zeromq3 zeromq3-devel # fluent-gem install fluent-plugin-zmq-pub
¶ ↑
Configuration<match zmq.**> type zmq_pub pubkey ${tag}:${key1} bindaddr tcp://*:5556 flush_interval 1s bulk_send true </match>
-
‘pubkey’ specifies the publish key to ZeroMQ.
-
‘${tag}’ is replace by fluentd tag. ‘${name}’ is replaced by fluentd record.
-
Actual record to be published is ‘<pubkey> <reocord.to_msgpack>’.
-
Subscriber can subscribe by ‘<pubkey>’.
-
-
‘bindaddr’ is the address to which ZeroMQ publisher socket to be bound.
-
If ‘bulk_send’ is set to true, send multiple records with the same publish key in one ‘send_string’ method. This improves the performance.
¶ ↑
Example usagePut the configuration above to fluentd.conf, and save this sample code as ‘sample_sub.rb’.
#!/usr/bin/env ruby require 'ffi-rzmq' require 'msgpack' context = ZMQ::Context.new(1) subscriber = context.socket(ZMQ::SUB) subscriber.connect("tcp://localhost:5556") if ARGV.length > 0 ARGV.each{|s| subscriber.setsockopt(ZMQ::SUBSCRIBE,s) } else subscriber.setsockopt(ZMQ::SUBSCRIBE,"") end while true msg = '' while subscriber.recv_string(msg,ZMQ::DONTWAIT) && msg.size > 0 record = MessagePack.unpack(msg.split(" ",2)[1]) puts "tag: #{record[0]}" puts "time: #{record[1]}" puts "record: #{record[2]}" msg = '' end sleep(0.1) end
Run sample_sub.rb. Argument is the key to subscribe. (Correspond to ‘pubkey’ in zmq_pub configuration). If you give no arguments, all key will be subscribed.
% ./sample_sub.rb zmq.test.tag:aaa
Submit records to fluentd.
% echo '{"key1": "aaa", "key2":"foo"}' | fluent-cat zmq.test.tag % echo '{"key1": "bbb", "key2":"foo"}' | fluent-cat zmq.test.tag
Then you will get the following output from sample_sub.rb
tag: zmq.test.tag time: 1376033265 record: {"key1"=>"aaa", "key2"=>"foo"}
(You should not get the second record(“key1”:“bbb”) because the publish key to zmq was “zmq.test.tag:bbb” and specified subscibe key was “zmq.test.tag:aaa”)
The nice thing is that once you put this plugin to your fluentd.conf and start fluentd, you can start and stop any subscriber programs without changing fluentd configuration.
¶ ↑
zmq_sub input pluginInput plugin to subscribe the output of zmq_pub is also included. Here is the example configuration.
<source> type zmq_sub publisher tcp://127.0.0.1:5556 bulk_send true subkey zmq.,zmq2 </source>
-
If zmq_pub set ‘bulk_send` to true, zmq_sub also set it to true.
-
‘subkey` is a comma separated list of keys to subscribe. In this example, keys starting “zmq.” or “zmq2.” will be subscribed.
¶ ↑
Copyright-
Copyright © 2013- OGIBAYASHI Hironori (@angostura11)
-
License
-
Apache License, Version 2.0
-