apache/pulsar-dotpulsar

Multi-threaded consumption issues

gungod2000 opened this issue · 8 comments

Hello everyone,

I wrote a console test program that first creates a consumer pool in Main, for example:
    List<IConsumer> list = new .....
    list.Add(newConsumer);
Then I start multiple tasks to receive messages at the same time, and I get an index out of range error.
Why is this? Isn't it thread-safe? How can I fix it? (Adding a lock is not a good idea; it is too slow.)

Thanks.

Hi @gungod2000
Please fill out the bug report template, providing a full sample and stack trace.

It throws an IndexOutOfRangeException.
Thanks.

--------sample code-----
namespace WindowsFormsApp1
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private static List<IConsumer<string>> ConsumerPool = new List<IConsumer<string>>();

        private void Form1_Load(object sender, EventArgs e)
        {
            // Add a new consumer to the pool
            ConsumerPool.Add(new consumer()...);
        }

        private void button1_Click(object sender, EventArgs e)
        {
            // Start some tasks to receive messages
            for (int i = 0; i < 10; i++)
            {
                Task.Factory.StartNew(() =>
                {
                    // Adding a lock here makes it work, but it is too slow.
                    // Get a random consumer from the pool
                    var consumer = ConsumerPool.GetSingleConsumer();

                    // Receive a message
                    var message = consumer.Receive();
                });
            }
        }
    }
}

Please provide a full stack trace.

Dear @gungod2000,

First and foremost, I’d like to express my gratitude for taking the time to submit your issue. However, in order to assist you effectively, I kindly request that you adhere to the proper template. You can find it here. Please fill it out to the best of your ability.

Additionally, when submitting the new issue, please provide a sample that reproduces the error. Create a GitHub repository and link to it so that we DotPulsar maintainers can thoroughly examine the code.

If you have any uncertainties regarding the bug reporting steps, feel free to ask here. However, since you didn’t follow the outlined instructions, I regret to inform you that this issue will be closed in approximately 30 days.

Thank you for your understanding.

Best regards,
Entvex
DotPulsar Maintainer

Dear @entvex @blankensteiner,

I put the demo code here: https://github.com/gungod2000/PulsarIssueDemo_4.8

Much appreciated.

Best regards

The link you posted just returns a 404.

Hello everyone,
I'm facing the same issue.
If I have 10 parallel threads trying to receive messages in parallel, I get an index out of range exception.

Below is the stack trace, followed by a sample console program I created to reproduce the issue.

System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at DotPulsar.Internal.Consumer1.Receive(CancellationToken cancellationToken)
   at DotPulsar.Internal.Consumer1.d__34.MoveNext()

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;

internal class Program
{
    // Holds Receive() calls that have been started but not yet observed as completed.
    private static ConcurrentQueue<ValueTask<IMessage<ReadOnlySequence<byte>>>> _messageConcurrentQueue = new();

    static async Task Main(string[] args)
    {
        var client = PulsarClient.Builder().ServiceUrl(new Uri("pulsar://X.X.X.X:6650/")).Build();
        string topic = "persistent://Mytenant/MySubscription/MyTopic";
        var consumer = client.NewConsumer()
            .Topic(topic)
            .SubscriptionName("AutomationSubscription")
            .SubscriptionType(SubscriptionType.Shared)
            .Create();

        while (true)
        {
            // Keep up to 10 Receive() calls in flight on the same consumer.
            if (_messageConcurrentQueue.Count < 10)
            {
                _messageConcurrentQueue.Enqueue(consumer.Receive());
            }
            if (_messageConcurrentQueue.Count > 0)
            {
                _messageConcurrentQueue.TryDequeue(out var messageResult);
                if (messageResult.IsCompleted)
                {
                    if (!messageResult.IsCanceled && !messageResult.IsFaulted)
                    {
                        var result = messageResult.Result;
                        if (result != null && result.Data.Length > 0)
                        {
                            await consumer.Acknowledge(result);
                            System.Console.WriteLine(Encoding.UTF8.GetString(result.Data.ToArray()));
                        }
                    }
                    else if (messageResult.IsFaulted)
                    {
                        // This is where the IndexOutOfRangeException shows up.
                        System.Console.WriteLine(messageResult.AsTask().Exception?.InnerException);
                    }
                }
                else
                {
                    // Not finished yet: put it back and check again on a later pass.
                    _messageConcurrentQueue.Enqueue(messageResult);
                }
            }
        }
    }
}

Regards
Savan Rangrej

@gungod2000 @savan-rangrej
You can now use the release candidate, if you want.
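
For readers who cannot move to the release candidate yet, one possible workaround is to keep only a single Receive call outstanding and hand the messages to worker tasks through a channel. The following is a minimal sketch under stated assumptions, not the maintainers' fix and not part of the DotPulsar API: `SingleReceiverFanOut`, `receive`, `handle`, and `workerCount` are hypothetical names, and `receive` is only a stand-in for something like `token => consumer.Receive(token)`.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SingleReceiverFanOut
{
    // 'receive' is a hypothetical stand-in for a call such as consumer.Receive(token).
    // 'handle' is a hypothetical per-message callback that does the actual processing.
    public static async Task Run<T>(
        Func<CancellationToken, ValueTask<T>> receive,
        Func<T, CancellationToken, ValueTask> handle,
        int workerCount,
        CancellationToken token)
    {
        // Bounded channel so the receive loop cannot run far ahead of the workers.
        var channel = Channel.CreateBounded<T>(workerCount * 2);

        // Workers only read from the channel; they never call 'receive' themselves.
        var workers = new Task[workerCount];
        for (var i = 0; i < workerCount; i++)
        {
            workers[i] = Task.Run(async () =>
            {
                // Completes once the writer is completed and the channel is drained.
                await foreach (var message in channel.Reader.ReadAllAsync())
                    await handle(message, token);
            });
        }

        try
        {
            // Exactly one 'receive' call is outstanding at any time.
            while (!token.IsCancellationRequested)
            {
                var message = await receive(token);
                await channel.Writer.WriteAsync(message, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal shutdown path.
        }
        finally
        {
            // Signal the workers that no more messages will arrive.
            channel.Writer.Complete();
        }

        try
        {
            await Task.WhenAll(workers);
        }
        catch (OperationCanceledException)
        {
            // A handler observed the cancellation; treat it as shutdown.
        }
    }
}
```

The design choice here is to serialize only the receive step, which is where the exception in this thread is reported, while the per-message work still runs in parallel on `workerCount` tasks.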