Last active
November 6, 2022 20:47
-
-
Save HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0 to your computer and use it in GitHub Desktop.
[Flink DataStream API] Example of a custom session window type: a logout event closes the session, in addition to the usual inactivity timeout.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.heartsavior.flink | |
import java.util | |
import java.util.{Calendar, Collections} | |
import org.apache.flink.api.common.ExecutionConfig | |
import org.apache.flink.api.common.typeutils.TypeSerializer | |
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton | |
import org.apache.flink.api.scala._ | |
import org.apache.flink.core.memory.{DataInputView, DataOutputView} | |
import org.apache.flink.streaming.api.{TimeCharacteristic, environment} | |
import org.apache.flink.streaming.api.functions.source.RichSourceFunction | |
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext | |
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor | |
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction | |
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} | |
import org.apache.flink.streaming.api.watermark.Watermark | |
import org.apache.flink.streaming.api.windowing.assigners.{MergingWindowAssigner, WindowAssigner} | |
import org.apache.flink.streaming.api.windowing.time.Time | |
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger} | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow | |
import org.apache.flink.util.Collector | |
import scala.collection.mutable | |
import scala.util.Random | |
/**
 * Example job: finds game sessions that were started but never ended within a user session.
 *
 * A user session closes either after a period of inactivity or when the user logs out
 * (see CustomSessionWindowAssigner). Incomplete games are printed to stdout; events that
 * arrive in a session window before the user's login are printed to stderr.
 */
object CustomSessionWindowExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // Keep the watermark interval after the time characteristic: setting the characteristic
    // presumably resets the auto-watermark interval to Flink's default — verify before reordering.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    // Must use the same id as the tag FindIncompleteGamesFunction writes to.
    val eventsBeforeLoginTag = new OutputTag[GameEvents]("events-before-login")

    // The source could assign timestamps and emit watermarks itself; a separate assigner is
    // used here to show how that is done.
    val events: DataStream[GameEvents] =
      env.addSource(new GameEventSource)
        .setParallelism(1)
        .assignTimestampsAndWatermarks(new GameTimeAssigner)

    val incompleteGameSessions: DataStream[IncompleteGameSession] =
      events
        .keyBy(_.userId)
        .window(new CustomSessionWindowAssigner)
        .process(new FindIncompleteGamesFunction)

    incompleteGameSessions.print("incomplete-games")
    incompleteGameSessions.getSideOutput(eventsBeforeLoginTag).printToErr("events-before-login")

    env.execute("Find incomplete game sessions from game events")
  }
}
/**
 * A single event in a user's activity stream.
 *
 * @param userId        user that produced the event
 * @param eventType     kind of event; expected to be one of the GameEventTypes constants
 * @param gameSessionId game the event refers to; None for events such as login/logout
 * @param timestamp     event time in milliseconds (read by GameTimeAssigner)
 */
case class GameEvents(
    userId: Long,
    eventType: String,
    gameSessionId: Option[Long],
    timestamp: Long)
/**
 * Result record: a game that was started but not ended within a user session.
 *
 * @param userId        user owning the session
 * @param gameSessionId id of the game that never received an end event
 */
case class IncompleteGameSession(
    userId: Long,
    gameSessionId: Long)
/** String values used in `GameEvents.eventType`. */
object GameEventTypes {
  /** User logged in; starts a new session. */
  final val LOG_IN = "login"
  /** A game started; carries a game session id. */
  final val GAME_START = "game_start"
  /** A game ended; carries the id of the game it ends. */
  final val GAME_END = "game_end"
  /** User logged out; closes the current session window. */
  final val LOG_OUT = "logout"
}
/**
 * Uses `GameEvents.timestamp` as event time, with a 5-second out-of-orderness bound for
 * watermark generation.
 */
class GameTimeAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[GameEvents](Time.seconds(5)) {

  override def extractTimestamp(t: GameEvents): Long =
    t.timestamp
}
/**
 * For each user-session window, emits every game that was started but not ended.
 *
 * Events that precede the first login in the window are not part of the session. They are
 * forwarded to the "events-before-login" side output instead of being analysed.
 */
