ivi-ru/flink-clickhouse-sink

WriterTask thread will be closed one by one.

Opened this issue · 6 comments

Thanks for your work!
I've run into a problem. After the program runs for a while, I found that the ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask threads get closed one by one.

I checked the log:
There are a large number of "Task id = N is finished" messages:

2022-11-04 10:42:28,128 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 9 is finished
2022-11-04 10:42:32,783 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:32,783 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:34,532 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 1 is finished
2022-11-04 10:42:35,861 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Ready to load data to monitor.qunhe_log, size = 100000
2022-11-04 10:42:39,153 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 12 is finished
2022-11-04 10:42:40,263 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 18 is finished
2022-11-04 10:42:46,699 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 3 is finished
2022-11-04 10:42:47,717 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 10 is finished
2022-11-04 10:42:53,492 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 6 is finished
2022-11-04 10:42:55,086 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 8 is finished
2022-11-04 10:42:59,064 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 2 is finished
2022-11-04 10:43:00,173 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask [] - Task id = 19 is finished

Then I looked into the ClickHouse sink source, where the OOM happens:

    @Override
    public void run() {
        try {
            isWorking = true;

            logger.info("Start writer task, id = {}", id);
            while (isWorking || queue.size() > 0) {
                ClickHouseRequestBlank blank = queue.poll(300, TimeUnit.MILLISECONDS);
                if (blank != null) {
                    CompletableFuture<Boolean> future = new CompletableFuture<>();
                    futures.add(future);
                    send(blank, future);
                }
            }
            // NOTE (mine): catch (Exception) does not catch an OutOfMemoryError,
            // so the task dies here without the error ever being logged
        } catch (Exception e) {
            logger.error("Error while inserting data", e);
            throw new RuntimeException(e);
        } finally {
            logger.info("Task id = {} is finished", id);
        }
    }

I added a catch (Throwable) to this piece of code to see what was actually happening.
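Roughly this, with everything before the catch kept exactly as above (my local modification, not the library's code):

    @Override
    public void run() {
        try {
            // ... same loop body as above ...
        } catch (Throwable e) { // widened from Exception, so Errors like OutOfMemoryError are logged too
            logger.error("Error while inserting data", e);
            throw new RuntimeException(e);
        } finally {
            logger.info("Task id = {} is finished", id);
        }
    }

With that widened catch in place, the following java.lang.OutOfMemoryError: Direct buffer memory surfaced: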

java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Direct buffer memory
	at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
	at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:201) ~[job.jar:?]
	at cksink.applied.ClickHouseWriter$WriterTask.lambda$responseCallback$0(ClickHouseWriter.java:219) ~[job.jar:?]
	at org.asynchttpclient.netty.NettyResponseFuture.lambda$addListener$0(NettyResponseFuture.java:294) ~[job.jar:?]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
	at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
	at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758) ~[job.jar:?]
	at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:748) ~[job.jar:?]
	at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:260) ~[job.jar:?]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:232) ~[job.jar:?]
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[job.jar:?]

Then I increased Flink's off-heap memory (taskmanager.memory.task.off-heap.size) from 512 MB to 1 GB, but it had no effect.

I am confused now. Can you give me some advice? Thanks.

@ChaoHsupin hi, I suppose it's related to Netty's off-heap (direct) memory used by the async HTTP client:

....
at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:201) ~[job.jar:?]
....

At the moment there is no support for such a configuration. If you want, you could create a PR that adds a configurable HTTP client, plus the

} catch (Throwable e) {

which you noticed already.
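Purely as an illustration of what such a PR could add (the property keys below are invented, not existing sink options; the builder API is from async-http-client 2.x, i.e. the org.asynchttpclient classes seen in the stack trace):

    import java.util.Properties;

    import org.asynchttpclient.AsyncHttpClient;
    import org.asynchttpclient.DefaultAsyncHttpClientConfig;
    import org.asynchttpclient.Dsl;

    // Hypothetical: build the AsyncHttpClient from user-supplied properties instead of
    // hard-coded defaults, so connection limits and timeouts become tunable.
    final class ConfigurableClientFactory {
        static AsyncHttpClient build(Properties props) {
            DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
                    // property keys below are made up for this example
                    .setMaxConnections(Integer.parseInt(props.getProperty("clickhouse.sink.max-connections", "20")))
                    .setRequestTimeout(Integer.parseInt(props.getProperty("clickhouse.sink.request-timeout-ms", "30000")))
                    .setKeepAlive(true)
                    .build();
            return Dsl.asyncHttpClient(config);
        }
    }

Even so, the hard ceiling for direct buffers is still the JVM's direct-memory limit, so client settings only help if requests stop piling up faster than they complete.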


@mchernyakov Thanks for your advice!
I temporarily replaced the Netty AsyncHttpClient with Apache HttpClient, and the problem is resolved. Later I will look into solutions for the Netty OOM.
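What I mean by the replacement, roughly (a sketch with Apache HttpClient 4.x; the host is a placeholder and this is not the exact code I ended up with):

    import java.io.IOException;

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    // Blocking insert over ClickHouse's HTTP interface: the calling WriterTask does not
    // return until ClickHouse has answered, so unfinished requests cannot pile up in Netty.
    final class SyncClickHouseSender {
        private final CloseableHttpClient client = HttpClients.createDefault(); // created once, reused

        void insertBatch(String insertQuery) throws IOException {
            // insertQuery is e.g. "INSERT INTO monitor.qunhe_log VALUES (...), (...)"
            HttpPost post = new HttpPost("http://clickhouse-host:8123/");
            post.setEntity(new StringEntity(insertQuery, ContentType.TEXT_PLAIN));
            try (CloseableHttpResponse response = client.execute(post)) {
                String body = EntityUtils.toString(response.getEntity());
                if (response.getStatusLine().getStatusCode() != 200) {
                    throw new IOException("ClickHouse insert failed: " + body);
                }
            }
        }
    }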

@mchernyakov
Hi! I have some thoughts.
The current design (diagram attached in the original issue):

Some of my points:

  • If an async HTTP client is used, having multiple threads in WriterTask is unnecessary.
  • Upstream data can always be handed off to the async HTTP client by calling WriterTask, so off-heap buffers keep accumulating and an OOM is the only possible outcome.
  • As a consequence, it is hard for the ClickHouse sink to feed back pressure to the upstream operators.
  • Would using a sync HTTP client in WriterTask be a good idea? (See the sketch after this list.)
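To make the last point concrete, here is a rough sketch of what I have in mind (my own illustration, not code from this repository): with a bounded queue and a blocking send, the writer cannot take the next batch until ClickHouse has answered, so producers block on put() instead of direct memory growing.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Illustrative sync writer loop: the bounded queue plus the blocking send()
    // give natural backpressure once ClickHouse starts to lag.
    class SyncWriterTask implements Runnable {
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16); // bounded
        private volatile boolean isWorking = true;

        // producers call this; it blocks once the queue is full - that is the backpressure
        void offerBatch(String batch) throws InterruptedException {
            queue.put(batch);
        }

        @Override
        public void run() {
            try {
                while (isWorking || !queue.isEmpty()) {
                    String batch = queue.poll(300, TimeUnit.MILLISECONDS);
                    if (batch != null) {
                        sendBlocking(batch); // returns only after ClickHouse has answered
                    }
                }
            } catch (Throwable t) {
                // at least OutOfMemoryError & co. become visible here
                t.printStackTrace();
            }
        }

        private void sendBlocking(String batch) {
            // placeholder for a synchronous HTTP insert (e.g. Apache HttpClient)
        }
    }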

@ChaoHsupin hi,

the initial idea behind this design was to be able to buffer a decent amount of events before writing to ClickHouse.
For example, if your app produces only 1k events per second, it is better to make one call with a 10k batch than ten calls with 1k events each.
Also, the Flink worker thread is released quickly, so it can keep processing events.

For such a case we went for that design. The common queue and writers can help to achieve that.

Another possible approach with accumulation could be to perform the HTTP call here: https://github.com/ivi-ru/flink-clickhouse-sink/blob/master/src/main/java/ru/ivi/opensource/flinkclickhousesink/applied/ClickHouseSinkBuffer.java#L71
We could accumulate records in the ClickHouseSinkBuffer and send the request from there.
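As a rough sketch of that idea (class, field, and method names here are purely illustrative, not the actual ClickHouseSinkBuffer code):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: accumulate rows inside the buffer and perform the HTTP insert
    // once the batch is full, instead of handing the batch to a separate writer queue.
    class AccumulatingBufferSketch {
        private final List<String> rows = new ArrayList<>();
        private final int maxBufferSize;

        AccumulatingBufferSketch(int maxBufferSize) {
            this.maxBufferSize = maxBufferSize;
        }

        void put(String recordAsCsv) {
            rows.add(recordAsCsv);
            if (rows.size() >= maxBufferSize) {
                flush();
            }
        }

        private void flush() {
            String values = String.join(",", rows);
            send(values); // perform the HTTP call right here; a blocking call would also give backpressure
            rows.clear();
        }

        private void send(String values) {
            // placeholder for the actual HTTP insert (sync or async)
        }
    }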

I am personally open to changes.

Regarding "off-heap buffers keep accumulating and an OOM is the only possible outcome":

Could you please elaborate on that? What do you mean exactly here?

FYI @aleksanchezz @ashulenko

@mchernyakov
I tend to prefer replacing the async HTTP client with a sync one, to avoid complex Netty problems. WriterTask already uses a thread pool, so the tasks can send HTTP requests one by one and still keep the ability to process events quickly.

@ChaoHsupin you could try to implement an alternative way to insert data into ClickHouse via a sync client.
Please open a PR with your idea.