Use timer for extending visibility
lzap commented
Hey, thanks for the library. I have an improvement to suggest, although I work with a copy of your code (my use case is slightly different). I found that the goroutine that extends message visibility keeps sleeping even after the message has been processed. Under load, this can leave many goroutines sleeping for no reason.
A more efficient implementation uses a timer. It should be very easy to adapt to your version of the code, as it's almost the same:
func (c *client) extend(ctx context.Context, m *sqsJob) {
	// Fire 10 seconds before the visibility timeout expires to leave
	// time for the HTTP round trip. If heartbeatSec is 10 or less, the
	// duration is not positive and the timer fires immediately.
	tick := time.Duration(c.heartbeatSec-10) * time.Second
	timer := time.NewTimer(tick)
	// Release the timer on every return path, not only when the worker is done.
	defer timer.Stop()

	count := 0
	for {
		if count >= c.maxBeats {
			c.logger.Log(ctx, log.LogLevelWarn, "exceeded maximum amount of heartbeats", nil)
			return
		}
		count++

		select {
		case <-m.err:
			// worker is done
			c.logger.Log(ctx, log.LogLevelTrace, "worker is done: "+*m.MessageId, nil)
			return
		case <-timer.C:
			c.logger.Log(ctx, log.LogLevelTrace, "extending: "+*m.MessageId, nil)
			input := &sqs.ChangeMessageVisibilityInput{
				QueueUrl:          &c.queueURL,
				ReceiptHandle:     m.ReceiptHandle,
				VisibilityTimeout: int32(c.heartbeatSec),
			}
			_, err := c.sqs.ChangeMessageVisibility(ctx, input)
			if err != nil {
				c.logger.Log(ctx, log.LogLevelError, "unable to extend message visibility: "+err.Error(), nil)
				return
			}
			// timer.C was just drained, so Reset is safe here.
			timer.Reset(tick)
		}
	}
}
Cheers!