castorm/kafka-connect-http

How to transform with REST API

lzuchowska opened this issue · 0 comments

Hello,
I've been struggling to apply the ExtractField transform to a simple REST API response.
This is the REST API response:

    "d": {
        "results": [
            {
                "__metadata": {
                    "uri": "https://some_uri.com",
                    "type": "c4codata.ServiceRequest",
                    "etag": "W/\"datetimeoffset'2021-10-26T10%3A21%3A19.3991180Z'\""
                },
                "ObjectID": "00163E0A47311ED89AC41CD2E1EF8932",
                "ProductRecipientPartyID": "000000000000000000000000000000000000000000000000000013597241",

The configuration file contains this property:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
When I try to extract a field, I get one of two errors: "Unknown field: fieldname" or "Expected Struct, found String".
How should I apply transforms to fields when the REST API returns JSON like this?
Is there a way to reach nested fields?
I've looked at issue #19, but that solution didn't work for me either.
What I want to extract is the whole list of values, without the __metadata field, and with some specific fields flattened.
Expected result:

{ "ChangedByCustomerIndicator" : false,
  "ResponseByProcessorDateTimeContent" : "", ... }
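
To make the goal concrete, here is a minimal Python sketch of the transformation I'm after. It is not connector configuration, only an illustration of the desired input and output. The function name `strip_metadata_and_flatten`, the `.` separator, and the nested `Nested` field in the usage note are assumptions made up for the example.

```python
import json


def strip_metadata_and_flatten(raw_value: str, sep: str = ".") -> dict:
    """Parse one record delivered as a JSON string, drop __metadata,
    and flatten any nested objects into top-level keys."""
    record = json.loads(raw_value)
    # Remove the OData metadata block if it is present.
    record.pop("__metadata", None)

    flat = {}

    def walk(prefix: str, node: dict) -> None:
        for key, val in node.items():
            name = f"{prefix}{sep}{key}" if prefix else key
            if isinstance(val, dict):
                walk(name, val)  # descend into nested objects
            else:
                flat[name] = val  # scalars and lists are kept as-is

    walk("", record)
    return flat
```

For example, a record such as `{"__metadata": {...}, "ObjectID": "...", "Nested": {"Inner": "x"}}` would come out as `{"ObjectID": "...", "Nested.Inner": "x"}`.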

The 'base' output, without any transforms, value converters or response mappers:

{
  "value": "{\"__metadata\":{\"uri\":\"https://some_uri.com\",\"type\":\"c4codata.ServiceRequest\",\"etag\":\"W/\\\"datetimeoffset'2021-10-19T10%3A11%3A42.3716190Z'\\\"\"},\"ObjectID\":\"00163EADAC081EEC8C988DA861144CDB\",\"ProductRecipientPartyID\":\"000000000000000000000000000000000000000000000000000014555600\", .... }}",
  "key": {
    "string": "00163EADAC081EEC8C988DA861144CDB"
  },
  "timestamp": {
    "long": 1634638091225
  }
}
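
Note that in this output the "value" is a single JSON-encoded string rather than a structure, which would be consistent with the "Expected Struct, found String" error. The Python sketch below only illustrates that distinction and is not the connector's code: `extract_field` is a hypothetical stand-in for a field extraction that accepts structured values only, and `base_value` is a shortened copy of the value above.

```python
import json

# Shortened stand-in for the "value" above; most fields are omitted.
base_value = (
    '{"__metadata":{"type":"c4codata.ServiceRequest"},'
    '"ObjectID":"00163EADAC081EEC8C988DA861144CDB"}'
)


def extract_field(value, field):
    """Hypothetical extraction that refuses anything but a structured value."""
    if not isinstance(value, dict):
        raise TypeError(f"Expected dict, found {type(value).__name__}")
    return value[field]


# The raw value is a str, so extraction is rejected until it is parsed.
try:
    extract_field(base_value, "ObjectID")
except TypeError as err:
    print(err)

# Once parsed into a structure, the field is reachable.
print(extract_field(json.loads(base_value), "ObjectID"))
```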

The result I have managed to achieve so far came from adding these two properties:

"http.response.record.mapper": "com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",

It produces this output:

{
  "__metadata": {
    "uri": "https://some_uri.com",
    "type": "c4codata.ServiceRequest",
    "etag": "W/\"datetimeoffset'2021-10-28T10%3A06%3A09.6248630Z'\""
  },
  "ObjectID": "00163E0A47311ED89AC41CD2E1EF8932",
  "ProductRecipientPartyID": "000000000000000000000000000000000000000000000000000010005577",
  "ProductRecipientPartyUUID": "888888-111111-000000-222222",
  "ProductRecipientPartyName": "DFJSOWJRFNLDARI02URJE",
  "IncidentServiceIssueCategoryID": "VAPROGIJAL",