HSLdevcom-dbclient

Database client for Pelias import pipelines

Primary language: JavaScript

This repository is part of the Pelias project. Pelias is an open-source, open-data geocoder originally sponsored by Mapzen. Our official user documentation is here.

Pelias Elasticsearch database client

This module provides a Node.js stream for bulk-inserting documents into Elasticsearch.
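"Bulk-inserting" here refers to the Elasticsearch bulk API, whose request body pairs each action line (for example an index action that names the target index, type and id) with the document source that follows it. The sketch below builds such a body in the array form accepted by the legacy elasticsearch JavaScript client's bulk() call. buildBulkBody and the { id, type, data } document shape are hypothetical illustrations. They are not this module's API and not the exact output of pelias-model's mapper stream.

```javascript
'use strict';

// Hypothetical helper (not part of pelias-dbclient): builds the `body` array
// for a bulk request, where each action/metadata object is immediately
// followed by the document source it applies to.
// Assumed input shape: [{ id, type, data }, ...].
function buildBulkBody(indexName, docs) {
  const body = [];
  for (const doc of docs) {
    // Action line: index (create or replace) the document under its id.
    // `_type` only applies to older Elasticsearch versions that still use
    // mapping types, matching the `type: 'venue'` used elsewhere in this README.
    body.push({ index: { _index: indexName, _type: doc.type, _id: doc.id } });
    // Source line: the document itself.
    body.push(doc.data);
  }
  return body;
}
```

Because documents are indexed by id, re-importing a document with an existing id replaces the stored copy instead of creating a duplicate.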


Install Dependencies

$ npm install

Usage

This module exports "streamFactory", a function that returns a transform stream. The stream writes documents into Elasticsearch as part of an import pipeline. Note: the stream emits its finish event only after all documents have been stored in Elasticsearch.

'use strict';

// some_importer.js

const streamify = require('stream-array');
const through = require('through2');
const Document = require('pelias-model').Document;
const dbMapper = require('pelias-model').createDocumentMapperStream;
const dbclient = require('pelias-dbclient');

const elasticsearch = require('elasticsearch');
const config = require('pelias-config').generate();
const elasticDeleteQuery = require('elastic-deletebyquery');

const timestamp = Date.now();

const stream = streamify([1, 2, 3])
  .pipe(through.obj((item, enc, next) => {
    const uniqueId = [ 'docType', item ].join(':'); // documents with the same id will be updated
    const doc = new Document( 'sourceType', 'venue', uniqueId );
    doc.timestamp = timestamp;
    next(null, doc);
  }))
  .pipe(dbMapper())
  .pipe(dbclient()); // put documents into elasticsearch

stream.on('finish', () => {
  // Assume that documents from the same source but with a different timestamp
  // (for example, stale copies from a previous import) have to be deleted.
  const client = new elasticsearch.Client(config.esclient);
  elasticDeleteQuery(client);

  const options = {
    index: config.schema.indexName,
    type: 'venue',
    body: {
      query: {
        "bool": {
          "must": [
            {"term": { "source":  "sourceType" }}
          ],
          "must_not": [
            {"term": { "timestamp":  timestamp }}
          ]
        }
      }
    }
  };

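  client.deleteByQuery(options, (err, response) => {
    // `response` is undefined when the request fails, so check `err` first.
    if (err) {
      console.error('Deleting stale documents failed:', err);
      return;
    }
    console.log('The elements deleted are: %s', response.elements);
  });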
});
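The note that the finish event fires only after every document is stored can be illustrated with a minimal sketch. It assumes a caller-supplied bulkWrite(batch) function that returns a Promise, and it is not pelias-dbclient's implementation: it uses a plain Node.js Writable (rather than a transform stream) that buffers documents into batches and flushes the last partial batch in final(). Node.js emits 'finish' only after final() calls its callback. createBatchingStream, bulkWrite and batchSize are hypothetical names.

```javascript
'use strict';

const { Writable } = require('stream');

// Hypothetical sketch: buffer incoming documents and hand each full batch to
// `bulkWrite(batch)`, an assumed async function returning a Promise.
function createBatchingStream(bulkWrite, batchSize = 500) {
  let batch = [];

  // Send the current batch (if any) and start a new one.
  const flush = () => {
    const toSend = batch;
    batch = [];
    return toSend.length ? bulkWrite(toSend) : Promise.resolve();
  };

  return new Writable({
    objectMode: true,
    write(doc, _enc, callback) {
      batch.push(doc);
      if (batch.length >= batchSize) {
        // Wait for the bulk request before accepting more documents
        // (simple backpressure); a rejection is passed on as a stream error.
        flush().then(() => callback(), callback);
      } else {
        callback();
      }
    },
    final(callback) {
      // Flush the remaining documents; 'finish' is emitted only after
      // this callback runs, i.e. after the last batch has been written.
      flush().then(() => callback(), callback);
    }
  });
}
```

Waiting for each bulk request inside write() is the simplest form of backpressure. A client tuned for throughput might instead allow several requests in flight and wait for all of them in final().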

Update ES index by adding new fields to existing docs

const dbclient = require('pelias-dbclient');

const dbStream = dbclient({
  merge: true,
  mergeFields: ['name'],     // optional list of fields to merge (default: the whole document)
  mergeAssignFrom: ['name'], // keeps the phrase field valid when name changes
  mergeAssignTo: ['phrase']  // target field for each mergeAssignFrom entry;
                             // mergeAssignFrom.length must equal mergeAssignTo.length
});

// Index as usual. Whenever document ids match, the new data updates the existing document.
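The merge options can be read as a field-level merge followed by field copies. The following is a hypothetical pure-function sketch of those semantics on plain objects. mergeDocs is an illustrative name, the shallow copy is a simplification, and the code is not taken from pelias-dbclient.

```javascript
'use strict';

// Hypothetical sketch of the merge options above (not pelias-dbclient code).
// `existing` is the stored document, `incoming` the newly indexed one.
function mergeDocs(existing, incoming, opts = {}) {
  const { mergeFields, mergeAssignFrom = [], mergeAssignTo = [] } = opts;
  if (mergeAssignFrom.length !== mergeAssignTo.length) {
    throw new Error('mergeAssignFrom and mergeAssignTo must have the same length');
  }

  // Without mergeFields, every field of the incoming document is merged.
  const fields = mergeFields || Object.keys(incoming);
  const merged = Object.assign({}, existing); // shallow copy; `existing` is not mutated
  for (const field of fields) {
    if (field in incoming) {
      merged[field] = incoming[field];
    }
  }

  // Copy each source field to its paired target (e.g. name -> phrase) so
  // derived fields stay consistent with the merged values.
  mergeAssignFrom.forEach((from, i) => {
    if (from in merged) {
      merged[mergeAssignTo[i]] = merged[from];
    }
  });
  return merged;
}
```

With mergeFields: ['name'], a changed name overwrites the stored name and, through mergeAssignFrom/mergeAssignTo, also the stored phrase, while every other stored field is kept.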

Contributing

Please fork the repository and open a pull request against upstream master from a feature branch.

Pretty please, provide unit tests and script fixtures in the test directory.

Running Unit Tests

$ npm test

Continuous Integration

Travis tests every release against all currently supported Node.js versions.