Overview

A small collection of utilities that haven’t yet justified their own repositories.

gen_lb

A simple load balancer / circuit breaker.

Intended for use between services in an SOA: it load-balances requests over multiple nodes, automatically takes failing nodes out of rotation, and stops cascading failures.

Example

{ok, _} = gen_lb:start_link(service_x,
                            [ %% nodes in this cluster
                              {cluster, ["node1","node2","node3"]}
                              %% module implementing the actual code
                              %% communicating with cluster
                            , {cb_mod, testmod}
                              %% interval upon which failures are
                              %% calculated (ms)
                            , {interval, 600000} %10 min
                              %% max errors during interval that a node can
                              %% experience before being taken out of rotation
                            , {max_errors, 2}
                              %% max crashes during interval that a node can
                              %% experience before being taken out of rotation
                            , {max_crasches, 1}
                              %% Time a blocked node will be kept out of cluster
                              %% before being automatically inserted again
                            , {block_time, 300000} %5 min
                            ]),

%% call service_x, which can be handled by any of node1..node3. gen_lb keeps
%% track of crashes, failures, load balancing, etc.
case gen_lb:call(service_x, do_something,
                 [ %% make up to two attempts (i.e. if the first fails, try again).
                   %% multiple attempts will always hit at least two nodes
                   {attempts, 2}
                 ]) of
  {ok, Res} ->
    %% everything went well
    {ok, Res};
  {error, cluster_down} ->
    %% all nodes in the cluster are down (previous crashes/errors);
    %% nothing we can do except wait until they are automatically reinserted
    {error, cluster_down};
  {error, Rsn} ->
    %% call failed for whatever reason.
    %% the failure is stored in gen_lb and might result in node being taken
    %% out of rotation
    {error, Rsn}
end.

%% manually block a node (infinite time)
gen_lb:block(service_x, "node1").

%% manually unblock a node
gen_lb:unblock(service_x, "node1").

Callback module testmod.erl

-module(testmod).
-behaviour(gen_lb).

-export([exec/2]).

exec(Node, Args) ->
  %% do the actual call to the node selected by gen_lb
  case do_something(Node, Args) of
    {ok, Res} ->
      {ok, Res};
    {error, Rsn} ->
      %% we failed, this will increment number of errors on this node.
      %% request will possibly be retried with another node
      {error, Rsn}
  end.
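
The `interval` and `max_errors` options describe a sliding-window failure count. The following is a minimal sketch of how such bookkeeping could work, not gen_lb's actual implementation: the module `lb_window_sketch` and its functions are hypothetical, and it assumes a node is blocked once its error count within the window exceeds `max_errors`.

```erlang
-module(lb_window_sketch).
-export([new/0, record_error/3, should_block/4]).

%% Hypothetical sketch, not part of gen_lb.
%% State maps Node => list of error timestamps (ms), newest first.
new() -> #{}.

%% Record one error for Node at time Now (ms).
record_error(Node, Now, State) ->
  maps:update_with(Node, fun(Ts) -> [Now | Ts] end, [Now], State).

%% True when Node has had more than MaxErrors errors during the last
%% Interval ms. Assumption: "exceeds" rather than "reaches" triggers a block.
should_block(Node, Now, {Interval, MaxErrors}, State) ->
  Recent = [T || T <- maps:get(Node, State, []), Now - T < Interval],
  length(Recent) > MaxErrors.
```

With `{interval, 600000}` and `{max_errors, 2}` as in the example above, a caller would pass `{600000, 2}` as the third argument. A real implementation would also prune old timestamps so the lists do not grow without bound.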

Todo

All requests currently go through a single gen_server; keeping the nodes in an ETS table would avoid this bottleneck.

Better support for timeouts

More metrics
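
As a rough illustration of the first item, the sketch below keeps the active nodes in a named ETS table so that callers can pick a node in their own process. Everything here (the module `lb_ets_sketch`, the table layout, the round-robin strategy) is an assumption made for the example and does not reflect how gen_lb is written.

```erlang
-module(lb_ets_sketch).
-export([init/1, pick/0, block/1, unblock/1]).

%% Hypothetical sketch, not part of gen_lb.
-define(TAB, lb_ets_sketch_nodes).

%% Create a public named table holding the active nodes and a counter.
%% The calling process owns the table.
init(Nodes) ->
  ?TAB = ets:new(?TAB, [named_table, public, set, {read_concurrency, true}]),
  true = ets:insert(?TAB, [{nodes, Nodes}, {counter, 0}]),
  ok.

%% Pick the next node round-robin, without a gen_server call.
pick() ->
  case ets:lookup_element(?TAB, nodes, 2) of
    []    -> {error, cluster_down};
    Nodes ->
      N = ets:update_counter(?TAB, counter, 1),
      {ok, lists:nth((N rem length(Nodes)) + 1, Nodes)}
  end.

%% block/unblock are read-modify-write operations, so they should still be
%% serialized through a single process (for instance the owning gen_server).
block(Node) ->
  Nodes = ets:lookup_element(?TAB, nodes, 2),
  true = ets:insert(?TAB, {nodes, lists:delete(Node, Nodes)}),
  ok.

unblock(Node) ->
  Nodes = ets:lookup_element(?TAB, nodes, 2),
  true = ets:insert(?TAB, {nodes, lists:usort([Node | Nodes])}),
  ok.
```

Only the hot path (`pick/0`) bypasses the server in this design; failure bookkeeping and block/unblock decisions would stay in one process.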

yamq

A persistent priority queue with the ability to set a ‘due’ time on enqueued items.

Example

%% start your storage backend (postgres/riak/ETS/etc)
%% for this example we use a gen_server keeping track of a dets table. Source
%% is found in test/
{ok, _} = yamq_dets_store:start_link("foo.dets"),

%% define a function that does something with enqueued items
F = fun(Task) ->
      io:format("got a task ~p~n", [Task]),
      do_something(Task)
    end,

{ok, _} = yamq:start_link([ {store, yamq_dets_store}
                            %% function that will be executed for enqueued
                            %% tasks
                          , {func, F}
                            %% number of concurrent workers
                          , {workers, 4}
                          ]),

%% enqueue a task with priority 1 that will be handled as soon as
%% a worker is available
ok = yamq:enqueue(foo, [{priority, 1}]),

%% enqueue a task with priority 8 that will be handled in 10 minutes or later
ok = yamq:enqueue(bar, [{priority, 8}, {due, 600000}]),

%% enqueue three tasks and make sure they are executed serially
SerializeID = 123456789,
ok = yamq:enqueue(baz, [{serialize, SerializeID}]),
ok = yamq:enqueue(buz, [{serialize, SerializeID}]),
ok = yamq:enqueue(bar, [{serialize, SerializeID}]),

%% stop queue
ok = yamq:stop(),

Internals

Very briefly:

  • On startup, the store is consulted to rebuild the index
  • New items are written to the store before they are executed
  • 8 priority levels, with one ETS table (ordered_set) per level
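
The index layout described above can be sketched as follows. This is an illustrative guess rather than yamq's real code: the module `pq_sketch`, the `{Due, Ref}` key shape, and the assumption that priority 1 is the most urgent level are all made up for the example.

```erlang
-module(pq_sketch).
-export([new/0, enqueue/4, next/2]).

%% Hypothetical sketch, not part of yamq.
%% One ordered_set table per priority level, returned as a tuple of 8 table ids.
new() ->
  list_to_tuple([ets:new(pq_sketch, [ordered_set, public])
                 || _ <- lists:seq(1, 8)]).

%% Index Task under {Due, Ref} so that each table is sorted by due time.
%% The reference keeps keys unique for tasks sharing a due time.
enqueue(Tabs, Prio, Due, Task) when Prio >= 1, Prio =< 8 ->
  true = ets:insert(element(Prio, Tabs), {{Due, make_ref()}, Task}),
  ok.

%% Take the first task that is due at Now, scanning priority 1 first
%% (assumption: lower number means more urgent).
next(Tabs, Now) -> next(Tabs, Now, 1).

next(_Tabs, _Now, 9) -> empty;
next(Tabs, Now, Prio) ->
  Tab = element(Prio, Tabs),
  case ets:first(Tab) of
    {Due, _} = Key when Due =< Now ->
      [{Key, Task}] = ets:lookup(Tab, Key),
      true = ets:delete(Tab, Key),
      {ok, Task};
    _ ->
      %% table is empty or its earliest task is not due yet
      next(Tabs, Now, Prio + 1)
  end.
```

Because an ordered_set sorts on the whole key, looking at `ets:first/1` is enough to tell whether any task at that level is due, which keeps each lookup cheap even with many entries.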

Store

A store module needs to look roughly like this:

-module(yamq_store_impl).
-behaviour(yamq_store).

-export([ get/1
        , put/2
        , delete/1
        , list/0
        ]).

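%% stubs showing the expected return shapes
get(_K)     -> {ok, value}. %% {ok, V}, V being the value stored under the key
put(_K, _V) -> ok.
delete(_K)  -> ok.
list()      -> {ok, []}.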
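
For illustration, here is a minimal ETS-backed module with the same four functions. It is a sketch under assumptions, not a store shipped with yamq: the module name, the `start/0` helper, the `{error, notfound}` return for a missing key, and `list/0` returning the stored keys are all guesses. Since ETS is in-memory only, it shows the interface but gives no persistence across restarts.

```erlang
-module(yamq_ets_store_sketch).
%% -behaviour(yamq_store). %% enable when compiling inside the yamq repo

-export([start/0, get/1, put/2, delete/1, list/0]).

%% get/1 and put/2 share their names with auto-imported BIFs.
-compile({no_auto_import, [get/1, put/2]}).

-define(TAB, yamq_ets_store_sketch).

%% Hypothetical helper: create the table, owned by the calling process.
start() ->
  ?TAB = ets:new(?TAB, [named_table, public, set]),
  ok.

%% Assumption: a missing key is reported as {error, notfound}.
get(K) ->
  case ets:lookup(?TAB, K) of
    [{K, V}] -> {ok, V};
    []       -> {error, notfound}
  end.

put(K, V) -> true = ets:insert(?TAB, {K, V}), ok.

delete(K) -> true = ets:delete(?TAB, K), ok.

%% Assumption: list/0 returns the keys currently in the store.
list() -> {ok, [K || {K, _} <- ets:tab2list(?TAB)]}.
```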

Performance

Some rough estimates can be found in test/perf1.erl.

My ThinkPad X230 handled several million entries with due times ranging from now to six months ahead without any problems, which is more than good enough for my use case.