ivi-ru/flink-clickhouse-sink

Does this support 1.4.* ?

apoorv007 opened this issue · 14 comments

Thanks for this project! I am considering giving this a try for flink-1.4.2.

Could you give me an idea of how well tested this is in terms of

  1. has this been deployed to production ?
  2. for how long ?
  3. amount of traffic handled?

Hello @apoorv007 ,

Thank you for the feedback!

This sink is working in production 24/7 for several weeks. We insert about 1 milliard records per day with the peak of about 200k records per second.

We use Flink 1.3.2 in our production, but we want to upgrade to 1.7.0 in January-February.
Also, we think about setting properties for Async Http Client (in this version it works with default properties).

About Flink 1.4.*: our experience says that you can have trouble with Netty version inside your application (because Async Http Client uses it), but we can resolve this through new version on maven central.

We would be very interested to know how it flink-clickhouse-sink will work in your application.

Hi, thanks for the detailed response. Unfortunately I would not be able to use this in my project, but the use case was the following:

We are using Apache Flink to aggregate data streams (coming from Kafka), and store them in Cassandra. At the same time, we wanted to fork the aggregated data stream to route it to Clickhouse, where some data from another application leaves (and hence it would be possible to do analytics in Clickhouse for both types of data). This is where this project would have been useful.

However, we decided to make the data stream fork at the kafka level itself i.e. before the streams enter Apache Flink, since unlike Cassandra, Clickhouse has advanced analytical capabilities (so no need to pre-aggregate).

This can only run on Flink1.3.2?
In My IDE, I Use Flink1.4.2\1.6.2\1.7.0 not 1.3.2,every time the program starts, the ClickhouseSink.close() method is called, which prevents data from being written to CK by ClickhouseWriter.

Hello @Aaronzk,

show some code, please.

Thanks for your response @mchernyakov
I'm new to Flink, please help. First, the code is work on Flink1.3.2. and I want to use Flink1.7,so I put the source code and dependencies to my project directly and change scala.version to 2.11 and flink.version to 1.7.0, but failed.

my code in a main function

       Properties pro = new Properties();
       pro.put("bootstrap.servers", "100.5.36.131:9092");
       pro.put("zookeeper.connect", "100.5.36.131:2181");
       Map<String, String> globalParameters = new HashMap<String, String>();
       globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_HOSTS, "100.5.106.153:8123");
       globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_USER, "default");
       globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_PASSWORD,"");
       // sink common
       globalParameters.put(ClickhouseSinkConsts.TIMEOUT_SEC,"60");

       globalParameters.put(ClickhouseSinkConsts.FAILED_RECORDS_PATH,"/tmp/flink");
       globalParameters.put(ClickhouseSinkConsts.NUM_WRITERS,"1");
       globalParameters.put(ClickhouseSinkConsts.NUM_RETRIES, "3");
       globalParameters.put(ClickhouseSinkConsts.QUEUE_MAX_CAPACITY,"5");

       // set global paramaters
       ParameterTool parameters = ParameterTool.fromMap(globalParameters);
       pro.put("group.id", "test");
       StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
       env.getConfig().setGlobalJobParameters(parameters);
       env.getConfig().setRestartStrategy(
               RestartStrategies.fixedDelayRestart(4, 10000));
       env.getConfig().disableSysoutLogging(); //设置日志不进行输出
   	// env.enableCheckpointing(5000);
       DataStream<String> sourceStream = env
               .addSource(new FlinkKafkaConsumer08<String>
                       ("test0", new SimpleStringSchema(),pro));

       Properties props = new Properties();
       props.put(ClickhouseSinkConsts.TARGET_TABLE_NAME, "test");
       props.put(ClickhouseSinkConsts.MAX_BUFFER_SIZE, "10");

       // build chain
       sourceStream.map(new MapFunction<String,String>() {
           public String map(String s) {
               String newS = s.replace(",", "','");
               return "('"+newS+"')";
           }
       })
               .addSink(new ClickhouseSink(props))
               .name("flinktest");
       sourceStream.print();
       env.execute();

