Can't use more than one Consumer
xwlcn opened this issue · 1 comments
xwlcn commented
yml:
bindings:
store-in-0:
group: torrent-store-group
destination: torrentMessages
contentType: application/json
consumer:
batch-mode: true
compressionType: gzip
autoCommitOffset: false
index-in-0:
group: torrent-index-group
destination: torrentMessages
contentType: application/json
consumer:
batch-mode: true
max-poll-records: 100
compressionType: gzip
autoCommitOffset: false
java:
@Slf4j
@SpringBootApplication
public class TorrentStoreServiceApplication {
@Autowired
private TorrentService torrentService;
public static Boolean filterXxx;
@Value("${dodder.filter-sensitive-torrent}")
public void setFilterXxx(Boolean filterXxx) {
TorrentStoreServiceApplication.filterXxx = filterXxx;
}
public static void main(String[] args) {
SpringApplication.run(TorrentStoreServiceApplication.class, args);
}
@Bean
Consumer<Message<List<Torrent>>> index() {
return message -> {
try {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
List<Torrent> torrents = message.getPayload();
//System.out.println(torrents);
//torrentService.index(torrents);
//no error, execute acknowledge
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
} catch (Exception e) {
log.error("Insert or update torrent error: {}", e);
}
};
}
@Bean
Consumer<Message<List<Torrent>>> store() {
return message -> {
try {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
List<Torrent> torrents = message.getPayload();
System.out.println(torrents);
torrentService.upsert(torrents);
//no error, execute acknowledge
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
} catch (Exception e) {
log.error("Insert or update torrent error: {}", e);
}
};
}
}
console info:
Found more then one function beans in BeanFactory: [index, store]. If you did not intend to use functions, ignore this message. However, if you did intend to use functions in the context of spring-cloud-function, consider providing 'spring.cloud.function.definition' property pointing to a function bean(s) you intend to use. For example, 'spring.cloud.function.definition=myFunction'
Before using version 2.X, use @StreamListener annotation had no problem, but I wanted to do batch processing. Two different groups consumed the same topic, and the above code could not run correctly. I needed to comment out a consumer method will it can be run.
xwlcn commented
Addspring.cloud.function.definition=index;store
it works.