ivi-ru/flink-clickhouse-sink

The TIMEOUT_SEC parameter does not seem to take effect

Closed this issue · 2 comments

I set up the ClickHouseSink according to the documentation as follows:

    Map<String, String> ckSetting = new HashMap<>();
    ckSetting.put(CLICKHOUSE_HOSTS, ckGlobalProperties.getProperty("ck.hosts"));
    ckSetting.put(CLICKHOUSE_USER, ckGlobalProperties.getProperty("ck.user"));
    ckSetting.put(CLICKHOUSE_PASSWORD, ckGlobalProperties.getProperty("ck.pass"));
    ckSetting.put(TIMEOUT_SEC, "60");
    ckSetting.put(NUM_WRITERS, "1");
    ckSetting.put(NUM_RETRIES, "3");
    ckSetting.put(QUEUE_MAX_CAPACITY, "10");
    ckSetting.put(FAILED_RECORDS_PATH, "/tmp");
    ckSetting.put(IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");

    ParameterTool ckParams = ParameterTool.fromMap(ckSetting);
    env.getConfig().setGlobalJobParameters(ckParams);

    Properties ckSinkProperties = new Properties();
    ckSinkProperties.put(TARGET_TABLE_NAME, CLICKHOUSE_DWD_ORDER_DONE_LOG);
    ckSinkProperties.put(MAX_BUFFER_SIZE, "1000");

    orderCsvStream
      .addSink(new ClickHouseSink(ckSinkProperties))
      .setParallelism(5);

According to my understanding (and to the source code of this project), a batch should be flushed after at most TIMEOUT_SEC seconds, even if there are not enough records to reach the MAX_BUFFER_SIZE limit.
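
To make the expectation concrete, here is a minimal, hypothetical sketch of the "flush on size or on timeout" policy I assume TIMEOUT_SEC is meant to provide (class and method names below are my own illustration, not the library's actual code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical illustration of the expected behavior: a batch is sent
    // when it reaches maxBufferSize OR when timeoutSec elapses, whichever
    // comes first. This is NOT the library's implementation.
    class TimeOrSizeBuffer {
        private final int maxBufferSize;
        private final List<String> buffer = new ArrayList<>();
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        TimeOrSizeBuffer(int maxBufferSize, long timeoutSec) {
            this.maxBufferSize = maxBufferSize;
            // Time-based flush: runs every timeoutSec seconds and sends
            // whatever is buffered, even if the buffer is not full.
            scheduler.scheduleWithFixedDelay(this::flush, timeoutSec, timeoutSec, TimeUnit.SECONDS);
        }

        synchronized void add(String record) {
            buffer.add(record);
            if (buffer.size() >= maxBufferSize) {
                flush(); // size-based flush
            }
        }

        synchronized void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            send(batch);
        }

        private void send(List<String> batch) {
            // Stand-in for the actual insert into ClickHouse.
            System.out.println("Sending batch of size " + batch.size());
        }
    }

With the settings above, I would therefore expect a flush at least every 60 seconds whenever the buffer is non-empty.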

However, what I observe is that batches are only ever flushed once MAX_BUFFER_SIZE is reached, which results in very infrequent writes to ClickHouse when the data flow is low. Logs are shown below:

21-01-24 02:46:04 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseSinkScheduledCheckerAndCleaner  - Build Sink scheduled checker, timeout (sec) = 60
21-01-24 02:46:04 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseSinkManager  - Build sink writer's manager. params = ClickHouseSinkCommonParams{clickHouseClusterSettings=ClickHouseClusterSettings{hostsWithPorts=[..............], credentials='ZGVmYXVsdDpzaHRjazIwMjA=', authorizationRequired=true, currentHostId=0}, failedRecordsPath='/tmp', numWriters=1, queueMaxCapacity=10, ignoringClickHouseSendingExceptionEnabled=false, timeout=60, maxRetries=3}
21-01-24 02:46:04 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseSinkBuffer  - Instance ClickHouse Sink, target table = rtdw_dwd.order_done_log, buffer size = 1000
21-01-24 02:46:04 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 02:46:04 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 02:46:05 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0
21-01-24 02:46:05 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0
21-01-24 03:00:43 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 03:00:43 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0
21-01-24 03:31:30 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 03:31:30 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0
21-01-24 04:03:33 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 04:03:33 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0
21-01-24 04:32:34 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 04:32:34 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0
21-01-24 04:58:05 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Ready to load data to rtdw_dwd.order_done_log, size = 1000
21-01-24 04:58:05 INFO  ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseWriter$WriterTask  - Successful send data to ClickHouse, batch size = 1000, target table = rtdw_dwd.order_done_log, current attempt = 0

What do you think the problem is? Many thanks.

@lmagic233 Reproduced this case, thanks. This bug appeared in version 1.2.0. We'll fix it soon.

Fixed in version 1.3.0