Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save xiyoulaoyuanjia/98ec380dc264edcd166a95405862fa17 to your computer and use it in GitHub Desktop.
Save xiyoulaoyuanjia/98ec380dc264edcd166a95405862fa17 to your computer and use it in GitHub Desktop.
sparkingStreamingPairTest.java
package SparkStreaming;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
/**
 * Spark Streaming word count over a text socket.
 *
 * <p>Reads newline-delimited text from a TCP socket, splits every line into
 * whitespace-separated words and prints the per-batch count of each word.
 *
 * <p>Usage: {@code testMain [host [port]]}. Both arguments are optional; when
 * omitted the original hard-coded endpoint is used.
 */
public class testMain {

    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "10.7.65.211";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 8888;

    /** Length of one streaming micro-batch, in seconds. */
    private static final long BATCH_SECONDS = 15;

    /**
     * Builds the streaming pipeline, starts it and blocks until it terminates.
     *
     * @param args optional {@code host} (args[0]) and {@code port} (args[1]) of the text source
     * @throws InterruptedException if the thread is interrupted while waiting for termination
     * @throws IllegalArgumentException if the port argument is not a valid TCP port
     */
    public static void main(String[] args) throws InterruptedException {
        final String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        final int port = args.length > 1 ? parsePort(args[1]) : DEFAULT_PORT;

        // The Spark Streaming guide recommends local[n] with n greater than the number of
        // receivers, so that processing is not starved by the socket receiver.
        SparkConf conf = new SparkConf().setAppName("spark streaming: ").setMaster("local[2]");
        JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(BATCH_SECONDS));
        try {
            JavaReceiverInputDStream<String> lines = sc.socketTextStream(host, port);

            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) {
                    // The FlatMapFunction contract differs between Spark versions; this one
                    // expects an Iterator, so the token list is converted before returning.
                    return splitWords(line);
                }
            });

            JavaPairDStream<String, Integer> pairs = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String word) {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                    });

            JavaPairDStream<String, Integer> wc = pairs.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer left, Integer right) {
                            return left + right;
                        }
                    });

            wc.print();
            sc.start();
            sc.awaitTermination();
        } finally {
            // Release the streaming context even when setup or the wait fails.
            sc.stop();
        }
    }

    /**
     * Splits a line into words on runs of whitespace.
     *
     * <p>Blank lines produce no words, and leading, trailing or repeated whitespace never
     * produces an empty token (a plain {@code split(" ")} would count {@code ""} as a word).
     *
     * @param line one line of input text
     * @return an iterator over the non-empty words of {@code line}
     */
    private static Iterator<String> splitWords(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return Arrays.<String>asList().iterator();
        }
        return Arrays.asList(trimmed.split("\\s+")).iterator();
    }

    /**
     * Parses and range-checks a TCP port number.
     *
     * @param text the port as given on the command line
     * @return the port as an int in the range 1..65535
     * @throws IllegalArgumentException if {@code text} is not a number in that range
     */
    private static int parsePort(String text) {
        final int port;
        try {
            port = Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port: " + text, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range (1-65535): " + text);
        }
        return port;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment