/fabric

Realtime stream processing framework for apache mesos / marathon

Primary LanguageJavaApache License 2.0Apache-2.0

##Fabric - A real-time stream processing framework

###What? A scalable, practical and safe real-time computation framework designed for easy operability and extension.

Fabric is proven to work very well for:

  • High velocity multi-destination event ingestion with guaranteed persistence.
  • Rules/Filter based real-time triggers for advertising/broadcast
  • Online Fraud detection
  • Real-time pattern matching
  • Basic Streaming analytics

###Why?

  • Highly scalable and guaranteed availability using battle-tested clustering capabilities provided by Apache Mesos and Marathon
  • Framework level guarantees against message loss, support for replay, multiple sources and complex tuple trees.
  • Event batching is supported at the core level.
  • Source level event partitioning used as unit for scalability.
  • Uses capabilities provided by docker to ensure strong application and partition level isolation guarantees
  • Supports multiple topology types
    • Handcrafted topologies (ala Apache Storm) packaged as docker containers
    • Configuration driven, dynamically generated topologies that are created on the fly
  • Complete separation of specification from the code, so that minimal code changes are required to update running topologies and deploy new ones.
  • On the fly topology creation and deployment by dynamically assembling topologies using components directly from artifactory.
  • Inbuilt support for custom metrics and custom code level healthchecks to catch application failures right when they happen.
  • Framework level time based aggregation support with completely independent processor level clock pulse generators.
  • Single JVM processing per partition effectively eliminating message transfers over the wire, thereby providing very high throughputs upto 150k events per second per partition.
  • Easy to write components(processors and sources) without getting bogged down by the complexities of stream processing. The compute framework abstracts out the threading model and provides a simplified single-threaded code model (ala NodeJS), thereby eliminating the need to implement complex threading models.
  • Provides capabilities to create processors that can be easily configured from outside.
  • Inherent support for Jackson based json serialization and deserialization thus allowing events to have nested structure and be processed easily using common libraries like json-path.

Getting Started

Glossary

Computation: A computation is a directed acyclic graph which describes the flow of data and the computations done on them. In this graph, the nodes are the components written by users. Nodes can be of two types, SOURCE and PROCESSOR. A single instance of a topology runs in a single JVM. A computation consists of sources and processors written by users linked together according to a json specification. All sources written by users must extend PipelineSource. All processors written by users must extend StreamingProcessor or ScheduledProcessor.
EventSet: An event set is a collection of events. An event set is the basic transmission unit within the computation.
Source: A source is a component that ingests event sets into the computation. A source is responsible for managing the state of the events ingested into the computation. Events can be in acknowledged or unacknowledged state.
Processor: A processor is a component that performs some computation on an incoming event set and emits an outgoing event set. A processor can be of two types, Streaming Processor and Scheduled Processor.
Streaming Processor: A Streaming Processor is a processor that is triggered whenever and event set is sent to the processor.
Scheduled Processor: A Scheduled Processor is a processor which is triggered whenever a fixed period of time elapses in a periodic fashion.

###Walkthrough
Let’s write a word count computation that processes a list of sentences and outputs words frequency counts.
We need three components for this computation:

  1. A source that generates random sentences - RandomSentenceSource
  2. A processor that splits the sentences by space into its constituent words - SplitterProcessor
  3. A processor that outputs the word frequency counts at regular intervals - WordCountProcessor

####RandomSentenceSource.java

// Add this annotation for registering the source with the metadata server
@Source(
        namespace = "global",
        name = "random-sentence-source",
        version = "0.1",
        description = "A source that generates random sentences from a pool of sentences",
        cpu = 0.1,
        memory = 64,
        requiredProperties = {},
        optionalProperties = {"randomGeneratorSeed"}
)
public class RandomSentenceSource implements PipelineSource {

    Random random;
    String[] sentences = {
            "A quick brown fox jumped over the lazy dog",
            "Life is what happens to you when you are busy making other plans",
            "Mama always said that life is a box of chocolates",
            "I am going to make you an offer you cannot refuse",
            "I am speaking to a dead man on the other side of the phone",
            "The path of the righteous man is beset on all sides by the inequities of the selfish and the tyranny of evil men"
    };

