At the beginning, we were sending records to Kinesis one by one. This led to errors and lost events.
However, Amazon Kinesis also accepts events in batches, which is more efficient.
Elixir's GenStage library is also a good fit for this case. Moreover, if each analytic type handles sending its own batches, an error in one type does not crash and restart the whole application, and throughput increases.
```
D = DynamicSupervisor, GS = GenStage component
* = one instance per record type

Application
|-- ServerSupervisor
|   `-- DispatchServer
|-- Registry
|-- ConsumerDynamicSupervisor (D)
|   `-- ConsumerSupervisor (GS) *
|       `-- Consumer (one or more) *
|-- ProducerSupervisor (D)
|   `-- Producer (GS) *
`-- RecordsSupervisor (D)
    `-- Record *
```
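The static part of this tree can be expressed with standard-library supervisors. The following is a minimal sketch under stated assumptions. All module names under `AnalyticsSketch` are hypothetical. The `ServerSupervisor`/`DispatchServer` branch is left out because its role is not described here.

```elixir
defmodule AnalyticsSketch.Supervisor do
  # Hypothetical top-level supervisor mirroring the tree above. The
  # ServerSupervisor/DispatchServer branch is omitted on purpose.
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, Keyword.put_new(opts, :name, __MODULE__))
  end

  @impl true
  def init(:ok) do
    children = [
      # Lets per-record-type processes be found by record module name.
      {Registry, keys: :unique, name: AnalyticsSketch.Registry},
      dynamic_sup(AnalyticsSketch.ConsumerDynamicSupervisor),
      dynamic_sup(AnalyticsSketch.ProducerSupervisor),
      dynamic_sup(AnalyticsSketch.RecordsSupervisor)
    ]

    # :one_for_one restarts only the child that crashed, so a failure in
    # one branch does not take the other branches down.
    Supervisor.init(children, strategy: :one_for_one)
  end

  # Each DynamicSupervisor starts empty; children (one per record type) are
  # added at runtime with DynamicSupervisor.start_child/2. Explicit ids keep
  # the three child specs distinct.
  defp dynamic_sup(name) do
    Supervisor.child_spec({DynamicSupervisor, name: name, strategy: :one_for_one}, id: name)
  end
end
```

Only the top of the tree is static. Everything marked `*` is started later, on demand, under the matching dynamic supervisor.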
Here is an example with a `MyEvent` analytic module.
When a module wants to record an event, it calls `MyEvent.record(event)`. This function first determines whether it needs to start a `GenServer`. If one is required, `MyEvent.record/1` calls `start_child` on the `RecordsSupervisor`.
`RecordsSupervisor` is a dynamic supervisor, meaning that it can start a `GenServer` and supervise it dynamically (see the Elixir `DynamicSupervisor` documentation for more information).
Because `MyEvent` uses the `Analytics.Records` module, starting the `GenServer` is transparent to you, but this is what happens behind the scenes.
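The "start it only if needed" step could be implemented with a `Registry` lookup followed by `DynamicSupervisor.start_child/2`. This is an assumption about how `Analytics.Records` might do it, not its actual code. `RecordSketch`, `RecordSketch.Registry` and `RecordSketch.RecordsSupervisor` are hypothetical names.

```elixir
defmodule RecordSketch do
  # Hypothetical stand-in for the per-record-type GenServer ("Record" in the
  # diagram), registered in a Registry under its record module name.
  use GenServer

  def start_link(record_module) do
    GenServer.start_link(__MODULE__, record_module, name: via(record_module))
  end

  def via(record_module), do: {:via, Registry, {RecordSketch.Registry, record_module}}

  # Returns the pid for `record_module`, starting the GenServer under the
  # DynamicSupervisor only when it is not already running.
  def ensure_started(record_module) do
    case Registry.lookup(RecordSketch.Registry, record_module) do
      [{pid, _value}] ->
        {:ok, pid}

      [] ->
        case DynamicSupervisor.start_child(RecordSketch.RecordsSupervisor, {__MODULE__, record_module}) do
          {:ok, pid} -> {:ok, pid}
          # Another caller started it between our lookup and start_child.
          {:error, {:already_started, pid}} -> {:ok, pid}
          other -> other
        end
    end
  end

  @impl true
  def init(record_module), do: {:ok, %{record_module: record_module}}
end
```

This assumes a unique-keys `Registry` named `RecordSketch.Registry` and a `DynamicSupervisor` named `RecordSketch.RecordsSupervisor` are already running. Calling `RecordSketch.ensure_started(MyApp.MyEvent)` twice then returns the same pid. Handling `{:error, {:already_started, pid}}` covers the race where two callers both miss the lookup.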
On initialisation, the `GenServer` will:
- Create an ETS table tuned for concurrent reads and writes, to store the incoming events to be streamed.
- Call `ProducerSupervisor.start_child/1` with its own module name as parameter. Like `RecordsSupervisor`, `ProducerSupervisor` is also a `DynamicSupervisor`. It dynamically starts a `Producer`, which in turn will:
  - Get its own PID.
  - Call `ConsumerDynamicSupervisor.start_child/2` with the record module name and the PID. As before, this is a `DynamicSupervisor`, which starts a `ConsumerSupervisor`. The `ConsumerSupervisor` module is actually a GenStage `ConsumerSupervisor`. It subscribes to the PID received as argument, handles a `max_demand` defined in the configuration, and starts processes by calling `Consumer.start_link/2`. The extra parameter passed to `Consumer.start_link/2` is the record module name.
  - Start its internal timer, which calls the `Producer` itself every X seconds (defined in the configuration).
- Create or get the current queue of records by calling `record_module.create_or_get_link/0`.
- Add the event into the queue.
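The ETS-backed queue from the steps above might look like this with plain `:ets`. `QueueSketch`, `create_or_get/1` and `add_event/2` are hypothetical names. They stand in for `create_or_get_link/0` and whatever the library uses to insert events; the real implementation may differ.

```elixir
defmodule QueueSketch do
  # Hypothetical ETS-backed event queue, one named table per record module.
  # :public lets any caller insert without a GenServer round trip, and the
  # read/write concurrency flags tune the table for concurrent access.
  # The table belongs to the process that creates it, so it should be
  # created from the owning GenServer's init/1 (which also avoids two
  # processes racing to create it).
  def create_or_get(table_name) do
    case :ets.whereis(table_name) do
      :undefined ->
        :ets.new(table_name, [
          :ordered_set,
          :public,
          :named_table,
          read_concurrency: true,
          write_concurrency: true
        ])

      _tid ->
        table_name
    end
  end

  # Monotonic integer keys make the ordered_set keep insertion order.
  def add_event(table_name, event) do
    key = System.unique_integer([:monotonic, :positive])
    true = :ets.insert(table_name, {key, event})
    :ok
  end
end
```
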
When the `Producer`'s timer fires, it will:
- Call `record_module.create_batches_and_flush/0`, which will:
  - Get all the current events in the queue.
  - Split them into chunks of the same size (defined in the configuration).
  - Flush the current queue.
  - Return those chunks.
- Append the new chunks to its current internal batch queue.
- If there are enough batches, dispatch them through the GenStage mechanism. Otherwise, do nothing.
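The timer-driven batching can be sketched with a plain `GenServer` and `Process.send_after/3`. This is a swapped-in technique. The real `Producer` is a GenStage producer that hands batches to consumers according to their demand. Here, a hypothetical `:dispatch` callback receives them instead. `BatchSketch`, `TickerSketch` and all option names are assumptions.

```elixir
defmodule BatchSketch do
  # Hypothetical create_batches_and_flush/2: reads every queued event,
  # deletes exactly the entries it read, and returns chunks of chunk_size.
  def create_batches_and_flush(table, chunk_size) do
    entries = :ets.tab2list(table)
    Enum.each(entries, fn {key, _event} -> :ets.delete(table, key) end)

    entries
    |> Enum.map(fn {_key, event} -> event end)
    |> Enum.chunk_every(chunk_size)
  end
end

defmodule TickerSketch do
  use GenServer

  # Expected opts (all hypothetical): :table, :chunk_size, :min_batches,
  # :interval_ms and :dispatch, a 1-arity function receiving the batches.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = opts |> Map.new() |> Map.put(:pending, [])
    schedule(state.interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, state) do
    # Append the new chunks to the batches not dispatched yet.
    pending = state.pending ++ BatchSketch.create_batches_and_flush(state.table, state.chunk_size)

    # Dispatch only once enough batches have accumulated.
    pending =
      if length(pending) >= state.min_batches do
        state.dispatch.(pending)
        []
      else
        pending
      end

    schedule(state.interval_ms)
    {:noreply, %{state | pending: pending}}
  end

  defp schedule(ms), do: Process.send_after(self(), :tick, ms)
end
```

One design choice differs from a literal "flush the queue": the sketch deletes only the keys it has just read, rather than clearing the whole table. Events inserted while a tick is running are therefore kept for the next tick instead of being dropped.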
When a new batch arrives in the GenStage pipeline, the `ConsumerSupervisor` starts the appropriate number of `Consumer` processes.
Each `Consumer` starts a task and, if the batch is not empty, calls `record_module.handle_event/1` with the current batch as parameter.
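GenStage's `ConsumerSupervisor` comes from the `gen_stage` package, not the standard library. As a rough stdlib approximation of this consumer step (a swapped-in technique, not the library's code), `Task.Supervisor.async_stream_nolink/4` runs one task per non-empty batch. Its `max_concurrency` plays a role similar to, but not the same as, `max_demand`. `ConsumerSketch` is a hypothetical name.

```elixir
defmodule ConsumerSketch do
  # Runs record_module.handle_event/1 once per non-empty batch, with at most
  # max_concurrency tasks at a time. "nolink" means a crashing batch yields
  # {:exit, reason} instead of taking the caller down with it.
  def consume(task_sup, record_module, batches, max_concurrency) do
    non_empty = Enum.reject(batches, &(&1 == []))

    task_sup
    |> Task.Supervisor.async_stream_nolink(
      non_empty,
      fn batch -> record_module.handle_event(batch) end,
      max_concurrency: max_concurrency,
      # A slow batch is killed and reported as {:exit, :timeout}.
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  end
end
```
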
The `handle_event/1` function sends the batch of events to Kinesis in a single request.
It may send to several streams if required.
For example, here is a complete record module:
```elixir
defmodule MyApp.MyEvent do
  # `send_data/3` comes from `Analytics.Records`; `send_record/1` and
  # `generate_event_id/0` are expected to be provided by the same `use`.
  use Analytics.Records

  alias Analytics.Adapter.Kinesis.Message

  def record(my_data), do: send_record(my_data)

  @impl true
  def handle_event(events) do
    messages =
      Enum.map(events, fn my_data ->
        %Message{
          partition_key: to_string(my_data.id),
          data: event_json(generate_event_id(), my_data)
        }
      end)

    # The whole batch goes to the "my_stream" Kinesis stream in one request.
    send_data(__MODULE__, "my_stream", messages)
  end

  # Builds the JSON payload as iodata. `inspect/1` is used because a map
  # cannot be interpolated into a string directly. Values are not
  # JSON-escaped, so a JSON encoder is safer for arbitrary data.
  defp event_json(event_id, my_data) do
    IO.iodata_to_binary([
      ~s({"event_id": "), event_id, ~s(", "data": "), inspect(my_data), ~s("})
    ])
  end
end
```
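How `send_data/3` talks to Kinesis is not shown here. Assume it uses the Kinesis `PutRecords` API, which accepts at most 500 records per request. Under that assumption, the request body for a batch could be assembled as below. `PutRecordsSketch` is hypothetical, and plain `{partition_key, data}` tuples stand in for `Message` structs. Request signing and the HTTP call are left out.

```elixir
defmodule PutRecordsSketch do
  # Kinesis PutRecords accepts at most 500 records per request.
  @max_records 500

  # Builds PutRecords request bodies (plain maps, before JSON encoding) for
  # a list of {partition_key, data} tuples. Larger batches are split into
  # several requests.
  def build_requests(stream_name, messages) do
    messages
    |> Enum.chunk_every(@max_records)
    |> Enum.map(fn chunk ->
      %{
        "StreamName" => stream_name,
        "Records" =>
          Enum.map(chunk, fn {partition_key, data} ->
            # Data is a binary blob, base64-encoded in the JSON request.
            %{"PartitionKey" => partition_key, "Data" => Base.encode64(data)}
          end)
      }
    end)
  end
end
```

`PutRecords` can also succeed partially. The response's `FailedRecordCount` and per-record `ErrorCode` fields indicate which records should be retried, which matters for avoiding the lost events described at the start.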