Skip to content

Instantly share code, notes, and snippets.

@codejitsu
Forked from MLnick/StreamingHLL.scala
Created July 3, 2017 19:23

Revisions

  1. @MLnick MLnick revised this gist Feb 12, 2013. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions StreamingHLL.scala
    Original file line number Diff line number Diff line change
    @@ -31,9 +31,9 @@ object StreamingHLL {
    val globalHll = new HyperLogLogMonoid(12)
    var userSet: Set[Long] = Set()

    val approxUsers = users.map(id => {
    val approxUsers = users.mapPartitions(ids => {
    val hll = new HyperLogLogMonoid(12)
    hll(id)
    ids.map(id => hll(id))
    }).reduce(_ + _)

    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
  2. @MLnick MLnick revised this gist Feb 12, 2013. 1 changed file with 1 addition and 3 deletions.
    4 changes: 1 addition & 3 deletions StreamingHLL.scala
    Original file line number Diff line number Diff line change
    @@ -36,9 +36,7 @@ object StreamingHLL {
    hll(id)
    }).reduce(_ + _)

    val exactUsers = users.map(id => {
    Set(id)
    }).reduce(_ ++ _)
    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

    var h = globalHll.zero
    approxUsers.foreach(rdd => {
  3. @MLnick MLnick created this gist Feb 12, 2013.
    65 changes: 65 additions & 0 deletions StreamingHLL.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,65 @@
    import spark.streaming.StreamingContext._
    import spark.streaming.{Seconds, StreamingContext}
    import spark.SparkContext._
    import spark.storage.StorageLevel
    import spark.streaming.examples.twitter.TwitterInputDStream
    import com.twitter.algebird.HyperLogLog._
    import com.twitter.algebird._

    /**
    * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
    * TwitterInputDStream
    */
    object StreamingHLL {
      /**
       * Entry point. Counts distinct Twitter user ids seen on the stream in two ways, per batch
       * and cumulatively: approximately with a HyperLogLog sketch and exactly with a `Set[Long]`,
       * then prints both counts and the relative error of the approximation.
       *
       * @param args `<master> <twitter_username> <twitter_password> [filter1] ... [filter n]`;
       *             the process exits with status 1 if fewer than three arguments are given.
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 3) {
          System.err.println("Usage: StreamingHLL <master> <twitter_username> <twitter_password>" +
            " [filter1] [filter2] ... [filter n]")
          System.exit(1)
        }

        val Array(master, username, password) = args.slice(0, 3)
        val filters = args.slice(3, args.length)

        val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
        val stream = new TwitterInputDStream(ssc, username, password, filters,
          StorageLevel.MEMORY_ONLY_SER)
        ssc.registerInputStream(stream)

        val users = stream.map(status => status.getUser.getId)

        // Single source of truth for the HyperLogLogMonoid constructor argument, so the
        // per-partition sketches and the global accumulator are always built the same way.
        // NOTE(review): presumably the number of register-index bits -- confirm in Algebird docs.
        val hllBits = 12
        val globalHll = new HyperLogLogMonoid(hllBits)

        // Build one monoid per partition instead of one per record; the per-id sketches are
        // then merged into a single sketch per batch by the reduce.
        val approxUsers = users.mapPartitions(ids => {
          val hll = new HyperLogLogMonoid(hllBits)
          ids.map(id => hll(id))
        }).reduce(_ + _)

        val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

        // Running totals across batches. They are mutated from inside the foreach callbacks
        // below. NOTE(review): this assumes those callbacks execute in the driver process.
        var h = globalHll.zero
        var userSet: Set[Long] = Set()

        approxUsers.foreach(rdd => {
          // After the reduce each batch holds at most one element; take(1) fetches it with a
          // single job and yields an empty array for an empty batch.
          rdd.take(1).foreach { partial =>
            h += partial
            println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
            println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
          }
        })

        exactUsers.foreach(rdd => {
          rdd.take(1).foreach { partial =>
            userSet ++= partial
            println("Exact distinct users this batch: %d".format(partial.size))
            println("Exact distinct users overall: %d".format(userSet.size))
            // Reads `h` as updated by the approximate-count callback registered above.
            // NOTE(review): relies on that callback running first for the same batch -- confirm.
            println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
          }
        })

        ssc.start()
      }
    }