Flink-窗口(处理时间,count统计窗口,session回话窗口)- 重点
Flink-窗口(处理时间,count统计窗口,session回话窗口)
* 时间窗口 * SlidingEventTimeWindows: 滑动的事件时间窗口 * SlidingProcessingTimeWindows: 滑动的处理时间窗口 * TumblingEventTimeWindows:滚动的事件时间窗口 * TumblingProcessingTimeWindows: 滚动的处理时间窗口 * * 滑动:窗口会存在交叉部分 * 滚动:窗口美哦与交叉 * * 事件时间:数据中自带一个时间字段, 如果要使用事件时间需要设置时间字段和水位线
1. 滚动的处理时间窗口
-
package com.wt.flink.window
-
import org.apache.flink.streaming.api.scala._
-
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
-
import org.apache.flink.streaming.api.windowing.time.Time
-
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-
object Demo1TimeWindow {
-
def main(args: Array[String]): Unit = {
-
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-
env.setParallelism(1)
-
-
//读取卡口过车数据
-
val dataDS: DataStream[String] = env.socketTextStream("master", 8888)
-
-
//整理数据取出道路编号和时间戳
-
val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
-
val split: Array[String] = line.split(",")
-
//道路编号
-
val roadId: String = split(1)
-
//时间戳
-
val ts: Long = split(2).toLong
-
(roadId, ts)
-
})
-
-
-
val kvDS: DataStream[(String, Int)] = kcDS.map(kv => (kv._1, 1))
-
-
-
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
-
-
/**
-
* 时间窗口
-
* SlidingEventTimeWindows: 滑动的事件时间窗口
-
* SlidingProcessingTimeWindows: 滑动的处理时间窗口
-
* TumblingEventTimeWindows:滚动的事件时间窗口
-
* TumblingProcessingTimeWindows: 滚动的处理时间窗口
-
*
-
* 滑动:窗口会存在交叉部分
-
* 滚动:窗口美哦与交叉
-
*
-
* 事件时间:数据中自带一个时间字段, 如果要使用事件时间需要设置时间字段和水位线
-
* 处理时间:数据被处理的时间
-
*
-
*/
-
val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
-
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-
-
val countDS: DataStream[(String, Int)] = windowDS.sum(1)
-
-
countDS.print()
-
env.execute()
-
}
-
}
2. 统计窗口(每达到一定的量就做一次统计)
-
package com.wt.flink.window
-
import org.apache.flink.streaming.api.scala._
-
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-
object Demo2CountWindow {
-
def main(args: Array[String]): Unit = {
-
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
-
-
val wordsDS: DataStream[String] = linesDS.flatMap(_.split(','))
-
-
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
-
-
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
-
-
/**
-
* 统计窗口
-
* countWindow(10): 滚动的统计窗口
-
* countWindow(10,5): 滑动的统计窗口
-
*
-
*/
-
-
val countWindowDS: WindowedStream[(String, Int), String, GlobalWindow] = keyByDS
-
.countWindow(10, 5)
-
-
-
val countDS: DataStream[(String, Int)] = countWindowDS.sum(1)
-
-
countDS.print()
-
-
env.execute()
-
}
-
}
3. Session窗口(停止接收数据多长时间后,开始统计)
-
package com.wt.flink.window
-
import org.apache.flink.streaming.api.scala._
-
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
-
import org.apache.flink.streaming.api.windowing.time.Time
-
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-
object Demo3SessionWindow {
-
-
def main(args: Array[String]): Unit = {
-
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-
env.setParallelism(1)
-
-
//读取卡口过车数据
-
val dataDS: DataStream[String] = env.socketTextStream("master", 8888)
-
-
//整理数据取出道路编号和时间戳
-
val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
-
val split: Array[String] = line.split(",")
-
//道路编号
-
val roadId: String = split(1)
-
//时间戳
-
val ts: Long = split(2).toLong
-
(roadId, ts)
-
})
-
-
//设置时间字段
-
val assDS: DataStream[(String, Long)] = kcDS.assignAscendingTimestamps(_._2)
-
-
-
val kvDS: DataStream[(String, Int)] = assDS.map(kv => (kv._1, 1))
-
-
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
-
-
/**
-
* 会话窗口 -- 同一个key一段时间没有数据开始计算
-
* ProcessingTimeSessionWindows: 处理时间的会话窗口
-
* EventTimeSessionWindows: 事件时间的会话窗口,需要指定时间字段
-
*
-
*
-
*/
-
-
val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
-
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
-
-
val countDS: DataStream[(String, Int)] = windowDS.sum(1)
-
-
countDS.print()
-
-
env.execute()
-
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgkihfh
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13