OAuth-Callback error due to replacing _ with . in BaseSettings
Tell us about the bug
We are considering using Quix as a streaming tool at our company. Our plan is to integrate it with Azure Event Hubs as the Kafka broker and use OAuth authentication with DefaultAzureCredential. While configuring a Quix app based on this example, I encountered the following error:
$ python example_app/quix_error.py
[2024-11-06 12:08:40,793] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'xxx.servicebus.windows.net:9093', 'security.protocol': 'sasl_ssl', 'sasl.mechanism': 'OAUTHBEARER', 'oauth.cb': functools.partial(<function get_token at 0x0000028CA89BA3E0>, 'xxx.servicebus.windows.net')}" consumer_group="quix-consumer" auto_offset_reset="latest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-11-06 12:08:40,793] [INFO] [quixstreams] : Topics required for this application: "test-hub"
[2024-11-06 12:08:40,804] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
Traceback (most recent call last):
File "xxx/quix-example/example_app/quix_error.py", line 42, in <module>
main()
File "xxx/quix-example/example_app/quix_error.py", line 38, in main
app.run()
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 688, in run
self._run()
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 715, in _run
self.setup_topics()
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 790, in setup_topics
self._topic_manager.validate_all_topics()
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/manager.py", line 447, in validate_all_topics
actual_configs = self._admin.inspect_topics(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 116, in inspect_topics
cluster_topics = self.list_topics(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 97, in list_topics
return self.admin_client.list_topics(timeout=timeout).topics
^^^^^^^^^^^^^^^^^
File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 85, in admin_client
self._inner_admin = AdminClient(self._config)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "xxx/quix-example/.venv/Lib/site-packages/confluent_kafka/admin/__init__.py", line 122, in __init__
super(AdminClient, self).__init__(conf)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "oauth.cb""}
Code to reproduce (Azure Event Hubs required):
from functools import partial
import os

from azure.identity import DefaultAzureCredential
from quixstreams import Application
from quixstreams.kafka import ConnectionConfig


def get_token(
    namespace: str,
    config: str,
) -> tuple[str, float]:
    token = DefaultAzureCredential().get_token(f"https://{namespace}/.default")
    return token.token, token.expires_on


def main() -> None:
    namespace = os.environ["NAMESPACE"]
    app = Application(
        ConnectionConfig(
            bootstrap_servers=f"{namespace}:9093",
            security_protocol="sasl_ssl",
            sasl_mechanism="OAUTHBEARER",
            oauth_cb=partial(get_token, namespace),
        )
    )
    input_topic = app.topic(name="test-hub", value_serializer="json")
    sdf = app.dataframe(input_topic)
    sdf = sdf.apply(print)
    app.run()


if __name__ == "__main__":
    main()
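For readers without an Azure account, here is a minimal offline sketch of the callback shape used in the script above. As far as I understand the confluent-kafka documentation, `oauth_cb` is called with a single string (the value of `sasl.oauthbearer.config`) and should return a `(token, expiry)` tuple, with the expiry in seconds since the epoch. `FakeToken` and `FakeCredential` are hypothetical stand-ins for the `azure.identity` objects and are not part of any library.

```python
from dataclasses import dataclass
from functools import partial
import time


@dataclass
class FakeToken:
    """Hypothetical stand-in for the token object returned by azure.identity."""

    token: str
    expires_on: int


class FakeCredential:
    """Hypothetical stub replacing DefaultAzureCredential so the sketch runs offline."""

    def get_token(self, scope: str) -> FakeToken:
        # Pretend the token is valid for one hour.
        return FakeToken(token=f"token-for-{scope}", expires_on=int(time.time()) + 3600)


def get_token(namespace: str, config: str) -> tuple[str, float]:
    # `config` is the string supplied by the Kafka client; it is unused here.
    token = FakeCredential().get_token(f"https://{namespace}/.default")
    return token.token, float(token.expires_on)


# partial() binds the namespace, leaving a one-argument callable for the client.
callback = partial(get_token, "example.servicebus.windows.net")
token_str, expiry = callback("")
```

Binding the namespace with `partial` keeps the callback signature at one argument, which is what the client is assumed to expect.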
What did you expect to see?
I expected that oauth would work :-)
What version of the library are you using?
quixstreams 3.1.1 with confluent-kafka 2.4.0
Workaround?
In quixstreams.utils.settings, the underscores in all field names of the Settings class are replaced by periods. It turns out that the correct configuration property in the Confluent Kafka client is actually oauth_cb, with an underscore. See, for example, here.
Current BaseSettings-Class:
class BaseSettings(_BaseSettings):
    model_config = SettingsConfigDict(
        alias_generator=AliasGenerator(
            # used during model_dumps
            serialization_alias=lambda field_name: field_name.replace("_", "."),
        ),
    )
A workaround would be to use a little helper function with a list of properties that require underscores. I haven't found a comprehensive overview of all properties yet. I would open a PR for this solution.
Proposed Changes:
def _replace_underscores_if_applicable(field_name: str) -> str:
    keep_underscores = [
        "oauth_cb",  # more fields?
    ]
    if field_name in keep_underscores:
        return field_name
    return field_name.replace("_", ".")


class BaseSettings(_BaseSettings):
    model_config = SettingsConfigDict(
        alias_generator=AliasGenerator(
            # used during model_dumps
            serialization_alias=_replace_underscores_if_applicable,
        ),
    )
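To illustrate the difference, here is a plain-Python sketch that applies both renaming rules to the field names from the reproduction script. It does not use pydantic or quixstreams; the `fields` dict and its placeholder values are assumptions made only for this illustration.

```python
def _replace_underscores_if_applicable(field_name: str) -> str:
    # Same rule as the proposed helper: keep listed names, dot-separate the rest.
    keep_underscores = ["oauth_cb"]
    if field_name in keep_underscores:
        return field_name
    return field_name.replace("_", ".")


# Hypothetical field values, used only to show how the keys are renamed.
fields = {
    "bootstrap_servers": "xxx.servicebus.windows.net:9093",
    "security_protocol": "sasl_ssl",
    "sasl_mechanism": "OAUTHBEARER",
    "oauth_cb": "<callback>",
}

# Current behaviour: every underscore becomes a period.
current = {name.replace("_", "."): value for name, value in fields.items()}

# Proposed behaviour: oauth_cb keeps its underscore.
proposed = {
    _replace_underscores_if_applicable(name): value for name, value in fields.items()
}

print(sorted(current))
print(sorted(proposed))
```

With the current rule the callback ends up under the key `oauth.cb`, which is the property name rejected in the traceback; with the proposed rule it stays `oauth_cb`.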
Anything else we should know?
Another problem arises with the Confluent Kafka AdminClient, but I will open a separate issue for it.
Hi @mkp-jansen,
Thanks for such a detailed report!
I see where the issue is 👍
We'll fix it in the next release, and I'll post an update here.
In the meantime, another workaround could be to pass oauth_cb in the consumer_extra_config and/or producer_extra_config dicts to the Application.
These parameters are passed as-is.
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

app = Application(
    broker_address=ConnectionConfig(bootstrap_servers="<bootstrap>"),
    consumer_extra_config={"oauth_cb": <oauth callback>},
    producer_extra_config={"oauth_cb": <oauth callback>},
)
thanks for the quick fix!
The fix was released in v3.2.0