spring-projects/spring-integration-aws

Support Reactive Streams API for S3 protocol

migroskub opened this issue · 13 comments

Hey. I would like to suggest support for Project Reactor when integrating AWS with Spring Integration: for example, the ability to write a Flux to S3, consume a Mono, and more. Here is a good example.

<awssdk.version>2.10.27</awssdk.version>

We are aware of the Reactive Streams API in the AWS SDK v2. The only problem is that we have to migrate to that version first; then we can see what we can do with the Reactive Streams support: #155.

I'm not closing this issue in favor of that one, but rather rephrasing it for S3 specifics, so we can come back to it when we migrate to AWS SDK v2.

Ok. I didn't know about the SDK v2 capabilities. Sounds great. Can you share the timeline expectations for the SDK v2 migration?

Read the issue I pointed to in my answer,
and follow the links to the other related issues.
I cannot judge from here until we get some answers in the Spring Cloud AWS project.

+1

Are there any classes that can already work with an internal implementation based on v2? Or is it the case that none of the existing classes support Reactor?

Technically, even the existing AWS SDK v1 is compatible with Reactive Streams - it has async variants for the AWS services, which we actively use in this Spring Integration for AWS project. So you are good to develop a reactive solution even right now.
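As an illustration (a sketch, not part of the project), the callback-style async methods in SDK v1 can be bridged to a Reactor Mono. This uses the SQS async client as the example, since v1 S3 uploads go through the TransferManager instead; it assumes reactor-core and aws-java-sdk-sqs on the classpath, and the queue URL is illustrative.

```java
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;

import reactor.core.publisher.Mono;

public final class SqsReactorAdapter {

    // Bridge the SDK v1 AsyncHandler callback into a Mono: the Mono completes
    // with the result on success and errors out on failure.
    public static Mono<SendMessageResult> sendReactive(AmazonSQSAsync sqs,
            String queueUrl, String body) {

        return Mono.create(sink ->
                sqs.sendMessageAsync(new SendMessageRequest(queueUrl, body),
                        new AsyncHandler<SendMessageRequest, SendMessageResult>() {

                            @Override
                            public void onError(Exception e) {
                                sink.error(e);
                            }

                            @Override
                            public void onSuccess(SendMessageRequest request,
                                    SendMessageResult result) {
                                sink.success(result);
                            }

                        }));
    }

}
```

The same AsyncHandler pattern applies to the other v1 async clients.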

Can you refer to the classes that are relevant for the asynchronous operations?

@artembilan So does the outbound channel adapter support Reactor or not? The docs (which don't appear on the website) don't mention this. Why?

I'd really like to see an example of using the OutboundGateway for S3 in the Java DSL.

It would help to determine what, in your opinion, counts as compatibility with Reactor.
As I said before: the AWS API is in most cases async, so it can be adapted to Reactor Flux and Mono.
There are no such docs simply because these components do not expose a reactive API directly.
When they do, it will be reflected in the docs.
However, I treat any async API as compatible with Reactive Streams.
Moreover, even a blocking one can be shifted to a dedicated reactive scheduler so that it does not block the whole stream, but waits for the result asynchronously.
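For example, with Reactor that shift looks like this (a sketch; `callS3Blocking()` is a hypothetical blocking call, and `boundedElastic` assumes Reactor 3.3+):

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public final class BlockingAdapter {

    // Hypothetical blocking API call, e.g. a synchronous S3 operation.
    static String callS3Blocking() {
        return "uploaded";
    }

    public static Mono<String> asyncResult() {
        // subscribeOn moves the blocking work onto a scheduler dedicated to
        // blocking tasks, so the rest of the stream is not stalled.
        return Mono.fromCallable(BlockingAdapter::callS3Blocking)
                .subscribeOn(Schedulers.boundedElastic());
    }

}
```

Downstream operators then see an ordinary async Mono and never block the event loop.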

The S3MessageHandler can be configured as a bean and used in the .handle() of the Java DSL.
Its produceReply option has to be set to true to make it work as a gateway.
It returns a com.amazonaws.services.s3.transfer.Transfer as the result of publishing to S3.
You can adapt that object to Reactive Streams (a Reactor Mono) via its public void addProgressListener(ProgressListener listener); hook.
But that is already outside the scope of Spring Integration.
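That adaptation could look like this (a sketch, not part of the project; it assumes reactor-core plus the v1 aws-java-sdk-s3 TransferManager types, and relies on the TRANSFER_COMPLETED_EVENT / TRANSFER_FAILED_EVENT progress events):

```java
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.services.s3.transfer.Upload;

import reactor.core.publisher.Mono;

public final class TransferAdapters {

    // Complete the Mono when the S3 transfer finishes; error it on failure.
    public static Mono<Upload> toMono(Upload upload) {
        return Mono.create(sink ->
                upload.addProgressListener((ProgressEvent event) -> {
                    switch (event.getEventType()) {
                        case TRANSFER_COMPLETED_EVENT:
                            sink.success(upload);
                            break;
                        case TRANSFER_FAILED_EVENT:
                            try {
                                // the transfer has failed, so this returns
                                // the cause without waiting
                                sink.error(upload.waitForException());
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                sink.error(e);
                            }
                            break;
                        default:
                            // intermediate progress events are ignored
                    }
                }));
    }

}
```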

The docs mention how to use an arbitrary channel adapter in the Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters
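Putting it together, a Java DSL flow using the handler as a gateway might look like this (a sketch; the bean names, bucket, and channel name are illustrative, and the three-argument constructor with the produceReply flag is assumed from this project's API):

```java
import com.amazonaws.services.s3.AmazonS3;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.outbound.S3MessageHandler;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class S3FlowConfig {

    @Bean
    public S3MessageHandler s3MessageHandler(AmazonS3 amazonS3) {
        // 'true' is the produceReply flag: the handler acts as a gateway
        // and replies with the Transfer for the scheduled upload.
        return new S3MessageHandler(amazonS3, "my-bucket", true);
    }

    @Bean
    public IntegrationFlow s3UploadFlow(S3MessageHandler s3MessageHandler) {
        return f -> f
                .handle(s3MessageHandler)   // reply payload is a Transfer
                .channel("uploadResults");
    }

}
```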

@artembilan After looking at the code, I see two things that make me wonder whether it is really async and non-blocking (i.e. safe to use with Reactor):

  1. There is no use of S3AsyncClient.
  2. An InputStream is used when passing a byte[] to the upload command. Source - https://stackoverflow.com/questions/67476133/upload-a-inputstream-to-aws-s3-asynchronously-non-blocking-using-aws-sdk-for-j

@jifeiiii ,

you probably didn't get the main point of this issue: we cannot use AWS SDK v2 yet. The client you mention is missing from the version of the AWS SDK that we currently support in this project: https://www.baeldung.com/java-aws-s3-reactive.

See the JavaDocs of the current API we use in the S3MessageHandler:


    /**
     * <p>
     * Schedules a new transfer to upload data to Amazon S3. This method is
     * non-blocking and returns immediately (i.e. before the upload has
     * finished).
     * </p>
     * <p>
     * Use the returned <code>Upload</code> object to query the progress of the
     * transfer, add listeners for progress events, and wait for the upload to
     * complete.
     * </p>
     * <p>
     * If resources are available, the upload will begin immediately.
     * Otherwise, the upload is scheduled and started as soon as
     * resources become available.
     * </p>
     * <p>
     * If you are uploading <a href="http://aws.amazon.com/kms/">Amazon Web Services
     * KMS</a>-encrypted objects, you need to specify the correct region of the
     * bucket on your client and configure Amazon Web Services Signature Version 4 for added
     * security. For more information on how to do this, see
     * http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#
     * specify-signature-version
     * </p>
     *
     * @param putObjectRequest
     *            The request containing all the parameters for the upload.
     *
     * @return A new <code>Upload</code> object to use to check
     * 		   the state of the upload, listen for progress notifications,
     * 		   and otherwise manage the upload.
     *
     * @throws AmazonClientException
     *             If any errors are encountered in the client while making the
     *             request or handling the response.
     * @throws AmazonServiceException
     *             If any errors occurred in Amazon S3 while processing the
     *             request.
     */
    public Upload upload(final PutObjectRequest putObjectRequest)

That's where my assumptions about this channel adapter's possible usage in reactive streams come from.