kafka-confluent-sqlserver-driver
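Fork of the original project for capturing SQL Server change data in real time through the Confluent platform. This fork adds the following to the emitted content: database, schema, and timestamp.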
Introduction
All versions of Microsoft SQL Server have built-in support for tracking changes against a database schema. There are two ways to read the changes from the source system as they are generated. Change Data Capture is a feature that is only available on SQL Server Enterprise and Developer editions. Change Tracking is a lightweight solution that efficiently finds rows that have changed. If rows are modified in quick succession, not all of the intermediate changes may be found; only the latest version of each changed row is returned.
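The "latest version" behaviour of Change Tracking can be illustrated with a small simulation. This is a minimal sketch, not how SQL Server implements the feature: the `ChangeLog` class and its methods are hypothetical names invented for this example. It keeps one entry per primary key, so several updates between two polls collapse into a single change.

```python
from dataclasses import dataclass, field


@dataclass
class ChangeLog:
    """Toy model of Change Tracking: one entry per primary key (hypothetical)."""
    version: int = 0
    latest: dict = field(default_factory=dict)  # key -> (version, row)

    def write(self, key, row):
        # Every write bumps the version and overwrites the earlier entry for that key.
        self.version += 1
        self.latest[key] = (self.version, row)

    def changes_since(self, last_version):
        # Return only rows whose most recent change is newer than last_version.
        return {k: row for k, (v, row) in self.latest.items() if v > last_version}


log = ChangeLog()
log.write(1, {"name": "a"})
log.write(1, {"name": "b"})  # second update arrives before the reader polls
log.write(2, {"name": "c"})

changes = log.changes_since(0)
print(changes)  # key 1 appears once, carrying only its latest value
```

A reader that polls with the last version it saw never observes the intermediate value `"a"`, which is the trade-off the paragraph above describes.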
Configuration
MsSqlSourceConnector
The Microsoft SQL Server connector utilizes Change Tracking to identify changes.
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.mssql.MsSqlSourceConnector
# Set these required values
initial.database=
server.name=
password=
server.port=
username=
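As a rough illustration of filling in the required values, the sketch below parses a properties file like the one above and checks that none of the required settings is blank. It then wraps the result in the JSON shape accepted by Kafka Connect's REST interface (`POST /connectors`), assuming the connector is deployed in distributed mode. All connection values are placeholders, and `parse_properties` and `to_connect_payload` are hypothetical helper names, not part of the connector.

```python
import json

PROPERTIES = """\
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.mssql.MsSqlSourceConnector
# Placeholder values below; replace them with your own.
initial.database=exampledb
server.name=sqlserver.example.com
server.port=1433
username=example_user
password=example_password
"""

REQUIRED = ("initial.database", "server.name", "server.port", "username", "password")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def to_connect_payload(config):
    """Build the JSON body for POST /connectors; fail on blank required values."""
    missing = [k for k in REQUIRED if not config.get(k)]
    if missing:
        raise ValueError(f"missing required settings: {missing}")
    settings = dict(config)
    name = settings.pop("name")
    return {"name": name, "config": settings}


payload = to_connect_payload(parse_properties(PROPERTIES))
print(json.dumps(payload, indent=2))
```

Validating before submitting catches an empty `server.name=` or `password=` line early, rather than at connector start-up.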
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
initial.database | The initial database to connect to. | string | | | high |
password | JDBC Password to connect to the database with. | password | | | high |
server.name | The server to connect to. | string | | | high |
server.port | The port on the server to connect to. | int | | | high |
username | JDBC Username to connect to the database with. | string | | | high |
schema.key.name.format | Format used to generate the name for the key schema. The following template properties are available for string replacement: ${databaseName}, ${schemaName}, ${tableName}, ${namespace} | string | ${namespace}.${tableName}Key | | high |
schema.namespace.format | The namespace for the schemas generated by the connector. The following template properties are available for string replacement: ${databaseName}, ${schemaName}, ${tableName}, ${namespace} | string | com.example.data.${databaseName} | | high |
schema.value.name.format | Format used to generate the name for the value schema. The following template properties are available for string replacement: ${databaseName}, ${schemaName}, ${tableName}, ${namespace} | string | ${namespace}.${tableName}Value | | high |
topicFormat.format | The topicFormat to write the data to. | string | ${databaseName}.${tableName} | | high |
change.tracking.tables | The tables in the source database to monitor for changes. If no tables are specified the [sys].[change_tracking_tables] view is queried for all of the available tables with change tracking enabled. | list | [] | | medium |
jdbc.pool.max.idle | The maximum number of idle connections in the connection pool. | int | 10 | | medium |
jdbc.pool.max.total | The maximum number of connections for the connection pool to open. If a number greater than this value is requested, the caller will block waiting for a connection to be returned. | int | 30 | | medium |
jdbc.pool.min.idle | The minimum number of idle connections in the connection pool. | int | 3 | | medium |
backoff.time.ms | The number of milliseconds to wait when no records are returned. | int | 1000 | [50,...] | low |
batch.size | The number of records to return in a batch. | int | 512 | [1,...] | low |
schema.cache.ms | The number of milliseconds to cache schema metadata in memory. | int | 300000 | [60000,...] | low |
schema.caseformat.column.name | This setting is used to control how the column names are cased when the resulting schemas are generated. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
schema.caseformat.database.name | This setting is used to control how the ${databaseName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, topicFormat.format settings. This allows you to control the naming applied to these properties. For example this can be used to take a database name of USER_TRACKING to a more java like case of userTracking or all lowercase usertracking. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
schema.caseformat.input | The naming convention used by the database format. This is used to define the source naming convention used by the other schema.caseformat.* properties. | string | UPPER_UNDERSCORE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} | low |
schema.caseformat.schema.name | This setting is used to control how the ${schemaName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, topicFormat.format settings. This allows you to control the naming applied to these properties. For example this can be used to take a schema name of SCOTT to a more java like case of Scott or all lowercase scott. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
schema.caseformat.table.name | This setting is used to control how the ${tableName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, topicFormat.format settings. This allows you to control the naming applied to these properties. For example this can be used to take a table name of USER_SETTING to a more java like case of UserSetting or all lowercase usersetting. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
multi.subnet.failover | This setting is used to indicate whether MultiSubnetFailover is included as part of the connection to the SQL database. | boolean | FALSE | [true,false] | low |
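The way the `schema.caseformat.*` and `*.format` settings combine can be sketched as follows. This is only an illustration under stated assumptions: the connector itself is written in Java and its actual resolution order may differ. The sketch uses Python's `string.Template`, which happens to share the `${name}` placeholder syntax. `convert_case` and `render` are hypothetical helpers, and `convert_case` assumes `UPPER_UNDERSCORE` input and covers only some of the listed case formats.

```python
from string import Template


def convert_case(name, target):
    """Convert an UPPER_UNDERSCORE name to a few of the listed case formats."""
    words = name.lower().split("_")
    if target == "NONE":
        return name
    if target == "LOWER":
        return "".join(words)
    if target == "LOWER_UNDERSCORE":
        return "_".join(words)
    if target == "LOWER_CAMEL":
        return words[0] + "".join(w.capitalize() for w in words[1:])
    if target == "UPPER_CAMEL":
        return "".join(w.capitalize() for w in words)
    raise ValueError(f"case format not covered by this sketch: {target}")


def render(database, table, db_case="NONE", table_case="NONE"):
    """Resolve topic and schema names using the default formats from the table above."""
    values = {
        "databaseName": convert_case(database, db_case),
        "tableName": convert_case(table, table_case),
    }
    # Assumption: the namespace is resolved first, then reused by the key/value formats.
    values["namespace"] = Template("com.example.data.${databaseName}").substitute(values)
    return {
        "topic": Template("${databaseName}.${tableName}").substitute(values),
        "key": Template("${namespace}.${tableName}Key").substitute(values),
        "value": Template("${namespace}.${tableName}Value").substitute(values),
    }


names = render("USER_TRACKING", "USER_SETTING",
               db_case="LOWER_CAMEL", table_case="UPPER_CAMEL")
print(names)
```

With these inputs, the database name becomes `userTracking` and the table name becomes `UserSetting`, matching the examples given in the setting descriptions.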
Installation
Microsoft does not deploy a jar of their JDBC driver to Maven Central. Because of this, you will have to download the driver manually and install it in your local Maven repository. You can download the JDBC driver from Microsoft. The installation packages do not contain the JDBC driver due to licensing.
Local maven installation
mvn install:install-file -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dpackaging=jar -Dfile=<path to the download>
Upload artifacts to Nexus
export NEXUS_URL='http://nexus-01:8081/repository/maven-releases/'
export NEXUS_REPO_ID='ldap-jeremy'
mvn deploy:deploy-file -DrepositoryId=$NEXUS_REPO_ID -Durl=$NEXUS_URL -DgeneratePom=true -Dpackaging=jar -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dfile=sqljdbc4-6.0.7130.jar