A simplified connector example to the Enterprise Messaging Platform.
This example connector provides support for SSL, HTTP proxies and supports both the long polling and websocket streaming transports. Easy subscription management and full support for event replay is provided.
To use, add the maven dependency:
<dependency>
<groupId>com.salesforce.conduit</groupId>
<artifactId>emp-connector</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
Please note that this repository is example code and is not supported by Salesforce. This code has not been rigorously tested nor performance tested for throughput and scale.
This code is provided as an example only. The underlying CometD library is what provides the meat here, as EMP Connector is a thin wrapper around this library.
// Replay from the start of the event window - may be any valid replayFrom position in the event stream
long replayFrom = EmpConnector.REPLAY_FROM_EARLIEST;
// get parameters from login
BayeuxParameters params = login("foo@bar.com", "password");
// The event consumer
Consumer<Map<String, Object>> consumer = event -> System.out.println(String.format("Received:\n%s", event));
// The EMP connector
EmpConnector connector = new EmpConnector(params);
// Wait for handshake with Streaming API
connector.start().get(5, TimeUnit.SECONDS);
// Subscribe to a topic
// Block and wait for the subscription to succeed for 5 seconds
TopicSubscription subscription = connector.subscribe("/topic/myTopic", replayFrom, consumer).get(5, TimeUnit.SECONDS);
// Here's our subscription
System.out.println(String.format("Subscribed: %s", subscription));
// Cancel a subscription
subscription.cancel();
// Stop the connector
connector.stop();
See LoginExample.java for full example.
Several example classes are provided to subscribe to a channel. All classes contain a main
function that starts the tool. All examples authenticate to Salesforce and subscribe to a channel. Some examples use a different authentication mechanism or provide verbose logging.
The LoginExample
class is the default class that EMP Connector executes. This class authenticates to your production Salesforce org using your Salesforce username and password.
The DevLoginExample
class accepts a custom login URL, such as a sandbox instance (https://test.salesforce.com). Also, DevLoginExample
logs to the console the Bayeux connection messages received on the /meta
channels, such as /meta/handshake
and /meta/connect
.
The BearerTokenExample
class uses the OAuth bearer token authentication and accepts an access token.
After cloning the project, build EMP Connector using Maven:
$ mvn clean package
The build generates the jar file in the target subfolder.
To run EMP Connector using the LoginExample
class with username and password authentication, use this command.
$ java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar <username> <password> <topic>
To run EMP Connector using the DevLoginExample
class with username and password authentication, use this command.
$ java -classpath target/emp-connector-0.0.1-SNAPSHOT-phat.jar com.salesforce.emp.connector.example.DevLoginExample <login_URL> <username> <password> <topic>
To run EMP Connector using an OAuth access token, use this command.
$ java -classpath target/emp-connector-0.0.1-SNAPSHOT-phat.jar com.salesforce.emp.connector.example.BearerTokenExample <instance_URL> <token> <topic>
The LoggingListener class provides debug logging output of Bayeux messages received on the meta channels, such as /meta/handshake
and /meta/connect
. Each message is logged to the console with a timestamp, a "Success" prefix or a "Failure" prefix depending on whether the operation was successful or not, and then the body of the Bayeux message. For example, this log is for a handshake message.
[2018-01-19 10:54:12.701] Success:[/meta/handshake]
{ext={replay=true, payload.format=true}, minimumVersion=1.0, clientId=cn2vei6rz2pa01gqqvungzlppy,
supportedConnectionTypes=[Ljava.lang.Object;@6e2ce7d1,
channel=/meta/handshake, id=1, version=1.0, successful=true}
To add logging support to your connection, first create an instance of the LoggingListener
class. The LoggingListener
constructor accepts two boolean arguments that specify whether to log success and failure messages. Next, call the EmpConnector.addListener()
method for each meta channel to add logging for and pass in the channel and the LoggingListener
instance. This example adds logging for multiple channels.
LoggingListener loggingListener = new LoggingListener(true, true);
connector.addListener(META_HANDSHAKE, loggingListener)
.addListener(META_CONNECT, loggingListener)
.addListener(META_DISCONNECT, loggingListener)
.addListener(META_SUBSCRIBE, loggingListener)
.addListener(META_UNSUBSCRIBE, loggingListener);
The DevLoginExample class uses LoggingListener
to log the messages received.
Authentication becomes invalid when a Salesforce session is invalidated or an access token is revoked. EMP connector listens to 401::Authentication invalid
error messages that Streaming API sends when the authentication is no longer valid. To reauthenticate after a 401 error is received, call the EmpConnector.setBearerTokenProvider()
method, which accepts a function that reauthenticates and returns a new session ID or access token.
// Define the bearer token function
Function<Boolean, String> bearerTokenProvider = (Boolean reAuth) -> {
...
}
// Set the bearer token function
connector.setBearerTokenProvider(bearerTokenProvider);
For a full example, see LoginExample.java.
For more information about the components of the EMP Connector and a walkthrough, see the Java Client Example in the Streaming API Developer Guide.