admin 管理员组

文章数量: 887021


2024年1月14日发(作者:cocoscreator开发的出名游戏)

Flink学习笔记-Window简单示例

Window

Flink中Window可以将无限流切分成有限流,是处理有限流的核心组件,现在Flink中 Window可以是时间驱动的Time Window,也可以是数据驱动的Count Window。

基于时间的窗口操作:在每个相同的时间间隔对Stream中的记录进行处理,通常各个时间间隔内的窗口操作处理的记录数不固定。

基于数据驱动的窗口操作:可以在Stream中选择固定数量的记录作为一个窗口,对该窗口中的记录进行处理。

窗口类型

Tumbling Window(滚动窗口):窗口间的元素无重复

一个翻滚窗口分配器的每个数据元分配给指定的窗口的窗口大小。翻滚窗具有固定的尺寸,不重叠。

Sliding Window(滑动窗口):窗口间的元素可能重复

该滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。

Session Window(会话窗口)

在会话窗口中按活动会话分配器组中的数据元。与翻滚窗口和滑动窗口相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,即当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙或会话间隙提取器函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。

Global Window(全局窗口)

一个全局性的窗口分配器分配使用相同的Keys相同的单个的所有数据元全局窗口。此窗口方案仅在您还指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。

Window Function在窗口触发后,负责对窗口内的元素进行计算。

Window Function分为两类: 增量聚合和全量聚合。

增量聚合: 窗口不维护原始数据,只维护中间结果,每次基于中间结果和增量数据进行聚合。如:

ReduceFunction、AggregateFunction等。

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。如:ProcessWindowFunction。

Time的分类

Event-Time :事件时间是每个事件在其生产设备上发生的时间。

Ingestion-Time :摄取时间是事件进入Flink的时间。

Processing-Time : 处理时间是指执行相应算子操作的机器的系统时间。

不设置Time 类型,默认是processingTime。可以如下方式修改时间特性:

eamTimeCharacteristic(ime);

Watermark

主要解决延迟数据、乱序问题

Watermark是Apache Flink为了处理EventTime窗口计算提出的一种机制,本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime Clock。

Watermark两种生成方式:

Periodic - 周期性(一定时间间隔或者达到一定的记录条数)产生一个Watermark,默认周期为200毫秒。

在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

接口 AssignerWithPeriodicWatermarks

Punctuated:数据流中每一个递增的EventTime都会产生一个Watermark。没有时间周期规律,可打断的生成Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

接口 AssignerWithPunctuatedWatermarks

在基于Event-Time的流处理应用中,每个数据有两个必需的信息:

时间戳:事件发生的时间

Watermark:算子通过Watermark推断当前的事件时间。Watermark用于通知算子没有比水位更小的时间戳的事件会发生了。

基于时间的窗口会根据事件时间将一个数据分配给某个窗口。每个时间窗口都有一个开始时间戳和结束时间戳。所有内置的窗口分配器都会提供一个默认的触发器,一旦时间超过某个窗口的结束时间,触发器就会触发对这个窗口的计算。

API简单示例

import amp

import DateFormat

import ties

import ateFunction

import StringSchema

import .{ListState, ListStateDescriptor}

import uration

import aracteristic

import

dOutOfOrdernessTimestampExtractor

import ons.{AssignerWithPeriodicWatermarks,

AssignerWithPunctuatedWatermarks, KeyedProcessFunction}

import ._

import Function

import ark

import

import ndow

import afkaConsumer

import tor

import ffer

case class UserBehavior(userId:Long, itemId:Long, behavior:String, timestamp:Long)

case class ItemCount(itemId:Long, count:Long, timestamp:Long)