class FindIncompleteGamesFunction
  extends ProcessWindowFunction[GameEvents, IncompleteGameSession, Long, SessionEndAwareTimeWindow] {

  // side-output to emit events in session before logged in
  lazy val eventsBeforeLoginOutput: OutputTag[GameEvents] = new OutputTag[GameEvents]("events-before-login")

  override def process(
      key: Long,
      context: Context,
      elements: Iterable[GameEvents],
      out: Collector[IncompleteGameSession]): Unit = {
    // Window contents are not guaranteed to be in event-time order. The timestamp assigner
    // tolerates out-of-order events, and merged windows concatenate their state. Sort
    // (stably) so that "before login" is decided by event time rather than arrival order.
    val ordered = elements.toVector.sortBy(_.timestamp)
    println(ordered.mkString("========= ", ",", " ======="))

    val (eventsBeforeLogin, events) = ordered.span(_.eventType != GameEventTypes.LOG_IN)
    eventsBeforeLogin.foreach(context.output(eventsBeforeLoginOutput, _))

    val openButNotClosedGameSessions =
      gameSessionIds(events, GameEventTypes.GAME_START).diff(gameSessionIds(events, GameEventTypes.GAME_END))
    openButNotClosedGameSessions.foreach(gameNo => out.collect(IncompleteGameSession(key, gameNo)))
  }

  /** Ids of the games referenced by events of `eventType`; events without an id are skipped. */
  private def gameSessionIds(events: Seq[GameEvents], eventType: String): Set[Long] =
    events.collect { case GameEvents(_, `eventType`, Some(id), _) => id }.toSet
}
/**
 * Session window that can be closed by a logout event as well as by inactivity.
 *
 * It extends TimeWindow to avoid reimplementing everything, especially EventTimeTrigger.
 *
 * Two shapes are used:
 *  - an activity window covering `[startTime, endTime)`, with `sessionEnd = false`;
 *  - a session-end window, with `sessionEnd = true` and `endTime = -1`. This shape is created
 *    for a logout event, and is also the result of covering any window with a session-end
 *    window.
 *
 * @param startTime  start of the window, in event-time milliseconds
 * @param endTime    exclusive end of the window, or -1 for a session-end window
 * @param sessionEnd whether the session has been closed by a logout event
 */
class SessionEndAwareTimeWindow(val startTime: Long, val endTime: Long, val sessionEnd: Boolean)
  extends TimeWindow(startTime, endTime) {

  /**
   * Event time used as this window's end: `startTime` for a session-end window, otherwise the
   * last millisecond of the window.
   *
   * NOTE(review): after a merge, `startTime` of a session-end window is the session's first
   * event, not the logout time. Suppose the watermark has already passed that point when a
   * logout merges in. Flink presumably rejects the merge, because the merged window would end
   * before the current watermark. Carrying the logout timestamp through `cover` would avoid
   * this — TODO confirm.
   */
  override def maxTimestamp(): Long =
    if (sessionEnd) startTime else endTime - 1

  /**
   * Routes calls made through a `TimeWindow`-typed reference to the session-aware overload.
   * Previously such calls failed with NotImplementedError. Plain TimeWindows cannot be combined
   * with session windows.
   */
  override def intersects(other: TimeWindow): Boolean = other match {
    case sessionWindow: SessionEndAwareTimeWindow => intersects(sessionWindow)
    case _ =>
      throw new UnsupportedOperationException(s"Cannot intersect $this with non-session window $other")
  }

  /** Routes calls made through a `TimeWindow`-typed reference to the session-aware overload. */
  override def cover(other: TimeWindow): TimeWindow = other match {
    case sessionWindow: SessionEndAwareTimeWindow => cover(sessionWindow)
    case _ =>
      throw new UnsupportedOperationException(s"Cannot cover $this with non-session window $other")
  }

  /**
   * Whether `other` should be merged into this window.
   *
   * Callers must visit windows in ascending `startTime` order. The merge rules are:
   *  - an activity window absorbs a session-end window that starts inside it or at its end;
   *  - overlapping activity windows absorb each other;
   *  - a session-end window (`endTime = -1`) absorbs nothing starting at a non-negative time,
   *    so a closed session does not grow.
   *
   * @throws IllegalArgumentException if `other` starts before this window
   */
  def intersects(other: SessionEndAwareTimeWindow): Boolean = {
    require(startTime <= other.startTime,
      s"windows must be visited in ascending start order: $this, then $other")
    if (other.sessionEnd) {
      this.startTime <= other.startTime && this.endTime >= other.startTime
    } else {
      this.startTime <= other.endTime && this.endTime >= other.startTime
    }
  }

  /**
   * Smallest window containing both this and `other`. Covering with a session-end window yields
   * a session-end window.
   */
  def cover(other: SessionEndAwareTimeWindow): SessionEndAwareTimeWindow = {
    val mergedStart = Math.min(startTime, other.startTime)
    if (other.sessionEnd) {
      new SessionEndAwareTimeWindow(mergedStart, -1, true)
    } else {
      new SessionEndAwareTimeWindow(mergedStart, Math.max(endTime, other.endTime), false)
    }
  }

  def canEqual(other: Any): Boolean = other.isInstanceOf[SessionEndAwareTimeWindow]

  override def equals(other: Any): Boolean = other match {
    case that: SessionEndAwareTimeWindow =>
      super.equals(that) &&
        (that canEqual this) &&
        startTime == that.startTime &&
        endTime == that.endTime &&
        sessionEnd == that.sessionEnd
    case _ => false
  }

  override def hashCode(): Int = {
    val state = Seq(super.hashCode(), startTime, endTime, sessionEnd)
    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
  }

  override def toString = s"SessionEndAwareTimeWindow($startTime, $endTime, $sessionEnd)"
}
/** Companion holding the Flink serializer for SessionEndAwareTimeWindow. */
object SessionEndAwareTimeWindow {

