Skip to content

Instantly share code, notes, and snippets.

@olavfosse
Created August 20, 2024 00:06
Show Gist options
  • Save olavfosse/85884a7ea2ed835fb93dec9c043dc594 to your computer and use it in GitHub Desktop.
Save olavfosse/85884a7ea2ed835fb93dec9c043dc594 to your computer and use it in GitHub Desktop.
Telemere Reducible
(ns logs
(:require [clojure.edn :as edn]
[clojure.string :as str])
(:import java.nio.file.Path
java.nio.file.Paths
java.nio.file.Files
java.nio.file.OpenOption
java.io.PushbackReader
java.nio.file.LinkOption
java.io.InputStreamReader
java.util.zip.GZIPInputStream
clojure.lang.IReduceInit))
(defn- pushbackreader [^Path path]
(PushbackReader.
(if (str/ends-with? (.getFileName path) ".gz")
(InputStreamReader. (GZIPInputStream. (Files/newInputStream path (make-array OpenOption 0))))
(Files/newBufferedReader path))))
(defn reducible
"Returns a reducible for reducing Telemere's logs chronologically. The
reducible itself is not stateful and may be reused, reducing the
reducible does all the IO and cleanup.
Effortlessly handles multiple log files and decompression 😄!
Example setup:
(tm/add-handler! :durable
(tm/handler:file handler-opts)
{:needs-stopping? true})
(def logs (logs/reducible {:handler-opts handler-opts
:edn-opts edn-opts}))
Example usage:
(reduce (completing (fn [acc _] (inc acc))) 0 logs) := 33890
"
[{:as opts :keys [handler-opts edn-opts]}]
;; Protocol vs Reify?
(reify IReduceInit
(reduce [this rf init]
(let [files (sort-by
#(Files/getLastModifiedTime % (make-array LinkOption 0))
(remove #(Files/isDirectory % (make-array LinkOption 0))
(with-open [ds (Files/newDirectoryStream (Paths/get (get handler-opts :path "logs") (make-array String 0)))]
(vec ds))))]
(if (empty? files)
init
(let [!pbr (volatile! (pushbackreader (first files)))]
(try
(loop [acc init
files (rest files)]
(let [signal (edn/read (assoc edn-opts :eof ::eof) @!pbr)]
(if (= signal ::eof) (do (.close @!pbr)
(if (empty? files)
(rf acc)
(do (vreset! !pbr (pushbackreader (first files)))
(recur acc (rest files)))))
(let [acc (rf acc signal)]
(if (reduced? acc)
(unreduced acc)
(recur acc files))))))
(finally (.close @!pbr)))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment