
fifo file for newline terminated strings

qfifo

Quick file-based fifo for buffering newline-terminated strings, designed to be a very low-overhead, very fast local journal that can both quickly persist large volumes of data and efficiently feed the ingest process that consumes it.

Highlights

  • Simple open / getline / putline / close semantics.
  • Node readline emulation.
  • Synchronous calls, asynchronous file i/o.
  • Efficient batched file reads and writes.
  • Very high throughput, hundreds of megabytes of data per second.
  • Universally supported, very easy, very fast newline terminated plaintext data format.
  • Metadata is optional, and is kept alongside in a separate file. Only the consumer might need metadata.
  • No external dependencies.
  • Tested to work with node-v0.6 and up.
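The newline-terminated format is what lets reads be batched cheaply: a reader can pull in a large chunk of bytes, hand out every complete line, and carry only the unterminated tail over to the next read. The sketch below illustrates that technique. `LineSplitter` is a hypothetical helper written for this example, not qfifo's internal code; like getline(), it keeps the terminating newline on each line.

```javascript
var StringDecoder = require('string_decoder').StringDecoder;

// Hypothetical helper: splits batched reads into newline-terminated lines.
function LineSplitter() {
    this.decoder = new StringDecoder('utf8');  // handles utf8 chars split across chunks
    this.partial = '';                          // unterminated tail of the previous chunk
}

// Accept the next chunk of bytes, return the complete lines it finishes.
LineSplitter.prototype.push = function(chunk) {
    var text = this.partial + this.decoder.write(chunk);
    var lines = [];
    var start = 0, nl;
    while ((nl = text.indexOf('\n', start)) >= 0) {
        lines.push(text.slice(start, nl + 1));  // keep the terminating newline
        start = nl + 1;
    }
    this.partial = text.slice(start);           // wait for the rest of this line
    return lines;
};
```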

Limitations

  • QFifos are inherently single-reader; the consuming process owns the fifo.
  • Single-writer; concurrent writes are not supported. (File locking is missing from nodejs, so this incarnation differs from the quicklib version it was based on.)

Api

fifo = new QFifo( filename, modeOrOptions )

Create a fifo for reading or appending the named fifo file.

Fifos operate in read mode or append mode. In 'a' append-only mode and 'a+' read-write mode, fifos are always written at the end, and a missing fifo is created; in 'r' read mode it is an error if the fifo does not already exist. The 'r+' mode is accepted as a performance optimization (see Options below) but should not be used for writing. For exclusive read-only or append-only access, omit the + from the mode flag.

NOTE: Reading fifos is always safe; writing 'r+' read-mode fifos is currently not supported. For now (as of version 0.7.0) always write fifos in 'a' or 'a+' append mode to ensure that data is not overwritten.
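The mode rules above follow from the Node fs.open() flags that the fifo passes through: 'r' fails when the file is missing, while 'a' creates it and always writes at the end. A small standalone demonstration using only Node's fs module (the temporary directory and the 'journal' file name are made up for the example):

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');

// Hypothetical fifo file in a fresh temporary directory.
var dir = fs.mkdtempSync(path.join(os.tmpdir(), 'qfifo-demo-'));
var filename = path.join(dir, 'journal');

// 'r': the file must already exist, so this open fails with ENOENT.
var readErrorCode = null;
try {
    fs.closeSync(fs.openSync(filename, 'r'));
} catch (err) {
    readErrorCode = err.code;
}

// 'a': a missing file is created, and every write goes to the end.
var fd = fs.openSync(filename, 'a');
fs.writeSync(fd, 'line 1\n');
fs.closeSync(fd);

// Reopening in 'a' mode does not truncate; new data is appended.
fd = fs.openSync(filename, 'a');
fs.writeSync(fd, 'line 2\n');
fs.closeSync(fd);

var contents = fs.readFileSync(filename, 'utf8');
```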

Creating a new QFifo is fast: it only allocates the object and initializes its state. The filesystem is not accessed until open (but note that getline, putline and readlines open the fifo automatically).

Options may contain the following settings:

  • flag: open mode flag for fs.open(), default 'r'. The flag may be 'r' for read-only, 'a' for append-only, or 'a+' for read-write. 'r+' is allowed as an optimization for faster in-file header rsyncs, but should not be used for data writes.
  • readSize: how many bytes to read at a time, default 64K.
  • writeSize: how many pending chars trigger an immediate write instead of waiting writeDelay ms, default 16K. Set to 0 to always write immediately.
  • writeDelay: write-combining optimization, how many ms to wait for more data to arrive before writing to file, default 2 ms.
  • updatePosition: whether to update fifo.position with the byte offset in the file of the next line to be read, default true. The position is needed to checkpoint the read state, but skipping the update is 25% faster (e.g. when the whole file is consumed without checkpointing).
  • reopenInterval: how often, in ms, to reopen the fifo file, or -1 to never reopen, default 20 ms. Periodic reopening ensures that writes will cease after a grab or an external file rename.
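Together, writeSize and writeDelay describe a write-combining buffer. The following sketch shows one way such a buffer can behave; `WriteCombiner` and its `writeFn` hook are hypothetical names for illustration, and qfifo's own write path may differ in detail.

```javascript
// Hypothetical write-combining buffer: pending text is written as one batch,
// immediately once writeSize chars are pending, otherwise after writeDelay ms.
function WriteCombiner(writeFn, options) {
    this.writeFn = writeFn;                // called with each combined batch
    this.writeSize = options.writeSize;    // pending-char threshold (0 = always write now)
    this.writeDelay = options.writeDelay;  // ms to wait for more data
    this.pending = '';
    this.timer = null;
}

WriteCombiner.prototype.write = function(str) {
    this.pending += str;
    if (this.pending.length >= this.writeSize) {
        this.flushNow();                   // enough data: write immediately
    } else if (!this.timer) {
        var self = this;
        this.timer = setTimeout(function() { self.flushNow(); }, this.writeDelay);
    }
};

WriteCombiner.prototype.flushNow = function() {
    if (this.timer) { clearTimeout(this.timer); this.timer = null; }
    if (this.pending) {
        var batch = this.pending;
        this.pending = '';
        this.writeFn(batch);
    }
};
```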

fifo.open( callback(err, fd) )

Open the file for use by the fifo. The callback receives the file descriptor used, or the open error. The fifo has no lines yet after the open; reading is started by the first getline(). It is safe to open the fifo more than once; duplicate calls are harmless. Note that getline and putline will open the file if it hasn't been opened already.
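One way to make repeated open() calls harmless is to have all callers share a single in-flight fs.open(). This is a sketch of that pattern that assumes nothing about qfifo's internals; `OnceOpener` is a hypothetical name.

```javascript
var fs = require('fs');

// Hypothetical idempotent opener: concurrent and repeated open() calls
// share one fs.open() and all receive the same fd (or the same error).
function OnceOpener(filename, flag) {
    this.filename = filename;
    this.flag = flag;
    this.fd = -1;          // -1 while not open
    this.waiting = null;   // callbacks queued while an open is in progress
}

OnceOpener.prototype.open = function(callback) {
    var self = this;
    if (this.fd >= 0) return setImmediate(callback, null, this.fd);  // already open
    if (this.waiting) return this.waiting.push(callback);            // join the pending open
    this.waiting = [callback];
    fs.open(this.filename, this.flag, function(err, fd) {
        var callbacks = self.waiting;
        self.waiting = null;               // on error, a later open() retries
        if (!err) self.fd = fd;
        callbacks.forEach(function(cb) { cb(err, err ? undefined : fd); });
    });
};
```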

fifo.close( [callback(err)] )

Close the file descriptor. The fifo will not be usable until it is reopened. Close is not synchronized with reads or writes; closing the fifo while it is still in use will produce an error, so first sync the data with flush() and/or the read state with rsync(). If a callback is provided, it will be invoked once the files are closed.

line = fifo.getline( )

Return the next unread line from the file, or the empty string '' if there are no new lines available. Reading is asynchronous, done in batches in the background, so lines may become available later. Always returns a utf8 string. The fifo.eof flag is set to indicate that zero bytes were read from the file; retrying the read may clear the eof flag. Read errors are saved in fifo.error and stop further reads from the file.

var QFifo = require('qfifo');

function readFile(filename, callback) {
    var fifo = new QFifo(filename, { flag: 'r', readSize: 256 * 1024 });
    var contents = '';
    (function readLines() {
        // getline() returns '' when no line is buffered yet
        for (var i=0; i<100; i++) contents += fifo.getline();
        if (fifo.error || fifo.eof) callback(fifo.error, contents);
        // yield the cpu so the fifo can read more of the file
        else setTimeout(readLines);
    })();
}

fifo.rsync( callback )

Checkpoint the byte offset of the next unread line from fifo.position so if the fifo is reopened it can resume reading where it left off. The information is saved in JSON form to a separate file named the same as the fifo filename but with '.hd' appended.
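A minimal sketch of the checkpoint idea: write the read offset to a JSON sidecar file named after the fifo with '.hd' appended, and read it back on restart. The `{ position }` field name and both helper names are assumptions for illustration; the README does not document the header's actual layout.

```javascript
var fs = require('fs');

// Hypothetical: save the byte offset of the next unread line.
function saveCheckpoint(filename, position) {
    fs.writeFileSync(filename + '.hd', JSON.stringify({ position: position }) + '\n');
}

// Hypothetical: load the saved offset, or 0 if no header exists yet.
function loadCheckpoint(filename) {
    try {
        return JSON.parse(fs.readFileSync(filename + '.hd', 'utf8')).position;
    } catch (err) {
        if (err.code === 'ENOENT') return 0;   // no header: start at the beginning
        throw err;
    }
}
```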

fifo.putline( line [,callback(err)] )

Append a line of text to the file. All lines must be newline terminated; putline will supply the newline if it is missing. Putline is a non-blocking call that buffers the data and returns immediately; writing is done asynchronously in batches in the background. The application must periodically yield the cpu for the writes to happen. Any write errors are saved in fifo.error, and no further writes will be performed. The optional callback will be called after the line has been written.
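The putline() contract (buffer now, write later, call back after the write) can be modeled in a few dozen lines. This is a hypothetical sketch built on fs.appendFile, not qfifo's code; `LineAppender` is a made-up name. It keeps at most one write in flight so that batches land in the file in order.

```javascript
var fs = require('fs');

// Hypothetical appender: putline() only buffers and returns; the batched
// append runs once the caller yields the cpu.
function LineAppender(filename) {
    this.filename = filename;
    this.lines = [];         // lines waiting for the next batch write
    this.callbacks = [];     // callbacks to run once that batch is written
    this.scheduled = false;  // a batch write is queued via setImmediate
    this.writing = false;    // an fs.appendFile is in flight
}

LineAppender.prototype.putline = function(line, callback) {
    if (line[line.length - 1] !== '\n') line += '\n';  // supply a missing newline
    this.lines.push(line);
    if (callback) this.callbacks.push(callback);
    this._schedule();
};

LineAppender.prototype._schedule = function() {
    if (this.scheduled || this.writing) return;
    this.scheduled = true;
    var self = this;
    setImmediate(function() {
        self.scheduled = false;
        self._writeBatch();
    });
};

LineAppender.prototype._writeBatch = function() {
    if (this.lines.length === 0) return;
    var self = this;
    var data = this.lines.join('');
    var callbacks = this.callbacks;
    this.lines = [];
    this.callbacks = [];
    this.writing = true;
    fs.appendFile(this.filename, data, function(err) {
        self.writing = false;
        callbacks.forEach(function(cb) { cb(err); });
        if (self.lines.length) self._schedule();  // lines that arrived during the write
    });
};
```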

fifo.write( string )

Internal fifo append method that does not mandate newlines. Use with caution.

fifo.flush( callback )

Invoke callback() once the currently buffered writes have completed. New writes made after the flush call are not waited for (but may still have been included in the batch).

fifo.readlines( visitor(line) )

Loop getline() and call visitor with each line in the fifo. fifo.eof will be set once the fifo is empty and has no more lines. If the fifo is appended, eof is cleared but the readlines loop is not restarted. Note that readlines and getline return lines that include the terminating newline, which differs from node readline that strips them.

NOTE: This call is not reentrant, only a single function may be reading the fifo at a time. Calling readlines() with a new visitor displaces the first function.

fifo.readlines(function(line) {
    // line includes its terminating newline
    // if (fifo.eof) then no more lines
});

fifo.pause( )

Suspend the readlines loop, do not deliver any more lines until resumed.

fifo.resume( )

Resume the readlines loop, start delivering lines again.
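The readlines/pause/resume contract can be modeled over an in-memory list of lines. `LineReader` is a hypothetical stand-in written for illustration; it shows only the delivery semantics (a single visitor, lines that keep their newline, no delivery while paused, eof once the list is empty), not qfifo's file handling.

```javascript
// Hypothetical in-memory model of readlines()/pause()/resume().
function LineReader(lines) {
    this.lines = lines.slice();
    this.visitor = null;
    this.paused = false;
    this.eof = false;
}

LineReader.prototype.readlines = function(visitor) {
    this.visitor = visitor;      // a new visitor displaces the previous one
    this._deliver();
};

LineReader.prototype.pause = function() { this.paused = true; };

LineReader.prototype.resume = function() {
    this.paused = false;
    this._deliver();
};

LineReader.prototype._deliver = function() {
    while (this.visitor && !this.paused && this.lines.length) {
        this.visitor(this.lines.shift());
    }
    this.eof = this.lines.length === 0;  // no more lines left to deliver
};
```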

fifo.compact( options, callback )

Copy the unread portion of the fifo to the start of the file. This call assumes that all lines read have been successfully handled, and checkpoints the current read position, i.e. it implicitly does an rsync. The fifo should not be read or written until this call completes.

Options:

  • minSize: only compact if the file has grown to this many bytes, default 1 million
  • minReadRatio: only compact if this fraction of the file has been read, default 2/3
  • readSize: copy in chunks of this many bytes, default the fifo's options.readSize (64K)
  • dstOffset: how many bytes at the start of the file to leave without overwriting
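The minSize and minReadRatio thresholds amount to a simple test on the file size and the current read position. A hypothetical helper expressing that test with the documented defaults (treating "grown to this many bytes" as an inclusive comparison is an assumption):

```javascript
// Hypothetical: decide whether compacting is worthwhile, using the
// documented defaults of 1 million bytes and a 2/3 read ratio.
function shouldCompact(fileSize, readPosition, options) {
    var minSize = options.minSize !== undefined ? options.minSize : 1000000;
    var minReadRatio = options.minReadRatio !== undefined ? options.minReadRatio : 2 / 3;
    if (fileSize < minSize) return false;            // file still small
    return readPosition / fileSize >= minReadRatio;  // enough already consumed
}
```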

fifo.copyBytes( srcFd, dstFd, srcOffset, srcLimit, dstOffset, buff, callback(err, nbytes) )

Copy the data bytes read from the file descriptor srcFd from between srcOffset and srcLimit into the file descriptor dstFd starting at offset dstOffset. Buff must be an appropriately sized Buffer to hold the data chunks as they are processed, typically between 8 and 64 KB. Calls callback with the number of bytes copied. Pass srcLimit = Infinity to copy the whole file.
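A chunked copy of this kind is a loop of positioned reads and writes that reuses one buffer. The following is a hypothetical async sketch with the same argument order as copyBytes(), built on Node's fs.read and fs.write; `copyBytesSketch` is a made-up name, not qfifo's implementation.

```javascript
var fs = require('fs');

// Hypothetical chunked copy: read up to buff.length bytes at srcOffset,
// write them at dstOffset, and repeat until srcLimit or end of file.
function copyBytesSketch(srcFd, dstFd, srcOffset, srcLimit, dstOffset, buff, callback) {
    var copied = 0;
    (function copyChunk() {
        var want = Math.min(buff.length, srcLimit - srcOffset);
        if (want <= 0) return callback(null, copied);
        fs.read(srcFd, buff, 0, want, srcOffset, function(err, nread) {
            if (err || nread === 0) return callback(err || null, copied);  // error or end of file
            fs.write(dstFd, buff, 0, nread, dstOffset, function(err, nwritten) {
                if (err) return callback(err, copied);
                srcOffset += nwritten;
                dstOffset += nwritten;
                copied += nwritten;
                copyChunk();
            });
        });
    })();
}
```

Because each chunk is read before it is written, this loop should also work when srcFd and dstFd refer to the same file with dstOffset below srcOffset, which is the situation compact() describes; that is an inference from the sketch, not documented qfifo behavior.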

fifo.remove( callback )

Remove the fifo file and its header. The fifo remains readable and writable until closed. Returns an error if the fifo file or header cannot be removed. It is not an error for the header to not exist.

fifo.rename( name, callback )

Rename the fifo file. The fifo is not closed, it remains readable and writable. Returns an error if the rename fails or the fifo file does not exist. It is not an error for the header file to not exist.

fifo.rotateFiles( filename, callback(err, errors, names) )

Helper method to rename the filename to filename.1. If filename.1 already exists, rename it to filename.2 and so on for all older versions of filename. When done, calls callback with the first error encountered, an array with all rename errors, and the list of successfully renamed filenames.
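Rotation has to rename the oldest version first so that no rename overwrites a file that has not been moved yet. A hypothetical helper that only computes the rename order; it assumes the numbered versions are consecutive and stops at the first gap.

```javascript
// Hypothetical: return [from, to] rename pairs, oldest version first.
function rotationPlan(filename, existingNames) {
    var exists = {};
    existingNames.forEach(function(name) { exists[name] = true; });
    // count consecutive versions filename.1, filename.2, ...
    var n = 0;
    while (exists[filename + '.' + (n + 1)]) n += 1;
    var renames = [];
    for (var i = n; i >= 1; i--) {
        renames.push([filename + '.' + i, filename + '.' + (i + 1)]);
    }
    if (exists[filename]) renames.push([filename, filename + '.1']);
    return renames;
}
```

Applying the pairs in order with fs.rename leaves the original filename free for a new fifo.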

fifo.matchFiles( dirname, regex, callback(err, matches) )

Return the match results of all filenames in the directory that satisfy the regular expression. The match results are produced by filename.match(regex) so that match[0] is always the original filename. The matches are not sorted.
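A sketch of the same kind of directory scan using Node's fs.readdir and String.prototype.match; `matchFilesSketch` is a hypothetical name, not qfifo's code. Anchoring the regex with ^ and $ makes match[0] the full filename.

```javascript
var fs = require('fs');

// Hypothetical: call back with name.match(regex) for every matching entry.
function matchFilesSketch(dirname, regex, callback) {
    fs.readdir(dirname, function(err, names) {
        if (err) return callback(err);
        var matches = [];
        names.forEach(function(name) {
            var match = name.match(regex);
            if (match) matches.push(match);  // unsorted, in readdir order
        });
        callback(null, matches);
    });
}
```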

fifo.batchCalls( processBatch(batch, done(err)) [,options] )

Helper to process items in batches. Returns a function to be called with each item; processBatch will be called once with each batch of items.

var processItem = fifo.batchCalls(function processBatch(batch, cb) {
    // first call => [1, 2]
    // second call => [3]
    cb();
}, { maxBatchSize: 2 });
processItem(1);
processItem(2);
processItem(3);

Options:

  • maxWaitMs - how long to wait for more items before processing the batch. Default 0, to process immediately.
  • maxBatchSize - the cap on how large a batch may grow before it will be processed. Default 10.
  • startBatch - function to call with maxBatchSize to obtain an empty batch. The empty batch is populated with push(item) or with growBatch(item). Default empty batch is an empty array [].
  • growBatch - function to call to add an item to the batch, called with batch and item. Default is batch.push.
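A hypothetical batching helper along the lines of the options above, written for illustration rather than taken from qfifo; `makeBatcher` is a made-up name. A full batch is processed immediately and a partial one after maxWaitMs. Batches are not waited for, so they may be processed concurrently.

```javascript
// Hypothetical batcher: returns processItem(item); calls processBatch(batch, done).
function makeBatcher(processBatch, options) {
    options = options || {};
    var maxWaitMs = options.maxWaitMs || 0;
    var maxBatchSize = options.maxBatchSize || 10;
    var startBatch = options.startBatch || function() { return []; };
    var growBatch = options.growBatch || function(batch, item) { batch.push(item); };
    var batch = null, count = 0, timer = null;

    function flush() {
        if (timer) { clearTimeout(timer); timer = null; }
        var full = batch;
        batch = null;
        count = 0;
        processBatch(full, function(err) { if (err) console.error(err); });
    }

    return function processItem(item) {
        if (!batch) batch = startBatch(maxBatchSize);
        growBatch(batch, item);
        count += 1;
        if (count >= maxBatchSize) flush();                       // batch is full
        else if (!timer) timer = setTimeout(flush, maxWaitMs);    // wait for more items
    };
}
```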

fifo.position

Byte offset of the next unseen line in the input file.

fifo.error

The read or write error that was encountered. Either kind of error stops the fifo. Errors also set fifo.eof, so loops that check only eof still terminate once no more data is forthcoming.

fifo.eof

Flag set when the fifo contains no more lines, i.e. when the end of the file has been reached and no lines are left in the buffer. Appending lines to the fifo and retrying the read clears the eof flag.

Todo

  • preopen option
  • make r-mode open run _getmore to prefetch and wait for results before returning
  • allow for streaming Buffers straight to file
  • maybe queue pending writes and have _writesome() write the queued parts
  • optional concurrent true/false batchCalls config setting (default true)
  • harvest idle file descriptors whose reopenTime has expired

Changelog

  • 0.7.0 - fixes, optional callback to close, experimental support for in-file headers, much faster rsync for r+ mode fifos, fix flush callbacks, optional callback to putline()
  • 0.6.0 - rename method, remove method, compact method, move batchCalls options to front
  • 0.5.0 - matchFiles method, experimental reopenInterval option
  • 0.4.2 - rotateFiles helper, fledgling batchCalls helper
  • 0.3.0 - readlines/pause/resume methods, updatePosition option for faster reading, set eof only when no more lines available
  • 0.2.2 - allow writing Buffers, space-pad header files
  • 0.2.1 - constructor options, pass-through options.flag, auto-open on fifo read/write, faster rsync
  • 0.1.0 - first version