castorm/kafka-connect-http

Each JSON response is packed into "payload.value" as an invalid JSON string

rumline opened this issue · 5 comments

Describe the bug
The JSON documents returned by the REST API call are recorded to the Kafka topic in a single "payload.value" field with extra backslashes and double-quote characters. This renders the string invalid JSON when it is passed to Elasticsearch, so proper document field:value pairs are not created.

To Reproduce
I didn't track the exact same record through the whole process, but examples from each step are attached.

Use this source config:
nytimes_source_connector_config.txt

Getting this JSON response:
nytimes_response.txt

An example entry in the Kafka topic is created:
kafka_topic_entry.txt

Use this sink config:
nytime_sink_connector_config.txt

An example Elasticsearch document is created that is unparsed:
elasticsearch_document.txt

Expected behavior
The JSON responses are stored in the Kafka topic as valid JSON without the extra double quotes and backslashes that turn the response JSON into a very long string.

Kafka Connect:
Using confluent-platform-2.12; all components are at version 5.5.0.

Plugin:
Self-compiled 0.7.1 using Java JDK 1.8.0_251

Additional context
Perhaps I am simply misunderstanding how to properly configure the connector. I've tried many variations of configuration options and SMTs to try to work around this behavior.

First of all, thank you @rumline for using the plugin, and I'm sorry it didn't work out of the box for you.

You seem to be using the plugin just fine. However, Kafka Connect source plugins produce data with a schema; this schema is usually an Avro schema that will be registered in the Schema Registry. You can find some additional information about how to manage this here: https://docs.confluent.io/current/schema-registry/connect.html

The schema produced by default by this plugin wraps the key and the value each in an envelope with a single string property, named "key" for the key and "value" for the value, although the names of these properties are configurable.
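To make that concrete, here is a purely illustrative sketch (schema contents omitted, payload abbreviated, not copied from your attachments) of how the value of a record produced with the default mapper tends to look once serialized, if you're using the JSON converter with schemas enabled (which the "payload.value" structure you describe suggests); the whole API response ends up as one escaped string inside the single "value" property:

{
  "schema": { "...": "..." },
  "payload": {
    "value": "{\"results\": [{\"slug_name\": \"example-slug\", ...}]}"
  }
}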

To simplify your use case, I've just released v0.7.3, which introduces two new mappers that produce a simplified schema without the envelopes. You'll find them here: StringKvSourceRecordMapper.java and BytesKvSourceRecordMapper.java. Configuring your connector to use them requires one additional property:

"http.response.record.mapper": "com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper"

or

"http.response.record.mapper": "com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapper"

However, using the StringKvSourceRecordMapper probably won't solve your issue, as you could have achieved the same thing by using the ExtractField SMT in your Sink connector to extract the value field out of the envelope, like this:

"transforms": "ExtractKey,ExtractValue",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "key",
"transforms.ExtractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractValue.field": "value"

And the reason I don't think this will solve it is that, by representing the JSON document as a String with the default converter, it gets encoded with illegal characters escaped (illegal because the value has to be wrapped in an Avro envelope that is itself represented as JSON).
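To illustrate that escaping with made-up, abbreviated content (not taken from your attachments): a response fragment such as

{"results": [{"slug_name": "example-slug"}]}

ends up inside the envelope's "value" property as one escaped string, roughly:

"value": "{\"results\": [{\"slug_name\": \"example-slug\"}]}"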

I haven't tried this, but my guess is you'll have to play with the following configuration properties documented here: connect-configuring-converters

I'm not 100% sure on what you should be using there, but my guess would be something like this in combination with one of the new mappers described above:

"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"

Please, let me know if there is anything else I can do to support you with this use case, and please come back if you manage to make it work so future users benefit from your findings.

Thanks,
Best regards.

It works perfectly now. I switched to the new StringKvSourceRecordMapper and then only had to add "value.converter": "StringConverter". The key and schema are completely ignored by the Elasticsearch sink so that Elasticsearch can do dynamic type mapping.

New connector config:

{ "name": "nytimesnewswire", "config": { "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector", "value.converter": "StringConverter", "tasks.max": "1", "http.request.url": "https://api.nytimes.com/svc/news/v3/content/all/all.json?api-key=your-api-key-here", "http.request.method": "GET", "http.response.record.mapper": "com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper", "http.response.list.pointer": "/results", "http.response.record.key.pointer": "/slug_name", "http.throttler.interval.millis": "10000", "http.throttler.catchup.interval.millis": "10000", "kafka.topic": "nytimesnewswire" } }

Important settings in the Elasticsearch sink config to ignore key and schema:

"key.ignore": "true", "schema.ignore": "true", "value.converter.schemas.enable": "false"

Thank you for the quick solution.

Since I was testing it out, here are examples of the different topic entries with the different settings. Perhaps useful for the examples section of the project.

RecordMapperExamples.txt

That's great news :) Thanks for the examples, I understand your use case will be common, so I'll have to find a place to document this.

By the way, I saw you noticed #20.

Although you are using the connector just fine, you might be interested to know that once that issue is resolved, you would be able to use the NYTimes API the same way you are now, periodically polling the latest articles, while avoiding re-publishing articles to Kafka that have already been published. I'm not sure whether this would be useful for you; I guess it depends on whether you value having a deduplicated Kafka topic and saving some unnecessary Elasticsearch writes.

All it'd require from you is:

  • Pointing the connector at the timestamp field (not sure whether that's published_date or created_date):
"http.response.record.timestamp.pointer": "/published_date"
  • Indicating to the connector that the results are in reverse order, which is covered in #20 (see the sketch below).
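In other words, something along these lines added to the config you posted above would cover the first bullet (assuming published_date turns out to be the right field; the reverse-order handling depends on #20 and isn't shown):

{
  "name": "nytimesnewswire",
  "config": {
    "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
    ...
    "http.response.record.key.pointer": "/slug_name",
    "http.response.record.timestamp.pointer": "/published_date",
    ...
  }
}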

Castor,
Thank you for the communication with me over email. Sorry that I did not spot this thread earlier as it would have prevented my emails.

I am now using the StringKvSourceRecordMapper with the StringConverter value converter, and it works; I get just the data back.

Maybe a future enhancement could be the ability to add a custom schema, like the https://docs.confluent.io/kafka-connect-sftp/current/source-connector/json_source_connector.html connector lets you do.
There you can use key.schema and value.schema to define the schemas.
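Just to sketch the idea (these property names are hypothetical and don't exist in kafka-connect-http today, and the schema syntax is only meant to convey the concept rather than mirror the SFTP connector exactly), it could look something like:

"http.response.record.key.schema": "{\"type\": \"string\"}",
"http.response.record.value.schema": "{\"type\": \"struct\", \"fields\": [{\"field\": \"slug_name\", \"type\": \"string\"}, {\"field\": \"published_date\", \"type\": \"string\"}]}"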