Skip to content

Commit

Permalink
Sparkplug serialisation model (#183)
Browse files Browse the repository at this point in the history
* Sparkplugs serialisation instead of sparklings

* Return to 100% test coverage

* Mention sparkplug in all related source files

* Fixed typo
  • Loading branch information
anthony-khong authored Sep 2, 2020
1 parent d6377d4 commit 4d1e760
Show file tree
Hide file tree
Showing 21 changed files with 516 additions and 236 deletions.
81 changes: 68 additions & 13 deletions src/clojure/zero_one/geni/rdd/function.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,68 @@
(ns zero-one.geni.rdd.function)

(defmacro gen-function
  "Defines a function named `wrapper-name` that takes a single function
  argument and returns a new instance of the class `cls` from the
  zero_one.geni.rdd.function package constructed with that argument."
  [cls wrapper-name]
  (let [class-sym (symbol (str "zero_one.geni.rdd.function." cls))]
    `(defn ~wrapper-name [f#]
       (new ~class-sym f#))))

;; Define one constructor function per wrapper class in the
;; zero_one.geni.rdd.function package. Each generated function takes a
;; Clojure function and returns a new instance of the named class built
;; from it.
(gen-function Function function)
(gen-function Function2 function2)
(gen-function VoidFunction void-function)
(gen-function FlatMapFunction flat-map-function)
(gen-function FlatMapFunction2 flat-map-function2)
(gen-function PairFlatMapFunction pair-flat-map-function)
(gen-function PairFunction pair-function)
;; Taken from https://github.com/amperity/sparkplug
(ns zero-one.geni.rdd.function
(:require
[clojure.string :as str])
(:import
(java.lang.reflect Field Modifier)
(java.util HashSet)))

(defn access-field
  "Reads the value of the reflective `field` on `obj`, first marking the field
  accessible. Returns nil when the field cannot be made accessible or read."
  [^Field field obj]
  (try
    (.get ^Field (doto field (.setAccessible true)) obj)
    ;; Deliberately broad: the upstream version caught only
    ;; IllegalAccessException.
    (catch Exception _
      nil)))

(defn walk-object-vars
  "Recursively walks `obj`, adding to `references` the namespace symbol of
  every var reachable from it.

  `references` is a mutable HashSet that collects namespace symbols and
  `visited` is a mutable HashSet of objects already walked, which keeps the
  traversal from looping on cycles. Nil, booleans, strings, numbers, keywords,
  symbols and refs are never walked. For any other object the declared fields
  of its class are read reflectively and every field value that is a function
  or a map is walked in turn; for maps the entries are walked and only static
  fields are inspected.

  Called for its side effects on the two sets; always returns nil."
  [^HashSet references ^HashSet visited obj]
  (when-not (or (nil? obj)
                (boolean? obj)
                (string? obj)
                (number? obj)
                (keyword? obj)
                (symbol? obj)
                (instance? clojure.lang.Ref obj)
                (.contains visited obj))
    (.add visited obj)
    (if (var? obj)
      ;; Vars that are not interned in a namespace (e.g. those created by
      ;; `with-local-vars`) have no :ns; calling `ns-name` on nil would throw,
      ;; and there is no namespace to record for them anyway.
      (when-let [var-ns (:ns (meta obj))]
        (.add references (ns-name var-ns)))
      (do
        (when (map? obj)
          (doseq [entry obj]
            (walk-object-vars references visited entry)))
        (doseq [^Field field (.getDeclaredFields (class obj))]
          ;; For maps only static fields are inspected; the entries were
          ;; already walked above.
          (when (or (not (map? obj)) (Modifier/isStatic (.getModifiers field)))
            (let [value (access-field field obj)]
              (when (or (ifn? value) (map? value))
                (walk-object-vars references visited value)))))))
    nil))

(defn namespace-references
  "Returns the set of namespace symbols referenced by `obj`, excluding
  `clojure.core`.

  The namespace is first derived from the demunged class name of `obj` (the
  part before the first `/`) and included unless that symbol resolves to a
  class. The object is then walked with `walk-object-vars` to collect the
  namespaces of the vars it refers to."
  [^Object obj]
  (let [class-name (.getName (.getClass obj))
        demunged   (Compiler/demunge class-name)
        obj-ns     (symbol (first (str/split demunged #"/")))
        references (HashSet.)
        visited    (HashSet.)]
    ;; A symbol that resolves to a class means `obj` is not a function
    ;; compiled from a namespace, so there is no namespace to record.
    (when-not (class? (resolve obj-ns))
      (.add references obj-ns))
    (walk-object-vars references visited obj)
    (-> (set references)
        (disj 'clojure.core))))

(defmacro ^:private gen-function
  "Defines a function named `constructor` that wraps a Clojure function `f` in
  a new instance of the class `fn-name` from the zero_one.geni.rdd.function
  package. The instance is constructed with `f` and the names (as strings) of
  the namespaces returned by `namespace-references` for `f`. The generated
  function is tagged with the wrapper class as its return type."
  [fn-name constructor]
  (let [class-sym   (symbol (str "zero_one.geni.rdd.function." fn-name))
        tagged-name (vary-meta constructor assoc :tag class-sym)
        docstring   (str "Construct a new serializable " fn-name " function wrapping `f`.")]
    `(defn ~tagged-name
       ~docstring
       [~'f]
       (new ~class-sym ~'f (mapv str (namespace-references ~'f))))))

