unable to bootstrap from kafka source
morvencao opened this issue · 11 comments
I tried to create a simple rulebook using the kafka source against a server with TLS (port 443) enabled:
---
- name: Listen for events on a kafka
  hosts: all
  sources:
    - ansible.eda.kafka:
        host: test.kafka.example.com
        port: 443
        topic: test
        group_id: test
        offset: latest
  rules:
    - name: Say Hello
      condition: event.message == "Ansible is super cool"
      action:
        run_playbook:
          name: say-what.yml
Then I ran the rulebook and got the following error:
INFO:ansible_rulebook.app:Starting sources
INFO:ansible_rulebook.app:Starting rules
INFO:ansible_rulebook.engine:run_ruleset
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO:ansible_rulebook.engine:ruleset define: {"name": "Listen for events on a kafka", "hosts": ["all"], "sources": [{"EventSource": {"name": "ansible.eda.kafka", "source_name": "ansible.eda.kafka", "source_args": {"host": "test.kafka.example.com", "port": 443, "topic": "test", "group_id": "test", "offset": "latest"}, "source_filters": []}}], "rules": [{"Rule": {"name": "Say Hello", "condition": {"AllCondition": [{"EqualsExpression": {"lhs": {"Event": "message"}, "rhs": {"String": "Ansible is super cool"}}}]}, "action": {"Action": {"action": "run_playbook", "action_args": {"name": "say-what.yml"}}}, "enabled": true}}]}
INFO:ansible_rulebook.engine:load source
INFO:ansible_rulebook.engine:load source filters
INFO:ansible_rulebook.engine:Calling main in ansible.eda.kafka
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'test'})
INFO:ansible_rulebook.engine:Waiting for event from Listen for events on a kafka
WARNING:aiokafka:Unable to request metadata from "test.kafka.example.com:443": KafkaConnectionError: Connection at test.kafka.example.com:443 closed
ERROR:ansible_rulebook.engine:Source error
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/ansible_rulebook/engine.py", line 150, in start_source
await entrypoint(fqueue, args)
File "/root/.ansible/collections/ansible_collections/ansible/eda/plugins/event_source/kafka.py", line 45, in main
await kafka_consumer.start()
File "/usr/local/lib/python3.8/dist-packages/aiokafka/consumer/consumer.py", line 351, in start
await self._client.bootstrap()
File "/usr/local/lib/python3.8/dist-packages/aiokafka/client.py", line 251, in bootstrap
raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('test.kafka.example.com', 443, <AddressFamily.AF_UNSPEC: 0>)]
ERROR:asyncio:Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f25019da7c0>
INFO:ansible_rulebook.engine:Canceling all ruleset tasks
INFO:ansible_rulebook.app:Cancelling event source tasks
INFO:ansible_rulebook.app:Main complete
I replaced my real Kafka host with test.kafka.example.com because it's a publicly accessible address. The Kafka server has TLS enabled, and it is reachable:
# nc -vz test.kafka.example.com 443
Connection to test.kafka.example.com 443 port [tcp/https] succeeded!
I'm not sure whether this is caused by the missing CA certificate for the Kafka server or by something else. Please take a look!
/cc @konstruktoid
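The nc check above only proves that the TCP port accepts connections; it says nothing about whether a TLS handshake, including certificate verification, succeeds. A minimal sketch using only the standard library's socket and ssl modules could test that separately. tls_handshake_ok is a hypothetical helper name, and ./ca.crt stands for wherever the server's CA certificate lives:

```python
import socket
import ssl
from typing import Optional


def tls_handshake_ok(host: str, port: int, cafile: Optional[str] = None,
                     timeout: float = 5.0) -> bool:
    """Return True if a certificate-verified TLS handshake with host:port succeeds."""
    # cafile=None falls back to the system's default trusted CAs.
    # A cafile path that does not exist raises here, outside the try block.
    context = ssl.create_default_context(cafile=cafile)
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # The handshake (and certificate/hostname check) happens in wrap_socket
            with context.wrap_socket(sock, server_hostname=host):
                return True
    except OSError:
        # Covers refused connections, timeouts and ssl.SSLError,
        # including certificate verification failures
        return False
```

If tls_handshake_ok("test.kafka.example.com", 443) returns False while the same call with cafile="./ca.crt" returns True, a missing CA is the likely culprit. A passing handshake here does not configure aiokafka, though; the consumer still has to be told to use TLS.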
Does https://github.com/ansible/event-driven-ansible/blob/main/plugins/event_source/kafka.py work when run standalone? Be sure to update https://github.com/ansible/event-driven-ansible/blob/main/plugins/event_source/kafka.py#L64 before testing.
A possibly stupid question, @konstruktoid: I encountered this when running it standalone:
$ python3 ./event-driven-ansible/plugins/event_source/kafka.py
Traceback (most recent call last):
File "./event-driven-ansible/plugins/event_source/kafka.py", line 6, in <module>
from aiokafka import AIOKafkaConsumer
File "/usr/local/lib/python3.8/dist-packages/aiokafka/__init__.py", line 3, in <module>
from .abc import ConsumerRebalanceListener
File "/usr/local/lib/python3.8/dist-packages/aiokafka/abc.py", line 2, in <module>
from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
File "/root/workspace/event-driven-ansible/plugins/event_source/kafka.py", line 6, in <module>
from aiokafka import AIOKafkaConsumer
ImportError: cannot import name 'AIOKafkaConsumer' from partially initialized module 'aiokafka' (most likely due to a circular import) (/usr/local/lib/python3.8/dist-packages/aiokafka/__init__.py)
Yeah, it seems like that source needs a bit of work.
Can you try from aiokafka import * as a test?
It looks like the kafka library conflicts with the aiokafka library:
$ python3 plugins/event_source/kafka.py
Traceback (most recent call last):
File "plugins/event_source/kafka.py", line 6, in <module>
from aiokafka import *
File "/usr/local/lib/python3.8/dist-packages/aiokafka/__init__.py", line 3, in <module>
from .abc import ConsumerRebalanceListener
File "/usr/local/lib/python3.8/dist-packages/aiokafka/abc.py", line 2, in <module>
from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
ImportError: cannot import name 'ConsumerRebalanceListener' from 'kafka' (/root/workspace/event-driven-ansible/plugins/event_source/kafka.py)
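The path at the end of that ImportError suggests a more specific cause than a version conflict: aiokafka's from kafka import ... resolved to the plugin file itself. Running python3 plugins/event_source/kafka.py puts the script's directory at the front of sys.path, so a local kafka.py shadows the installed kafka-python package. The sketch below reproduces that lookup order with a throwaway file; shadowing_origin is a hypothetical helper for illustration, not code from the plugin:

```python
import importlib
import os
import sys
import tempfile


def shadowing_origin(module_name: str) -> str:
    """Return the file that importing module_name resolves to when a
    same-named .py file sits in the first sys.path entry."""
    with tempfile.TemporaryDirectory() as script_dir:
        # Throwaway stand-in for plugins/event_source/kafka.py
        with open(os.path.join(script_dir, module_name + ".py"), "w") as f:
            f.write("# stand-in plugin module; defines no Kafka classes\n")
        # Mimic `python3 path/to/script.py`, which prepends the script's dir
        sys.path.insert(0, script_dir)
        saved = sys.modules.pop(module_name, None)
        try:
            importlib.invalidate_caches()
            module = importlib.import_module(module_name)
            return module.__file__
        finally:
            # Undo the sys.path and sys.modules changes
            sys.path.remove(script_dir)
            sys.modules.pop(module_name, None)
            if saved is not None:
                sys.modules[module_name] = saved


origin = shadowing_origin("kafka")
print(origin)  # a temporary path ending in kafka.py, not the installed package
```

This is consistent with the later rename to testkafka.py making the import error go away.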
Sorry for not helping out with testing; I don't have a Kafka environment available.
Have you tried pip uninstall kafka kafka-python to ensure there are no conflicts?
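Before uninstalling anything, it may be worth checking which file a module name actually resolves to from the directory the plugin is run in. The tracebacks show aiokafka/abc.py itself running from kafka import ConsumerRebalanceListener, so removing kafka-python entirely would likely break aiokafka. A minimal sketch with the standard library's importlib.util.find_spec; resolve_module is a hypothetical helper name:

```python
import importlib.util
from typing import Optional


def resolve_module(name: str) -> Optional[str]:
    """Return the file a top-level module name would be loaded from,
    or None if nothing on sys.path provides it.

    For a top-level name that is not already imported, find_spec only
    locates the module; it does not execute it.
    """
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


print(resolve_module("kafka"))
```

Run via python3 -c from plugins/event_source, this should report the local kafka.py, because that invocation puts the current directory first on sys.path; run from elsewhere it should point at the installed kafka-python package, if any.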
After I renamed event-driven-ansible/blob/main/plugins/event_source/kafka.py to event-driven-ansible/blob/main/plugins/event_source/testkafka.py, the dependency issue appears to be resolved. Now I get:
# python3 plugins/event_source/testkafka.py
Unable to request metadata from "test.kafka.example.com:443": KafkaConnectionError: Connection at test.kafka.example.com:443 closed
Traceback (most recent call last):
File "plugins/event_source/testkafka.py", line 64, in <module>
asyncio.run(main(MockQueue(), {"topic": "spec", "host": "test.kafka.example.com", "port": "443", "group_id": "hub1"}))
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "plugins/event_source/testkafka.py", line 45, in main
await kafka_consumer.start()
File "/usr/local/lib/python3.8/dist-packages/aiokafka/consumer/consumer.py", line 351, in start
await self._client.bootstrap()
File "/usr/local/lib/python3.8/dist-packages/aiokafka/client.py", line 251, in bootstrap
raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('test.kafka.example.com', 443, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7fb22bedd370>
If I keep that line unchanged (https://github.com/ansible/event-driven-ansible/blob/main/plugins/event_source/kafka.py#L64), it reports a similar error:
# python3 plugins/event_source/testkafka.py
Unable connect to "localhost:9092": [Errno 111] Connect call failed ('127.0.0.1', 9092)
Traceback (most recent call last):
File "plugins/event_source/testkafka.py", line 64, in <module>
asyncio.run(main(MockQueue(), {"topic": "eda", "host": "localhost", "port": "9092", "group_id": "test"}))
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "plugins/event_source/testkafka.py", line 45, in main
await kafka_consumer.start()
File "/usr/local/lib/python3.8/dist-packages/aiokafka/consumer/consumer.py", line 351, in start
await self._client.bootstrap()
File "/usr/local/lib/python3.8/dist-packages/aiokafka/client.py", line 251, in bootstrap
raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('localhost', 9092, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f241cb08370>
Given the kafka.errors.KafkaConnectionError, I guess the script can't connect.
Poking @josephpisciotta and @benthomasson since they wrote it and I have no real experience with Kafka.
Finally, I found the root cause: I hadn't configured SSL for the aiokafka consumer client. Currently the rulebook-cli doesn't support that, so I hacked the plugin (https://github.com/ansible/event-driven-ansible/blob/main/plugins/event_source/kafka.py#L64) as follows:
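from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
...
# Trust the CA certificate that signed the Kafka server's certificate
context = create_ssl_context(
    cafile="./ca.crt",
)
kafka_consumer = AIOKafkaConsumer(
    topic,
    bootstrap_servers="{0}:{1}".format(host, port),
    group_id=group_id,
    enable_auto_commit=True,
    max_poll_records=1,
    auto_offset_reset=offset,
    security_protocol="SSL",  # use TLS instead of the default PLAINTEXT
    ssl_context=context,
)
...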
Now ansible-rulebook can subscribe to messages from Kafka:
# ansible-rulebook --rulebook kafka-example.yml -i inventory.yml --verbose
INFO:ansible_rulebook.app:Starting sources
INFO:ansible_rulebook.app:Starting rules
INFO:ansible_rulebook.engine:run_ruleset
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO:ansible_rulebook.engine:ruleset define: {"name": "Listen for events on a kafka", "hosts": ["all"], "sources": [{"EventSource": {"name": "ansible.eda.kafka", "source_name": "ansible.eda.kafka", "source_args": {"host": "test.kafka.example.com", "port": 443, "topic": "spec", "group_id": "hub1", "offset": "latest"}, "source_filters": []}}], "rules": [{"Rule": {"name": "Say Hello", "condition": {"AllCondition": [{"EqualsExpression": {"lhs": {"Event": "message"}, "rhs": {"String": "Ansible is super cool"}}}]}, "action": {"Action": {"action": "run_playbook", "action_args": {"name": "say-what.yml"}}}, "enabled": true}}]}
INFO:ansible_rulebook.engine:load source
INFO:ansible_rulebook.engine:load source filters
INFO:ansible_rulebook.engine:Calling main in ansible.eda.kafka
INFO:aiokafka.helpers:Loading SSL CA from ./ca.crt
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'spec'})
INFO:ansible_rulebook.engine:Waiting for event from Listen for events on a kafka
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 0 for group hub1
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group hub1
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group hub1
INFO:aiokafka.consumer.group_coordinator:Joined group 'hub1' (generation 1) with member_id aiokafka-0.8.0-33046b26-58ea-4458-81ec-d7fd14bcc633
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group hub1 with generation 1
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='spec', partition=0)} for group hub1
Do I need to open another issue for SSL support in the kafka plugin?
Since you have working code, why not submit a pull request?
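One possible shape for such a pull request is to expose the TLS settings as source arguments instead of hard-coding ./ca.crt. A minimal sketch, assuming hypothetical source_args named security_protocol and cafile (neither exists in the plugin today) and a hypothetical helper build_consumer_kwargs. It swaps in the standard library's ssl.create_default_context for aiokafka's create_ssl_context, since AIOKafkaConsumer's ssl_context parameter accepts any ssl.SSLContext:

```python
import ssl
from typing import Any, Dict


def build_consumer_kwargs(args: Dict[str, Any]) -> Dict[str, Any]:
    """Translate rulebook source_args into AIOKafkaConsumer keyword arguments.

    Hypothetical source_args: 'security_protocol' (defaults to "PLAINTEXT",
    aiokafka's own default) and 'cafile' (path to the broker's CA bundle).
    """
    kwargs: Dict[str, Any] = {
        "bootstrap_servers": "{0}:{1}".format(args["host"], args["port"]),
        "group_id": args.get("group_id"),
        "enable_auto_commit": True,
        "max_poll_records": 1,
        # Assumed default; the real plugin may choose differently
        "auto_offset_reset": args.get("offset", "latest"),
        "security_protocol": args.get("security_protocol", "PLAINTEXT"),
    }
    if kwargs["security_protocol"] == "SSL":
        # Verify the broker against cafile, or the system CAs if cafile is unset
        kwargs["ssl_context"] = ssl.create_default_context(cafile=args.get("cafile"))
    return kwargs
```

The plugin would then call AIOKafkaConsumer(topic, **build_consumer_kwargs(args)). Defaulting to PLAINTEXT keeps existing rulebooks working unchanged, while a rulebook like the one in this issue would add security_protocol: SSL and cafile to its source arguments.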