/flink-connector-mongo

flink连接mongo

Primary LanguageJavaApache License 2.0Apache-2.0

flink-connector-mongo

flink连接mongo

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // non-transactional sink with a flush strategy of 1000 documents or 10 seconds
    Properties properties = new Properties();
    properties.setProperty(MongoOptions.SINK_TRANSACTION_ENABLED, "false");
    properties.setProperty(MongoOptions.SINK_FLUSH_ON_CHECKPOINT, "false");
    properties.setProperty(MongoOptions.SINK_FLUSH_SIZE, String.valueOf(1_000L));
    properties.setProperty(MongoOptions.SINK_FLUSH_INTERVAL, String.valueOf(10_000L));

    env.addSource(...)
       .sinkTo(new MongoSink<>("mongodb://user:password@127.0.0.1:27017", "mydb", "mycollection",
                               new StringDocumentSerializer(), properties));

    env.execute();