Wait group for senders
Opened this issue · 1 comments
lzap commented
Hey,
I am digging into the codebase and testing it, and I have found a bug. Sending messages in separate goroutines is not synchronized, meaning that when a service is asked to shut down and the main function finishes, all other goroutines die immediately.
To solve this, I've implemented a simple wait group in my copy of your code:
// client bundles the SQS connection, logging, and the bookkeeping used when
// enqueueing messages.
type client struct {
	// sqs is the SQS API client used to send messages.
	sqs *sqs.Client
	// queueURL is set as the QueueUrl of every outgoing message.
	queueURL string
	// logger receives the sender's trace, warning, and error messages.
	logger log.Logger
	senderWG sync.WaitGroup // THIS: counts in-flight sender goroutines so Wait can block until they finish
	// handlers maps a string key to its Handler.
	// NOTE(review): presumably keyed by job type — confirm against the consumer code.
	handlers map[string]Handler
	// NOTE(review): the four settings below are not used in the visible code;
	// they look like consumer-side tuning knobs — confirm their meaning and units.
	heartbeatSec int
	maxBeats int
	workerPool int
	maxMessages int
}
// Enqueue encodes body as JSON and hands it to a background goroutine for
// delivery to the client's queue.
//
// jobType is used as the message group ID and is passed to
// defaultSQSAttributes together with a freshly generated deduplication ID and
// any extraAttributes supplied by the caller.
//
// The only error returned is a JSON encoding failure. Delivery happens
// asynchronously, so send failures are not reported to the caller.
func (c *client) Enqueue(ctx context.Context, jobType string, body interface{}, extraAttributes ...string) error {
	payload, err := json.Marshal(body)
	if err != nil {
		return err
	}

	dedupID := generateDeduplicationId()

	// The deduplication ID always comes first; caller-supplied attributes
	// follow it. Appending an empty variadic list leaves the slice unchanged.
	attrs := append([]string{"dedup_id", dedupID}, extraAttributes...)

	msg := &sqs.SendMessageInput{
		MessageBody:            aws.String(string(payload)),
		MessageAttributes:      defaultSQSAttributes(jobType, attrs...),
		MessageGroupId:         aws.String(jobType),
		MessageDeduplicationId: aws.String(dedupID),
		QueueUrl:               aws.String(c.queueURL),
	}

	// HERE: register the sender before starting it, so a concurrent Wait
	// cannot miss the goroutine that is about to be spawned.
	c.senderWG.Add(1)
	go c.sendDirectMessage(ctx, msg)

	return nil
}
// sendDirectMessage publishes input to SQS, retrying failed sends with a
// 10-second pause between attempts.
//
// retryCount is optional; when present, its first element is the number of
// attempts already made. Sends are attempted while that counter is below
// maxRetryCount. An error whose text matches errDataLimit is treated as
// permanent and is not retried.
//
// Every call releases the sender wait group exactly once, so each call must
// be paired with a preceding senderWG.Add(1).
func (c *client) sendDirectMessage(ctx context.Context, input *sqs.SendMessageInput, retryCount ...int) {
	// A single deferred Done covers every exit path, including a panic inside
	// the SDK call, so Wait can never be left blocked by a missed Done.
	defer c.senderWG.Done()

	var count int
	if len(retryCount) != 0 {
		count = retryCount[0]
	}

	for ; count < maxRetryCount; count++ {
		_, err := c.sqs.SendMessage(ctx, input)
		if err == nil {
			c.logger.Log(ctx, log.LogLevelTrace, "message sent", nil)
			return
		}
		if err.Error() == errDataLimit.Error() {
			c.logger.Log(ctx, log.LogLevelError, "payload limit overflow, giving up", nil)
			return
		}
		c.logger.Log(ctx, log.LogLevelWarn, "error publishing, trying again in 10 seconds: "+err.Error(), nil)

		// Wait before retrying, but stop early if the context ends: a send
		// with a finished context cannot succeed, and sleeping through the
		// remaining retries would only delay shutdown.
		timer := time.NewTimer(10 * time.Second)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			c.logger.Log(ctx, log.LogLevelError, "context done while waiting to retry, giving up", nil)
			return
		}
	}

	c.logger.Log(ctx, log.LogLevelError, "too many failures, giving up", nil)
}
// Wait blocks until the sender wait group's counter drops to zero, i.e. until
// every sender goroutine registered through senderWG has called Done.
func (c *client) Wait() {
	c.senderWG.Wait()
}
Then in the main function, I call Wait()
at the very end to ensure all sender goroutines have finished before the service gracefully shuts down.
Apologies for not sending a patch; my copy of your codebase has diverged quite a bit because my use case is different. Cheers!