/kafka-connect-jsonschema

Kafka Connect Converter using JSONSchema

Primary LanguageJava

Kafka Connect JsonSchema Converter

Uses JSONSchema to convert from JSON messages in Kafka to Kafka ConnectRecords.

Schemas are resolved from URIs expected to be in each of your JSON messages. These schemas can be resolved over http://, or any protocol supported by java.net.URI. This allows for integration between schemaed JSON data in Kafka with various Kafka connectors.

JSONSchema is too comprehensive a specification to be fully supported here. This converter only examines the JSONSchema field types to build a Connect Schema. Fancy features like additionalProperties, patternProperties, oneOf, anyOf, etc. are not supported.

Usage

In connect.properties:

# Connect record values will be converted from JSON by JsonSchemaConverter.
value.converter=org.wikimedia.kafka.connect.jsonschema.JsonSchemaConverter

# Each JSON record is expected to have this field set to its JSON Schema.
# This value would expect it to exist in the meta.schema_uri field, e.g.
#   { "meta": { "schema_uri": user/create/1 }, ... }
# Note the schema version at the end of the example schema_uri. This
# will be extracted by the default value of schema.uri.version.regex.
value.converter.schema.uri.field=/meta/schema_uri

# Prefix all extracted schema_uris with this value before attempting to
# request the JSONSchema.  This can be a remote http service, or a local
# file:// hierarchy.
value.converter.schema.uri.prefix=http://schema.service.org/v1/schemas/

# If using a local file hierarchy, it is likely that your files all end in some
# file format extension. If your schema_uris are extensionless, you could use
# this to append the extension before the JSONSchema will be requested.
#value.converter.schema.uri.suffix=".yaml"

With this config, each Kafka messages value will look for a nested meta.schema_uri field, append it to the schema.uri.prefix http://schema.service.org/v1/schemas/, and then get the JSONSchema for the message. The returned JSONSchema will be parsed and converted into a ConnectSchema. The value JSON data will be converted into a Java connect value.

Configuration Options

Name Description Default
schema.uri.field JsonPointer path to schema URI field in JSON record. This will be used to extract the JSONSchema URI for the JSON record. /meta/schema_uri
schema.uri.prefix Prefix added to every schema.uri.field. If your schema URIs are relative, you can use this to prefix them to build a fully qualified URI. ""
schema.uri.suffix Suffix added to every schema.uri.field. If your schema URIs don't include a required suffix (e.g. a file extension), you can use this to append suffix to build a fully qualified URI. ""
schema.uri.version.regex This regex is used to capture the schema version from the schema URI. There should be a named capture group for 'version'. If no version can be captured, the schema version will be null, and the schema will not be cached. ([\w\-\./:@]+)/(?<version>\d+)
sanitize.field.names If true, field names with bad characters like '.' and '/' will be replaced with underscores during conversion. The pattern used to replace characters is the one specificed by Avro at https://avro.apache.org/docs/1.8.0/spec.html#names true
schemas.cache.size The maximum number of schemas that can be cached in this converter instance. 1000

NOTE: JsonSchemaConverter extends from Apache Kafka Connect JsonConverter in order to leverage its implementation to convert from Connect records back to JSON bytes. It overrides methods that convert from JSON bytes to Connect records in order to do so via JSONSchemas.