kafka-salesforce-connect

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

Overview

This Kafka Connect connector integrates with the Salesforce Streaming API to provide real-time updates to Kafka as objects are modified in Salesforce. It does this by registering a PushTopic for specific objects and fields. The PushTopic can be managed externally from the connector, or you can set salesforce.push.topic.create=true, in which case the connector queries the descriptor for the object specified in salesforce.object and generates a PushTopic covering all of the fields in that descriptor. Because the descriptor lists every field available on the object, a dynamically generated PushTopic includes all fields that exist at the time it is created. If you create the PushTopic manually, the schema will still include all of the fields defined in the descriptor.
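When salesforce.push.topic.create=true, the connector builds a PushTopic from the object descriptor. As a rough illustration of that step, the sketch below turns a list of descriptor field names into a SOQL `SELECT ... FROM <object>` query of the kind a PushTopic uses. The class and method names are hypothetical, the field list is hard-coded rather than fetched from Salesforce, and this is not the connector's actual code.

```java
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical sketch: builds a SOQL query selecting every field listed in an
 * object's descriptor. Not the connector's implementation.
 */
public class PushTopicQuerySketch {

    /** Joins the field names into "SELECT f1, f2, ... FROM objectName". */
    static String buildQuery(String objectName, List<String> fieldNames) {
        if (fieldNames.isEmpty()) {
            throw new IllegalArgumentException("descriptor has no fields");
        }
        StringJoiner select = new StringJoiner(", ", "SELECT ", " FROM " + objectName);
        for (String field : fieldNames) {
            select.add(field);
        }
        return select.toString();
    }

    public static void main(String[] args) {
        // Illustrative field names; a real implementation would read them from the descriptor.
        List<String> fields = List.of("Id", "FirstName", "LastName", "Company");
        System.out.println(buildQuery("Lead", fields));
        // prints: SELECT Id, FirstName, LastName, Company FROM Lead
    }
}
```

Real descriptors may contain fields that a PushTopic query cannot include, so a production implementation might need to filter the list; this sketch does no filtering.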

Breaking change in 0.3.x

Issue-14 introduces a necessary, backward-incompatible change. The address type was incorrectly identified as a string; it is now correctly mapped to a child object.
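To illustrate what this change means for consumers: `Address` now arrives as a nested record (see `com.github.jcustenborder.kafka.connect.salesforce.Address` in the Example Output section) instead of a single string. The sketch below models that structure as a plain Java record built from a map keyed by Salesforce field names. The `AddressSketch` class, its field types (for example, `Double` for latitude and longitude), and the `fromMap` helper are assumptions for illustration, not part of the connector.

```java
import java.util.Map;

/**
 * Hypothetical consumer-side model of the nested Address structure.
 * Field names mirror the example output; types are assumptions.
 */
public class AddressSketch {

    public record Address(String street, String city, String state, String postalCode,
                          String country, Double latitude, Double longitude,
                          String geocodeAccuracy) {

        /** Builds an Address from Salesforce field names; absent keys become null. */
        public static Address fromMap(Map<String, ?> m) {
            return new Address(
                    (String) m.get("Street"),
                    (String) m.get("City"),
                    (String) m.get("State"),
                    (String) m.get("PostalCode"),
                    (String) m.get("Country"),
                    toDouble(m.get("Latitude")),
                    toDouble(m.get("Longitude")),
                    (String) m.get("GeocodeAccuracy"));
        }

        private static Double toDouble(Object o) {
            return o == null ? null : ((Number) o).doubleValue();
        }
    }

    public static void main(String[] args) {
        Address a = Address.fromMap(Map.of(
                "Street", "123 Wall St", "City", "New York",
                "State", "NY", "PostalCode", "12345"));
        // Each part of the address is now individually addressable.
        System.out.println(a.city() + ", " + a.state()); // New York, NY
    }
}
```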

Configuration

SalesforceSourceConnector

name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.salesforce.SalesforceSourceConnector

# Set these required values
salesforce.username=
salesforce.consumer.key=
salesforce.push.topic.name=
salesforce.password=
salesforce.password.token=
kafka.topic=
salesforce.consumer.secret=
salesforce.object=
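Every value in the template above is blank and must be filled in. A minimal pre-flight check, assuming you want to catch missing settings before starting Kafka Connect, could load the file with `java.util.Properties` and report any required key that is absent or empty. The `RequiredConfigCheck` class is hypothetical; Kafka Connect is still expected to perform its own validation when the connector starts.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical pre-flight check listing required connector settings
 * that are missing or blank.
 */
public class RequiredConfigCheck {

    // The required keys from the configuration template above.
    static final List<String> REQUIRED = List.of(
            "salesforce.username", "salesforce.password", "salesforce.password.token",
            "salesforce.consumer.key", "salesforce.consumer.secret",
            "salesforce.push.topic.name", "salesforce.object", "kafka.topic");

    /** Returns the required keys whose values are absent or blank, in REQUIRED order. */
    static List<String> missing(Properties props) {
        List<String> result = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            // "salesforce.username=" loads as an empty string, so blank counts as missing.
            if (value == null || value.isBlank()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "salesforce.username=user@example.com\n"
              + "salesforce.object=Lead\n"
              + "kafka.topic=salesforce.${_ObjectType}\n"));
        System.out.println("Missing: " + missing(props));
    }
}
```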
| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| kafka.topic | The Kafka topic to write the Salesforce data to. This is a template driven by the data returned by Salesforce. Any field in the schema can be used, but you should always pick a field that is guaranteed to be present. `_EventType` and `_ObjectType` are two metadata fields that are included on every record. For example, you could put updates and deletes in different topics by using `salesforce.${_ObjectType}.${_EventType}`. | string | | | high |
| salesforce.consumer.key | The consumer key for the OAuth application. | string | | | high |
| salesforce.consumer.secret | The consumer secret for the OAuth application. | password | | | high |
| salesforce.object | The Salesforce object to create a topic for. | string | | | high |
| salesforce.password | Salesforce password to connect with. | password | | | high |
| salesforce.password.token | The Salesforce security token associated with the username. | password | | | high |
| salesforce.push.topic.name | The Salesforce PushTopic to subscribe to. If `salesforce.push.topic.create` is set to `true`, a PushTopic with this name will be created. | string | | | high |
| salesforce.username | Salesforce username to connect with. | string | | | high |
| kafka.topic.lowercase | Flag to determine if the Kafka topic should be lowercased. | boolean | true | | high |
| salesforce.instance | The Salesforce instance to connect to. | string | `""` | | high |
| connection.timeout | The amount of time to wait while connecting to the Salesforce streaming endpoint. | long | 30000 | [5000,...,600000] | low |
| curl.logging | If enabled, the logs will output the equivalent curl commands. This is a security risk because your authorization header will end up in the log file. Use at your own risk. | boolean | false | | low |
| salesforce.push.topic.create | Flag to determine if the PushTopic should be created if it does not exist. | boolean | true | | low |
| salesforce.push.topic.notify.create | Flag to determine if the PushTopic should respond to creates. | boolean | true | | low |
| salesforce.push.topic.notify.delete | Flag to determine if the PushTopic should respond to deletes. | boolean | true | | low |
| salesforce.push.topic.notify.undelete | Flag to determine if the PushTopic should respond to undeletes. | boolean | true | | low |
| salesforce.push.topic.notify.update | Flag to determine if the PushTopic should respond to updates. | boolean | true | | low |
| salesforce.version | The version of the Salesforce API to use. | string | latest | `ValidPattern{pattern=^(latest\|[\d.]+)$}` | |
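The `kafka.topic` template and the `kafka.topic.lowercase` flag can be pictured as follows: each `${field}` placeholder is replaced with that field's value from the record, and the result is optionally lowercased. This is a hypothetical sketch (`TopicTemplateSketch` is not part of the connector), and throwing on a missing field is an assumption chosen to reflect the advice above, not documented connector behaviour.

```java
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical resolver for a topic template such as
 * "salesforce.${_ObjectType}.${_EventType}".
 */
public class TopicTemplateSketch {

    // Matches ${fieldName} and captures fieldName.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String render(String template, Map<String, ?> record, boolean lowercase) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Object value = record.get(m.group(1));
            if (value == null) {
                // A missing field cannot produce a usable topic name.
                throw new IllegalArgumentException("Field not present: " + m.group(1));
            }
            // quoteReplacement stops '$' or '\' in values being treated as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(out);
        String topic = out.toString();
        return lowercase ? topic.toLowerCase(Locale.ROOT) : topic;
    }

    public static void main(String[] args) {
        Map<String, String> record = Map.of("_ObjectType", "Lead", "_EventType", "updated");
        System.out.println(render("salesforce.${_ObjectType}.${_EventType}", record, true));
        // prints: salesforce.lead.updated
    }
}
```

Under this sketch, the example record shown in the Example Output section (`_ObjectType` = `Lead`, `_EventType` = `updated`) with `kafka.topic.lowercase=true` would be routed to `salesforce.lead.updated`.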

Building

git clone https://github.com/jcustenborder/kafka-connect-salesforce
cd kafka-connect-salesforce
mvn clean install

Running in development

mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties config/MySourceConnector.properties

Example Output

The following is output from kafka-avro-console-consumer. The schema is generated dynamically from the Salesforce object metadata REST API.

{
  "Id": "00Q5000001BqAICEA3",
  "IsDeleted": {
    "boolean": false
  },
  "MasterRecordId": null,
  "LastName": {
    "string": "Smith"
  },
  "FirstName": {
    "string": "Fred"
  },
  "Salutation": null,
  "Name": {
    "string": "Fred Smith"
  },
  "Title": {
    "string": "CEO"
  },
  "Company": {
    "string": "Testing Company"
  },
  "City": {
    "string": "New York"
  },
  "State": {
    "string": "NY"
  },
  "PostalCode": {
    "string": "12345"
  },
  "Country": null,
  "Latitude": null,
  "Longitude": null,
  "GeocodeAccuracy": null,
  "Address": {
    "com.github.jcustenborder.kafka.connect.salesforce.Address": {
      "GeocodeAccuracy": null,
      "State": {
        "string": "NY"
      },
      "Street": {
        "string": "123 Wall St"
      },
      "PostalCode": {
        "string": "12345"
      },
      "Country": null,
      "Latitude": null,
      "City": {
        "string": "New York"
      },
      "Longitude": null
    }
  },
  "Phone": {
    "string": "555-867-5309"
  },
  "MobilePhone": {
    "string": "000-000-0000"
  },
  "Fax": null,
  "Email": {
    "string": "fred.smith@example.com"
  },
  "Website": null,
  "PhotoUrl": null,
  "LeadSource": {
    "string": "Web"
  },
  "Status": {
    "string": "Open - Not Contacted"
  },
  "Industry": {
    "string": "Transportation"
  },
  "Rating": null,
  "AnnualRevenue": {
    "string": "100000000"
  },
  "NumberOfEmployees": {
    "int": 100
  },
  "OwnerId": {
    "string": "00550000005elXkAAI"
  },
  "IsConverted": {
    "boolean": false
  },
  "ConvertedDate": null,
  "ConvertedAccountId": null,
  "ConvertedContactId": null,
  "ConvertedOpportunityId": null,
  "IsUnreadByOwner": {
    "boolean": false
  },
  "CreatedDate": {
    "long": 1483228800000
  },
  "CreatedById": {
    "string": "00550000005elXkAAI"
  },
  "LastModifiedDate": {
    "long": 1483315200000
  },
  "LastModifiedById": {
    "string": "00550000005elXkAAI"
  },
  "SystemModstamp": {
    "long": 1483315200000
  },
  "LastActivityDate": null,
  "LastViewedDate": null,
  "LastReferencedDate": null,
  "Jigsaw": null,
  "JigsawContactId": null,
  "CleanStatus": {
    "string": "5"
  },
  "CompanyDunsNumber": null,
  "DandbCompanyId": null,
  "EmailBouncedReason": null,
  "EmailBouncedDate": null,
  "SICCode__c": null,
  "ProductInterest__c": {
    "string": "GC1000 series"
  },
  "Primary__c": null,
  "CurrentGenerators__c": null,
  "NumberofLocations__c": null,
  "_ObjectType": {
    "string": "Lead"
  },
  "_EventType": {
    "string": "updated"
  }
}
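Date/time fields such as `CreatedDate`, `LastModifiedDate`, and `SystemModstamp` appear in the output above as `long` values. Assuming they are milliseconds since the Unix epoch (the values shown are consistent with that), `java.time.Instant` converts them to readable UTC timestamps. `TimestampSketch` is an illustrative name only.

```java
import java.time.Instant;

/**
 * Sketch: interprets long date/time values from the example record
 * as milliseconds since the Unix epoch (an assumption).
 */
public class TimestampSketch {

    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        System.out.println("CreatedDate:      " + toInstant(1483228800000L)); // 2017-01-01T00:00:00Z
        System.out.println("LastModifiedDate: " + toInstant(1483315200000L)); // 2017-01-02T00:00:00Z
    }
}
```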