strimzi/strimzi-lab

AMQP KAFKA Connector

iamrsaravana opened this issue · 3 comments

I am planning to pull the data from Hono using Strimzi kafka(Version 0.20.0) connector (Camel AMQP source connector). I have followed the below steps to read data from Hono.

I downloaded Camel-amqp-kafka-connector, JMS Jar files from below link:

https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-amqp-kafka-connector/0.7.0/camel-amqp-kafka-connector-0.7.0-package.tar.gz
https://downloads.apache.org/qpid/jms/0.51.0/apache-qpid-jms-0.51.0-bin.tar.gz

After downloaded the above tar and unzipped and created docker image file using below command

cat <Dockerfile
FROM strimzi/kafka:0.20.1-kafka-2.6.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-activemq-kafka-connector/* /opt/kafka/plugins/camel/
USER 1001
EOF

Docker build -f ./Dockerfile -t localhost:5000/my-connector-amqp_new .
docker push localhost:5000/my-connector-amqp_new

using below command i have created kafkaConnect( here used my local image created above)

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: 10.128.0.6:5000/my-connector-amqp_new
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1

**I was referring the below link to create kafkaConnector properties:

https://github.com/apache/camel-kafka-connector/blob/master/examples/CamelAmqpSourceConnector.properties**

Values are in this file as mentioned it below:

name=CamelAmqpSourceConnector
topics=mytopic
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector

camel.source.path.destinationType=queue
camel.source.path.destinationName=test-queue

camel.component.amqp.includeAmqpAnnotations=true
camel.component.amqp.connectionFactory=#class:org.apache.qpid.jms.JmsConnectionFactory
camel.component.amqp.connectionFactory.remoteURI=amqp://localhost:5672
camel.component.amqp.username=admin
camel.component.amqp.password=admin
camel.component.amqp.testConnectionOnStartup=true

I am using the below configurations. Could you suggest me what is the correct value for this property?

camel.component.amqp.connectionFactory

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: camelamqpsourceconnector
labels:
strimzi.io/cluster: my-connect-cluster-new
spec:
class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
tasksMax: 1
config:
camel.component.amqp.includeAmqpAnnotations: true
camel.component.amqp.connectionFactory: org.apache.qpid.jms.JmsConnectionFactory
camel.component.amqp.connectionFactory.remoteURI: amqp://$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono):15671
camel.component.amqp.username: consumer@HONO
camel.component.amqp.password: verysecret
camel.component.amqp.testConnectionOnStartup: true
camel.source.kafka.topic: mytopic
camel.source.path.destinationType: queue
camel.source.path.destinationName: test-queue

I am getting below error messages:

Could you help me, what values we need to give for the below properties:

camel.component.amqp.connectionFactory: org.apache.qpid.jms.JmsConnectionFactory
camel.component.amqp.connectionFactory.remoteURI: amqp://$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono):15671

Error message:

2020-12-26 12:50:54,474 INFO Creating task camelamqpsourceconnector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,479 INFO ConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = camelamqpsourceconnector
predicates = []
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,480 INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = camelamqpsourceconnector
predicates = []
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,481 INFO TaskConfig values:
task.class = class org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceTask
(org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,481 INFO Instantiated task camelamqpsourceconnector-0 with version 0.7.0 of type org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,482 INFO JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
(org.apache.kafka.connect.json.JsonConverterConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,483 INFO Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task camelamqpsourceconnector-0 using the worker config (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,485 INFO JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
(org.apache.kafka.connect.json.JsonConverterConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,486 INFO Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task camelamqpsourceconnector-0 using the worker config (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,486 INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task camelamqpsourceconnector-0 using the worker config (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,492 INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = camelamqpsourceconnector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,493 INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = camelamqpsourceconnector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,497 INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,500 INFO ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = connector-producer-camelamqpsourceconnector-0
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 2147483647
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,517 WARN The configuration 'metrics.context.connect.group.id' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,518 WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,518 INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,518 INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,518 INFO Kafka startTimeMs: 1608987054518 (org.apache.kafka.common.utils.AppInfoParser) [StartAndStopExecutor-connect-1-2]
2020-12-26 12:50:54,543 INFO [Worker clientId=connect-1, groupId=connect-cluster-new] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-12-26 12:50:54,600 INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:54,655 INFO [Producer clientId=connector-producer-camelamqpsourceconnector-0] Cluster ID: AxvFw7iiSDCEUx-RwUy0gw (org.apache.kafka.clients.Metadata) [kafka-producer-network-thread | connector-producer-camelamqpsourceconnector-0]
2020-12-26 12:50:54,660 INFO CamelAmqpSourceConnectorConfig values:
camel.aggregation.size = 10
camel.aggregation.timeout = 500
camel.beans.aggregate = null
camel.component.amqp.acceptMessagesWhileStopping = false
camel.component.amqp.acknowledgementModeName = AUTO_ACKNOWLEDGE
camel.component.amqp.allowAutoWiredConnectionFactory = true
camel.component.amqp.allowAutoWiredDestinationResolver = true
camel.component.amqp.allowReplyManagerQuickStop = false
camel.component.amqp.allowSerializedHeaders = false
camel.component.amqp.artemisStreamingEnabled = true
camel.component.amqp.asyncConsumer = false
camel.component.amqp.asyncStartListener = false
camel.component.amqp.asyncStopListener = false
camel.component.amqp.autoStartup = true
camel.component.amqp.autowiredEnabled = true
camel.component.amqp.cacheLevel = null
camel.component.amqp.cacheLevelName = CACHE_AUTO
camel.component.amqp.clientId = null
camel.component.amqp.concurrentConsumers = 1
camel.component.amqp.configuration = null
camel.component.amqp.connectionFactory = org.apache.qpid.jms.JmsConnectionFactory
camel.component.amqp.consumerType = Default
camel.component.amqp.defaultTaskExecutorType = null
camel.component.amqp.destinationResolver = null
camel.component.amqp.disableReplyTo = false
camel.component.amqp.durableSubscriptionName = null
camel.component.amqp.eagerLoadingOfProperties = false
camel.component.amqp.eagerPoisonBody = Poison JMS message due to ${exception.message}
camel.component.amqp.errorHandler = null
camel.component.amqp.errorHandlerLogStackTrace = true
camel.component.amqp.errorHandlerLoggingLevel = WARN
camel.component.amqp.exceptionListener = null
camel.component.amqp.exposeListenerSession = false
camel.component.amqp.headerFilterStrategy = null
camel.component.amqp.idleConsumerLimit = 1
camel.component.amqp.idleTaskExecutionLimit = 1
camel.component.amqp.includeAllJMSXProperties = false
camel.component.amqp.includeAmqpAnnotations = true
camel.component.amqp.jmsKeyFormatStrategy = null
camel.component.amqp.jmsMessageType = null
camel.component.amqp.lazyCreateTransactionManager = true
camel.component.amqp.mapJmsMessage = true
camel.component.amqp.maxConcurrentConsumers = null
camel.component.amqp.maxMessagesPerTask = -1
camel.component.amqp.messageConverter = null
camel.component.amqp.messageCreatedStrategy = null
camel.component.amqp.messageIdEnabled = true
camel.component.amqp.messageListenerContainerFactory = null
camel.component.amqp.messageTimestampEnabled = true
camel.component.amqp.password = verysecret
camel.component.amqp.pubSubNoLocal = false
camel.component.amqp.queueBrowseStrategy = null
camel.component.amqp.receiveTimeout = 1000
camel.component.amqp.recoveryInterval = 5000
camel.component.amqp.replyTo = null
camel.component.amqp.replyToDeliveryPersistent = true
camel.component.amqp.replyToSameDestinationAllowed = false
camel.component.amqp.requestTimeoutCheckerInterval = 1000
camel.component.amqp.selector = null
camel.component.amqp.subscriptionDurable = false
camel.component.amqp.subscriptionName = null
camel.component.amqp.subscriptionShared = false
camel.component.amqp.taskExecutor = null
camel.component.amqp.testConnectionOnStartup = true
camel.component.amqp.transacted = false
camel.component.amqp.transactedInOut = false
camel.component.amqp.transactionManager = null
camel.component.amqp.transactionName = null
camel.component.amqp.transactionTimeout = -1
camel.component.amqp.transferException = false
camel.component.amqp.transferExchange = false
camel.component.amqp.useMessageIDAsCorrelationID = false
camel.component.amqp.username = consumer@HONO
camel.component.amqp.waitForProvisionCorrelationToBeUpdatedCounter = 50
camel.component.amqp.waitForProvisionCorrelationToBeUpdatedThreadSleepingTime = 100
camel.error.handler = default
camel.error.handler.max.redeliveries = 0
camel.error.handler.redelivery.delay = 1000
camel.idempotency.enabled = false
camel.idempotency.expression.header = null
camel.idempotency.expression.type = body
camel.idempotency.kafka.bootstrap.servers = localhost:9092
camel.idempotency.kafka.max.cache.size = 1000
camel.idempotency.kafka.poll.duration.ms = 100
camel.idempotency.kafka.topic = kafka_idempotent_repository
camel.idempotency.memory.dimension = 100
camel.idempotency.repository.type = memory
camel.remove.headers.pattern =
camel.source.camelMessageHeaderKey = null
camel.source.component = amqp
camel.source.contentLogLevel = OFF
camel.source.endpoint.acceptMessagesWhileStopping = false
camel.source.endpoint.acknowledgementModeName = AUTO_ACKNOWLEDGE
camel.source.endpoint.allowReplyManagerQuickStop = false
camel.source.endpoint.allowSerializedHeaders = false
camel.source.endpoint.artemisStreamingEnabled = true
camel.source.endpoint.asyncConsumer = false
camel.source.endpoint.asyncStartListener = false
camel.source.endpoint.asyncStopListener = false
camel.source.endpoint.autoStartup = true
camel.source.endpoint.cacheLevel = null
camel.source.endpoint.cacheLevelName = CACHE_AUTO
camel.source.endpoint.clientId = null
camel.source.endpoint.concurrentConsumers = 1
camel.source.endpoint.connectionFactory = null
camel.source.endpoint.consumerType = Default
camel.source.endpoint.defaultTaskExecutorType = null
camel.source.endpoint.destinationResolver = null
camel.source.endpoint.disableReplyTo = false
camel.source.endpoint.durableSubscriptionName = null
camel.source.endpoint.eagerLoadingOfProperties = false
camel.source.endpoint.eagerPoisonBody = Poison JMS message due to ${exception.message}
camel.source.endpoint.errorHandler = null
camel.source.endpoint.errorHandlerLogStackTrace = true
camel.source.endpoint.errorHandlerLoggingLevel = WARN
camel.source.endpoint.exceptionHandler = null
camel.source.endpoint.exceptionListener = null
camel.source.endpoint.exchangePattern = null
camel.source.endpoint.exposeListenerSession = false
camel.source.endpoint.headerFilterStrategy = null
camel.source.endpoint.idleConsumerLimit = 1
camel.source.endpoint.idleTaskExecutionLimit = 1
camel.source.endpoint.includeAllJMSXProperties = false
camel.source.endpoint.jmsKeyFormatStrategy = null
camel.source.endpoint.jmsMessageType = null
camel.source.endpoint.lazyCreateTransactionManager = true
camel.source.endpoint.mapJmsMessage = true
camel.source.endpoint.maxConcurrentConsumers = null
camel.source.endpoint.maxMessagesPerTask = -1
camel.source.endpoint.messageConverter = null
camel.source.endpoint.messageCreatedStrategy = null
camel.source.endpoint.messageIdEnabled = true
camel.source.endpoint.messageListenerContainerFactory = null
camel.source.endpoint.messageTimestampEnabled = true
camel.source.endpoint.password = null
camel.source.endpoint.pubSubNoLocal = false
camel.source.endpoint.receiveTimeout = 1000
camel.source.endpoint.recoveryInterval = 5000
camel.source.endpoint.replyTo = null
camel.source.endpoint.replyToDeliveryPersistent = true
camel.source.endpoint.replyToSameDestinationAllowed = false
camel.source.endpoint.requestTimeoutCheckerInterval = 1000
camel.source.endpoint.selector = null
camel.source.endpoint.subscriptionDurable = false
camel.source.endpoint.subscriptionName = null
camel.source.endpoint.subscriptionShared = false
camel.source.endpoint.synchronous = false
camel.source.endpoint.taskExecutor = null
camel.source.endpoint.testConnectionOnStartup = false
camel.source.endpoint.transacted = false
camel.source.endpoint.transactedInOut = false
camel.source.endpoint.transactionManager = null
camel.source.endpoint.transactionName = null
camel.source.endpoint.transactionTimeout = -1
camel.source.endpoint.transferException = false
camel.source.endpoint.transferExchange = false
camel.source.endpoint.useMessageIDAsCorrelationID = false
camel.source.endpoint.username = null
camel.source.endpoint.waitForProvisionCorrelationToBeUpdatedCounter = 50
camel.source.endpoint.waitForProvisionCorrelationToBeUpdatedThreadSleepingTime = 100
camel.source.marshal = null
camel.source.maxBatchPollSize = 1000
camel.source.maxPollDuration = 1000
camel.source.path.destinationName = test-queue
camel.source.path.destinationType = queue
camel.source.pollingConsumerBlockTimeout = 0
camel.source.pollingConsumerBlockWhenFull = true
camel.source.pollingConsumerQueueSize = 1000
camel.source.unmarshal = null
camel.source.url = null
topics = test
(org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnectorConfig) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:54,905 INFO Setting initial properties in Camel context: [{camel.source.path.destinationName=test-queue, connector.class=org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector, camel.component.amqp.username=consumer@HONO, tasks.max=1, camel.component.amqp.connectionFactory=org.apache.qpid.jms.JmsConnectionFactory, camel.source.component=amqp, camel.component.amqp.testConnectionOnStartup=true, camel.component.amqp.connectionFactory.remoteURI=amqp://$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono):15671, camel.component.amqp.password=verysecret, camel.component.amqp.includeAmqpAnnotations=true, camel.source.kafka.topic=mytopic, camel.source.path.destinationType=queue, task.class=org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceTask, name=camelamqpsourceconnector}] (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:54,985 INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,175 INFO WorkerSourceTask{id=camelamqpsourceconnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,175 INFO WorkerSourceTask{id=camelamqpsourceconnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,176 ERROR WorkerSourceTask{id=camelamqpsourceconnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-camelamqpsourceconnector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:144)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Cannot find getter method: connectionFactory on bean: class org.apache.camel.component.amqp.AMQPComponent when binding property: connectionFactory.remoteURI
at org.apache.camel.support.PropertyBindingSupport.doBuildPropertyOgnlPath(PropertyBindingSupport.java:282)
at org.apache.camel.support.PropertyBindingSupport.doBindProperties(PropertyBindingSupport.java:210)
at org.apache.camel.support.PropertyBindingSupport.access$100(PropertyBindingSupport.java:88)
at org.apache.camel.support.PropertyBindingSupport$Builder.bind(PropertyBindingSupport.java:1785)
at org.apache.camel.main.MainHelper.setPropertiesOnTarget(MainHelper.java:163)
at org.apache.camel.main.BaseMainSupport.autoConfigurationFromProperties(BaseMainSupport.java:1133)
at org.apache.camel.main.BaseMainSupport.autoconfigure(BaseMainSupport.java:424)
at org.apache.camel.main.BaseMainSupport.postProcessCamelContext(BaseMainSupport.java:472)
at org.apache.camel.main.SimpleMain.doInit(SimpleMain.java:32)
at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:141)
... 8 more
2020-12-26 12:50:55,176 ERROR WorkerSourceTask{id=camelamqpsourceconnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,177 INFO Stopping CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,177 INFO CamelSourceTask connector task stopped (org.apache.camel.kafkaconnector.CamelSourceTask) [task-thread-camelamqpsourceconnector-0]
2020-12-26 12:50:55,177 INFO [Producer clientId=connector-producer-camelamqpsourceconnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-camelamqpsourceconnector-0]

I do not think the Strimzi-lab is the right place to ask this. The Apache Camel discussions might be the best place to ask about the actual configuration of the Camel connector. The Strimzi Discussions (https://github.com/strimzi/strimzi-kafka-operator/discussions) might be best to ask about how to deploy the configuration with Strimzi.

The way you add the connector to the Strimzi image seems good to me. But for the KafkaConnetor custom resource, this part:

$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono)

would need to be evaluated before you create the custom resource. You should be able to double check with kubectl get kafkaconnector -o yaml to see what the actual value there is.

PS: It would be much easier to read your comment and try to help if you properly format the different sections or logs as code. This way it is hard to actually just go through it.

Thanks for your reply, I Gave direct IP address in "KafkaConnector" yaml file instead this "$( kubectl get service eclipse-hono-dispatch-router-ext --output="jsonpath={.spec.clusterIP}" -n hono)"

I am getting below error messages now:

        org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
    at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:144)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.lang.IllegalArgumentException: Cannot find getter method: connectionFactory on bean: class org.apache.camel.component.amqp.AMQPComponent when binding property: connectionFactory.remoteURI
at org.apache.camel.support.PropertyBindingSupport.doBuildPropertyOgnlPath(PropertyBindingSupport.java:282)
at org.apache.camel.support.PropertyBindingSupport.doBindProperties(PropertyBindingSupport.java:210)
at org.apache.camel.support.PropertyBindingSupport.access$100(PropertyBindingSupport.java:88)
at org.apache.camel.support.PropertyBindingSupport$Builder.bind(PropertyBindingSupport.java:1785)
at org.apache.camel.main.MainHelper.setPropertiesOnTarget(MainHelper.java:163)
at org.apache.camel.main.BaseMainSupport.autoConfigurationFromProperties(BaseMainSupport.java:1133)
at org.apache.camel.main.BaseMainSupport.autoconfigure(BaseMainSupport.java:424)
at org.apache.camel.main.BaseMainSupport.postProcessCamelContext(BaseMainSupport.java:472)
at org.apache.camel.main.SimpleMain.doInit(SimpleMain.java:32)
at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:141)

Anybody gives a suggestion how to pass source AMQP IP address in the configuration:

Now I am able to send the data from Hono to Strimzi Kafka Connector using Camel AMQP Source Connector.

Follow the below steps to pull data from Hono and send to Strimzi Kafka Connector:

  1. Creating Name space:
    kubectl create namespace kafka

  2. install latest Strimzi operator using the below command:

    kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

  3. creating Kafka (1 pod) and zookeeper(2 pods ) using the below yaml configuration:

     **kafka.yaml:**
    
     apiVersion: kafka.strimzi.io/v1beta1
     kind: Kafka
     metadata:
       name: my-cluster
     spec:
       kafka:
         version: 2.6.0
         replicas: 1
         listeners:
           - name: plain
             port: 9092
             type: internal
             tls: false
           - name: tls
             port: 9093
             type: internal
             tls: true
         config:
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
           log.message.format.version: "2.6"
         storage:
           type: jbod
           volumes:
           - id: 0
             type: persistent-claim
             size: 10Gi
             deleteClaim: false
       zookeeper:
         replicas: 2
         storage:
           type: persistent-claim
           size: 10Gi    
           deleteClaim: false
       entityOperator:
         topicOperator: {}
         userOperator: {}
     
     Kuebctl apply -f kafka.yaml  -n kafka.
     
     _ps: Here I use glusterfs persistent storage._
    
  4. Creating AMQP - Source Connector image file using below commands:

I downloaded Camel-amqp-kafka-connector, JMS Jar files from below link:

        https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-amqp-kafka-connector/0.7.0/camel-amqp-kafka-connector-0.7.0-package.tar.gz
        https://downloads.apache.org/qpid/jms/0.51.0/apache-qpid-jms-0.51.0-bin.tar.gz
        
        After downloaded the above tar and unzipped and created docker image file using below command
        
        **Dockerfile:**
        FROM strimzi/kafka:0.20.1-kafka-2.6.0
        USER root:root
        RUN mkdir -p /opt/kafka/plugins/camel-kafka-connectors/camel-amqp-kafka-connector/
        COPY ./plugin/apache-qpid-jms-0.55.0/* /opt/kafka/plugins/camel-kafka-connectors/camel-amqp-kafka-connector/
        COPY ./camel-amqp-kafka-connector/* /opt/kafka/plugins/camel-kafka-connectors/camel-amqp-kafka-connector/
        USER 1001
        
        
        Docker build -f ./Dockerfile -t localhost:5000/my-connector-amqp_new .
        docker push localhost:5000/my-connector-amqp_new

       _ps: Here I have used local docker repository_
  1. Creating Kafkaconnect using below Yaml configuration:

     Kafkaconnect.yaml:
     
     apiVersion: kafka.strimzi.io/v1beta1
     kind: KafkaConnect
     metadata:
       name: my-connect-cluster-new
       annotations:
         strimzi.io/use-connector-resources: "true"
     spec:
       image: 10.128.0.6:5000/my-connector-amqp_new5
       replicas: 1
       bootstrapServers: my-cluster-kafka-bootstrap:9092
       config:
         group.id: connect-cluster-new
         offset.storage.topic: connect-cluster-offsets
         config.storage.topic: connect-cluster-configs
         status.storage.topic: connect-cluster-status
         config.storage.replication.factor: 1
         offset.storage.replication.factor: 1
         status.storage.replication.factor: 1
    

    kubectl apply -f Kafkaconnect.yaml -n kafka

  2. Creating AMQP-KafkaConnector using below yaml file:

amqp_connector.yaml

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: camelamqpsourceconnector
      labels:
        strimzi.io/cluster: my-connect-cluster-new
    spec:
      class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector
      tasksMax: 1
      config:
        camel.component.amqp.includeAmqpAnnotations: true
        camel.component.amqp.connectionFactory: '#class:org.apache.qpid.jms.JmsConnectionFactory'
        camel.component.amqp.connectionFactory.remoteURI: amqp://10.106.88.243:15672
        camel.component.amqp.username: consumer@HONO
        camel.component.amqp.password: verysecret
        camel.component.amqp.testConnectionOnStartup: true
        camel.source.path.destinationType: queue 
        #( my usecase i am reading from hono)
        camel.source.path.destinationName: telemetry/LTTS_TENANT
        # ( this is the place where hono writing the data)
        topics: mytopic 
        #(kafka topics where you want to write it)

kubectl apply -f amqp_connector.yaml -n kafka.
ps: IP address given here all are my pod ip address. it may vary according to your pod.

7 ) viewing data in strimzi kafka using the below command:
kubectl run kafka-consumer -ti --image=strimzi/kafka:0.20.1-kafka-2.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic mytopic

I Hope this may be useful for somebody who wants to pull the data directly using AMQP Camel Connector.