spring-cloud/spring-cloud-function

Spring Cloud Function issue for reactive pipeline

KafkaProServerless opened this issue · 1 comment

Hello team,

I would like to reach out with a small issue:

  • What I am trying to achieve:

Send a Flux of strings to a web API and also to Kafka, in a reactive way.

  • Showing some code:

// SpringIntegrationReactorKafkaApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringIntegrationReactorKafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringIntegrationReactorKafkaApplication.class, args);
	}

}

// MyConfiguration.java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MyConfiguration {

    @Bean
    public KafkaSender<String, String> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return KafkaSender.create(SenderOptions.create(properties));
    }

    @Bean
    public WebClient getWebClient(final WebClient.Builder builder) {
        final var clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create().wiretap(true).protocol(HttpProtocol.HTTP11));
        return builder.baseUrl("http://localhost:9090/api/bulk").defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).clientConnector(clientHttpConnector).build();
    }

}

// MyRun.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

import java.time.Duration;

@Service
public final class MyRun implements CommandLineRunner {

    @Autowired
    SendToService sendToService;

    @Override
    public void run(final String... args) {
        sendToService.sendTo(someFlux()).subscribe();
    }

    public Flux<String> someFlux() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(one -> {
                    System.out.println("current : " + one);
                    return one.toString();
                });
    }

}

// SendToService.java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

import java.time.Duration;
import java.util.function.Function;

@Service
public final class SendToService {

    @Autowired
    FunctionCatalog functionCatalog;

    @Autowired
    KafkaSender<String, String> kafkaSender;

    @Autowired
    WebClient webClient;

    public Flux<String> sendTo(final Flux<String> stringFlux) {
        final Function<Flux<String>, Flux<String>> destinations = functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka");
        return destinations.apply(stringFlux);
    }

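    // Sends each string to Kafka and emits its correlation metadata
    // (the original string) for every send result.
    @Bean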
    public Function<Flux<String>, Flux<String>> sendFluxToKafka() {
        return fooFlux ->
                kafkaSender.send(
                fooFlux.map(oneString -> SenderRecord.create(new ProducerRecord<>("first_topic", null, oneString), oneString)))
                .map(oneSenderResult -> oneSenderResult.correlationMetadata());
    }

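    // Buffers up to 10 strings or 3 seconds, POSTs each batch, discards the
    // response, then switches to fooFlux via thenMany.
    @Bean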
    public Function<Flux<String>, Flux<String>> sendFluxToWeb() {
        return fooFlux -> fooFlux
                .bufferTimeout(10, Duration.ofSeconds(3))
                .map(logs -> {
                    System.out.println("client " + logs.getFirst());
                    return logs;
                })
                .flatMap(bulkRequest -> webClient.post().bodyValue(bulkRequest).exchangeToMono(oneClientResponse -> oneClientResponse.bodyToMono(String.class)).then())
                .thenMany(fooFlux);
    }

}

  • What I tried:

On this line:

        final Function<Flux<String>, Flux<String>> destinations = functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka");

I tried these 4 combinations:

  1. functionCatalog.lookup("sendFluxToWeb")
  2. functionCatalog.lookup("sendFluxToKafka")
  3. functionCatalog.lookup("sendFluxToKafka|sendFluxToWeb")
  4. functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka")

  • What is the expected result:

For case 1, I expect the flux to be sent to the external third-party web API.
For case 2, I expect the flux to be sent to Kafka.
For case 3, I expect the flux to be sent to both Kafka and the third-party web API.
For case 4, I expect the flux to be sent to both the third-party web API and Kafka.

  • What is the actual result:

For cases 1, 2, and 3, the actual result matches the expected result. No problem there.
However, for case 4, functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka"), the flux is sent only to the external web API and NOT to Kafka.

  • Question:

May I ask why the result of case 4 differs from that of case 3?

What did I do wrong?

What should I do to make functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka") work, i.e., send to both places?

Thank you so much for your time.

So you are using function composition for reasons other than obtaining a composite result for final processing (i.e., to send it somewhere).
I would first discourage that, as function composition, and specifically its enhancements within s-c-function, were not designed for that.
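
For contrast, here is a minimal sketch of composition used to obtain a composite result, written in plain Java with `java.util.function.Function`. The class and function names are hypothetical and do not come from the issue. A catalog lookup such as `a|b` should compose in a comparable left-to-right way, but that is an assumption here, since the sketch does not use s-c-function itself.

```java
import java.util.Locale;
import java.util.function.Function;

public class CompositeResultSketch {

    // Hypothetical value-transforming functions; neither has side effects.
    static final Function<String, String> UPPERCASE = s -> s.toUpperCase(Locale.ROOT);
    static final Function<String, String> REVERSE = s -> new StringBuilder(s).reverse().toString();

    // Composition in the intended sense: the output of the first function
    // becomes the input of the second, and the caller consumes the final value.
    static String uppercaseThenReverse(String input) {
        return UPPERCASE.andThen(REVERSE).apply(input);
    }

    public static void main(String[] args) {
        System.out.println(uppercaseThenReverse("hello")); // prints OLLEH
    }
}
```

Each stage exists only to shape the value handed to the next one, which is different from using every stage to deliver the same input to a separate destination.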

That said, I don't see why it should not work. However, I believe it's a question unrelated to s-c-function that would be better asked in a reactive forum.
In fact, to test it you don't even need s-c-function. You can simply take the two functions, compose them manually with sendFluxToWeb().andThen(sendFluxToKafka()), and debug the issue.
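
As a starting point for that debugging, the sketch below models manual composition in plain Java, with lists standing in for `Flux` and hypothetical stand-in stages. It shows only that with `andThen` the second function sees whatever the first one returns. It assumes that the posted `sendFluxToWeb` emits nothing downstream while its source is still running, which is what `flatMap(... .then())` followed by `thenMany(fooFlux)` would suggest for a never-completing `Flux.interval`. That is an assumption to verify against the real pipeline (for example with `.log()`), not a confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class CompositionOrderSketch {

    // Records what each hypothetical stage actually received as input.
    final List<String> seenByWeb = new ArrayList<>();
    final List<String> seenByKafka = new ArrayList<>();

    // Stand-in for sendFluxToKafka: records its input and passes every element on.
    Function<List<String>, List<String>> toKafka() {
        return in -> {
            seenByKafka.addAll(in);
            return in;
        };
    }

    // Stand-in for sendFluxToWeb, under the ASSUMPTION that it emits nothing
    // downstream while its source is still running: records input, returns empty.
    Function<List<String>, List<String>> toWeb() {
        return in -> {
            seenByWeb.addAll(in);
            return List.of();
        };
    }

    public static void main(String[] args) {
        List<String> source = List.of("0", "1", "2");

        // Models case 3: Kafka first, then web.
        CompositionOrderSketch kafkaFirst = new CompositionOrderSketch();
        kafkaFirst.toKafka().andThen(kafkaFirst.toWeb()).apply(source);
        System.out.println("kafka|web -> kafka saw " + kafkaFirst.seenByKafka
                + ", web saw " + kafkaFirst.seenByWeb);

        // Models case 4: web first, then Kafka.
        CompositionOrderSketch webFirst = new CompositionOrderSketch();
        webFirst.toWeb().andThen(webFirst.toKafka()).apply(source);
        System.out.println("web|kafka -> kafka saw " + webFirst.seenByKafka
                + ", web saw " + webFirst.seenByWeb);
    }
}
```

In this model both stages see every element when the Kafka stand-in runs first, while the Kafka stand-in sees nothing when the web stand-in runs first. If the real functions behave the same way under manual composition, the difference between cases 3 and 4 comes from what `sendFluxToWeb` returns, not from the catalog lookup.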

If at some point you still believe it is an s-c-function issue, feel free to re-open it or open a new issue with more details.