quixio/quix-streams

OAuth-Callback error due to replacing _ with . in BaseSettings

Closed this issue · 3 comments

Tell us about the bug
We are considering using Quix as a streaming tool at our company. Our plan is to integrate it with Azure Event Hubs as the Kafka broker and use OAuth authentication with DefaultAzureCredential. While configuring a Quix app based on this example, I encountered the following error:

$ python example_app/quix_error.py 
[2024-11-06 12:08:40,793] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'xxx.servicebus.windows.net:9093', 'security.protocol': 'sasl_ssl', 'sasl.mechanism': 'OAUTHBEARER', 'oauth.cb': functools.partial(<function get_token at 0x0000028CA89BA3E0>, 'xxx.servicebus.windows.net')}" consumer_group="quix-consumer" auto_offset_reset="latest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-11-06 12:08:40,793] [INFO] [quixstreams] : Topics required for this application: "test-hub"
[2024-11-06 12:08:40,804] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
Traceback (most recent call last):
  File "xxx/quix-example/example_app/quix_error.py", line 42, in <module>
    main()
  File "xxx/quix-example/example_app/quix_error.py", line 38, in main
    app.run()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 688, in run
    self._run()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 715, in _run
    self.setup_topics()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 790, in setup_topics
    self._topic_manager.validate_all_topics()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/manager.py", line 447, in validate_all_topics
    actual_configs = self._admin.inspect_topics(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 116, in inspect_topics
    cluster_topics = self.list_topics(timeout=timeout)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 97, in list_topics
    return self.admin_client.list_topics(timeout=timeout).topics
           ^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 85, in admin_client
    self._inner_admin = AdminClient(self._config)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/confluent_kafka/admin/__init__.py", line 122, in __init__
    super(AdminClient, self).__init__(conf)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "oauth.cb""}

Code to reproduce (Azure Event Hubs required):

from functools import partial
import os

from azure.identity import DefaultAzureCredential
from quixstreams import Application
from quixstreams.kafka import ConnectionConfig


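def get_token(
    namespace: str,
    config: str,
) -> tuple[str, float]:
    # confluent-kafka calls oauth_cb with the sasl.oauthbearer.config string
    # (unused here) and expects a (token, expiry in seconds since epoch) tuple.
    token = DefaultAzureCredential().get_token(f"https://{namespace}/.default")
    return token.token, token.expires_on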


def main() -> None:

    namespace = os.environ["NAMESPACE"]

    app = Application(
        ConnectionConfig(
            bootstrap_servers=f"{namespace}:9093",
            security_protocol="sasl_ssl",
            sasl_mechanism="OAUTHBEARER",
            oauth_cb=partial(get_token, namespace),
        )
    )

    input_topic = app.topic(name="test-hub", value_serializer="json")

    sdf = app.dataframe(input_topic)
    sdf = sdf.apply(print)

    app.run()


if __name__ == "__main__":
    main()

What did you expect to see?
I expected that oauth would work :-)

What version of the library are you using?
quixstreams 3.1.1 with confluent-kafka 2.4.0

Workaround?
In quixstreams.utils.settings, the underscores in all field names of the Settings class are replaced by periods, so oauth_cb is passed to the client as oauth.cb, which it rejects. It turns out that the correct configuration property in the Confluent Kafka client is actually oauth_cb with an underscore. See, for example, here.

Current BaseSettings class:

class BaseSettings(_BaseSettings):
    model_config = SettingsConfigDict(
        alias_generator=AliasGenerator(
            # used during model_dumps
            serialization_alias=lambda field_name: field_name.replace("_", "."),
        ),
    )
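
To make the effect concrete, here is a minimal standalone sketch of what that alias function does to the field names used in the reproduction script. It does not import quixstreams or pydantic; `to_dotted` is a hypothetical stand-in for the lambda above, not a function from the library.

```python
# Hypothetical stand-in for the serialization_alias lambda in BaseSettings.
def to_dotted(field_name: str) -> str:
    return field_name.replace("_", ".")


# Field names taken from the ConnectionConfig call in the reproduction script.
fields = ["bootstrap_servers", "security_protocol", "sasl_mechanism", "oauth_cb"]
aliases = {name: to_dotted(name) for name in fields}

for name, alias in aliases.items():
    print(f"{name} -> {alias}")
```

The first three aliases match the dotted property names the client expects, while `oauth_cb` becomes `oauth.cb`, the name reported in the KafkaException above.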

A workaround would be to use a little helper function with a list of properties that require underscores. I haven't found a comprehensive overview of all properties yet. I would open a PR for this solution.

Proposed Changes:

def _replace_underscores_if_applicable(field_name: str) -> str:
    keep_underscores = [
        "oauth_cb"  # more fields?
    ]

    if field_name in keep_underscores:
        return field_name
    
    return field_name.replace("_", ".")

class BaseSettings(_BaseSettings):
    model_config = SettingsConfigDict(
        alias_generator=AliasGenerator(
            # used during model_dumps
            serialization_alias=_replace_underscores_if_applicable,
        ),
    )
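
As a quick check of the proposed helper, the sketch below re-declares it on its own (without the pydantic settings class) and applies it to the keys from the reproduction script. The `settings` dict and its placeholder values are assumptions for illustration only; whether other properties besides `oauth_cb` need to keep their underscore is still open.

```python
# Standalone copy of the proposed helper; the allow-list is deliberately
# minimal and may need more entries.
KEEP_UNDERSCORES = frozenset({"oauth_cb"})


def replace_underscores_if_applicable(field_name: str) -> str:
    if field_name in KEEP_UNDERSCORES:
        return field_name
    return field_name.replace("_", ".")


# Illustrative values only; the callback is replaced by a placeholder string.
settings = {
    "bootstrap_servers": "xxx.servicebus.windows.net:9093",
    "security_protocol": "sasl_ssl",
    "sasl_mechanism": "OAUTHBEARER",
    "oauth_cb": "<callback placeholder>",
}

# Rename every key the way the alias generator would during serialization.
client_config = {
    replace_underscores_if_applicable(key): value
    for key, value in settings.items()
}

print(sorted(client_config))
```

With this helper, `oauth_cb` keeps its underscore while the remaining keys are still converted to the dotted form.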

Anything else we should know?
Another problem arises with the Confluent Kafka AdminClient, but I will open a separate issue for that.

Hi @mkp-jansen,
Thanks for such a detailed report!
I see where the issue is 👍

We'll fix it in the next release, and I'll post an update here.

In the meantime, another workaround could be to pass oauth_cb to the Application via the consumer_extra_config and/or producer_extra_config dicts.
The contents of these dicts are passed through as-is, so the key keeps its underscore.

from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

app = Application(
    broker_address=ConnectionConfig(bootstrap_servers="<bootstrap>"),
    consumer_extra_config={"oauth_cb": <oauth callback>},
    producer_extra_config={"oauth_cb": <oauth callback>},
)

Thanks for the quick fix!

The fix was released in v3.2.0.