@jsmorph
Last active December 19, 2015 01:39

Revisions

  1. jsmorph revised this gist Jun 27, 2013. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion ClojureSeqCombiner.java
    Original file line number Diff line number Diff line change
    @@ -24,7 +24,7 @@
    * https://github.com/charlessimpson/clojure-accumulo-iterators/blob/master/src/main/java/clojure_accumulo/iterators/ClojureCombiner.java
    *
    * The primary difference is that this code delegates the iteration
    -* and marshalling to Clojure code.
    +* to Clojure code.
    *
    * The user-specified Clojure code should return a function of two
    * arguments: a Key and an Iterator<Value>, and the function should
  2. jsmorph revised this gist Jun 27, 2013. 1 changed file with 11 additions and 66 deletions.
    77 changes: 11 additions & 66 deletions example.txt
    @@ -1,66 +1,11 @@
    -foo> insert a c x 1
    -insert a c x 1
    -foo> insert a c x 2
    -insert a c x 2
    -foo> insert a c x 3
    -insert a c x 3
    -foo> scan
    -scan
    -a c:x [] 1,1;2,1;3,1
    -foo> insert a c x 4
    -insert a c x 4
    -foo> insert a c x 5
    -insert a c x 5
    -foo> insert a c x 6
    -insert a c x 6
    -foo> insert a c x 7
    -insert a c x 7
    -foo> insert a c x 8
    -insert a c x 8
    -foo> insert a c x 9
    -insert a c x 9
    -foo> insert a c x 10
    -insert a c x 10
    -foo> scan
    -scan
    -a c:x [] 1,1;2,1;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    -foo> insert a c x 11
    -insert a c x 11
    -foo> insert a c x 12
    -insert a c x 12
    -foo> insert a c x 13
    -insert a c x 13
    -foo> insert a c x 14
    -insert a c x 14
    -foo> insert a c x 15
    -insert a c x 15
    -foo> insert a c x 16
    -insert a c x 16
    -foo> scan
    -scan
    -a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;1,1;2,1;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    -foo> insert a c x 17
    -insert a c x 17
    -foo> scan
    -scan
    -a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;17,1;*,1;2,1;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    -foo> insert a c x 1
    -insert a c x 1
    -foo> scan
    -scan
    -a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;17,1;*,1;1,2;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    -foo> insert a c x 1
    -insert a c x 1
    -foo> insert a c x 1
    -insert a c x 1
    -foo> insert a c x 1
    -insert a c x 1
    -foo> insert a c x 2
    -insert a c x 2
    -foo> insert a c x 2
    -insert a c x 2
    -foo> insert a c x 2
    -insert a c x 2
    -foo> scan
    -scan
    -a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;17,1;*,1;1,5;2,4;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    +foo> setiter -t foo -p 10 -scan -minc -majc -n clj -class org.apache.accumulo.examples.simple.combiner.ClojureSeqCombiner
    +foo> insert b c x a
    +foo> insert b c x b
    +foo> insert b c x b
    +foo> insert b c x c
    +foo> insert b c x c
    +foo> insert b c x c
    +foo> insert b c x c
    +foo> insert b c x d
    +foo> scan
    +b c:x [] a,1;b,2;c,4;d,1
  3. jsmorph revised this gist Jun 27, 2013. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions examine.clj
    @@ -0,0 +1,10 @@
    (defn examine
    ([hist-string]
    (let [pairs
    (map #(let [[x n] (vec (.split % ","))]
    [x (Long/parseLong n)])
    (.split hist-string ";"))]
    (println :entries (count pairs))
    (println :total (reduce + (map second pairs)))
    (doseq [pair (sort-by second pairs)]
    (println pair)))))
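For readers who do not have a Clojure REPL at hand, the same inspection can be sketched in plain Java. This is only an illustration, not part of the gist: the class name `ExamineSketch` and its methods are hypothetical, and it assumes the histogram string uses the `x,n;y,m` layout shown in example.txt. Unlike `examine`, it splits each pair with a limit of 2, as histogram.clj does.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring examine.clj; not part of the original gist.
final class ExamineSketch {

    // Parses "x,n;y,m" into (label, count) pairs ordered by ascending count.
    static List<Map.Entry<String, Long>> parse(String hist) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String pair : hist.split(";")) {
            String[] xn = pair.split(",", 2);
            pairs.add(new SimpleEntry<>(xn[0], Long.parseLong(xn[1])));
        }
        // List.sort is stable, so ties keep their original order.
        pairs.sort(Map.Entry.comparingByValue());
        return pairs;
    }

    // Sums the counts of all pairs.
    static long total(List<Map.Entry<String, Long>> pairs) {
        long sum = 0;
        for (Map.Entry<String, Long> e : pairs) {
            sum += e.getValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> pairs = parse("11,1;*,1;1,5;2,4");
        System.out.println("entries " + pairs.size());
        System.out.println("total " + total(pairs));
        for (Map.Entry<String, Long> e : pairs) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Summing the counts is a quick check that no inserts were lost: the total should equal the number of values inserted for the key, even when some labels have been folded into `*`.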
  4. jsmorph revised this gist Jun 27, 2013. 1 changed file with 32 additions and 0 deletions.
    32 changes: 32 additions & 0 deletions histogram.clj
    @@ -0,0 +1,32 @@
    (fn [k vals]
    (let [pair-sep "," pairs-sep ";" limit 16 other "*"]
    (loop [vs (iterator-seq vals)
    acc {}] ;; ToDo: Use transients.
    (if (empty? vs)
    (.getBytes (reduce str
    (interpose pairs-sep
    (map (fn [[x n]]
    (str x pair-sep n))
    acc))))
    (recur (rest vs)
    (let [v (new String (.get (first vs)))]
    (if (.contains v pair-sep)
    (into {}
    (let [merged (merge-with +
    acc
    (into {}
    (map (fn [xn]
    (let [[x n] (.split xn pair-sep 2)]
    [x (Long/parseLong n)]))
    (.split (new String v) pairs-sep))))
    sorted (sort-by (fn [[k c]] [(- c) k])
    (dissoc merged other))
    top (take limit sorted)
    bottom (drop limit sorted)]
    (conj top [other
    (+ (merged other 0)
    (reduce + (map second bottom)))])))
    (if (or (acc v)
    (< (count acc) limit))
    (assoc acc v (inc (acc v 0)))
    (assoc acc other (inc (acc other 0))))))))))))
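The branch structure of the function above may be easier to follow in a plain-Java sketch. This is an illustration under stated assumptions, not the gist's code: `HistogramSketch` is a hypothetical name, values are plain `String`s instead of Accumulo `Value` objects, and a `LinkedHashMap` keeps insertion order, whereas the ordering of a Clojure map is not guaranteed. A value containing `,` is treated as a partial histogram and merged, after which only the `limit` most frequent labels are kept and the rest are folded into `*`; any other value is counted directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java rendering of histogram.clj; not part of the gist.
final class HistogramSketch {
    static final String PAIR_SEP = ",";
    static final String PAIRS_SEP = ";";
    static final String OTHER = "*";

    static String reduce(Iterator<String> values, int limit) {
        Map<String, Long> acc = new LinkedHashMap<>();
        while (values.hasNext()) {
            String v = values.next();
            if (v.contains(PAIR_SEP)) {
                // v is a previously combined histogram: merge its counts.
                for (String pair : v.split(PAIRS_SEP)) {
                    String[] xn = pair.split(PAIR_SEP, 2);
                    acc.merge(xn[0], Long.parseLong(xn[1]), Long::sum);
                }
                acc = truncate(acc, limit);
            } else if (acc.containsKey(v) || acc.size() < limit) {
                // Known label, or still room for a new one.
                acc.merge(v, 1L, Long::sum);
            } else {
                // No room left: count the value under the overflow label.
                acc.merge(OTHER, 1L, Long::sum);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : acc.entrySet()) {
            if (sb.length() > 0) sb.append(PAIRS_SEP);
            sb.append(e.getKey()).append(PAIR_SEP).append(e.getValue());
        }
        return sb.toString();
    }

    // Keeps the `limit` largest counts (ties broken by label) and folds
    // everything else into OTHER, which is always present afterwards.
    static Map<String, Long> truncate(Map<String, Long> merged, int limit) {
        long other = merged.getOrDefault(OTHER, 0L);
        List<Map.Entry<String, Long>> sorted = new ArrayList<>();
        for (Map.Entry<String, Long> e : merged.entrySet()) {
            if (!e.getKey().equals(OTHER)) sorted.add(e);
        }
        sorted.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        Map<String, Long> out = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            Map.Entry<String, Long> e = sorted.get(i);
            if (i < limit) out.put(e.getKey(), e.getValue());
            else other += e.getValue();
        }
        out.put(OTHER, other);
        return out;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("a", "b", "b", "c", "c", "c", "c", "d");
        System.out.println(reduce(raw.iterator(), 16)); // prints a,1;b,2;c,4;d,1
    }
}
```

As in the Clojure version, the result is approximate once the limit is reached: a label that arrives after the table is full is counted under `*`, even if it later turns out to be frequent.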
  5. jsmorph created this gist Jun 27, 2013.
    205 changes: 205 additions & 0 deletions ClojureSeqCombiner.java
    @@ -0,0 +1,205 @@
    /*
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */

    /*
    * An Accumulo Combiner
    *
    * http://accumulo.apache.org/1.5/accumulo_user_manual.html#_combiners
    *
    * that delegates the reduce to user-specified Clojure code.
    *
    * Derived in part from
    *
    * https://github.com/charlessimpson/clojure-accumulo-iterators/blob/master/src/main/java/clojure_accumulo/iterators/ClojureCombiner.java
    *
    * The primary difference is that this code delegates the iteration
    * and marshalling to Clojure code.
    *
    * The user-specified Clojure code should return a function of two
    * arguments: a Key and an Iterator<Value>, and the function should
    * return a Java byte array.
    *
    * See 'EXAMPLE_*' in the code below.
    *
    * Building:
    *
    * Add
    *
    * <dependency>
    * <groupId>org.clojure</groupId>
    * <artifactId>clojure</artifactId>
    * <version>1.4.0</version>
    * </dependency>
    *
    * to 'examples/simple/pom.xml'.
    *
    */


    package org.apache.accumulo.examples.simple.combiner;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map;
    import java.io.ByteArrayOutputStream;
    import java.io.OutputStreamWriter;
    import java.io.StringReader;

    import clojure.lang.IFn;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.KeyValue;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.Combiner;
    import org.apache.accumulo.core.iterators.IteratorEnvironment;
    import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

    import clojure.lang.Compiler;
    import clojure.lang.RT;
    import clojure.lang.Var;
    import org.apache.accumulo.start.classloader.AccumuloClassLoader;


    public final class ClojureSeqCombiner extends Combiner {

    protected static final boolean useExample = true;

    protected static final String F_OPTION = "f";
    protected IFn f;
    // ToDo: Also provide Clojure-level parameter object.

    // Example: Concatenate values with a comma separator.
    // LOOP-based iteration.
    protected static String EXAMPLE_1 =
    "(fn [k vals] " +
    " (loop [vs (iterator-seq vals) " +
    " acc (transient [])] " +
    " (if (empty? vs) " +
    " (.getBytes (reduce str (interpose \",\" (persistent! acc)))) " +
    " (recur (rest vs) " +
    " (conj! acc (new String (.get (first vs)))))))) ";

    // Example: Concatenate values with a comma separator.
    // MAP-based iteration.
    protected static String EXAMPLE_2 =
    "(fn [k vals] " +
    " (.getBytes (reduce str " +
    " (interpose \",\" " +
    " (map #(new String (.get %)) " +
    " (iterator-seq vals)))))) ";


    protected static String EXAMPLE_3 =
    // ToDo: Allow options to be passed to the Clojure function.
    "(fn [k vals] \n" +
    " (let [pair-sep \",\" pairs-sep \";\" limit 16 other \"*\"] \n" +
    " (loop [vs (iterator-seq vals) \n" +
    " acc {}] ;; ToDo: Use transients. \n" +
    " (if (empty? vs) \n" +
    " (.getBytes (reduce str \n" +
    " (interpose pairs-sep \n" +
    " (map (fn [[x n]] \n" +
    " (str x pair-sep n)) \n" +
    " acc)))) \n" +
    " (recur (rest vs) \n" +
    " (let [v (new String (.get (first vs)))] \n" +
    " (if (.contains v pair-sep) \n" +
    " (into {} \n" +
    " (let [merged (merge-with + \n" +
    " acc \n" +
    " (into {} \n" +
    " (map (fn [xn] \n" +
    " (let [[x n] (.split xn pair-sep 2)] \n" +
    " [x (Long/parseLong n)])) \n" +
    " (.split (new String v) pairs-sep)))) \n" +
    " sorted (sort-by (fn [[k c]] [(- c) k])\n" +
    " (dissoc merged other)) \n" +
    " top (take limit sorted) \n" +
    " bottom (drop limit sorted)] \n" +
    " (conj top [other \n" +
    " (+ (merged other 0) \n" +
    " (reduce + (map second bottom)))]))) \n" +
    " (if (or (acc v) \n" +
    " (< (count acc) limit)) \n" +
    " (assoc acc v (inc (acc v 0))) \n" +
    " (assoc acc other (inc (acc other 0))))))))))) \n";


    // Which example to use by default.
    protected static String EXAMPLE = EXAMPLE_3;

    @Override
    public Value reduce(Key key, Iterator<Value> valueIterator) {
    return new Value((byte[]) f.invoke(key, valueIterator));
    }

    public static Object eval (String clj) {
    Object ret;

    try {
    // Set the class loader to use Accumulo's so we get Hadoop,
    // Accumulo, and whatever is in lib/ext.
    Thread.currentThread().setContextClassLoader(AccumuloClassLoader.getClassLoader());
    Var.pushThreadBindings(RT.map(RT.USE_CONTEXT_CLASSLOADER, RT.T));

    // Actually compile the given code.
    ret = Compiler.load(new StringReader(clj));
    } catch (Exception e) {
    throw new IllegalArgumentException(e);
    } finally {
    Var.popThreadBindings();
    }

    return ret;
    }

    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);

    if (options == null && !useExample) {
    throw new IllegalArgumentException(F_OPTION + " must be set for ClojureSeqCombiner");
    }

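    // Guard against a null options map so the example fallback below still applies.
    String fString = (options == null) ? null : options.get(F_OPTION);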
    if (fString == null) {
    if (useExample) {
    fString = EXAMPLE;
    } else {
    throw new IllegalArgumentException(F_OPTION + " must be set for ClojureSeqCombiner");
    }
    }

    final Object obj = eval(fString);
    if (!(obj instanceof IFn)) {
    throw new IllegalArgumentException(F_OPTION + " must compile to something that implements IFn");
    }
    f = (IFn) obj;
    }

    @Override
    public IteratorOptions describeOptions() {
    final IteratorOptions io = super.describeOptions();
    io.setName("clojureSeqCombiner");
    io.addNamedOption(F_OPTION, "String containing combiner reduce function");

    io.setDescription("ClojureSeqCombiner allows a Clojure function to be passed in and invoked as the Combiner's reduce method");

    return io;
    }
    }
    // Local Variables:
    // c-basic-offset: 2
    // indent-tabs-mode: nil
    // End:
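The delegation pattern in the class above (`reduce` hands the key and the value iterator to a user-supplied function and wraps the returned bytes) can be sketched without Accumulo or Clojure on the classpath. Everything here is an assumption for illustration: `ReduceFn` is a hypothetical stand-in for `clojure.lang.IFn`, the key is a plain `String`, values are raw `byte[]`, and UTF-8 is used explicitly where the original relies on the platform default charset. `COMMA_JOIN` plays the role of `EXAMPLE_2`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stdlib-only analogue of ClojureSeqCombiner's delegation.
final class DelegatingReduceSketch {

    // Stand-in for an IFn of two arguments that returns a byte array.
    interface ReduceFn {
        byte[] apply(String key, Iterator<byte[]> values);
    }

    // Analogue of EXAMPLE_2: concatenate values with a comma separator.
    static final ReduceFn COMMA_JOIN = (key, values) -> {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        while (values.hasNext()) {
            if (!first) sb.append(',');
            sb.append(new String(values.next(), StandardCharsets.UTF_8));
            first = false;
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    };

    // The combiner itself does no iteration; it only delegates.
    static byte[] reduce(ReduceFn f, String key, Iterator<byte[]> values) {
        return f.apply(key, values);
    }

    public static void main(String[] args) {
        Iterator<byte[]> values = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "2".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8)).iterator();
        byte[] out = reduce(COMMA_JOIN, "a", values);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints 1,2,3
    }
}
```

Passing the iterator through untouched is the design choice the header comment describes: iteration and marshalling both live in the supplied function, so the combiner never needs to know how values are encoded.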
    66 changes: 66 additions & 0 deletions example.txt
    @@ -0,0 +1,66 @@
    foo> insert a c x 1
    insert a c x 1
    foo> insert a c x 2
    insert a c x 2
    foo> insert a c x 3
    insert a c x 3
    foo> scan
    scan
    a c:x [] 1,1;2,1;3,1
    foo> insert a c x 4
    insert a c x 4
    foo> insert a c x 5
    insert a c x 5
    foo> insert a c x 6
    insert a c x 6
    foo> insert a c x 7
    insert a c x 7
    foo> insert a c x 8
    insert a c x 8
    foo> insert a c x 9
    insert a c x 9
    foo> insert a c x 10
    insert a c x 10
    foo> scan
    scan
    a c:x [] 1,1;2,1;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    foo> insert a c x 11
    insert a c x 11
    foo> insert a c x 12
    insert a c x 12
    foo> insert a c x 13
    insert a c x 13
    foo> insert a c x 14
    insert a c x 14
    foo> insert a c x 15
    insert a c x 15
    foo> insert a c x 16
    insert a c x 16
    foo> scan
    scan
    a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;1,1;2,1;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    foo> insert a c x 17
    insert a c x 17
    foo> scan
    scan
    a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;17,1;*,1;2,1;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    foo> insert a c x 1
    insert a c x 1
    foo> scan
    scan
    a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;17,1;*,1;1,2;3,1;4,1;5,1;6,1;7,1;8,1;9,1;10,1
    foo> insert a c x 1
    insert a c x 1
    foo> insert a c x 1
    insert a c x 1
    foo> insert a c x 1
    insert a c x 1
    foo> insert a c x 2
    insert a c x 2
    foo> insert a c x 2
    insert a c x 2
    foo> insert a c x 2
    insert a c x 2
    foo> scan
    scan
    a c:x [] 11,1;12,1;13,1;14,1;15,1;16,1;17,1;*,1;1,5;2,4;4,1;5,1;6,1;7,1;8,1;9,1;10,1