  /**
   * Stateless serializer for SessionEndAwareTimeWindow.
   *
   * The wire format is a fixed 17 bytes: startTime (long), endTime (long), sessionEnd (boolean).
   * Windows are immutable, so the copy methods return the input instance unchanged.
   */
  @SerialVersionUID(1L)
  class Serializer extends TypeSerializerSingleton[SessionEndAwareTimeWindow] {
    override def isImmutableType: Boolean = true

    override def createInstance: SessionEndAwareTimeWindow = null

    override def copy(from: SessionEndAwareTimeWindow): SessionEndAwareTimeWindow = from

    override def copy(from: SessionEndAwareTimeWindow, reuse: SessionEndAwareTimeWindow): SessionEndAwareTimeWindow = from

    // Fixed serialized size: two longs (8 bytes each) plus one boolean (1 byte). The previous
    // value of 0 claimed a zero-length encoding, which does not match serialize().
    override def getLength: Int = 8 + 8 + 1

    override def serialize(record: SessionEndAwareTimeWindow, target: DataOutputView): Unit = {
      target.writeLong(record.startTime)
      target.writeLong(record.endTime)
      target.writeBoolean(record.sessionEnd)
    }

    override def deserialize(source: DataInputView): SessionEndAwareTimeWindow = {
      val start = source.readLong
      val end = source.readLong
      val sessionEnd = source.readBoolean()
      new SessionEndAwareTimeWindow(start, end, sessionEnd)
    }

    // Windows are immutable, so the reuse instance is ignored.
    override def deserialize(reuse: SessionEndAwareTimeWindow, source: DataInputView): SessionEndAwareTimeWindow =
      deserialize(source)

    // Field-by-field copy in the same order as serialize().
    override def copy(source: DataInputView, target: DataOutputView): Unit = {
      target.writeLong(source.readLong)
      target.writeLong(source.readLong)
      target.writeBoolean(source.readBoolean())
    }

    override def canEqual(obj: Any): Boolean = obj.isInstanceOf[SessionEndAwareTimeWindow.Serializer]
  }
}
/**
 * Merging assigner for user sessions that close after inactivity or on logout.
 *
 * Each non-logout event gets an activity window `[ts, ts + sessionGapMillis)`. A logout event
 * gets a session-end window. Overlapping windows are merged using
 * SessionEndAwareTimeWindow.intersects/cover.
 *
 * @param sessionGapMillis inactivity gap in milliseconds after which a session times out.
 *                         Defaults to 5 minutes, the previously hard-coded value.
 */
