-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtriple_workflow.py
109 lines (85 loc) · 2.68 KB
/
triple_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import luigi
import glob
import os
from tasks.parse_tasks import TripleTask
from tasks.parliament_tasks import ParliamentTask
from tasks.parliament_tasks import SimpleParliamentTask
from tasks.task_helpers import parse_yaml
from tasks.task_helpers import run_init
class TripleWorkflow(luigi.Task):
    """Fan out one TripleTask per ``*.json`` file found in ``doc_dir``.

    Parameters:
        doc_dir: directory searched (non-recursively) for ``*.json`` files.
        yaml_file: forwarded to every TripleTask and handed to
            ``parse_yaml`` when this task runs (presumably a path to a
            YAML configuration file).
        start_index: index of the first matched file to include (default 0).
        end_index: index one past the last matched file (default 1000).
    """

    doc_dir = luigi.Parameter()
    yaml_file = luigi.Parameter()
    # Integer parameters: values supplied on the command line arrive as
    # text, and a plain Parameter would pass that text straight into the
    # slice in _iterator(), which raises TypeError.
    start_index = luigi.IntParameter(default=0)
    end_index = luigi.IntParameter(default=1000)

    def requires(self):
        """Return one TripleTask for each selected input file."""
        return [
            TripleTask(input_file=path, yaml_file=self.yaml_file)
            for path in self._iterator()
        ]

    def output(self):
        """Return the local ``log.txt`` target for this workflow."""
        # NOTE(review): nothing visible in this class writes log.txt, and
        # the other workflow classes in this file use the same path --
        # confirm that this is intended.
        return luigi.LocalTarget('log.txt')

    def run(self):
        """Initialise from the YAML config and print a progress marker."""
        self._configure()
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print('running')

    def _configure(self):
        """Parse ``yaml_file`` and pass the result to ``run_init``."""
        config = parse_yaml(self.yaml_file)
        run_init(config)

    def _iterator(self):
        """Yield the matched files in the [start_index, end_index) window."""
        # glob.glob() gives no ordering guarantee; sort so that the same
        # index window always selects the same files.
        matches = sorted(glob.glob(os.path.join(self.doc_dir, '*.json')))
        for path in matches[self.start_index:self.end_index]:
            yield path
# TODO: there has to be a better way than duplicating this class for every
# task type; as I recall, luigi has a build-order mechanism that may help.
class ParliamentWorkflow(luigi.Task):
    """Fan out one ParliamentTask per ``*.json`` file found in ``doc_dir``.

    Parameters:
        doc_dir: directory searched (non-recursively) for ``*.json`` files.
        yaml_file: forwarded to every ParliamentTask and handed to
            ``parse_yaml`` when this task runs (presumably a path to a
            YAML configuration file).
        start_index: index of the first matched file to include (default 0).
        end_index: index one past the last matched file (default 1000).
    """

    doc_dir = luigi.Parameter()
    yaml_file = luigi.Parameter()
    # Integer parameters: values supplied on the command line arrive as
    # text, and a plain Parameter would pass that text straight into the
    # slice in _iterator(), which raises TypeError.
    start_index = luigi.IntParameter(default=0)
    end_index = luigi.IntParameter(default=1000)

    def requires(self):
        """Return one ParliamentTask for each selected input file."""
        return [
            ParliamentTask(input_file=path, yaml_file=self.yaml_file)
            for path in self._iterator()
        ]

    def output(self):
        """Return the local ``log.txt`` target for this workflow."""
        # NOTE(review): nothing visible in this class writes log.txt, and
        # the other workflow classes in this file use the same path --
        # confirm that this is intended.
        return luigi.LocalTarget('log.txt')

    def run(self):
        """Initialise from the YAML config and print a progress marker."""
        self._configure()
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print('running')

    def _configure(self):
        """Parse ``yaml_file`` and pass the result to ``run_init``."""
        config = parse_yaml(self.yaml_file)
        run_init(config)

    def _iterator(self):
        """Yield the matched files in the [start_index, end_index) window."""
        # glob.glob() gives no ordering guarantee; sort so that the same
        # index window always selects the same files.
        matches = sorted(glob.glob(os.path.join(self.doc_dir, '*.json')))
        for path in matches[self.start_index:self.end_index]:
            yield path
class SimpleParliamentWorkflow(luigi.Task):
    """Fan out one SimpleParliamentTask per ``*.ttl`` file in ``doc_dir``.

    Unlike the JSON-driven workflows in this file, this one selects
    ``*.ttl`` files as its inputs.

    Parameters:
        doc_dir: directory searched (non-recursively) for ``*.ttl`` files.
        yaml_file: forwarded to every SimpleParliamentTask and handed to
            ``parse_yaml`` when this task runs (presumably a path to a
            YAML configuration file).
        start_index: index of the first matched file to include (default 0).
        end_index: index one past the last matched file (default 1000).
    """

    doc_dir = luigi.Parameter()
    yaml_file = luigi.Parameter()
    # Integer parameters: values supplied on the command line arrive as
    # text, and a plain Parameter would pass that text straight into the
    # slice in _iterator(), which raises TypeError.
    start_index = luigi.IntParameter(default=0)
    end_index = luigi.IntParameter(default=1000)

    def requires(self):
        """Return one SimpleParliamentTask for each selected input file."""
        return [
            SimpleParliamentTask(input_file=path, yaml_file=self.yaml_file)
            for path in self._iterator()
        ]

    def output(self):
        """Return the local ``log.txt`` target for this workflow."""
        # NOTE(review): nothing visible in this class writes log.txt, and
        # the other workflow classes in this file use the same path --
        # confirm that this is intended.
        return luigi.LocalTarget('log.txt')

    def run(self):
        """Initialise from the YAML config and print a progress marker."""
        self._configure()
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print('running')

    def _configure(self):
        """Parse ``yaml_file`` and pass the result to ``run_init``."""
        config = parse_yaml(self.yaml_file)
        run_init(config)

    def _iterator(self):
        """Yield the matched files in the [start_index, end_index) window."""
        # glob.glob() gives no ordering guarantee; sort so that the same
        # index window always selects the same files.
        matches = sorted(glob.glob(os.path.join(self.doc_dir, '*.ttl')))
        for path in matches[self.start_index:self.end_index]:
            yield path