object TopNHotItemsGenerator {

def main(args: Array[String]): Unit = {

val env = cutionEnvironment

allelism(1)

// 设置水位间隔,默认200毫秒

oWatermarkInterval(100)

// 设置时间特性

eamTimeCharacteristic(ime)

val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")

val properties = new Properties()

perty("s", "192.168.0.1:9092")

perty("", "tmp-1")

perty("alizer",

"Deserializer")

perty("alizer",

"Deserializer")

perty("", "latest")

val dataStream = rce(new FlinkKafkaConsumer[String]("tmp1",

SimpleStringSchema(), properties))

.map(data => {

val dataArray = (",")

UserBehavior(dataArray(0).trim().toLong, dataArray(1).trim().toLong,

dataArray(2).trim(), (dataArray(3).trim()).getTime)

})

.assignAscendingTimestamps(_.timestamp) // 数据中提取时间戳

/** 定义水位生成方式

.assignTimestampsAndWatermarks(new

BoundedOutOfOrdernessTimestampExtractor[UserBehavior](s(1)) {

override def extractTimestamp(t: UserBehavior): Long = {

amp

}

})

.assignAscendingTimestamps(new PeriodicWatermarksAssigner())

.assignAscendingTimestamps(new PunctuatedWatermarksAssigner())

*/

()

val aggregateStream = (_.behavior == "click").keyBy(_.itemId)

.timeWindow((1), s(5))

.aggregate(new CountAggregate(), new WindowAggregateResult()) // 窗口聚合

()

val processStream = (_.timestamp)

.process(new TopNHotItemsProcessFunction(2))

()

e("topn hot items")

}

}

class CountAggregate() extends AggregateFunction[UserBehavior, Long, Long] {

override def createAccumulator(): Long = 0L

override def add(in: UserBehavior, acc: Long): Long = acc + 1

override def getResult(acc: Long): Long = acc

new

override def merge(acc: Long, acc1: Long): Long = acc + acc1

}

class AvgAggregate() extends AggregateFunction[UserBehavior, (Long, Long), Double] {

override def createAccumulator(): (Long, Long) = (0L, 0L)

override def add(in: UserBehavior, acc: (Long, Long)): (Long, Long) = (acc._1 + amp,

acc._2 + 1)

override def getResult(acc: (Long, Long)): Double = acc._1 / acc._2

override def merge(acc: (Long, Long), acc1: (Long, Long)): (Long, Long) = (acc._1 + acc1._1,

acc._2 + acc1._2)

}

class WindowAggregateResult() extends WindowFunction[Long, ItemCount, Long, TimeWindow] {

override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out:

Collector[ItemCount]): Unit = {

t(ItemCount(key, (), ))

}

}

class TopNHotItemsProcessFunction(topN: Int) extends KeyedProcessFunction[Long, ItemCount,

String] {

var itemsState:ListState[ItemCount] = _

override def open(parameters: Configuration): Unit = {

(parameters)

itemsState = tState(new ListStateDescriptor[ItemCount]("hot-items-state", classOf[ItemCount]))

}

override def processElement(value: ItemCount, context: KeyedProcessFunction[Long, ItemCount,

String]#Context, collector: Collector[String]): Unit = {

(value)

// 注册事件时间定时器

ervice().registerEventTimeTimer(amp + 1)

}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemCount,

String]#OnTimerContext, out: Collector[String]): Unit = {

val items:ListBuffer[ItemCount] = new ListBuffer[ItemCount]()

import nversions._

for (item <- ()) {

items += item

}

val topNHotItems = (_.count)(e).take(topN)

val result: StringBuilder = new StringBuilder()

("time: ").append(new Timestamp(timestamp - 1)).append("n")

for (i <- s) {

val hotItem = topNHotItems(i)

("top%d:

)).append("n")

itemId %d count %d".format(i + 1, ,

}

("====================")

t(ng())

()

}

}

class PeriodicWatermarksAssigner extends AssignerWithPeriodicWatermarks[UserBehavior] {

val bound: Long = 60000; //延时一分钟

var maxTs: Long = ue //最大时间戳

override def getCurrentWatermark: Watermark = {

new Watermark(maxTs - bound)

}

override def extractTimestamp(element: UserBehavior, l: Long): Long = {

maxTs = (amp)

amp

}

}

class PunctuatedWatermarksAssigner extends AssignerWithPunctuatedWatermarks[UserBehavior] {

val bound: Long = 60000; //延时一分钟

override def checkAndGetNextWatermark(element: UserBehavior, l: Long): Watermark = {

if (or == "") {

new Watermark(l - bound)

} else {

null

}

}

override def extractTimestamp(element: UserBehavior, l: Long): Long = amp

}

// 侧输出流处理

class CProcessFunctioin extends ProcessFunction[(String, Int), String] {

lazy val outputTag: OutputTag[String] = new OutputTag[String]("coutput")

override def processElement(element: (String, Int), context: ProcessFunction[(String, Int),

String]#Context, collector: Collector[String]): Unit = {

if (element._2 < 15) {

(outputTag, "cot %s %d".format(element._1, element._2))

} else {

t("%s %d".format(element._1, element._2))

}

}

}


本文标签: 时间 数据 算子 分配器 滑动