This plugin is forked from https://github.com/mdoi/fluent-plugin-gcloud-pubsub
Google Cloud Pub/Sub input/output (BufferedOutput) plugin for Fluentd, built on the google-cloud gem.
- Publish messages to Google Cloud Pub/Sub
- Pull messages from Google Cloud Pub/Sub
- Create a project on Google Developer Console
- Add a topic of Cloud Pub/Sub to the project
- Add a pull style subscription to the topic
- Download your credential (json) or set scope on GCE instance
When using the output plugin, grant the Pub/Sub Publisher and Pub/Sub Viewer roles in IAM.
When using the input plugin, grant the Pub/Sub Subscriber and Pub/Sub Viewer roles in IAM.
| fluent-plugin-gcloud-pubsub-custom | fluentd | ruby |
|---|---|---|
| >= 1.0.0 | >= v0.14.0 | >= 2.1 |
| < 1.0.0 | >= v0.12.0 | >= 1.9 |
Install by gem:
$ gem install fluent-plugin-gcloud-pubsub-custom

Caution: this plugin doesn't work in td-agent. Use it with Fluentd installed by gem.
Use gcloud_pubsub output plugin.
<match example.publish>
  @type gcloud_pubsub
  project <YOUR PROJECT>
  key <YOUR KEY>
  topic <YOUR TOPIC>
  autocreate_topic false
  max_messages 1000
  max_total_size 9800000
  max_message_size 4000000
  <buffer>
    @type memory
    flush_interval 1s
  </buffer>
  <format>
    @type json
  </format>
</match>
- `project` (optional)
  - Set your GCP project.
  - When running Fluentd on GCP, you don't have to specify this.
  - You can also use an environment variable such as `GCLOUD_PROJECT`.
- `key` (optional)
  - Set your credential file path.
  - When running Fluentd on GCP, you can use a scope instead of specifying this.
  - You can also use an environment variable such as `GCLOUD_KEYFILE`.
- `topic` (required)
  - Set the name of the topic to publish to.
  - You can use placeholders in this parameter. See: https://docs.fluentd.org/v1.0/articles/buffer-section
- `autocreate_topic` (optional, default: `false`)
  - If set to `true`, the specified topic is created when it doesn't exist.
- `max_messages` (optional, default: `1000`)
  - Number of messages published per request to Cloud Pub/Sub.
- `max_total_size` (optional, default: `9800000` = `9.8MB`)
  - Total byte size of messages published per request to Cloud Pub/Sub. This parameter counts only message size, so specify a value a little smaller than the quota.
- `max_message_size` (optional, default: `4000000` = `4MB`)
  - Messages exceeding `max_message_size` are not published because Pub/Sub clients cannot receive them.
- `attribute_keys` (optional, default: `[]`)
  - Publish the specified fields as attributes.
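
As a rough illustration of the `topic` placeholder and `attribute_keys` parameters described above, the following sketch routes each tag to its own topic. It assumes standard Fluentd v1 placeholder behavior, where `${tag[1]}` needs `tag` listed as a buffer chunk key. The `app.**` tag pattern, the `logs-` topic prefix, and the `host` and `level` field names are hypothetical. It also assumes the resulting topics already exist (or that `autocreate_topic` is enabled).

```
<match app.**>
  @type gcloud_pubsub
  project <YOUR PROJECT>
  key <YOUR KEY>
  # With the tag app.web, this is expected to resolve to the topic logs-web.
  topic logs-${tag[1]}
  # Hypothetical record fields to publish as message attributes.
  attribute_keys ["host", "level"]
  <buffer tag>
    @type memory
    flush_interval 1s
  </buffer>
  <format>
    @type json
  </format>
</match>
```

Listing `tag` in `<buffer tag>` groups chunks by tag, which is what lets Fluentd substitute the placeholder for each chunk.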
Use gcloud_pubsub input plugin.
<source>
  @type gcloud_pubsub
  tag example.pull
  project <YOUR PROJECT>
  key <YOUR KEY>
  topic <YOUR TOPIC>
  subscription <YOUR SUBSCRIPTION>
  max_messages 1000
  return_immediately true
  pull_interval 0.5
  pull_threads 2
  parse_error_action exception
  enable_rpc true
  rpc_bind 0.0.0.0
  rpc_port 24680
  <parse>
    @type json
  </parse>
</source>
- `tag` (required)
  - Set the tag of messages.
  - If `tag_key` is specified, `tag` is used as the tag when a record doesn't have the specified key.
- `tag_key` (optional)
  - Set the key to be used as the tag.
- `project` (optional)
  - Set your GCP project.
  - When running Fluentd on GCP, you don't have to specify this.
  - You can also use an environment variable such as `GCLOUD_PROJECT`.
- `key` (optional)
  - Set your credential file path.
  - When running Fluentd on GCP, you can use a scope instead of specifying this.
  - You can also use an environment variable such as `GCLOUD_KEYFILE`.
- `topic` (required)
  - Set the name of the topic to pull from.
- `subscription` (required)
  - Set the name of the subscription to pull from.
- `max_messages` (optional, default: `100`)
- `return_immediately` (optional, default: `true`)
  - See returnImmediately on https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull
  - If `return_immediately` is `true` or pulling messages is stopped by HTTP RPC, this plugin waits `pull_interval` between pulls.
- `pull_interval` (optional, default: `5.0`)
  - Pull messages at intervals of the specified number of seconds.
- `pull_threads` (optional, default: `1`)
  - Set the number of threads used to pull messages.
- `attribute_keys` (optional, default: `[]`)
  - Specify the attribute keys to be emitted as fields of the record.
- `parse_error_action` (optional, default: `exception`)
  - Set the error type when parsing messages fails.
    - `exception`: Raise an exception. Messages are not acknowledged.
    - `warning`: Only log a warning.
- `enable_rpc` (optional, default: `false`)
  - If `true` is specified, HTTP RPC to stop or start pulling messages is enabled.
- `rpc_bind` (optional, default: `0.0.0.0`)
  - Bind IP address for HTTP RPC.
- `rpc_port` (optional, default: `24680`)
  - Port for HTTP RPC.
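
To show how several of the optional input parameters above might fit together, here is a minimal sketch. The `target_tag` record key and the `host` attribute name are hypothetical. Binding the RPC listener to `127.0.0.1` is only one possible choice, made here on the assumption that the stop/start controls should be reachable from the local host alone.

```
<source>
  @type gcloud_pubsub
  tag example.pull
  # Hypothetical record key; records lacking it fall back to the tag above.
  tag_key target_tag
  project <YOUR PROJECT>
  key <YOUR KEY>
  topic <YOUR TOPIC>
  subscription <YOUR SUBSCRIPTION>
  # Hypothetical attribute name to emit as a field of each record.
  attribute_keys ["host"]
  # Log a warning instead of raising when a message cannot be parsed.
  parse_error_action warning
  enable_rpc true
  # Listen on loopback only rather than the default 0.0.0.0.
  rpc_bind 127.0.0.1
  rpc_port 24680
  <parse>
    @type json
  </parse>
</source>
```
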
- Fork it
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create a new Pull Request
- Add `tag` attribute in output plugin and use `tag` attribute as tag in input plugin.
- Send ack after other output plugin committed (if possible).