AWS Kinesis event parser and handler for Lambda consumers. Ability to parse kinesis events with error handling and JSON support. Supports Node 8.10+ on AWS Lambda.
npm i --save kinesis-events
const kinesisEvents = require('kinesis-events');
// Lambda function handler
exports.handler = async event => {
// Parse the records
const result = kinesisEvents.parse(event);
// Check for errors (optional)
if(result.hasErrors) {
console.error('There are errors while parsing, ending process...');
process.exit(1);
}
result.records.forEach(record => {
//... iterate through the parsed records
});
};
kinesisEvents : KinesisEvents
⏏
Instance of the KinesisEvents class which is exported when calling require('kinesis-events')
.
For more advanced usage, you may create a new instance of KinesisEvents (see example below).
Kind: Exported KinesisEvents Instance
Example
const kinesisEvents = require('kinesis-events');
// Advanced usage
const { KinesisEvents } = require('kinesis-events');
const kinesisEvents = new KinesisEvents({
// options...
});
Custom error that is generated when there is a parsing error.
Kind: global class
Extends: Error
The original data that caused the error.
Kind: instance property of ParseError
Kind: global class
Constructor for KinesisEvents.
Param | Type | Default | Description |
---|---|---|---|
[options] | Object |
{} |
Options object to control certain features of KinesisEvents. |
[options.transform(record, index)] | function |
Optional transform function to call for each record. See Transform Function. |
Options object for KinesisEvents. Allows overridding options after instantiation.
Kind: instance property of KinesisEvents
Example
kinesisEvents.options.transform = function(record, index) {
// transform record...
return record;
};
kinesisEvents.ParseError : ParseError
Access to the ParseError class.
Kind: instance property of KinesisEvents
Read only: true
kinesisEvents.parse(records, [json]) ⇒ RecordSet
Parses records from the incoming Kinesis event.
Kind: instance method of KinesisEvents
Returns: RecordSet
- New instance of RecordSet with the parsed records.
Param | Type | Default | Description |
---|---|---|---|
records | Array |
Event data (records) to parse. | |
[json] | Boolean |
true |
Enable/disable JSON parsing for each event. |
Example
const result = kinesisEvents.parse(event.Records);
result.records.forEach(record => {
// do something with each record...
});
A set of parsed records with additional functionality.
Kind: global class
- RecordSet
- recordSet.records :
Array
- recordSet.failed :
Array.<ParseError>
- recordSet.length :
Number
- recordSet.hasErrors :
Boolean
- recordSet.records :
The records within this record set.
Kind: instance property of RecordSet
recordSet.failed : Array.<ParseError>
List of failed records (ParseError).
Kind: instance property of RecordSet
The total number of parsed records in the record set.
Kind: instance property of RecordSet
Read only: true
Boolean flag if this record set has failed records.
Kind: instance property of RecordSet
Read only: true
New in v3.0.0, there is now an option to pass in a transform function that will allow you to transform the record before it is added to the RecordSet. This allows custom functionality or business logic to be implemented at a higher level.
The transform function takes 2 arguments, record
and index
. The function must return the transformed record in order for it to be added to the RecordSet. If the record is not returned from the function, it will be ignored.
const { KinesisEvents } = require('kinesis-events');
const kinesisEvents = new KinesisEvents({
transform: (record, index) => {
if(record.firstName && record.lastName) {
// example, remove record if data is missing
return null;
}
record.someCustomProperty = 'some custom value';
return record;
}
});
Tests are written and provided as part of the module. It requires mocha to be installed which is included as a devDependency
. You may run the tests by calling:
$ npm run test
MIT License. See License in the repository.