Created
October 13, 2017 07:22
-
-
Save darionyaphet/2c988cf89a2801f787f175a5fc720007 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.darion.yaphet.gearpump | |
import org.apache.gearpump.cluster.UserConfig | |
import org.apache.gearpump.cluster.client.ClientContext | |
import org.apache.gearpump.streaming.{Processor, StreamApplication} | |
import org.apache.gearpump.streaming.partitioner.HashPartitioner | |
import org.apache.gearpump.streaming.source.DataSourceProcessor | |
import org.apache.gearpump.util.{AkkaApp, Graph} | |
import org.apache.gearpump.util.Graph.Node | |
// Fully-qualified main class: org.darion.yaphet.gearpump.GearpumpMain
object GearpumpMain extends AkkaApp { | |
override def main(akkaConf: Config, args: Array[String]): Unit = { | |
val context = ClientContext(akkaConf) | |
implicit val actorSystem = context.system | |
val sourceProcessor = DataSourceProcessor(new WordSource, args(1).toInt, "Split") | |
val sink = Processor[WordSink](args(2).toInt) | |
val partitioner = new HashPartitioner | |
val computation = sourceProcessor ~ partitioner ~> sink | |
val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty) | |
context.submit(app) | |
} | |
override def help(): Unit = { | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.darion.yaphet.gearpump | |
import java.time.Instant | |
import org.apache.gearpump.Message | |
import org.apache.gearpump.cluster.UserConfig | |
import org.apache.gearpump.streaming.task.{Task, TaskContext} | |
import scala.collection.mutable | |
class WordSink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { | |
private val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() | |
private var snapShotTime: Long = System.currentTimeMillis() | |
override def onStart(startTime: Instant): Unit = { | |
snapShotTime = System.currentTimeMillis() | |
} | |
override def onNext(msg: Message): Unit = { | |
val current = map.getOrElse(msg.value.asInstanceOf[String], 0L) | |
map.put(msg.value.asInstanceOf[String], current + 1) | |
//if ((System.currentTimeMillis() - snapShotTime) > 1000 * 30) { | |
// for ((k, v) <- map) { | |
// println(k + " " + v) | |
// } | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.darion.yaphet.gearpump | |
import java.io.File | |
import java.nio.charset.Charset | |
import java.time.Instant | |
import java.util.Random | |
import org.apache.gearpump.Message | |
import org.apache.gearpump.com.google.common.io.Files | |
import org.apache.gearpump.streaming.source.DataSource | |
import org.apache.gearpump.streaming.task.TaskContext | |
import scala.collection.mutable.ArrayBuffer | |
class WordSource extends DataSource { | |
private val wordList = ArrayBuffer[String]() | |
private var size = 0 | |
override def open(context: TaskContext, startTime: Instant): Unit = { | |
val words = Files.readLines(new File("/data/words"), Charset.defaultCharset) | |
for (index <- 0.until(words.size())) { | |
wordList += words.get(index) | |
} | |
println(wordList) | |
size = words.size() | |
println(size) | |
} | |
override def read(): Message = { | |
val timeStamp = System.currentTimeMillis() | |
val index = new Random(timeStamp).nextInt(size) | |
Message(wordList(index), timeStamp) | |
} | |
override def close(): Unit = { | |
} | |
override def getWatermark: Instant = { | |
Instant.now() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment