fluent-plugin-mongo provides input and output plugins for Fluentd.
The gem is hosted on RubyGems.org. You can install it as follows:
$ fluent-gem install fluent-plugin-mongo
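If you manage Fluentd and its plugins with Bundler, adding the gem to a Gemfile also works. The layout below is a minimal sketch and an assumption, not something prescribed by this plugin:

# Gemfile
source 'https://rubygems.org'

gem 'fluentd'
gem 'fluent-plugin-mongo'

$ bundle install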
Store Fluentd events in a MongoDB database.
Use the mongo type in a match section.
<match mongo.**>
  type mongo
  database fluent
  collection test

  # Following attributes are optional
  host fluenter
  port 10000

  # Set 'capped' if you want to use a capped collection
  capped
  capped_size 100m

  # Set 'user' and 'password' for authentication
  user handa
  password shinobu

  # Other buffer configurations here
</match>
Tags are mapped to MongoDB collections automatically.
Use the tag_mapped parameter in a match section of the mongo type.
If the tag name is "foo.bar", the collection "foo.bar" is created automatically and the data is inserted into it.
<match forward.*>
  type mongo
  database fluent

  # Set 'tag_mapped' if you want to use tag mapped mode.
  tag_mapped

  # If the tag is "forward.foo.bar", then the prefix "forward." is removed.
  # The collection name to insert into becomes "foo.bar".
  remove_tag_prefix forward.

  # This collection is used if the tag is not found. Default is 'untagged'.
  collection misc

  # Other configurations here
</match>
Replica set version of the mongo output.
Use the mongo_replset type in a match section.
<match mongo.**>
  type mongo_replset
  database fluent
  collection logs

  # Each node is separated by ','
  nodes localhost:27017,localhost:27018,localhost:27019

  # num_retries is the retry threshold at failover. Default is 60.
  # If the retry count reaches this threshold, the mongo plugin raises an exception.
  num_retries 30

  # The following optional parameters are passed to ReplSetConnection of mongo-ruby-driver.
  # See the mongo-ruby-driver docs for more detail.
  #name replset_name
  #read secondary
  #refresh_mode sync
  #refresh_interval 60
</match>
Store Fluentd events in a local capped collection for backup.
Use the mongo_backup type in a match section. mongo_backup always uses a capped collection.
<match ...>
  type mongo_backup
  capped_size 100m

  <store>
    type tcp
    host 192.168.0.13
    ...
  </store>
</match>
Tail a capped collection to input data.
Use the mongo_tail type in a source section.
<source>
  type mongo_tail
  database fluent
  collection capped_log
  tag app.mongo_log

  # Waiting time when there is no next document. Default is 1s.
  wait_time 5

  # Convert 'time' (BSON time) to Fluentd time (Unix time).
  time_key time

  # You can store the last ObjectId to keep tailing across a server shutdown
  id_store_file /Users/repeatedly/devel/fluent-plugin-mongo/last_id
</source>
You can also use the url parameter to specify the database to connect to.
<source>
  type mongo_tail
  url mongodb://user:password@192.168.0.13:10249,192.168.0.14:10249/database
  collection capped_log
  ...
</source>
This allows the plugin to read data from a replica set.
A Fluentd event sometimes contains a record that is invalid as BSON. In such a case, the Mongo plugin marshals the invalid record using Marshal.dump and re-inserts it into the same collection as binary data.
If the following invalid record is passed:
{"key1": "invalid value", "key2": "valid value", "time": ISODate("2012-01-15T21:09:53Z") }
then the Mongo plugin converts the record into the following format:
{"__broken_data": BinData(0, Marshal.dump result of {"key1": "invalid value", "key2": "valid value"}), "time": ISODate("2012-01-15T21:09:53Z") }
mongo-ruby-driver cannot detect which attribute is invalid, so the Mongo plugin marshals all attributes except the Fluentd keys ("tag_key" and "time_key").
You can deserialize broken data using Mongo and Marshal.load. Sample code is shown below:
# _collection_ is an instance of Mongo::Collection
collection.find({'__broken_data' => {'$exists' => true}}).each do |doc|
  p Marshal.load(doc['__broken_data'].to_s) #=> {"key1": "invalid value", "key2": "valid value"}
end
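If you want to save the recovered fields for manual repair, the following sketch builds on the sample above; the output file name and JSON Lines format are illustrative assumptions:

require 'json'

# _collection_ is the same Mongo::Collection instance as above.
# Write each recovered record as one JSON line for later inspection.
File.open('broken_records.jsonl', 'w') do |out|
  collection.find({'__broken_data' => {'$exists' => true}}).each do |doc|
    recovered = Marshal.load(doc['__broken_data'].to_s)       # Hash of the original fields
    recovered['time'] = doc['time'].to_s if doc.key?('time')  # keep the Fluentd time field
    out.puts JSON.generate(recovered)
  end
end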
If you want to ignore invalid records, set the ignore_invalid_record parameter to true in the match section.
<match forward.*>
  ...
  # Ignore invalid documents at write operation
  ignore_invalid_record true
  ...
</match>
If you want to exclude some fields from broken data marshaling, use exclude_broken_fields to specify the keys.
<match forward.*>
  ...
  # key2 is excluded from __broken_data.
  # e.g. {"__broken_data": BinData(0, Marshal.dump result of {"key1": "invalid value"}), "key2": "valid value", "time": ISODate("2012-01-15T21:09:53Z")}
  exclude_broken_fields key2
  ...
</match>
The specified value is a comma-separated list of keys (e.g. key1,key2,key3). This parameter is useful for excluding shard keys in a sharded environment.
Keys which include '.' or start with '$' are invalid in BSON, so such records will be stored in MongoDB as broken data. If you want to sanitize keys instead, you can use replace_dot_in_key_with and replace_dollar_in_key_with.
<match forward.*>
  ...
  # Replace '.' in keys with '__dot__'
  replace_dot_in_key_with __dot__

  # Replace '$' in keys with '__dollar__'
  # Note: this replaces '$' only when it is the first character
  replace_dollar_in_key_with __dollar__
  ...
</match>
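With the settings above, a key such as "user.name" would be stored as "user__dot__name", and a key such as "$push" as "__dollar__push" (the example keys are illustrative).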
The Mongo plugin has a buffer size limitation because MongoDB and mongo-ruby-driver check the total object size at each insertion. If the total object size exceeds the limit, MongoDB returns an error or mongo-ruby-driver raises an exception.
So the Mongo plugin resets buffer_chunk_limit if the configured value is larger than the limits below:
- Before v1.8, the maximum buffer_chunk_limit is 2MB
- After v1.8, the maximum buffer_chunk_limit is 8MB
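For example, you can set an explicit buffer_chunk_limit below these caps in the match section. The snippet below is a minimal sketch using standard Fluentd buffer parameters; the values are only illustrative:

<match mongo.**>
  type mongo
  database fluent
  collection test

  # Keep chunks below the plugin's cap (8MB on v1.8 or later)
  buffer_chunk_limit 8m
  flush_interval 10s
</match>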
The Mongo plugin checks a collection's configuration to prevent unexpected insertions into an existing collection. In tag mapped mode, the Mongo plugin accesses many collections, and collection checking is not usable because the current configuration format cannot write multiple values in one parameter. To disable this check, put disable_collection_check true in the match section.
<match forward.*>
  ...
  disable_collection_check true
  ...
</match>
You can tail a mongo capped collection.
$ mongo-tail -f
To run the test suite, run the following command:
$ bundle exec rake test
You can use the 'mongod' environment variable to specify which mongod binary to use:
$ mongod=/path/to/mongod bundle exec rake test
Note that the source code in test/tools is from mongo-ruby-driver.
TODO:

- Multi process
- etc
- Copyright: © 2011- Masahiro Nakagawa
- License: Apache License, Version 2.0