Created
January 27, 2020 11:42
-
-
Save koen-dejonghe/5c0fa6793082dbca21f1a1a97853efa5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.stream.alpakka.elasticsearch.scaladsl | |
import java.time.ZonedDateTime | |
import java.time.format.DateTimeFormatter | |
import akka.actor.Cancellable | |
import akka.stream.alpakka.elasticsearch.{ElasticsearchSourceSettings, ReadResult} | |
import akka.stream.scaladsl.{Flow, Source} | |
import com.typesafe.scalalogging.LazyLogging | |
import org.elasticsearch.client.RestClient | |
import spray.json.JsObject | |
import scala.collection.mutable | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
object ElasticsearchPollingSource extends LazyLogging {

  /**
   * Polls a dated Elasticsearch index on a fixed schedule and pushes new documents downstream.
   *
   * Each tick issues a range query over the sliding window `(now - window, now - lag]`.
   * Consecutive windows overlap by `overlap` seconds so late-arriving documents are not
   * missed; the duplicates this produces are removed with a bounded buffer of document ids.
   * Ticks dropped under downstream backpressure are counted and widen the next window
   * proportionally, so no time range is ever skipped.
   *
   * @param indexPrefix  Prefix of the index to poll. The date of the index is appended:
   *                     prefix "logstash-apache" polls "logstash-apache-yyyy.MM.dd",
   *                     where "yyyy.MM.dd" is the date at the moment the tick occurs.
   * @param typeName     Type of the ES query
   * @param tickInterval Interval between ticks, in seconds
   * @param overlap      Extra seconds to take from the last window; duplicates will be removed
   * @param lag          Number of seconds to allow documents to appear on the ES index
   * @param bufferSize   Maximum number of document ids kept for deduplication
   * @param settings     Elastic Search source settings
   * @param elasticsearchClient REST client
   * @return Source of Spray JSON read results; materializes the tick timer's [[Cancellable]]
   */
  def create(indexPrefix: String,
             typeName: String,
             tickInterval: Int,
             overlap: Int,
             lag: Int,
             bufferSize: Int,
             settings: ElasticsearchSourceSettings)(
      implicit elasticsearchClient: RestClient
  ): Source[ReadResult[JsObject], Cancellable] = {

    // Index name for "today", evaluated at each tick so the source rolls over at midnight.
    // NOTE(review): uses the system default time zone — logstash-style daily indices are
    // conventionally dated in UTC; confirm this matches whatever writes the index.
    def indexOfToday: String = {
      val formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd")
      s"$indexPrefix-${formatter.format(ZonedDateTime.now())}"
    }

    // Range query covering the window since the previous successful tick. Missed ticks
    // widen the window so the skipped intervals are still covered. Both bounds are
    // rounded down to the second ("/s") so consecutive windows line up exactly.
    def query(missedTicks: Int): String = {
      val window = lag + ((tickInterval + overlap) * (1 + missedTicks))
      s"""
         |{
         | "range" : {
         |   "@timestamp" : {
         |     "gt" : "now-${window}s/s",
         |     "lte" : "now-${lag}s/s"
         |   }
         | }
         |}
         |""".stripMargin
    }

    // Conflate ticks the downstream could not keep up with, counting how many were dropped.
    // see https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#collecting-missed-ticks
    val missedTicks =
      Flow[String].conflateWithSeed(seed = _ => 0)((missed, _) => {
        logger.warn(s"missed ticks: ${missed + 1}")
        missed + 1
      })

    Source
      .tick(0.seconds, tickInterval.seconds, "tick")
      .via(missedTicks)
      .flatMapConcat { missed: Int =>
        ElasticsearchSource
          .create(
            indexName = indexOfToday,
            typeName = typeName,
            query = query(missed),
            settings = settings
          )
          .map(message => (message.id, message))
      }
      .statefulMapConcat { () =>
        // Drop documents already emitted by an earlier (overlapping) window.
        val seen = new LtdUniQueue[String](bufferSize)

        { case (id, message) => if (seen ?+ id) message :: Nil else Nil }
      }
  }

  /**
   * A [[mutable.LinkedHashSet]] bounded to at most `maxSize` elements: once full,
   * adding a new element evicts the oldest one (insertion order).
   */
  class LtdUniQueue[A](maxSize: Int) extends mutable.LinkedHashSet[A] {
    import scala.util.chaining._

    /**
     * Adds `elem` if absent; returns true when the element was not already present.
     *
     * Fix: the eviction condition was `size >= maxSize`, one element too eager — the
     * buffer never actually held `maxSize` entries, and with `maxSize == 1` it evicted
     * the element just added, which disabled deduplication entirely.
     */
    def ?+(elem: A): Boolean =
      add(elem) tap (added => if (added && size > maxSize) remove(head))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment