Riak Protocol Buffer Client for Node.js.
Features:
- streaming
- fail-over
- automatic retry
- connection pooling
- load balancing
Include riak-pb
in your dependency list or install:
$ npm install riak-pb
var riak = require('riak-pb');
var client = riak();
Or, with options:
var options = {
nodes: [
{
host: 'myriakhostname.acme.com',
port: 8087 }],
maxPool: 5, // Maximum number of connections in the connection pool - default is 5
maxRetries: 10, // maximum times the client tries to reconnect before failing - default is 10
maxDelay: 2000, // maximum time (ms) between reconnections - reconnections have an exponential backoff, but limited to this value - default is 2000
};
var client = riak(options);
The API is based on the Riak Protocol spec, check it out to find out about what arguments you need.
Examples:
client.put({
bucket: 'test',
key: 'test',
content: {
value: '{"test":"data"}',
content_type: 'application/json',
indexes: [{ key: 'test_bin', value: 'test' }]
}
},
function (err, reply) {
//...
console.warn("ERr: "+JSON.stringify(err)+" Reply: "+JSON.stringify(reply));
});
With multiple indexes:
var indexes = [{ key: 'key1_bin', value: 'value1' }, { key: 'key2_bin', value: 'value2' }];
var options = { bucket: 'test', key: 'test-put-index', content: { value: '{"test":"data"}', content_type: 'application/json', indexes: indexes }, return_body: true };
client.put(options, function(err, reply) {
//...
});
With vector clock:
var options = { bucket: 'test', key: 'test-vclock', content: { value: '{"test":"data"}', content_type: 'application/json' }, return_body: true };
client.put(options, function (err, reply) {
if (err) throw err;
var options = { bucket: 'test', key: 'test-vclock', content: { value: '{"test":"data"}', content_type: 'application/json' }, return_body: true };
options.vclock = reply.vclock;
client.put(options, function(reply) {
// ...
});
});
Example:
client.get({ bucket: 'test', key: 'test' }, function (err, reply) {
t.equal(++cbCount, 1);
t.notOk(err, err && err.message);
t.ok(Array.isArray(reply.content));
t.equal(reply.content.length, 1);
t.equal(reply.content[0].value, '{"test":"data"}');
t.end();
});
Example:
client.getIndex({
bucket: 'test',
index: 'test_bin',
qtype: 0,
key: 'test' },
function (err, reply) {
//...
});
Range query example:
client.getIndex({
bucket: 'test',
index: 'test_bin',
qtype: 1,
range_min: 'abc',
range_max: 'abcdef' },
function (err, reply) {
//...
});
Example:
client.setBucket('test', { allow_mult: true, n_val: 3 },
function (err, reply) {
/// ...
});
With callback:
client.getKeys('test', function (err, keys) {
/// ...
});
Streaming:
var s = client.getKeys('test');
s.on('readable', function() {
var key;
while(key = s.read()) {
console.log('got key:', key);
}
});
With callback:
client.search({ index: 'key1_bin', q: 'test' }, function (err, reply) {
/// ...
});
With callback:
var request = {
inputs: 'test',
query: [
{
map: {
source: 'function (v) { return [[v.bucket, v.key]]; }',
language: 'javascript',
keep: true
}
}]};
var params = { request: JSON.stringify(request), content_type: 'application/json' };
client.mapred(params, function (err, responses) {
/// ...
});
Streaming:
var request = {
inputs: 'test',
query: [
{
map: {
source: 'function (v) { return [[v.bucket, v.key]]; }',
language: 'javascript',
keep: true
}
}]};
var params = { request: JSON.stringify(request), content_type: 'application/json' };
var s = client.mapred(params);
s.on('readable', function() {
var res;
while(res = s.read()) {
console.log('got res:', res);
}
});
client.del('test', key, function(err) {
// ...
});
or, with options:
client.del('test', {key: key, vclock: vclock}, function(err) {
// ...
});
Queues a disconnect after all the pending requests are complete
client.disconnect();
- getBuckets(callback) // callback(err, buckets)
- getBucket(bucket[, callback]) // callback(err, bucketInfo)
- setBucket(bucket, props[, callback]) // callback(err)
- setClientId (client_id[, callback]) // callback(err)
- getClientId (callback) // callback(err, clientId)
- ping (callback) // callback(err)
- getServerInfo(callback) // callback(err, reply)
The client object emits these events:
- 'error' - (err)
- 'warning' - (warning) - Emitted when there is an internal error, like a disconnection. In this case, the client will transparently attempt to reconnect (up to a limit of attempts) and a "warning" will be emitted with the underlying error object.