Implement Cache in Lookup Http Source
Closed this issue ยท 6 comments
Currently http lookup source will forward every lookup query to external system.
This can be easily a bottle neck for processing pipeline.
A remedy for this would be to implement cache similar how it is done in jdbc-connector's lookup source
This jbdc'c connector implementation uses Flink's 1.16 enhance Lookup Source interfaces that provide cache abstraction [1] and [2]
This ticket is about adding same enhancement based on [1] and [2] to http lookup source.
Verify if this is possible to still run this connector on Flink 1.15 without cache, if not drop 1.15 support.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
[2] Support Partial and Full Caching in Lookup Table Source
@kristoffSC we are interested in having caching. I was thinking the change would be quite minimal to the http client code base.
I was thinking of a very similar approach to the JDBC connector
- pass in the cache on the HttpLookupTableSource constructor
- then in the getLookupRuntimeProvider replace
if (lookupConfig.isUseAsync()) {
log.info("Using Async version of HttpLookupTable.");
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
} else {
log.info("Using blocking version of HttpLookupTable.");
return TableFunctionProvider.of(dataLookupFunction);
}
with something like:
if (lookupConfig.isUseAsync()) {
log.info("Using Async version of HttpLookupTable.");
if (cache != null) {
new AsyncHttpTableLookupFunction(dataLookupFunction, cache));
} else {
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
}
} else {
log.info("Using blocking version of HttpLookupTable.");
if (cache != null) {
return PartialCachingLookupProvider.of(dataLookupFunction, cache);
} else {
return TableFunctionProvider.of(dataLookupFunction);
}
}
}
And change AsyncHttpTableLookupFunction to accept a cache.
What do you think ? I am happy to code this up with some tests, but wanted to check that this sounded reasonable. I am not sure we would need Full caching.
Hi @davidradl
I'm very glad to hear from you :)
This is a super news, go ahead with the impl. The High level view you presented sounds good.
I wonder whether we should make cache as default or not, maybe not since maybe (I hope) there are already some systems that uses it and they may not expect to have cache there ;)
Also I was thinking that this is a time to drop 1.15 support.
In your PR you can remove 1.15 build and move to 1.16, 1.17 and 1.18.
Actually There is a problem with some tests on 1.17 -> #64
I think we should tackle this first. This seems like some dependency issue rather then real code/impl issue but maybe I'm wrong.
I may have some time to try work on issue 64 (I'm not promising anything) but if you want you can start that also.
Cheers.
@kristoffSC sounds good. I am glad to be able to do more with this connector. I had used the connector with Flink 1.18 for our use cases in the past.
It is likely going to be a couple of weeks or so before I an work on this - I need to clear other commitments first.
@kristoffSC I have not yet got to it. But still intend to!
@kristoffSC I have had a look at this - it is not as simple as I had thought. It seems that the HttpClientLookupFunction does not extend LookupFunction. LookupFunction has its own eval
and HttpClientLookup has its own eval
. The PartialCachingLookupProvider
is working on LookupFunction
s so cannot be used as is.
It seems to me ideally we want to be using the Flink provided CachingLookupFunction
and construct this with a delegate that overrides the lookup function
https://github.com/apache/flink/blob/46cbf22147d783fb68f77fad95161dc5ef036c96/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L46
In this way we pick up the caching from the framework.
Is there a consideration around the Http client design that would make this difficult ? I am checking with you before I dive in !
WDYT
@kristoffSC any thoughts on this one. If not I will attempt a refactor to use the Flink caching framework.