castorm/kafka-connect-http

DateTimeFormatter.parse error when trying the example. How to fix it?

bob-tran-goldenowl opened this issue · 13 comments

Hi @bob-tran-goldenowl ,

Could you please share your configuration, omitting any confidential details?

Thanks,
Best regards.

My config:
{
    "name": "my_index.elasticsearch.http",
    "config": {
        "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
        "tasks.max": "1",
        "http.offset.initial": "timestamp=1589500805994",
        "http.request.url": "http://ip:9200/index/_search",
        "http.request.method": "POST",
        "http.auth.type": "Basic",
        "http.auth.user": "",
        "http.auth.password": "*",
        "http.request.headers": "Content-Type: application/json",
        "http.request.body": "{\"size\": 100, \"sort\": [{\"@created\": \"asc\"}], \"search_after\": [${offset.timestamp}]}",
        "http.response.list.pointer": "/hits/hits",
        "http.response.record.pointer": "/_source",
        "http.response.record.key.pointer": "/_id",
        "http.response.record.offset.pointer": "timestamp=/sort/0",
        "http.timer.interval.millis": "60000",
        "http.timer.catchup.interval.millis": "60000",
        "kafka.topic": "elasticsearch"
    }
}

Hi @bob-tran-goldenowl,

For the first issue, http.offset.initial expects timestamp to be in ISO8601 format, e.g. 2007-12-03T10:15:30.00Z.

So instead of:

"http.offset.initial": "timestamp=1589500805994",

You'll need:

"http.offset.initial": "timestamp=2020-05-15T00:00:05.994Z",

For the second issue, I'd need to know more details about the error you are getting.

I hope this helps.

Best regards.
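
The epoch-to-ISO conversion suggested in this reply can be checked outside the connector. Below is a minimal Python sketch; the helper name `epoch_millis_to_iso` is hypothetical and not part of kafka-connect-http.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_iso(millis: int) -> str:
    """Render epoch milliseconds as an ISO 8601 UTC timestamp with millisecond precision."""
    # Adding an integer timedelta to the epoch avoids float rounding on the millisecond part.
    dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


# The value from the original config
print(epoch_millis_to_iso(1589500805994))  # 2020-05-15T00:00:05.994Z
```

The printed value is what goes after `timestamp=` in http.offset.initial.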

@castorm I still have a question:
Are the http.offset.initial and http.response.record.offset.pointer fields optional? I ask because I have set up a connector with this config:

{
    "name": "weatherapi.http.source",
    "config": {
        "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
        "tasks.max": "1",
        "http.request.url": "http://api.met.no/weatherapi/locationforecast/2.0/compact",
        "http.request.headers": "Accept: application/json",
        "http.request.params": "lat=51.5&lon=0",
        "http.response.list.pointer": "/properties/timeseries",
        "http.timer.interval.millis": "30000",
        "http.timer.catchup.interval.millis": "1000",
        "kafka.topic": "weatherapi"
    }
}

The API does not require a login, but the connector fails with: Policy failed for response code: 403, body: 403 Forbidden

I will test it. Thank you very much.

Regarding the 403 issue: apparently that API doesn't like the default User-Agent, so you'll have to override it. I checked with httpie on the console and it worked, so I used the same User-Agent in the connector to solve it:

    "http.request.url": "http://api.met.no/weatherapi/locationforecast/2.0/compact",
    "http.request.headers": "Accept: application/json, User-Agent: HTTPie/2.3.0",
    "http.request.params": "lat=51.5&lon=0",
    "http.response.list.pointer": "/properties/timeseries",
    "http.response.record.offset.pointer": "timestamp=/time",

I hope this helps.
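
To check a header override like this before changing the connector config, the same request can be built by hand. The sketch below uses only Python's standard library and assumes a GET request; it builds the request without sending it, so the URL and headers can be inspected first.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Values taken from the config in this thread; nothing is sent until urlopen() is called.
url = "http://api.met.no/weatherapi/locationforecast/2.0/compact"
params = {"lat": "51.5", "lon": "0"}
headers = {"Accept": "application/json", "User-Agent": "HTTPie/2.3.0"}

request = Request(f"{url}?{urlencode(params)}", headers=headers, method="GET")

print(request.full_url)
# urllib stores header names capitalized, hence "User-agent"
print(request.get_header("User-agent"))

# To actually send it: urllib.request.urlopen(request)
```

If the hand-built request succeeds while the connector still gets a 403, the difference is most likely in the headers the connector sends.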

I was also confused since the example in https://castorm.github.io/kafka-connect-http/examples/elasticsearch-search.html uses an epoch millisecond representation. I'd like to request that the example be updated, and the requirement documented in the documentation for http.offset.initial. Are the keys that property can take pre-defined? From the documentation I got the impression they were arbitrary strings with arbitrary values.

Since http.offset.initial must be an ISO8601-formatted date, how can I get the timestamp in epoch format in the query parameters to match my REST API?

Here's my (redacted) config, again borrowed from the Elasticsearch example:

        "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
        "tasks.max": "1",
        "http.offset.initial": "timestamp=2020-05-08T07:55:44Z",
        "http.request.url": "https://example.com/resource/resources",
        "http.request.params": "createdTimestamp.gte=${offset.timestamp}",
        "http.response.list.pointer": "/",
        "http.response.record.key.pointer": "/id",
        "http.response.record.offset.pointer": "timestamp=/createdTimestamp",
        "http.timer": "com.github.castorm.kafka.connect.timer.FixedIntervalTimer",
        "http.timer.interval.millis": "60000",
        "kafka.topic": "mytopic"

Here's what the connector is doing:

https://example.com/resource/resources?createdTimestamp.gte=2020-05-08T07:55:44Z

Here's what I need it to do:

https://example.com/resource/resources?createdTimestamp.gte=1606800000000

Hi @mnaseef-gloo,

You are completely right: the example is outdated. There have been a few changes around timestamp handling, and I'm afraid I might have missed some in the docs.

Furthermore, I might have broken this inadvertently. It has become apparent that the current representation of offset.timestamp as an ISO8601 string can be confusing, as can the fact that the parser is not applied to offset.initial. I'll take that as feedback and think of a way to make this feel more natural, or at the very least make it explicit in the docs.

Regarding what you are trying to achieve: the connector uses FreeMarker templates underneath. I haven't tested this specifically, but I believe something like this should do the trick:

"http.request.params": "createdTimestamp.gte=${offset.timestamp?datetime.iso?long}",

I hope this helps you get started until I get the docs updated.

Thanks for the quick response and suggestion. I haven't gotten it to work just yet, but you have pointed me in what I think is the right direction.

almost there...

kafka-connect      | [2020-12-03 16:36:41,793] DEBUG <-- 500 Internal Server Error https://REDACTED/datasource/datasources?createdTimestamp.gte=1,588,924,544,000 (655ms, 35-byte body) (com.github.castorm.kafka.connect.http.client.okhttp.OkHttpClient)

I got it to work with this - rather convoluted. I hope you can come up with a simpler solution.

        "http.request.params": "createdTimestamp.gte=${offset.timestamp?datetime.iso?long?replace(\",\",\"\")}",

Apparently it can be done a bit simpler than that:

offset.timestamp?datetime.iso?c

The trailing c stands for the "computer" representation of numbers (docs), which can be made the default so that it is no longer needed.
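
For intuition about why the earlier attempt produced 1,588,924,544,000: the offset was converted to epoch milliseconds correctly, but it was rendered with grouping separators. The Python sketch below is only an analogy to the FreeMarker expression, not the connector's code, and `iso_to_epoch_millis` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def iso_to_epoch_millis(iso: str) -> int:
    """Parse an ISO 8601 UTC timestamp ending in 'Z' into epoch milliseconds."""
    # fromisoformat() only accepts a trailing 'Z' from Python 3.11 on, so normalize it first.
    dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
    # Dividing by a 1 ms timedelta gives an exact integer, with no float rounding.
    return (dt - datetime(1970, 1, 1, tzinfo=timezone.utc)) // timedelta(milliseconds=1)


millis = iso_to_epoch_millis("2020-05-08T07:55:44Z")
print(f"{millis:,}")  # 1,588,924,544,000  (grouped, like the request that returned 500)
print(str(millis))    # 1588924544000      (plain digits, the form the API needed)
```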

This is essentially the direction I decided to take for now. The documentation has also been updated accordingly.

The PR with these changes has been linked: #76, and I will release a new version in a few minutes.

I hope this improvement is enough to facilitate its usage. Feedback is welcome of course.

Thanks,
Best regards.

Looks great. Thanks!