fluent/fluent-plugin-kafka

Mentioning topics with the help of regex is not working.

mann2108 opened this issue · 2 comments

Describe the bug

I have configure the kafka input with multiple topics with the help of regex pattern -

<source>
 @type
 kafka brokers 10.30.1.101:9092 
 topics /*_firewall/  
 add_prefix kafka.saas 
 format json
</source>

For matching all topics with name - cust1_firewall, cust2_firewall and so on.

To Reproduce

Use regex pattern for mentioning topics name. Ex. - /*_firewall/

Expected behavior

It should work properly without any error.

Your Environment

- Fluentd version: 1.14.3
- TD Agent version: v4
- fluent-plugin-kafka version: 0.17.4
- ruby-kafka version: 1.4.0
- Operating system: Ubuntu
- Kernel version: Bionic

Your Configuration

<source>
  @type kafka
  brokers 10.30.1.101:9092
  topics /*_firewall/
  add_prefix kafka.saas
  format json
</source>
<match kafka.saas.**>
    @type rewrite_tag_filter
    <rule>
        key integration
        pattern /^(\w+)/
        tag saas.$1
    </rule>
</match>
<filter saas.jira>
  @type record_transformer
  <record>
    saas_integration "jira"
    tag ${tag}
  </record>
</filter>
<filter saas.slack>
  @type record_transformer
  <record>
    saas_integration "slack"
    tag ${tag}
  </record>
</filter>
<match saas.**>
    @type opensearch
    host 10.30.1.101
    port 9200
    index_name saas_fluentd
    user admin
    password admin
</match>

Your Error Log

2022-02-02 13:14:42 +0000 [info]: adding source type="kafka"
2022-02-02 13:14:42 +0000 [info]: #0 starting fluentd worker pid=59700 ppid=59696 worker=0
2022-02-02 13:14:42 +0000 [info]: #0 fluentd worker is now running worker=0
2022-02-02 13:14:43 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/protocol.rb:160:in `handle_error'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/protocol/metadata_response.rb:94:in `find_leader_id'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/cluster.rb:411:in `get_leader_id'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/cluster.rb:118:in `get_leader'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:63:in `block (2 levels) in execute'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:62:in `each'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:62:in `block in execute'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:61:in `each'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/fetch_operation.rb:61:in `execute'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.4.0/lib/kafka/client.rb:516:in `fetch_messages'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:298:in `consume'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:288:in `call'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:288:in `on_timer'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run_once'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run'
  2022-02-02 13:14:43 +0000 [error]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.4/lib/fluent/plugin/in_kafka.rb:252:in `run'
2022-02-02 13:14:44 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:44 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:45 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:45 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:46 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:46 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:47 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:47 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:48 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:48 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:49 +0000 [error]: #0 Kafka::InvalidTopic
  2022-02-02 13:14:49 +0000 [error]: #0 suppressed same stacktrace
2022-02-02 13:14:50 +0000 [error]: #0 Kafka::InvalidTopic

Additional context

No response

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days

ashie commented

topics of in_kafka doesn't support glob or regexp, it supports only comma separated topic list:

@topic_list = []
if @topics
@topic_list = @topics.split(',').map { |topic|
TopicEntry.new(topic.strip, @partition, @offset)
}