apache/pulsar-client-go

[Improve][Producer]Simplify the flush logic

gunli opened this issue · 0 comments

gunli commented

Is your feature request related to a problem? Please describe.
Currently, when partitionProducer.internalFlush() is called, it does the following:

  1. get the last pendingItem from pendingQueue;
  2. update the last pendingItem by appending a new sendRequest with nil msg and a new callback;
  3. in partitionProducer.ReceivedSendReceipt, identify the flush sendRequest by checking whether its msg is nil.

This mixes the message sendRequests and the flush sendRequest together, which breaks up the code with `if msg != nil` checks.
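
To make the problem concrete, here is a minimal sketch of the current pattern. The type names come from the steps above, but the fields, the lowercase function names, and the `acked` counter are assumptions for illustration only, not the real pulsar-client-go code.

```go
package main

// Simplified stand-ins for the types named in this issue. The fields are
// assumptions for illustration, not the real pulsar-client-go definitions.
type message struct{ payload string }

type sendRequest struct {
	msg      *message        // nil marks this request as a flush marker
	callback func(err error) // invoked when the request completes
}

type pendingItem struct {
	sendRequests []*sendRequest
}

// flush mimics the current approach: it appends a marker request with a
// nil msg to the last pending item.
func flush(queue []*pendingItem, done func(err error)) {
	if len(queue) == 0 {
		done(nil) // nothing in flight, so the flush is already complete
		return
	}
	last := queue[len(queue)-1]
	last.sendRequests = append(last.sendRequests, &sendRequest{msg: nil, callback: done})
}

// receivedSendReceipt completes every request in the item and returns the
// number of real messages acknowledged. Message-only work has to be guarded
// by a nil check because flush markers share the same slice.
func receivedSendReceipt(item *pendingItem) (acked int) {
	for _, sr := range item.sendRequests {
		if sr.msg != nil {
			acked++ // message-only bookkeeping would go here
		}
		if sr.callback != nil {
			sr.callback(nil)
		}
	}
	return acked
}
```

Every place that touches `sr.msg` needs the same guard, which is the clutter this issue wants to remove.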

Describe the solution you'd like

  1. add a callback field to the pendingItem, default is nil;
  2. in partitionProducer.internalFlush() get the last pendingItem from pendingQueue;
  3. update the last pendingItem by setting a new callback;
  4. in partitionProducer.ReceivedSendReceipt, no need to identify the sendRequest by checking if the msg is nil;
  5. in pendingItem.Complete(), invoke its callback to signal that the flush is done.
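
The steps above could look roughly like the following sketch. The field name `flushCallback`, the `Complete(err error)` signature, and the helper functions are hypothetical and only meant to show the shape of the change; the real types in pulsar-client-go have more fields and different signatures.

```go
package main

// Simplified stand-ins for the types named in this issue. The fields are
// assumptions for illustration, not the real pulsar-client-go definitions.
type message struct{ payload string }

type sendRequest struct {
	msg      *message        // always non-nil in this design
	callback func(err error) // invoked when the message is acknowledged
}

type pendingItem struct {
	sendRequests  []*sendRequest
	flushCallback func(err error) // proposed field (hypothetical name), nil by default
}

// flush attaches the callback to the last pending item instead of
// appending a marker sendRequest.
func flush(queue []*pendingItem, done func(err error)) {
	if len(queue) == 0 {
		done(nil) // nothing in flight, so the flush is already complete
		return
	}
	queue[len(queue)-1].flushCallback = done
}

// Complete notifies a waiting flush, if any. The signature is an assumption.
func (p *pendingItem) Complete(err error) {
	if p.flushCallback != nil {
		p.flushCallback(err)
		p.flushCallback = nil // fire at most once
	}
}

// receivedSendReceipt no longer needs to check msg for nil: every entry in
// sendRequests is a real message.
func receivedSendReceipt(item *pendingItem) (acked int) {
	for _, sr := range item.sendRequests {
		acked++
		if sr.callback != nil {
			sr.callback(nil)
		}
	}
	item.Complete(nil)
	return acked
}
```

One thing this sketch does not handle: a second flush that lands on the same pendingItem would overwrite the first callback, so an actual change would probably need to chain or queue the callbacks.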

Additional context
@merlimat @wolfstudy @RobertIndie