Created
October 17, 2022 13:47
-
-
Save fr33m0nk/200705b84d38f50ee4373b1652c2fedc to your computer and use it in GitHub Desktop.
Functions for creating sequential and parallel streams.
Parallel streams are very useful when working with Datomic's `d/datom` api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns fr33m0nk.clj-java-streams.core | |
(:refer-clojure :exclude [filter map distinct take drop sort take-while drop-while concat]) | |
(:import | |
(java.util Collection Comparator) | |
(java.util.function BinaryOperator Consumer Function Predicate) | |
(java.util.stream BaseStream Collectors Stream StreamSupport))) | |
(defn iterable? | |
[coll] | |
(let [class-of-coll (class coll)] | |
(isa? class-of-coll Iterable))) | |
(defn implements-collection? | |
[coll] | |
(let [class-of-coll (class coll)] | |
(isa? class-of-coll Collection))) | |
(defn stream-of | |
^Stream | |
[coll] | |
(Stream/of (to-array coll))) | |
(defn coll->stream | |
"Converts collection implementing java.lang.Iterable to parallel Java stream" | |
^Stream | |
[^Iterable coll] | |
(assert (iterable? coll) "Incorrect Collection type. Collection should implement java.lang.Iterable") | |
(if (implements-collection? coll) | |
(-> ^Collection coll (.stream)) | |
(-> ^Iterable coll (.spliterator) (StreamSupport/stream false)))) | |
(defn coll->parallel-stream | |
"Converts collection implementing java.lang.Iterable to parallel Java stream" | |
^Stream | |
[^Iterable coll] | |
(assert (iterable? coll) "Incorrect Collection type. Collection should implement java.lang.Iterable") | |
(if (implements-collection? coll) | |
(-> ^Collection coll (.parallelStream)) | |
(-> ^Iterable coll (.spliterator) (StreamSupport/stream true)))) | |
(defn filter | |
^Stream | |
[^Predicate pred ^Stream stream] | |
(.filter stream pred)) | |
(defn map | |
^Stream | |
[^Function mapper ^Stream stream] | |
(.map stream mapper)) | |
(defn flat-map | |
^Stream | |
[^Function mapper ^Stream stream] | |
(.flatMap stream mapper)) | |
(defn distinct | |
^Stream | |
[^Stream stream] | |
(.distinct stream)) | |
(defn sort | |
^Stream | |
([^Stream stream] | |
(.sorted stream)) | |
([^Comparator comparator ^Stream stream] | |
(.sorted stream comparator))) | |
(defn take | |
^Stream | |
[^Long size ^Stream stream] | |
(.limit stream size)) | |
(defn drop | |
^Stream | |
[^Long size ^Stream stream] | |
(.skip stream size)) | |
(defn take-while | |
^Stream | |
[^Predicate pred ^Stream stream] | |
(.takeWhile stream pred)) | |
(defn drop-while | |
^BaseStream | |
[^Predicate pred ^BaseStream stream] | |
(.dropWhile stream pred)) | |
(defn concat | |
^Stream | |
[^Stream stream-a ^Stream stream-b] | |
(Stream/concat stream-a stream-b)) | |
(defn to-vector | |
[^Stream stream] | |
(-> stream | |
(.collect (Collectors/toList)) | |
vec)) | |
(defn to-set | |
[^Stream stream] | |
(-> stream | |
(.collect (Collectors/toSet)) | |
set)) | |
(defn to-map | |
([^Function key-mapper ^Function value-mapper ^Stream stream] | |
(-> stream | |
(.collect (Collectors/toMap key-mapper value-mapper)) | |
((partial into {})))) | |
([^Function key-mapper ^Function value-mapper ^BinaryOperator mergeFunction ^Stream stream] | |
(-> stream | |
(.collect (Collectors/toMap key-mapper value-mapper mergeFunction)) | |
((partial into {}))))) | |
(defn for-each | |
[^Consumer action ^Stream stream] | |
(.forEach stream action)) |
Author
fr33m0nk
commented
Oct 21, 2022
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment