/sf-streaming-sns-bridge

Node.js application (AWS Elastic Beanstalk friendly) for bridging Salesforce Platform Events to AWS SNS topics

Primary LanguageJavaScriptApache License 2.0Apache-2.0

Bridge for forwarding Salesforce Streaming Events to AWS SNS topics

Node.js application (Docker and AWS ECS/EB ready) and also NPM package for bridging Salesforce Streaming Events to AWS SNS topics.

Version Downloads/week License Docker Pulls Docker Image Size

This application listens for Salesforce Streaming Events and forwards all messages to AWS SNS topics.

It is also available as an NPM package so that you can just import Bridge and wrap it with your own UI if desired. See "How to use it in your code" section below for details.

Main features are:

  • All kinds of events are supported: PushTopics, Platform Events, and Change Data Capture Events.
  • You can configure multiple Salesforce logins each with multiple channels.
  • Each channel can be configured to have messages forwarded to an AWS SNS topic.
  • Configurations can be put in one of these sources:
    • Environment variable
    • AWS Systems Manager Parameter Store
  • Checkpoints can be stored in AWS DynamoDB to avoid missing events during restarts.
  • Reconnect and retry logic built-in.
  • REST API endpoints for managing and monitoring.
  • Docker image available on DockerHub.
  • Optimized for AWS Elastic Beanstalk and ECS but can also be executed in any Node.js environment as a console application.

Configuration

When starting, the application listens on a port to provide REST API for management purpose. The port number can be specified in environment variable BRIDGE_PORT or PORT, or if none of them is set port 8080 would be used.

All other configurations are structured as a JSON text, stored in one of these places:

  • Environment variable BRIDGE_CONFIG
  • AWS Systems Manager Parameter Store, the name of the parameter is read from environment variable BRIDGE_CONFIG_PARAMETER_STORE

New lines and spaces in the JSON text are not necessary, especially when the configuration is put in the environment variable.

The JSON text has this structure:

{
    "options": {
        "replayIdStoreTableName": "dynamodb-table-name", // Name of the DynamoDB table used for storing
                                                            // Replay ID checkpoints. It must exist in the
                                                            // default AWS region as the bridge is running in.
                                                            // If not set, checkpointing would be disabled.
        "replayIdStoreKeyName": "channel",  // Name of the partition key in the DynamoDb table.
                                               // If not set then default to "channel"
        "replayIdStoreDelay": 2000,         // The maximum delay (as number of milliseconds) before the
                                               // newly received Replay ID would be saved into the DynamoDB
                                               // table. If not set then default to 2000.
        "initialReplayId": -1               // The starting Replay ID to use in case there is no previously
                                               // saved checkpoint in the DynamoDB table. If not set then
                                               // default to -1.
        "debug": false                      // If it is true, then messages received and forwarded would
                                               // be logged to console.
    },
    "test1": {  // The name of the Salesforce environment/sandbox,
                   // can be any text you like, but please avoid having '//' in it.
        "connection": {     // Salesforce connection parameters
            "clientId": "of-the-connected-app",
            "clientSecret": "of-the-connected-app",
            "loginUrl": "https://test.salesforce.com",
            "redirectUri": "https://login.salesforce.com",
            "username": "user-name",
            "password": "password of the user",
            "token": "secure-token-of-the-user"
        },
        "channels": [       // Can have multiple channels configured here
            {
                "channelName": "/event/the-name-in-Salesforce__e",
                "snsTopicArn": "your-AWS-SNS-topic-ARN"
            },
            {
                "channelName": "/event/another-name-in-Salesforce__e",
                "snsTopicArn": "your-AWS-SNS-topic-ARN"
            }
        ]
    },
    "test2": {  // Can have multiple Salesforce environment/sandbox
        "connection": {     // Salesforce connection parameters
            "clientId": "of-the-connected-app",
            "clientSecret": "of-the-connected-app",
            "loginUrl": "https://test.salesforce.com",
            "redirectUri": "https://login.salesforce.com",
            "username": "user-name",
            "password": "password of the user",
            "token": "secure-token-of-the-user"
        },
        "channels": [       // Can have multiple channels configured here
            {
                "channelName": "/event/the-name-in-Salesforce__e",
                "snsTopicArn": "your-AWS-SNS-topic-ARN"
            },
            {
                "channelName": "/event/another-name-in-Salesforce__e",
                "snsTopicArn": "your-AWS-SNS-topic-ARN"
            }
        ]
    }
}

