Spring Cloud Function issue for reactive pipeline
KafkaProServerless opened this issue · 1 comment
Hello team,
I would like to report a small issue:
- What I am trying to achieve:
Send a Flux of strings to a web API and also to Kafka in a reactive way.
- Some code:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringIntegrationReactorKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringIntegrationReactorKafkaApplication.class, args);
}
}
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyConfiguration {
@Bean
public KafkaSender<String, String> kafkaSender() {
final Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return KafkaSender.create(SenderOptions.create(properties));
}
@Bean
public WebClient getWebClient(final WebClient.Builder builder) {
final var clientHttpConnector = new ReactorClientHttpConnector(
        HttpClient.create().wiretap(true).protocol(HttpProtocol.HTTP11));
return builder
        .baseUrl("http://localhost:9090/api/bulk")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .clientConnector(clientHttpConnector)
        .build();
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import java.time.Duration;
@Service
public final class MyRun implements CommandLineRunner {
@Autowired
SendToService sendToService;
@Override
public void run(final String... args) {
sendToService.sendTo(someFlux()).subscribe();
}
public Flux<String> someFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(one -> {
System.out.println("current : " + one);
return one.toString();
});
}
}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import java.time.Duration;
import java.util.function.Function;
@Service
public final class SendToService {
@Autowired
FunctionCatalog functionCatalog;
@Autowired
KafkaSender<String, String> kafkaSender;
@Autowired
WebClient webClient;
public Flux<String> sendTo(final Flux<String> stringFlux) {
final Function<Flux<String>, Flux<String>> destinations = functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka");
return destinations.apply(stringFlux);
}
@Bean
public Function<Flux<String>, Flux<String>> sendFluxToKafka() {
return fooFlux ->
kafkaSender.send(
fooFlux.map(oneString -> SenderRecord.create(new ProducerRecord<>("first_topic", null, oneString), oneString)))
.map(oneSenderResult -> oneSenderResult.correlationMetadata());
}
@Bean
public Function<Flux<String>, Flux<String>> sendFluxToWeb() {
return fooFlux -> fooFlux
.bufferTimeout(10, Duration.ofSeconds(3))
.map(logs -> {
System.out.println("client " + logs.getFirst());
return logs;
})
.flatMap(bulkRequest -> webClient.post()
        .bodyValue(bulkRequest)
        .exchangeToMono(oneClientResponse -> oneClientResponse.bodyToMono(String.class))
        .then())
.thenMany(fooFlux);
}
}
- What did I try:
on this line:
final Function<Flux<String>, Flux<String>> destinations = functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka");
I tried these 4 combinations:
functionCatalog.lookup("sendFluxToWeb")
functionCatalog.lookup("sendFluxToKafka")
functionCatalog.lookup("sendFluxToKafka|sendFluxToWeb")
functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka")
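For clarity, here is how I understand the pipe to compose (a minimal sketch with plain java.util.function, outside Spring Cloud Function; the two functions are just stand-ins that record the order they run in, assuming the pipe composes left to right like Function.andThen):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PipeOrderSketch {
    public static void main(String[] args) {
        final List<String> calls = new ArrayList<>();
        // Stand-ins for sendFluxToWeb and sendFluxToKafka: each records that it ran.
        final Function<String, String> web = s -> { calls.add("web"); return s; };
        final Function<String, String> kafka = s -> { calls.add("kafka"); return s; };
        // "sendFluxToWeb|sendFluxToKafka" should then behave like web.andThen(kafka):
        web.andThen(kafka).apply("payload");
        System.out.println(calls); // prints [web, kafka]
    }
}
```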
- What is the expected result:
For case 1, I expect the flux to be sent to the external third-party web API.
For case 2, I expect the flux to be sent to Kafka.
For case 3, I expect the flux to be sent to both Kafka and the third-party web API.
For case 4, I expect the flux to be sent to both the third-party web API and Kafka.
- What is the actual result:
For cases 1, 2, and 3, the actual result matches the expected result; happy here, no problem.
However, for case 4, functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka") only sends to the external web API, but NOT to Kafka.
- Question:
May I ask why the result of case 4 differs from that of case 3?
What did I do wrong?
What should I do to make functionCatalog.lookup("sendFluxToWeb|sendFluxToKafka") work, i.e. send to both places?
Thank you so much for your time
So you are using function composition for reasons other than obtaining a composite result for final processing (i.e., sending somewhere).
I would first discourage that, since function composition, and specifically its enhancements within s-c-function, was not designed for that.
That said, I don't see why it should not work. However, I believe this is a question unrelated to s-c-function and would be better asked in a reactive forum.
In fact, to test it you don't even need s-c-function. You can simply define the two functions, manually compose them with sendFluxToWeb().andThen(sendFluxToKafka()), and debug the issue from there.
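As a sketch of that debugging approach (plain java.util.function only, no Spring or Reactor; the two methods are simplified stand-ins for your @Bean methods, each tagging the value so you can see which ones actually ran):

```java
import java.util.function.Function;

public class ManualCompositionSketch {
    // Simplified stand-ins shaped like the @Bean methods: each returns a composable function.
    static Function<String, String> sendFluxToWeb() {
        return s -> s + "|web";
    }
    static Function<String, String> sendFluxToKafka() {
        return s -> s + "|kafka";
    }

    public static void main(String[] args) {
        // Manual composition, no FunctionCatalog involved: web first, then kafka.
        final Function<String, String> composed = sendFluxToWeb().andThen(sendFluxToKafka());
        System.out.println(composed.apply("msg")); // prints msg|web|kafka
        // With the real Flux-based functions, if the "|kafka" step never runs,
        // the problem is in the reactive pipeline itself, not in s-c-function.
    }
}
```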
If at some point you still believe it is an s-c-function issue, feel free to re-open this issue or open a new one with more details.