rusenask/casbin-go-cloud-watcher

Kafka subscription url is incorrect


If we use the Kafka driver with the following call:
watcher, err := cloudwatcher.New(ctx, "kafka://my-topic")
it returns this error:
Error while receiving an update message: pubsub (code=Unknown): no topics provided

If we use the original Kafka driver subscription URL, as follows:
watcher, err := cloudwatcher.New(ctx, "kafka://group?topic=my-topic")
it returns this error:
open topic kafka://group?topic=my-topic: invalid query parameter "topic"

Findings

pubsub.OpenTopic(ctx, w.url) expects a URL of the form "kafka://my-topic", while pubsub.OpenSubscription(ctx, w.url) expects a URL of the form "kafka://group?topic=my-topic". Both calls receive the same w.url, so no single URL satisfies both.
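To make the mismatch concrete, here is a minimal stdlib-only sketch that parses both URL shapes and prints the parts each opener would look at. It does not call gocloud.dev; the helper name `describe` is made up for illustration, and reading the host as the topic (for OpenTopic) or as the group (for OpenSubscription) follows the URL formats quoted above.

```go
package main

import (
	"fmt"
	"net/url"
)

// describe is a hypothetical helper (not part of any library). It returns
// the host+path of a kafka:// URL and the values of its "topic" query
// parameter, which are the parts the two URL forms differ in.
func describe(raw string) (hostPath string, topics []string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", nil, err
	}
	return u.Host + u.Path, u.Query()["topic"], nil
}

func main() {
	for _, raw := range []string{
		"kafka://my-topic",             // shape expected when opening a topic
		"kafka://group?topic=my-topic", // shape expected when opening a subscription
	} {
		hostPath, topics, err := describe(raw)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Printf("%s: host+path=%q, topic params=%q\n", raw, hostPath, topics)
	}
}
```

The first URL carries no "topic" query parameter at all, which is consistent with the "no topics provided" error on the subscription side; the second carries one, which the topic side rejects as an invalid query parameter.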

@wgarunap what about using "my-topic" as the argument, like:

watcher, err := cloudwatcher.New(ctx, "my-topic")

The code in this repo would then build the required URL strings itself. Does that work for you?

Thanks for replying @hsluoyz

Did you mean passing the topic name as an argument and then building the connection string from it? If yes, that works for me.
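A rough sketch of what that could look like, assuming the watcher is given a bare topic name plus a consumer group name and derives both URLs from them. The helper `kafkaURLs` and the separate `group` argument are assumptions for illustration only; they are not part of this repo or of gocloud.dev.

```go
package main

import (
	"fmt"
	"net/url"
)

// kafkaURLs is a hypothetical helper: from a consumer group and a topic
// name it builds the URL shape used for opening the topic and the URL
// shape used for opening the subscription.
func kafkaURLs(group, topic string) (topicURL, subscriptionURL string) {
	// Topic side: kafka://<topic>
	topicURL = (&url.URL{Scheme: "kafka", Host: topic}).String()

	// Subscription side: kafka://<group>?topic=<topic>
	q := url.Values{}
	q.Set("topic", topic)
	subscriptionURL = (&url.URL{
		Scheme:   "kafka",
		Host:     group,
		RawQuery: q.Encode(),
	}).String()

	return topicURL, subscriptionURL
}

func main() {
	topicURL, subscriptionURL := kafkaURLs("group", "my-topic")
	fmt.Println("topic URL:       ", topicURL)
	fmt.Println("subscription URL:", subscriptionURL)
}
```

The watcher could then pass the first string to pubsub.OpenTopic and the second to pubsub.OpenSubscription instead of reusing one URL for both.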

Also, if we subscribe to a Kafka cluster with a consumer group, then on application restart the consumer will only receive uncommitted messages. In the watcher, every message is committed on a successful read, so after a restart the application will be stuck with the initial policy definition.

I haven't used Kafka with gocloud.dev, but it might be that we just need to update deps. URL examples are here: https://pkg.go.dev/gocloud.dev/pubsub/kafkapubsub#hdr-URLs

Hi @rusenask
The issue is not resolved by the dependency upgrade.

Do you import the Kafka driver?

```go
import _ "github.com/rusenask/casbin-go-cloud-watcher/drivers/kafkapubsub"
```

Yes, I am using the same driver.

Considering the limitations in the watcher, I have moved to a custom implementation.
https://github.com/wgarunap/casbin-kafka-watcher

The change suggested by @hsluoyz will resolve this URL issue.

@wgarunap thanks for the contribution! Added to the list: casbin/casbin-website@a227194