ansible/event-driven-ansible

unable to bootstrap from kafka source

morvencao opened this issue · 11 comments

Tried to create a simple rulebook with a Kafka source, against a broker with TLS enabled on port 443:

---
- name: Listen for events on a kafka
  hosts: all
  sources:
    - ansible.eda.kafka:
        host: test.kafka.example.com
        port: 443
        topic: test
        group_id: test
        offset: latest
  rules:
    - name: Say Hello
      condition: event.message == "Ansible is super cool"
      action:
        run_playbook:
          name: say-what.yml

Then I ran the rulebook and got the following error:

INFO:ansible_rulebook.app:Starting sources
INFO:ansible_rulebook.app:Starting rules
INFO:ansible_rulebook.engine:run_ruleset
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO:ansible_rulebook.engine:ruleset define: {"name": "Listen for events on a kafka", "hosts": ["all"], "sources": [{"EventSource": {"name": "ansible.eda.kafka", "source_name": "ansible.eda.kafka", "source_args": {"host": "test.kafka.example.com", "port": 443, "topic": "test", "group_id": "test", "offset": "latest"}, "source_filters": []}}], "rules": [{"Rule": {"name": "Say Hello", "condition": {"AllCondition": [{"EqualsExpression": {"lhs": {"Event": "message"}, "rhs": {"String": "Ansible is super cool"}}}]}, "action": {"Action": {"action": "run_playbook", "action_args": {"name": "say-what.yml"}}}, "enabled": true}}]}
INFO:ansible_rulebook.engine:load source
INFO:ansible_rulebook.engine:load source filters
INFO:ansible_rulebook.engine:Calling main in ansible.eda.kafka
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'test'})
INFO:ansible_rulebook.engine:Waiting for event from Listen for events on a kafka
WARNING:aiokafka:Unable to request metadata from "test.kafka.example.com:443": KafkaConnectionError: Connection at test.kafka.example.com:443 closed
ERROR:ansible_rulebook.engine:Source error
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/ansible_rulebook/engine.py", line 150, in start_source
    await entrypoint(fqueue, args)
  File "/root/.ansible/collections/ansible_collections/ansible/eda/plugins/event_source/kafka.py", line 45, in main
    await kafka_consumer.start()
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/consumer/consumer.py", line 351, in start
    await self._client.bootstrap()
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/client.py", line 251, in bootstrap
    raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('test.kafka.example.com', 443, <AddressFamily.AF_UNSPEC: 0>)]
ERROR:asyncio:Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f25019da7c0>
INFO:ansible_rulebook.engine:Canceling all ruleset tasks
INFO:ansible_rulebook.app:Cancelling event source tasks
INFO:ansible_rulebook.app:Main complete

I replaced my real Kafka host with test.kafka.example.com because it's a publicly accessible address. The Kafka server has TLS enabled, and it is reachable:

# nc -vz test.kafka.example.com 443
Connection to test.kafka.example.com 443 port [tcp/https] succeeded!

I'm not sure whether this is caused by the missing CA certificate for the Kafka server or by something else. Please take a look!
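A successful nc only shows that a TCP connection to port 443 can be opened. It says nothing about whether the client speaks TLS or trusts the broker's certificate. One way to test the CA question independently of Kafka is a plain TLS handshake with Python's standard ssl module. This is a minimal sketch; check_tls is a hypothetical helper, and the ./ca.crt path is a placeholder for the broker's CA certificate:

```python
import socket
import ssl
from typing import Optional, Tuple


def check_tls(host: str, port: int, cafile: Optional[str] = None,
              timeout: float = 5.0) -> Tuple[bool, str]:
    """Try a TLS handshake; return (success, TLS version or error text)."""
    try:
        # Trust only cafile if given, otherwise the system's default CA store.
        # A missing cafile raises FileNotFoundError, which is caught below.
        context = ssl.create_default_context(cafile=cafile)
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # server_hostname enables SNI and hostname verification.
            with context.wrap_socket(sock, server_hostname=host) as tls:
                return True, tls.version() or "unknown"
    except OSError as exc:  # socket errors, timeouts and ssl.SSLError are all OSError subclasses
        return False, "{0}: {1}".format(type(exc).__name__, exc)
```

For example, check_tls("test.kafka.example.com", 443, cafile="./ca.crt") should return True with a TLS version if the CA file matches the broker's certificate. A certificate verification error would instead point at the CA rather than at the network.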

/cc @konstruktoid

A stupid question, @konstruktoid: I encountered this when running the plugin standalone:

$ python3 ./event-driven-ansible/plugins/event_source/kafka.py
Traceback (most recent call last):
  File "./event-driven-ansible/plugins/event_source/kafka.py", line 6, in <module>
    from aiokafka import AIOKafkaConsumer
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/__init__.py", line 3, in <module>
    from .abc import ConsumerRebalanceListener
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/abc.py", line 2, in <module>
    from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
  File "/root/workspace/event-driven-ansible/plugins/event_source/kafka.py", line 6, in <module>
    from aiokafka import AIOKafkaConsumer
ImportError: cannot import name 'AIOKafkaConsumer' from partially initialized module 'aiokafka' (most likely due to a circular import) (/usr/local/lib/python3.8/dist-packages/aiokafka/__init__.py)

Yeah, it seems like that source needs a bit of work.
Can you try from aiokafka import * as a test?

It looks like the kafka lib conflicts with the aiokafka lib:

$ python3 plugins/event_source/kafka.py
Traceback (most recent call last):
  File "plugins/event_source/kafka.py", line 6, in <module>
    from aiokafka import *
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/__init__.py", line 3, in <module>
    from .abc import ConsumerRebalanceListener
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/abc.py", line 2, in <module>
    from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
ImportError: cannot import name 'ConsumerRebalanceListener' from 'kafka' (/root/workspace/event-driven-ansible/plugins/event_source/kafka.py)
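The last line of this traceback shows that kafka was resolved to plugins/event_source/kafka.py itself. When a script is run directly, Python puts the script's directory first on sys.path, so a file named kafka.py there shadows the kafka package that aiokafka imports (aiokafka/abc.py does "from kafka import ConsumerRebalanceListener"). A minimal sketch that reproduces this with a throwaway directory and reports where a module name would be loaded from; module_origin and demo_shadowing are hypothetical helpers, and the temporary kafka.py is only a stand-in for the real plugin:

```python
import importlib
import importlib.util
import pathlib
import sys
import tempfile
from typing import Optional


def module_origin(name: str) -> Optional[str]:
    """Return the file a top-level module would be imported from, or None if not found.

    If the module is already imported, this reports the copy that was loaded.
    """
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


def demo_shadowing() -> Optional[str]:
    """Show that a kafka.py next to the running script wins over the installed package."""
    with tempfile.TemporaryDirectory() as plugin_dir:
        # Stand-in for plugins/event_source/kafka.py; its contents don't matter for lookup.
        pathlib.Path(plugin_dir, "kafka.py").write_text("# placeholder\n")
        # Running `python3 plugins/event_source/kafka.py` puts that directory at sys.path[0].
        sys.path.insert(0, plugin_dir)
        importlib.invalidate_caches()
        try:
            return module_origin("kafka")
        finally:
            sys.path.remove(plugin_dir)
            importlib.invalidate_caches()
```

demo_shadowing() returns a path ending in the temporary kafka.py rather than the installed package. Renaming the script, as is done later in this thread with testkafka.py, removes the clash.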

Sorry for not helping out with testing; I don't have a Kafka environment available.
Have you tried pip uninstall kafka kafka-python to make sure there are no conflicts?

After I renamed event-driven-ansible/blob/main/plugins/event_source/kafka.py to event-driven-ansible/blob/main/plugins/event_source/testkafka.py, the dependency issue looks resolved. Now I get:

# python3 plugins/event_source/testkafka.py
Unable to request metadata from "test.kafka.example.com:443": KafkaConnectionError: Connection at test.kafka.example.com:443 closed
Traceback (most recent call last):
  File "plugins/event_source/testkafka.py", line 64, in <module>
    asyncio.run(main(MockQueue(), {"topic": "spec", "host": "test.kafka.example.com", "port": "443", "group_id": "hub1"}))
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "plugins/event_source/testkafka.py", line 45, in main
    await kafka_consumer.start()
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/consumer/consumer.py", line 351, in start
    await self._client.bootstrap()
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/client.py", line 251, in bootstrap
    raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('test.kafka.example.com', 443, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7fb22bedd370>

If I keep this line unchanged (https://github.com/ansible/event-driven-ansible/blob/main/plugins/event_source/kafka.py#L64), it reports a similar error:

# python3 plugins/event_source/testkafka.py
Unable connect to "localhost:9092": [Errno 111] Connect call failed ('127.0.0.1', 9092)
Traceback (most recent call last):
  File "plugins/event_source/testkafka.py", line 64, in <module>
    asyncio.run(main(MockQueue(), {"topic": "eda", "host": "localhost", "port": "9092", "group_id": "test"}))
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "plugins/event_source/testkafka.py", line 45, in main
    await kafka_consumer.start()
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/consumer/consumer.py", line 351, in start
    await self._client.bootstrap()
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/client.py", line 251, in bootstrap
    raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('localhost', 9092, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f241cb08370>

Since it's a kafka.errors.KafkaConnectionError, I guess the script can't connect.

poking @josephpisciotta and @benthomasson since they wrote it and I got no real experience with Kafka.

Finally I found the root cause: I hadn't configured SSL for the aiokafka consumer client. Currently the rulebook-cli doesn't support that, so I hacked it in around https://github.com/ansible/event-driven-ansible/blob/main/plugins/event_source/kafka.py#L64:

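from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
...
    # Trust the CA that signed the broker's TLS certificate.
    context = create_ssl_context(
        cafile="./ca.crt",
    )
    kafka_consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers="{0}:{1}".format(host, port),
        group_id=group_id,
        enable_auto_commit=True,
        max_poll_records=1,
        auto_offset_reset=offset,
        # Speak TLS to the broker instead of aiokafka's default PLAINTEXT protocol.
        security_protocol="SSL",
        ssl_context=context,
    )
...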
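The hack above hard-codes ./ca.crt. One way to make it configurable is to enable TLS only when the rulebook asks for it. This is a minimal sketch, not the code from the PR: build_consumer_kwargs is a hypothetical helper, and the cafile and use_ssl source arguments are illustrative names that the ansible.eda.kafka plugin may not use. It swaps aiokafka's create_ssl_context for the standard library's ssl.create_default_context, since aiokafka's ssl_context parameter accepts a plain ssl.SSLContext:

```python
import ssl
from typing import Any, Dict, Optional


def build_consumer_kwargs(args: Dict[str, Any]) -> Dict[str, Any]:
    """Map rulebook source args to AIOKafkaConsumer keyword arguments.

    Hypothetical helper: 'cafile' and 'use_ssl' are illustrative argument
    names, not confirmed options of the ansible.eda.kafka plugin.
    """
    kwargs: Dict[str, Any] = {
        "bootstrap_servers": "{0}:{1}".format(args["host"], args["port"]),
        "group_id": args.get("group_id"),
        "enable_auto_commit": True,
        "max_poll_records": 1,
        # "latest" is an assumed default when the rulebook sets no offset.
        "auto_offset_reset": args.get("offset", "latest"),
    }
    cafile: Optional[str] = args.get("cafile")
    if cafile or args.get("use_ssl"):
        # Verify the broker certificate against cafile, or the system CAs if cafile is None.
        kwargs["security_protocol"] = "SSL"
        kwargs["ssl_context"] = ssl.create_default_context(cafile=cafile)
    return kwargs
```

The result would be passed as AIOKafkaConsumer(args["topic"], **build_consumer_kwargs(args)). Without either argument, security_protocol is left at aiokafka's default PLAINTEXT, matching the unmodified plugin's behaviour.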

Now ansible-rulebook can subscribe to messages from Kafka:

# ansible-rulebook --rulebook kafka-example.yml -i inventory.yml --verbose
INFO:ansible_rulebook.app:Starting sources
INFO:ansible_rulebook.app:Starting rules
INFO:ansible_rulebook.engine:run_ruleset
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO:ansible_rulebook.engine:ruleset define: {"name": "Listen for events on a kafka", "hosts": ["all"], "sources": [{"EventSource": {"name": "ansible.eda.kafka", "source_name": "ansible.eda.kafka", "source_args": {"host": "test.kafka.example.com", "port": 443, "topic": "spec", "group_id": "hub1", "offset": "latest"}, "source_filters": []}}], "rules": [{"Rule": {"name": "Say Hello", "condition": {"AllCondition": [{"EqualsExpression": {"lhs": {"Event": "message"}, "rhs": {"String": "Ansible is super cool"}}}]}, "action": {"Action": {"action": "run_playbook", "action_args": {"name": "say-what.yml"}}}, "enabled": true}}]}
INFO:ansible_rulebook.engine:load source
INFO:ansible_rulebook.engine:load source filters
INFO:ansible_rulebook.engine:Calling main in ansible.eda.kafka
INFO:aiokafka.helpers:Loading SSL CA from ./ca.crt
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'spec'})
INFO:ansible_rulebook.engine:Waiting for event from Listen for events on a kafka
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 0 for group hub1
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group hub1
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group hub1
INFO:aiokafka.consumer.group_coordinator:Joined group 'hub1' (generation 1) with member_id aiokafka-0.8.0-33046b26-58ea-4458-81ec-d7fd14bcc633
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group hub1 with generation 1
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='spec', partition=0)} for group hub1

Should I open another issue for SSL support in the kafka plugin?

Since you have working code, why not submit a pull request?

Submitted a PR for this: #62