faust-streaming/faust

Using Faust with Confluent cloud

dantodor opened this issue · 10 comments

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I'm trying to use Faust to do some processing on Kafka messages in Confluent Cloud.
Here's the example code I'm using:

import faust
from faust.types.auth import AuthProtocol, SASLMechanism
from aiokafka.helpers import create_ssl_context

broker_credentials = faust.SASLCredentials(
    mechanism=SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXX'
)

broker_credentials.protocol = AuthProtocol.SASL_SSL


app = faust.App('agents-demo',
                broker='kafka://XXX.cloud',
                store='memory://',
                broker_credentials=broker_credentials
                )

greetings_topic = app.topic('test-greetings', value_type=str, value_serializer='raw')

# Consumer
@app.agent(greetings_topic)
async def greet(stream):
    async for greeting in stream:
        print(greeting)

The topic already exists and has some messages; I'm just trying to read them and print them out.

Expected behavior

I would expect the messages to be printed to the console.

Actual behavior

It tries for a few times, then stops with this message:

[ERROR] [^Worker]: Error: PolicyViolationError('Cannot create topic: agents-demo-__assignor-__leader (44): Topic replication factor must be 3')

I also tried to set TOPIC_PARTITIONS=1 and TOPIC_REPLICATION_FACTOR=1

What am I doing wrong here ?
Thank you!

Full traceback

Traceback (most recent call last):
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/assignor/leader_assignor.py", line 20, in on_start
    await self._enable_leader_topic()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/assignor/leader_assignor.py", line 24, in _enable_leader_topic
    await leader_topic.maybe_declare()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
    result = await self.fun(*self.args, **self.kwargs)
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/topics.py", line 455, in maybe_declare
    await self.declare()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/topics.py", line 468, in declare
    await producer.create_topic(
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 996, in create_topic
    await cast(Transport, self.transport)._create_topic(
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1164, in _create_topic
    await wrap()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
    result = await self.fun(*self.args, **self.kwargs)
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1252, in _really_create_topic
    raise for_code(code)(
kafka.errors.PolicyViolationError: [Error 44] PolicyViolationError: Cannot create topic: agents-demo-__assignor-__leader (44): Topic replication factor must be 3

Versions

  • Python version 3.8
  • Faust version 0.10.4
  • Operating system macOS
  • Kafka version Confluent Cloud
  • RocksDB version (if applicable)

Facing the same issue here:

import faust
from faust.types.auth import AuthProtocol, SASLMechanism
from aiokafka.helpers import create_ssl_context

broker_credentials = faust.SASLCredentials(
    mechanism=SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXXX'
)

broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    "greetings",
    broker="kafka://XXXX.us-east-2.aws.confluent.cloud:9092",
    value_serializer="json",
    broker_credentials=broker_credentials
)



greetings_topic = app.topic('greetings', replicas=3)

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)


ERROR : Cannot create topic: greetings-__assignor-__leader (44): Topic replication factor must be 3

A common cause is that your credentials lack permission to create topics on the broker. I suggest checking those ACLs and following up.

My application works with Confluent Cloud.
Try setting topic_replication_factor to 3.
https://faust.readthedocs.io/en/latest/userguide/settings.html#topic-replication-factor

If that does not work, try using this trick:

import os

import faust


class MyTopic(faust.Topic):
    """
    A workaround to change the replication factor.

    https://github.com/faust-streaming/faust/issues/76
    """
    def __init__(self, *args, **kwargs):
        kwargs["replicas"] = int(os.environ.get("TOPIC_REPLICATION_FACTOR", 1))
        super().__init__(*args, **kwargs)

and instantiate your topic from MyTopic.
You can set env var TOPIC_REPLICATION_FACTOR=3.
I got the idea from #76 (thx @ihor-rud )
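If you want to see the override in isolation, here is the same kwargs-override pattern demonstrated with a plain stand-in class (no broker or Faust install needed), just to show how the env var flows through:

```python
import os


class FakeTopic:
    """Stand-in for faust.Topic, only to demonstrate the pattern."""
    def __init__(self, name, replicas=1):
        self.name = name
        self.replicas = replicas


class MyTopic(FakeTopic):
    def __init__(self, *args, **kwargs):
        # Force the replication factor from the environment, defaulting to 1.
        kwargs["replicas"] = int(os.environ.get("TOPIC_REPLICATION_FACTOR", 1))
        super().__init__(*args, **kwargs)


os.environ["TOPIC_REPLICATION_FACTOR"] = "3"
topic = MyTopic("greetings")
print(topic.replicas)  # 3
```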

Thanks @daigotanaka! I'll go ahead and close this.

I'm a little bit perplexed by the closing of this issue without a proper analysis.
From my personal experience, not too many people give full rights to a consumer, especially on Confluent Cloud. You know, the minimum permission principle. I fail to understand why a consumer should be able to create topics in order to read from a topic.
Long story short, the solution is to set topic_allow_declare to False. It defaults to True, so Faust tries to create some utility topics in the Kafka cluster, which will obviously fail if you granted only consume permissions.
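Concretely, assuming only consume permissions are granted, the app config would look something like this (app name and broker URL are placeholders):

```python
import faust

# topic_allow_declare=False stops Faust from trying to create topics itself,
# which avoids the PolicyViolationError on clusters where the client has no
# topic-creation rights.
app = faust.App(
    'agents-demo',
    broker='kafka://XXX.cloud',
    topic_allow_declare=False,
)
```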

@dantodor I feel I'm still new to Faust, but I see this in the doc:

This setting disables the creation of internal topics.
Faust will only create topics that it considers to be fully owned and managed, such as intermediate repartition topics, table changelog topics etc.
Some Kafka managers does not allow services to create topics, in that case you should set this to False.
https://faust-streaming.github.io/faust/userguide/settings.html#topic-allow-declare

This expectation seems reasonable to me as Faust needs to create internal topics to persist the state in case of failure and leader/partition assignment changes, etc.

I ran into the same error. Setting topic_allow_declare to False worked, but resulted in a warning about the missing leader topic.

Setting topic_disable_leader to True also worked, and silenced the warning:

  app = faust.App(
      "hello-world",
      broker="xxxxxxxxxxx:9092",
      value_serializer="raw",
      broker_credentials=broker_credentials,
      topic_disable_leader=True)

I realize this is just a workaround (see @daigotanaka's comment), but it allowed me to continue with the tutorials on Confluent Cloud.

At the moment, I'm using a Confluent Cloud Basic cluster. If I'm reading this thread and the Confluent Cloud docs correctly, the settings that enable internal topic creation aren't available to Basic and Standard clusters:

You cannot edit cluster settings on Confluent Cloud on Basic or Standard clusters

I forget why I closed this... but I'll reopen it and try to add this solution into #418. It only makes sense to add these settings for Confluent Cloud brokers.

As mentioned by @wbarnha above, this issue happens when the credentials used are missing the required ACLs, or when a cluster policy is violated.

Here are two possible solutions:

  • Create the topic used for partition assignment beforehand. I used this technique with Confluent Cloud in combination with a Terraform provider; alternatively, ask a Kafka admin to create the topic for you.
  • Add the necessary ACLs to let the Kafka user create the topic.

Specifically, the error above looks like a cluster policy violation: the topic must be created with a replication factor of 3. So option 1 could definitely fix the issue, because when you create the topic yourself you can set a replication factor of 3. In the case of option 2, TOPIC_REPLICATION_FACTOR must be set to 3.
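As a sketch of option 1, the internal topic from the error could be created ahead of time with the stock Kafka CLI. The bootstrap server and client config file below are placeholders; the topic name follows the `<app-id>-__assignor-__leader` pattern from the error above:

```shell
# Pre-create Faust's leader-election topic with the replication factor the
# cluster policy demands. client.properties holds your SASL_SSL credentials;
# adjust the topic name to match your app id.
kafka-topics --bootstrap-server XXX.cloud:9092 \
  --command-config client.properties \
  --create \
  --topic agents-demo-__assignor-__leader \
  --partitions 1 \
  --replication-factor 3
```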

I also had this issue using Confluent Cloud, and I fixed it by setting topic_replication_factor=3 in the Faust app initializer:

    app = faust.App(
        app_name,
        broker=f"kafka://{config['bootstrap.servers']}",
        broker_credentials=broker_credentials,
        topic_replication_factor=3,  # <--- Set topic replication here
        web_port=web_port,
    )

Not sure if this is recommended, but it solved it for me, and I didn't have to disable the creation of Faust topics. I can see that the Faust leader topics were created successfully in my Confluent dashboard.

This needs to be set on the app itself, not on the topic's replicas field, because this is the setting the Faust app uses when creating these leader topics.