Kafka subscription url is incorrect
Opened this issue · 10 comments
if we use kafka driver with following command
watcher, err := cloudwatcher.New(ctx, "kafka://my-topic")
it will throw an error
Error while receiving an update message: pubsub (code=Unknown): no topics provided
if we use original kafka driver subscription url as follows
watcher, err := cloudwatcher.New(ctx, "kafka://group?topic=my-topic")
it will throw an error
open topic kafka://group?topic=my-topic: invalid query parameter "topic"
Findings
pubsub.OpenTopic(ctx, w.url)
here OpenTopic
method expects "kafka://my-topic"
this type of url and pubsub.OpenSubscription(ctx, w.url)
expects "kafka://group?topic=my-topic"
this type of url
@wgarunap what about use "my-topic" in arg like:
watcher, err := cloudwatcher.New(ctx, "my-topic")
Then build the required strings inside the code of this repo? Does it work for you?
Thanks for replying @hsluoyz
did you mean args for topic_name
and then create the connection_string
? if yes, that works for me.
Also if we are subscribing kafka cluster with a group consumer, on application restart it will only receive the uncommitted messages by the consumer. Here in the watchr all messages are committed on success read. so the application will be stuck in the initial policy definition.
haven't used kafka with gocloud.dev but it might be that we just need to update deps. URL examples are here https://pkg.go.dev/gocloud.dev/pubsub/kafkapubsub#hdr-URLs
made a new release with updated deps, try https://github.com/rusenask/casbin-go-cloud-watcher/releases/tag/0.3.1
do you import kafka driver?
_ "github.com/rusenask/casbin-go-cloud-watcher/drivers/kafkapubsub"
```
Yes, I am using the same driver.
Considering the limitations in the watcher, I have moved to a custom implementation.
https://github.com/wgarunap/casbin-kafka-watcher
Change suggested by @hsluoyz will resolve this url issue.
@wgarunap thanks for the contribution! Added to the list: casbin/casbin-website@a227194