qhenkart/gosqs

Wait group for senders

Opened this issue · 1 comments

lzap commented

Hey,

While digging into the codebase and testing it, I found a bug. Messages are sent in separate goroutines that nothing waits on, so when the service is asked to shut down and the main function returns, any sender goroutines still in flight are killed immediately.

To solve this, I've implemented a simple wait group in my copy of your code:

// client bundles the SQS connection, logging, and bookkeeping used by the
// publishing methods below.
type client struct {
	// sqs is the SQS client used to send messages.
	sqs      *sqs.Client
	// queueURL is set as the QueueUrl of every message built by Enqueue.
	queueURL string
	// logger receives the trace/warn/error output of the send path.
	logger   log.Logger
	// senderWG counts in-flight sender goroutines: Enqueue adds one per
	// message, sendDirectMessage marks it done, and Wait blocks until the
	// count reaches zero.
	senderWG sync.WaitGroup // THIS

	// NOTE(review): the fields below are not referenced by the code shown
	// here; presumably they configure the consumer side — confirm against
	// the rest of the package.
	handlers     map[string]Handler
	heartbeatSec int
	maxBeats     int
	workerPool   int
	maxMessages  int
}

// Enqueue JSON-encodes body and schedules it for delivery to the client's
// queue on a background goroutine.
//
// jobType is used both as the message group id and as input to the default
// message attributes. A freshly generated deduplication id is attached as the
// "dedup_id" attribute and as the message deduplication id; extraAttributes
// are appended after it.
//
// The only error returned is a JSON marshalling failure. Delivery errors are
// handled (and logged) by the background sender, so a nil return does not mean
// the message has been sent.
func (c *client) Enqueue(ctx context.Context, jobType string, body interface{}, extraAttributes ...string) error {
	payload, err := json.Marshal(body)
	if err != nil {
		return err
	}

	dedupID := generateDeduplicationId()
	// The dedup id pair always leads the list; appending an empty variadic is
	// a no-op, so no length check is needed.
	attrs := append([]string{"dedup_id", dedupID}, extraAttributes...)

	msg := &sqs.SendMessageInput{
		MessageBody:            aws.String(string(payload)),
		MessageAttributes:      defaultSQSAttributes(jobType, attrs...),
		MessageGroupId:         aws.String(jobType),
		MessageDeduplicationId: aws.String(dedupID),
		QueueUrl:               aws.String(c.queueURL),
	}

	// HERE: register the sender before starting the goroutine so that a
	// concurrent Wait cannot observe a zero count while the send is pending.
	c.senderWG.Add(1)
	go c.sendDirectMessage(ctx, msg)

	return nil
}

// sendDirectMessage publishes input to SQS and marks one unit of senderWG as
// done when it finishes, whatever the outcome. It is meant to run on its own
// goroutine after the caller has done senderWG.Add(1).
//
// A failed send is retried after a 10 second pause until the attempt counter
// reaches maxRetryCount. The optional retryCount argument sets the starting
// value of that counter (default 0). The function gives up early when:
//   - the error text matches errDataLimit (the payload is too large, so
//     retrying cannot help), or
//   - ctx is done while waiting for the next attempt. Retries reuse the same
//     ctx, so continuing to sleep would only delay shutdown.
//
// Nothing is returned; every outcome is reported through c.logger.
func (c *client) sendDirectMessage(ctx context.Context, input *sqs.SendMessageInput, retryCount ...int) {
	// A single deferred Done pairs with the single Add in Enqueue, so no exit
	// path (present or future) can leak or double-decrement the wait group.
	defer c.senderWG.Done()

	var count int
	if len(retryCount) != 0 {
		count = retryCount[0]
	}

	// Iterate instead of recursing: one stack frame, one deferred Done.
	for ; ; count++ {
		if count > maxRetryCount-1 {
			c.logger.Log(ctx, log.LogLevelError, "too many failures, giving up", nil)
			return
		}

		_, err := c.sqs.SendMessage(ctx, input)
		if err == nil {
			c.logger.Log(ctx, log.LogLevelTrace, "message sent", nil)
			return
		}

		// Compared by message text, as in the original code.
		if err.Error() == errDataLimit.Error() {
			c.logger.Log(ctx, log.LogLevelError, "payload limit overflow, giving up", nil)
			return
		}

		c.logger.Log(ctx, log.LogLevelWarn, "error publishing, trying again in 10 seconds: "+err.Error(), nil)

		// Wait for the retry delay, but stop waiting as soon as ctx is done so
		// that Wait() is not held up by sleeping senders during shutdown.
		retry := time.NewTimer(10 * time.Second)
		select {
		case <-retry.C:
		case <-ctx.Done():
			retry.Stop()
			c.logger.Log(ctx, log.LogLevelWarn, "context done while waiting to retry, giving up: "+ctx.Err().Error(), nil)
			return
		}
	}
}

// Wait blocks until every sender goroutine registered on senderWG (one per
// Enqueue call) has finished. Call it before the program exits so pending
// sends are not cut off.
func (c *client) Wait() {
	c.senderWG.Wait()
}

Then, at the very end of the main function, I call Wait() to ensure all sender goroutines have finished before the service shuts down gracefully.

Apologies for not sending a patch: my copy of your codebase has diverged quite a bit because my use case is different. Cheers!

@lzap I really appreciate you taking the time to come and create an issue with your findings