Some configuration items can be overridden by environment variables:

  • BRIDGE_CONFIG_REPLAY_ID_STORE_TABLE_NAME
  • BRIDGE_CONFIG_REPLAY_ID_STORE_KEY_NAME
  • BRIDGE_CONFIG_REPLAY_ID_STORE_DELAY
  • BRIDGE_CONFIG_INITIAL_REPLAY_ID
  • BRIDGE_CONFIG_DEBUG

Checkpoint

For production usage, checkpointing is needed. That means, the bridge would periodically save the Replay ID of the last forwarded message into the DynamoDB table specified by the configuration entry replayIdStoreTableName. And when the bridge starts or reloads, it would read the saved Replay ID from DynamoDB table and tries to catch up from there.

If replayIdStoreTableName is not configured, checkpointing is disabled. If both replayIdStoreTableName and initialReplayId are not configured, the bridge would not try to catch up, it would receive only new messages.

replayIdStoreDelay Controls how frequently checkpointing happens. The actual DynamoDB write frequency is always no more than the frequency that messages arrive. That means, if there is only one message every hour, setting replayIdStoreDelay to 1 millisecond won't result in 1000 writes per second, the actual write frequency would still be once per hour. However, if the message comes at a frequency of 1/second, setting replayIdStoreDelay to 60000 milliseconds would result in roughly 1 DynamoDB write per minute.

Manually modifying the checkpoint

Sometimes you may want to manually modify the Replay ID checkpoint. For example you may want to rewind or fast forward to a specific Replay ID.

In such case, if you don't want to stop the whole bridge, you need to follow these steps:

  1. Remove the channel from the configuration
  2. Do a /reload so that you will be able to persist your desired checkpoint configuration
  3. Delete or modify the checkpoint information in the DynamoDB table
  4. Add the channel back in the configuration
  5. Do a /reload again so that the channel will start again with your new checkpoint configuration

You can't just modify the checkpoint in DynamoDB table and then do a /reload because current checkpoint in memory would be updated into the DynamoDB table during /reload thus would override your modification.

How to run it

To run it locally for demo purpose, you can do this:

npm ci
export AWS_REGION=...
export BRIDGE_CONFIG=...
npm start

or this:

AWS_REGION=ap-southeast-2 BRIDGE_CONFIG_PARAMETER_STORE=/your/aws/param/store/name npm start

Or, you can get the pre-built Docker image from Docker Hub and use docker to run it: jameshu/sf-streaming-sns-bridge

REST API

After starting up, you can access the bridge's REST API endpoints to manage it:

  • /health - returns whether the bridge is UP or DOWN
  • /status - returns the detailed status of each channel-SNS pair
  • /reload - instruct the bridge to re-read configurations and then restart all channel-SNS pairs

How to use it in your code

You can import Bridge from the NPM package sf-streaming-sns-bridge and then use it like this:

const bridge = new Bridge();    // Create an instance, at this time the bridege does nothing because it has not been configured yet
await bridge.reload();          // Configurations will be read from environment variables, and the bridge would start up
console.log(bridge.status());   // Status is an object
await bridge.stopAll();         // Stop
await bridge.startAll();        // Start again (without re-reading configurations)
await bridge.reload();          // Re-read configurations and restart

Please note that Bridge does not expose any REST API. If you would like to expose REST API, consider importing and extending App class.

For developers

  • Run it locally: refer to "How to run it"
  • Run test cases locally: AWS_REGION=ap-southeast-2 BRIDGE_CONFIG_PARAMETER_STORE=<your parameter name> BRIDGE_CONFIG_REPLAY_ID_STORE_TABLE_NAME='' npx mocha
  • Bump up version number: npm version patch -m "..."