kafka-connect-cdc-mssql

Kafka Connect Connector for Microsoft SQL Server

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

Introduction

Microsoft SQL Server (2008 and later) has built-in support for tracking changes made to a database. There are two ways to read these changes from the source system as they are generated. Change Data Capture was originally available only on the SQL Server Enterprise and Developer editions (SQL Server 2016 SP1 extended it to Standard edition). Change Tracking is a lightweight alternative that efficiently identifies which rows have changed. Because it records that a row changed rather than every intermediate value, rows modified in quick succession may not have each individual change captured: only the latest version of each changed row is returned.
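The toy model below illustrates why intermediate changes can be missed. It is a simplified sketch, not SQL Server's implementation: the hypothetical ChangeTrackingModel class keeps only the latest change version per primary key, so a reader that syncs from an earlier version sees one net change per row together with the row's current value, roughly what a query against CHANGETABLE(CHANGES ...) joined to the base table returns. Deletes are not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT how SQL Server stores change tracking data internally.
class ChangeTrackingModel {
    // Current contents of the tracked table: primary key -> current value.
    private final Map<Integer, String> table = new LinkedHashMap<>();
    // Latest change version per primary key; a newer change overwrites the older entry.
    private final Map<Integer, Long> lastChangeVersion = new LinkedHashMap<>();
    private long currentVersion = 0;

    // Insert or update a row; every change bumps the database-wide version counter.
    void upsert(int pk, String value) {
        table.put(pk, value);
        lastChangeVersion.put(pk, ++currentVersion);
    }

    long currentVersion() {
        return currentVersion;
    }

    // One net change per row changed after sinceVersion, formatted as "pk=currentValue@version".
    List<String> changesSince(long sinceVersion) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : lastChangeVersion.entrySet()) {
            if (e.getValue() > sinceVersion) {
                changes.add(e.getKey() + "=" + table.get(e.getKey()) + "@" + e.getValue());
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        ChangeTrackingModel model = new ChangeTrackingModel();
        long lastSync = model.currentVersion(); // 0
        model.upsert(1, "a");
        model.upsert(1, "b");
        model.upsert(1, "c"); // three quick updates to the same row...
        model.upsert(2, "x");
        // ...collapse into a single change carrying the latest value.
        System.out.println(model.changesSince(lastSync)); // [1=c@3, 2=x@4]
    }
}
```

A consumer polling between the updates would have seen "a" or "b"; polling afterwards, it only sees "c".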

Configuration

MsSqlSourceConnector

The Microsoft SQL Server connector utilizes Change Tracking to identify changes.

name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.mssql.MsSqlSourceConnector

# Set these required values
initial.database=
server.name=
password=
server.port=
username=
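All five values under "# Set these required values" must be filled in. The sketch below, using only java.util.Properties, checks for blank required values and shows one plausible way the connection settings could be combined into a Microsoft JDBC driver URL. The ConfigSketch class and its methods are hypothetical. databaseName and multiSubnetFailover are connection properties of Microsoft's JDBC driver, but exactly how the connector builds its URL (including its handling of the multi.subnet.failover setting) is an assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not the connector's actual configuration code.
class ConfigSketch {
    static final String[] REQUIRED = {"initial.database", "server.name", "server.port", "username", "password"};

    // Returns the required keys that are absent or blank, in REQUIRED order.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Builds jdbc:sqlserver://host:port;databaseName=db[;multiSubnetFailover=true].
    // Username and password would be passed separately, e.g. to DriverManager.getConnection.
    static String jdbcUrl(Properties props) {
        int port = Integer.parseInt(props.getProperty("server.port").trim()); // fails fast on a bad port
        StringBuilder url = new StringBuilder("jdbc:sqlserver://")
            .append(props.getProperty("server.name").trim()).append(':').append(port)
            .append(";databaseName=").append(props.getProperty("initial.database").trim());
        if (Boolean.parseBoolean(props.getProperty("multi.subnet.failover", "false").trim())) {
            url.append(";multiSubnetFailover=true");
        }
        return url.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
            "initial.database=inventory",   // hypothetical example values
            "server.name=sql-01.example.com",
            "server.port=1433",
            "username=connect",
            "password=secret")));
        System.out.println(missingRequired(props)); // []
        System.out.println(jdbcUrl(props));          // jdbc:sqlserver://sql-01.example.com:1433;databaseName=inventory
    }
}
```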
initial.database (string; required; importance: high): The initial database to connect to.
password (password; required; importance: high): JDBC password to connect to the database with.
server.name (string; required; importance: high): The server to connect to.
server.port (int; required; importance: high): The port on the server to connect to.
username (string; required; importance: high): JDBC username to connect to the database with.
schema.key.name.format (string; default: ${namespace}.${tableName}Key; importance: high): Format used to generate the name of the key schema. Available template variables: ${databaseName}, ${schemaName}, ${tableName}, ${namespace}.
schema.namespace.format (string; default: com.example.data.${databaseName}; importance: high): The namespace for the schemas generated by the connector. Available template variables: ${databaseName}, ${schemaName}, ${tableName}, ${namespace}.
schema.value.name.format (string; default: ${namespace}.${tableName}Value; importance: high): Format used to generate the name of the value schema. Available template variables: ${databaseName}, ${schemaName}, ${tableName}, ${namespace}.
topicFormat.format (string; default: ${databaseName}.${tableName}; importance: high): Format used to generate the name of the topic the data is written to.
change.tracking.tables (list; default: [] (empty); importance: medium): The tables in the source database to monitor for changes. If no tables are specified, the [sys].[change_tracking_tables] view is queried for all tables that have change tracking enabled.
jdbc.pool.max.idle (int; default: 10; importance: medium): The maximum number of idle connections in the connection pool.
jdbc.pool.max.total (int; default: 30; importance: medium): The maximum number of connections the pool will open. Once this limit is reached, a caller requesting another connection blocks until one is returned to the pool.
jdbc.pool.min.idle (int; default: 3; importance: medium): The minimum number of idle connections in the connection pool.
backoff.time.ms (int; default: 1000; valid values: 50 or greater; importance: low): The number of milliseconds to wait when no records are returned.
batch.size (int; default: 512; valid values: 1 or greater; importance: low): The number of records to return in a batch.
schema.cache.ms (int; default: 300000 (5 minutes); valid values: 60000 or greater; importance: low): The number of milliseconds to cache schema metadata in memory.
schema.caseformat.column.name (string; default: NONE; valid values: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE; importance: low): Controls how column names are cased in the generated schemas.
schema.caseformat.database.name (string; default: NONE; valid values: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE; importance: low): Controls how the ${databaseName} variable is cased when it is passed to the formatters defined in schema.namespace.format, schema.key.name.format, schema.value.name.format, and topicFormat.format. For example, a database name of USER_TRACKING can be converted to the Java-like userTracking or to the all-lowercase usertracking.
schema.caseformat.input (string; default: UPPER_UNDERSCORE; valid values: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE; importance: low): The naming convention used by the database. The other schema.caseformat.* settings convert names from this convention.
schema.caseformat.schema.name (string; default: NONE; valid values: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE; importance: low): Controls how the ${schemaName} variable is cased when it is passed to the formatters defined in schema.namespace.format, schema.key.name.format, schema.value.name.format, and topicFormat.format. For example, a schema name of SCOTT can be converted to the Java-like Scott or to the all-lowercase scott.
schema.caseformat.table.name (string; default: NONE; valid values: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE; importance: low): Controls how the ${tableName} variable is cased when it is passed to the formatters defined in schema.namespace.format, schema.key.name.format, schema.value.name.format, and topicFormat.format. For example, a table name of USER_SETTING can be converted to the Java-like UserSetting or to the all-lowercase usersetting.
multi.subnet.failover (boolean; default: false; valid values: true, false; importance: low): Whether the MultiSubnetFailover option is included in the connection to the SQL Server database.
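To illustrate how the naming settings combine, here is a plain-Java sketch that expands ${...} placeholders and converts UPPER_UNDERSCORE names (the schema.caseformat.input default) to UPPER_CAMEL and LOWER_CAMEL. The NamingSketch class and its methods are hypothetical; the connector's own template and case-conversion code may differ, for example in how it treats unknown placeholders, which this sketch leaves untouched.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of template expansion plus case conversion.
class NamingSketch {
    // Matches ${name} placeholders such as ${databaseName} or ${tableName}.
    private static final Pattern VAR = Pattern.compile("\\$\\{(\\w+)\\}");

    // Replaces each ${name} with vars.get(name); unknown placeholders are kept as written.
    static String render(String template, Map<String, String> vars) {
        Matcher m = VAR.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = vars.getOrDefault(m.group(1), m.group());
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // UPPER_UNDERSCORE -> UPPER_CAMEL, e.g. USER_SETTING -> UserSetting
    static String upperUnderscoreToUpperCamel(String name) {
        StringBuilder sb = new StringBuilder();
        for (String part : name.toLowerCase(Locale.ROOT).split("_")) {
            if (!part.isEmpty()) {
                sb.append(Character.toUpperCase(part.charAt(0))).append(part.substring(1));
            }
        }
        return sb.toString();
    }

    // UPPER_UNDERSCORE -> LOWER_CAMEL, e.g. USER_TRACKING -> userTracking
    static String upperUnderscoreToLowerCamel(String name) {
        String upperCamel = upperUnderscoreToUpperCamel(name);
        return upperCamel.isEmpty()
            ? upperCamel
            : Character.toLowerCase(upperCamel.charAt(0)) + upperCamel.substring(1);
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        // Hypothetical names: database cased LOWER_CAMEL, table cased UPPER_CAMEL.
        vars.put("databaseName", upperUnderscoreToLowerCamel("USER_TRACKING"));
        vars.put("tableName", upperUnderscoreToUpperCamel("USER_SETTING"));
        // Default schema.namespace.format, rendered first so ${namespace} is available below.
        vars.put("namespace", render("com.example.data.${databaseName}", vars));

        System.out.println(vars.get("namespace"));                        // com.example.data.userTracking
        System.out.println(render("${namespace}.${tableName}Key", vars)); // com.example.data.userTracking.UserSettingKey
        System.out.println(render("${databaseName}.${tableName}", vars)); // userTracking.UserSetting
    }
}
```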
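The descriptions of batch.size and backoff.time.ms suggest a polling pattern like the one sketched below: return at most batch.size records per poll, and wait backoff.time.ms milliseconds only when nothing was returned. This is a hypothetical sketch of that pattern, not the connector's task code. The pending queue stands in for change rows read from SQL Server, and the sleep is passed in as a LongConsumer so a test can record it instead of actually waiting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical poll step illustrating batch.size and backoff.time.ms.
class PollSketch {
    // Drains at most batchSize records; calls sleeper(backoffMs) only when nothing was available.
    static <T> List<T> poll(Deque<T> pending, int batchSize, long backoffMs, LongConsumer sleeper) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < batchSize && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        if (batch.isEmpty()) {
            sleeper.accept(backoffMs);
        }
        return batch;
    }

    // Real sleeper for use outside tests: pass PollSketch::sleep as the LongConsumer.
    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        }
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>(List.of("r1", "r2", "r3"));
        List<Long> sleeps = new ArrayList<>();
        LongConsumer recordSleep = ms -> sleeps.add(ms); // records the backoff instead of sleeping
        System.out.println(poll(pending, 2, 1000, recordSleep)); // [r1, r2]
        System.out.println(poll(pending, 2, 1000, recordSleep)); // [r3]
        System.out.println(poll(pending, 2, 1000, recordSleep)); // []
        System.out.println(sleeps);                              // [1000]
    }
}
```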

Installation

Microsoft does not publish the sqljdbc4 driver jar used by this project to Maven Central, so you will have to download the driver manually and install it in your local Maven repository. You can download the JDBC driver from Microsoft. The connector's installation packages do not include the JDBC driver because of licensing restrictions.
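Because the driver is not bundled, a missing jar only shows up at runtime. A small check such as the hypothetical DriverCheck class below can confirm that the driver class is visible on the classpath. com.microsoft.sqlserver.jdbc.SQLServerDriver is the driver class name used by Microsoft's JDBC driver; loading it with initialize set to false avoids running its static initializer.

```java
// Hypothetical diagnostic helper, not part of the connector.
class DriverCheck {
    // Returns true if the named class can be loaded from the current classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
        System.out.println(driver + (isOnClasspath(driver) ? " found" : " NOT found on the classpath"));
    }
}
```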

Local Maven installation

mvn install:install-file -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dpackaging=jar -Dfile=<path to the download>

Upload artifacts to Nexus

export NEXUS_URL='http://nexus-01:8081/repository/maven-releases/'
export NEXUS_REPO_ID='ldap-jeremy'
mvn deploy:deploy-file -DrepositoryId=$NEXUS_REPO_ID -Durl=$NEXUS_URL -DgeneratePom=true -Dpackaging=jar -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dfile=sqljdbc4-6.0.7130.jar