qhenkart/gosqs

Use timer for extending visitibity

Opened this issue · 1 comments

lzap commented

Hey, thanks for the library, I have an improvement, however, I work with a copy of your code (my use case is slightly different). I have found out that the time visibility extending goroutine always sleeps even after the message was processed. This possibly can leave many goroutines sleeping for no reason when under load.

More efficient implementation is to use timer, it should be very easy to modify your version of the code as it's almost the same:

func (c *client) extend(ctx context.Context, m *sqsJob) {
	// add extra 10 seconds for HTTP REST processing
	tick := time.Duration(c.heartbeatSec-10) * time.Second
	timer := time.NewTimer(tick)
	count := 0
	for {
		if count >= c.maxBeats {
			c.logger.Log(ctx, log.LogLevelWarn, "exceeded maximum amount of heartbeats", nil)
			return
		}
		count++

		select {
		case <-m.err:
			// worker is done
			c.logger.Log(ctx, log.LogLevelTrace, "worker is done: "+*m.MessageId, nil)
			timer.Stop()
			return
		case <-timer.C:
			c.logger.Log(ctx, log.LogLevelTrace, "extending: "+*m.MessageId, nil)
			input := &sqs.ChangeMessageVisibilityInput{
				QueueUrl:          &c.queueURL,
				ReceiptHandle:     m.ReceiptHandle,
				VisibilityTimeout: int32(c.heartbeatSec),
			}
			_, err := c.sqs.ChangeMessageVisibility(ctx, input)
			if err != nil {
				c.logger.Log(ctx, log.LogLevelError, "unable to extend message visibility: "+err.Error(), nil)
				return
			}
			timer.Reset(tick)
		}
	}
}

Cheers!

@lzap thank you very much for the input!