
Distributed Elastic Message Processing System

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

The Dempsy Project


Overview

Simply put, Dempsy (Distributed Elastic Message Processing SYstem) is a framework for easily writing distributed and dynamically scalable applications that process unbounded streams of (near-)real-time messages. Conceptually, it's similar to Apache Flink and Apache Storm.

Note: Dempsy does NOT guarantee message delivery and will opt to discard messages in the presence of "back-pressure." This means it's not suitable for all streaming applications. However, if your application doesn't require guaranteed delivery, then Dempsy provides a programming model that makes distributed stream processing applications easier to develop and maintain than other frameworks.
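Dempsy's exact shedding mechanics aren't described here, so the following is only a minimal sketch, assuming a bounded inbox, of what "discard under back-pressure" means in general: when the inbox is full, a new message is dropped instead of blocking the sender. SheddingInbox is a hypothetical name, not a Dempsy class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; this is NOT Dempsy's implementation.
// A bounded inbox that discards messages rather than blocking the producer.
final class SheddingInbox<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong discarded = new AtomicLong();

    SheddingInbox(final int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the message was queued, false if it was discarded. */
    boolean offer(final T message) {
        final boolean accepted = queue.offer(message); // never blocks, fails when full
        if (!accepted)
            discarded.incrementAndGet();
        return accepted;
    }

    /** Next queued message, or null if the inbox is empty. */
    T poll() {
        return queue.poll();
    }

    long discardedCount() {
        return discarded.get();
    }
}
```

With a capacity of 2, offering "a", "b" and "c" queues the first two and drops the third, so discardedCount() becomes 1. The sender is never slowed down, which is the trade-off the note above describes: throughput is preserved at the cost of delivery guarantees.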

Jumping right in.

An example - The ubiquitous "Word Count"

In this example we have a stream of Word messages and we want to keep track of how many times each Word appears in the stream.

You can find the complete working example here: Simple WordCount

The Adaptor

To start with we need a source of Word messages. This is done in Dempsy by implementing an Adaptor.

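...
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.dempsy.messages.Adaptor;
import net.dempsy.messages.Dispatcher;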

public class WordAdaptor implements Adaptor {
    private Dispatcher dempsy;
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * This method is called by the framework to provide a handle to the
     * Dempsy message bus. It's called prior to start()
     */
    @Override
    public void setDispatcher(final Dispatcher dispatcher) {
        this.dempsy = dispatcher;
    }

    @Override
    public void start() {
        // ... set up the source for the words.
        running.set(true);
        while(running.get()) {
            // obtain data from an external source
            final String wordString = getNextWordFromSoucre();
            if(wordString == null) // the first null ends the stream.
                running.set(false);
            else {
                // Create a "Word" message and send it into the processing stream.
                try {
                    dempsy.dispatchAnnotated(new Word(wordString));
                } catch(IllegalAccessException | IllegalArgumentException | InvocationTargetException | InterruptedException e) {
                    throw new RuntimeException(e); // This will stop the flow of Words from this adaptor.
                                                   // Optimally you'd like to recover and keep going.
                }
            }
        }
    }

    @Override
    public void stop() {
        running.set(false);
    }

    private static final String[] wordSource = {"it","was","the","best","of","times","it","was","the","worst","of","times"};
    private int wordSourceIndex = 0;

    private String getNextWordFromSoucre() {
        if(wordSourceIndex >= wordSource.length)
            return null;
        return wordSource[wordSourceIndex++];
    }
}

When a WordAdaptor is registered with Dempsy, the following will happen in order:

  1. Dempsy will call setDispatcher and pass a Dispatcher that the Adaptor can use to dispatch messages.
  2. Dempsy will then call the start() method to indicate that the Adaptor can start sending messages. This will be called in a separate thread so the Adaptor doesn't have to return from the start() method until it's done sending messages. However, the Adaptor is free to use the Dispatcher in its own threads if it wants and can return from start() without causing a problem.
  3. When Dempsy is shut down, the Adaptor will be notified by calling the stop() method.
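The three lifecycle steps can be exercised outside Dempsy with a small harness. This is a sketch under stated assumptions: SketchAdaptor, ArrayWordAdaptor and LifecycleHarness are hypothetical stand-ins (the dispatcher is modeled as a plain Consumer<Object>), not Dempsy's Adaptor or Dispatcher types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in mirroring the three calls described above (not Dempsy's API).
interface SketchAdaptor {
    void setDispatcher(Consumer<Object> dispatcher); // 1. called first
    void start();                                     // 2. called on a separate thread
    void stop();                                      // 3. called at shutdown
}

// Dispatches a fixed list of words, returning early if stop() is called.
final class ArrayWordAdaptor implements SketchAdaptor {
    private final String[] words;
    private volatile boolean stopped = false; // written by stop(), read by start()'s thread
    private Consumer<Object> dispatcher;

    ArrayWordAdaptor(final String... words) { this.words = words; }

    @Override public void setDispatcher(final Consumer<Object> dispatcher) { this.dispatcher = dispatcher; }

    @Override public void start() {
        for (final String w : words) {
            if (stopped) return;
            dispatcher.accept(w);
        }
    }

    @Override public void stop() { stopped = true; }
}

// Hypothetical harness that drives an adaptor in the documented order.
final class LifecycleHarness {
    static List<Object> run(final SketchAdaptor adaptor) {
        final ConcurrentLinkedQueue<Object> received = new ConcurrentLinkedQueue<>();
        adaptor.setDispatcher(received::add);                   // 1. hand over the dispatcher
        final Thread worker = new Thread(adaptor::start, "adaptor");
        worker.start();                                         // 2. start() on its own thread
        try {
            worker.join(5_000);                                 // let it finish (bounded wait)
            adaptor.stop();                                     // 3. shutdown notification
            worker.join();
        } catch (final InterruptedException e) {
            adaptor.stop();
            Thread.currentThread().interrupt();                 // preserve interrupt status
        }
        return new ArrayList<>(received);
    }
}
```

Using a volatile "stopped" flag (rather than setting a "running" flag inside start()) avoids a race in which stop() runs before start() has begun and the loop never ends.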

The Message

In the example above, the adaptor sends Word messages. Messages in Dempsy need to satisfy a few requirements:

  1. They need a MessageKey that uniquely identifies the MessageProcessor that will process that message.
  2. The MessageKey needs to have the appropriate identity semantics (hashCode and equals).
  3. In most cases when Dempsy is distributed, the Message needs to be serializable according to whatever serialization technique is chosen.
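To see why requirement 2 matters, here is a sketch using a hypothetical composite key (WordInLanguage, not part of Dempsy) next to a class that lacks value-based equals()/hashCode(). It assumes, for illustration only, that keys are looked up in a hash-based map the way a router might look up processor addresses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key. Value-based equals()/hashCode() mean two keys
// with the same contents address the same message processor.
final class WordInLanguage {
    private final String word;
    private final String language;

    WordInLanguage(final String word, final String language) {
        this.word = word;
        this.language = language;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof WordInLanguage)) return false;
        final WordInLanguage other = (WordInLanguage) o;
        return Objects.equals(word, other.word) && Objects.equals(language, other.language);
    }

    @Override
    public int hashCode() {
        return Objects.hash(word, language); // must agree with equals()
    }
}

// Counterexample: inherits Object's identity-based equals()/hashCode(),
// so two equal-looking instances are two different keys.
final class BrokenKey {
    final String word;
    BrokenKey(final String word) { this.word = word; }
}

final class KeyIdentityDemo {
    /** How many distinct "addresses" a hash-based lookup would see for these keys. */
    static int distinctAddresses(final Iterable<?> keys) {
        final Map<Object, Integer> seen = new HashMap<>();
        for (final Object k : keys)
            seen.merge(k, 1, Integer::sum);
        return seen.size();
    }
}
```

Two WordInLanguage("it", "en") keys collapse to one address, while two BrokenKey("it") instances produce two, which would split one logical key across two message processors.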

So the Word message can be defined as follows:

import java.io.Serializable;

import net.dempsy.lifecycle.annotation.MessageKey;
import net.dempsy.lifecycle.annotation.MessageType;

@MessageType
public class Word implements Serializable {
    private final String wordText;

    public Word(final String data) {
        this.wordText = data;
    }

    @MessageKey
    public String getWordText() {
        return this.wordText;
    }
}

Annotations identify the class as a message. The MessageType annotation tells Dempsy that the fully qualified class name identifies a Dempsy-compliant message. Notice that the class satisfies the criteria:

  1. It has a MessageKey which can be retrieved by calling getWordText().
  2. The MessageKey is a String which has appropriate identity semantics.
  3. The Word class is serializable when using Java serialization.
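A quick way to check the third point is a Java serialization round trip. The sketch below uses PlainWord, a hypothetical copy of Word with the Dempsy annotations removed so it compiles without Dempsy on the classpath; it is not how Dempsy's JavaSerializer is implemented, only the standard ObjectOutputStream/ObjectInputStream mechanism.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Word (annotations omitted). Java serialization needs
// Serializable plus serializable non-transient fields (String qualifies).
final class PlainWord implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String wordText;

    PlainWord(final String wordText) { this.wordText = wordText; }

    String getWordText() { return wordText; }
}

final class SerializationRoundTrip {
    /** Serializes obj to bytes and reads it back as a new object. */
    static <T extends Serializable> T roundTrip(final T obj) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                final T copy = (T) in.readObject();
                return copy;
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        } catch (final ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If a field were not serializable, writeObject would throw a NotSerializableException, which is the kind of failure requirement 3 guards against when nodes exchange messages.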

The Message Processor (Mp)

Dempsy will route each message to an appropriate Message Processor. Every Word message with a given MessageKey is handled by the same, unique Message Processor instance. For example:

import net.dempsy.lifecycle.annotation.MessageHandler;
import net.dempsy.lifecycle.annotation.Mp;

@Mp
public class WordCount implements Cloneable {
    private long count = 0;

    @MessageHandler
    public void countWord(final Word word) {
        count++;
        System.out.println("The word \"" + word.getWordText() + "\" has a count of " + count);
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        return super.clone();
    }
}

So when Dempsy receives a message of type Word, it retrieves the MessageKey using the annotated method getWordText(). That MessageKey becomes the address of a message processor somewhere on the system. Dempsy will find the message processor instance (in this case an instance of the class WordCount) within the cluster of nodes responsible for running the WordCount message processing. If that instance doesn't already exist, Dempsy will clone() the WordCount prototype instance.

Note: You should consider the MessageKey as the address of a unique MessageProcessor instance.

Dempsy manages the lifecycle of Message Processor instances. It starts with a single instance that is used as a prototype. When it needs more instances, it will clone() the prototype. In this example Dempsy will create an instance of WordCount for every unique MessageKey of a Word message that gets dispatched, and it will call the MessageHandler on the corresponding instance.
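The prototype-and-clone lifecycle can be modeled in a few lines. This is a hypothetical, single-process, single-threaded sketch (CountingMp and SketchContainer are illustrative names, not Dempsy classes): one prototype, one clone per unique key, and each message routed to the clone addressed by its key. Dempsy's real container also deals with distribution, threading and more.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical message processor mirroring WordCount's counting behavior.
final class CountingMp implements Cloneable {
    private long count = 0;

    /** Handles one message and returns the updated count. */
    long handle(final String word) {
        return ++count;
    }

    @Override
    public CountingMp clone() {
        try {
            return (CountingMp) super.clone(); // shallow copy of the prototype's state
        } catch (final CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: this class implements Cloneable
        }
    }
}

// Hypothetical container: one instance per unique message key.
final class SketchContainer {
    private final CountingMp prototype;
    private final Map<String, CountingMp> instances = new ConcurrentHashMap<>();

    SketchContainer(final CountingMp prototype) { this.prototype = prototype; }

    /** Routes a message to the instance addressed by its key, cloning the prototype if needed. */
    long dispatch(final String key) {
        final CountingMp mp = instances.computeIfAbsent(key, k -> prototype.clone());
        return mp.handle(key);
    }

    int instanceCount() { return instances.size(); }
}
```

Feeding the twelve words from WordAdaptor through dispatch() leaves seven instances in the map (plus the untouched prototype), and the final "times" message returns a count of 2. Because the prototype itself never handles messages, every clone starts with a count of 0.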

Running the example.

The following will pull all the pieces together and process a group of Words.

...

import java.util.concurrent.ArrayBlockingQueue;

import net.dempsy.NodeManager;
import net.dempsy.cluster.local.LocalClusterSessionFactory;
import net.dempsy.config.Cluster;
import net.dempsy.config.Node;
import net.dempsy.lifecycle.annotation.MessageProcessor;
import net.dempsy.monitoring.dummy.DummyNodeStatsCollector;
import net.dempsy.transport.blockingqueue.BlockingQueueReceiver;

public class SimpleWordCount {

    public static void main(final String[] args) {

        @SuppressWarnings("resource")
        final NodeManager nodeManager = new NodeManager()
            // add a node
            .node(
                // a node in an application called word-count
                new Node.Builder("word-count")
                    // with the following clusters
                    .clusters(
                        // a cluster that has the adaptor
                        new Cluster("adaptor")
                            .adaptor(new WordAdaptor()),
                        // and a cluster that contains the WordCount message processor
                        new Cluster("counter")
                            .mp(new MessageProcessor<WordCount>(new WordCount()))
                            // with the following routing strategy
                            .routingStrategyId("net.dempsy.router.simple")

                    )
                    // this will basically disable monitoring for the example
                    .nodeStatsCollector(new DummyNodeStatsCollector())
                    // use a blocking queue as the transport mechanism since this is all running in the same process
                    .receiver(new BlockingQueueReceiver(new ArrayBlockingQueue<>(100000)))
                    .build()

            )

            // define the infrastructure to be used. Since we're in a single process
            // we're going to use a local collaborator. Alternatively we'd specify
            // using zookeeper to coordinate across processes and machines.
            .collaborator(new LocalClusterSessionFactory().createSession());

        // start dempsy processing for this node in the background.
        nodeManager.start();

        // wait for the processing to be complete
        ...
        nodeManager.stop();

        System.out.println("Exiting Main");
    }
}

The output from running the example is similar to the following (the order of the lines can differ between runs):

The word "it" has a count of 1
The word "worst" has a count of 1
The word "was" has a count of 1
The word "times" has a count of 1
The word "the" has a count of 1
The word "of" has a count of 1
The word "best" has a count of 1
The word "it" has a count of 2
The word "the" has a count of 2
The word "times" has a count of 2
The word "was" has a count of 2
The word "of" has a count of 2
Exiting Main

Explanation.

In this example we have a Dempsy application with a single node with two clusters. One cluster contains the WordAdaptor and another contains the set of WordCount instances being used as message processors.

Once the example runs to completion, the number of WordCount message processors will be equal to the number of unique message keys from all of the messages streamed. In this case the number is:

   new HashSet<>(List.of("it","was","the","best","of","times","it","was","the","worst","of","times")).size()

So there will be 7 instances of WordCount being used as message processors and an additional one representing the message processor prototype.

This is illustrated in the following:

Fig. 1 Simple WordCount example (the WordCount pipeline so far)
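The instance count and the final counts in the output above can be checked directly from the word list. WordTally is a hypothetical helper written for this check; it only tallies the words and does not use Dempsy.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class WordTally {
    static final String[] WORDS = {"it", "was", "the", "best", "of", "times",
                                   "it", "was", "the", "worst", "of", "times"};

    /** Final count per word, i.e. the last count each WordCount instance should print. */
    static Map<String, Long> finalCounts(final String[] words) {
        return Arrays.stream(words)
            .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    /** One WordCount instance per distinct key, plus the prototype. */
    static int totalInstances(final String[] words) {
        return finalCounts(words).size() + 1;
    }
}
```

finalCounts(WORDS) yields {best=1, it=2, of=2, the=2, times=2, was=2, worst=1}, which has 7 keys, and totalInstances(WORDS) returns 8 (7 message processors plus the prototype).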

Running the example distributed

To run the "Word Count" example distributed, we need to change some of the infrastructure we instantiated. But first, let's convert the stream of words to an unbounded stream by looping in the WordAdaptor. We'll simply change WordAdaptor.getNextWordFromSoucre() to the following:

    private String getNextWordFromSoucre() {
        if(wordSourceIndex >= wordSource.length)
            wordSourceIndex = 0;
        return wordSource[wordSourceIndex++];
    }

Distributed Infrastructure Selection

To change the infrastructure, we need to start Dempsy with distributed implementations selected. The updated SimpleWordCount class would be:

...
import net.dempsy.NodeManager;
import net.dempsy.cluster.ClusterInfoException;
import net.dempsy.cluster.zookeeper.ZookeeperSessionFactory;
import net.dempsy.config.Cluster;
import net.dempsy.config.Node;
import net.dempsy.lifecycle.annotation.MessageProcessor;
import net.dempsy.monitoring.dummy.DummyNodeStatsCollector;
import net.dempsy.serialization.jackson.JsonSerializer;
import net.dempsy.serialization.java.JavaSerializer;
import net.dempsy.transport.tcp.nio.NioReceiver;

public class SimpleWordCount {

    public static void main(final String[] args) throws InterruptedException, IllegalStateException, IllegalArgumentException, ClusterInfoException {

        final WordAdaptor wordAdaptor = new WordAdaptor();

        @SuppressWarnings("resource")
        final NodeManager nodeManager = new NodeManager()
            // add a node
            .node(
                // a node in an application called word-count
                new Node.Builder("word-count")
                    // with the following clusters
                    .clusters(
                        // a cluster that has the adaptor
                        new Cluster("adaptor")
                            .adaptor(wordAdaptor),
                        // and a cluster that contains the WordCount message processor
                        new Cluster("counter")
                            .mp(new MessageProcessor<WordCount>(new WordCount()))
                            // with the following routing strategy
                            .routingStrategyId("net.dempsy.router.managed")

                    )
                    // this will basically disable monitoring for the example
                    .nodeStatsCollector(new DummyNodeStatsCollector())
                    // use Java NIO as the transport mechanism
                    .receiver(new NioReceiver<Object>(new JavaSerializer()))
                    .build()

            )

            // define the infrastructure to be used.

            // we want to connect to a zookeeper instance running on this machine.
            .collaborator(new ZookeeperSessionFactory("localhost:2181", 3000, new JsonSerializer()).createSession());

        // start dempsy processing for this node in the background.
        nodeManager.start();

        // wait for the node manager to be started.
        while(!nodeManager.isReady())
            Thread.yield();

        // block "forever" (until the process is killed)
        Thread.sleep(Long.MAX_VALUE);
    }
}

The changes from the original example include:

  1. The routing strategy is now set using .routingStrategyId("net.dempsy.router.managed"). The "managed" routing strategy attempts to dynamically distribute all of a cluster's message processors (in this case, all WordCount instances) across all available nodes.
  2. The receiver is set using .receiver(new NioReceiver<Object>(new JavaSerializer())). This specifies that other nodes reach this node over Java NIO, with messages serialized using Java serialization.
  3. The Dempsy nodes coordinate with each other through Apache Zookeeper, configured using .collaborator(new ZookeeperSessionFactory("localhost:2181", 3000, new JsonSerializer()).createSession()).

Now if we start multiple instances of this Java program, and we have Apache Zookeeper running on port 2181, then the instances of WordCount message processors will be balanced between the running nodes.
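To build intuition for what "balanced between the running nodes" means, here is a generic hash-partitioning sketch. It is NOT the algorithm behind "net.dempsy.router.managed", which this document doesn't describe; HashPartitioner and the node names are hypothetical. It only shows how one key space can be split so that every key has exactly one owning node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Generic hash partitioning for illustration; not Dempsy's managed routing.
final class HashPartitioner {
    private final List<String> nodes;

    HashPartitioner(final List<String> nodes) {
        if (nodes.isEmpty()) throw new IllegalArgumentException("need at least one node");
        this.nodes = List.copyOf(nodes);
    }

    /** The node that owns (hosts the message processor for) the given key. */
    String ownerOf(final Object key) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    /** Groups keys by their owning node. */
    Map<String, List<Object>> assign(final Iterable<?> keys) {
        final Map<String, List<Object>> byNode = new TreeMap<>();
        for (final Object k : keys)
            byNode.computeIfAbsent(ownerOf(k), n -> new ArrayList<>()).add(k);
        return byNode;
    }
}
```

Every key lands on exactly one node, and equal keys always land on the same node, which is what keeps the MessageKey a stable address. A plain modulo scheme like this one, however, reassigns most keys whenever a node joins or leaves, which is why a strategy that rebalances dynamically needs a more careful design.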

If you want to try it, you can start a single-node instance of Apache Zookeeper using docker with the following command:

docker run --name zookeeper --network=host -d zookeeper

Then you can run as many instances of SimpleWordCount as you like.

A working version of this example can be found here: Distributed Simple Word Count Example

Fig. 2 Distributed Simple WordCount example (the WordCount pipeline so far)

Some terminology

Having gone through the "Word Count" example, we should codify some of the terminology and concepts touched on.

message processor: an instance cloned from a message processor prototype, responsible for processing every message of a particular type with a particular unique key.
message processor prototype: an instance used by Dempsy to serve as a template when it needs to `clone()` more instances of a message processor.
message: an object that Dempsy routes to a message processor based on the message's key.
message key: a key obtained from a message. Each unique message key addresses a unique message processor instance in a cluster.
cluster: the collection of all message processors or adaptors of a common type in the same stage of processing of a Dempsy application. A cluster contains the complete set of potential message processors keyed by all of the potential keys of a particular message type. That is, a cluster of message processor instances covers the entire key space of a message.
node: a subset of a set of clusters, containing a portion of each cluster's message processors. Nodes are almost always (except in some possible test situations) the intersection of a cluster and a Java process. That is, the portion of a cluster that's running in a particular process is the cluster's node.
container: sometimes also referred to as a message processor container, the part of the Dempsy infrastructure that manages the lifecycle of message processors within an individual node. That is, there is a one-to-one correspondence between the portion of a cluster running in a particular node and a container.