faust-streaming/faust

(question): signal, when a topic an agent tries to consume from is not present in the kafka cluster

SDJustus opened this issue · 6 comments

Hi folks,
is there a way to catch a signal or something similar when a topic we try to consume from doesn't exist?
The only way I can detect this at the moment is the error logged by aiokafka.cluster with the message: Topic xxxxxxx not found in cluster metadata

In case you consume the topic:

As the assignor only logs an error, I guess we do not have a signal or error to catch. What you could do, though, is override the partition assignor class when initializing the app. That way you could inherit from faust's PartitionAssignor and override the _get_copartitioned_groups function: call the super implementation, check its result, and either raise an error or return the result if nothing is wrong.
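A rough sketch of that idea, written as a mixin so it runs without faust installed. Treat the details as assumptions: MissingTopicError and StrictTopicsMixin are made-up names, the extra arguments of _get_copartitioned_groups are passed through untouched because I have not pinned down its exact signature, and the check runs before the super call rather than on its result. I also have not verified how faust reacts when the assignor raises during a rebalance.

```python
class MissingTopicError(RuntimeError):
    """Raised when a subscribed topic is absent from the cluster metadata."""


class StrictTopicsMixin:
    """Mix in *before* faust's PartitionAssignor to fail on missing topics."""

    def _get_copartitioned_groups(self, topics, cluster, *args, **kwargs):
        # partitions_for_topic() returns None (or nothing) for topics the
        # cluster metadata does not know about.
        missing = sorted(
            t for t in topics if not cluster.partitions_for_topic(t)
        )
        if missing:
            raise MissingTopicError(
                f"Topics not found in cluster metadata: {missing}"
            )
        # Everything is present: defer to the parent implementation.
        return super()._get_copartitioned_groups(
            topics, cluster, *args, **kwargs
        )


# Hypothetical wiring (not executed here; assumes faust is installed and
# that the import path and the PartitionAssignor setting are as shown):
#
#   import faust
#   from faust.assignor import PartitionAssignor
#
#   class StrictAssignor(StrictTopicsMixin, PartitionAssignor):
#       pass
#
#   app = faust.App(
#       "my-app",
#       broker="kafka://localhost:9092",
#       PartitionAssignor=StrictAssignor,
#   )
```

The mixin only relies on cluster.partitions_for_topic(), so it can be exercised with a fake cluster object in a unit test.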

In case you produce:
You can configure Kafka to create the topic automatically (if this is an option).

Hi,
thanks for the very quick response.

Overwriting the partitioner class helps me a lot. So that solves the question on the consumer side.

Automatic topic creation is not an option for me, unfortunately, but I guess if the send() call on the topic fails, I at least have an indication that the topic might be missing (although send() could fail for other reasons as well).

So if there is no other way of catching the errors on the producer side, I'll close this issue.
Thank you very much for the help. Much appreciated :)

I guess what you could do is get the aiokafka producer object and check the cluster metadata (on send error, or also periodically 🤔), doing something like what aiokafka itself does: https://github.com/aio-libs/aiokafka/blob/00349a841b5b411ada7a3596817439b9d583e16d/aiokafka/cluster.py#L67-L90 and then add your custom handling. I didn't see any way in faust to handle this.
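A minimal sketch of such a metadata check. find_missing_topics is a made-up helper, and it assumes the cluster object exposes a topics() method returning the known topic names, as aiokafka's ClusterMetadata does as far as I know.

```python
def find_missing_topics(cluster, expected_topics):
    """Return the expected topics that the cluster metadata does not list."""
    # topics() is assumed to return an iterable of known topic names.
    known = set(cluster.topics())
    return sorted(set(expected_topics) - known)


# Hypothetical usage, assuming `aiokafka_producer` is a started
# AIOKafkaProducer whose metadata is reachable via `.client.cluster`:
#
#   missing = find_missing_topics(
#       aiokafka_producer.client.cluster, ["orders", "payments"]
#   )
#   if missing:
#       ...  # custom handling: log, alert, stop the worker, etc.
```

One caveat: the client's metadata may only cover topics it has already asked the broker about, so this is probably most reliable for topics the producer actually sends to.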

I already tried this, but I didn't find a way to access the driver from the faust app... Is there any way, or do I have to somehow instantiate another producer to check that?

Sorry for the delayed answer. The AIOKafkaProducer is accessible via

app.producer._ensure_producer()

But be careful: app.producer can be None, and as long as the producer is not started, _ensure_producer() will raise faust.exceptions.NotReady.

You could also use app.producer._producer, but _producer could also be None until the producer is started.
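To guard against both None cases, something like the following could work. get_aiokafka_producer is a made-up helper name; it only reads the app.producer and _producer attributes mentioned above, and _producer is private, so it may change between faust versions.

```python
def get_aiokafka_producer(app):
    """Return the underlying AIOKafkaProducer, or None if not available yet."""
    # app.producer itself may be None.
    faust_producer = getattr(app, "producer", None)
    if faust_producer is None:
        return None
    # _producer stays None until the producer service has been started.
    return getattr(faust_producer, "_producer", None)


# Hypothetical usage inside a running faust app:
#
#   aiokafka_producer = get_aiokafka_producer(app)
#   if aiokafka_producer is None:
#       ...  # producer not started yet; retry later
```

Returning None instead of raising keeps a periodic check simple: it can skip a round and try again on the next tick.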

Thank you very much, that was exactly what I was looking for.