my pom.xml, scala.version from 2.10 to 2.11 and flink.version from 1.3.2 to 1.7.0

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aaron.flink</groupId>
    <artifactId>flink-clickhouse</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Primary components -->
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.7.0</flink.version>

        <!-- Secondary components -->
        <async.client.version>2.0.39</async.client.version>
        <logback.version>1.2.3</logback.version>
        <guava.version>23.0</guava.version>
        <typesafe.config.version>1.3.3</typesafe.config.version>
        <ch.driver.version>0.1.40</ch.driver.version>

        <!-- Testing components -->
        <junit.version>4.12</junit.version>
        <mockito.version>2.8.9</mockito.version>
        <awaitility.version>3.1.2</awaitility.version>
        <testcontainers.version>1.9.1</testcontainers.version>
        <hikari.version>2.3.8</hikari.version>
        <jmock.junit4.version>2.9.0</jmock.junit4.version>

        <!-- Plugins version -->
        <mvn.compiler.version>3.7.0</mvn.compiler.version>
    </properties>
    <dependencies>
        <!--<dependency>-->
            <!--<groupId>ru.ivi.opensource</groupId>-->
            <!--<artifactId>flink-clickhouse-sink</artifactId>-->
            <!--<version>1.0.0</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>${typesafe.config.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <dependency>
            <groupId>org.asynchttpclient</groupId>
            <artifactId>async-http-client</artifactId>
            <version>${async.client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${ch.driver.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.jmock</groupId>
            <artifactId>jmock-junit4</artifactId>
            <version>${jmock.junit4.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>${hikari.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>clickhouse</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>${mockito.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>

</project>

@Aaronzk , thanks for the code. And add the exception trace, please.

@mchernyakov I Found this case in debug mode, 10s after my program starts , it goes into the ClickhouseSink.close(), and during this 10s,there is no exception.

my log at info level as below

22:04:56.711 [main] INFO  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
22:04:56.728 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting Flink Mini Cluster
22:04:57.127 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting Metrics Registry
22:04:57.165 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl  - No metrics reporter configured, no metrics will be exposed/reported.
22:04:57.339 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting RPC Service(s)
22:04:58.373 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
22:04:58.412 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Trying to start actor system at :0
22:04:58.634 [flink-metrics-2] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
22:04:58.827 [flink-metrics-2] INFO  akka.remote.Remoting main - Starting remoting
22:04:59.048 [flink-metrics-2] INFO  akka.remote.Remoting main - Remoting started; listening on addresses :[akka.tcp://flink-metrics@10.5.106.220:54452]
22:04:59.055 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Actor system started at akka.tcp://flink-metrics@10.5.106.220:54452
22:04:59.058 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting high-availability services
22:04:59.198 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory C:\Users\Aaron\AppData\Local\Temp\blobStore-18d2822b-9aec-4af1-9f55-1fac71407ff1
22:04:59.207 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:54453 - max concurrent requests: 50 - max backlog: 1000
22:04:59.208 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting ResourceManger
22:04:59.228 [main] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3 .
22:04:59.278 [main] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@38f57b3d @ akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3
22:04:59.299 [main] INFO  org.apache.flink.runtime.blob.PermanentBlobCache  - Created BLOB cache storage directory C:\Users\Aaron\AppData\Local\Temp\blobStore-0480fa43-9f28-446d-8a60-71f28bcb43d2
22:04:59.306 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3 was granted leadership with fencing token 8a2fca632858249d49841cfad29a4206
22:04:59.307 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
22:04:59.311 [flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3 , session=49841cfa-d29a-4206-8a2f-ca632858249d
22:04:59.312 [main] INFO  org.apache.flink.runtime.blob.TransientBlobCache  - Created BLOB cache storage directory C:\Users\Aaron\AppData\Local\Temp\blobStore-9507de20-a83d-4940-b0fc-9fdc7cc739b1
22:04:59.312 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting 1 TaskManger(s)
22:04:59.322 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Starting TaskManager with ResourceID: bcb70e44-2234-414f-935c-5edfb4f0eef4
22:04:59.403 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory 'C:\Users\Aaron\AppData\Local\Temp': total 120 GB, usable 71 GB (59.17% usable)
22:04:59.623 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 195 MB for network buffer pool (number of memory segments: 6257, bytes per segment: 32768).
22:04:59.631 [main] INFO  org.apache.flink.runtime.query.QueryableStateUtils  - Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
22:04:59.631 [main] INFO  org.apache.flink.runtime.query.QueryableStateUtils  - Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
22:04:59.633 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
22:04:59.635 [main] WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation  - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
22:04:59.635 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 0.7 of the currently free heap space (1226 MB), memory will be allocated lazily.
22:04:59.645 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory C:\Users\Aaron\AppData\Local\Temp\flink-io-274efec7-2a39-4850-95b6-78865107ce82 for spill files.
22:04:59.660 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
22:04:59.672 [main] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
22:04:59.692 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Connecting to ResourceManager akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3(8a2fca632858249d49841cfad29a4206).
22:04:59.694 [main] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Start job leader service.
22:04:59.697 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory C:\Users\Aaron\AppData\Local\Temp\flink-dist-cache-2c435fa5-dad6-4a75-8429-eb85cf430369
22:04:59.698 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting dispatcher rest endpoint.
22:04:59.726 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Resolved ResourceManager address, beginning registration
22:04:59.726 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Registration at ResourceManager attempt 1 (timeout=100ms)
22:04:59.743 [flink-akka.actor.default-dispatcher-2] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID bcb70e44-2234-414f-935c-5edfb4f0eef4 (akka://flink/user/taskmanager_0) at ResourceManager
22:04:59.745 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Successful registration at resource manager akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3 under registration id 21914c5d50bac7c62af6f4b82aeb959e.
22:04:59.786 [main] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Starting rest endpoint.
22:05:00.342 [main] WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils  - Log file environment variable 'log.file' is not set.
22:05:00.342 [main] WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils  - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.
22:05:00.362 [main] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
22:05:00.864 [main] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Rest endpoint listening at localhost:54472
22:05:00.865 [main] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@703feacd @ http://localhost:54472
22:05:00.865 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Starting job dispatcher(s) for JobManger
22:05:00.866 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - http://localhost:54472 was granted leadership with leaderSessionID=d1d687c2-013f-41eb-9cf8-f677ebcbd26c
22:05:00.866 [flink-akka.actor.default-dispatcher-5] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader http://localhost:54472 , session=d1d687c2-013f-41eb-9cf8-f677ebcbd26c
22:05:00.885 [main] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcherd440b0f6-efbb-49f3-9521-802d10ba01e7 .
22:05:00.916 [main] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.dispatcher.StandaloneDispatcher@b558294 @ akka://flink/user/dispatcherd440b0f6-efbb-49f3-9521-802d10ba01e7
22:05:00.917 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher akka://flink/user/dispatcherd440b0f6-efbb-49f3-9521-802d10ba01e7 was granted leadership with fencing token 96379f3b-e661-41d4-a165-02f24a49bd3b
22:05:00.917 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Flink Mini Cluster started successfully
22:05:00.932 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering all persisted jobs.
22:05:00.938 [flink-akka.actor.default-dispatcher-5] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/dispatcherd440b0f6-efbb-49f3-9521-802d10ba01e7 , session=96379f3b-e661-41d4-a165-02f24a49bd3b
22:05:00.964 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting job 2a772d0f4f4465d9d6430867166f4d41 (Flink Streaming Job).
22:05:00.989 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 .
22:05:01.017 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Initializing job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41).
22:05:01.028 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=4, delayBetweenRestartAttempts=10000) for Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41).
22:05:01.034 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/5f3161a1-6caf-4701-8488-7cefd8e943f7 .
22:05:01.079 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
22:05:01.135 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Running initialization on master for job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41).
22:05:01.135 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Successfully ran initialization on master in 0 ms.
22:05:01.176 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
22:05:01.192 [flink-akka.actor.default-dispatcher-5] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmaster.JobManagerRunner@2cde553b @ akka://flink/user/jobmanager_1
22:05:01.193 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner  - JobManager runner for job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) was granted leadership with session id 295c3158-04be-42b5-9323-91a97bdd46bf at akka://flink/user/jobmanager_1.
22:05:01.196 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution of job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41)
22:05:01.197 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) switched from state CREATED to RUNNING.
22:05:01.200 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from CREATED to SCHEDULED.
22:05:01.214 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from CREATED to SCHEDULED.
22:05:01.214 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) switched from CREATED to SCHEDULED.
22:05:01.214 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from CREATED to SCHEDULED.
22:05:01.217 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to ResourceManager akka://flink/user/resourcemanager_c6f5111f-5c57-4d70-bab8-06a45aac77e3(8a2fca632858249d49841cfad29a4206)
22:05:01.223 [jobmanager-future-thread-1] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=295c3158-04be-42b5-9323-91a97bdd46bf
22:05:01.228 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved ResourceManager address, beginning registration
22:05:01.228 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Registration at ResourceManager attempt 1 (timeout=100ms)
22:05:01.231 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager 932391a97bdd46bf295c315804be42b5@akka://flink/user/jobmanager_1 for job 2a772d0f4f4465d9d6430867166f4d41.
22:05:01.235 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{4b7436f55e3d4db18219f1679e562575}]
22:05:01.239 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager 932391a97bdd46bf295c315804be42b5@akka://flink/user/jobmanager_1 for job 2a772d0f4f4465d9d6430867166f4d41.
22:05:01.241 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{06e0ad9636c70da6426e0e37d734730f}]
22:05:01.241 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully registered at ResourceManager, leader id: 8a2fca632858249d49841cfad29a4206.
22:05:01.241 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{84808c931e4c814605c26f57d81180d1}]
22:05:01.241 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{441bdea88d1cff7b158aef3749291a01}]
22:05:01.242 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{4b7436f55e3d4db18219f1679e562575}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
22:05:01.243 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 2a772d0f4f4465d9d6430867166f4d41 with allocation id AllocationID{f0158bce4624c653c4e42a978b993faa}.
22:05:01.243 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{441bdea88d1cff7b158aef3749291a01}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
22:05:01.243 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{06e0ad9636c70da6426e0e37d734730f}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
22:05:01.244 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{84808c931e4c814605c26f57d81180d1}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
22:05:01.244 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Receive slot request AllocationID{f0158bce4624c653c4e42a978b993faa} for job 2a772d0f4f4465d9d6430867166f4d41 from resource manager with leader id 8a2fca632858249d49841cfad29a4206.
22:05:01.245 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 2a772d0f4f4465d9d6430867166f4d41 with allocation id AllocationID{d3ef57e5c801ff80c6621100668fcccb}.
22:05:01.245 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Allocated slot for AllocationID{f0158bce4624c653c4e42a978b993faa}.
22:05:01.245 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Add job 2a772d0f4f4465d9d6430867166f4d41 for job leader monitoring.
22:05:01.246 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 2a772d0f4f4465d9d6430867166f4d41 with allocation id AllocationID{24ae4ba4e24d3b81b881f714d6619d1c}.
22:05:01.246 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 2a772d0f4f4465d9d6430867166f4d41 with allocation id AllocationID{82fad8dd162681f2e3183f7bbca370df}.
22:05:01.247 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Receive slot request AllocationID{d3ef57e5c801ff80c6621100668fcccb} for job 2a772d0f4f4465d9d6430867166f4d41 from resource manager with leader id 8a2fca632858249d49841cfad29a4206.
22:05:01.247 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Allocated slot for AllocationID{d3ef57e5c801ff80c6621100668fcccb}.
22:05:01.247 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Add job 2a772d0f4f4465d9d6430867166f4d41 for job leader monitoring.
22:05:01.247 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Receive slot request AllocationID{24ae4ba4e24d3b81b881f714d6619d1c} for job 2a772d0f4f4465d9d6430867166f4d41 from resource manager with leader id 8a2fca632858249d49841cfad29a4206.
22:05:01.247 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Allocated slot for AllocationID{24ae4ba4e24d3b81b881f714d6619d1c}.
22:05:01.247 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 295c3158-04be-42b5-9323-91a97bdd46bf.
22:05:01.247 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Add job 2a772d0f4f4465d9d6430867166f4d41 for job leader monitoring.
22:05:01.248 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Receive slot request AllocationID{82fad8dd162681f2e3183f7bbca370df} for job 2a772d0f4f4465d9d6430867166f4d41 from resource manager with leader id 8a2fca632858249d49841cfad29a4206.
22:05:01.248 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Allocated slot for AllocationID{82fad8dd162681f2e3183f7bbca370df}.
22:05:01.248 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 295c3158-04be-42b5-9323-91a97bdd46bf.
22:05:01.248 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Add job 2a772d0f4f4465d9d6430867166f4d41 for job leader monitoring.
22:05:01.249 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 295c3158-04be-42b5-9323-91a97bdd46bf.
22:05:01.249 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Resolved JobManager address, beginning registration
22:05:01.250 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Resolved JobManager address, beginning registration
22:05:01.250 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Registration at JobManager attempt 1 (timeout=100ms)
22:05:01.250 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Resolved JobManager address, beginning registration
22:05:01.250 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Registration at JobManager attempt 1 (timeout=100ms)
22:05:01.252 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Successful registration at job manager akka://flink/user/jobmanager_1 for job 2a772d0f4f4465d9d6430867166f4d41.
22:05:01.252 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Successful registration at job manager akka://flink/user/jobmanager_1 for job 2a772d0f4f4465d9d6430867166f4d41.
22:05:01.253 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Establish JobManager connection for job 2a772d0f4f4465d9d6430867166f4d41.
22:05:01.257 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Offer reserved slots to the leader of job 2a772d0f4f4465d9d6430867166f4d41.
22:05:01.265 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from SCHEDULED to DEPLOYING.
22:05:01.265 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (attempt #0) to 127.0.0.1
22:05:01.267 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Activate slot AllocationID{f0158bce4624c653c4e42a978b993faa}.
22:05:01.267 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Activate slot AllocationID{82fad8dd162681f2e3183f7bbca370df}.
22:05:01.267 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Activate slot AllocationID{d3ef57e5c801ff80c6621100668fcccb}.
22:05:01.267 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Activate slot AllocationID{24ae4ba4e24d3b81b881f714d6619d1c}.
22:05:01.269 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) switched from SCHEDULED to DEPLOYING.
22:05:01.269 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (attempt #0) to 127.0.0.1
22:05:01.269 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from SCHEDULED to DEPLOYING.
22:05:01.269 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (attempt #0) to 127.0.0.1
22:05:01.269 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from SCHEDULED to DEPLOYING.
22:05:01.269 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (attempt #0) to 127.0.0.1
22:05:01.292 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4).
22:05:01.295 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from CREATED to DEPLOYING.
22:05:01.295 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak safety net for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) [DEPLOYING]
22:05:01.298 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4).
22:05:01.301 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) switched from CREATED to DEPLOYING.
22:05:01.301 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak safety net for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) [DEPLOYING]
22:05:01.304 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4).
22:05:01.308 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from CREATED to DEPLOYING.
22:05:01.308 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak safety net for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) [DEPLOYING]
22:05:01.311 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) [DEPLOYING].
22:05:01.311 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) [DEPLOYING].
22:05:01.310 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) [DEPLOYING].
22:05:01.332 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4).
22:05:01.333 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Registering task at network: Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) [DEPLOYING].
22:05:01.337 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from CREATED to DEPLOYING.
22:05:01.338 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak safety net for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) [DEPLOYING]
22:05:01.338 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) [DEPLOYING].
22:05:01.338 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Registering task at network: Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) [DEPLOYING].
22:05:01.339 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Registering task at network: Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) [DEPLOYING].
22:05:01.348 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Registering task at network: Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) [DEPLOYING].
22:05:01.363 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from DEPLOYING to RUNNING.
22:05:01.368 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from DEPLOYING to RUNNING.
22:05:01.368 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
22:05:01.372 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) switched from DEPLOYING to RUNNING.
22:05:01.372 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
22:05:01.373 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
22:05:01.373 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from DEPLOYING to RUNNING.
22:05:01.373 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from DEPLOYING to RUNNING.
22:05:01.375 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) switched from DEPLOYING to RUNNING.
22:05:01.383 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from DEPLOYING to RUNNING.
22:05:01.383 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
22:05:01.385 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from DEPLOYING to RUNNING.
22:05:01.703 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
22:05:01.703 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
22:05:01.712 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
22:05:01.713 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
22:05:01.715 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
22:05:01.716 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
22:05:01.719 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
22:05:01.719 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
22:05:01.791 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
22:05:01.791 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
22:05:01.791 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
22:05:01.791 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
22:05:03.014 [clickhouse-writer-0] INFO  r.i.o.f.applied.ClickhouseWriter$WriterTask  - Start writer task, id = 0
22:05:03.015 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  r.i.o.f.applied.ClickhouseSinkScheduledChecker  - Build Sink scheduled checker, timeout (sec) = 60
22:05:03.016 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  r.i.o.flinkclickhousesink.applied.ClickhouseSinkManager  - Build sink writer's manager. params = ClickhouseSinkCommonParams{clickhouseClusterSettings=ClickhouseClusterSettings{hostsWithPorts=[http://10.5.36.134:8123], credentials='', authorizationRequired=false, currentHostId=0}, failedRecordsPath='/tmp/flink', numWriters=1, queueMaxCapacity=5, timeout=60, maxRetries=3}
22:05:03.016 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  r.i.o.flinkclickhousesink.applied.ClickhouseSinkBuffer  - Instance Clickhouse Sink, target table = test, buffer size = 1
22:05:03.016 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  r.i.o.flinkclickhousesink.applied.ClickhouseSinkBuffer  - Instance Clickhouse Sink, target table = test, buffer size = 1
22:05:03.016 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  r.i.o.flinkclickhousesink.applied.ClickhouseSinkBuffer  - Instance Clickhouse Sink, target table = test, buffer size = 1
22:05:03.016 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  r.i.o.flinkclickhousesink.applied.ClickhouseSinkBuffer  - Instance Clickhouse Sink, target table = test, buffer size = 1
22:05:03.102 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.103 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.103 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.107 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.228 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 1 initially has no partitions to read from.
22:05:03.228 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 3 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test0', partition=1}]
22:05:03.228 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 2 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test0', partition=0}]
22:05:03.229 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test0', partition=2}]
22:05:03.474 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
22:05:03.474 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
22:05:03.474 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
22:05:03.474 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
22:05:03.482 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
22:05:03.482 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:host.name=DESKTOP-IJA66DD
22:05:03.482 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.8.0_31
22:05:03.482 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle Corporation
22:05:03.482 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.home=D:\Java\jdk1.8.0_31\jre
22:05:03.482 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.class.path=D:\Java\jdk1.8.0_31\jre\lib\charsets.jar;D:\Java\jdk1.8.0_31\jre\lib\deploy.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_31\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_31\jre\lib\javaws.jar;D:\Java\jdk1.8.0_31\jre\lib\jce.jar;D:\Java\jdk1.8.0_31\jre\lib\jfr.jar;D:\Java\jdk1.8.0_31\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_31\jre\lib\jsse.jar;D:\Java\jdk1.8.0_31\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_31\jre\lib\plugin.jar;D:\Java\jdk1.8.0_31\jre\lib\resources.jar;D:\Java\jdk1.8.0_31\jre\lib\rt.jar;C:\Users\Aaron\Desktop\clickhouse\flinkclickhouse\target\classes;C:\Users\Aaron\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:\Users\Aaron\.m2\repository\com\google\guava\guava\23.0\guava-23.0.jar;C:\Users\Aaron\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\Aaron\.m2\repository\com\google\errorprone\error_prone_annotations\2.0.18\error_prone_annotations-2.0.18.jar;C:\Users\Aaron\.m2\repository\com\google\j2objc\j2objc-annotations\1.1\j2objc-annotations-1.1.jar;C:\Users\Aaron\.m2\repository\org\codehaus\mojo\animal-sniffer-annotations\1.14\animal-sniffer-annotations-1.14.jar;C:\Users\Aaron\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\Aaron\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\Aaron\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Aaron\.m2\repository\org\asynchttpclient\async-http-client\2.0.39\async-http-client-2.0.39.jar;C:\Users\Aaron\.m2\repository\org\asynchttpclient\async-http-client-netty-utils\2.0.39\async-http-client-netty-utils-2.0.39.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-buffer\4.0.56.Final\netty-buffer-4.0.56.Final.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-codec-http\4.0.56.Final\netty-codec-http-4.0.56.Final.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-codec\4.0.56.Final\netty-codec-4.0.56.Final.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-handler\4.0.56.Final\netty-handler-4.0.56.Final.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-transport\4.0.56.Final\netty-transport-4.0.56.Final.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-transport-native-epoll\4.0.56.Final\netty-transport-native-epoll-4.0.56.Final-linux-x86_64.jar;C:\Users\Aaron\.m2\repository\io\netty\netty-common\4.0.56.Final\netty-common-4.0.56.Final.jar;C:\Users\Aaron\.m2\repository\org\asynchttpclient\netty-resolver-dns\2.0.39\netty-resolver-dns-2.0.39.jar;C:\Users\Aaron\.m2\repository\org\asynchttpclient\netty-resolver\2.0.39\netty-resolver-2.0.39.jar;C:\Users\Aaron\.m2\repository\org\asynchttpclient\netty-codec-dns\2.0.39\netty-codec-dns-2.0.39.jar;C:\Users\Aaron\.m2\repository\org\reactivestreams\reactive-streams\1.0.0\reactive-streams-1.0.0.jar;C:\Users\Aaron\.m2\repository\com\typesafe\netty\netty-reactive-streams\1.0.8\netty-reactive-streams-1.0.8.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.7.0\flink-streaming-java_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-runtime_2.11\1.7.0\flink-runtime_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-core\1.7.0\flink-core-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-annotations\1.7.0\flink-annotations-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-metrics-core\1.7.0\flink-metrics-core-1.7.0.jar;C:\Users\Aaron\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\Aaron\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\Aaron\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-java\1.7.0\flink-java-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-queryable-state-client-java_2.11\1.7.0\flink-queryable-state-client-java_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-hadoop-fs\1.7.0\flink-hadoop-fs-1.7.0.jar;C:\Users\Aaron\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.24.Final-5.0\flink-shaded-netty-4.1.24.Final-5.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-shaded-asm\5.0.4-5.0\flink-shaded-asm-5.0.4-5.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-shaded-jackson\2.7.9-5.0\flink-shaded-jackson-2.7.9-5.0.jar;C:\Users\Aaron\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\Aaron\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\Aaron\.m2\repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;C:\Users\Aaron\.m2\repository\com\typesafe\akka\akka-actor_2.11\2.4.20\akka-actor_2.11-2.4.20.jar;C:\Users\Aaron\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.11\0.7.0\scala-java8-compat_2.11-0.7.0.jar;C:\Users\Aaron\.m2\repository\com\typesafe\akka\akka-stream_2.11\2.4.20\akka-stream_2.11-2.4.20.jar;C:\Users\Aaron\.m2\repository\com\typesafe\ssl-config-core_2.11\0.2.1\ssl-config-core_2.11-0.2.1.jar;C:\Users\Aaron\.m2\repository\com\typesafe\akka\akka-protobuf_2.11\2.4.20\akka-protobuf_2.11-2.4.20.jar;C:\Users\Aaron\.m2\repository\com\typesafe\akka\akka-slf4j_2.11\2.4.20\akka-slf4j_2.11-2.4.20.jar;C:\Users\Aaron\.m2\repository\org\clapper\grizzled-slf4j_2.11\1.3.2\grizzled-slf4j_2.11-1.3.2.jar;C:\Users\Aaron\.m2\repository\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;C:\Users\Aaron\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Aaron\.m2\repository\com\twitter\chill_2.11\0.7.6\chill_2.11-0.7.6.jar;C:\Users\Aaron\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-clients_2.11\1.7.0\flink-clients_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-optimizer_2.11\1.7.0\flink-optimizer_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-5.0\flink-shaded-guava-18.0-5.0.jar;C:\Users\Aaron\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\force-shading\1.7.0\force-shading-1.7.0.jar;C:\Users\Aaron\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\Aaron\.m2\repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;C:\Users\Aaron\.m2\repository\org\javassist\javassist\3.18.2-GA\javassist-3.18.2-GA.jar;C:\Users\Aaron\.m2\repository\org\apache\commons\commons-compress\1.16.1\commons-compress-1.16.1.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-connector-kafka-0.8_2.11\1.7.0\flink-connector-kafka-0.8_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\flink\flink-connector-kafka-base_2.11\1.7.0\flink-connector-kafka-base_2.11-1.7.0.jar;C:\Users\Aaron\.m2\repository\org\apache\kafka\kafka_2.11\0.8.2.2\kafka_2.11-0.8.2.2.jar;C:\Users\Aaron\.m2\repository\org\scala-lang\modules\scala-xml_2.11\1.0.2\scala-xml_2.11-1.0.2.jar;C:\Users\Aaron\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\Aaron\.m2\repository\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.2\scala-parser-combinators_2.11-1.0.2.jar;C:\Users\Aaron\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Users\Aaron\.m2\repository\org\apache\kafka\kafka-clients\0.8.2.2\kafka-clients-0.8.2.2.jar;C:\Users\Aaron\.m2\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;C:\Users\Aaron\.m2\repository\org\slf4j\slf4j-log4j12\1.6.1\slf4j-log4j12-1.6.1.jar;C:\Users\Aaron\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\Aaron\.m2\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;C:\Users\Aaron\.m2\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2018.2.2\lib\idea_rt.jar
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.library.path=D:\Java\jdk1.8.0_31\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Windows\System32\OpenSSH\;C:\Program Files\TortoiseSVN\bin;C:\Users\Aaron\AppData\Local\Microsoft\WindowsApps;;.
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.io.tmpdir=C:\Users\Aaron\AppData\Local\Temp\
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.compiler=<NA>
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.name=Windows 8.1
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.arch=amd64
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.version=6.3
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.name=Aaron
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.home=C:\Users\Aaron
22:05:03.487 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.dir=C:\Users\Aaron\Desktop\clickhouse\flinkclickhouse
22:05:03.488 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=10.5.36.131:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@495b738f
22:05:03.488 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=10.5.36.131:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@636e1ec4
22:05:03.488 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=10.5.36.131:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@64313bb5
22:05:03.488 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=10.5.36.131:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@4361bc9b
22:05:03.509 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 10.5.36.131/10.5.36.131:2181. Will not attempt to authenticate using SASL (unknown error)
22:05:03.509 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 10.5.36.131/10.5.36.131:2181. Will not attempt to authenticate using SASL (unknown error)
22:05:03.510 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 10.5.36.131/10.5.36.131:2181. Will not attempt to authenticate using SASL (unknown error)
22:05:03.509 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 10.5.36.131/10.5.36.131:2181. Will not attempt to authenticate using SASL (unknown error)
22:05:03.512 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to 10.5.36.131/10.5.36.131:2181, initiating session
22:05:03.512 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to 10.5.36.131/10.5.36.131:2181, initiating session
22:05:03.512 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to 10.5.36.131/10.5.36.131:2181, initiating session
22:05:03.512 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to 10.5.36.131/10.5.36.131:2181, initiating session
22:05:03.515 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting periodic offset committer, with commit interval of 60000ms
22:05:03.521 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 10.5.36.131/10.5.36.131:2181, sessionid = 0x36812fb22b30da9, negotiated timeout = 60000
22:05:03.521 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 10.5.36.131/10.5.36.131:2181, sessionid = 0x36812fb22b30daa, negotiated timeout = 60000
22:05:03.521 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 10.5.36.131/10.5.36.131:2181, sessionid = 0x36812fb22b30dac, negotiated timeout = 60000
22:05:03.521 [Source: Custom Source -> (10.5.36.131:2181)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 10.5.36.131/10.5.36.131:2181, sessionid = 0x36812fb22b30dab, negotiated timeout = 60000
22:05:03.531 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)-EventThread] INFO  o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
22:05:03.531 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)-EventThread] INFO  o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
22:05:03.531 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)-EventThread] INFO  o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
22:05:03.531 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)-EventThread] INFO  o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
22:05:03.571 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting periodic offset committer, with commit interval of 60000ms
22:05:03.572 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting periodic offset committer, with commit interval of 60000ms
22:05:03.572 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting periodic offset committer, with commit interval of 60000ms
22:05:03.573 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Assigning 1 partitions to broker threads
22:05:03.573 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Assigning 1 partitions to broker threads
22:05:03.573 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Refreshing leader information for partitions [Partition: KafkaTopicPartition{topic='test0', partition=1}, KafkaPartitionHandle=[test0,1], offset=61]
22:05:03.573 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Refreshing leader information for partitions [Partition: KafkaTopicPartition{topic='test0', partition=2}, KafkaPartitionHandle=[test0,2], offset=61]
22:05:03.573 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Assigning 1 partitions to broker threads
22:05:03.573 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Refreshing leader information for partitions [Partition: KafkaTopicPartition{topic='test0', partition=0}, KafkaPartitionHandle=[test0,0], offset=60]
22:05:03.575 [Thread-13] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.575 [Thread-14] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.575 [Thread-15] INFO  o.a.f.s.c.kafka.internals.Kafka08PartitionDiscoverer  - Trying to get topic metadata from broker 10.5.36.131:9092 in try 0/3
22:05:03.583 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting thread SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-161 (lakala4:9092)
22:05:03.583 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting thread SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-163 (lakala2:9092)
22:05:03.583 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - Starting thread SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-162 (lakala1:9092)
22:05:03.583 [SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-161 (lakala4:9092)] INFO  o.a.f.s.connectors.kafka.internals.SimpleConsumerThread  - Starting to fetch from [Partition: KafkaTopicPartition{topic='test0', partition=0}, KafkaPartitionHandle=[test0,0], offset=60]
22:05:03.583 [SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-162 (lakala1:9092)] INFO  o.a.f.s.connectors.kafka.internals.SimpleConsumerThread  - Starting to fetch from [Partition: KafkaTopicPartition{topic='test0', partition=1}, KafkaPartitionHandle=[test0,1], offset=61]
22:05:03.585 [SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-163 (lakala2:9092)] INFO  o.a.f.s.connectors.kafka.internals.SimpleConsumerThread  - Starting to fetch from [Partition: KafkaTopicPartition{topic='test0', partition=2}, KafkaPartitionHandle=[test0,2], offset=61]
22:05:03.596 [SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-163 (lakala2:9092)] INFO  o.a.f.s.connectors.kafka.internals.SimpleConsumerThread  - Starting to consume 1 partitions with consumer thread SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-163 (lakala2:9092)
22:05:03.596 [SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-161 (lakala4:9092)] INFO  o.a.f.s.connectors.kafka.internals.SimpleConsumerThread  - Starting to consume 1 partitions with consumer thread SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-161 (lakala4:9092)
22:05:03.596 [SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-162 (lakala1:9092)] INFO  o.a.f.s.connectors.kafka.internals.SimpleConsumerThread  - Starting to consume 1 partitions with consumer thread SimpleConsumer - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) - broker-162 (lakala1:9092)
22:05:08.518 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.f.streaming.connectors.kafka.internals.Kafka08Fetcher  - All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher
22:05:08.519 [Curator-Framework-0] INFO  o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - backgroundOperationsLoop exiting
22:05:08.524 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 0x36812fb22b30da9 closed
22:05:08.524 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)-EventThread] INFO  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread shut down for session: 0x36812fb22b30da9
22:05:08.524 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  r.i.opensource.flinkclickhousesink.applied.ClickhouseWriter  - Closing clickhouse-writer...
22:05:08.539 [clickhouse-writer-0] INFO  r.i.o.f.applied.ClickhouseWriter$WriterTask  - Task id = 0 is finished
22:05:08.541 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  r.i.opensource.flinkclickhousesink.applied.ClickhouseWriter  - ClickhouseWriter is closed
22:05:08.546 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from RUNNING to FINISHED.
22:05:08.546 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf).
22:05:08.546 [Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) [FINISHED]
22:05:08.547 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) da92b6ea272600e28044bae42fae15bf.
22:05:08.556 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (da92b6ea272600e28044bae42fae15bf) switched from RUNNING to FINISHED.
22:05:10.969 [main] INFO  org.apache.flink.runtime.minicluster.MiniCluster  - Shutting down Flink Mini Cluster
22:05:10.970 [main] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Shutting down rest endpoint.
22:05:10.971 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Stopping TaskExecutor akka://flink/user/taskmanager_0.
22:05:10.973 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
22:05:10.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (2f0aab54f7c4df4c4e6d16e66bfc24c7) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
22:05:10.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
	at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:374)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
