• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Flink-窗口(处理时间,count统计窗口,session回话窗口)- 重点

武飞扬头像
a-tao必须奥利给
帮助1

Flink-窗口(处理时间,count统计窗口,session回话窗口)

  1.  
    * 时间窗口
  2.  
    * SlidingEventTimeWindows: 滑动的事件时间窗口
  3.  
    * SlidingProcessingTimeWindows: 滑动的处理时间窗口
  4.  
    * TumblingEventTimeWindows:滚动的事件时间窗口
  5.  
    * TumblingProcessingTimeWindows: 滚动的处理时间窗口
  6.  
    *
  7.  
    * 滑动:窗口会存在交叉部分
  8.  
    * 滚动:窗口美哦与交叉
  9.  
    *
  10.  
    * 事件时间:数据中自带一个时间字段, 如果要使用事件时间需要设置时间字段和水位线

1. 滚动的处理时间窗口

  1.  
    package com.wt.flink.window
  2.  
    import org.apache.flink.streaming.api.scala._
  3.  
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
  4.  
    import org.apache.flink.streaming.api.windowing.time.Time
  5.  
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  6.  
    object Demo1TimeWindow {
  7.  
    def main(args: Array[String]): Unit = {
  8.  
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  9.  
     
  10.  
    env.setParallelism(1)
  11.  
     
  12.  
    //读取卡口过车数据
  13.  
    val dataDS: DataStream[String] = env.socketTextStream("master", 8888)
  14.  
     
  15.  
    //整理数据取出道路编号和时间戳
  16.  
    val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
  17.  
    val split: Array[String] = line.split(",")
  18.  
    //道路编号
  19.  
    val roadId: String = split(1)
  20.  
    //时间戳
  21.  
    val ts: Long = split(2).toLong
  22.  
    (roadId, ts)
  23.  
    })
  24.  
     
  25.  
     
  26.  
    val kvDS: DataStream[(String, Int)] = kcDS.map(kv => (kv._1, 1))
  27.  
     
  28.  
     
  29.  
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
  30.  
     
  31.  
    /**
  32.  
    * 时间窗口
  33.  
    * SlidingEventTimeWindows: 滑动的事件时间窗口
  34.  
    * SlidingProcessingTimeWindows: 滑动的处理时间窗口
  35.  
    * TumblingEventTimeWindows:滚动的事件时间窗口
  36.  
    * TumblingProcessingTimeWindows: 滚动的处理时间窗口
  37.  
    *
  38.  
    * 滑动:窗口会存在交叉部分
  39.  
    * 滚动:窗口美哦与交叉
  40.  
    *
  41.  
    * 事件时间:数据中自带一个时间字段, 如果要使用事件时间需要设置时间字段和水位线
  42.  
    * 处理时间:数据被处理的时间
  43.  
    *
  44.  
    */
  45.  
    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
  46.  
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  47.  
     
  48.  
    val countDS: DataStream[(String, Int)] = windowDS.sum(1)
  49.  
     
  50.  
    countDS.print()
  51.  
    env.execute()
  52.  
    }
  53.  
    }
学新通

2. 统计窗口(每达到一定的量就做一次统计)

  1.  
    package com.wt.flink.window
  2.  
    import org.apache.flink.streaming.api.scala._
  3.  
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
  4.  
    object Demo2CountWindow {
  5.  
    def main(args: Array[String]): Unit = {
  6.  
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  7.  
     
  8.  
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
  9.  
     
  10.  
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(','))
  11.  
     
  12.  
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
  13.  
     
  14.  
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
  15.  
     
  16.  
    /**
  17.  
    * 统计窗口
  18.  
    * countWindow(10): 滚动的统计窗口
  19.  
    * countWindow(10,5): 滑动的统计窗口
  20.  
    *
  21.  
    */
  22.  
     
  23.  
    val countWindowDS: WindowedStream[(String, Int), String, GlobalWindow] = keyByDS
  24.  
    .countWindow(10, 5)
  25.  
     
  26.  
     
  27.  
    val countDS: DataStream[(String, Int)] = countWindowDS.sum(1)
  28.  
     
  29.  
    countDS.print()
  30.  
     
  31.  
    env.execute()
  32.  
    }
  33.  
    }
学新通

3. Session窗口(停止接收数据多长时间后,开始统计)

  1.  
    package com.wt.flink.window
  2.  
    import org.apache.flink.streaming.api.scala._
  3.  
    import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
  4.  
    import org.apache.flink.streaming.api.windowing.time.Time
  5.  
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  6.  
    object Demo3SessionWindow {
  7.  
     
  8.  
    def main(args: Array[String]): Unit = {
  9.  
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  10.  
     
  11.  
    env.setParallelism(1)
  12.  
     
  13.  
    //读取卡口过车数据
  14.  
    val dataDS: DataStream[String] = env.socketTextStream("master", 8888)
  15.  
     
  16.  
    //整理数据取出道路编号和时间戳
  17.  
    val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
  18.  
    val split: Array[String] = line.split(",")
  19.  
    //道路编号
  20.  
    val roadId: String = split(1)
  21.  
    //时间戳
  22.  
    val ts: Long = split(2).toLong
  23.  
    (roadId, ts)
  24.  
    })
  25.  
     
  26.  
    //设置时间字段
  27.  
    val assDS: DataStream[(String, Long)] = kcDS.assignAscendingTimestamps(_._2)
  28.  
     
  29.  
     
  30.  
    val kvDS: DataStream[(String, Int)] = assDS.map(kv => (kv._1, 1))
  31.  
     
  32.  
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
  33.  
     
  34.  
    /**
  35.  
    * 会话窗口 -- 同一个key一段时间没有数据开始计算
  36.  
    * ProcessingTimeSessionWindows: 处理时间的会话窗口
  37.  
    * EventTimeSessionWindows: 事件时间的会话窗口,需要指定时间字段
  38.  
    *
  39.  
    *
  40.  
    */
  41.  
     
  42.  
    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
  43.  
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
  44.  
     
  45.  
    val countDS: DataStream[(String, Int)] = windowDS.sum(1)
  46.  
     
  47.  
    countDS.print()
  48.  
     
  49.  
    env.execute()
  50.  
     
  51.  
    }
  52.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgkihfh
系列文章
更多 icon
同类精品
更多 icon
继续加载