linkedin/brooklin

Kafka SSL Configuration

TheKnowles opened this issue · 1 comments

What is the correct set of configuration items for the KafkaTransportProvider to have it be a kafkassl://... connection?

I took a cursory look at the source code and it appears that the regular configuration for a Kafka Producer should apply:

brooklin.server.transportProviderNames=kafkaTransportProvider
...
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9095
brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=localhost:2181
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer
...
brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.key.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=
brooklin.server.transportProvider.kafkaTransportProvider.ssl.enabled.protocols=TLSv1.2
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.type=JKS
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.type=JKS

Using the ingest text to Kafka tutorial, I get a non-SSL Kafka connection string:

{
  "name": "first-file-datastream",
  "connectorName": "file",
  "transportProviderName": "kafkaTransportProvider",
  "source": {
    "connectionString": "./brooklin-1.0.0/NOTICE",
    "partitions": 1
  },
  "Status": "INITIALIZING",
  "destination": {
    "connectionString": "kafka://localhost:9095/first-file-datastream_20190722111027",
    "partitions": 1
  },
  "metadata": {
    "datastreamUUID": "0bbd95ea-3bef-4b4e-9ef2-5e8d2496aef6",
    "owner": "test-user",
    "system.creation.ms": "1563808227090",
    "system.destination.KafkaBrokers": "localhost:9095",
    "system.taskPrefix": "first-file-datastream"
  }
}

I'm sure I'm missing something simple. Thank you for any insight.

For anyone else having a similar issue for kafka mirroring using ssl:

kafkaTransportProvider - this is the destination server

There is no domain config name required; Kafka Producer property names come directly after brooklin.server.transportProvider.kafkaTransportProvider.

It will look like this in brooklin's server.properties:

brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=kafkaserver:port
brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=zookeeper:port/dest
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer

brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl

brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.key.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=

brooklin.server.transportProvider.kafkaTransportProvider.ssl.enabled.protocols=TLSv1.2
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.type=JKS
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.type=JKS

The Kafka Mirroring Connector requires the "consumer" domain property after brooklin.server.connector.kafkaMirroringConnector.

It will look like this in brooklin's server.properties:

brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
brooklin.server.connector.kafkaMirroringConnector.consumer.security.protocol=ssl

brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.location=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.key.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.location=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.endpoint.identification.algorithm=

brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.enabled.protocols=TLSv1.2
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.type=JKS
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.type=JKS

Stream creation looks like this:

brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n first-mirroring-stream -s "kafkassl://localhost:9093/first-topic" -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false"}'

This is now an ssl to ssl configured Kafka Mirror.

HOWEVER, when you query for the description of the stream

brooklin-rest-client.sh -o READALL -u http://localhost:32311/

You will see a regular kafka connection string, not kafkassl:

{
  "name" : "first-mirroring-stream",
  "connectorName" : "kafkaMirroringConnector",
  "transportProviderName" : "kafkaTransportProvider",
  "source" : {
    "connectionString" : "kafkassl://localhost:9093/first-topic"
  },
  "Status" : "READY",
  "destination" : {
    "connectionString" : "kafka://localhost:9095/*"
  },
  "metadata" : {
    "datastreamUUID" : "aa891768-abbe-44d9-8c4e-18615ef31e91",
    "group.id" : "first-mirroring-stream",
    "owner" : "test-user",
    "system.IsConnectorManagedDestination" : "true",
    "system.creation.ms" : "1580139099105",
    "system.destination.KafkaBrokers" : "localhost:9095",
    "system.reuseExistingDestination" : "false",
    "system.taskPrefix" : "first-mirroring-stream"
  }
}

This is just a visual bug as mirroring works just fine.
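Because the READALL output does not reflect the destination's TLS state, the effective settings have to be read from server.properties itself. The following is an added example, not part of the thread or of Brooklin's code: it audits the two property prefixes shown above and flags a missing TLS protocol, an empty ssl.endpoint.identification.algorithm (which turns off broker hostname verification in Kafka clients) and pre-TLSv1.2 protocol versions. The sample input is constructed, the scope labels and function names are hypothetical, and the parser assumes plain key=value lines.

```python
# Added example: audit Kafka TLS keys in a Brooklin server.properties (not Brooklin code).
# Prefixes are the ones shown in this thread; the instance names are deployment-specific.
SCOPES = {
    "producer": "brooklin.server.transportProvider.kafkaTransportProvider.",
    "consumer": "brooklin.server.connector.kafkaMirroringConnector.consumer.",
}
WEAK_PROTOCOLS = {"SSLv3", "TLSv1", "TLSv1.1"}

# Constructed sample: key names from the thread, values made up for the demo.
SAMPLE = """\
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9095
brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=
brooklin.server.transportProvider.kafkaTransportProvider.ssl.enabled.protocols=TLSv1.2
brooklin.server.connector.kafkaMirroringConnector.consumer.security.protocol=ssl
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.endpoint.identification.algorithm=https
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.enabled.protocols=TLSv1.1,TLSv1.2
"""

def parse_properties(text):
    """Parse simple key=value lines; skips blanks and #/! comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith(("#", "!")) and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def audit(text):
    """Return (scope, kafka_key, problem) tuples for each TLS weakness found."""
    props, findings = parse_properties(text), []
    for scope, prefix in SCOPES.items():
        kafka = {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}
        # Kafka clients default to PLAINTEXT when security.protocol is absent.
        protocol = kafka.get("security.protocol", "PLAINTEXT").upper()
        if protocol not in ("SSL", "SASL_SSL"):
            findings.append((scope, "security.protocol", f"{protocol}: no TLS"))
            continue
        # Present but empty turns off broker hostname verification.
        if kafka.get("ssl.endpoint.identification.algorithm") == "":
            findings.append((scope, "ssl.endpoint.identification.algorithm",
                             "empty: hostname verification disabled"))
        enabled = {p.strip() for p in kafka.get("ssl.enabled.protocols", "").split(",")}
        for weak in sorted(enabled & WEAK_PROTOCOLS):
            findings.append((scope, "ssl.enabled.protocols", f"{weak} enabled"))
    return findings

if __name__ == "__main__":
    for finding in audit(SAMPLE):
        print(*finding, sep=" | ")
```

Setting ssl.endpoint.identification.algorithm=https, with broker certificates whose subject alternative names match the bootstrap hostnames, clears the hostname-verification finding.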