Mentioning topics with the help of regex is not working.
mann2108 opened this issue · 2 comments
mann2108 commented
Describe the bug
I have configure the kafka input with multiple topics with the help of regex pattern -
<source>
@type
kafka brokers 10.30.1.101:9092
topics /*_firewall/
add_prefix kafka.saas
format json
</source>
For matching all topics with name - cust1_firewall, cust2_firewall and so on.
To Reproduce
Use regex pattern for mentioning topics name. Ex. - /*_firewall/
Expected behavior
It should work properly without any error.
Your Environment
- Fluentd version: 1.14.3
- TD Agent version: v4
- fluent-plugin-kafka version: 0.17.4
- ruby-kafka version: 1.4.0
- Operating system: Ubuntu
- Kernel version: Bionic
Your Configuration
<source>
@type kafka
brokers 10.30.1.101:9092
topics /*_firewall/
add_prefix kafka.saas
format json
</source>
<match kafka.saas.**>
@type rewrite_tag_filter
<rule>
key integration
pattern /^(\w+)/
tag saas.$1
</rule>
</match>
<filter saas.jira>
@type record_transformer
<record>
saas_integration "jira"
tag ${tag}
</record>
</filter>
<filter saas.slack>
@type record_transformer
<record>
saas_integration "slack"
tag ${tag}
</record>
</filter>
<match saas.**>
@type opensearch
host 10.30.1.101
port 9200
index_name saas_fluentd
user admin
password admin
</match>
Your Error Log
2022-02-02 13:14:42 +0000 [info]: adding source type="kafka"
2022-02-02 13:14:42 +0000 [info]: #0 starting fluentd worker pid=59700 ppid=59696 worker=0
2022-02-02 13:14:42 +0000 [info]: #0 fluentd worker is now running worker=0
2022-02-02 13:14:43 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/protocol.rb:160:in `handle_error'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/protocol/metadata_response.rb:94:in `find_leader_id'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/cluster.rb:411:in `get_leader_id'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/cluster.rb:118:in `get_leader'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:63:in `block (2 levels) in execute'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:62:in `each'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:62:in `block in execute'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:61:in `each'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:61:in `execute'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/client.rb:516:in `fetch_messages'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:298:in `consume'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:288:in `call'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:288:in `on_timer'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run_once'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run'
2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:252:in `run'
2022-02-02 13:14:44 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:44 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:45 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:45 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:46 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:46 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:47 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:47 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:48 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:48 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:49 +0000 [error]: #0 Kafka::InvalidTopic
2022-02-02 13:14:49 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:50 +0000 [error]: #0 Kafka::InvalidTopic
Additional context
No response
github-actions commented
This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days
ashie commented
topics
of in_kafka doesn't support glob or regexp, it supports only comma separated topic list:
fluent-plugin-kafka/lib/fluent/plugin/in_kafka.rb
Lines 80 to 84 in 00086cc