Version: 0.1.3
Pulserl is an Erlang client for the Apache Pulsar Pub/Sub system, with both producer and consumer implementations. It requires Apache Pulsar 2.0+ and Erlang/OTP 18.0+. Pulserl uses the binary protocol to interact with the Pulsar brokers and exposes a simple API.
The examples assume you have a running Pulsar broker at localhost:6650, a topic called test-topic (partitioned or not) and rebar3 installed.
Note: Pulserl uses pulserl and Shared as the default subscription name and type.
So, in case that subscription (not the consumer) does not yet exist under the topic test-topic, this example creates it first by starting the consumer before producing any message to the topic.
Fetch, compile and start the Erlang shell:
git clone https://github.com/skulup/pulserl.git
cd pulserl
rebar3 shell
In the Erlang shell:
rr(pulserl). %% Load the API records
%% A demo function to log the value of consumed messages
%% that will be produced below.
pulserl:start_consumption_in_background("test-topic").
%% Asynchronous produce
Promise = pulserl:produce("test-topic", "Async produce message").
pulserl:await(Promise). %% Wait for the broker ack
#messageId{ledger_id = 172,entry_id = 7,
topic = <<"persistent://public/default/test-topic">>,
partition = -1,batch = undefined}
%% Asynchronous produce. The response is delivered through a callback (fun/1)
pulserl:produce("test-topic", "Hello", fun(Res) -> io:format("Response: ~p~n", [Res]) end).
%% Synchronous produce
pulserl:sync_produce("test-topic", "Sync produce message").
#messageId{ledger_id = 176,entry_id = 11,
topic = <<"persistent://public/default/test-topic">>,
partition = -1,batch = undefined}
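Because pulserl:produce/2 returns a promise immediately, several messages can be sent before waiting on any of them. The sketch below is hypothetical (promise_sketch and produce_all/4 are illustrative names, not part of pulserl); it takes the produce and await functions as arguments so it can be tried without a broker.

```erlang
-module(promise_sketch).
-export([produce_all/4]).

%% ProduceFun stands in for fun pulserl:produce/2 (returns a promise) and
%% AwaitFun for fun pulserl:await/1 (blocks until the broker ack or error).
produce_all(ProduceFun, AwaitFun, Topic, Payloads) ->
    %% Issue every send first so the producer can pipeline (or batch) them...
    Promises = [ProduceFun(Topic, Payload) || Payload <- Payloads],
    %% ...then wait on each promise; results come back in send order.
    [AwaitFun(Promise) || Promise <- Promises].
```

With a running broker it could be called as promise_sketch:produce_all(fun pulserl:produce/2, fun pulserl:await/1, "test-topic", ["a", "b"]).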
- Basic Producer
- Basic Consumer
- Partitioned topics
- Batching
- Compression
- TLS
- Authentication (token, tls)
- Reader API
- Proxy Support (for Kubernetes)
- Effectively-Once
- Schema
- Consumer seek
- Multi-topics consumer
- Topics regex consumer
- Compacted topics
- User defined properties producer/consumer
- Reader hasMessageAvailable
- Hostname verification
- Multi Hosts Service Url support
- Key_Shared
- Key-based batcher (no documentation found)?
- Negative Acknowledge
- Delayed Delivery Messages
- Dead Letter Policy
- Interceptors
Thanks to Sabudaye for this information
Pulserl is available on Hex for easy installation; just add it to your project dependencies.
In your Erlang project's rebar.config:
{deps, [
{pulserl, "<latest-version>"}
]}
In your Elixir project's mix.exs
def deps do
[
{:pulserl, "~> <latest-version>"}
]
end
For API simplicity, the pulserl client is currently a singleton (a locally registered gen_server) that can be created at startup by the application controller or on demand at a later time.
The client is responsible for creating the TCP connections, maintaining the connection pool, and ensuring these connections are maximally used by the producers and consumers. The client also queries the metadata needed to initialize a producer or consumer; to do this, it creates a metadata socket during initialization using the provided configuration.
You can configure the auto-started client by adding the following pulserl configuration to your sys.config file:
[
{pulserl, [
{autostart, true} %% If true, the client will be created on startup. Default is true.
%% The TCP connect timeout in milliseconds. Default is 30000.
, {connect_timeout_ms, 30000}
%% The maximum connections to each broker the client should create.
%% Default is 1. Increasing this may improve I/O throughput
, {max_connections_per_broker, 1}
%% The underlying TCP socket options.
%% https://erlang.org/doc/man/gen_tcp.html#type-connect_option
, {socket_options, [{nodelay, true}]}
%% The service url. Default is the non TLS url: "pulsar://${hostname}:6650"
, {service_url, "pulsar+ssl://localhost:6651/"}
%% The trust certificate file path. Required only if the TLS service url is used.
%% See http://pulsar.apache.org/docs/en/security-tls-transport/
, {tls_trust_certs_file, "/path/to/cacert.pem"}
]}
].
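To check which values the client would pick up from sys.config, the keys above can be read with the standard application:get_env/3. This is a minimal sketch with a hypothetical module name; the fallbacks mirror the defaults stated in the comments above, and service_url is reported as undefined when unset rather than guessing the hostname-based default.

```erlang
-module(config_sketch).
-export([client_settings/0]).

%% Reads the pulserl client keys from the application environment,
%% falling back to the documented defaults when a key is absent.
client_settings() ->
    #{autostart => application:get_env(pulserl, autostart, true),
      connect_timeout_ms => application:get_env(pulserl, connect_timeout_ms, 30000),
      max_connections_per_broker => application:get_env(pulserl, max_connections_per_broker, 1),
      %% undefined here means "not set"; pulserl then uses its non-TLS default.
      service_url => application:get_env(pulserl, service_url, undefined)}.
```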
The pulserl:start_client/1,2 API can be used to start the pulserl client when needed.
%% The #clientConfig{} record must be loaded first, e.g. with rr(pulserl) in the shell.
ServiceUrl = "pulsar+ssl://localhost:6651/",
Config = #clientConfig{
connect_timeout_ms = 30000,
max_connections_per_broker = 1,
socket_options = [{nodelay, true}],
tls_trust_certs_file = "/path/to/cacert.pem"
},
ok = pulserl:start_client(ServiceUrl, Config).
Pulserl creates a gen_server process per topic. For a topic with n partitions, it creates a parent producer under the pulserl_producer_sup supervision tree, which in turn start_links and manages n child producers. The parent producer serves as a facade to the internal producers: it monitors the child processes (the internal partition producers) for resilience and routes client calls to one of them according to the configured routing mode.
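The round_robin_routing mode can be pictured as a counter that the parent producer advances on every keyless message. The sketch below is an illustration under that assumption, not pulserl's actual code; round_robin_sketch and pick/2 are hypothetical names, and Children stands in for the list of child producer pids.

```erlang
-module(round_robin_sketch).
-export([pick/2]).

%% Returns the child producer for the next message together with the
%% advanced counter, cycling through Children in order.
pick(Children, Counter) when Children =/= [] ->
    Index = Counter rem length(Children),
    {lists:nth(Index + 1, Children), Counter + 1}.
```

Keeping the counter in the parent's state (rather than in each child) is what spreads consecutive messages across partitions.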
During initialization, a producer is assigned a connection by the client based on its topic metadata.
A producer uses a queueing mechanism when sending messages.
Each send is internally a gen_server:call/2 to the producer process. The caller's reference is added to a queue, and the call is replied to immediately with ok. This early reply frees the caller to do other tasks if the response is not needed immediately. When a send is triggered (either because batching is disabled, or because batching is enabled and a batch send is triggered), the producer asynchronously sends the message(s) to the pulserl_conn process with gen_server:cast/2. When the connection process receives the response, it forwards it with ! to the associated producer, which in turn dequeues the associated caller and replies to it.
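The early-reply queueing described above can be modelled as a tiny gen_server. This is a simplified, hypothetical sketch (queue_producer_sketch, send/2 and ack/1 are illustrative names): the caller gets a reference back immediately, and a later broker_receipt message, standing in for the ! sent by pulserl_conn, dequeues the oldest caller and sends it {Ref, ok}. It assumes receipts arrive in send order and skips batching, the cast to the connection and error handling.

```erlang
-module(queue_producer_sketch).
-behaviour(gen_server).
-export([start_link/0, send/2, ack/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() -> gen_server:start_link(?MODULE, [], []).

%% Returns a reference right away (the "early reply").
send(Pid, Payload) -> gen_server:call(Pid, {send, Payload}).

%% Simulates the connection process forwarding a broker receipt with !.
ack(Pid) -> Pid ! broker_receipt, ok.

init([]) -> {ok, queue:new()}.

handle_call({send, _Payload}, {CallerPid, _Tag}, Queue) ->
    Ref = make_ref(),
    %% A real producer would now cast the payload to its connection process.
    {reply, Ref, queue:in({CallerPid, Ref}, Queue)}.

handle_cast(_Msg, Queue) -> {noreply, Queue}.

handle_info(broker_receipt, Queue) ->
    case queue:out(Queue) of
        {{value, {CallerPid, Ref}}, Rest} ->
            CallerPid ! {Ref, ok},   %% the deferred reply to the original caller
            {noreply, Rest};
        {empty, _} ->
            {noreply, Queue}
    end.
```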
The producer provides synchronous and asynchronous send APIs.
In synchronous mode, the call waits for the broker to acknowledge the message. If a send_timeout is specified and the acknowledgment is not received within it, {error, send_timeout} is returned to the caller.
The asynchronous mode provides two APIs. One returns a reference() that can later be used to probe for the response or error. The other takes a callback fun/1 that the producer process invokes internally when there is a response or error.
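Since the callback fun/1 runs inside the producer process, a common way to get its result back is to send a message to the caller. The sketch below shows that pattern with a caller-side timeout; callback_sketch and produce_and_wait/4 are hypothetical names, and ProduceFun stands in for fun pulserl:produce/3 so the code can be exercised without a broker. For plain blocking sends, pulserl:sync_produce/2 already exists; this only illustrates the callback mode.

```erlang
-module(callback_sketch).
-export([produce_and_wait/4]).

%% ProduceFun takes a topic, a payload and a fun/1 that the producer
%% process calls with the result (like pulserl:produce/3).
produce_and_wait(ProduceFun, Topic, Payload, TimeoutMs) ->
    Caller = self(),
    Ref = make_ref(),
    %% The callback runs in another process, so it messages the caller back.
    ProduceFun(Topic, Payload, fun(Res) -> Caller ! {Ref, Res} end),
    receive
        {Ref, Result} -> Result
    after TimeoutMs ->
        {error, timeout}
    end.
```

Note that a result arriving after the timeout is left in the caller's mailbox.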
Start a producer with pulserl:start_producer/1,2 by passing a topic name and a list of options.
If start_producer/1 is used, the producer is started with default options, which can be provided in sys.config through the producer_opts environment variable:
[
{pulserl, [
{producer_opts, [{batch_enable, true}]}
]}
].
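This README does not say whether start_producer/2 merges its options with producer_opts. If you want explicit options to override the sys.config defaults, a hypothetical helper like the one below can combine them yourself before calling start_producer/2. It is a shallow merge: a nested entry such as {producer, [...]} is replaced whole, not merged.

```erlang
-module(opts_sketch).
-export([effective_opts/2]).

%% Explicit options win; any default whose key is not given explicitly
%% is appended. Only {Key, Value} tuples from Defaults are considered.
effective_opts(Defaults, Explicit) ->
    Explicit ++ [Opt || {Key, _} = Opt <- Defaults,
                        not lists:keymember(Key, 1, Explicit)].
```

For example: pulserl:start_producer("test-topic", opts_sketch:effective_opts(application:get_env(pulserl, producer_opts, []), [{send_timeout, 5000}])).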
Sample code for starting a producer:
ProducerOpts = [
{producer, [
{name, "my_producer_name"}, %% Name of the producer in the Pulsar cluster
%% Metadata attached to the producer for easier identification
{properties, #{"language" => "erlang"}}, %% A proplist works as well
%% The initial sequence id for the producer
{initial_sequence_id, 0} %% Default is 0
]},
%% Time in milliseconds after which an error is reported if the broker
%% has not acknowledged the message. Default is 30000
{send_timeout, 20000},
%% This behaviour is applied when a message without a key is published.
%% If the key is set, the hash of the key is used to choose
%% the partition the message is published to.
%% Possible values: round_robin_routing, single_routing
%% and {Module, Function}, which is called with the key and the
%% partition count and returns the selected partition.
{routing_mode, round_robin_routing}, %% Default is round_robin_routing
{batch_enable, true}, %% Default is true
%% Time window (milliseconds) within which sent messages are batched.
{batch_max_delay_ms, 100}, %% Default is 10
%% Maximum number of messages in a batch
{batch_max_messages, 100}, %% Default is 1000
%% Maximum size of the queue of requests waiting for acknowledgment.
{max_pending_requests, 50000} %% Default is 100000
],
{ok, Pid1} = pulserl:start_producer("topic-name"), %% Uses the default option values
{ok, Pid2} = pulserl:start_producer("persistent://public/default/topic-name", ProducerOpts),
Message = <<"message">>,
Topic = topic_utils:parse("test-topic"),
pulserl:sync_produce(Topic, Message).
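A {Module, Function} routing mode is called with the key and the partition count, as described in the comments above. A minimal sketch of such a module, assuming partitions are numbered from 0 to PartitionCount - 1: my_router is a hypothetical name, and erlang:phash2/2 is a stand-in hash, not necessarily the one pulserl or Pulsar uses for keyed messages.

```erlang
-module(my_router).
-export([route/2]).

%% Intended for {routing_mode, {my_router, route}}: maps a key to a
%% partition index in 0..PartitionCount-1. The same key always maps to
%% the same partition.
route(Key, PartitionCount) when is_integer(PartitionCount), PartitionCount > 0 ->
    erlang:phash2(Key, PartitionCount).
```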
As with producers, pulserl:start_consumer/1,2 starts a "parent" gen_server under pulserl_consumer_sup, which in turn start_links and manages n child consumers, one per partition. The parent consumer serves as a facade to the internal consumers and monitors the child processes (the internal partition consumers) for resilience.
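Monitoring a child in Erlang comes down to receiving a 'DOWN' message when it exits. The sketch below shows that mechanism with a restart; it is a generic illustration, not pulserl's code, and monitor_sketch with its functions are hypothetical names. spawn_monitor/1 is used so the monitor is set atomically with the spawn, avoiding a race with a child that dies immediately.

```erlang
-module(monitor_sketch).
-export([start_child/1, await_restart/2]).

%% Starts a child and monitors it; returns {Pid, MonitorRef}.
start_child(ChildFun) ->
    spawn_monitor(ChildFun).

%% Waits for the monitored child to go down, then starts a replacement.
await_restart({Pid, Ref}, ChildFun) ->
    receive
        {'DOWN', Ref, process, Pid, Reason} ->
            {Reason, start_child(ChildFun)}
    after 5000 ->
        timeout
    end.
```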
Start a consumer with pulserl:start_consumer/1,2 by passing a topic name and a list of options.
If start_consumer/1 is used, the consumer is started with default options, which can be provided in sys.config through the consumer_opts environment variable:
[
{pulserl, [
{consumer_opts, [{queue_size, 100}]}
]}
].
Sample code for starting a consumer:
ConsumerOpts = [
{queue_size, 100}, %% Default is 1000
{subscription_name, "pulserl-app"}, %% Default is "pulserl"
{subscription_type, 'Failover'}, %% Default is 'Shared'
{acknowledgment_timeout, 1000}, %% Default is 0 (disabled)
{nack_message_redelivery_delay, 30000}, %% Default is 60000
{dead_letter_topic_max_redeliver_count, 100} %% Default is 0 (disabled)
],
{ok, Pid1} = pulserl:start_consumer("topic-name"), %% Uses the default option values
{ok, Pid2} = pulserl:start_consumer("persistent://public/default/topic-name", ConsumerOpts).
On start, every consumer process automatically sends a subscribe command to the broker, using the Shared subscription type by default.
After subscribing, the consumer sends a flow command whose permit count is the size of its message queue (the consumer's own queue, not the Erlang process mailbox). At any point, the number of messages in the queue is at most queue_size, and those messages are readily available for upstream consumption. The consumer sends new flow permits every time the queue's size reaches the specified queue_refill_threshold (default 50% of queue_size).
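The refill rule can be made concrete with a small calculation. This is a simplified, hypothetical model (flow_sketch and its functions are illustrative names): it assumes permits are only requested once the queue has drained to the threshold, and that the request tops the queue back up to queue_size. With queue_size = 100 and the default threshold of 50, a queue holding 50 messages triggers a request for 50 new permits.

```erlang
-module(flow_sketch).
-export([default_threshold/1, permits_to_request/3]).

%% Default refill threshold: 50% of queue_size (integer division).
default_threshold(QueueSize) -> QueueSize div 2.

%% No permits while the queue is above the threshold; otherwise request
%% enough to refill it back up to QueueSize.
permits_to_request(QueueLen, _QueueSize, Threshold) when QueueLen > Threshold -> 0;
permits_to_request(QueueLen, QueueSize, _Threshold) -> QueueSize - QueueLen.
```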
To receive messages, call pulserl:consume/1 in a loop, for example:
receive_message(PidOrTopic) ->
case pulserl:consume(PidOrTopic) of
#consMessage{} = ConsumerMsg ->
pulserl:ack(ConsumerMsg),
ConsumerMsg;
{error, _} = Error ->
error(Error);
_ ->
%% No message was ready yet; poll again
receive_message(PidOrTopic)
end.
where PidOrTopic is either the pid of the parent consumer process returned by pulserl:start_consumer/1,2 or a topic name.
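The loop above retries immediately when no message is ready. A variant that collects a fixed number of messages and backs off briefly between empty polls could look like the sketch below. consume_sketch and take/3 are hypothetical names; ConsumeFun stands in for fun() -> pulserl:consume(PidOrTopic) end and AckFun for fun pulserl:ack/1. Without the record definition loaded, a message is recognised by its record tag (the first tuple element), which is how Erlang records are represented.

```erlang
-module(consume_sketch).
-export([take/3]).

%% Collects N acknowledged messages, in the order they were consumed.
take(ConsumeFun, AckFun, N) -> take(ConsumeFun, AckFun, N, []).

take(_ConsumeFun, _AckFun, 0, Acc) ->
    lists:reverse(Acc);
take(ConsumeFun, AckFun, N, Acc) ->
    case ConsumeFun() of
        {error, _} = Error ->
            error(Error);
        Msg when is_tuple(Msg), element(1, Msg) =:= consMessage ->
            AckFun(Msg),
            take(ConsumeFun, AckFun, N - 1, [Msg | Acc]);
        _NothingYet ->
            %% Back off briefly instead of spinning on an empty queue.
            timer:sleep(10),
            take(ConsumeFun, AckFun, N, Acc)
    end.
```

With a running consumer: consume_sketch:take(fun() -> pulserl:consume("test-topic") end, fun pulserl:ack/1, 10).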
Each received message should be acknowledged with pulserl:ack/1,2.
pulserl:nack/1,2 can be used to ask the broker to redeliver a message, but it should be sent before the acknowledgment timeout expires (the timeout is disabled by default).
To redeliver all unacknowledged messages, use the pulserl_consumer:redeliver_unack_messages/1 API.
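A typical pattern is to acknowledge a message only when processing succeeds and negatively acknowledge it otherwise, so the broker redelivers it. The sketch below is hypothetical (handle_sketch and handle/4 are illustrative names); in real code you would pass fun pulserl:ack/1 and fun pulserl:nack/1 as AckFun and NackFun.

```erlang
-module(handle_sketch).
-export([handle/4]).

%% Runs Handler on a consumed message: acks it when Handler returns ok,
%% nacks it (asking for redelivery) on any other result or exception.
handle(Msg, Handler, AckFun, NackFun) ->
    try Handler(Msg) of
        ok -> AckFun(Msg), acked;
        _Other -> NackFun(Msg), nacked
    catch
        _Class:_Reason -> NackFun(Msg), nacked
    end.
```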
For issues, comments, recommendations or feedback, please use the project's GitHub repository (https://github.com/skulup/pulserl).
Contributions are highly welcome.
👍