Record Linkage
Here is an example of record linkage between two datasets. The first dataset contains prefixes of country names, whereas the second contains all country names. See approxCountries and countries, respectively, below.
See also prefix linkage or fuzzy linkage
Examples with "real-world" datasets are available below:
To run all the above examples, you need to provide the spark-csv JARs. See the ./spark-shell-csv.sh script.
We assume that you started spark-shell with spark-lucenerdd's assembly JAR loaded.
First, define a set of prefixes for countries.
import org.apache.spark.rdd.RDD
val approxCountries = Array("gree", "germa", "spa", "ita")
val approxCountriesRDD: RDD[String] = sc.parallelize(approxCountries)
Second, load the list of all country names and create a LuceneRDD object, which instantiates a Lucene index per Spark executor.
import scala.io.Source
import org.zouzias.spark.lucenerdd.LuceneRDD
val countries = sc.parallelize(Source.fromFile("src/test/resources/countries.txt").getLines()
.map(_.toLowerCase()).toSeq)
val luceneRDD = LuceneRDD(countries)
Finally, record linkage requires one more input: a function that specifies how each entry of approxCountries is linked to the full list of countries. Here, the function maps a prefix to a Lucene prefix query on the field _1:
val prefixLinker = (country: String) => {
s"_1:${country}*"
}
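To make the linker's role concrete, the sketch below prints the query strings it generates for the prefixes defined earlier. It needs neither Spark nor Lucene. The fuzzyLinker variant is a hypothetical illustration, not taken from this page; it assumes the queries are parsed with Lucene's classic query syntax, where a trailing `~` requests a fuzzy match.

```scala
// Same linker as above: turns a prefix into a Lucene prefix query on field _1.
val prefixLinker = (country: String) => s"_1:${country}*"

// Hypothetical fuzzy variant (an assumption, not part of the example above):
// a trailing `~` is the fuzzy operator in Lucene's classic query syntax.
val fuzzyLinker = (country: String) => s"_1:${country}~"

val prefixes = Array("gree", "germa", "spa", "ita")

// One query string per prefix; link evaluates such a string for every record.
val queries = prefixes.map(prefixLinker)
queries.foreach(println)
// _1:gree*
// _1:germa*
// _1:spa*
// _1:ita*
```

Keeping the linker a plain String => String function means it can be inspected like this before it is handed to link.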
Now, to link the two RDDs above, run:
val linked = luceneRDD.link(approxCountriesRDD, prefixLinker, 10)
where linked has type RDD[(String, List[SparkScoreDoc])]. The third argument, 10, is the maximum number of matches (top-K) returned for each prefix.
To see the results, run:
linked.take(4).foreach(println)
(gree,List(SparkScoreDoc(1.0,24,0,Text fields:_1:[greece]), SparkScoreDoc(1.0,25,0,Text fields:_1:[greenland])))
(germa,List(SparkScoreDoc(1.0,21,0,Text fields:_1:[germany])))
(spa,List(SparkScoreDoc(1.0,22,0,Text fields:_1:[spain])))
(ita,List(SparkScoreDoc(1.0,44,0,Text fields:_1:[italy])))
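As a sanity check on the output above, the linkage semantics can be emulated in plain Scala on a small in-memory sample. This is only a sketch of what prefix linkage computes, not how LuceneRDD implements it: there is no index, no scoring and no Spark. The sample list, the value names and the topK value are assumptions chosen for illustration.

```scala
// Hypothetical sample standing in for countries.txt (already lowercased).
val sampleCountries = Seq("greece", "greenland", "germany", "spain", "italy", "france")
val samplePrefixes = Seq("gree", "germa", "spa", "ita")

// Upper bound on matches kept per prefix, mirroring the third argument of link.
val topK = 10

// For every prefix, keep at most topK countries that start with it.
val linkedLocally: Seq[(String, Seq[String])] =
  samplePrefixes.map(p => (p, sampleCountries.filter(_.startsWith(p)).take(topK)))

linkedLocally.foreach(println)
// (gree,List(greece, greenland))
// (germa,List(germany))
// (spa,List(spain))
// (ita,List(italy))
```

The pairing of each prefix with its matches mirrors the shape of linked, with plain strings in place of SparkScoreDoc entries.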