bkatwal/kafka-solr-sink-connector

java.lang.String cannot be cast to java.util.Map

Closed this issue · 3 comments

Hello. This issue has been asked before but was never answered.

I downloaded your library a few days ago and tried it locally on my Windows 10 laptop.
ZooKeeper and the Kafka server run locally. The Kafka version is 2.6, and ZooKeeper is the one delivered with Kafka.
The Kafka sample file-to-file connector worked (i.e. the output is test.sink.txt).

Solr v8.6.0 is started with "solr.cmd start -e cloud".
I could insert and search documents there.

When I tried a file-to-Solr test, it gave me the following error.
The JSON data was read from the input text file and saved to the Kafka topic, but it was not saved to Solr.

[2020-10-23 16:56:34,257] ERROR WorkerSinkTask{id=SimpleKafkaSolrSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.lang.String cannot be cast to java.util.Map (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
at com.bkatwal.kafkaproject.SolrSinkTask.put(SolrSinkTask.java:72)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-10-23 16:56:34,257] ERROR WorkerSinkTask{id=SimpleKafkaSolrSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
at com.bkatwal.kafkaproject.SolrSinkTask.put(SolrSinkTask.java:72)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
... 10 more
[2020-10-23 16:56:34,259] ERROR WorkerSinkTask{id=SimpleKafkaSolrSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
Terminate batch job (Y/N)? Y

[MY WORKER PROPERTIES]
bootstrap.servers=localhost:9092
header.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
#to deserialize plain JSON data keep schemas.enable=false
internal.key.converter.schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/Software/Kafka/26/libs/bkatwal-solr-sink-lib

[MY CONNECT SOURCE PROPERTIES]
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=connect-test-data-json.txt
topic=connect-test

[connect-test-data-json.txt]
{
"name" : "anh",
"age" : 22,
"is_rich" : false
}

[MY SOLR SINK PROPERTIES]
name=SimpleKafkaSolrSinkConnector
topics=connect-test
solr.collection=demo
solr.url=localhost:8983/solr
#solr.mode=STANDALONE
#solr.url=localhost:2182,localhost:2183,localhost:2184
solr.mode=CLOUD
connector.class=com.bkatwal.kafkaproject.SolrSinkConnector
tasks.max=1
commit.within.ms=10

[MY TEST COMMAND]
connect-standalone.bat config\04-bkworker.properties config\02-connect-file-source.properties config\05-bkSolrSinkConnector.properties

[YOUR JAR FILES]
bkatwal-solr-sink-libs

Thank you for your input.

This has never been asked before.
If you are referring to #2, the stack trace there suggests the exception was thrown by CloudSolrClient.requestWithRetryOnStaleState, so it is not at all related to your issue.

Now, coming back to your issue: it looks like the data being sent to Kafka is a quoted string and not a JSON object.
As I do not know the code that you are using to write to Kafka, I cannot say more than that yet.
Could you please check the JSON that is being written to Kafka?
Or post your producer code, so I can check from my end.
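
To illustrate the exception in the stack trace above: a sink task receives each record value as a plain Object, and an unchecked cast to Map fails as soon as the converter hands over a String. The following is a minimal, standard-library-only sketch of that failure mode. The class `CastDemo` and both helper methods are hypothetical names made up for this illustration; this is not the connector's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CastDemo {
    // Hypothetical stand-in for a sink task that assumes every record value is a Map.
    static int fieldCountUnchecked(Object value) {
        @SuppressWarnings("unchecked")
        Map<String, Object> doc = (Map<String, Object>) value; // ClassCastException if value is a String
        return doc.size();
    }

    // Defensive variant: check the runtime type first and fail with a descriptive message.
    static int fieldCountChecked(Object value) {
        if (!(value instanceof Map)) {
            String type = (value == null) ? "null" : value.getClass().getName();
            throw new IllegalArgumentException("Expected a JSON object (Map) but got " + type);
        }
        return ((Map<?, ?>) value).size();
    }

    public static void main(String[] args) {
        // Case 1: the value arrived as a parsed JSON object.
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("name", "anh");
        parsed.put("age", 22);
        parsed.put("is_rich", false);
        System.out.println("fields: " + fieldCountUnchecked(parsed));

        // Case 2: the value arrived as a String, e.g. one line of the input file.
        try {
            fieldCountUnchecked("{");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The checked variant does not make the record usable, but it reports which type actually arrived, which helps when deciding whether the producer side or the converter settings need to change.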

Yes, the data sent to Kafka could very well be a string.
There is no code, just config files and an input text file (e.g. connect-test-data-json.txt).
The data flow is: connect-test-data-json.txt as FileStreamSource --> SolrSinkConnector.
The FileStreamSource has probably read the file as a String, which caused the error. I cannot verify this, but it makes sense.
My questions are:

  1. I could insert JSON into the Kafka topic (using either the kafka-console-producer script or a Producer class), but how do I launch your SolrSinkConnector afterward? The only available command/script is connect-standalone.bat, and it requires 3 parameters. In other words, how can I use SolrSinkConnector to listen to a Kafka topic? I am using Apache Kafka here, not Confluent. Does the Confluent community/free version provide a direct call to SolrSinkConnector?
  2. Is it possible to call SolrSinkConnector from Java code? If so, would you give me an example? I have been scouring the internet but have not found anything.
  3. In my connect-source-properties file, is it possible to configure it to write the content of the input text file to the Kafka topic as JSON instead of a String? I saw some Java example code that configures a Producer with a JsonSerializer.

Thank you very much for your help. Regards.

Normally a file is read line by line. So, most probably your JSON message is being split across multiple lines, and each line reaches the sink as a separate String.

To your questions:

  1. You can check the documentation on the usage of connectors.
    You can send the JSON through the console producer. Make sure the JSON is a single line and not broken into multiple lines.
    Console producers push each line as a new message.
    I have mentioned the steps to run the sink connector at the end of this comment.

  2. I didn't quite get your question. But if you mean writing your own consumer, then yes, you can write a custom consumer.

  3. Try using the same source connector and put your JSON in a single line. That should work.
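
The "single line" advice above can be applied to an existing file with a small helper. The sketch below is an illustration only: `JsonOneLiner` and `toSingleLine` are hypothetical names, and it assumes the input file holds exactly one JSON document. Joining trimmed lines is safe for valid JSON because a JSON string literal cannot contain a raw line break. Whether the sink then receives a Map still depends on the converter settings in use.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class JsonOneLiner {
    // Collapse the lines of one pretty-printed JSON document into a single line.
    static String toSingleLine(List<String> lines) {
        return lines.stream()
                .map(String::trim)                 // drop indentation and trailing spaces
                .filter(line -> !line.isEmpty())   // skip blank lines
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JsonOneLiner <input file> <output file>");
            return;
        }
        Path in = Paths.get(args[0]);
        Path out = Paths.get(args[1]);
        List<String> lines = Files.readAllLines(in, StandardCharsets.UTF_8);
        // Files.write terminates the line with a line separator.
        Files.write(out, Collections.singletonList(toSingleLine(lines)), StandardCharsets.UTF_8);
    }
}
```

For the connect-test-data-json.txt content shown earlier, `toSingleLine` returns `{ "name" : "anh", "age" : 22, "is_rich" : false }`.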

You can verify the JSON posted to Kafka using the console consumer:
bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092

Steps to run connector:

  1. Download the connector from Confluent.
  2. Update your connector.properties file and worker.properties file - you might want to update Solr host, bootstrap server, plugin path, etc.
  3. Start Kafka Connect. You can run it in standalone or distributed mode.
    I haven't used it on Windows, but I assume it works the same as on OSX/Linux:
    run connect-standalone.bat with worker.properties and connector.properties.
    You can find the two properties files at https://github.com/bkatwal/kafka-solr-sink-connector/tree/master/config
    Update the files before using them.
    On Linux/OSX, the command looks something like:
    bin/connect-standalone.sh <path to worker.properties>/bkworker.properties <path to connector.properties>/bkSolrSinkConnector.properties
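
Before starting Kafka Connect, step 2 above can be sanity-checked with a few lines of Java. This is only a sketch: `ConfigCheck` and `missingKeys` are hypothetical names, the key lists are copied from the property files posted earlier in this thread, and treating those keys as mandatory is an assumption of this example rather than something the connector documents here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class ConfigCheck {
    // Keys that appear in the worker and sink property files shown in this thread.
    // Assumption: for this sketch they are all treated as required.
    static final List<String> WORKER_KEYS = Arrays.asList(
            "bootstrap.servers", "key.converter", "value.converter", "plugin.path");
    static final List<String> SINK_KEYS = Arrays.asList(
            "name", "connector.class", "topics", "solr.url", "solr.mode", "solr.collection");

    // Return the expected keys that are absent or blank in the given properties text.
    static List<String> missingKeys(String propertiesText, List<String> expected) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText)); // lines starting with '#' are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : expected) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String sink = String.join("\n",
                "name=SimpleKafkaSolrSinkConnector",
                "topics=connect-test",
                "solr.collection=demo",
                "#solr.mode=STANDALONE",
                "connector.class=com.bkatwal.kafkaproject.SolrSinkConnector");
        System.out.println("missing: " + missingKeys(sink, SINK_KEYS)); // prints "missing: [solr.url, solr.mode]"
    }
}
```

A check like this only catches absent or commented-out keys. It does not verify that the Solr URL is reachable or that plugin.path points at the connector jars.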