linkedin/brooklin

Error streaming topic with cleanup policy: compact,delete

haimhm opened this issue · 0 comments

I'm trying to stream a topic whose cleanup policy is compact,delete, and Brooklin fails with the exception shown below.

Your environment

  • Operating System - Ubuntu 18
  • Brooklin version - 1.0.2
  • Kafka version - 2.7.0
  • ZooKeeper version - 3.5.8

Steps to reproduce

  1. Create the topic, with cleanup policy compact,delete, on both clusters.
  2. Create a mirroring datastream from one cluster to the other.

Actual behavior

Nothing is streamed, and I get these exceptions in the log (the full log is attached):

[2021-03-09 14:02:36,965] INFO ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [ita-kafka01:9092, ita-kafka02:9092, ita-kafka03:9092]
buffer.memory = 33554432
client.id = datastream-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,362] WARN The configuration 'producer.acks' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'zookeeper.connect' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.block.ms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.in.flight.requests.per.connection' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'factoryClassName' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] INFO Kafka version : 2.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,363] INFO Kafka commitId : fa14705e51bd2ce5 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,366] INFO Cluster ID: YdnuOPmjQ4KavUKJgY4aZw (org.apache.kafka.clients.Metadata)
[2021-03-09 14:02:37,375] WARN Unexpected error code: 87. (org.apache.kafka.common.protocol.Errors)
[2021-03-09 14:02:37,375] WARN sent failure, restart producer, exception: (class com.linkedin.datastream.kafka.KafkaTransportProvider:kafkaMirroringConnector:0)
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Closing the Kafka producer with timeoutMillis = 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] WARN [Producer clientId=datastream-producer] Overriding close timeout 2000 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Proceeding to force close the producer since pending requests could not be completed within timeout 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] ERROR Sending a message with source checkpoint testcompact/0/1 to topic testcompact partition 0 for datastream task second-mirroring-stream_f06ad4ca-1620-4035-9186-006f3abd18d1(kafkaMirroringConnector), partitionsV2=, partitions=[0], dependencies=[] threw an exception. (KafkaTransportProvider)
com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,377] WARN Detect exception being thrown from callback for src partition: testcompact-0 while sending, metadata: Checkpoint: testcompact/0/1, Topic: testcompact, Partition: 0 , exception: (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
com.linkedin.datastream.server.api.transport.SendFailedException: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:293)
at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
... 16 more
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,378] ERROR updateErrorRate with 1. Look for error logs right before this message to see what happened (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] INFO Trying to seek to previous checkpoint for partitions: [testcompact-0] (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] ERROR Partition rewind failed due to (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2199)
at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1701)
at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1676)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$seekToLastCheckpoint$4(AbstractKafkaBasedConnectorTask.java:605)
at java.util.Collections$SingletonSet.forEach(Collections.java:4769)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.seekToLastCheckpoint(AbstractKafkaBasedConnectorTask.java:604)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.rewindAndPausePartitionOnException(AbstractKafkaBasedConnectorTask.java:251)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$sendDatastreamProducerRecord$1(AbstractKafkaBasedConnectorTask.java:283)
at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:303)
at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
[2021-03-09 14:02:41,953] INFO Trying to flush the producer and commit offsets. (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
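
The ConcurrentModificationException in the log above is raised while seekToLastCheckpoint runs inside the producer's send callback, that is, on the producer Sender thread rather than on the thread that polls the KafkaConsumer. A minimal sketch of one common way to avoid that, assuming the rewind can wait until the next poll iteration: the callback only records which partition needs a rewind, and the consumer-owning thread drains the queue later. The class and method names here (RewindHandoff, onSendFailure, drainRewinds) are hypothetical and are not Brooklin's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch; names are illustrative and not taken from Brooklin.
class RewindHandoff {
    // Thread-safe queue shared by the callback thread and the consumer thread.
    private final Queue<String> pendingRewinds = new ConcurrentLinkedQueue<>();

    // Called from the producer callback thread: record the request, never touch the consumer.
    void onSendFailure(String topicPartition) {
        pendingRewinds.add(topicPartition);
    }

    // Called from the thread that owns the consumer, for example once per poll loop iteration.
    // The caller would then seek each returned partition back to its last checkpoint.
    List<String> drainRewinds() {
        List<String> drained = new ArrayList<>();
        String topicPartition;
        while ((topicPartition = pendingRewinds.poll()) != null) {
            drained.add(topicPartition);
        }
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        RewindHandoff handoff = new RewindHandoff();
        // Simulate the send callback firing on another thread.
        Thread callbackThread = new Thread(() -> handoff.onSendFailure("testcompact-0"));
        callbackThread.start();
        callbackThread.join();
        // Back on the "consumer" thread: pick up the pending rewind.
        System.out.println(handoff.drainRewinds());
    }
}
```

Whether this matches how Brooklin intends to handle send failures is not something the log confirms; the sketch only illustrates keeping all consumer calls on a single thread.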

brooklin.log
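
A possible reading of the "Unexpected error code: 87" warning, not confirmed against this cluster: the log shows the bundled client reporting Kafka version 2.0.1, so a newer broker may be returning an error code this client has no name for, which it then surfaces as UnknownServerException. To the best of my knowledge, code 87 is INVALID_RECORD in newer versions of the Kafka protocol, and brokers can return it when a record without a key is produced to a compacted topic. The sketch below only pulls the code out of such a log line and looks it up in a small hand-written table; the class name and the table are assumptions for illustration, not part of Kafka or Brooklin.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the log; not part of Kafka or Brooklin.
class UnexpectedErrorCode {
    // Assumption: the name for code 87 is taken from the Kafka protocol error table
    // as I understand it. Only the code seen in this log is listed.
    private static final Map<Integer, String> NEWER_CODES = Map.of(87, "INVALID_RECORD");

    // Matches the warning emitted by org.apache.kafka.common.protocol.Errors in the log.
    private static final Pattern LINE = Pattern.compile("Unexpected error code: (\\d+)\\.");

    // Returns the numeric code if the line contains the warning, otherwise empty.
    static Optional<Integer> extractCode(String logLine) {
        Matcher m = LINE.matcher(logLine);
        return m.find() ? Optional.of(Integer.parseInt(m.group(1))) : Optional.empty();
    }

    // Maps a code to a name, falling back to a placeholder for codes not in the table.
    static String describe(int code) {
        return NEWER_CODES.getOrDefault(code, "UNKNOWN(" + code + ")");
    }

    public static void main(String[] args) {
        String line = "[2021-03-09 14:02:37,375] WARN Unexpected error code: 87. "
                + "(org.apache.kafka.common.protocol.Errors)";
        extractCode(line).ifPresent(code -> System.out.println(code + " -> " + describe(code)));
    }
}
```

If this reading is right, checking whether the mirrored records carry keys would be a reasonable next step.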