priam
A simple Cassandra driver for NodeJS. It wraps the
helenus and
cassandra-driver modules with additional error/retry handling, external
.cql file support, and connection option resolution from an external source, among other improvements.
By default, the driver uses cassandra-driver over a binary-protocol connection.
If a Thrift connection is desired, simply specify the helenus driver option
in config. Priam uses internal aliases to map
helenus and cassandra-driver
options to facilitate easily switching between the two drivers. Specifying the appropriate cqlVersion for your database
will ensure that the appropriate driver is selected.
Priam is designed to be used as a single instance in order to preserve the
connection pools. As an example, in an Express application,
priam should be initialized at server startup and attached to the request
object so that your controllers can access it.
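The single-instance pattern described above can be sketched as follows. This is a minimal illustration that assumes an Express-style middleware signature `(req, res, next)`; `attachDb` is a hypothetical helper and is not part of priam, and `sharedDb` is a stand-in for an initialized priam instance.

```javascript
// Hypothetical helper: returns Express-style middleware that exposes one
// shared driver instance on every request as `req.db`.
function attachDb(db) {
  return function (req, res, next) {
    req.db = db; // same instance for every request, so connection pools are reused
    next();
  };
}

// Stand-in for the object returned by require('priam')({ config: ... }).
var sharedDb = { name: 'shared priam instance' };
var middleware = attachDb(sharedDb);

// Simulate two requests passing through the middleware.
var reqA = {};
var reqB = {};
middleware(reqA, {}, function () {});
middleware(reqB, {}, function () {});
console.log(reqA.db === reqB.db); // true
```

In a real Express application, the middleware would typically be registered once at startup with `app.use(attachDb(db))`.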
Priam is actively developed and used by Go Daddy Website Builder to provide a high-availability and high-performance hosting platform based on the Cassandra database.
Example Usage
Check the example folder for a more complete example. Start by running: npm start followed by curl http://localhost:8080/.
Using Known Connection Information
var path = require('path');
var db = require('priam')({
    config: {
        cqlVersion: '3.0.0', /* optional, defaults to '3.1.0' */
        timeout: 4000, /* optional, defaults to 4000 */
        poolSize: 2, /* optional, defaults to 1 */
        consistencyLevel: 'one', /* optional, defaults to one. Will throw if not a valid Cassandra consistency level */
        driver: 'helenus', /* optional, defaults to 'datastax' */
        protocol: 'thrift', /* optional, defaults to 'binary' */
        numRetries: 3, /* optional, defaults to 0. Retries occur on connection failures. */
        retryDelay: 100, /* optional, defaults to 100ms. Used on error retry or consistency fallback retry */
        enableConsistencyFailover: true, /* optional, defaults to true */
        queryDirectory: path.join(__dirname, 'path/to/your/cql/files'), /* optional, required to use #namedQuery() */
        user: '<your_username>',
        password: '<your_password>',
        keyspace: '<your_keyspace>', /* Default keyspace. Can be overwritten via options passed into #cql(), etc. */
        hosts: [ /* Ports are optional */
            '123.456.789.010:9042',
            '123.456.789.011:9042',
            '123.456.789.012:9042',
            '123.456.789.013:9042'
        ]
    }
});
Executing CQL
The driver provides the #cql() method for executing CQL statements against Cassandra.
It accepts the following arguments:
- cql: The CQL statement to execute. Parameters should be replaced with ? characters.
- dataParams: The parameters array. Should match the order of ? characters in the cql parameter.
- options: Optional. Additional options for the CQL call. Supports consistency, queryName, keyspace, executeAsPrepared, resultHint, deserializeJsonStrings, and suppressDebugLog.
- callback(err, data): Optional. The callback for the CQL call. Provides err and data arguments. data will be an Array.
dataParams will be normalized as necessary in order to be passed to Cassandra. In addition to primitives
(Number/String), the driver supports JSON objects, Array and Buffer types. Object and Array types will be
stringified prior to being sent to Cassandra, whereas Buffer types will be encoded.
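The normalization rules above can be pictured with the following sketch. It is illustrative only and is not priam's actual implementation: `normalizeParam` is a hypothetical name, and hex encoding for Buffer values is an assumption made for the sake of the example.

```javascript
// Illustrative only: mirrors the normalization described above.
// Hex encoding for Buffers is an assumption, not confirmed driver behavior.
function normalizeParam(value) {
  if (Buffer.isBuffer(value)) {
    return value.toString('hex');
  }
  if (value !== null && typeof value === 'object') {
    return JSON.stringify(value); // covers plain objects and arrays
  }
  return value; // numbers and strings pass through unchanged
}

var normalized = [42, 'abc', { a: 1 }, [1, 2], Buffer.from('hi')].map(function (value) {
  return normalizeParam(value);
});
console.log(normalized);
```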
The executeAsPrepared option informs the cassandra-driver driver
to execute the given CQL as a prepared statement, which will boost performance if the query is executed multiple times.
This option is currently ignored if using the helenus thrift driver.
The queryName option allows metrics to be captured for the given query, assuming a metrics object was passed into
the constructor. See the Monitoring / Instrumentation section for more information.
The consistency option allows you to override any default consistency level that was specified in the constructor.
The resultHint option allows you to control how objects being returned by the underlying provider are treated. For
example, a data type of objectAscii will result in JSON.parse() being called on the resulting value. Special data types
of objectAscii and objectText are available for this purpose. If these data types are used in a parameter's hint
field, they will be automatically mapped to the corresponding data type (e.g. ascii or text) prior to executing the
cql statement.
The deserializeJsonStrings option informs Priam to inspect any string results coming back from the driver and call
JSON.parse() before returning the value back to you. This works similarly to providing resultHint options for specific
columns, but instead it applies to the entire set of columns. This was the default behavior prior to the 0.7.0 release.
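As a rough picture of what result-wide JSON detection involves, the sketch below walks every string column of every row and parses the ones that look like JSON. It is not priam's implementation: the function is a hypothetical stand-alone helper, and restricting parsing to strings that start with `{` or `[` is an assumption made here so that plain values such as '42' are left alone.

```javascript
// Illustrative only: parse string columns that look like JSON objects or arrays
// and keep the original string when parsing fails.
function deserializeJsonStrings(rows) {
  return rows.map(function (row) {
    var result = {};
    Object.keys(row).forEach(function (column) {
      var value = row[column];
      if (typeof value === 'string' && /^\s*[\[{]/.test(value)) {
        try {
          value = JSON.parse(value);
        } catch (e) {
          // not valid JSON; leave the string untouched
        }
      }
      result[column] = value;
    });
    return result;
  });
}

var parsedRows = deserializeJsonStrings([
  { id: 'abc', doc: '{"a":1}', tags: '["x"]', broken: '{oops' }
]);
console.log(parsedRows);
```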
The keyspace option allows you to specify another keyspace to execute a query against. This will override the default
keyspace set in the connection information.
The suppressDebugLog option allows you to disable debug logging of CQL for an individual query. This is useful for
queries that may contain sensitive data that you do not wish to show up in debug logs.
When using the cassandra-driver driver,
hinted parameters are supported.
Instead of the driver inferring the data type, it can be explicitly specified by using a
specially formatted object.
Similar to consistencies, data types are exposed via the <instance>.dataType object.
Other than type uuid, parameter hints will be ignored when using the helenus driver.
There is also a param(value [object], type [string]) helper method for creating hinted parameters, as shown below:
Example
var db = require('priam')({
    config: { /* ... options ... */ }
});
db.cql(
    'SELECT "myCol1", "myCol2" FROM "myColumnFamily" WHERE "keyCol1" = ? AND "keyCol2" = ?',
    [db.param('value_of_keyCol1', 'ascii'), db.param('value_of_keyCol2', 'ascii')],
    { consistency: db.consistencyLevel.one, queryName: 'myQuery', executeAsPrepared: true },
    function (err, data) {
        if (err) {
            console.log('ERROR: ' + err);
            return;
        }
        console.log('Returned data: ' + data);
    }
);
Named Queries
The driver supports using named queries. These queries should be .cql files residing in a single folder. The query
name corresponds to the file name preceding the .cql extension. For example, file myObjectSelect.cql would have a
query name of myObjectSelect.
In order to use named queries, the optional queryDirectory option should be passed into the driver constructor.
Queries are loaded synchronously and cached when the driver is constructed.
Named queries will automatically provide the queryName and executeAsPrepared options when executing the CQL,
though the caller can override these options by providing them in the options object.
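The loading behavior described above (synchronous read, one cache entry per file name) can be sketched as follows. This is an illustration rather than priam's own loader; `loadNamedQueries` is a hypothetical name, and the demo writes to a throwaway temporary directory.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');

// Illustrative only: read every .cql file in a directory synchronously and
// cache its contents under the file name without the extension.
function loadNamedQueries(queryDirectory) {
  var cache = {};
  fs.readdirSync(queryDirectory).forEach(function (file) {
    if (path.extname(file) === '.cql') {
      var name = path.basename(file, '.cql');
      cache[name] = fs.readFileSync(path.join(queryDirectory, file), 'utf8');
    }
  });
  return cache;
}

// Demo against a throwaway directory.
var demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'cql-'));
fs.writeFileSync(path.join(demoDir, 'myObjectSelect.cql'), 'SELECT * FROM "myObject" WHERE "id" = ?');
fs.writeFileSync(path.join(demoDir, 'notes.txt'), 'ignored');
var namedQueries = loadNamedQueries(demoDir);
console.log(Object.keys(namedQueries)); // [ 'myObjectSelect' ]
```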
Example
var path = require('path');
var db = require('priam')({
    config: { queryDirectory: path.join(__dirname, 'cql') }
}); /* 'cql' folder will be scanned and all .cql files loaded into memory synchronously */
db.namedQuery(
    'myColumnFamilySelect', /* name of .cql file with contents: 'SELECT "myCol1", "myCol2" FROM "myColumnFamily" WHERE "keyCol1" = ? AND "keyCol2" = ?' */
    ['value_of_keyCol1', 'value_of_keyCol2'],
    { consistency: db.consistencyLevel.one },
    function (err, data) {
        if (err) {
            console.log('ERROR: ' + err);
            return;
        }
        console.log('Returned data: ' + data);
    }
);
Fluent Syntax
The driver provides a fluent syntax that can be used to construct queries.
Calling #beginQuery() returns a Query object with the following chainable functions:
- #query(cql [string]): Sets the cql for the query to execute.
- #namedQuery(queryName [string]): Specifies the named query for the query to execute.
- #param(value [object], hint [optional, string], isRoutingKey [optional, bool]): Adds a parameter to the query. Note: Parameters are applied in the order added. If isRoutingKey is provided and true, the given parameter will be used to determine the coordinator node to execute a query against, when using the datastax driver and a prepared statement.
- #params(parameters [Array]): Adds the array of parameters to the query. Parameters should be created using db.param().
- #options(optionsDictionary [object]): Extends the query options. See Executing CQL for valid options.
- #consistency(consistencyLevelName [string]): Sets consistency level for the query. Alias for #options({ consistency: db.consistencyLevel[consistencyLevelName] }).
- #all(): Default functionality. After calling execute, returns an array of any results found.
- #first(): After calling execute, returns the first of the results found, if any.
- #single(): Similar to first, returns the first result, but yields an error if more than one record is found.
- #execute(callback [optional, function]): Executes the query. If a callback is not supplied, this will return a Promise.
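The difference between #all(), #first() and #single() can be summarized with the sketch below. It is illustrative only: `shapeResult` is a hypothetical function, the error message is invented, and returning null for an empty result set follows the 0.8.8 release note rather than an inspection of the driver code.

```javascript
// Illustrative only: how the three result modes described above differ.
function shapeResult(mode, rows) {
  if (mode === 'first') {
    return { err: null, data: rows.length ? rows[0] : null };
  }
  if (mode === 'single') {
    if (rows.length > 1) {
      // Hypothetical error text; the driver's actual message may differ.
      return { err: new Error('More than one result returned'), data: null };
    }
    return { err: null, data: rows.length ? rows[0] : null };
  }
  return { err: null, data: rows }; // 'all' is the default
}

var sampleRows = [{ id: 1 }, { id: 2 }];
console.log(shapeResult('all', sampleRows).data.length); // 2
console.log(shapeResult('first', sampleRows).data.id); // 1
console.log(shapeResult('single', sampleRows).err !== null); // true
```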
Fluent Syntax Examples
db
    .beginQuery()
    .query('SELECT "myCol1", "myCol2" FROM "myColumnFamily" WHERE "keyCol1" = ? AND "keyCol2" = ?')
    .param('value_of_keyCol1', 'ascii')
    .param('value_of_keyCol2', 'ascii')
    .consistency('one')
    .options({ queryName: 'myColumnFamilySelect' })
    .options({ executeAsPrepared: true })
    .execute(function (err, data) {
        if (err) {
            console.log('ERROR: ' + err);
            return;
        }
        console.log('Returned data: ' + data);
    });
Similarly, fluent syntax can be used for named queries.
db
    .beginQuery()
    .namedQuery('myColumnFamilySelect') /* name of .cql file with contents: 'SELECT "myCol1", "myCol2" FROM "myColumnFamily" WHERE "keyCol1" = ? AND "keyCol2" = ?' */
    .param('value_of_keyCol1', 'ascii')
    .param('value_of_keyCol2', 'ascii')
    .consistency('one')
    .execute(function (err, data) {
        if (err) {
            console.log('ERROR: ' + err);
            return;
        }
        console.log('Returned data: ' + data);
    });
The fluent syntax also supports promises if a callback is not supplied to the #execute() function.
db
    .beginQuery()
    .namedQuery('myColumnFamilySelect') /* name of .cql file with contents: 'SELECT "myCol1", "myCol2" FROM "myColumnFamily" WHERE "keyCol1" = ? AND "keyCol2" = ?' */
    .param('value_of_keyCol1', 'ascii')
    .param('value_of_keyCol2', 'ascii')
    .consistency('one')
    .execute()
    .fail(function (err) {
        console.log('ERROR: ' + err);
    })
    .done(function (data) {
        console.log('Returned data: ' + data);
    });
Batching Queries
Queries can be batched by using the fluent syntax to create a batch of queries. Standard CQL and named queries can be
combined. If a consistency level for the batch is not supplied, the strictest consistency from the batched queries will
be applied, if given. Similarly, if debug logs for one of the batched queries are suppressed, debug logs for the entire
batch will be suppressed. queryName and executeAsPrepared for individual queries will be ignored.
Note: Batching prepared statements is currently not supported. If prepared statements are used in a batch, the entire CQL query will be sent over the wire.
IMPORTANT: Batching is only supported with INSERT, UPDATE and DELETE commands. If SELECT statements are
added, the query will yield a runtime error.
For more information on batching, see the CQL 3.0 reference.
If using CQL 3.1, batches can be nested and timestamps will be applied at the query level instead of the batch level. If using CQL 3.0, only timestamps at the outermost batch level will be applied. Any others will be ignored.
Calling #beginBatch() returns a Batch object with the following chainable functions:
- #addQuery(query [Query]): Adds a query to the batch to execute. The query should be created by db.beginQuery().
- #addBatch(batch [Batch]): Adds the queries contained within the batch parameter to the current batch. The batch should be created by db.beginBatch().
- #add(batchOrQuery [Batch or Query]): Allows null, Query, or Batch objects. See #addQuery() and #addBatch() above.
- #options(optionsDictionary [object]): Extends the batch options. See Executing CQL for valid options.
- #timestamp(clientTimestamp [optional, long]): Specifies that USING TIMESTAMP <value> will be sent as part of the batch CQL. If clientTimestamp is not specified, the current time will be used.
- #type(batchTypeName [string]): Specifies the type of batch that will be used. Available types are 'standard', 'counter' and 'unlogged'. Defaults to 'standard'. See CQL 3.1 reference for more details on batch types.
- #consistency(consistencyLevelName [string]): Sets consistency level for the batch. Alias for #options({ consistency: db.consistencyLevel[consistencyLevelName] }).
- #execute(callback [optional, function]): Executes the batch. If a callback is not supplied, this will return a Promise.
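For orientation, the sketch below shows the general shape of the CQL batch statement that the batch type and timestamp settings correspond to. It follows standard CQL batch syntax and is not a copy of how priam assembles batches (the release notes indicate timestamps are sent as parameters since 0.8.4); `buildBatchCql` is a hypothetical helper.

```javascript
// Illustrative only: the general shape of a CQL batch statement.
function buildBatchCql(statements, type, timestamp) {
  var prefixes = {
    standard: 'BEGIN BATCH',
    counter: 'BEGIN COUNTER BATCH',
    unlogged: 'BEGIN UNLOGGED BATCH'
  };
  var batchType = type || 'standard';
  if (!Object.prototype.hasOwnProperty.call(prefixes, batchType)) {
    throw new Error('Unknown batch type: ' + batchType);
  }
  var header = prefixes[batchType];
  if (timestamp !== undefined) {
    header += ' USING TIMESTAMP ' + timestamp;
  }
  var lines = [header];
  statements.forEach(function (cql) {
    // Ensure each statement ends with exactly one semicolon.
    lines.push(cql.replace(/;\s*$/, '') + ';');
  });
  lines.push('APPLY BATCH;');
  return lines.join('\n');
}

var batchCql = buildBatchCql(
  ['UPDATE t SET a = ? WHERE k = ?', 'DELETE FROM t WHERE k = ?;'],
  'unlogged',
  1234
);
console.log(batchCql);
```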
Batch Syntax Example
db
    .beginBatch()
    .add(db.beginQuery()
        .query('UPDATE "myColumnFamily" SET "myCol1" = ?, "myCol2" = ? WHERE "keyCol1" = ? AND "keyCol2" = ?')
        .param('value_of_myCol1', 'ascii')
        .param('value_of_myCol2', 'ascii')
        .param('value_of_keyCol1', 'ascii')
        .param('value_of_keyCol2', 'ascii')
    )
    .add(db.beginQuery()
        .query('UPDATE "myOtherColumnFamily" SET "myCol" = ? WHERE "keyCol" = ?')
        .param('value_of_myCol', 'ascii')
        .param('value_of_keyCol', 'ascii')
    )
    .consistency('quorum')
    .type('counter')
    .timestamp()
    .execute()
    .fail(function (err) {
        console.log('ERROR: ' + err);
    })
    .done(function (data) {
        console.log('Returned data: ' + data);
    });
Helper Functions
The driver also provides the following functions that wrap #cql(). When not using named queries, they should be used
in place of #cql() where possible, as they allow you to both use default consistency levels for different types
of queries, and easily find references in your application to each query type.
- select: calls #cql() with db.consistencyLevel.one
- insert: calls #cql() with db.consistencyLevel.localQuorum
- update: calls #cql() with db.consistencyLevel.localQuorum
- delete: calls #cql() with db.consistencyLevel.localQuorum
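The wrapping described above can be sketched as follows. This is an illustration and not the driver's source: `createHelpers` is a hypothetical factory, consistency levels are shown as plain strings, and letting caller-supplied options override the default is an assumption.

```javascript
// Illustrative only: helper wrappers that supply a default consistency level
// and then delegate to a cql() function.
function createHelpers(cql) {
  function withDefault(consistency) {
    return function (statement, params, options, callback) {
      var merged = { consistency: consistency };
      Object.keys(options || {}).forEach(function (key) {
        merged[key] = options[key]; // assumption: caller-supplied options win
      });
      return cql(statement, params, merged, callback);
    };
  }
  return {
    select: withDefault('one'),
    insert: withDefault('localQuorum'),
    update: withDefault('localQuorum'),
    delete: withDefault('localQuorum')
  };
}

// Demo with a stub cql() that only records the options it receives.
var recordedOptions = [];
var helpers = createHelpers(function (statement, params, options) {
  recordedOptions.push(options);
});
helpers.select('SELECT * FROM t WHERE k = ?', ['a'], {});
helpers.insert('INSERT INTO t (k) VALUES (?)', ['a'], { queryName: 'insertT' });
console.log(recordedOptions);
```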
Connection Management
Connection pools are automatically instantiated when the first query is run and kept alive for the lifetime of the driver. To manually initiate and/or close connections, you can use the following functions:
- #connect(keyspace [string, optional], callback [Function]): Calls the callback parameter after the connection pool is initialized, or an existing pool is retrieved. Can be used at application startup to immediately start the connection pool.
- #close(callback [Function]): Calls callback after all connection pools are closed. Useful for testing purposes.
Error Retries
The driver will automatically retry on network-related errors. In addition, other errors will be retried under the following conditions:
- db.consistencyLevel.all will be retried at db.consistencyLevel.eachQuorum
- db.consistencyLevel.quorum and db.consistencyLevel.eachQuorum will be retried at db.consistencyLevel.localQuorum
The following retry options are supported in the driver constructor:
- enableConsistencyFailover: Optional. Defaults to true. If false, the failover described above will not take place.
- numRetries: Optional. Defaults to 0 (no retry). The number of retries to execute on network failure. Note: this will also affect the number of retries executed during consistency level fallback. For example, if numRetries is 2 and a CQL query with db.consistencyLevel.all is submitted, it will be executed 3 times at db.consistencyLevel.all, 3 times at db.consistencyLevel.eachQuorum and 3 times at db.consistencyLevel.localQuorum before yielding an error back to the caller.
- retryDelay: Optional. Defaults to 100 (milliseconds). Used on error retry or consistency fallback retry.
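The interaction between numRetries and consistency failover can be sketched as below. This is a simplified model, not priam's retry code: `executeWithFailover` is a hypothetical function, consistency levels are plain strings, and every error is treated as retryable, whereas the driver distinguishes network errors from other failures.

```javascript
// Fallback chain taken from the list above.
var FALLBACK = { all: 'eachQuorum', quorum: 'localQuorum', eachQuorum: 'localQuorum' };

// Illustrative only: try each consistency level numRetries + 1 times,
// then fall back to the next level until none is left.
function executeWithFailover(execute, consistency, options, callback) {
  var attemptsLeft = options.numRetries + 1;

  function schedule(fn) {
    if (options.retryDelay > 0) {
      setTimeout(fn, options.retryDelay);
    } else {
      fn();
    }
  }

  function attempt() {
    execute(consistency, function (err, data) {
      if (!err) {
        return callback(null, data);
      }
      attemptsLeft -= 1;
      if (attemptsLeft > 0) {
        return schedule(attempt);
      }
      var canFallBack = options.enableConsistencyFailover &&
        Object.prototype.hasOwnProperty.call(FALLBACK, consistency);
      if (!canFallBack) {
        return callback(err);
      }
      consistency = FALLBACK[consistency];
      attemptsLeft = options.numRetries + 1;
      schedule(attempt);
    });
  }

  attempt();
}

// Demo: an executor that always fails, so every level is exhausted.
var attempts = [];
executeWithFailover(
  function (level, done) { attempts.push(level); done(new Error('unavailable')); },
  'all',
  { numRetries: 2, retryDelay: 0, enableConsistencyFailover: true },
  function (err) { console.log(attempts.length + ' attempts, error: ' + err.message); }
); // prints "9 attempts, error: unavailable"
```

With numRetries set to 2, the demo reproduces the 3 + 3 + 3 execution count given in the numRetries description.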
Logging
The driver supports passing a winston logger inside of the options.
require('priam')({
    config: { /* connection information */ },
    logger: new (require('winston')).Logger({ /* logger options */ })
});
Debug logging of CQL can be turned off for an individual query by passing the suppressDebugLog = true option in the
query options dictionary. This is useful for queries that may contain sensitive data that you do not wish to show up
in debug logs.
Monitoring / Instrumentation
Instrumentation is supported via an optional metrics object passed into the driver constructor. The metrics object
should have a method #measurement(queryName [string], duration [number], unit [string]).
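A metrics object only needs to expose the method above. The following is a minimal in-memory sketch; `InMemoryMetrics` is a hypothetical class, and the 'ms' unit used in the demo call is an assumption rather than a documented value.

```javascript
// Illustrative only: a minimal in-memory metrics object exposing the
// measurement() method described above.
function InMemoryMetrics() {
  this.measurements = [];
}

InMemoryMetrics.prototype.measurement = function (queryName, duration, unit) {
  this.measurements.push({ queryName: queryName, duration: duration, unit: unit });
};

// Demo: record one measurement by hand, as the driver would for a named query.
var sampleMetrics = new InMemoryMetrics();
sampleMetrics.measurement('myColumnFamilySelect', 12, 'ms');
console.log(sampleMetrics.measurements);
```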
var logger = new (require('winston')).Logger({ /* logger options */ });
var metrics = new MetricsClient(); /* any object exposing #measurement() */
require('priam')({
    config: { /* connection information */ },
    logger: logger,
    metrics: metrics
});
Events
Each priam instance is an EventEmitter. The following events are emitted:
| Event | Params | Description |
|---|---|---|
| connectionRequested | connectionRequestId | A pooled connection has been requested. Expect a `connectionAvailable` event when this request has been fulfilled. |
| connectionResolving | connectionResolutionId | If a connection resolver is being used, emitted when the connection resolver is about to be invoked |
| connectionResolved | connectionResolutionId | If a connection resolver is being used, emitted when the connection resolver succeeds. |
| connectionResolvedError | connectionResolutionId, err | If a connection resolver is being used, emitted when the connection resolver fails. |
| connectionOpening | connectionOpenRequestId | Emitted when opening a new connection. |
| connectionOpened | connectionOpenRequestId | Emitted when the opening of a new connection has succeeded. |
| connectionFailed | connectionOpenRequestId, err | Emitted when the opening of a new connection has failed. |
| connectionAvailable | connectionRequestId | Emitted if a new or existing connection is ready to be used to execute a query. |
| connectionLogged | logLevel, message, details | Emitted when the underlying driver outputs log events. |
| connectionClosed | | Emitted when a connection closes. |
| queryStarted | requestId | Emitted when a query is about to be executed. |
| queryRetried | requestId | Emitted when a query is about to be retried. |
| queryCompleted | requestId | Emitted when a query has succeeded. |
| queryFailed | requestId, err | Emitted when a query has failed after exhausting any retries. |
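As one way to consume these events, the sketch below pairs queryStarted with queryCompleted or queryFailed to measure query durations. It is illustrative only: `trackQueryDurations` is a hypothetical helper, a plain EventEmitter stands in for a priam instance, and the clock is injected so the demo is deterministic.

```javascript
var EventEmitter = require('events').EventEmitter;

// Illustrative only: time queries by pairing queryStarted with
// queryCompleted/queryFailed for the same requestId.
function trackQueryDurations(db, now) {
  var started = {};
  var durations = [];
  db.on('queryStarted', function (requestId) {
    started[requestId] = now();
  });
  function finish(outcome) {
    return function (requestId) {
      if (started[requestId] === undefined) {
        return; // no matching queryStarted event was seen
      }
      durations.push({ requestId: requestId, outcome: outcome, ms: now() - started[requestId] });
      delete started[requestId];
    };
  }
  db.on('queryCompleted', finish('completed'));
  db.on('queryFailed', finish('failed'));
  return durations;
}

// Demo with a stand-in emitter and a fake clock.
var fakeDb = new EventEmitter();
var fakeClock = 0;
var queryDurations = trackQueryDurations(fakeDb, function () { return fakeClock; });
fakeDb.emit('queryStarted', 'req-1');
fakeClock = 25;
fakeDb.emit('queryCompleted', 'req-1');
console.log(queryDurations);
```

With a real instance, `Date.now` could be passed as the clock.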
Using a Connection Resolver
If you are required to pull database credentials from a credential store (e.g. to support database failover or expiring
old credentials), you should use the connectionResolver or connectionResolverPath options.
If used, the supplied connection resolver will be called before every CQL query is issued. It is up to the supplied connection resolver to follow whatever caching strategies are required for the environment.
The connectionResolver option allows you to pass in a connectionResolver object that has already been constructed.
var resolver = new MyConnectionResolver();
var db = require('priam')({
    config: {
        /* ... connection options ... */
    },
    connectionResolver: resolver
});
The connectionResolverPath will be loaded via a #require() call. Note: it is required from /lib/drivers/, so
it is recommended to supply a resolver this way via a node module, as paths to source files should be relative to the
internal path.
var db = require('priam')({
    config: {
        /* ... other connection options ... */
        connectionResolverPath: 'myResolverModule'
    }
});
Sample Connection Resolver Implementation
The supplied connection resolver should have the following method: #resolveConnection(config, callback).
config will be the priam configuration that was supplied to the constructor, so
connection resolvers can access any custom configuration information specified when the driver was initialized.
callback(error, connectionInformation) should be called with the results from the connection resolver. If error is
supplied, an error log message will be sent to the supplied logger. connectionInformation should always be supplied
if known. If it is not supplied when an error is thrown, the error will be supplied to the #cql() callback.
connectionInformation should contain the following properties: user, password, hosts.
See example application for a concrete example using a connection resolver.
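A resolver that satisfies the contract above might look like the following sketch. It is not taken from the example application: `CachingConnectionResolver` and `fetchCredentials` are hypothetical names, and the time-based cache is one possible caching strategy among many.

```javascript
// Illustrative only: a connection resolver with a simple time-based cache.
// `fetchCredentials(config, cb)` is a hypothetical function that talks to
// your credential store and yields { user, password, hosts }.
function CachingConnectionResolver(fetchCredentials, ttlMs) {
  this.fetchCredentials = fetchCredentials;
  this.ttlMs = ttlMs;
  this.cached = null;
  this.cachedAt = 0;
}

CachingConnectionResolver.prototype.resolveConnection = function (config, callback) {
  var self = this;
  if (self.cached && Date.now() - self.cachedAt < self.ttlMs) {
    return callback(null, self.cached);
  }
  self.fetchCredentials(config, function (err, info) {
    if (err) {
      // Supply the last known connection information alongside the error, if any.
      return callback(err, self.cached || undefined);
    }
    self.cached = { user: info.user, password: info.password, hosts: info.hosts };
    self.cachedAt = Date.now();
    callback(null, self.cached);
  });
};
```

Because the resolver is called before every CQL query, the cache keeps the credential store from being hit on each call; the time-to-live would be chosen to match how quickly credentials can expire in your environment.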
Port Mapping Options
If your connectionResolver connection information includes a port, and it is returning the port for a protocol you do not
wish to use (e.g. you want to use binary port 9042, but the resolver is returning Thrift port 9160), you can use the
connectionResolverPortMap option to perform the mapping.
var db = require('priam')({
    config: {
        /* ... other connection options, including Nimitz ... */
        connectionResolverPortMap: {
            from: '9160',
            to: '9042'
        }
    }
});
Release Notes
- 0.9.7: Attach cql to error objects generated by query execution.
- 0.9.6: Add support for routingIndexes when using TokenAwarePolicy.
- 0.9.5: Add better error handling for cassandra-driver.
- 0.9.4: Strip schema metadata from result sets over binary protocol v1.
- 0.9.3: Adjust stringify for numeric bigint values.
- 0.9.2: Fix parameterized queries over binary protocol v1.
- 0.9.1: Dependency updates.
- 0.9.0: Removed node-cassandra-cql in favor of cassandra-driver.
- 0.8.17: Batch.execute no longer yields an error when the batch is empty.
- 0.8.16: Simplified result set transformation for node-cassandra-cql drivers.
- 0.8.15: Add isBatch and isQuery methods to base driver.
- 0.8.14: Fix resultTransformer bug when query generates an error.
- 0.8.13: Fix Batch.add() when given empty Batch or Query objects.
- 0.8.12: Remove github dependency via priam-connection-cql module. Added versioning logic around cqlVersion to use the appropriate driver.
- 0.8.11: Coerce timestamp hinted parameters for node-cassandra-cql to Date objects from string or number.
- 0.8.10: Batch.add() can now take an Array argument.
- 0.8.9: Fix usage of Batch.addBatch() in pre-2.0 Cassandra environments that do not support DML-level timestamps.
- 0.8.8: Fixed bug where Query.single() and Query.first() would return empty array instead of null on empty result sets.
- 0.8.7: Fixed bug which caused boolean values to not be returned when their value is false.
- 0.8.6: Fixed bug which caused resultTransformers to not execute.
- 0.8.5: Changed config to look up consistency level enum if given a string. Added resultTransformers to drivers and queries (synchronous functions that are mapped over query results). Query consistency is set to driver's at instantiation, rather than being looked up at execution if not present. Added query method to base driver, alias for cql.
- 0.8.4: Modified Batch.execute() to send timestamps as parameters instead of CQL strings.
- 0.8.3: Added Query.single(), Query.first(), and Query.all() enhancements.
- 0.8.2: Added generalized Batch.add() that can take a Query or Batch argument.
- 0.8.1: Added Batch.addBatch() enhancements.
- 0.8.0: Added Batch.addBatch(), Query.params([Array]), and driver.connect([Function]). Updated internal file naming conventions.
- 0.7.6: Updated to support insert/update statements on map<,> types.
- 0.7.5: Updated consistency failover strategy. Added EventEmitter inheritance.
- 0.7.4: Add support for COUNTER and UNLOGGED batch types.
- 0.7.3: Dependency bump.
- 0.7.1: Revert back to Promises v1.
- 0.7.0: Update to latest version of Promises (q.js). Potential breaking change - JSON is no longer auto-deserialized. See the Executing CQL section for more information. Use object data types if auto-deserialization is required on specific fields, or use deserializeJsonStrings option to detect JSON as 0.6.x and prior did.
- 0.6.9: Dependency bump.
- 0.6.8: Bugfixes.
- 0.6.7: Added batching support.
- 0.6.6: Added fluent syntax. Updated example to include setup script.
- 0.6.4: Added #param() helper method for hinted parameters.
- 0.6.3: Dependency updates, test Travis CI hooks.
- 0.6.2: Initial Public Release



