Spring For Apache Pulsar allows any java/kotlin application to easily integrate with Apache Pulsar avoiding boilerplate code. It supports configuration and annotation based creation of pulsar components.
Defining producer or consumer is a step-by-step process as described below.
- Add spring-pulsar library as dependency in your project.
- Define client configuration in property source file.
- Create producer using template.
- Create consumer using annotation.
- Define error handlers.
- Sample applications for reference.
For Spring Pulsar client to work you need to add spring-pulsar library as a dependency in your project. So go ahead and add below dependency in your application.
For Maven application
<dependency>
<groupId>com.intuit.pulsar</groupId>
<artifactId>spring-pulsar-core</artifactId>
<version>${spring-pulsar-core.version}</version>
</dependency>
For Gradle application
dependencies {
implementation 'com.intuit.pulsar:spring-pulsar-core:$springPulsarCoreVersion'
}
To create pulsar client you need to define the client properties in your application's properties file and add scanning of base package of the spring-pulsar-core package in your Spring boot application.
Pulsar spring library will automatically detect the client configuration present in your application and generate client using defined properties. See the details below on how to create client.
Add scan base package on spring-pulsar-core package
For Kotlin application
`@SpringBootApplication(scanBasePackages = ["com.intuit.spring.pulsar.client"])`
For Java application
`@SpringBootApplication(scanBasePackages = {"com.intuit.spring.pulsar.client"})`
Define client config in applications property source
In application.yml
pulsar:
client:
serviceUrl: pulsar+ssl://your.service.url:6651
tls:
tlsAllowInsecureConnection: true
tlsHostnameVerificationEnable: false
auth:
username: UserName
password: Password
In application.properties
pulsar.client.serviceUrl=pulsar+ssl://your.service.url:6651
pulsar.client.tls.tlsAllowInsecureConnection=true
pulsar.client.tls.tlsHostnameVerificationEnable=false
pulsar.client.auth.username=UserName
pulsar.client.auth.password=Password
In order to create a producer, you need to register a producer template bean with properties related to producer. Once the template is registered, you can autowire the template anywhere in your application and use methods like send() and sendAsync() to publish messages to topic.
Below code shows an example of defining a producer template.
@Configuration
open class ProducerConfiguration(val applicationContext: ApplicationContext) {
@Bean
open fun producerTemplate(): PulsarProducerTemplate<ByteArray> {
return PulsarProducerTemplateImpl<ByteArray>(
pulsarProducerConfig = PulsarProducerConfig(
schema = Schema.BYTES,
topicName = "persistent://tenent/namespace/topicName",
autoFlush = true),
applicationContext = applicationContext)
}
}
Once you have created and registered a producer template as spring bean, now you can autowire the producer template in your application and use it to publish messages as below.
@Component
class SomeClass(val producerTemplate: PulsarProducerTemplate<ByteArray>) {
fun publishMessage(message: String): MessageId {
val messageId: MessageId = producerTemplate.send(message.toByteArray())
return messageId
}
}
Below is the code snippet to define producer template in Java based application.
@Configuration
class ProducerConfiguration {
private ApplicationContext applicationContext;
ProducerConfiguration(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
public PulsarProducerTemplate<byte[]> producerTemplate() {
Map<String, String> config = new HashMap<>();
config.put(TOPIC_NAME, "my-test-topic");
config.put(AUTO_FLUSH, "true");
return new PulsarProducerTemplateImpl(
Schema.BYTES,
config,
applicationContext);
}
}
Below code uses the producer template define above to produce messages in a Java based application.
@Component
public class SomeClass {
private final PulsarProducerTemplate<byte[]> producerTemplate;
public SomeClass(PulsarProducerTemplate<byte[]> producerTemplate) {
this.producerTemplate = producerTemplate;
}
public void sendMessage(String message) {
this.producerTemplate.send(message.getBytes(StandardCharsets.UTF_8),null,new HashMap<String,String>(),null);
}
}
Defining a consumer is a two-step process as described below.
- First, you define a consumer listener bean.
- Second, you register your consumer listener bean as pulsar consumer by annotating it with @PulsarConsumer.
Create a consumer listener bean by implementing either of the below interfaces.
Implementing IPulsarListener<?>
This interface gives you control over the message and acknowledgement process and also exposes the internal Message and Consumer object.Take a look at below example.
@Component
class MyMessageListener: IPulsarListener<ByteArray> {
override fun onException(
exception: Exception,
consumer: org.apache.pulsar.client.api.Consumer<ByteArray>,
message: Message<ByteArray>
) {
// Here you can define logic on how to handle the exception and
// send either negative or positive ack.
}
override fun onSuccess(
consumer: org.apache.pulsar.client.api.Consumer<ByteArray>,
message: Message<ByteArray>
) {
// Here you can define logic on how to handle a successful
// processing and send either negative or positive ack.
}
override fun processMessage(
consumer: org.apache.pulsar.client.api.Consumer<ByteArray>,
message: Message<ByteArray>
) {
// Whenever a message is recived by pulsar runtime
// it first lands in this method.
//
// Here you can define logic to process the message.
// If an exception is thrown from this method then the onException()
// is executed. If no exception is thrown then the onSuccess() is
// executed.
}
}
- Control over negative and positive ack.
- Access to internal Message and Consumer object.
- Automation delegation to onSuccess and onException to provide unified handling capabilities
- Preferred when corrective action required when an exception occurs is consistent irrespective of the exception
Implementing MessageListener<?>
Standard MessageListener from pulsar gives you full control over what you want to do when your listener receives a message.
@Component
class MyMessageListener: MessageListener<ByteArray> {
override fun received(
consumer: org.apache.pulsar.client.api.Consumer<ByteArray>?,
message: Message<ByteArray>?
) {
// Whenever a message is recieved it lands in this method
// Write code here to handle the received message
// Any exception thrown from here can be handled
// by using the in-built exception handling aspect provided by this library
}
}
- received() is called for each message received by consumer.
- Gives you access to Message and Consumer objects.
- Control over how to process the message and when to send negative and positive ack.
- Preferred if you have specific definitive actions to be taken based on the exception thrown
Once you have created a consumer listener class and registered it as a spring bean, you can now identify your listener class as a pulsar consumer by annotating it with @PulsarConsumer annotation defined by pulsar-spring-client.
In this @PulsarConsumer annotation you can pass all the configuration related to the consumer as can be seen in the below example.
@Component
@PulsarConsumer(
topic = Topic(
topicNames = "Topic_names"
),
subscription = Subscription(
subscriptionName = "Subscription_Name",
subscriptionType = "Subscription_Type"))
class MyMessageListener: MessageListener<MessageData> {
override fun received(
org.apache.pulsar.client.api.Consumer<MessageData> consumer,
Message<MessageData> message
) {
// Code to handle mesasge
}
}
Below is the code snippet to define consumer in Java based application
@Component
@PulsarConsumer(
topic = @Topic(
topicNames = "Topic_names"
),
subscription = @Subscription(
subscriptionName = "Subscription_Name",
subscriptionType = "Subscription_Type"))
class MyMessageListener implements MessageListener<MessageData> {
override fun received(
consumer: org.apache.pulsar.client.api.Consumer<MessageData>?,
message: Message<MessageData>?
) {
// Code to handle mesasge
}
}
While creating the @PulsarConsumer annotation, you can utilize Spring's property resolution feature to directly incorporate values from your application.yml or application.properties file into the consumer properties.
For instance, if you have defined properties in your application file as follows:
pulsar:
sample01:
topic:
name: java-sample-topic01
subscription:
name: java-sample-sub01
type: Key_Shared
consumer:
count: 1
You can use them in your consumer by providing the property path, and they will be resolved automatically:
@PulsarConsumer(
topic = Topic(topicNames = "#{pulsar.sample01.topic.name}"),
subscription = Subscription(
subscriptionName = "#{pulsar.sample01.subscription.name}",
subscriptionType = "#{pulsar.sample01.subscription.type}"
),
count = "#{pulsar.sample01.consumer.count}"
)
This library provides exception handling capabilities, both while producing and consuming a message, internally using an aspect.
Steps to follow to integrate exception handling capabilities
- Annotate your producer/consumer with PulsarAction annotations - @PulsarProducerAction, @PulsarConsumerAction
- Define an exception handler class with the annotation @PulsarExceptionHandlerClass and add exception handler methods with annotations @PulsarProducerExceptionHandlerFunction and @PulsarConsumerExceptionHandlerFunction
Add the annotation @PulsarProducerAction to the method where you are using the instance of PulsarProducerTemplateImpl to send a message. The action param on the annotation is to provide a short description on how/what is the message being generated before being sent on Pulsar.
Ex:
@PulsarProducerAction("description of the BL step")
fun produce(): String {
//BL logic that is generating the message that could result in an exception
val messageId = producerTemplate.send(message.toByteArray())
return messageId.toString()
}
Add the annotation @PulsarConsumerAction to the
override fun received(consumer: Consumer<ByteArray>?, message: Message<ByteArray>)
method inside your consumer listener bean class
Ex:
@PulsarConsumerAction("description of the BL step")
override fun received(consumer: Consumer<ByteArray>?, message: Message<ByteArray>) {
val messageString: String = String(message.value)
//Bl step to process the message that could result in an exception
consumer?.acknowledge(message.messageId)
}
Note: This strategy of exception handling on the consumer side can only be used
when the consumer listener bean is implementing the
org.apache.pulsar.client.api.MessageListener<?>
interface.
When a consumer listener bean is implementing the interface
com.intuit.spring.pulsar.client.consumer.listener.IPulsarListener<?>
,
any exceptions occurring within the override fun processMessage(consumer: Consumer<ByteArray>, message: Message<ByteArray>)
method are delegated
to override fun onException(e: Exception, consumer: Consumer<ByteArray>, message: Message<ByteArray>)
and hence would not reach the exception handler methods even if you defined one.
Add an exception handler class and annotate it with @PulsarExceptionHandlerClass and within the class, add exception handler methods with details on which exceptions are being handled using annotations @PulsarProducerExceptionHandlerFunction and @PulsarConsumerExceptionHandlerFunction.
The exception handler methods need to implement the below functional interface
com.intuit.spring.pulsar.client.exceptions.PulsarExceptionHandler
, and only then they qualify
to be handlers used when an exception occurs
Ex:
@PulsarExceptionHandlerClass
@Component
/**
* This class will add exception handlers to handle all exceptions
* thrown by Pulsar producers and consumers
*/
class PulsarExceptionHandlers {
//This method is invoked when a producer throws a BLServiceException or IOException
@PulsarProducerExceptionHandlerFunction(BLServiceException::class, IOException::class)
var pulsarProducerExceptionHandler = PulsarExceptionHandler { exceptionHandlerParams ->
println("Exception occurred while performing ${exceptionHandlerParams.action}")
println("Handling producer exception ${exceptionHandlerParams.exception}")
}
//This method is invoked when a consumer throws a DownstreamServiceException
@PulsarConsumerExceptionHandlerFunction(DownstreamServiceException::class)
var pulsarConsumerExceptionHandler = PulsarExceptionHandler { exceptionHandlerParams ->
println("Exception occurred while performing ${exceptionHandlerParams.action}")
println("Handling consumer exception ${exceptionHandlerParams.exception}")
}
}
If the above descriptions are not enough , and you want to see some sample applications with working code on how to use the library, worry not we have you covered.
We have written some sample spring boot application both in java and kotlin for you to refer to. You can find the samples here. The samples project also has a README which describes in detail the sample applications and how to run them in your local machine. If you are not interested in running these samples and just want to look at the code directly you can visit below links.
Refer to the Getting Started section which contains instructions on setting up the library for development/debugging purposes.
Please see our Code of conduct.
For more information on how to use this library check below reference manual: Reference Manual
Check Contributing for contribution
This Spring Pulsar library is released under the terms of the MIT License (see LICENSE.md).