22:05:10.991 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from RUNNING to CANCELING.
22:05:10.993 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory C:\Users\Aaron\AppData\Local\Temp\flink-io-274efec7-2a39-4850-95b6-78865107ce82
22:05:10.994 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
22:05:10.997 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from RUNNING to CANCELING.
22:05:11.000 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (c9db0b40c129462f8af1fe08414dacd2) switched from CANCELING to CANCELED.
22:05:11.000 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (86bf593ef5017b87b2a83b7f24148706) switched from CANCELING to CANCELED.
22:05:11.000 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Try to restart or fail the job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) if no longer possible.
22:05:11.000 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) switched from state FAILING to RESTARTING.
22:05:11.000 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Restarting the job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41).
22:05:11.000 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.slotmanager.SlotManager  - Closing the SlotManager.
22:05:11.000 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.resourcemanager.slotmanager.SlotManager  - Suspending the SlotManager.
22:05:11.004 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager connection b42b9528e1dcd2ad2919bb1fcc08cc9c: ResourceManager leader changed to new address null.
22:05:11.005 [ForkJoinPool.commonPool-worker-1] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Removing cache directory C:\Users\Aaron\AppData\Local\Temp\flink-web-ui
22:05:11.005 [ForkJoinPool.commonPool-worker-1] INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Shut down complete.
22:05:11.011 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Stop job leader service.
22:05:11.017 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Stopped TaskExecutor akka://flink/user/taskmanager_0.
22:05:21.006 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) switched from state RESTARTING to CREATED.
22:05:21.006 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (2a772d0f4f4465d9d6430867166f4d41) switched from state CREATED to RUNNING.
22:05:21.006 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (1/4) (40c3680b6274bcdbe8862996f32d0c6a) switched from CREATED to SCHEDULED.
22:05:21.006 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (2/4) (2c544fcbfc5c4d105db8d81a8c07fd2a) switched from CREATED to SCHEDULED.
22:05:21.006 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{601710a9b2df98e6d52c74e40e3fa635}]
22:05:21.007 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (3/4) (65b34f8e8d6000da72f9acdf64172aea) switched from CREATED to SCHEDULED.
22:05:21.007 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{da2537e683df11cf7d1fd0a66f4034ed}]
22:05:21.007 [jobmanager-future-thread-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> (Map -> Sink: flinktest, Sink: Print to Std. Out) (4/4) (3f2b818a2f4850d6d0480df6a2cfb955) switched from CREATED to SCHEDULED.
22:05:21.007 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{6fd69d88e0eb8b16a36bbc09b74a25ea}]
22:05:21.007 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{8d6b3bcd1046f9dc9eceff0d0b75d5a1}]

