/go-queue-worker

Primary LanguageGoMIT LicenseMIT

go-queue-worker

This Go Module implements a queue worker.

Supported clients

The module supports SQS queue out of the box but you can pass any struct implementing the interface Client.

Basic usage (SQS)

import (
    "context"
    "fmt"
    "log"

    "github.com/francescopepe/go-queue-worker"
    workerSqs "github.com/francescopepe/go-queue-worker/clients/sqs"
    
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
    ctx := context.Background()

    awsCfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatalln("Unable to create AWS config", err)
    }
    
    sqsSvc := sqs.NewFromConfig(awsCfg)
    
    wkr := worker.NewWorker(worker.Configuration{
        Client: workerSqs.NewSqsClient(workerSqs.SqsClientConfiguration{
            Svc: sqsSvc,
            ReceiveMessageInput: &sqs.ReceiveMessageInput{
                QueueUrl:            aws.String(os.Getenv("SQS_QUEUE_URL")),
                MaxNumberOfMessages: 10,
                VisibilityTimeout:   30,
                WaitTimeSeconds:     20,
            },
        }),
        Concurrency: 100,
        Retrievers:  2,
        ErrorConfig: worker.ErrorConfiguration{
            Threshold: 5,
            Period:    time.Minute * 1,
            ReportFunc: func(err error) {
                log.Println("Worker error", err)
            },
        },
        Consumer: worker.NewMultiMessageConsumer(worker.MultiMessageConsumerConfiguration{
            BufferConfig: worker.MultiMessageBufferConfiguration{
                Size:    500,
                Timeout: time.Second * 1,
            },
            Handler: func(ctx context.Context, msgs []interface{}) error {
                // Your logic here...


                log.Println("Got batch", len(msgs))
				
                // Assert the type of message to get the body or any other attributes 
                log.Println("First message body", *msgs[0].(types.Message).Body)
    
                return nil
            },
        }),
    })
    
    err = wkr.Run(ctx)
    if err != nil {
        log.Fataln("Worker stopped with error", err)
    }
}