/reactive-agent

microservice communication infrastructure

Primary LanguageJavaScriptBSD 3-Clause "New" or "Revised" LicenseBSD-3-Clause

Agents observe the network and execute business logic based on observed signals and demands.

To create an agent just initialise it with a name and an URL pointing to the RabbitMQ broker.

var Agent = require('reactive-agent');
var agent = new Agent('Users', { rabbitmq: 'amqp://guest:guest@localhost:5672/topic' });

Signals

Are sent after business logic is executed to inform interested agents that they should synchronise with the current state.

var send = agent.output();
var Created = agent.signal('UserCreated');
send(Created({name: 'Alex Gravem'}));

Over the wire, the signal sent would look something like this:

{ signal: 'UserCreated',
cid: 'e3066cd0-93aa-11e4-ba68-2fac4b47aefc',
uuid: 'e3066cd1-93aa-11e4-ba68-2fac4b47aefc',
timestamp: '2015-01-04T01:44:49.054+0000',
payload: { name: 'Alex Gravem' },
source: 'Users' }

Where cid stands for causation id and is used to track the reactions to the signal from other agents and source is the name of the agent which created the signal.

Demands

Are used to ask for help to other agents in order to complete a job. Lets say an agent needs a list of all users beginning with the letter A.

var send = agent.output();
var UsersList = agent.demand('UsersList');
send(UserList({startsWith: 'A'}));

Over the wire it would look like this:

{ cid: '8e324810-93ae-11e4-ba68-2fac4b47aefc',
uuid: '8e324811-93ae-11e4-ba68-2fac4b47aefc',
timestamp: '2015-01-04T02:11:04.721+0100',
payload: { startsWith: 'A' },
source: 'UserAPI',
demand: 'UsersList',
solutions: {} }

The difference to signals is that demands have a map of solutions different agents proposed to fulfil the demand.

Reacting to signals

When writing agents it's common to update the persisted state based on events from other agents. Say you're writing a UserSearch agent and you need to update the search index every time a user is created.

var send = agent.output();
var SearchIndexUpdated = agent.signal('SearchIndexUpdated');
agent.input({signal: 'UserCreated'}).on('data', function(signal) {
  SearchIndex.update(signal.payload);
  send(SearchIndexUpdated({}).causedBy(signal));
});

Processing Demands

At some point an agent will need to make something useful with it's data. That's done by processing demands from other agents.

For the UserSearch from the last example that might be to return a list of users.

var send = agent.output();
agent.input({demand: 'UsersList'}).on('data', function(demand) {
  if( demand.solutions.get('List') ) return;

  var list = SearchIndex.list();
  var resolved = demand.resolve('List', list);
  send(resolved);
});

Notice that the agent didn't reply to the source. Instead it just enriched the demand and put it back into the network.

That's a very important concept of demand processing. It ensures the decoupling between the agent which created the demand and the agent which resolved it. Even more important, it enables agents to collaborate without orchestration.

In this contrived example, UserSearch doesn't care for applying filters like startsWith. So to resolve that kind of demand we're gonna create a UserSearchByName agent.

var send = agent.output()
agent.input({demand: 'UsersList'}).on('data', function(demand) {
  var filter = demand.payload.startsWith;
  if( !filter ) return;

  var all = demand.solutions.get('List')
  if( !all ) return;

  var list = all.filter(function(user) {
    return user.name.indexOf(filter) !== -1;
  });

  var resolved = demand.resolve('ByName', list);
  send(resolved);
});

When UsersList is created, it will be sent at the same time to both UserSearch and UserSearchByName. Since the ByName solution depends on All, there is nothing UserSearchByName can do and the demand will be ignored.

But because UserSearch returns the enriched demand to the network, UserSearchByName can then catch it and perform it's job.

An optimised version of UserSearchByName could cache previous All solutions and return a filtered list based on that cache if performance is more important than accuracy.

Also notice that because an agent may receive the same packet more then once, it should have guards in place to prevent resource wasting and data corruption.

The agent which created the demand have the option of wait for a ByName solution or try to do something with full list.

var send = agent.output();
var UsersStartingWithA = agent.demand('UsersList', {startsWith: 'A'});
send(UsersStartingWithA);

(function(demand){
  var solutions = demand.solutions;
  var proposals = agent.input({demand:'UsersList', cid: demand.cid});

  var timeout = setTimeout(function() {
    proposals.removeAllListeners();
    console.log("Could not filter, here's all users", solutions.get('List'));
  }, 200);

  proposals.on('data', function(demand) {
    solutions = solutions.merge(demand.solutions);
    if( !solutions.get('ByName') ) return;
    clearTimeout(timeout);
    proposals.removeAllListeners();
    console.log("Your list", solutions.get('ByName'));
  });
})(UsersStartingWithA);

After sending a demand, the agent creates an input handler to observe solutions to that demand. The agent will wait 200ms for it's preferred solution (ByName), otherwise it will use All.

If UserSearchByName is able to generate a solution before the deadline ends, the timeout is cleared and all further solutions will be ignored.