Send events to different Kinesis Data Firehose delivery streams based on custom field
yassinejaffoo-sanuk opened this issue · 2 comments
A single delivery stream can only deliver data to one Amazon OpenSearch Service domain and one index currently. The delivery stream name contains the Kubernetes cluster name and namespace of the workload that has emitted the event.
I have ensured that each record or event contains this information as a new field using the record_transformer filter plugin and this works fine i.e.
<filter kubernetes.**>
@type record_transformer
enable_ruby true
<record>
kinesis_firehose_delivery_stream_name "kdf-ds-#{ENV['KUBERNETES_CLUSTER_NAME']}-${record['kubernetes']['namespace_name']}"
</record>
</filter>
I would now like to use the kinesis_firehose_delivery_stream_name
field to ensure that the event is sent to the right Amazon Kinesis Data Firehose delivery stream i.e.
<match **>
@log_level debug
@type kinesis_firehose
@id out_kinesis_firehose
region "#{ENV['FLUENT_KINESIS_FIREHOSE_REGION'] || 'eu-west-2'}"
http_proxy "#{ENV['FLUENT_KINESIS_FIREHOSE_HTTP_PROXY']}"
delivery_stream_name "${$.kinesis_firehose_delivery_stream_name}"
include_time_key "#{ENV['FLUENT_KINESIS_FIREHOSE_INCLUDE_TIME_KEY'] || false}"
<buffer $.kinesis_firehose_delivery_stream_name>
@type file
path "#{ENV['FLUENT_KINESIS_FIREHOSE_BUFFER_PATH'] || '/var/log/fluentd'}"
flush_interval 1
chunk_limit_size "#{ENV['FLUENT_KINESIS_FIREHOSE_CHUNK_LIMIT_SIZE'] || '1m'}"
flush_thread_interval 0.1
flush_thread_burst_interval 0.01
flush_thread_count 5
</buffer>
<web_identity_credentials>
role_arn "#{ENV['AWS_ROLE_ARN']}"
role_session_name "#{ENV['AWS_ROLE_SESSION_NAME'] || 'fluent-firehose-session'}"
web_identity_token_file "#{ENV['AWS_WEB_IDENTITY_TOKEN_FILE']}"
</web_identity_credentials>
</match>
But I get the following error:
2022-12-01 21:43:16 +0000 [warn]: #0 [out_kinesis_firehose] failed to flush the buffer. retry_times=0 next_retry_time=2022-12-01 21:43:18 +0000 chunk="5eeca9500abed033c3667be76e659a1e" error_class=Aws::Firehose::Errors::ValidationException error="2 validation errors detected: Value '' at 'deliveryStreamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+; Value '' at 'deliveryStreamName' failed to satisfy constraint: Member must have length greater than or equal to 1"
What should I be doing differently in order to achieve my desired outcome? Thanks for the support in advance.
@cosmo0920 @clcpolevaulter would really appreciate your input on this please.
I've got it working by removing the @id
and targeting events tagged as kubernetes.**
.
<match kubernetes.**>
@log_level debug
@type kinesis_firehose
region "#{ENV['FLUENT_KINESIS_FIREHOSE_REGION'] || 'eu-west-2'}"
http_proxy "#{ENV['FLUENT_KINESIS_FIREHOSE_HTTP_PROXY']}"
delivery_stream_name "${$.kinesis_firehose_delivery_stream_name}"
include_time_key "#{ENV['FLUENT_KINESIS_FIREHOSE_INCLUDE_TIME_KEY'] || false}"
<buffer $.kinesis_firehose_delivery_stream_name>
@type file
path "#{ENV['FLUENT_KINESIS_FIREHOSE_BUFFER_PATH'] || '/var/log/fluentd'}"
flush_interval 1
chunk_limit_size "#{ENV['FLUENT_KINESIS_FIREHOSE_CHUNK_LIMIT_SIZE'] || '1m'}"
flush_thread_interval 0.1
flush_thread_burst_interval 0.01
flush_thread_count 5
</buffer>
<web_identity_credentials>
role_arn "#{ENV['AWS_ROLE_ARN']}"
role_session_name "#{ENV['AWS_ROLE_SESSION_NAME'] || 'fluent-firehose-session'}"
web_identity_token_file "#{ENV['AWS_WEB_IDENTITY_TOKEN_FILE']}"
</web_identity_credentials>
</match>