    @Override
    public void initialize(final String instanceName,
            final Properties global,
            final Properties local,
            final ProcessingContext processingContext,
            final ComponentMetadata componentMetadata) throws Exception {
        // this method is called to initialize the source
        // use this utility method to read properties passed
        int seed = ComponentPropertyReader
                .readInteger(local, global, "randomGeneratorSeed", instanceName, componentMetadata, 42);
        random = new Random(seed);
    }

    @Override
    public RawEventBundle getNewEvents() {
        // this method is called to get new events
        return RawEventBundle.builder()
                .events(getSentences(5).stream()
                        .map(sentence -> Event.builder().id(random.nextInt()).data(sentence.toLowerCase()).build())
                        .collect(Collectors.toCollection(ArrayList::new)))
                .meta(Collections.emptyMap())
                .partitionId(Integer.MAX_VALUE)
                .transactionId(Integer.MAX_VALUE)
                .build();
    }

    private List<String> getSentences(int n) {
        List<String> listOfSentences = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            listOfSentences.add(sentences[random.nextInt(sentences.length)]);
        }
        return listOfSentences;
    }
}

####SplitterProcessor.java

@Processor(
        namespace = "global",
        name = "splitter-processor",
        version = "0.1",
        cpu = 0.1,
        memory = 32,
        description = "A processor that splits sentences by a given delimiter",
        processorType = ProcessorType.EVENT_DRIVEN,
        requiredProperties = {},
        optionalProperties = {"delimiter"}
)
public class SplitterProcessor extends StreamingProcessor {
    String delimiter;

    @Override
    protected EventSet consume(final ProcessingContext processingContext, final EventSet eventSet) throws
            ProcessingException {
        List<Event> events = new ArrayList<>();
        eventSet.getEvents().stream()
                .forEach(event -> {
                    String sentence = (String) event.getData();
                    String[] words = sentence.split(delimiter);
                    events.add(Event.builder()
                            .data(words)
                            .id(Integer.MAX_VALUE)
                            .properties(Collections.emptyMap())
                            .build());
                });
        return EventSet.eventFromEventBuilder()
                .isAggregate(false)
                .partitionId(eventSet.getPartitionId())
                .events(events)
                .build();
    }

    @Override
    public void initialize(final String instanceName,
            final Properties global,
            final Properties local,
            final ComponentMetadata componentMetadata) throws InitializationException {
        delimiter = ComponentPropertyReader.readString(local, global, "delimiter", instanceName, componentMetadata, ",");

    }

    @Override
    public void destroy() {
        // do some cleanup if necessary
    }
}

####WordCountProcessor.java

@Processor(
        namespace = "global",
        name = "word-count-processor",
        version = "0.2",
        description = "A processor that prints word frequency counts within a tumbling window",
        cpu = 0.1,
        memory = 128,
        processorType = ProcessorType.TIMER_DRIVEN,
        requiredProperties = {"triggering_frequency"},
        optionalProperties = {}
)
public class WordCountProcessor extends ScheduledProcessor {
    Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    protected void consume(final ProcessingContext processingContext, final EventSet eventSet) throws
            ProcessingException {
        eventSet.getEvents().stream()
                .forEach(event -> {
                    String[] words = (String[]) event.getData();
                    for (String word: words) {
                        if (wordCounts.containsKey(word)) {
                            wordCounts.put(word, wordCounts.get(word) + 1);
                        } else {
                            wordCounts.put(word, 1);
                        }
                    }
                });
    }

    @Override
    public void initialize(final String instanceName,
            final Properties global,
            final Properties local,
            final ComponentMetadata componentMetadata) throws InitializationException {
        // nothing to initialize here
    }

    @Override
    public List<Event> timeTriggerHandler(ProcessingContext processingContext) throws ProcessingException {
        // this method will be called after a fixed interval of time, say 5 seconds
        System.out.println(Joiner.on(",").withKeyValueSeparator("=").join(wordCounts));
        wordCounts.clear();
        // nothing to send to downstream processors
        return Collections.emptyList();

    }

    @Override
    public void destroy() {
        wordCounts.clear();
    }
}

