How to create NATS connection for the same topic with queue groups
Hello,
I am publishing messages to a topic and have a durable consumer listening on that topic. If multiple processes are started to consume from the same topic, the second one fails with an exception saying the client ID is already registered. In a Kubernetes/OpenShift environment, multiple pods will try to consume from the same topic, so a queue group would make sure only one of them receives each message. But what about the clientId? Won't starting multiple containers with the same clientId fail?
Sample code is pasted below:
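import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessagePubSubImpl implements Closeable {
    private final StreamingConnection connection;
    private Subscription subscription;

    @Autowired
    public MessagePubSubImpl(final ApplicationProperties applicationProperties) throws IOException, InterruptedException {
        // Every process reads the same client ID from configuration,
        // which is what triggers the error shown below.
        Options options = new Options.Builder()
                .natsUrl(applicationProperties.getNatsUrl())
                .clusterId(applicationProperties.getNatsClusterId())
                .clientId(applicationProperties.getNatsClientId()).build();
        StreamingConnectionFactory connectionFactory = new StreamingConnectionFactory(options);
        connection = connectionFactory.createConnection();
    }

    @PostConstruct
    public void subscribe() throws InterruptedException, TimeoutException, IOException {
        SubscriptionOptions options = new SubscriptionOptions.Builder().durableName("durable-consumer").build();
        // "test" is the queue group name; the onMessage handler is not part of this excerpt.
        subscription = connection.subscribe("test-topic", "test", this::onMessage, options);
    }

    @Override
    public void close() {
        log.info("nats unsubscribe.");
        if (subscription != null) {
            try {
                subscription.close();
            } catch (final Exception e) {
                log.warn("Ignoring unsubscribe exception", e);
            }
        }
        log.info("nats unsubscribe, completed.");
    }
}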
When the second process starts, it prints:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-03-12 19:32:37.488 ERROR 25652 --- [ main] o.s.boot.SpringApplication : Application run failed
Caused by: java.io.IOException: stan: clientID already registered
at io.nats.streaming.StreamingConnectionImpl.connect(StreamingConnectionImpl.java:216) ~[java-nats-streaming-2.2.3.jar:2.2.3]
at io.nats.streaming.StreamingConnectionFactory.createConnection(StreamingConnectionFactory.java:78) ~[java-nats-streaming-2.2.3.jar:2.2.3]
at ca.bc.gov.educ.api.student.messaging.MessagePubSubImpl.<init>(MessagePubSubImpl.java:35) ~[classes/:na]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_231]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_231]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_231]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_231]
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:200) ~[spring-beans-5.2.0.RELEASE.jar:5.2.0.RELEASE]
I see this issue is a bit old, but the client ID is not part of the queue group.
You cannot reuse the same client ID for more than one connection at a time, so giving each process a random client ID should work well.
https://docs.nats.io/nats-streaming-concepts/channels/subscriptions/queue-group
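To illustrate the suggestion above, here is a minimal sketch of one way to derive a distinct client ID per process. The `UniqueClientId` class, its `build` method, and the `student-api` base name are hypothetical and not part of the NATS Streaming client. The sketch assumes the `HOSTNAME` environment variable holds the pod name, which is usually the case in Kubernetes/OpenShift, and falls back to a random UUID otherwise. It also replaces anything other than alphanumerics, `-` and `_`, since those are the characters the server accepts in a client ID.

```java
import java.util.UUID;

public final class UniqueClientId {
    private UniqueClientId() {}

    // Hypothetical helper: appends a per-process suffix to a shared base name.
    public static String build(String baseClientId, String podName) {
        // Prefer the pod name when available; otherwise use a random UUID.
        String suffix = (podName == null || podName.isEmpty())
                ? UUID.randomUUID().toString()
                : podName;
        // Replace any character that is not alphanumeric, '-' or '_'.
        return (baseClientId + "-" + suffix).replaceAll("[^A-Za-z0-9_-]", "_");
    }

    public static void main(String[] args) {
        // Assumption: HOSTNAME is set to the pod name by the container runtime.
        String clientId = build("student-api", System.getenv("HOSTNAME"));
        System.out.println(clientId);
    }
}
```

The result would be passed to `.clientId(...)` in the `Options.Builder` from the question in place of `applicationProperties.getNatsClientId()`. The queue group name (`"test"`) and durable name (`"durable-consumer"`) stay identical across pods, so all of them join the same durable queue group.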