Aiven-Open/s3-connector-for-apache-kafka

Aiven s3 KafkaConnect not consuming from SSL/SASL cluster

javierarilos opened this issue · 3 comments

Hello there,

we are trying to use the Aiven S3 Kafka Connect connector, but it isn't consuming any messages after we switched to an SSL-enabled and authenticated Kafka cluster.

With a cluster that has neither SSL nor authentication, it works perfectly fine with the same configuration, except for the SASL/SSL settings.

I think it is probably a small config error, but I've been struggling to fix it, so your help will be very much appreciated.

No errors appear in the logs, and it seems to connect properly, since if I intentionally change connection parameters to wrong values (user/password/truststore or IPs) I do see errors.

To check the SSL configuration, I ran kafka-console-producer.sh and kafka-console-consumer.sh with the very same .properties file and hosts, successfully sending and receiving messages.
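Roughly, the check was along these lines (a sketch; the topic name test-topic and the client config file client-ssl.properties are placeholders, not from the original report):

# Produce a test message using the same SASL_SSL client settings
bin/kafka-console-producer.sh --broker-list sslbrokers:9092 \
    --topic test-topic --producer.config client-ssl.properties

# Consume it back with the same client settings
bin/kafka-console-consumer.sh --bootstrap-server sslbrokers:9092 \
    --topic test-topic --from-beginning --consumer.config client-ssl.properties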

To isolate the problem from the Kafka Connect connector, I developed my own simple connector that just logs calls to the console, but the behavior is the same: it runs OK when connecting to non-SSL brokers, while it neither receives messages nor prints errors when connecting to an SSL-enabled broker.

The plugin I was trying to use is aiven-kafka-s3-connector.

Thank you very much.

Best,
Javier Arias

connect-sslbroker.properties

# Kafka broker IP addresses to connect to
bootstrap.servers=sslbrokers:9092
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=truststorepass
ssl.protocol=TLS
security.protocol=SASL_SSL
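# An empty value disables server hostname verification against the certificate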
ssl.endpoint.identification.algorithm=
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="usr" \
    password="pass";

group.id=connect
config.storage.topic=connect-config
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Path to directory containing the connector jar and dependencies
plugin.path=/plugins
# Converters to use to convert keys and values
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084

connect-localhost.properties

# Kafka broker IP addresses to connect to
bootstrap.servers=unsecuredbrokers:9092

group.id=connect-local
config.storage.topic=connect-config
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
offset.storage.partitions=1
status.storage.replication.factor=1
status.storage.partitions=1

# Path to directory containing the connector jar and dependencies
plugin.path=/plugins
# Converters to use to convert keys and values
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084

Hi Javier

Thank you for reporting this.
I've seen your message on users@kafka.apache.org. It seems the problem is fixed, am I right?

Cross-posting the solution from the Kafka users group here for future reference:

Hi,
The thing that always seems to catch people out with this is that it's necessary to repeat the SSL/SASL configuration.

For a sink connector, you need something like:
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
sasl.mechanism=PLAIN
sasl.jaas.config=...

And you also need the same with "consumer." prefixed on each of the configuration items:
consumer.security.protocol=SASL_SSL
consumer.ssl.protocol=TLSv1.2
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=...

Hope this helps.
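Applied to the connect-sslbroker.properties above, that means adding something like the following (a sketch mirroring the reporter's SCRAM and truststore settings; adjust the values for your environment):

# Security settings for the sink connector's consumer: the worker-level
# ssl.*/sasl.* settings are not inherited by the connector clients, so
# they must be repeated with the "consumer." prefix.
consumer.security.protocol=SASL_SSL
consumer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=truststorepass
consumer.ssl.endpoint.identification.algorithm=
consumer.sasl.mechanism=SCRAM-SHA-256
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="usr" \
    password="pass";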

If you are also publishing messages to Kafka (i.e. running a source connector), you'll need to configure the producer as well, by replicating the same settings with the producer. prefix, as sketched below.
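A minimal sketch of the producer side, under the same assumptions as above:

# Security settings for a source connector's producer, mirroring the
# consumer.* block shown earlier.
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=truststore.jks
producer.ssl.truststore.password=truststorepass
producer.sasl.mechanism=SCRAM-SHA-256
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="usr" \
    password="pass";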