@Aaronzk on the first view I do not see anything suspicious.

Did I correctly understand you: this code worked properly with 1.3.2 and has broken when you have decided to set 1.7.0?

ClickhouseSink.close() is calling when your application started to shutdowning. See https://github.com/ivi-ru/flink-clickhouse-sink/blob/master/src/main/java/ru/ivi/opensource/flinkclickhousesink/ClickhouseSink.java#L63. So, you need to find the reason why your application started to closing.

@mchernyakov
YES,this code worked properly with 1.3.2, and did not work with 1.7.0。

@Aaronzk
I think you should clone source, understand it and upgrade version into your existing environment, now I'm going to do this, my side flink version is 1.8

@mchernyakov
Hi, I'm going to clone the source code, and read after adding some function, such as: I'm going to add a trigger: at a fixed time and fixed size, which first come will trigger which. Such as: 10 k in 10 minutes, if the data in 10 minutes to reach 10 k immediately trigger to write, if the 10 minutes but the data is not so also triggers the written to reach 10 k.

@testmepro
Hi, if I understand correctly, you want to add trigger, which already exists.

To implement this scenario, you must set the parameters in the configuration as follows:
clickhouse.sink.queue-max-capacity = 10000
clickhouse.sink.timeout-sec = 600

@ashulenko
oh,I was on the analysis of the source code, I do not know whether his this setting can meet my needs, if can meet better,thank you !

@ashulenko Andrey, please, close this issue. We have discussed a lot of different things which are not related to the main topic. Thanks.