;; Constructor functions, one per serializable wrapper class. Each generated
;; function takes a Clojure function `f` and instantiates the named class with
;; `f` and the names of the namespaces found by `namespace-references`.
(gen-function Fn1 function)
(gen-function Fn2 function2)
(gen-function FlatMapFn1 flat-map-function)
(gen-function FlatMapFn2 flat-map-function2)
(gen-function PairFlatMapFn pair-flat-map-function)
(gen-function PairFn pair-function)
(gen-function VoidFn void-function)
27 changes: 27 additions & 0 deletions src/java/geni/rdd/function/ComparatorFn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Collection;
import java.util.Comparator;


/**
 * Compatibility wrapper that exposes a Clojure function of two arguments as a
 * {@link Comparator}.
 *
 * <p>The wrapped function may return any {@link Number} (Clojure arithmetic
 * produces {@code Long}, not {@code Integer}) or a {@link Boolean}. A boolean
 * result is interpreted the way Clojure interprets predicate-style
 * comparators: {@code true} means the first argument sorts before the second.
 */
public class ComparatorFn extends SerializableFn implements Comparator<Object> {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public ComparatorFn(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /**
     * Compares two values by invoking the wrapped function.
     *
     * @return a negative, zero or positive integer as defined by {@link Comparator}
     * @throws ClassCastException   if the function returns something other
     *                              than a {@code Number} or {@code Boolean}
     * @throws NullPointerException if the function returns {@code null}
     */
    @Override
    public int compare(Object v1, Object v2) {
        Object result = f.invoke(v1, v2);
        if (result instanceof Boolean) {
            if ((Boolean) result) {
                return -1;
            }
            // Not "less than": ask the opposite question to tell "greater"
            // from "equal". Any non-null, non-false answer counts as true.
            Object flipped = f.invoke(v2, v1);
            boolean greater = flipped != null && !Boolean.FALSE.equals(flipped);
            return greater ? 1 : 0;
        }
        // Casting straight to int would go through Integer and fail for the
        // Long values that Clojure functions normally return.
        return ((Number) result).intValue();
    }

}
30 changes: 30 additions & 0 deletions src/java/geni/rdd/function/FlatMapFn1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Iterator;
import java.util.Collection;

import org.apache.spark.api.java.function.FlatMapFunction;


/**
 * Compatibility wrapper for a Spark {@code FlatMapFunction} of one argument.
 *
 * <p>The wrapped Clojure function is expected to return an {@link Iterable}
 * of results, or {@code null} to produce no results.
 */
public class FlatMapFn1 extends SerializableFn implements FlatMapFunction {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public FlatMapFn1(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /**
     * Invokes the wrapped function on {@code v1} and returns an iterator over
     * the results.
     *
     * @return an iterator over the returned elements; empty when the function
     *         returns {@code null}
     * @throws ClassCastException if the function returns a non-null value
     *                            that is not an {@code Iterable}
     */
    @Override
    @SuppressWarnings("unchecked")
    public Iterator<Object> call(Object v1) throws Exception {
        Object results = f.invoke(v1);
        if (results == null) {
            // Clojure functions commonly return nil to mean "nothing".
            return java.util.Collections.emptyIterator();
        }
        return ((Iterable<Object>) results).iterator();
    }

}
30 changes: 30 additions & 0 deletions src/java/geni/rdd/function/FlatMapFn2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Iterator;
import java.util.Collection;

import org.apache.spark.api.java.function.FlatMapFunction2;


/**
 * Compatibility wrapper for a Spark {@code FlatMapFunction2} of two arguments.
 *
 * <p>The wrapped Clojure function is expected to return an {@link Iterable}
 * of results, or {@code null} to produce no results.
 */
public class FlatMapFn2 extends SerializableFn implements FlatMapFunction2 {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public FlatMapFn2(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /**
     * Invokes the wrapped function on {@code v1} and {@code v2} and returns an
     * iterator over the results.
     *
     * @return an iterator over the returned elements; empty when the function
     *         returns {@code null}
     * @throws ClassCastException if the function returns a non-null value
     *                            that is not an {@code Iterable}
     */
    @Override
    @SuppressWarnings("unchecked")
    public Iterator<Object> call(Object v1, Object v2) throws Exception {
        Object results = f.invoke(v1, v2);
        if (results == null) {
            // Clojure functions commonly return nil to mean "nothing".
            return java.util.Collections.emptyIterator();
        }
        return ((Iterable<Object>) results).iterator();
    }

}
19 changes: 0 additions & 19 deletions src/java/geni/rdd/function/FlatMapFunction.java

This file was deleted.

17 changes: 0 additions & 17 deletions src/java/geni/rdd/function/FlatMapFunction2.java

This file was deleted.

28 changes: 28 additions & 0 deletions src/java/geni/rdd/function/Fn1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Collection;

import org.apache.spark.api.java.function.Function;


/**
 * Adapter that lets a Clojure function be used as a one-argument Spark
 * {@code Function}.
 */
public class Fn1 extends SerializableFn implements Function {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public Fn1(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /** Applies the wrapped function to {@code v1} and returns its result unchanged. */
    @Override
    @SuppressWarnings("unchecked")
    public Object call(Object v1) throws Exception {
        final Object result = f.invoke(v1);
        return result;
    }

}
28 changes: 28 additions & 0 deletions src/java/geni/rdd/function/Fn2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Collection;

import org.apache.spark.api.java.function.Function2;


/**
 * Adapter that lets a Clojure function be used as a two-argument Spark
 * {@code Function2}.
 */
public class Fn2 extends SerializableFn implements Function2 {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public Fn2(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /** Applies the wrapped function to {@code v1} and {@code v2} and returns its result unchanged. */
    @Override
    @SuppressWarnings("unchecked")
    public Object call(Object v1, Object v2) throws Exception {
        final Object result = f.invoke(v1, v2);
        return result;
    }

}
28 changes: 28 additions & 0 deletions src/java/geni/rdd/function/Fn3.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Collection;

import org.apache.spark.api.java.function.Function3;


/**
 * Adapter that lets a Clojure function be used as a three-argument Spark
 * {@code Function3}.
 */
public class Fn3 extends SerializableFn implements Function3 {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public Fn3(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /**
     * Applies the wrapped function to {@code v1}, {@code v2} and {@code v3}
     * and returns its result unchanged.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Object call(Object v1, Object v2, Object v3) throws Exception {
        final Object result = f.invoke(v1, v2, v3);
        return result;
    }

}
13 changes: 0 additions & 13 deletions src/java/geni/rdd/function/Function.java

This file was deleted.

13 changes: 0 additions & 13 deletions src/java/geni/rdd/function/Function2.java

This file was deleted.

13 changes: 0 additions & 13 deletions src/java/geni/rdd/function/Function3.java

This file was deleted.

42 changes: 42 additions & 0 deletions src/java/geni/rdd/function/PairFlatMapFn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Taken from https://github.com/amperity/sparkplug
package zero_one.geni.rdd.function;


import clojure.lang.IFn;

import java.util.Collection;
import java.util.Iterator;

import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;


/**
 * Compatibility wrapper for a Spark {@code PairFlatMapFunction} of one
 * argument which returns a sequence of pairs.
 *
 * <p>The wrapped Clojure function is expected to return an {@link Iterable}
 * of elements, or {@code null} to produce no pairs. Each element is converted
 * to a {@code Tuple2} by {@code PairFn.coercePair} as it is consumed.
 */
public class PairFlatMapFn extends SerializableFn implements PairFlatMapFunction {

    /**
     * @param f          the Clojure function to wrap
     * @param namespaces names of namespaces associated with {@code f}; passed
     *                   through to {@code SerializableFn}
     */
    public PairFlatMapFn(IFn f, Collection<String> namespaces) {
        super(f, namespaces);
    }


    /**
     * Invokes the wrapped function on {@code v1} and returns an iterator that
     * lazily coerces each returned element into a pair.
     *
     * @return an iterator of pairs; empty when the function returns {@code null}
     * @throws ClassCastException if the function returns a non-null value
     *                            that is not an {@code Iterable}
     */
    @Override
    @SuppressWarnings("unchecked")
    public Iterator<Tuple2<Object, Object>> call(Object v1) throws Exception {
        Object result = f.invoke(v1);
        if (result == null) {
            // Clojure functions commonly return nil to mean "nothing".
            return java.util.Collections.emptyIterator();
        }
        final Iterator<Object> results = ((Iterable<Object>) result).iterator();
        return new Iterator<Tuple2<Object, Object>>() {
            @Override
            public boolean hasNext() {
                return results.hasNext();
            }

            @Override
            public Tuple2<Object, Object> next() {
                // Coercion happens per element, only when it is requested.
                return PairFn.coercePair(f, results.next());
            }
        };
    }

}
Loading

0 comments on commit 4d1e760

Please sign in to comment.