ivi-ru/flink-clickhouse-sink

Bug report: the sink does not recover when Flink restarts the job.


I'd like to request a fix in ClickHouseSink.java.
I tried to push a PR, but it was forbidden.

Here is my environment:

  • Flink 1.9.3 on YARN
  • Using commit 23609ad

It can be reproduced simply by killing one of the task managers with kill -9 <PID of the TaskManager Java process> or a similar command.

The dead task manager is then replaced by a new one, and the remaining task managers try to restart the job.

In this restart cycle, Flink calls ClickHouseSink.close() and then ClickHouseSink.open().

But the line here
prevents the sink manager from being initialized again.
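To see why the guard blocks reinitialization, here is a minimal, self-contained model of that lifecycle. It assumes that `sinkManager` is a static field (so it outlives the sink instances Flink discards on restart) and that `open()` only builds a manager when the field is `null`. `LifecycleBug`, `Manager`, and `managerUsable()` are hypothetical stand-ins for illustration, not the library's classes.

```java
// Minimal model of the reported restart bug. Manager and LifecycleBug are
// hypothetical stand-ins for ClickHouseSinkManager and ClickHouseSink.
public class LifecycleBug {

    static final class Manager {
        private volatile boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        void close() {
            closed = true;
        }
    }

    private static final Object DUMMY_LOCK = new Object();

    // Assumed static: the field survives the restart, even though Flink
    // creates new sink instances when it redeploys the job.
    private static volatile Manager sinkManager;

    // Mirrors the guard in open(): build a manager only if none exists yet.
    static void open() {
        if (sinkManager == null) {
            synchronized (DUMMY_LOCK) {
                if (sinkManager == null) {
                    sinkManager = new Manager();
                }
            }
        }
    }

    // Mirrors close() before the fix: the manager is closed but kept.
    static void close() {
        if (sinkManager != null && !sinkManager.isClosed()) {
            synchronized (DUMMY_LOCK) {
                if (!sinkManager.isClosed()) {
                    sinkManager.close();
                }
            }
        }
    }

    // True only if a manager exists and has not been closed.
    static boolean managerUsable() {
        Manager m = sinkManager;
        return m != null && !m.isClosed();
    }

    public static void main(String[] args) {
        open();                               // first deployment
        System.out.println(managerUsable());  // true
        close();                              // restart: close() ...
        open();                               // ... then open() again
        System.out.println(managerUsable());  // false: the closed manager is kept
    }
}
```

Running `main` should print `true` and then `false`: after `close()` and `open()`, the field still points at the closed manager, so `open()` never builds a new one.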

So I added a simple line that sets sinkManager to null after teardown, and I checked that the next open() call then reinitializes the sink manager normally.

Could you please check whether my fix is right? Thank you in advance.

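        // In ClickHouseSink.close(). sinkManager is static, so it is shared by all
        // sink subtasks in this TaskManager JVM and several close() calls can race.
        if (sinkManager != null) {
            synchronized (DUMMY_LOCK) {
                // Re-check under the lock: another subtask may already have
                // closed the manager and reset the field to null.
                if (sinkManager != null && !sinkManager.isClosed()) {
                    sinkManager.close();
                    // Added line: drop the closed manager so that the next
                    // open() call builds a new one instead of reusing it.
                    sinkManager = null;
                }
            }
        }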
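The effect of nulling the field can be checked with a small model of the same lifecycle. It assumes `sinkManager` is static and shared by every sink subtask in the TaskManager JVM, so several `close()` calls may run at once during a restart; for that reason the model re-checks the field for `null` inside the lock. `LifecycleFixed`, `Manager`, and `closeConcurrently` are hypothetical names for illustration only, not part of flink-clickhouse-sink.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the proposed fix. Manager and LifecycleFixed are
// hypothetical stand-ins for ClickHouseSinkManager and ClickHouseSink.
public class LifecycleFixed {

    static final class Manager {
        private volatile boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        void close() {
            closed = true;
        }
    }

    private static final Object DUMMY_LOCK = new Object();
    private static volatile Manager sinkManager;

    // Double-checked creation, returning the manager seen under the lock.
    static Manager open() {
        Manager m = sinkManager;
        if (m == null) {
            synchronized (DUMMY_LOCK) {
                m = sinkManager;
                if (m == null) {
                    m = new Manager();
                    sinkManager = m;
                }
            }
        }
        return m;
    }

    // close() with the fix: clear the field after closing. The null re-check
    // under the lock avoids an NPE when another subtask cleared it first.
    static void close() {
        if (sinkManager != null) {
            synchronized (DUMMY_LOCK) {
                if (sinkManager != null && !sinkManager.isClosed()) {
                    sinkManager.close();
                    sinkManager = null;
                }
            }
        }
    }

    // Simulates several sink subtasks in one JVM calling close() at once.
    static void closeConcurrently(int subtasks) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < subtasks; i++) {
            threads.add(new Thread(LifecycleFixed::close));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return;
            }
        }
    }

    public static void main(String[] args) {
        Manager first = open();
        closeConcurrently(8);                   // restart: every subtask closes
        Manager second = open();                // then open() runs again
        System.out.println(first.isClosed());   // true
        System.out.println(second != first);    // true: a fresh manager was built
        System.out.println(second.isClosed());  // false
    }
}
```

Running `main` should print `true`, `true`, `false`: the first manager ends up closed, and the second `open()` builds a new manager instead of reusing the closed instance.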

@moweonlee Hi, thanks for finding that.
Do you see the same behavior with Flink 1.8.*?

P.S. I wonder why you're not able to create a PR. Could you create a branch and push it?

@mchernyakov

Sure, I think I can try it with Flink 1.8 in a couple of days too.

I have pushed a PR here.

@mchernyakov

With:

  • Flink 1.8.3
  • flink-clickhouse-sink 1.3.1

I was able to build the application and test it in the same environment.

The bug was reproduced in this environment too.

Sinking stops because the closed instance is not null, so open() does not build a new ClickHouseSinkManager.

I also confirmed that the fixed code solves the bug here as well.

First setup, including ClickHouseSinkManager initialization:

2021-12-30 14:26:47.623 [Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (22/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.1
2021-12-30 14:26:47.623 [Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (22/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.1
2021-12-30 14:26:47.624 [Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (22/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : e89bffd6b2eff799
2021-12-30 14:26:47.624 [Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (22/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : e89bffd6b2eff799
2021-12-30 14:26:47.648 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> Map -> Sink: clickhouse-sink-url (18/30)] INFO  r.i.opensource.flinkclickhousesink.applied.ClickHouseWriter  - Building components
2021-12-30 14:26:47.653 [clickhouse-writer-0] INFO  r.i.o.f.applied.ClickHouseWriter$WriterTask  - Start writer task, id = 0
2021-12-30 14:26:47.654 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> Map -> Sink: clickhouse-sink-url (18/30)] INFO  r.i.o.f.applied.ClickHouseSinkScheduledCheckerAndCleaner  - Build Sink scheduled checker, timeout (sec) = 5
2021-12-30 14:26:47.654 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> Map -> Sink: clickhouse-sink-url (18/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkManager  - Build sink writer's manager. params = ClickHouseSinkCommonParams{clickHouseClusterSettings=ClickHouseClusterSettings{hostsWithPorts=[http://xkr03.ace.nfra.io:8123], credentials='', authorizationRequired=false, currentHostId=0}, failedRecordsPath='/home1/irteam/flink-sink-dropped', numWriters=1, queueMaxCapacity=40, ignoringClickHouseSendingExceptionEnabled=true, timeout=5, maxRetries=30}
2021-12-30 14:26:47.655 [Window(TumblingEventTimeWindows(20000), EventTimeTrigger, LCSSTMSSideOutputProcessor) -> Map -> Sink: clickhouse-late-sink-stms (18/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:26:47.655 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (7/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:26:47.655 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> Map -> Sink: clickhouse-sink-url (18/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_url_array_basic, buffer size = 6000
2021-12-30 14:26:47.655 [Window(TumblingEventTimeWindows(20000), EventTimeTrigger, LCSSTMSSideOutputProcessor) -> Map -> Sink: clickhouse-late-sink-stms (13/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:26:47.655 [Window(TumblingEventTimeWindows(20000), EventTimeTrigger, LCSSTMSSideOutputProcessor) -> Map -> Sink: clickhouse-late-sink-stms (1/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000

Failure messages around the shutdown of ClickHouseSinkManager. After this, the job goes into the restart cycle:

2021-12-30 14:28:51.267 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (18/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkManager  - ClickHouse sink manager is shutting down.
2021-12-30 14:28:51.267 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> Map -> Sink: clickhouse-sink-url (7/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - ClickHouse sink buffer shutdown complete.
...
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

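Restart procedure without reinitialization of ClickHouseSinkManager (new ClickHouseSinkBuffer instances are created, but no "Build sink writer's manager" line appears this time):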

2021-12-30 14:29:08.216 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (6/60)] WARN  org.apache.kafka.clients.ClientUtils  - Removing server akafka003.logm.aio.nfra.io:9092 from bootstrap.servers as DNS resolution failed for akafka003.logm.aio.nfra.io
2021-12-30 14:29:08.216 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (6/60)] WARN  org.apache.kafka.clients.ClientUtils  - Removing server akafka003.logm.aio.nfra.io:9092 from bootstrap.servers as DNS resolution failed for akafka003.logm.aio.nfra.io
2021-12-30 14:29:08.218 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (6/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.1
2021-12-30 14:29:08.218 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (6/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.1
2021-12-30 14:29:08.218 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (6/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : e89bffd6b2eff799
2021-12-30 14:29:08.218 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (6/60)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : e89bffd6b2eff799
2021-12-30 14:29:09.111 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (13/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:09.148 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (28/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:09.177 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (7/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:09.719 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (19/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:10.301 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (18/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:10.434 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (6/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:12.903 [Window(TumblingEventTimeWindows(10000), EventTimeTrigger, LCSSTMSProcessor) -> (Map -> Sink: clickhouse-sink-stms, Timestamps/Watermarks) (1/30)] INFO  r.i.o.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = dlcs_stms_array_basic, buffer size = 6000
2021-12-30 14:29:13.546 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (4/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.
2021-12-30 14:29:13.546 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (4/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.
2021-12-30 14:29:13.552 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (44/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.
2021-12-30 14:29:13.552 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (44/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.
2021-12-30 14:29:13.552 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (22/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.
2021-12-30 14:29:13.552 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (22/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.
2021-12-30 14:29:13.600 [Kafka 0.10 Fetcher for Source: Custom Source -> Flat Map -> Timestamps/Watermarks -> Process (45/60)] INFO  o.a.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator akafka002.logm.nfra.io:9092 (id: 2147483646 rack: null) for group real-lcs57f4ab2f-74a1-4896-89b2-431a8743578f.

The PR has been merged; please check version 1.3.2.

Many thanks.