getindata/flink-http-connector

Async lookups return incorrect results because JsonRowDataSerializationSchema is not thread-safe

grzegorz8 opened this issue · 3 comments

I noticed that lookup in async mode may not work as expected. Namely, when multiple HTTP requests are sent at once in async mode, some of the results seem to be duplicated for other rows.

I believe the root cause is the fact that org.apache.flink.formats.json.JsonRowDataSerializationSchema is not thread-safe.

    /** Reusable object node. */
    private transient ObjectNode node;

    @Override
    public byte[] serialize(RowData row) {
        if (node == null) {
            node = mapper.createObjectNode();
        }

        try {
            runtimeConverter.convert(mapper, node, row);
            return mapper.writeValueAsBytes(node);
        } catch (Throwable t) {
            throw new RuntimeException(String.format("Could not serialize row '%s'.", row), t);
        }
    }

The serialization schema is used in com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreator#createLookupQuery.
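
To make the failure mode concrete, here is a minimal sketch that replays on a single thread one order in which two concurrent serialize() calls can interleave. ReusableNodeSerializer is a hypothetical stand-in that only mirrors the "reusable node" pattern quoted above; it is not the Flink class, and splitting serialize() into convert() and write() is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in that mirrors the reusable-node pattern.
// It is NOT Flink's JsonRowDataSerializationSchema.
class ReusableNodeSerializer {
    // Shared mutable state, reused across calls like the ObjectNode field.
    private final Map<String, String> node = new LinkedHashMap<>();

    // First half of serialize(): copy the row into the shared node.
    void convert(String id) {
        node.put("id", id);
    }

    // Second half of serialize(): render whatever the shared node holds now.
    String write() {
        return "{\"id\":\"" + node.get("id") + "\"}";
    }
}

class SharedNodeInterleaving {
    // Replays one possible interleaving of two concurrent serialize() calls.
    static String[] replay() {
        ReusableNodeSerializer serializer = new ReusableNodeSerializer();
        serializer.convert("A");            // thread 1 converts row A
        serializer.convert("B");            // thread 2 overwrites the node with row B
        String first = serializer.write();  // thread 1 writes, but the node holds B
        String second = serializer.write(); // thread 2 writes B as expected
        return new String[] {first, second};
    }

    public static void main(String[] args) {
        String[] out = replay();
        System.out.println(out[0] + " " + out[1]);
    }
}
```

With this ordering both calls return the payload for row B, which matches the symptom of results being duplicated for other rows.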

Hi @grzegorz8
Many thanks for debugging and reporting this, it's a tricky one... also I think we have the same issue with responseBodyDecoder...

I've looked at the KinesisFireHoseSink Connector - https://issues.apache.org/jira/browse/FLINK-24228 where it seems the JsonRowDataSerializationSchema is used in the same way as we do here... the only difference is that we are using it in Lookup, not Sink...

It seems that AsyncSinkWriter has built-in support for this via the ElementConverter interface -> KinesisFirehoseSinkElementConverter, and this part does not seem to exist for lookups, or maybe I don't see it.

My suggestion for now is to... wrap
LookupQueryInfo lookupQueryInfo = lookupQueryCreator.createLookupQuery(lookupRow);
and
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));

in a synchronized block...
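
A minimal sketch of that idea, assuming a generic wrapper rather than the connector's real classes: SynchronizedFunction and unsafeQueryCreator are hypothetical names, and the StringBuilder-based function only stands in for a query creator that reuses mutable state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical wrapper: funnels every call to a non-thread-safe function through one lock.
class SynchronizedFunction<I, O> implements Function<I, O> {
    private final Function<I, O> delegate;
    private final Object lock = new Object();

    SynchronizedFunction(Function<I, O> delegate) {
        this.delegate = delegate;
    }

    @Override
    public O apply(I input) {
        synchronized (lock) { // only one thread at a time touches the delegate
            return delegate.apply(input);
        }
    }
}

class SynchronizedLookupDemo {
    // Stand-in for a query creator that reuses a mutable buffer (unsafe on its own).
    static Function<Integer, String> unsafeQueryCreator() {
        StringBuilder reused = new StringBuilder();
        return key -> {
            reused.setLength(0);
            reused.append("{\"id\":").append(key).append('}');
            return reused.toString();
        };
    }

    // Runs `rows` lookups on `threads` threads and counts payloads that do not match their key.
    static int countMismatches(int rows, int threads) {
        Function<Integer, String> safe = new SynchronizedFunction<>(unsafeQueryCreator());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < rows; i++) {
                final int key = i;
                Callable<String> task = () -> safe.apply(key);
                results.add(pool.submit(task));
            }
            int mismatches = 0;
            for (int i = 0; i < rows; i++) {
                if (!results.get(i).get().equals("{\"id\":" + i + "}")) {
                    mismatches++;
                }
            }
            return mismatches;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(10_000, 8));
    }
}
```

The lock makes each call atomic with respect to the shared buffer, at the cost of running query creation (and, if wrapped the same way, response decoding) one call at a time.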

Also maybe worth reporting this lack of functionality to Flink?

Ok @grzegorz8
I think we have an alternative solution.

To solve this problem we have to decouple query creation and response processing from sending the request.
Currently all those three things are hidden in JavaNetHttpPollingClient under queryAndProcess and processHttpResponse methods.

I think we can extract HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); from queryAndProcess and then add two new methods to PollingClient interface:

  1. prepareQuery(RowData lookupData)
  2. processHttpResponse

After that we need to change the AsyncHttpTableLookupFunction::asyncLookup line 77 a little bit.

Instead of what we have now:
future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);

We would need something like that:

SomeQueryObject query = decorate.prepareQuery(lookupRow);
future.completeAsync(() -> decorate.lookup(query), pullingThreadPool);

And later modify future.whenCompleteAsync so it will call decorate.processHttpResponse on one thread and follow up with current code.
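
Roughly, the three phases could be wired like this. It is a sketch under assumptions, not the connector's code: prepareQuery, send and processHttpResponse here are hypothetical string-based stand-ins, and a dedicated single-thread executor is just one way to guarantee that response processing always runs on one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DecoupledLookupSketch {
    // Records which threads ran the decoding step, for inspection only.
    static final Set<String> decoderThreads = ConcurrentHashMap.newKeySet();

    // Phase 1 (stand-in): would use the serialization schema; runs on the calling thread.
    static String prepareQuery(int key) {
        return "id=" + key;
    }

    // Phase 2 (stand-in): pure I/O with no shared schema, safe to run on many threads.
    static String send(String query) {
        return "body[" + query + "]";
    }

    // Phase 3 (stand-in): would use the response decoder; runs on a single thread.
    static String processHttpResponse(String body) {
        decoderThreads.add(Thread.currentThread().getName());
        return "decoded:" + body;
    }

    static CompletableFuture<String> asyncLookup(
            int key, ExecutorService pullingThreadPool, ExecutorService decoderThread) {
        String query = prepareQuery(key); // built before any work is handed to the pool
        return CompletableFuture
                .supplyAsync(() -> send(query), pullingThreadPool)
                .thenApplyAsync(DecoupledLookupSketch::processHttpResponse, decoderThread);
    }

    // Issues `count` lookups concurrently and collects the results in key order.
    static List<String> runLookups(int count) {
        decoderThreads.clear();
        ExecutorService pullingThreadPool = Executors.newFixedThreadPool(4);
        ExecutorService decoderThread = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int key = 0; key < count; key++) {
                futures.add(asyncLookup(key, pullingThreadPool, decoderThread));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pullingThreadPool.shutdown();
            decoderThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runLookups(5));
    }
}
```

Only the send step runs on the pool here; query creation stays on the caller's thread and decoding is confined to one executor thread, so neither schema is touched by two threads at once.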

This is a raw sketch, but I think this will solve the problem by ensuring that schema will be used only by one thread.
WDYT?
Do you want to implement this change @grzegorz8 ?

I spent some time refactoring the way you suggested, but I came to the conclusion that this approach requires a fairly large refactoring that does not bring much benefit compared to calling the SerializationSchema in a "synchronized" way. In both approaches serialization is the bottleneck, running with parallelism=1. So I propose implementing the simple approach as a temporary solution, and in the future trying to find a way to run serialization concurrently in a safe way.