admin 管理员组文章数量: 887021
2024年1月14日发(作者:cocoscreator开发的出名游戏)
Flink学习笔记-Window简单示例
Window
Flink中Window可以将无限流切分成有限流,是处理有限流的核心组件,现在Flink中 Window可以是时间驱动的Time Window,也可以是数据驱动的Count Window。
基于时间的窗口操作:在每个相同的时间间隔对Stream中的记录进行处理,通常各个时间间隔内的窗口操作处理的记录数不固定。
基于数据驱动的窗口操作:可以在Stream中选择固定数量的记录作为一个窗口,对该窗口中的记录进行处理。
窗口类型
Tumbling Window(滚动窗口):窗口间的元素无重复
一个翻滚窗口分配器的每个数据元分配给指定的窗口的窗口大小。翻滚窗具有固定的尺寸,不重叠。
Sliding Window(滑动窗口):窗口间的元素可能重复
该滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。
Session Window(会话窗口)
在会话窗口中按活动会话分配器组中的数据元。与翻滚窗口和滑动窗口相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,即当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙或会话间隙提取器函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。
Global Window(全局窗口)
一个全局性的窗口分配器分配使用相同的Keys相同的单个的所有数据元全局窗口。此窗口方案仅在您还指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。
Window Function在窗口触发后,负责对窗口内的元素进行计算。
Window Function分为两类: 增量聚合和全量聚合。
增量聚合: 窗口不维护原始数据,只维护中间结果,每次基于中间结果和增量数据进行聚合。如:
ReduceFunction、AggregateFunction等。
全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。如:ProcessWindowFunction。
Time的分类
Event-Time :事件时间是每个事件在其生产设备上发生的时间。
Ingestion-Time :摄取时间是事件进入Flink的时间。
Processing-Time : 处理时间是指执行相应算子操作的机器的系统时间。
不设置Time 类型,默认是processingTime。可以如下方式修改时间特性:
eamTimeCharacteristic(ime);
Watermark
主要解决延迟数据、乱序问题
Watermark是Apache Flink为了处理EventTime窗口计算提出的一种机制,本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime Clock。
Watermark两种生成方式:
Periodic - 周期性(一定时间间隔或者达到一定的记录条数)产生一个Watermark,默认周期为200毫秒。
在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
接口 AssignerWithPeriodicWatermarks
Punctuated:数据流中每一个递增的EventTime都会产生一个Watermark。没有时间周期规律,可打断的生成Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
接口 AssignerWithPunctuatedWatermarks
在基于Event-Time的流处理应用中,每个数据有两个必需的信息:
时间戳:事件发生的时间
Watermark:算子通过Watermark推断当前的事件时间。Watermark用于通知算子没有比水位更小的时间戳的事件会发生了。
基于时间的窗口会根据事件时间将一个数据分配给某个窗口。每个时间窗口都有一个开始时间戳和结束时间戳。所有内置的窗口分配器都会提供一个默认的触发器,一旦时间超过某个窗口的结束时间,触发器就会触发对这个窗口的计算。
API简单示例
import amp
import DateFormat
import ties
import ateFunction
import StringSchema
import .{ListState, ListStateDescriptor}
import uration
import aracteristic
import
dOutOfOrdernessTimestampExtractor
import ons.{AssignerWithPeriodicWatermarks,
AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import ._
import Function
import ark
import
import ndow
import afkaConsumer
import tor
import ffer
case class UserBehavior(userId:Long, itemId:Long, behavior:String, timestamp:Long)
case class ItemCount(itemId:Long, count:Long, timestamp:Long)
object TopNHotItemsGenerator {
def main(args: Array[String]): Unit = {
val env = cutionEnvironment
allelism(1)
// 设置水位间隔,默认200毫秒
oWatermarkInterval(100)
// 设置时间特性
eamTimeCharacteristic(ime)
val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
val properties = new Properties()
perty("s", "192.168.0.1:9092")
perty("", "tmp-1")
perty("alizer",
"Deserializer")
perty("alizer",
"Deserializer")
perty("", "latest")
val dataStream = rce(new FlinkKafkaConsumer[String]("tmp1",
SimpleStringSchema(), properties))
.map(data => {
val dataArray = (",")
UserBehavior(dataArray(0).trim().toLong, dataArray(1).trim().toLong,
dataArray(2).trim(), (dataArray(3).trim()).getTime)
})
.assignAscendingTimestamps(_.timestamp) // 数据中提取时间戳
/** 定义水位生成方式
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[UserBehavior](s(1)) {
override def extractTimestamp(t: UserBehavior): Long = {
amp
}
})
.assignAscendingTimestamps(new PeriodicWatermarksAssigner())
.assignAscendingTimestamps(new PunctuatedWatermarksAssigner())
*/
()
val aggregateStream = (_.behavior == "click").keyBy(_.itemId)
.timeWindow((1), s(5))
.aggregate(new CountAggregate(), new WindowAggregateResult()) // 窗口聚合
()
val processStream = (_.timestamp)
.process(new TopNHotItemsProcessFunction(2))
()
e("topn hot items")
}
}
class CountAggregate() extends AggregateFunction[UserBehavior, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(in: UserBehavior, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
new
override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
class AvgAggregate() extends AggregateFunction[UserBehavior, (Long, Long), Double] {
override def createAccumulator(): (Long, Long) = (0L, 0L)
override def add(in: UserBehavior, acc: (Long, Long)): (Long, Long) = (acc._1 + amp,
acc._2 + 1)
override def getResult(acc: (Long, Long)): Double = acc._1 / acc._2
override def merge(acc: (Long, Long), acc1: (Long, Long)): (Long, Long) = (acc._1 + acc1._1,
acc._2 + acc1._2)
}
class WindowAggregateResult() extends WindowFunction[Long, ItemCount, Long, TimeWindow] {
override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out:
Collector[ItemCount]): Unit = {
t(ItemCount(key, (), ))
}
}
class TopNHotItemsProcessFunction(topN: Int) extends KeyedProcessFunction[Long, ItemCount,
String] {
var itemsState:ListState[ItemCount] = _
override def open(parameters: Configuration): Unit = {
(parameters)
itemsState = tState(new ListStateDescriptor[ItemCount]("hot-items-state", classOf[ItemCount]))
}
override def processElement(value: ItemCount, context: KeyedProcessFunction[Long, ItemCount,
String]#Context, collector: Collector[String]): Unit = {
(value)
// 注册事件时间定时器
ervice().registerEventTimeTimer(amp + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemCount,
String]#OnTimerContext, out: Collector[String]): Unit = {
val items:ListBuffer[ItemCount] = new ListBuffer[ItemCount]()
import nversions._
for (item <- ()) {
items += item
}
val topNHotItems = (_.count)(e).take(topN)
val result: StringBuilder = new StringBuilder()
("time: ").append(new Timestamp(timestamp - 1)).append("n")
for (i <- s) {
val hotItem = topNHotItems(i)
("top%d:
)).append("n")
itemId %d count %d".format(i + 1, ,
}
("====================")
t(ng())
()
}
}
class PeriodicWatermarksAssigner extends AssignerWithPeriodicWatermarks[UserBehavior] {
val bound: Long = 60000; //延时一分钟
var maxTs: Long = ue //最大时间戳
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(element: UserBehavior, l: Long): Long = {
maxTs = (amp)
amp
}
}
class PunctuatedWatermarksAssigner extends AssignerWithPunctuatedWatermarks[UserBehavior] {
val bound: Long = 60000; //延时一分钟
override def checkAndGetNextWatermark(element: UserBehavior, l: Long): Watermark = {
if (or == "") {
new Watermark(l - bound)
} else {
null
}
}
override def extractTimestamp(element: UserBehavior, l: Long): Long = amp
}
// 侧输出流处理
class CProcessFunctioin extends ProcessFunction[(String, Int), String] {
lazy val outputTag: OutputTag[String] = new OutputTag[String]("coutput")
override def processElement(element: (String, Int), context: ProcessFunction[(String, Int),
String]#Context, collector: Collector[String]): Unit = {
if (element._2 < 15) {
(outputTag, "cot %s %d".format(element._1, element._2))
} else {
t("%s %d".format(element._1, element._2))
}
}
}
版权声明:本文标题:Flink学习笔记-Window简单示例 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/free/1705207563h476943.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论