-
Notifications
You must be signed in to change notification settings - Fork 1
/
TripleUtil.scala
367 lines (307 loc) · 12.1 KB
/
TripleUtil.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.Map
object TripleUtil {
// Encoded URIs: numeric identifiers for the RDF / RDFS / OWL vocabulary terms.
// NOTE(review): presumably these ids must agree with an external dictionary encoding - confirm.
// --- RDF ---
val RDF_TYPE: Long = 0L
val RDF_PROPERTY: Long = 1L
val RDF_NIL: Long = 28L
val RDF_LIST: Long = 27L
val RDF_FIRST: Long = 26L
val RDF_REST: Long = 25L
// --- RDFS ---
val RDFS_RANGE: Long = 2L
val RDFS_DOMAIN: Long = 3L
val RDFS_SUBPROPERTY: Long = 4L
val RDFS_SUBCLASS: Long = 5L
val RDFS_MEMBER: Long = 19L
val RDFS_LITERAL: Long = 20L
val RDFS_CONTAINER_MEMBERSHIP_PROPERTY: Long = 21L
val RDFS_DATATYPE: Long = 22L
val RDFS_CLASS: Long = 23L
val RDFS_RESOURCE: Long = 24L
// --- OWL ---
val OWL_CLASS: Long = 6L
val OWL_FUNCTIONAL_PROPERTY: Long = 7L
val OWL_INVERSE_FUNCTIONAL_PROPERTY: Long = 8L
val OWL_SYMMETRIC_PROPERTY: Long = 9L
val OWL_TRANSITIVE_PROPERTY: Long = 10L
val OWL_SAME_AS: Long = 11L
val OWL_INVERSE_OF: Long = 12L
val OWL_EQUIVALENT_CLASS: Long = 13L
val OWL_EQUIVALENT_PROPERTY: Long = 14L
val OWL_HAS_VALUE: Long = 15L
val OWL_ON_PROPERTY: Long = 16L
val OWL_SOME_VALUES_FROM: Long = 17L
val OWL_ALL_VALUES_FROM: Long = 18L
// NOTE(review): the only id declared as Int rather than Long; the type is kept as-is for callers.
val OWL2_PROPERTY_CHAIN_AXIOM: Int = 29
// Standard URIs: the same vocabulary terms as angle-bracketed IRI strings.
// --- RDF ---
val S_RDF_NIL: String = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#nil>"
val S_RDF_LIST: String = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#List>"
val S_RDF_FIRST: String = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#first>"
val S_RDF_REST: String = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#rest>"
val S_RDF_TYPE: String = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>"
val S_RDF_PROPERTY: String = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>"
// --- RDFS ---
val S_RDFS_RANGE: String = "<http://www.w3.org/2000/01/rdf-schema#range>"
val S_RDFS_DOMAIN: String = "<http://www.w3.org/2000/01/rdf-schema#domain>"
val S_RDFS_SUBPROPERTY: String = "<http://www.w3.org/2000/01/rdf-schema#subPropertyOf>"
val S_RDFS_SUBCLASS: String = "<http://www.w3.org/2000/01/rdf-schema#subClassOf>"
val S_RDFS_MEMBER: String = "<http://www.w3.org/2000/01/rdf-schema#member>"
val S_RDFS_LITERAL: String = "<http://www.w3.org/2000/01/rdf-schema#Literal>"
val S_RDFS_CONTAINER_MEMBERSHIP_PROPERTY: String = "<http://www.w3.org/2000/01/rdf-schema#ContainerMembershipProperty>"
val S_RDFS_DATATYPE: String = "<http://www.w3.org/2000/01/rdf-schema#Datatype>"
val S_RDFS_CLASS: String = "<http://www.w3.org/2000/01/rdf-schema#Class>"
val S_RDFS_RESOURCE: String = "<http://www.w3.org/2000/01/rdf-schema#Resource>"
// --- OWL ---
val S_OWL_CLASS: String = "<http://www.w3.org/2002/07/owl#Class>"
val S_OWL_FUNCTIONAL_PROPERTY: String = "<http://www.w3.org/2002/07/owl#FunctionalProperty>"
val S_OWL_INVERSE_FUNCTIONAL_PROPERTY: String = "<http://www.w3.org/2002/07/owl#InverseFunctionalProperty>"
val S_OWL_SYMMETRIC_PROPERTY: String = "<http://www.w3.org/2002/07/owl#SymmetricProperty>"
val S_OWL_TRANSITIVE_PROPERTY: String = "<http://www.w3.org/2002/07/owl#TransitiveProperty>"
val S_OWL_SAME_AS: String = "<http://www.w3.org/2002/07/owl#sameAs>"
val S_OWL_INVERSE_OF: String = "<http://www.w3.org/2002/07/owl#inverseOf>"
val S_OWL_EQUIVALENT_CLASS: String = "<http://www.w3.org/2002/07/owl#equivalentClass>"
val S_OWL_EQUIVALENT_PROPERTY: String = "<http://www.w3.org/2002/07/owl#equivalentProperty>"
val S_OWL_HAS_VALUE: String = "<http://www.w3.org/2002/07/owl#hasValue>"
val S_OWL_ON_PROPERTY: String = "<http://www.w3.org/2002/07/owl#onProperty>"
val S_OWL_SOME_VALUES_FROM: String = "<http://www.w3.org/2002/07/owl#someValuesFrom>"
val S_OWL_ALL_VALUES_FROM: String = "<http://www.w3.org/2002/07/owl#allValuesFrom>"
val S_OWL2_PROPERTY_CHAIN_AXIOM: String = "<http://www.w3.org/2002/07/owl#propertyChainAxiom>"
// No encoded (numeric) id for owl:hasKey is declared alongside the ids above.
val S_OWL2_HAS_KEY: String = "<http://www.w3.org/2002/07/owl#hasKey>"
/* Utility higher-order functions */
/**
 * Returns a function that wraps a single value in a one-element list.
 * NOTE(review): the name matches the first argument of a combineByKey-style
 * aggregation - presumably that is how callers use it; confirm.
 */
def createCombiner = (first: String) => first :: Nil
/**
 * Returns a function that appends one value to the end of an accumulated list.
 * The append copies the accumulated list, so cost grows with its length.
 */
def mergeValue = (acc: List[String], next: String) => acc :+ next
/**
 * Returns a function that concatenates two accumulated lists, keeping the
 * elements of the first list ahead of those of the second.
 */
def mergeCombiners = (left: List[String], right: List[String]) => left ++ right
/**
 * Splits one N-Triples-style statement into `(subject, (predicate, object))`.
 *
 * Terms are expected to be separated by single spaces. The subject is either an
 * IRI in angle brackets or any other token up to the first space; the predicate
 * is read up to its closing `>`; the object is an IRI, a double-quoted literal,
 * or any other token.
 *
 * For a quoted literal only the quoted part is returned (anything following the
 * closing quote is dropped). Backslash-escaped characters inside the literal,
 * including `\"`, do not terminate it.
 *
 * @param triple one statement, e.g. `<s> <p> "o" .`
 * @return the subject paired with `(predicate, object)`, each kept in its source spelling
 * @throws StringIndexOutOfBoundsException if a non-IRI subject is not followed by a space
 */
def parseTriple(triple: String): (String, (String, String)) = {
  // End index (exclusive) of the quoted literal starting at index 0 of `text`.
  // A backslash skips the character after it, so an escaped quote is not taken
  // as the terminator. Returns 1 (just the opening quote) when unterminated.
  def literalEnd(text: String): Int = {
    @scala.annotation.tailrec
    def scan(i: Int): Int =
      if (i >= text.length) 1
      else text.charAt(i) match {
        case '\\' => scan(i + 2)
        case '"' => i + 1
        case _ => scan(i + 1)
      }
    scan(1)
  }

  // Subject: an IRI ends at its closing '>', any other token at the first space.
  val subj =
    if (triple.startsWith("<")) triple.substring(0, triple.indexOf('>') + 1)
    else triple.substring(0, triple.indexOf(' '))

  // Predicate: everything after the first space, up to and including the next '>'.
  val afterSubject = triple.substring(triple.indexOf(' ') + 1)
  val pred = afterSubject.substring(0, afterSubject.indexOf('>') + 1)
  // Skip the predicate and the single space that follows it.
  val afterPredicate = afterSubject.substring(pred.length + 1)

  // Object: IRI, quoted literal, or bare token.
  val obj =
    if (afterPredicate.startsWith("<")) {
      afterPredicate.substring(0, afterPredicate.indexOf('>') + 1)
    } else if (afterPredicate.startsWith("\"")) {
      afterPredicate.substring(0, literalEnd(afterPredicate))
    } else {
      val end = afterPredicate.indexOf(' ')
      // No space before the terminating '.': take the rest of the line without the dot.
      if (end >= 0) afterPredicate.substring(0, end) else afterPredicate.stripSuffix(".")
    }

  (subj, (pred, obj))
}
/**
 * Builds every ordered pair `(x, y)` of elements of `xs` whose values differ.
 *
 * Pairs are produced in row-major order: all pairs for the first element, then
 * all pairs for the second, and so on. Elements are compared by value, so
 * duplicate entries in `xs` never pair with each other but each still pairs
 * with every different value.
 *
 * @param xs the values to pair up; may be empty
 * @return the ordered pairs of unequal elements, or an empty list
 */
def combination(xs: List[String]): List[(String, String)] =
  // Direct traversal avoids the positional List lookups and repeated appends
  // that made the index-based loop super-quadratic.
  for {
    x <- xs
    y <- xs
    if x != y
  } yield (x, y)
/**
 * Applies `level` to the log4j loggers named "org" and "akka", in that order.
 *
 * @param level the log4j level to set on both loggers
 */
def setLogLevel(level: Level): Unit =
  Seq("org", "akka").foreach { loggerName =>
    Logger.getLogger(loggerName).setLevel(level)
  }
// Fixed-point iteration
/**
 * Computes the transitive closure of a relation given as `(source, target)` pairs.
 *
 * Each round joins the pairs found so far with the reversed input edges: a known
 * pair `(a, b)` and an input edge `(s, a)` yield `(s, b)`. Pairs whose two sides
 * are equal are dropped and duplicates removed. Rounds repeat until the pair
 * count stops changing.
 *
 * Side effects: `input` is unpersisted before returning. The returned RDD is
 * hash-partitioned with as many partitions as `input` and left cached.
 *
 * @param input        the relation to close
 * @param sparkContext not used by this method; kept for caller compatibility
 * @return the closure, without pairs whose two sides are equal
 */
def transitiveClosure(input: RDD[(String, String)], sparkContext: SparkContext): RDD[(String, String)] = {
  val hashPartitioner = new HashPartitioner(input.partitions.length)
  // Reversed edges keyed by target. Cached because they are joined in every
  // round; without it they are re-shuffled each time and the unpersist below
  // has nothing to release.
  val edges = input.map { case (s, o) => (o, s) }.partitionBy(hashPartitioner).cache()
  var tc = input
  // This join is iterated until a fixed point is reached.
  var oldCount = 0L
  var nextCount = tc.count()
  do {
    oldCount = nextCount
    val previous = tc
    tc = previous.union(previous.join(edges).map { case (_, (target, source)) => (source, target) })
      .filter { case (x, y) => x != y }
      .distinct()
      .partitionBy(hashPartitioner)
      .cache()
    // count() materializes the new round into the cache ...
    nextCount = tc.count()
    // ... so the superseded round can be released. `input` is released only at
    // the end, as before.
    if (previous ne input) previous.unpersist(blocking = true)
  } while (nextCount != oldCount)
  edges.unpersist(blocking = true)
  input.unpersist(blocking = true)
  tc
}
/**
 * Computes, separately for each predicate, the transitive closure of the triples
 * `(subject, (predicate, object))`.
 *
 * Triples are re-keyed by `(node, predicate)` so that only triples sharing a
 * predicate are chained: a known `a -p-> b` and an input edge `s -p-> a` yield
 * `s -p-> b`. Triples whose subject equals their object are dropped and
 * duplicates removed. Rounds repeat until the triple count stops changing.
 *
 * Side effects: `input` is unpersisted before returning. The last round stays
 * cached; the returned RDD is a mapped view over it.
 * NOTE(review): every predicate in `input` is treated as transitive - presumably
 * callers pre-filter to transitive properties; confirm.
 *
 * @param input        triples to close
 * @param sparkContext not used by this method; kept for caller compatibility
 * @return the closed triples in `(subject, (predicate, object))` form
 */
def transitivePropClosure(input: RDD[(String, (String, String))], sparkContext: SparkContext): RDD[(String, (String, String))] = {
  val hashPartitioner = new HashPartitioner(input.partitions.length)
  var tc = input.map { case (s, (p, o)) => ((s, p), o) }
  // Reversed edges keyed by (object, predicate). Cached because they are joined
  // in every round; without it the unpersist below has nothing to release.
  val edges = tc.map { case ((s, p), o) => ((o, p), s) }.partitionBy(hashPartitioner).cache()
  // This join is iterated until a fixed point is reached.
  var oldCount = 0L
  var nextCount = tc.count()
  do {
    oldCount = nextCount
    val previous = tc
    // Join value is (object of the known triple, subject of the incoming edge).
    tc = previous.union(previous.join(edges).map { case ((_, p), (obj, subj)) => ((subj, p), obj) })
      .filter { case ((subj, _), obj) => subj != obj }
      .distinct()
      .partitionBy(hashPartitioner)
      .cache()
    // count() materializes the new round into the cache, so the superseded
    // round can be released afterwards.
    nextCount = tc.count()
    previous.unpersist(blocking = true)
  } while (nextCount != oldCount)
  edges.unpersist(blocking = true)
  input.unpersist(blocking = true)
  tc.map { case ((s, p), o) => (s, (p, o)) }
}
/**
 * Pairs every element of `list` with every element of `list` that has a
 * different value, keeping the order of `list` on both sides.
 *
 * @param list the values to pair up
 * @return all ordered pairs `(left, right)` with `left` not equal to `right`
 */
def combinator(list: List[String]) =
  list.flatMap { left =>
    list.collect { case right if !left.equals(right) => (left, right) }
  }
/**
 * Builds an iterator transformer that rewrites predicates through a broadcast map.
 *
 * For every triple `(s, (p, o))` whose predicate `p` is a key of the map, one
 * triple `(s, (q, o))` is emitted per value `q` stored under `p`. Triples with
 * an unmapped predicate produce nothing.
 * NOTE(review): presumably the map sends a property to its super-properties - confirm.
 */
def subpropInhReasoning = (bcSubpropMap: Broadcast[Map[String, Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    val replacements = bcSubpropMap.value
    triples.flatMap { case (s, (p, o)) =>
      // One lookup; a missing key contributes no output.
      replacements.getOrElse(p, Iterable.empty[String]).map(q => (s, (q, o)))
    }
  }
/**
 * Builds an iterator transformer that keeps only the triples `(s, (p, o))`
 * whose predicate `p` occurs in the broadcast array.
 * NOTE(review): presumably the array lists the transitive properties - confirm.
 */
def filterTranProp = (bcTransPropArray: Broadcast[Array[String]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    // Built once per call so each triple costs a hash lookup, not an array scan.
    val keptPredicates: Set[String] = bcTransPropArray.value.toSet
    triples.filter { case (_, (p, _)) => keptPredicates.contains(p) }
  }
/**
 * Builds an iterator transformer that emits reversed triples through a broadcast map.
 *
 * For every triple `(v, (p, w))` whose predicate `p` is a key of the map, one
 * triple `(w, (q, v))` is emitted per value `q` stored under `p` - subject and
 * object swap places. Triples with an unmapped predicate produce nothing.
 * NOTE(review): presumably the map sends a property to its inverse properties - confirm.
 */
def inverseReasoning1 = (bcInverseOfMap1: Broadcast[Map[String, Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    val inverses = bcInverseOfMap1.value
    triples.flatMap { case (v, (p, w)) =>
      inverses.getOrElse(p, Iterable.empty[String]).map(q => (w, (q, v)))
    }
  }
/**
 * Builds an iterator transformer that emits reversed triples through a broadcast map.
 *
 * For every triple `(v, (p, w))` whose predicate `p` is a key of the map, one
 * triple `(w, (q, v))` is emitted per value `q` stored under `p`. Triples with
 * an unmapped predicate produce nothing.
 * NOTE(review): presumably this map is the opposite direction of the one used
 * by the sibling inverse rule - confirm with the caller.
 */
def inverseReasoning2 = (bcInverseOfMap2: Broadcast[Map[String, Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    val inverses = bcInverseOfMap2.value
    triples.flatMap { case (v, (p, w)) =>
      inverses.getOrElse(p, Iterable.empty[String]).map(q => (w, (q, v)))
    }
  }
/**
 * Builds an iterator transformer that mirrors triples with a listed predicate.
 *
 * For every triple `(v, (p, w))` whose predicate `p` occurs in the broadcast
 * array, the swapped triple `(w, (p, v))` is emitted; other triples are dropped.
 * NOTE(review): presumably the array lists the symmetric properties - confirm.
 */
def symmetricReasoning = (bcSymmetricPropMap: Broadcast[Array[String]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    // Built once per call so each triple costs a hash lookup, not an array scan.
    val symmetricProps: Set[String] = bcSymmetricPropMap.value.toSet
    for {
      (v, (p, w)) <- triples
      if symmetricProps.contains(p)
    } yield (w, (p, v))
  }
/**
 * Builds an iterator transformer that maps a triple's `(predicate, object)`
 * pair through a broadcast map.
 *
 * For every `(u, pw)` whose pair `pw` is a key of the map, one `(u, v)` is
 * emitted per value `v` stored under `pw`. Other entries produce nothing.
 * NOTE(review): presumably the values are restriction classes tied to an
 * (onProperty, hasValue) pair - confirm.
 */
def hasValueReasoning1 = (joinedHasValOnPropMap1: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    val byPredicateAndValue = joinedHasValOnPropMap1.value
    triples.flatMap { case (u, pw) =>
      byPredicateAndValue.getOrElse(pw, Iterable.empty[String]).map(v => (u, v))
    }
  }
/**
 * Builds an iterator transformer that expands a pair's second component
 * through a broadcast map into `(predicate, object)` pairs.
 *
 * For every `(u, v)` whose `v` is a key of the map, one `(u, pw)` is emitted
 * per pair `pw` stored under `v`. Other entries produce nothing.
 * NOTE(review): presumably `v` is a restriction class and `pw` its
 * (onProperty, hasValue) pair - confirm.
 */
def hasValueReasoning2 = (joinedHasValOnPropMap2: Broadcast[Map[String, scala.Iterable[(String, String)]]]) =>
  (typed: Iterator[(String, String)]) => {
    val byClass = joinedHasValOnPropMap2.value
    typed.flatMap { case (u, v) =>
      byClass.getOrElse(v, Iterable.empty[(String, String)]).map(pw => (u, pw))
    }
  }
/**
 * Builds an iterator transformer that keeps triples whose predicate appears as
 * the second component of some key of the broadcast map, and re-keys them by object.
 *
 * A kept triple `(u, (p, x))` is emitted as `(x, (p, u))`; other triples are dropped.
 */
def filterSomValOnPropTriple = (joinedSomValOnPropMap: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    // Collected into a Set once per call so each triple costs a hash lookup, not a list scan.
    val onProps: Set[String] = joinedSomValOnPropMap.value.keysIterator.map { case (_, p) => p }.toSet
    for {
      (u, (p, x)) <- triples
      if onProps.contains(p)
    } yield (x, (p, u))
  }
/**
 * Builds an iterator transformer that keeps the pairs `(x, w)` whose second
 * component `w` appears as the first component of some key of the broadcast map.
 * Kept pairs are emitted unchanged; other pairs are dropped.
 */
def filterSomValOnPropType = (joinedSomValOnPropMap: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (typed: Iterator[(String, String)]) => {
    // Collected into a Set once per call so each pair costs a hash lookup, not a list scan.
    val keptClasses: Set[String] = joinedSomValOnPropMap.value.keysIterator.map { case (w, _) => w }.toSet
    typed.filter { case (_, w) => keptClasses.contains(w) }
  }
/**
 * Builds an iterator transformer over joined records `(key, ((p, u), w))`.
 *
 * The record's key is ignored. When `(w, p)` is a key of the broadcast map, one
 * `(u, v)` is emitted per value `v` stored under it; otherwise nothing is emitted.
 * NOTE(review): presumably `v` is the someValuesFrom restriction class that `u`
 * is inferred to belong to - confirm.
 */
def someValueReasoning = (joinedSomValOnPropMap: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (joined: Iterator[(String, ((String, String), String))]) => {
    val restrictions = joinedSomValOnPropMap.value
    joined.flatMap { case (_, ((p, u), w)) =>
      restrictions.getOrElse((w, p), Iterable.empty[String]).map(v => (u, v))
    }
  }
/**
 * Builds an iterator transformer that keeps triples whose predicate appears as
 * the second component of some key of the broadcast map, and re-keys them by object.
 *
 * A kept triple `(u, (p, x))` is emitted as `(x, (p, u))`; other triples are dropped.
 */
def filterAllValOnPropTriple = (joinedAllValOnPropMap: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    // Collected into a Set once per call so each triple costs a hash lookup, not a list scan.
    val onProps: Set[String] = joinedAllValOnPropMap.value.keysIterator.map { case (_, p) => p }.toSet
    for {
      (u, (p, x)) <- triples
      if onProps.contains(p)
    } yield (x, (p, u))
  }
/**
 * Builds an iterator transformer that keeps the pairs `(u, v)` whose second
 * component `v` appears as the first component of some key of the broadcast map.
 * Kept pairs are emitted unchanged; other pairs are dropped.
 */
def filterAllValOnPropType = (joinedAllValOnPropMap: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (typed: Iterator[(String, String)]) => {
    // Collected into a Set once per call so each pair costs a hash lookup, not a list scan.
    val keptClasses: Set[String] = joinedAllValOnPropMap.value.keysIterator.map { case (v, _) => v }.toSet
    typed.filter { case (_, v) => keptClasses.contains(v) }
  }
/**
 * Builds an iterator transformer over joined records `(key, ((p, x), v))`.
 *
 * The record's key is ignored. When `(v, p)` is a key of the broadcast map, one
 * `(x, w)` is emitted per value `w` stored under it; otherwise nothing is emitted.
 * NOTE(review): presumably `w` is the allValuesFrom target class that `x` is
 * inferred to belong to - confirm.
 */
def allValueReasoning = (joinedAllValOnPropMap: Broadcast[Map[(String, String), Iterable[String]]]) =>
  (joined: Iterator[(String, ((String, String), String))]) => {
    val restrictions = joinedAllValOnPropMap.value
    joined.flatMap { case (_, ((p, x), v)) =>
      restrictions.getOrElse((v, p), Iterable.empty[String]).map(w => (x, w))
    }
  }
/**
 * Builds an iterator transformer that expands the second component of each
 * pair through a broadcast map.
 *
 * For every `(u, c)` whose `c` is a key of the map, one `(u, c1)` is emitted per
 * value `c1` stored under `c`. Pairs with an unmapped `c` produce nothing.
 * NOTE(review): presumably the map sends a class to its super-classes - confirm.
 */
def subclassReasoning = (bcSubclassMap: Broadcast[Map[String, Iterable[String]]]) =>
  (typed: Iterator[(String, String)]) => {
    val parents = bcSubclassMap.value
    typed.flatMap { case (u, c) =>
      parents.getOrElse(c, Iterable.empty[String]).map(c1 => (u, c1))
    }
  }
/**
 * Builds an iterator transformer that pairs a triple's subject with the values
 * a broadcast map stores under its predicate.
 *
 * For every triple `(s, (p, o))` whose predicate `p` is a key of the map, one
 * `(s, q)` is emitted per value `q` stored under `p`; the object is not used.
 * NOTE(review): presumably the map sends a property to its rdfs:domain classes - confirm.
 */
def domainReasoning = (bcDomainMap: Broadcast[Map[String, Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    val domains = bcDomainMap.value
    triples.flatMap { case (s, (p, _)) =>
      domains.getOrElse(p, Iterable.empty[String]).map(q => (s, q))
    }
  }
/**
 * Builds an iterator transformer that pairs a triple's object with the values
 * a broadcast map stores under its predicate.
 *
 * For every triple `(s, (p, o))` whose predicate `p` is a key of the map, one
 * `(o, q)` is emitted per value `q` stored under `p`; the subject is not used.
 * NOTE(review): presumably the map sends a property to its rdfs:range classes - confirm.
 */
def rangeReasoning = (bcRangeMap: Broadcast[Map[String, Iterable[String]]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    val ranges = bcRangeMap.value
    triples.flatMap { case (_, (p, o)) =>
      ranges.getOrElse(p, Iterable.empty[String]).map(q => (o, q))
    }
  }
/**
 * Builds an iterator transformer that keeps triples with a listed predicate and
 * re-keys them by `(predicate, object)`.
 *
 * A kept triple `(s, (p, o))` is emitted as `((p, o), s)`, so subjects that
 * share a predicate and object end up under the same key; other triples are dropped.
 * NOTE(review): presumably the array lists the inverse-functional properties - confirm.
 */
def filterInvFuncProp = (bcInvFuncProp: Broadcast[Array[String]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    // Built once per call so each triple costs a hash lookup, not an array scan.
    val invFuncProps: Set[String] = bcInvFuncProp.value.toSet
    for {
      (s, (p, o)) <- triples
      if invFuncProps.contains(p)
    } yield ((p, o), s)
  }
/**
 * Builds an iterator transformer that keeps triples with a listed predicate and
 * re-keys them by `(subject, predicate)`.
 *
 * A kept triple `(s, (p, o))` is emitted as `((s, p), o)`, so objects that share
 * a subject and predicate end up under the same key; other triples are dropped.
 * NOTE(review): presumably the array lists the functional properties - confirm.
 */
def filterFuncProp = (bcFuncProp: Broadcast[Array[String]]) =>
  (triples: Iterator[(String, (String, String))]) => {
    // Built once per call so each triple costs a hash lookup, not an array scan.
    val funcProps: Set[String] = bcFuncProp.value.toSet
    for {
      (s, (p, o)) <- triples
      if funcProps.contains(p)
    } yield ((s, p), o)
  }
}