Created
October 22, 2018 06:10
-
-
Save HeartSaVioR/4e6c8aa003890515efe9a5b1d1509315 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
case class SessionInfo(sessionStartTimestampMs: Long, | |
sessionEndTimestampMs: Long, | |
numEvents: Int) { | |
/** Duration of the session, between the first and last events + session gap */ | |
def durationMs: Long = sessionEndTimestampMs - sessionStartTimestampMs | |
} | |
case class SessionUpdate(id: String, | |
sessionStartTimestampSecs: Long, | |
sessionEndTimestampSecs: Long, | |
durationSecs: Long, | |
numEvents: Int) | |
// NOTE: you may want to move above case classes out of scope on state function or dataframe | |
// which may raise serialization error | |
import java.sql.Timestamp | |
val inputData = MemoryStream[(String, Long)] | |
val events = inputData.toDF() | |
.select($"_1".as("value"), $"_2".as("timestamp")) | |
.as[(String, Timestamp)] | |
.flatMap { case (v, timestamp) => | |
v.split(" ").map { word => (word, timestamp) } | |
} | |
.as[(String, Timestamp)] | |
.withWatermark("_2", "30 seconds") | |
val outputMode = OutputMode.Append() // below stateFunc also supports OutputMode.Update | |
val sessionGapMills = 10 * 1000 | |
val stateFunc: (String, Iterator[(String, Timestamp)], GroupState[List[SessionInfo]]) | |
=> Iterator[SessionUpdate] = | |
(sessionId: String, events: Iterator[(String, Timestamp)], | |
state: GroupState[List[SessionInfo]]) => { | |
def handleEvict(sessionId: String, state: GroupState[List[SessionInfo]]) | |
: Iterator[SessionUpdate] = { | |
state.getOption match { | |
case Some(lst) => | |
// assuming sessions are sorted by session start timestamp | |
val (evicted, kept) = lst.span { | |
s => s.sessionEndTimestampMs < state.getCurrentWatermarkMs() | |
} | |
if (kept.isEmpty) { | |
state.remove() | |
} else { | |
state.update(kept) | |
state.setTimeoutTimestamp(kept.head.sessionEndTimestampMs) | |
} | |
outputMode match { | |
case s if s == OutputMode.Append() => | |
evicted.iterator.map(si => SessionUpdate(sessionId, | |
si.sessionStartTimestampMs / 1000, | |
si.sessionEndTimestampMs / 1000, | |
si.durationMs / 1000, si.numEvents)) | |
case s if s == OutputMode.Update() => Seq.empty[SessionUpdate].iterator | |
case s => throw new UnsupportedOperationException(s"Not supported output mode $s") | |
} | |
case None => | |
state.remove() | |
Seq.empty[SessionUpdate].iterator | |
} | |
} | |
def mergeSession(session1: SessionInfo, session2: SessionInfo): SessionInfo = { | |
SessionInfo( | |
sessionStartTimestampMs = Math.min(session1.sessionStartTimestampMs, | |
session2.sessionStartTimestampMs), | |
sessionEndTimestampMs = Math.max(session1.sessionEndTimestampMs, | |
session2.sessionEndTimestampMs), | |
numEvents = session1.numEvents + session2.numEvents) | |
} | |
def handleEvents(sessionId: String, events: Iterator[(String, Timestamp)], | |
state: GroupState[List[SessionInfo]]): Iterator[SessionUpdate] = { | |
import java.{util => ju} | |
import scala.collection.mutable | |
import collection.JavaConverters._ | |
// we assume only previous sessions are sorted: events are not guaranteed to be sorted. | |
// we also assume the number of sessions for each key is not huge, which is valid | |
// unless end users set huge watermark delay as well as smaller session gap. | |
val newSessions: ju.LinkedList[SessionInfo] = state.getOption match { | |
case Some(lst) => new ju.LinkedList[SessionInfo](lst.asJava) | |
case None => new ju.LinkedList[SessionInfo]() | |
} | |
// this is to track the change of sessions for update mode | |
// if you define "update" as returning whole new sessions on given key, | |
// you can remove this and logic to track sessions | |
val updatedSessions = new mutable.ListBuffer[SessionInfo]() | |
while (events.hasNext) { | |
val ev = events.next() | |
// convert each event to one of session window | |
val event = SessionInfo(ev._2.getTime, ev._2.getTime + sessionGapMills, 1) | |
// find matched session | |
var index = 0 | |
var updated = false | |
while (!updated && index < newSessions.size()) { | |
val session = newSessions.get(index) | |
if (event.sessionEndTimestampMs < session.sessionStartTimestampMs) { | |
// no matched session, and following sessions will not be matched | |
newSessions.add(index, event) | |
updated = true | |
updatedSessions += event | |
} else if (event.sessionStartTimestampMs > session.sessionEndTimestampMs) { | |
// continue to next session | |
index += 1 | |
} else { | |
// matched: update session | |
var newSession = session.copy( | |
sessionStartTimestampMs = Math.min(session.sessionStartTimestampMs, | |
event.sessionStartTimestampMs), | |
sessionEndTimestampMs = Math.max(session.sessionEndTimestampMs, | |
event.sessionEndTimestampMs), | |
numEvents = session.numEvents + event.numEvents) | |
// we are going to replace previous session with new session, so previous session should be removed from updated sessions | |
// same occurs below if statements | |
updatedSessions -= session | |
// check for a chance to concatenate new session and next session | |
if (index + 1 < newSessions.size()) { | |
val nextSession = newSessions.get(index + 1) | |
if (newSession.sessionEndTimestampMs <= nextSession.sessionStartTimestampMs) { | |
newSession = mergeSession(newSession, nextSession) | |
updatedSessions -= nextSession | |
newSessions.remove(index + 1) | |
} | |
} | |
// check for a chance to concatenate new session and previous session | |
if (index - 1 >= 0) { | |
val prevSession = newSessions.get(index - 1) | |
if (newSession.sessionEndTimestampMs <= prevSession.sessionStartTimestampMs) { | |
newSession = mergeSession(newSession, prevSession) | |
updatedSessions -= prevSession | |
newSessions.remove(index - 1) | |
index -= 1 | |
} | |
} | |
newSessions.set(index, newSession) | |
updatedSessions += newSession | |
updated = true | |
} | |
} | |
if (!updated) { | |
// none matched so far, add to last | |
newSessions.addLast(event) | |
updatedSessions += event | |
} | |
} | |
val newSessionsForScala = newSessions.asScala.toList | |
state.update(newSessionsForScala) | |
// there must be at least one session available | |
// set timeout to earliest sessions' session end: we will traverse and evict sessions | |
state.setTimeoutTimestamp(newSessionsForScala.head.sessionEndTimestampMs) | |
outputMode match { | |
case s if s == OutputMode.Update() => | |
updatedSessions.iterator.map(si => | |
SessionUpdate(sessionId, si.sessionStartTimestampMs / 1000, | |
si.sessionEndTimestampMs / 1000, si.durationMs / 1000, si.numEvents)) | |
case s if s == OutputMode.Append() => Seq.empty[SessionUpdate].iterator | |
case s => throw new UnsupportedOperationException(s"Not supported output mode $s") | |
} | |
} | |
if (state.hasTimedOut) { | |
handleEvict(sessionId, state) | |
} else { | |
handleEvents(sessionId, events, state) | |
} | |
} | |
val sessionUpdates = events | |
.groupByKey(event => event._1) | |
.flatMapGroupsWithState[List[SessionInfo], SessionUpdate]( | |
outputMode, timeoutConf = GroupStateTimeout.EventTimeTimeout())(stateFunc) | |
// codes for verifying output place here |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment