Created
February 3, 2019 20:06
-
-
Save ariskk/c489d1ac24dcf0b94bf5a723006b8c73 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.state.ValueStateDescriptor | |
import org.apache.flink.api.common.typeinfo.TypeInformation | |
import org.apache.flink.api.common.typeutils.TypeSerializer | |
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, RichCoMapFunction} | |
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream} | |
import org.apache.flink.streaming.api.scala._ | |
/** Element type of the first input stream.
  *
  * @param key value the stream is keyed by
  * @param foo payload carried by this side of the join
  */
case class M1(
  key: String,
  foo: String
)
/** Element type of the second input stream.
  *
  * @param key value the stream is keyed by
  * @param bar payload carried by this side of the join
  */
case class M2(
  key: String,
  bar: String
)
/** Per-key join state: the most recent element seen from each input, if any.
  *
  * @param maybeM1 latest [[M1]] recorded for the key, or `None` if none yet
  * @param maybeM2 latest [[M2]] recorded for the key, or `None` if none yet
  */
case class State(maybeM1: Option[M1], maybeM2: Option[M2]) {

  /** The joined record, defined only when both sides are present.
    *
    * The output takes its key from the [[M1]] side and concatenates
    * `foo` with `bar`.
    */
  lazy val out: Option[Out] =
    maybeM1.flatMap { left =>
      maybeM2.map(right => Out(left.key, left.foo + right.bar))
    }
}
/** Result record produced when both sides of the join are available.
  *
  * @param key     key shared by the joined elements
  * @param foodbar combined payload; in this file it is built as `M1.foo + M2.bar`
  */
case class Out(
  key: String,
  foodbar: String
)
// Execution environment the example job is built on.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// First input: M1 elements for keys "a" and "b", keyed by `key`.
val keyedS1 = env
  .fromElements(M1("a", "fooA"), M1("b", "fooB"))
  .keyBy(m1 => m1.key)

// Second input: M2 elements for keys "a" and "c", keyed by `key`.
// Only key "a" occurs in both inputs.
val keyedS2 = env
  .fromElements(M2("a", "barA"), M2("c", "barC"))
  .keyBy(m2 => m2.key)
// Connects the two keyed inputs and joins them through a ValueState[State].
// Every incoming element is recorded in the state and produces one output:
// Some(Out) when both an M1 and an M2 have been recorded, None otherwise.
lazy val stream = keyedS1.connect(keyedS2).map(
  new RichCoMapFunction[M1, M2, Option[Out]] {

    // Serializer and descriptor are lazy because they need the runtime
    // context, which is read when they are first used inside map1/map2.
    lazy val stateTypeInfo: TypeInformation[State] = implicitly[TypeInformation[State]]
    lazy val serializer: TypeSerializer[State] =
      stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    lazy val stateDescriptor = new ValueStateDescriptor[State]("state", serializer)

    /** Reads the stored state (an empty State when nothing is stored yet),
      * applies `merge`, writes the result back and returns its joined output.
      */
    private def advance(merge: State => State): Option[Out] = {
      val handle = getRuntimeContext.getState(stateDescriptor)
      val previous = Option(handle.value()) match {
        case Some(existing) => existing
        case None           => State(None, None)
      }
      val next = merge(previous)
      val emitted = next.out
      handle.update(next)
      emitted
    }

    // Element from the first input: remember it as the M1 side.
    override def map1(in: M1): Option[Out] =
      advance(previous => previous.copy(maybeM1 = Option(in)))

    // Element from the second input: remember it as the M2 side.
    override def map2(in: M2): Option[Out] =
      advance(previous => previous.copy(maybeM2 = Option(in)))
  }
)
// Collect the output of the joined stream into a local List and print it.
// The original referenced `stream2`, which is not defined anywhere in this
// file; the connected/mapped stream built above is named `stream`.
val results = new DataStreamUtils(stream).collect.toList
println(results)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment