Created
May 8, 2020 15:30
-
-
Save HeartSaVioR/fab85734b5be85198c48f45004c8e0ca to your computer and use it in GitHub Desktop.
Mismatched pair of getter/setter in Encoders.bean
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.heartsavior.spark.trial; | |
import org.apache.spark.api.java.function.MapFunction; | |
import org.apache.spark.api.java.function.MapGroupsWithStateFunction; | |
import org.apache.spark.sql.Dataset; | |
import org.apache.spark.sql.Encoders; | |
import org.apache.spark.sql.SparkSession; | |
import org.apache.spark.sql.streaming.StreamingQuery; | |
import org.apache.spark.sql.streaming.StreamingQueryException; | |
import java.util.Objects; | |
public class ProblematicApp { | |
public static void main(String[] args) throws StreamingQueryException { | |
SparkSession spark = SparkSession | |
.builder() | |
.appName("Sample") | |
.master("local[*]") | |
.getOrCreate(); | |
Dataset df = spark.readStream() | |
.format("rate") | |
.option("rowsPerSecond", "1000") | |
.option("numPartitions", "1") | |
.load(); | |
Dataset<Long> df2 = df.select("value").as(Encoders.LONG()); | |
MapGroupsWithStateFunction<Long, Long, StateInfo, UpdateInfo> stateUpdateFunc = | |
(MapGroupsWithStateFunction<Long, Long, StateInfo, UpdateInfo>) (key, values, state) -> { | |
StateInfo stateObj; | |
long maxValue = Long.MIN_VALUE; | |
if (state.exists()) { | |
stateObj = state.get(); | |
maxValue = stateObj.getMaxValue(); | |
} else { | |
stateObj = new StateInfo(); | |
stateObj.setKey(key); | |
} | |
while (values.hasNext()) { | |
long value = values.next(); | |
if (value > maxValue) { | |
maxValue = value; | |
} | |
} | |
stateObj.setMaxValue(maxValue); | |
state.update(stateObj); | |
UpdateInfo update = new UpdateInfo(); | |
update.setKey(stateObj.getKey()); | |
update.setMaxValue(stateObj.getMaxValue()); | |
return update; | |
}; | |
Dataset<UpdateInfo> updates = df2.groupByKey( | |
(MapFunction<Long, Long>) value -> value % 10, | |
Encoders.LONG() | |
).mapGroupsWithState( | |
stateUpdateFunc, | |
Encoders.bean(StateInfo.class), | |
Encoders.bean(UpdateInfo.class) | |
); | |
StreamingQuery query = updates | |
.writeStream() | |
.outputMode("update") | |
.format("console") | |
.start(); | |
query.awaitTermination(); | |
} | |
public static class KeyValueRecord { | |
private long key; | |
private long value; | |
public long getKey() { | |
return key; | |
} | |
public void setKey(long key) { | |
this.key = key; | |
} | |
public long getValue() { | |
return value; | |
} | |
public void setValue(long value) { | |
this.value = value; | |
} | |
@Override | |
public String toString() { | |
return com.google.common.base.Objects.toStringHelper(this) | |
.add("key", key) | |
.add("value", value) | |
.toString(); | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
KeyValueRecord that = (KeyValueRecord) o; | |
return key == that.key && | |
value == that.value; | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(key, value); | |
} | |
} | |
public static class StateInfo { | |
private long key; | |
private long maxValue; | |
// this | |
public long getATimestamp() { | |
// dummy method to trigger the issue when there's only a getter method | |
return System.currentTimeMillis(); | |
} | |
public long getKey() { | |
return key; | |
} | |
public void setKey(long key) { | |
this.key = key; | |
} | |
public long getMaxValue() { | |
return maxValue; | |
} | |
public void setMaxValue(long maxValue) { | |
this.maxValue = maxValue; | |
} | |
} | |
public static class UpdateInfo { | |
private long key; | |
private long maxValue; | |
public long getKey() { | |
return key; | |
} | |
public void setKey(long key) { | |
this.key = key; | |
} | |
public long getMaxValue() { | |
return maxValue; | |
} | |
public void setMaxValue(long maxValue) { | |
this.maxValue = maxValue; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment