See the source code for this sample in this github repo: https://github.com/luiscoco/AWS_SNS_with_dotNET8_WebAPI_consumer
We navigate to SQS service in AWS Console
And we create a new SQS
We select the Standard queue and the queue name
For this example we leave the rest of the default values and press the Create queue button
We get this confirmation message after the queue was created
Now we press the Subscribe to Amazon SNS Topic button and we select the topic
Creating a .NET 8 Web API using Visual Studio Code (VSCode) and the .NET CLI is a straightforward process
This guide assumes you have .NET 8 SDK, VSCode, and the C# extension for VSCode installed. If not, you'll need to install these first
Step 1: Install .NET 8 SDK
Ensure you have the .NET 8 SDK installed on your machine: https://dotnet.microsoft.com/es-es/download/dotnet/8.0
You can check your installed .NET versions by opening a terminal and running:
dotnet --list-sdks
If you don't have .NET 8 SDK installed, download and install it from the official .NET download page
Step 2: Create a New Web API Project
Open a terminal or command prompt
Navigate to the directory where you want to create your new project
Run the following command to create a new Web API project:
dotnet new webapi -n SNSReceiverApi
This command creates a new directory with the project name, sets up a basic Web API project structure, and restores any necessary packages
Step 3: Open the Project in VSCode
Once the project is created, you can open it in VSCode by navigating into the project directory and running:
code .
This command opens VSCode in the current directory, where . represents the current directory
We run this command to add the Azure Service Bus library
dotnet add package AWSSDK.SimpleNotificationService
and
dotnet add package AWSSDK.SQS
We also have to add the Swagger and OpenAPI libraries to access the API Docs
This is the csproj file including the project dependencies
ReceiverController.cs
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace ServiceBusReceiverApi.Controllers
{
[ApiController]
[Route("api/[controller]")]
public class SnsSqsController : ControllerBase
{
private static string awsAccessKeyId = "AKIA54SNDJKIETHXVI6S";
private static string awsSecretAccessKey = "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";
private static string sqsQueueUrl = "https://sqs.eu-west-3.amazonaws.com/954718177936/myqueue";
private static AmazonSQSClient sqsClient = new AmazonSQSClient(awsAccessKeyId, awsSecretAccessKey, Amazon.RegionEndpoint.EUWest3);
private static ConcurrentQueue<MessageDto> receivedMessages = new ConcurrentQueue<MessageDto>();
[HttpGet("receive")]
public async Task<ActionResult<IEnumerable<MessageDto>>> ReceiveMessages(string? priorityFilter = null)
{
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = sqsQueueUrl,
MaxNumberOfMessages = 10 // Adjust based on your needs
};
var response = await sqsClient.ReceiveMessageAsync(receiveMessageRequest);
foreach (var message in response.Messages)
{
string body = message.Body;
// Assuming 'priority' is a property or data within the message. Rename this if it conflicts.
string? messagePriority = priorityFilter; // Adjust this logic based on your message structure
Console.WriteLine($"Received message: {body}, Priority: {messagePriority}");
receivedMessages.Enqueue(new MessageDto { Body = body, Priority = messagePriority });
// Optionally, delete the message from the queue if it's successfully processed
await sqsClient.DeleteMessageAsync(sqsQueueUrl, message.ReceiptHandle);
}
if (string.IsNullOrEmpty(priorityFilter))
{
return receivedMessages.ToList();
}
else
{
return receivedMessages.Where(m => m.Priority == priorityFilter).ToList();
}
}
}
public class MessageDto
{
public string? Body { get; set; }
public string? Priority { get; set; }
}
}
Program.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.OpenApi.Models;
using ServiceBusReceiverApi.Controllers;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
// Add Swagger
builder.Services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "ServiceBusReceiverApi", Version = "v1" });
});
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseRouting();
app.UseSwagger();
app.UseSwaggerUI(c =>
{
c.SwaggerEndpoint("/swagger/v1/swagger.json", "ServiceBusReceiverApi v1");
});
app.UseAuthorization();
app.MapControllers();
app.Run();
We execute this command to run the application
dotnet run
We navigate to the application endpoint: http://localhost:5031/swagger/index.html
curl -X 'GET' \
'http://localhost:5051/api/SnsSqs/receive?priorityFilter=high' \
-H 'accept: text/plain'
After executing the above request we get this response
In the context of AWS SNS (Simple Notification Service) and SQS (Simple Queue Service), there's no direct "start message processing" mechanism similar to what you might use with Azure Service Bus,
where a background process continuously pulls messages from a queue or subscription
Instead, message consumption from an SQS queue is typically done by polling the queue to retrieve messages
However, you can simulate a continuous processing mechanism in your application by creating a background service in your .NET application that polls the SQS queue for messages at regular intervals
This can be achieved using .NET's IHostedService interface, which allows you to run background tasks in a web application
We can create a new file SqsMessageProcessor.cs for defining the messages HostedService processor
SqsMessageProcessor.cs
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;
public class SqsMessageProcessor : IHostedService, IDisposable
{
private Timer? _timer; // Make timer nullable
private readonly AmazonSQSClient _sqsClient;
private readonly string _queueUrl = "https://sqs.eu-west-3.amazonaws.com/954718177936/myqueue";
public SqsMessageProcessor()
{
Console.WriteLine("Initializing AmazonSQSClient with Region: EUWest3");
try
{
_sqsClient = new AmazonSQSClient(Amazon.RegionEndpoint.EUWest3);
Console.WriteLine("AmazonSQSClient initialization successful.");
}
catch (Exception ex)
{
Console.WriteLine($"Error initializing AmazonSQSClient: {ex.Message}");
throw;
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
// Poll every 1 second (adjust as necessary)
_timer = new Timer(async _ => await DoWork(), null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
return Task.CompletedTask;
}
private async Task DoWork()
{
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = _queueUrl,
MaxNumberOfMessages = 10 // Adjust based on your needs
};
try
{
var response = await _sqsClient.ReceiveMessageAsync(receiveMessageRequest);
foreach (var message in response.Messages)
{
Console.WriteLine($"Received message: {message.Body}");
// var deleteMessageRequest = new DeleteMessageRequest
// {
// QueueUrl = _queueUrl,
// ReceiptHandle = message.ReceiptHandle
// };
// await _sqsClient.DeleteMessageAsync(deleteMessageRequest);
}
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
// Handle exception (e.g., log the error)
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
_timer?.Dispose();
}
}
We also have to modify the application middleware for registering the background service
Program.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.OpenApi.Models;
using ServiceBusReceiverApi.Controllers;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddHostedService<SqsMessageProcessor>(); // Register the background service
// Add Swagger
builder.Services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "ServiceBusReceiverApi", Version = "v1" });
});
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseRouting();
app.UseSwagger();
app.UseSwaggerUI(c =>
{
c.SwaggerEndpoint("/swagger/v1/swagger.json", "ServiceBusReceiverApi v1");
});
app.UseAuthorization();
app.MapControllers();
app.Run();
This is the final project structure
When we test the application we verify the output for the background hostedservice
IMPORTANT NOTE: for running the HostedService please configure the AWS CLI running this command, and set the aws_access_key_id and the aws_secret_access_key
aws configure
IMPORTANT NOTE: also is very important to set the Visibility timeout to 1 second
Visibility timeout: when a message is received, SQS temporarily hides it from subsequent retrieve requests for a duration known as the visibility timeout
If you're not deleting messages after processing, ensure the visibility timeout is appropriately set for your use case to prevent immediate reprocessing by other consumers