cdi-rabbitmq-adapter
A convenient Java library that makes it easy to integrate RabbitMQ into projects that use
CDI 2.0 or higher, whether they are Java SE or Jakarta EE applications.
- connection factory for long-living single connections
- managed simple, confirmed and transactional publishers that recover from connection loss
- managed consumers that recover from connection loss and re-attach to the broker
- creation of the RabbitMQ schema (exchanges, queues and bindings)
- convenient integration for Jakarta EE 8/CDI applications
- publishing of AMQP messages to exchanges for CDI events
- consuming of AMQP messages from queues as CDI events
The basic principle that makes it possible to integrate RabbitMQ in a Jakarta EE environment is to use the facilities of **CDI** (Contexts and Dependency Injection). Basically we need:
- to fire CDI events remotely, bind them to be published as messages to broker exchanges
- to observe CDI events remotely, bind them to be consumed as messages from broker queues
<dependency>
    <groupId>io.github.jlmc</groupId>
    <artifactId>cdi-rabbitmq-adapter</artifactId>
    <version>1.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.11.0</version>
    <scope>runtime</scope>
</dependency>
A single connection factory always provides the same connection on calling newConnection() as long as the connection persists. A new connection is established as soon as the current connection is lost.
SingleConnectionFactory extends ConnectionFactory from the RabbitMQ standard library and is used in the same way as the factory from the standard library. The only difference is that you no longer have to worry about too many connections being established to a broker.
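The reuse behaviour described above can be illustrated with a minimal, library-independent sketch. The names `SketchConnection` and `SingleConnectionSketch` are hypothetical stand-ins and are not part of this library or of the RabbitMQ client. The sketch also reconnects lazily on the next call, which is a simplification of establishing a new connection as soon as the current one is lost.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a broker connection (assumption, not a library type). */
class SketchConnection {
    private volatile boolean open = true;

    boolean isOpen() {
        return open;
    }

    /** Simulates losing the connection. */
    void close() {
        open = false;
    }
}

/** Hands out one shared connection and replaces it only after it has been lost. */
class SingleConnectionSketch {
    private final Supplier<SketchConnection> connector;
    private SketchConnection current;

    SingleConnectionSketch(Supplier<SketchConnection> connector) {
        this.connector = connector;
    }

    /** Returns the current connection while it is open, otherwise opens a new one. */
    synchronized SketchConnection newConnection() {
        if (current == null || !current.isOpen()) {
            current = connector.get();
        }
        return current;
    }
}
```

Calling `newConnection()` twice on the sketch returns the same object. After `close()` is called on that object, the next call returns a fresh, open connection.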
The connection factory must be configured through an implementation of ConnectionFactoryConfigurationsConfigurator. Defining this class is mandatory:
import io.github.jlmc.cdi.adapter.amqp.rabbit.ConnectionFactoryConfigurations;
import io.github.jlmc.cdi.adapter.amqp.rabbit.ConnectionFactoryConfigurationsConfigurator;

import javax.annotation.Resource;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DemoFactoryConfigurationsConfigurator implements ConnectionFactoryConfigurationsConfigurator {

    // Container-managed executors, looked up from the default JNDI names
    @Resource(name = "DefaultManagedExecutorService", lookup = "java:comp/DefaultManagedExecutorService")
    ManagedExecutorService mes;

    @Resource(name = "DefaultManagedScheduledExecutorService", lookup = "java:comp/DefaultManagedScheduledExecutorService")
    ManagedScheduledExecutorService mses;

    @Override
    public ConnectionFactoryConfigurations configure() {
        return ConnectionFactoryConfigurations.defaults()
                .setUsername("admin")
                .setPassword("admin")
                .setVirtualHost("example")
                .setExecutorService(mes)
                .setHeartbeatExecutor(mses);
    }
}
We can create the RabbitMQ schema (exchanges, queues and bindings) by implementing DeclarablesConfigurator:
import io.github.jlmc.cdi.adapter.amqp.rabbit.DeclarablesConfigurator;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.Binding;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.BindingBuilder;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.Declarables;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.DirectExchange;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.ExchangeBuilder;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.FanoutExchange;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.Queue;
import io.github.jlmc.cdi.adapter.amqp.rabbit.core.QueueBuilder;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DemoDeclarablesConfigurator implements DeclarablesConfigurator {

    @Override
    public Declarables configure() {
        DirectExchange exchangeDirect =
                ExchangeBuilder.directExchange("x.direct")
                        .durable(true)
                        .build();

        FanoutExchange exchangeFanout =
                ExchangeBuilder.fanoutExchange("x.fanout")
                        .durable(true)
                        .build();

        // Both queues use the fanout exchange as their dead-letter exchange
        Queue queueA =
                QueueBuilder.durable("q.queueA")
                        .deadLetterExchange(exchangeFanout.getName())
                        .deadLetterRoutingKey("A")
                        .build();

        Queue queueB =
                QueueBuilder.durable("q.queueB")
                        .deadLetterExchange(exchangeFanout.getName())
                        .deadLetterRoutingKey("B")
                        .build();

        // Bind each queue to the direct exchange under its own routing key
        Binding bindingQueueHelloA =
                BindingBuilder.bind(queueA)
                        .to(exchangeDirect)
                        .with("helloA");

        Binding bindingQueueHelloB =
                BindingBuilder.bind(queueB)
                        .to(exchangeDirect)
                        .with("helloB");

        return new Declarables(
                exchangeDirect,
                exchangeFanout,
                queueA,
                queueB,
                bindingQueueHelloA,
                bindingQueueHelloB
        );
    }
}
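To make the effect of these declarations easier to picture, here is a small in-memory sketch of how a direct exchange routes a message to the queues bound with a matching routing key. `DirectRoutingSketch` is a hypothetical class written for illustration only. It does not talk to a broker and does not use this library's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative in-memory model of direct-exchange routing (assumption, not library code). */
class DirectRoutingSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue so far
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Declares the queue if needed and binds it under the given routing key. */
    void bind(String queue, String routingKey) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Delivers the message to every queue bound with exactly this key; returns the delivery count. */
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, List.of());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    /** Returns the messages currently held by the queue, or an empty list if it is unknown. */
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, List.of());
    }
}
```

With `q.queueA` bound under `helloA` and `q.queueB` under `helloB`, publishing with routing key `helloA` reaches only `q.queueA`. A message published with a key that has no binding is delivered to no queue in this model.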
To bind events, first create an implementation of BindingsConfigurator and override its publisherBindings() method:
import io.github.jlmc.cdi.adapter.amqp.rabbit.BindingsConfigurator;
import io.github.jlmc.cdi.adapter.amqp.rabbit.EventBindingBuilder;
import io.github.jlmc.cdi.adapter.amqp.rabbit.ExchangeBinding;

import javax.enterprise.context.ApplicationScoped;
import java.util.List;

@ApplicationScoped
public class DemoPublisherBindingsConfigurator implements BindingsConfigurator {

    @Override
    public List<ExchangeBinding> publisherBindings() {
        ExchangeBinding exchangeBinding =
                EventBindingBuilder.bind(MealBookedEvent.class)
                        .toExchange("x.direct")
                        .withRoutingKey("helloA")
                        .withPersistentMessages()
                        .withPublisherConfirms();
                        //.withPublisherTransactions();

        // We can declare as many ExchangeBindings as we need...
        return List.of(exchangeBinding);
    }
}
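The binding above refers to MealBookedEvent, whose definition is not shown in this document. A minimal sketch of such an event class could look like the following. The `id` property is inferred from the `setId` call used when the event is fired. Everything else, including how the library serializes the event into a message body, is an assumption and is not specified here.

```java
/**
 * Hypothetical event payload, sketched for illustration only.
 * In a real project this would normally be a public class in its own file.
 */
class MealBookedEvent {
    private String id;

    // A no-argument constructor keeps the class easy to instantiate reflectively.
    MealBookedEvent() {
    }

    String getId() {
        return id;
    }

    void setId(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "MealBookedEvent{id='" + id + "'}";
    }
}
```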
Firing a CDI event is going to publish a message to the given exchange and routing key:
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;

@ApplicationScoped
public class MealBookingCommandService {

    @Inject
    private Event<MealBookedEvent> mealBookedEventControl;

    public void bookMeal(BookMealCommand bookMealCommand) {
        // The fired object must match the injected Event<MealBookedEvent> type
        MealBookedEvent mealBookedEvent = new MealBookedEvent();
        mealBookedEvent.setId("1234");
        mealBookedEventControl.fire(mealBookedEvent);
    }
}
This delivers the fired event to local observers of MealBookedEvent and also publishes a message to the exchange x.direct with routing key helloA, as defined in the binding.
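The double delivery described above (local observers plus the broker) can be pictured with a small, self-contained sketch. `EventBridgeSketch` is a hypothetical class and not the library's implementation. The broker is replaced by an in-memory list that records the exchange and routing key of each publication.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: one fire() call notifies local observers and records a broker publication. */
class EventBridgeSketch<T> {
    private final String exchange;
    private final String routingKey;
    private final List<Consumer<T>> localObservers = new ArrayList<>();
    // Stands in for the broker: one "exchange:routingKey" entry per published event
    private final List<String> published = new ArrayList<>();

    EventBridgeSketch(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    /** Registers a local observer, comparable to an @Observes method. */
    void observe(Consumer<T> observer) {
        localObservers.add(observer);
    }

    /** Notifies every local observer, then records the publication to the bound exchange. */
    void fire(T event) {
        for (Consumer<T> observer : localObservers) {
            observer.accept(event);
        }
        published.add(exchange + ":" + routingKey);
    }

    List<String> published() {
        return published;
    }
}
```

In this model a single `fire` call on a bridge bound to `x.direct` and `helloA` both invokes the registered observers and adds one `x.direct:helloA` entry to the recorded publications.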
Binding an event to a queue for consuming events works the same way. We should first create an implementation of BindingsConfigurator and override its consumerBindings() method:
import io.github.jlmc.cdi.adapter.amqp.rabbit.BindingsConfigurator;
import io.github.jlmc.cdi.adapter.amqp.rabbit.EventBindingBuilder;
import io.github.jlmc.cdi.adapter.amqp.rabbit.QueueBinding;

import javax.enterprise.context.ApplicationScoped;
import java.util.List;

@ApplicationScoped
public class DemoConsumerBindingsConfigurator implements BindingsConfigurator {

    @Override
    public List<QueueBinding> consumerBindings() {
        QueueBinding queueBinding =
                EventBindingBuilder.bind(MealBookedEvent.class)
                        .fromQueue("q.queueA")
                        .consumerInstances(3)
                        .autoAck()
                        .prefetchMessageCount(5);

        return List.of(queueBinding);
    }
}
Now, CDI observers of the bound event are going to consume messages from q.queueA in the form of the bound event:
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class MealBookedEventConsumer {

    public void onMealBookedEvent(@Observes MealBookedEvent event) {
        // business implementation ...
    }
}