spring-projects/spring-integration-aws

SNS MessageAttributes propagation to SQS in case raw message delivery is disabled

bikerp opened this issue · 9 comments

bikerp commented

When an SQS subscription is set up for an SNS topic without the raw message delivery option enabled, the message attributes from the original SNS message are wrapped inside the Body attribute.
As a result, the tracing information is not found.

Expected behavior:
Message attributes should be mapped depending on the raw message delivery option.

I'm not fully sure what is going on here.
Would you mind sharing more info, use cases, or some samples?
If you already have an idea of how to fix it, I'd be glad to review the respective Pull Request.

bikerp commented

Here is an example of an SQS message that has raw delivery enabled. As you can see, MessageAttributes are present, including the tracing information in the b3 attribute. In that case, tracing propagation works just fine:

{
  "Messages": [
    {
      "MessageId": "6d4beb30-2b87-4870-b8aa-f3d6dd07d74e",
      "ReceiptHandle": "MGRlNjRjMGItNWIyZS00YjYxLWIyZTQtZmFlNzhmMGQ1ZmU1IGFybjphd3M6c3FzOmV1LXdlc3QtMTowMDAwMDAwMDAwMDA6c3FzLWNsaS1sb2NhbHN0YWNrLXJhcyA2ZDRiZWIzMC0yYjg3LTQ4NzAtYjhhYS1mM2Q2ZGQwN2Q3NGUgMTY4NTcxMDk3NS40MzU2NzE2",
      "MD5OfBody": "a8e03a1f60fcce3753d49c993a0f5721",
      "Body": "Hello message",
      "Attributes": {
        "SenderId": "000000000000",
        "SentTimestamp": "1685710975434",
        "ApproximateReceiveCount": "1",
        "ApproximateFirstReceiveTimestamp": "1685710975435"
      },
      "MD5OfMessageAttributes": "c33345af991f90db9ab13ba4c800a7ae",
      "MessageAttributes": {
        "b3": {
          "StringValue": "d5091d50d920828c-3e2c0ff1bcd41283-0",
          "DataType": "String"
        }
      }
    }
  ]
}

However, this is how the SQS message looks when raw delivery is not enabled:

{
  "Messages": [
    {
      "MessageId": "4bcc90c7-5b7c-462d-bf47-da4ab0f3d32a",
      "ReceiptHandle": "ZmY2Y2VjNTgtMDNlMi00NzQwLWIxM2QtNDU1ZDk1Y2I1NmMxIGFybjphd3M6c3FzOmV1LXdlc3QtMTowMDAwMDAwMDAwMDA6c3FzLWJmZi1sb2NhbHN0YWNrLXJhcyA0YmNjOTBjNy01YjdjLTQ2MmQtYmY0Ny1kYTRhYjBmM2QzMmEgMTY4NTcwMjQ2NS4wNjQ5ODc0",
      "MD5OfBody": "c4cf7c76b3329ffbc93bf80f43cbe35a",
      "Body": "{\"Type\": \"Notification\", \"MessageId\": \"950ffc8c-f390-4bd8-9203-2cf7ef3b1fec\", \"TopicArn\": \"arn:aws:sns:eu-west-1:000000000000:sns-bff-localstack-ras\", \"Message\": \"Hello message\", \"Timestamp\": \"2023-06-02T10:40:50.072Z\", \"SignatureVersion\": \"1\", \"Signature\": \"EXAMPLEpH+..\", \"SigningCertURL\": \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem\", \"UnsubscribeURL\": \"http://localhost:4566/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:000000000000:sns-bff-localstack-ras:17c7a1c0-215b-4d5a-8669-dad566df4aa6\", \"MessageAttributes\": {\"b3\": {\"Type\": \"String\", \"Value\": \"072a7fbcd3e57527-837904d83ef49e81-0\"}, \"subject\": {\"Type\": \"String\", \"Value\": \"subject\"}, \"foo\": {\"Type\": \"String\", \"Value\": \"bar\"}, \"topic\": {\"Type\": \"String\", \"Value\": \"topic\"}}}",
      "Attributes": {
        "SenderId": "000000000000",
        "SentTimestamp": "1685702450082",
        "ApproximateReceiveCount": "1",
        "ApproximateFirstReceiveTimestamp": "1685702465064"
      }
    }
  ]
}

MessageAttributes are wrapped inside Body and the tracing propagation doesn't work.

I was thinking that if I could register a custom header extractor to pull the attributes out of the Body, it would solve the issue.
Is that possible?

Well, it is a bit different from what is explained in the official AWS docs: https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html.
But probably I still didn't get it, and they are talking about whatever is inside that Body attribute.
The content of this article is closer to what you are showing: https://realbigdeo.medium.com/raw-messaging-delivery-in-aws-sns-sqs-subscriptions-2b683d657b01
Please confirm that I understood it correctly.

So, if that is the case and you indeed use an SnsInboundChannelAdapter to receive those messages from the topic, then you need to know that the whole received SNS message is converted to a HashMap and sent downstream as the payload.
Therefore, if you know that you have received the format with raw delivery disabled, you need a downstream transformer which extracts the required info from that Body entry.

If you can share more info showing that some indicator is present in the SubscriptionConfirmation message type, then we may be able to adjust SnsInboundChannelAdapter to extract that info automatically.

bikerp commented

Hi @artembilan, thanks for your involvement.
I confirm that this is the case described in that article. However, I'm not talking about receiving SNS messages but SQS messages, and I'm using SqsMessageDrivenChannelAdapter.

OK. So, probably there are no clues in the incoming SQS message to determine whether it was raw or not.
Then the logic remains with a custom transformer downstream, as I explained above.
However, in that case SQS just returns a String for the body, so you would probably need to deserialize it into a Map using the standard JsonToObjectTransformer, and then, in the next transformer, apply the parsing logic that extracts the attributes and the SNS Message into headers and payload respectively.
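The second step described above can be sketched in plain Java. This is only an illustration under some assumptions: the Body has already been deserialized into a Map (for example by JsonToObjectTransformer), the envelope has the shape shown in the non-raw sample earlier in this thread, and the names SnsEnvelopeUnwrapper, Unwrapped, and sampleEnvelope are hypothetical helpers, not part of spring-integration-aws.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: unwraps an SNS notification envelope already deserialized into a Map. */
final class SnsEnvelopeUnwrapper {

    /** Hypothetical holder for the result: headers plus payload. */
    static final class Unwrapped {
        final Map<String, Object> headers;
        final Object payload;

        Unwrapped(Map<String, Object> headers, Object payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    static Unwrapped unwrap(Map<String, Object> body) {
        Map<String, Object> headers = new LinkedHashMap<>();
        // Assumption: an SNS envelope is recognized by "Type": "Notification".
        // Anything else is passed through unchanged, with no extra headers.
        if (!"Notification".equals(body.get("Type"))) {
            return new Unwrapped(headers, body);
        }
        Object attributes = body.get("MessageAttributes");
        if (attributes instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) attributes).entrySet()) {
                // Each attribute looks like {"Type": "String", "Value": "..."}.
                if (entry.getValue() instanceof Map) {
                    Object value = ((Map<?, ?>) entry.getValue()).get("Value");
                    if (value != null) {
                        headers.put(String.valueOf(entry.getKey()), value);
                    }
                }
            }
        }
        // The original SNS message text becomes the payload.
        return new Unwrapped(headers, body.get("Message"));
    }

    /** A trimmed-down copy of the non-raw sample from this thread, for demonstration. */
    static Map<String, Object> sampleEnvelope() {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("b3", attribute("072a7fbcd3e57527-837904d83ef49e81-0"));
        attributes.put("foo", attribute("bar"));
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("Type", "Notification");
        envelope.put("Message", "Hello message");
        envelope.put("MessageAttributes", attributes);
        return envelope;
    }

    private static Map<String, Object> attribute(String value) {
        Map<String, Object> attribute = new LinkedHashMap<>();
        attribute.put("Type", "String");
        attribute.put("Value", value);
        return attribute;
    }
}
```

In a real flow, a transformer could call something like unwrap(...) on the deserialized payload and then build the outgoing message from the returned payload and headers. Non-String attribute types (Binary, Number) are not handled specially here.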

If there is any SQS attribute in the incoming message indicating that Body is an SNS message with attributes, we can look into adding a property to SqsMessageDrivenChannelAdapter to deal with disabled raw delivery.

bikerp commented

I don't think there is such an attribute indicating raw delivery. Maybe SqsMessageDrivenChannelAdapter could implement the appropriate logic depending on something like setRawDelivery(boolean).
Anyway, I could implement custom extraction as you described. The only thing I don't know is how to hook my custom extractor into message processing. Can you give me a hint?

Not sure what you mean.
This project is fully based on Spring Integration and just provides channel adapters and utilities for AWS.
Everything else is a messaging flow. So, the SqsMessageDrivenChannelAdapter produces messages into its output channel, then you subscribe to that channel to do some transformation, and so on, building a pipes-and-filters structure.
See more info in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/index.html.
And of course, learn what EIP is and how its patterns can be combined into a flow via message channels in between endpoints.

bikerp commented

I'm aware of EIP, but this topic concerns instrumentation at a low level, and this is where I'm lost. It probably involves TracingChannelInterceptor, but honestly I don't know what needs to be done to register custom code that extracts the b3 tracing data from the SQS message.

Use a custom transformer to parse that b3 attribute from the payload of that complex Body. There you build a new message and set this b3 value into a header of the message you return from this transformer.
Then the TracingChannelInterceptor on the output channel of this transformer can probably populate the current span into the context based on that b3 header.
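To make concrete what such a b3 header carries, here is a minimal sketch that splits a value in the B3 single-header layout (trace ID, span ID, optional sampling state, optional parent span ID, joined by hyphens). The class name B3SingleHeader is hypothetical and exists only for illustration. Tracing libraries ship their own B3 parsing, so in a real flow the transformer would normally just copy the raw string into the header and leave the parsing to the interceptor.

```java
/** Hypothetical value class, for illustration only. */
final class B3SingleHeader {
    final String traceId;
    final String spanId;
    final String samplingState; // "0", "1", "d", or null when absent
    final String parentSpanId;  // null when absent

    private B3SingleHeader(String traceId, String spanId, String samplingState, String parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.samplingState = samplingState;
        this.parentSpanId = parentSpanId;
    }

    /**
     * Parses values that contain at least a trace ID and a span ID.
     * The sampling-only form (a lone "0", "1" or "d") is deliberately not handled.
     */
    static B3SingleHeader parse(String value) {
        String[] parts = value.trim().split("-");
        if (parts.length < 2 || parts.length > 4) {
            throw new IllegalArgumentException("Unsupported b3 value: " + value);
        }
        String sampling = parts.length > 2 ? parts[2] : null;
        String parent = parts.length > 3 ? parts[3] : null;
        return new B3SingleHeader(parts[0], parts[1], sampling, parent);
    }

    /** "1" means accept and "d" means debug; both result in the trace being recorded. */
    boolean isSampled() {
        return "1".equals(samplingState) || "d".equals(samplingState);
    }
}
```

For the value from the non-raw sample in this thread, 072a7fbcd3e57527-837904d83ef49e81-0, this yields the trace ID 072a7fbcd3e57527, the span ID 837904d83ef49e81, and the sampling state 0 (not sampled).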