#!/usr/bin/env python
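#
# Batch-processing driver from the htautau/hhntup ntuple-production code:
# distribute a "student" analysis module over the files of one or more
# datasets using a supervisor/student multiprocessing model.
#
# Example invocation (a sketch only; the student and sample names below are
# hypothetical):
#
#   ./batch --student HHProcessor --nproc 4 mc12_ztautau data12_periodB
#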
from goodruns.extern.argparse import ArgumentParser
parser = ArgumentParser(usage="%(prog)s [args] samplename1 samplename2 ...")
parser.add_argument("-v", "--verbose", action="store_true",
                    help="verbose output", default=False)
parser.add_argument("-n", "--nproc", type=int,
                    help="number of students (parallel processes)", default=1)
parser.add_argument("--events", type=int,
                    help="number of events to process", default=-1)
parser.add_argument("--queue", action="store_true",
                    help="use a queue to feed files to students", default=False)
parser.add_argument("--profile", action="store_true",
                    help="profile the execution time of each student's work method",
                    default=False)
parser.add_argument("--random-sample", type=int, dest="rsample",
                    help="length of a random sampling of the input files to process",
                    default=None)
parser.add_argument("--nice", type=int, dest="nice",
                    help="niceness applied to the student processes", default=0)
parser.add_argument("-p", "--periods",
                    help="data periods separated by commas (all periods if not specified)",
                    default=None)
parser.add_argument("-r", "--runs",
                    help="data runs separated by commas (must not also specify periods)",
                    default=None)
parser.add_argument("--suffix",
                    help="suffix appended to the sample name", default=None)
parser.add_argument("-s", "--student",
                    help="the file (excluding the .py extension) containing a "
                         "class of the same name inheriting from rootpy.batch.Student",
                    required=True)
parser.add_argument("-m", "--metadata",
                    help="dataset metadata in YAML format", default="datasets.yml")
parser.add_argument("datasets", nargs="+")
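# options not recognized by this parser are collected in user_args and
# forwarded untouched to the student (see options=user_args below)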
args, user_args = parser.parse_known_args()
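# the heavier imports are deferred until after argument parsing, presumably so
# that --help and argument errors return without importing the analysis stack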
import sys
import os
import glob
import yaml
from higgstautau.batch import ATLASSupervisor
from higgstautau import datasets
import multiprocessing
from configobj import ConfigObj, flatten_errors
from validate import Validator
import fnmatch
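# make modules in the current working directory (in particular the student
# module named by --student) importable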
sys.path.insert(0, '.')
if not args.student:
    sys.exit("Student file not defined!")
dataroot = os.getenv('DATAROOT', None)
if dataroot is None:
    sys.exit("$DATAROOT not defined!")
if len(args.datasets) == 0:
    print "No samples specified!"
    sys.exit(1)
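# a single sample name of 'all' expands to every directory under $DATAROOT
# that contains a meta.yml file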
if len(args.datasets) == 1:
    if args.datasets[0].lower() == 'all':
        args.datasets = []
        dirs = glob.glob(os.path.join(dataroot, '*'))
        for dir in dirs:
            print dir
            if os.path.isfile(os.path.join(dir, 'meta.yml')):
                args.datasets.append(os.path.basename(dir))
if args.runs is not None and args.periods is not None:
    print "Warning: you specified both runs and data periods. The run selection overrides the period selection."
    args.periods = None
if args.periods is not None:
    print "using period(s) %s for data sample" % args.periods
elif args.runs is not None:
    print "using run(s) %s for data sample" % args.runs
if args.runs is not None:
    if ',' in args.runs:
        args.runs = [int(run) for run in args.runs.split(',')]
    elif '-' in args.runs:
        begin, end = args.runs.split('-')
        args.runs = range(int(begin), int(end) + 1)
    else:
        args.runs = [int(args.runs)]
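# the metadata file is either plain YAML (.yml) or a ConfigObj file validated
# against a .spec file with the same basename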
if args.metadata.endswith('.yml'):
    with open(args.metadata, 'r') as configfile:
        metadata = yaml.load(configfile)
else:
    configspec = os.path.splitext(args.metadata)[0] + '.spec'
    if not os.path.isfile(configspec):
        sys.exit('%s does not exist' % configspec)
    metadata = ConfigObj(args.metadata, configspec=configspec)
    validator = Validator()
    result = metadata.validate(validator, preserve_errors=True)
    if result is not True:
        for entry in flatten_errors(metadata, result):
            # each entry is a (section_list, key, error) tuple
            section_list, key, error = entry
            if key is not None:
                section_list.append(key)
            else:
                section_list.append('[missing section]')
            section_string = ', '.join(section_list)
            if error is False:
                error = 'Missing value or section.'
            print section_string, ' = ', error
        sys.exit(1)
sorted_datasets = sorted(metadata.keys())

# expand shell-style globs in the dataset arguments against the metadata keys
_datasets = []
for dataset in args.datasets:
    if '*' in dataset:
        _datasets += fnmatch.filter(sorted_datasets, dataset)
    else:
        _datasets.append(dataset)
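# resolve every sample name to a fileset (input files, GRL and dataset
# metadata) up front so that a bad sample name fails before any job starts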
filesets = []
for sample in _datasets:
    if sample not in metadata:
        sys.exit("sample %s not defined in metadata %s" % (sample, args.metadata))
    meta = metadata[sample]
    fileset = datasets.get_sample(sample,
                                  meta,
                                  periods=args.periods,
                                  runs=args.runs,
                                  random_sample=args.rsample)
    if not fileset:
        print "FATAL: sample %s does not exist!" % sample
        sys.exit(1)
    print "processing %s..." % fileset.name
    filesets.append(fileset)
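# launch one ATLASSupervisor per fileset; the supervisor is expected to fork
# nstudents worker processes, feed them the input files, and report back over
# the multiprocessing pipe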
for fileset in filesets:
    parent_connection, child_connection = multiprocessing.Pipe()
    supervisor = ATLASSupervisor(
        student=args.student,
        outputname='_'.join([fileset.name, args.suffix]) if args.suffix else fileset.name,
        files=fileset.files,
        metadata=fileset,
        nstudents=args.nproc,
        connection=child_connection,
        queuemode=args.queue,
        profile=args.profile,
        grl=fileset.grl,
        events=args.events,
        nice=args.nice,
        options=user_args)
    try:
        supervisor.start()
        supervisor.join()
    except (KeyboardInterrupt, SystemExit):
        # signal shutdown over the pipe and wait for the supervisor to finish
        print "Cleaning up..."
        parent_connection.send(None)
        supervisor.join()
        break