
Delimiter-separated value (DSV) file loader for MongoDB, built upon the Firehose framework.

Description: A DSV import tool and ThreadPool framework for MongoDB clients
Author: Bryan Reinero <breinero@gmail.com>


Firehose is both a multithreaded DSV import tool for MongoDB and an instrumented execution framework which you can use to benchmark your own applications.

Firehose includes these major components:
  • A simple code instrumentation and reporting library
  • A customizable command line interface builder
  • A multithreaded worker pool
  • An application framework so that you may use all of these components together for your own load testing purposes

The Main Take Away

While Firehose does include a DSV import tool, that functionality is actually just an example application which uses all of the components together to do useful work. Let's take a closer look at the import tool to understand how you might want to use Firehose's features.

Firehose by Example: The DSV Import Tool

Ok, so I want to read a CSV file and import those records into MongoDB as fast as I can. To get that file into MongoDB I will need to execute a three-step process on each line of the file:

  • Read the next line from the file
  • Parse the line, converting it into an object prepped for insertion
  • Insert the new object into MongoDB

As a curious and conscientious software engineer, I am very interested to know how much time each of these steps takes so that I can establish performance baselines. I can use Firehose's instrumentation library to mark the start and end of each step using the Interval class. For instance, here's how I determine how long an individual insertion takes.

Interval insertDuration = samples.set("insert"); // Set the time marker
dao.insert( object ); // perform the actual work
insertDuration.mark(); // mark the operation as complete

The insertDuration Interval is collected and averaged automatically by Firehose, giving me the mean latency of the insertion operation. Firehose then pretty-prints the running average to the console so that I can see how fast my operations are executing in real time.
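
To make the set/mark pattern concrete, here is a minimal sketch of how a collector like this could work. It is not Firehose's actual implementation: SampleSetSketch, IntervalSketch, record(), count() and averageMicros() are hypothetical names I made up for illustration, and the sketch only keeps a simple mean per operation name rather than a windowed running average.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a sample collector; not Firehose's real code.
class SampleSetSketch {
    // Per-operation totals: elapsed nanoseconds and number of completed intervals.
    private final Map<String, LongAdder> totalNanos = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // One timed span; created by set() and closed by mark().
    final class IntervalSketch {
        private final String name;
        private final long start = System.nanoTime();

        IntervalSketch(String name) {
            this.name = name;
        }

        // Record the time elapsed since set() under this interval's name.
        void mark() {
            record(name, System.nanoTime() - start);
        }
    }

    // Start timing an operation.
    IntervalSketch set(String name) {
        return new IntervalSketch(name);
    }

    // Add one completed measurement for the named operation.
    void record(String name, long elapsedNanos) {
        totalNanos.computeIfAbsent(name, k -> new LongAdder()).add(elapsedNanos);
        counts.computeIfAbsent(name, k -> new LongAdder()).increment();
    }

    long count(String name) {
        LongAdder c = counts.get(name);
        return c == null ? 0 : c.sum();
    }

    // Mean latency in microseconds, or 0 if nothing was recorded.
    long averageMicros(String name) {
        long n = count(name);
        return n == 0 ? 0 : totalNanos.get(name).sum() / n / 1_000;
    }

    public static void main(String[] args) {
        SampleSetSketch samples = new SampleSetSketch();
        for (int i = 0; i < 3; i++) {
            IntervalSketch insertDuration = samples.set("insert");
            Math.sqrt(i); // stand-in for dao.insert( object )
            insertDuration.mark();
        }
        System.out.println("insert count=" + samples.count("insert")
                + " average(us)=" + samples.averageMicros("insert"));
    }
}
```

The thread-safe counters are a design choice for the sketch: several worker threads can call mark() at once without locking.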

Firehose's console output, refreshed each second, looks like this:

{
   threads: 2,
   "linesread": 100000,
   samples: {
       units: "microseconds",
       "interval": 1000000,
       ops: [
           {
               name: "total",
               count: 7926,
               average: 250
           },
           {
               name: "readline",
               count: 7926,
               average: 1
           },
           {
               name: "insert",
               count: 7926,
               average: 182
           },
           {
               name: "build",
               count: 7924,
               average: 3
           }
       ]
   }
}

This output tells me that inserts are taking an average of 182 microseconds, averaged over a time interval of 1000000 microseconds (1 second). During this 1-second interval I inserted 7926 documents. Because the output is printed in JSON format, I can insert these stats into MongoDB for benchmarking analysis!
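
As a rough sanity check on numbers like these, the mean "total" latency and the thread count should roughly predict the operation count. The sketch below assumes each thread processes one record at a time, back to back, with no idle time; ThroughputEstimate and expectedOps are hypothetical names used only for this illustration.

```java
// Rough estimate relating mean latency to throughput; assumes threads never idle.
class ThroughputEstimate {
    // Upper bound on operations per reporting interval when every thread
    // works back to back, one operation at a time.
    static long expectedOps(int threads, long intervalMicros, long averageMicros) {
        return threads * intervalMicros / averageMicros;
    }

    public static void main(String[] args) {
        // Values copied from the sample output above.
        long estimate = expectedOps(2, 1_000_000L, 250L);
        System.out.println("estimated ops per interval: " + estimate); // 8000
        System.out.println("observed ops per interval:  7926");
    }
}
```

With 2 threads and a 250 microsecond average, the estimate is 8000 operations per second, which is close to the 7926 reported above.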

You can take a look at how this workload is processed here

Firehose by Example: The DSV Import Command Line Interface

Under the hood, Firehose uses the Apache Commons CLI library to parse command line options passed in at runtime. Firehose wraps Commons CLI into the framework so that we can easily configure our own set of command line options. Using the CLI framework is a two-step process.

  1. Declare command line options in a properties file
  2. Assign callback methods to handle the input

As an example let's take a look at the usage for Firehose's DSV Import feature to see how it uses the Commons CLI:


option  long form         type          description
-cr     --noPretty                      print out in CR-delimited lines. Default is console mode pretty printing (when possible)
-f      --file            <filepath>    filename to import
-fs     --fsync                         write concern: wait for page flush
-h      --headers         <name:type>   ',' delimited list of columns
-j      --journal                       enable write concern wait for journal commit
-m      --mongos          <host:port>   ',' delimited list of mongodb hosts to connect to. Default localhost:27017
-ns     --namespace       <namespace>   target database and collection this work will use (format: 'db.col')
-pi     --printInterval   <seconds>     print output every n seconds
-ri     --reportInterval  <seconds>     average stats over a time interval of i milliseconds
-t      --threads         <threads>     number of worker threads. Default 1
-v      --verbose                       Enable verbose output
-wc     --writeConcern    <concern>     write concern. Default = w:1

To generate these options I first declared the options I wanted to use inside my options.json file. Here's a snippet of the file:

{
    "application": "Firehose",
    "options": [
        {
            "op": "m",
            "longOpt": "mongos",
            "name": "hostname:port",
            "description": "',' delimited list of mongodb host to connect to. Default localhost:27017,",
            "args": "multi",
            "separator": ","
        },
        {
            "op": "f",
            "longOpt": "file",
            "name": "file",
            "description": "filename to import (full path)",
            "required": true,
            "args": 1
        },
        {
            "op": "t",
            "longOpt": "threads",
            "name": "threads",
            "description": "number of worker threads. Default 1",
            "args": 1
        }
    ]
}

Firehose reads this file at application startup, creating the specific command line options I need to run the application. Now, all I need to do is define a set of callbacks which process my command line options when a user actually runs the DSV Import tool. For example, here's the callback that handles the "-t" option, which sets the number of worker threads in the pool.

cli.addCallBack("t", new CallBack() {
    public void handle(String[] values) {
        // the first value following -t is the thread count
        numThreads = Integer.parseInt(values[0]);
    }
});

You can examine more callback examples in the code.
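
To show the callback idea in isolation, here is a minimal, self-contained sketch of option-to-callback dispatch. It deliberately does not use Apache Commons CLI, which is what Firehose relies on for real parsing. CallbackDispatchSketch and its naive parse() method are hypothetical, and the CallBack interface is redeclared here only to mirror the shape used in the snippet above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option-to-callback dispatch; not Firehose's CLI code.
class CallbackDispatchSketch {
    // Mirrors the shape of the CallBack used in the snippet above.
    interface CallBack {
        void handle(String[] values);
    }

    private final Map<String, CallBack> callbacks = new HashMap<>();
    int numThreads = 1; // default, as listed in the usage table

    void addCallBack(String option, CallBack callback) {
        callbacks.put(option, callback);
    }

    // Naive parser: a token starting with "-" opens an option,
    // and the tokens that follow it are that option's values.
    void parse(String[] args) {
        int i = 0;
        while (i < args.length) {
            String option = args[i++].replaceFirst("^-+", "");
            List<String> values = new ArrayList<>();
            while (i < args.length && !args[i].startsWith("-")) {
                values.add(args[i++]);
            }
            CallBack callback = callbacks.get(option);
            if (callback == null) {
                throw new IllegalArgumentException("Unknown option: " + option);
            }
            callback.handle(values.toArray(new String[0]));
        }
    }

    public static void main(String[] args) {
        CallbackDispatchSketch cli = new CallbackDispatchSketch();
        cli.addCallBack("t", values -> cli.numThreads = Integer.parseInt(values[0]));
        cli.parse(new String[] {"-t", "4"});
        System.out.println("threads = " + cli.numThreads);
    }
}
```

Keeping the handlers in a map means adding a new option never touches the parsing loop, which is the main appeal of the callback approach.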

Example run

java -jar target/Firehose-0.1.0.one-jar.jar -f test.csv -d , -ns test.firehose -h _id:objectid,count:float,sum:float,name:string -t 2

This command line invokes Firehose with 2 threads, parsing a CSV file of 4 columns. Each column is translated into a JSON field named "_id", "count", "sum" or "name", of type ObjectId, float, float and string respectively.
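
The name:type header list can be pictured as a table of per-column parsers. The sketch below is only an illustration under my own assumptions: HeaderConversionSketch and transformerFor are hypothetical names (they are not Firehose's converter or Transformer classes), "float" is parsed into a Java Double, and the "objectid" type is left out because it needs the MongoDB driver's ObjectId class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical sketch of header-driven conversion; not Firehose's converter.
class HeaderConversionSketch {
    // Field name to parser, kept in column order.
    private final Map<String, Function<String, Object>> fields = new LinkedHashMap<>();

    // Map a type name from the -h option to a parsing function.
    static Function<String, Object> transformerFor(String type) {
        switch (type) {
            case "float":
                return s -> Double.valueOf(s); // assumption: float maps to Double
            case "string":
                return s -> s;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // headerSpec looks like "count:float,sum:float,name:string".
    HeaderConversionSketch(String headerSpec) {
        for (String column : headerSpec.split(",")) {
            String[] s = column.split(":");
            fields.put(s[0], transformerFor(s[1]));
        }
    }

    // Convert one delimited line into an ordered field-name-to-value map.
    Map<String, Object> convert(String line, String delimiter) {
        String[] values = line.split(Pattern.quote(delimiter), -1);
        if (values.length != fields.size()) {
            throw new IllegalArgumentException("Expected " + fields.size()
                    + " columns but found " + values.length);
        }
        Map<String, Object> document = new LinkedHashMap<>();
        int i = 0;
        for (Map.Entry<String, Function<String, Object>> field : fields.entrySet()) {
            document.put(field.getKey(), field.getValue().apply(values[i++]));
        }
        return document;
    }

    public static void main(String[] args) {
        HeaderConversionSketch converter =
                new HeaderConversionSketch("count:float,sum:float,name:string");
        // prints {count=3.0, sum=4.5, name=widget}
        System.out.println(converter.convert("3,4.5,widget", ","));
    }
}
```

The delimiter is quoted with Pattern.quote so that characters such as "|" are treated literally rather than as regular expression syntax.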

Using The Application Framework

Firehose's application framework is made for standing up simple load tests quickly. As such, it comes with a set of command line options fully configured for control of the worker pool, instrumentation library, and access to MongoDB. Users of the application framework need only add:

  • Any extra command line options specific to their application
  • An instance of Executable which the worker pool calls as a unit of work

Let's again use the DSV import tool as an example. The application framework is initialized inside Firehose's constructor. The first step is to define the appropriate command line interface callbacks I need to handle user input.

public Firehose ( String[] args ) throws Exception {

    Map<String, CallBack> myCallBacks = new HashMap<String, CallBack>();

    // custom command line callback for csv conversion
    myCallBacks.put("h", new CallBack() {
        public void handle(String[] values) {
            for (String column : values) {
                // each column is declared as name:type
                String[] s = column.split(":");
                converter.addField( s[0], Transformer.getTransformer( s[1] ) );
            }
        }
    });

    // custom command line callback for the delimiter
    myCallBacks.put("d", new CallBack() {
        public void handle(String[] values) {
            converter.setDelimiter( values[0] );
        }
    });

    // custom command line callback for the input file
    myCallBacks.put("f", new CallBack() {
        public void handle(String[] values) {
            filename  = values[0];
            try {
                br = new BufferedReader(new FileReader(filename));
            } catch (Exception e) {
                // error handling is not shown in this excerpt
            }
        }
    });

    // ... the rest of the constructor is not shown in this excerpt
}

Remember, the Application class has already defined CLI callbacks for the worker pool, instrumentation engine and MongoDB driver. All I needed to add were the callbacks for the input file, value delimiter and column headers. I've defined these callbacks as a collection of anonymous classes which I pass in when creating the Application:

worker = Application.ApplicationFactory.getApplication(this, args, myCallBacks);

The getApplication factory method takes 3 parameters:
  1. A class which implements Executable
  2. A String array of the command line options
  3. A map of custom command line callbacks

Bingo. I'm ready to rock and roll. Notice that the 'this' in the first parameter refers to an instance of the Firehose class, which implements Executable. The overridden execute() method is where all the work is done.
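
To illustrate what "the worker pool calls a unit of work" can look like, here is a minimal sketch built on the standard java.util.concurrent executor classes. It does not reproduce Firehose's Application or Executable: WorkerPoolSketch and UnitOfWork are hypothetical names, and the convention that execute() returns false when the input is exhausted is my own assumption for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a worker pool driving a unit of work; not Firehose's code.
class WorkerPoolSketch {
    // One unit of work; returns false when there is nothing left to do.
    interface UnitOfWork {
        boolean execute();
    }

    // Run the unit of work on numThreads threads until it reports completion.
    static void run(UnitOfWork work, int numThreads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.execute(() -> {
                while (work.execute()) {
                    // keep pulling work until the input is exhausted
                }
            });
        }
        pool.shutdown(); // accept no new tasks, let running ones finish
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger remaining = new AtomicInteger(10_000); // stand-in for lines in a file
        AtomicInteger processed = new AtomicInteger();
        run(() -> {
            if (remaining.getAndDecrement() <= 0) {
                return false; // input exhausted
            }
            processed.incrementAndGet(); // stand-in for read, parse, insert
            return true;
        }, 4);
        System.out.println("processed " + processed.get() + " units");
    }
}
```

Because every thread runs the same unit of work, any shared state it touches (here, the two counters) has to be thread-safe.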

Build and Quickly Test Firehose

I've included a CSV file generator called RandomDSVGenerator so that you may test your build and see Firehose in action with minimal effort. Simply run the following commands from the command line prompt.

$ mvn package
$ java -cp target/DSVLoader-0.1.1.jar:../Firehose-fork/target/Firehose-0.1.1.jar com.bryanreinero.dsvloader.test.RandomDSVGenerator -f test.csv -n 10000
$ java -jar target/DSVLoader-0.1.1.one-jar.jar -f test.csv -d , -h _id:objectid,count.0:float,count.1:float,name:string -t 20

Why Firehose?

As a consultant, I often advise my clients to instrument their application code such that they have a baseline of performance metrics. Getting baselines is extremely useful in identifying bottlenecks, understanding how much concurrency your application can handle, determining what latency is "normal" for the application, and indicating when performance is deviating from those norms.

While most developers will acknowledge the value of instrumentation, few actually implement it. So to help them along, Firehose was designed with some basic instrumentation baked right into it.


Firehose is supported and somewhat tested on Java 1.7

Additional dependencies are:


To Do

  • Accept piped input from stdin
  • Write Javadocs
  • Accept JSON input
  • Accept mongoexport-formatted CSVs
  • Fix README formatting