apache/pulsar-dotpulsar

producer status evaluation before creating new producer

amareshmad opened this issue · 7 comments

Is there way to restrict/avoid producer creation for same topic for multiple instances.

Using the same producer name should fix that (only one will be allowed).

Thanks @blankensteiner. Consider now I have two different producers (for two different regions East & West and both are in connected status) and both are having same data, but I want to send only from one producer, if same data tries to send by another producer which was connected other region, it should not send. How to restrict this?

or

I have two producers for two different regions (East & West), if one fails, I have to send data to another (for this reason I have created two producers). if both are connected and sending same data to two different regions, will it affect/duplicate data at the consumer (consumer also active for two regions) of the produced data.

Is this what you are looking for? https://pulsar.apache.org/docs/concepts-messaging/#access-mode

If not, then you need some kind of leader election (which is not in DotPulsar's scope) to find out who is allowed to run.

@blankensteiner Is there batch concept in C# dot pulsar producer by default? if there how to disable batch for producer.

We can see in server it is not able to parse some metadata. what they are telling in java they have batch concept and as to disable then they can parse metadata info.

Similarly, for C# client, they could not parse metadata they are suspecting batch enable?

this is our code.
ExceptionHandler producerException = new ExceptionHandler();

                await using var client = PulsarClient.Builder()
                                                     .Authentication(new AuthenticationBasic(mqUserName, mqPassword))
                                                     .ServiceUrl(brokertUrl)
                                                     .ExceptionHandler(producerException.OnException)
                                                     .ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
                                                     .Build();

                log.Info($"Creating producer for brokerUrl:{brokertUrl}, topic url:{responseQueueName}.");
                await using var producer = client.NewProducer(Schema.String)
                                        .Topic(responseQueueName)
                                        .StateChangedHandler(DESPulsarProducerQueue.Monitor)
                                        .ProducerName(ProducerName)
                                        .Create();

                await ProduceMessages((IProducer<string>)producer, reponseData, messageBody, cts.Token);
                
                
                 private static async Task ProduceMessages(IProducer<string> producer, Dictionary<string, string> responseData, string messageBody, CancellationToken cancellationToken)
    {
        var delay = TimeSpan.FromSeconds(2);

        try
        {
            string messageBodyData = null;
            string intuitTid = null;
            string desJobId = null;

            var metadata = new MessageMetadata();

            foreach (var item in responseData)
            {
                metadata[item.Key] = item.Value;
                log.Info($"Response data key:{item.Key}, value:{item.Value}");
            }

            if (messageBody != null)
            {
                messageBodyData = messageBody;
            }
            else
            {
                    messageBodyData = "Response to west queue";
            }
          
            var messageId = await producer.Send(metadata, messageBodyData);
            log.Info($"Generated message id:{messageId}, intuit tid :{intuitTid}, des job id:{desJobId}");

            await Task.Delay(delay, cancellationToken);
        }         
        catch (Exception ex)
        {
            log.Error($"Producer exception for topic :{producer.Topic}, exception: {ex.Message}, stack trace :{ex.StackTrace}");
        }
    }

DotPulsar doesn't support batching on the producer side, only when reading/consuming.
Not sure how this is connected to your request? Did you have a look at the access modes?

@blankensteiner I have consumer received a message, acknowledged same but failed to process the message, I want ask broker to redeliver.

In case any failure in the acknowledged message to process, is there way we can ask broker to redeliver.
consumer.RedeliverUnacknowledgedMessages(messageId) => this is only for Unacknowledge.
what is the scenario for acknowledged to ask redeliver.

No, get the message, process it, and if successful when acknowledge.