[Improve][Producer]Simplify the flush logic
gunli opened this issue · 0 comments
gunli commented
Is your feature request related to a problem? Please describe.
Currently, when partitionProducer.internalFlush()
is called, it do the following steps:
- get the last
pendingItem
frompendingQueue
; - update the last
pendingItem
by appending a newsendRequest
with nil msg and a new callback; - in
partitionProducer.ReceivedSendReceipt
, identify thesendRequest
by checking if the msg is nil.
This mixed the message sendRequest
s and the flush sendRequest
together which make the code is broken by the if msg!=nil
checks.
Describe the solution you'd like
- add a callback field to the
pendingItem
, default is nil; - in
partitionProducer.internalFlush()
get the lastpendingItem
frompendingQueue
; - update the last
pendingItem
by setup a new callback; - in
partitionProducer.ReceivedSendReceipt
, no need to identify thesendRequest
by checking if the msg is nil; - in
pendingItem.Complete()
, invoke its callback to notify the flush is done.
Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.
Additional context
Add any other context or screenshots about the feature request here.
@merlimat @wolfstudy @RobertIndie