RandomSentenceSource -> SplitterProcessor -> WordCountProcessor

Let’s suppose all of these classes reside in a single maven project sample-topology
Suppose the jar sample-topology.jar is hosted at http://localhost:8080 To register these sources and processors, build and release this project into artifactory and then use the following fabric-server API

The json specification for this computation will look like this

{
    "name": "word-count-topology",
    "sources": [{
        "id": "random-sentence-source",
        "meta": {
            "id": "1cfda5cb-99d4-34e3-83f0-5b1364a92cce",
            "type": "SOURCE",
            "namespace": "global",
            "name": "random-sentence-source",
            "version": "0.1",
            "description": "A source that generates random sentences from a pool of sentences",
            "processorType": null,
            "requiredProperties": [],
            "optionalProperties": [
                "randomGeneratorSeed"
            ],
            "cpu": 0.1,
            "memory": 64,
            "source": {
                "type": "jar",
                "url": "file:///fabric-sample-processors/target/fabric-sample-processors-1.0-SNAPSHOT.jar"
            }
        },
        "properties": {}
    }],
    "processors": [{
        "id": "splitter-processor",
        "meta": {
            "id": "0b749006-d2dd-3684-a521-f76d6ab0dec8",
            "type": "PROCESSOR",
            "namespace": "global",
            "name": "splitter-processor",
            "version": "0.1",
            "description": "A processor that splits sentences by a given delimiter",
            "processorType": "EVENT_DRIVEN",
            "requiredProperties": [],
            "optionalProperties": [
                "delimiter"
            ],
            "cpu": 0.1,
            "memory": 32,
            "source": {
                "type": "jar",
                "url": "file:///fabric-sample-processors/target/fabric-sample-processors-1.0-SNAPSHOT.jar"
            }
        },
        "properties": {
            "processor.splitter-processor.delimiter": " "
        }
    }, {
        "id": "word-count-processor",
        "meta": {
            "id": "59f4fe28-b09b-3447-8bb2-26d3c23dd885",
            "type": "PROCESSOR",
            "namespace": "global",
            "name": "word-count-processor",
            "version": "0.2",
            "description": "A processor that prints word frequency counts within a tumbling window",
            "processorType": "TIMER_DRIVEN",
            "requiredProperties": [
                "triggering_frequency"
            ],
            "optionalProperties": [],
            "cpu": 0.1,
            "memory": 128,
            "source": {
                "type": "jar",
                "url": "file:///fabric-sample-processors/target/fabric-sample-processors-1.0-SNAPSHOT.jar"
            }
        },
        "properties": {
            "processor.word-count-processor.triggering_frequency": "5000"
        }
    }],
    "connections": [{
        "fromType": "SOURCE",
        "from": "random-sentence-source",
        "to": "splitter-processor"
    }, {
        "fromType": "PROCESSOR",
        "from": "splitter-processor",
        "to": "word-count-processor"
    }],
    "properties": {
        "computation.name": "word-count-topology",
        "computation.eventset.is_serialized": "false"
    }
}

###Benchmarks ####Performance Test Configuration No of messages: 1 million
Payload size: 258 bytes
####Topology Kafka Source -> Event Counter (Prints number of total events consumed every one second to the console)
No of partitions: 1
Topic Name: end-to-end-latency-perf
No of instances of topology: 1
Kafka source buffer size: 3 MB
Docker CPU (number of cpu shares): 1.0
Docker Memory: 2 GB
JVM Heap Size: 2 GB
####Kafka Broker configuration 2 cores Intel(R) Xeon(R) CPU E5-2666 v3 @ 2.90GHz 8 GB RAM
####Mesos Host configuration 8 cores Intel(R) Xeon(R) CPU E5-2666 v3 @ 2.90GHz 32 GB RAM

End to end latency to process all messages in seconds (ceiling) averaged over multiple runs is presented below

Configuration Without gzip With gzip
Local Zk, Local Kafka 8 5
Remote Zk, Remote Kafka 36 6

Throughput with best configuration T ~ 166666 events / second

NOTE: Using Disruptor with YieldWaitingStategy instead of LBQ for channel communication actually reduced the throughput