spring-cloud/spring-cloud-stream-samples

Can't use more than one Consumer

xwlcn opened this issue · 1 comments

xwlcn commented

yml:

# Two input bindings that read the same destination (torrentMessages),
# each under its own consumer group.
bindings:
        # NOTE(review): presumably bound to the `store` function bean through the
        # <function>-in-<index> binding naming convention — confirm.
        store-in-0:
          group: torrent-store-group
          destination: torrentMessages
          contentType: application/json
          consumer:
            batch-mode: true
            compressionType: gzip
            autoCommitOffset: false
        # NOTE(review): presumably bound to the `index` function bean — confirm.
        # Unlike store-in-0, this binding also sets max-poll-records.
        index-in-0:
          group: torrent-index-group
          destination: torrentMessages
          contentType: application/json
          consumer:
            batch-mode: true
            max-poll-records: 100
            compressionType: gzip
            autoCommitOffset: false

java:

@Slf4j
@SpringBootApplication
public class TorrentStoreServiceApplication {

	@Autowired
	private TorrentService torrentService;
	public static Boolean filterXxx;
	@Value("${dodder.filter-sensitive-torrent}")
	public void setFilterXxx(Boolean filterXxx) {
		TorrentStoreServiceApplication.filterXxx = filterXxx;
	}

	public static void main(String[] args) {
		SpringApplication.run(TorrentStoreServiceApplication.class, args);
	}

	@Bean
	Consumer<Message<List<Torrent>>> index() {
		return message -> {
			try {
				Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
				List<Torrent> torrents = message.getPayload();
				//System.out.println(torrents);
				//torrentService.index(torrents);
				//no error, execute acknowledge
				if (acknowledgment != null) {
					acknowledgment.acknowledge();
				}
			} catch (Exception e) {
				log.error("Insert or update torrent error: {}", e);
			}
		};
	}

	@Bean
	Consumer<Message<List<Torrent>>> store() {
		return message -> {
			try {
				Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
				List<Torrent> torrents = message.getPayload();
				System.out.println(torrents);
				torrentService.upsert(torrents);
				//no error, execute acknowledge
				if (acknowledgment != null) {
					acknowledgment.acknowledge();
				}
			} catch (Exception e) {
				log.error("Insert or update torrent error: {}", e);
			}
		};
	}
}

console info:

Found more then one function beans in BeanFactory: [index, store]. If you did not intend to use functions, ignore this message. However, if you did intend to use functions in the context of spring-cloud-function, consider providing 'spring.cloud.function.definition' property pointing to a function bean(s) you intend to use. For example, 'spring.cloud.function.definition=myFunction'

Before using version 2.X, the @StreamListener annotation worked without problems, but I wanted to do batch processing. Two different groups consume the same topic, and the code above does not run correctly. It only runs if I comment out one of the consumer methods.

xwlcn commented

After adding spring.cloud.function.definition=index;store, it works.