tillrohrmann/cep-monitoring

Calculate average temperature in Flink DataStream


I am using the Flink DataStream API on a stream of temperature readings from several racks, and I want to calculate the average temperature grouped by rack ID. My window is 40 seconds long and slides every 10 seconds. Below is my code, which currently calculates the sum of temperatures for each rackId every 10 seconds. I now want to calculate the average temperature instead:

static Properties properties = new Properties();

public static Properties getProperties()
{
    properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
    properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
    //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
    //properties.setProperty("group.id", "akshay");
    properties.setProperty("auto.offset.reset", "earliest");
    return properties;
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties props = Program.getProperties();

    // Read TemperatureEvent records from Kafka and stamp them with ingestion time
    DataStream<TemperatureEvent> dstream = env
        .addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

    // 40-second window sliding every 10 seconds, summing temperature per rackId
    DataStream<TemperatureEvent> ds1 = dstream
        .keyBy("rackId")
        .timeWindow(Time.seconds(40), Time.seconds(10))
        .sum("temperature");

    env.execute("Temperature Consumer");
}

How can I calculate the average temperature for each rackId over each window?
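
One possible direction, sketched here without any Flink dependency: an average cannot be combined incrementally the way a sum can, so a common approach is to keep a running (sum, count) pair per key and divide only when the window fires. The four static methods below are named after the methods of Flink's AggregateFunction interface (createAccumulator, add, getResult, merge), but the class RackAverageSketch, the AvgAccumulator type, the averagePerRack helper, and the assumption that rack IDs are ints are all hypothetical and only for illustration. The helper stands in for keyBy plus a single window.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RackAverageSketch {

    /** Running (sum, count) pair; the average is derived only when a result is requested. */
    public static final class AvgAccumulator {
        double sum;
        long count;
    }

    // The four methods below mirror the shape of an incremental window aggregate.

    public static AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    public static AvgAccumulator add(double temperature, AvgAccumulator acc) {
        acc.sum += temperature;
        acc.count++;
        return acc;
    }

    public static double getResult(AvgAccumulator acc) {
        // An empty accumulator has no meaningful average
        return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
    }

    public static AvgAccumulator merge(AvgAccumulator a, AvgAccumulator b) {
        AvgAccumulator merged = new AvgAccumulator();
        merged.sum = a.sum + b.sum;
        merged.count = a.count + b.count;
        return merged;
    }

    /** Stand-in for keyBy("rackId") plus one window: average per rack over the given readings. */
    public static Map<Integer, Double> averagePerRack(int[] rackIds, double[] temperatures) {
        Map<Integer, AvgAccumulator> accumulators = new LinkedHashMap<>();
        for (int i = 0; i < rackIds.length; i++) {
            AvgAccumulator acc = accumulators.computeIfAbsent(rackIds[i], k -> createAccumulator());
            add(temperatures[i], acc);
        }
        Map<Integer, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<Integer, AvgAccumulator> entry : accumulators.entrySet()) {
            averages.put(entry.getKey(), getResult(entry.getValue()));
        }
        return averages;
    }

    public static void main(String[] args) {
        int[] racks = {1, 2, 1, 2, 1};
        double[] temps = {20.0, 30.0, 22.0, 34.0, 24.0};
        System.out.println(averagePerRack(racks, temps)); // prints {1=22.0, 2=32.0}
    }
}
```

In a Flink job, the same logic would presumably live in a class implementing org.apache.flink.api.common.functions.AggregateFunction and be passed to aggregate(...) on the windowed stream in place of sum("temperature"). Whether that interface is available, and the exact signature of add, depends on the Flink version in use. On older versions, a WindowFunction passed to apply(...) that iterates over the window's elements is an alternative.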