class CustomSessionWindowAssigner(sessionGapMillis: Long = 5 * 60 * 1000L)
  extends MergingWindowAssigner[GameEvents, SessionEndAwareTimeWindow] {

  require(sessionGapMillis > 0, s"sessionGapMillis must be positive, got $sessionGapMillis")

  override def assignWindows(
      element: GameEvents,
      timestamp: Long,
      context: WindowAssigner.WindowAssignerContext): util.Collection[SessionEndAwareTimeWindow] = {
    val window =
      if (element.eventType == GameEventTypes.LOG_OUT) {
        new SessionEndAwareTimeWindow(timestamp, -1, true)
      } else {
        new SessionEndAwareTimeWindow(timestamp, timestamp + sessionGapMillis, false)
      }
    Collections.singletonList(window)
  }

  // EventTimeTrigger is typed on TimeWindow; the cast is safe since our window extends it.
  override def getDefaultTrigger(env: environment.StreamExecutionEnvironment): Trigger[GameEvents, SessionEndAwareTimeWindow] =
    EventTimeTrigger.create().asInstanceOf[Trigger[GameEvents, SessionEndAwareTimeWindow]]

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[SessionEndAwareTimeWindow] =
    new SessionEndAwareTimeWindow.Serializer

  /**
   * Sorts the windows by start time and sweeps them left to right. Each run of intersecting
   * windows becomes one group. Every group with more than one member is reported to `callback`
   * with its covering window.
   */
  override def mergeWindows(
      windows: util.Collection[SessionEndAwareTimeWindow],
      callback: MergingWindowAssigner.MergeCallback[SessionEndAwareTimeWindow]): Unit = {
    import scala.collection.JavaConverters._

    // SessionEndAwareTimeWindow.intersects requires ascending start order.
    val sortedWindows = windows.asScala.toList.sortBy(_.startTime)
    println(sortedWindows.mkString("Windows to merge: ========= ", ",", " ======="))

    // Each group is (covering window, member windows). Groups are accumulated newest-first.
    val groups =
      sortedWindows.foldLeft(List.empty[(SessionEndAwareTimeWindow, Vector[SessionEndAwareTimeWindow])]) {
        case ((covering, members) :: done, candidate) if covering.intersects(candidate) =>
          (covering.cover(candidate), members :+ candidate) :: done
        case (acc, candidate) =>
          (candidate, Vector(candidate)) :: acc
      }

    groups.reverse.filter(_._2.size > 1).foreach { case (mergeResult, toBeMerged) =>
      // Hand Flink a real java.util.ArrayList. MergingWindowSet.addWindow calls Iterator.remove
      // on this collection, which Scala's Java wrappers reject with
      // UnsupportedOperationException.
      callback.merge(new util.ArrayList[SessionEndAwareTimeWindow](toBeMerged.asJava), mergeResult)
    }
  }

  override def isEventTime: Boolean = true
}
/**
 * Demo source that emits a fixed, scripted sequence of events for one random user:
 *  - a completed game, then a game still open when the user logs out;
 *  - game events after the logout and before the next login;
 *  - a second login with a completed game;
 *  - a game whose end event arrives after the inactivity gap has elapsed.
 *
 * It then emits a watermark far enough ahead to close every window and waits briefly.
 * Flink finishes the job after `run` returns. `cancel()` stops any further emission.
 */
class GameEventSource extends RichSourceFunction[GameEvents] {

  // Set by cancel(), which may be called from a thread other than the one running run().
  @volatile private var running = true

  override def run(srcCtx: SourceContext[GameEvents]): Unit = {
    val rand = new Random()
    var curTime = Calendar.getInstance.getTimeInMillis
    val userId = rand.nextLong()

    // Emits one event stamped with the current script time, then advances the clock by
    // `advanceMillis`. Emission holds the checkpoint lock, as the SourceFunction contract requires.
    def emit(eventType: String, gameSessionId: Option[Long], advanceMillis: Long): Unit = {
      if (running) {
        srcCtx.getCheckpointLock.synchronized {
          srcCtx.collect(GameEvents(userId, eventType, gameSessionId, curTime))
        }
      }
      curTime += advanceMillis
    }

    // login, then a game that is started and ended
    emit(GameEventTypes.LOG_IN, None, 500)
    val gameSessionId = Some(rand.nextLong())
    emit(GameEventTypes.GAME_START, gameSessionId, 2000)
    emit(GameEventTypes.GAME_END, gameSessionId, 1000)

    // a game that is never ended before logging out
    val gameSessionId2 = Some(rand.nextLong())
    emit(GameEventTypes.GAME_START, gameSessionId2, 2000)
    emit(GameEventTypes.LOG_OUT, None, 500)

    // game start/end after logged out (invalid)
    val gameSessionId3 = Some(rand.nextLong())
    emit(GameEventTypes.GAME_START, gameSessionId3, 1500)
    emit(GameEventTypes.GAME_END, gameSessionId3, 500)

    // new login with a completed game
    emit(GameEventTypes.LOG_IN, None, 1000)
    val gameSessionId4 = Some(rand.nextLong())
    emit(GameEventTypes.GAME_START, gameSessionId4, 500)
    emit(GameEventTypes.GAME_END, gameSessionId4, 2000)

    // game end signaled after the session has timed out (6 minutes of inactivity)
    val gameSessionId5 = Some(rand.nextLong())
    emit(GameEventTypes.GAME_START, gameSessionId5, 6 * 60 * 1000)
    emit(GameEventTypes.GAME_END, gameSessionId5, 0)

    if (running) {
      // force emit watermark to see result immediately
      srcCtx.getCheckpointLock.synchronized {
        srcCtx.emitWatermark(new Watermark(curTime + (6 * 60 * 1000)))
      }
      // Flink will finish the application after run() method returns
      Thread.sleep(3000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment