forked from EUDAT-B2FIND/md-ingestion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cleaner.py
executable file
·386 lines (336 loc) · 16.5 KB
/
cleaner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
#!/usr/bin/env python
"""cleaner.py
checks and/or deletes - depending on execution mode and existence -
B2FIND files on disc, datasets in CKAN database
and/or handles (PIDs) from handle server.
Copyright (c) 2014 Heinrich Widmann (DKRZ)
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
Modified by c/o DKRZ 2014 Heinrich Widmann
"""
import optparse, os, sys, re
import time
from b2handle.clientcredentials import PIDClientCredentials
from b2handle.handleclient import EUDATHandleClient
from b2handle.handleexceptions import HandleAuthenticationError,HandleNotFoundException,HandleSyntaxError,GenericHandleError
from uploading import CKAN_CLIENT, Uploader
from output import Output
import logging as log
def options_parser(modes):
    """Build the command-line parser of the cleaner.

    Parameters:
        modes: list of accepted values for --mode; it is only used here to
            render the metavar of that option in the help output.

    Returns:
        The configured ``optparse.OptionParser`` with the general options
        and the "Multi Mode", "Single Mode" and "Upload" option groups.
    """
    object_overview = """
For each given entry existence checks and deletions of the following B2FIND objects is performed :
- 1. XML files on disc, harvested from OAI-PMH MD provider(s)\n\t
- 2. JSON files on disc, generated by semantic mapping\n\t
- 3. Uploaded CKAN datasets in the B2FIND catalogue and portal\n\t
- 4. Handles (PIDs) from handle server
"""

    parser = optparse.OptionParser(
        description="Description: checks and delete B2FIND files, datasets and handles from disk, CKAN database and handle server respectively.\n" + object_overview,
        formatter=optparse.TitledHelpFormatter(),
        prog='cleaner.py 1.0',
        epilog='For any further information and documentation please look at README.txt file or at the EUDAT wiki (-> JMD Software).',
        version="%prog ",
    )

    # Each spec is (option strings, keyword arguments for add_option).
    # The order of the option strings matters: optparse derives the
    # destination attribute from the first long option string.
    general_specs = [
        (('-v', '--verbose'), dict(
            action="count",
            default=False,
            help="increase output verbosity (e.g., -vv is more than -v)")),
        (('-q', '--quiet'), dict(
            action="count",
            default=False,
            help="quite modus : deletion is not really performed but only announced.")),
        (('--jobdir',), dict(
            default=None,
            help='\ndirectory where log, error and html-result files are stored. By default directory is created as startday/starthour/processid .')),
        (('--mode', '-m'), dict(
            default='x-p',
            metavar=' ' + " | ".join(modes),
            help='\nThis can be used to do a partial workflow. Default is "x-p" which means deletion of all found objects, i.e. (x)ml files, (j) json files, (c)kan datasets and (p)id\'s in handle server.')),
        (('--identifier', '-i'), dict(
            default=None,
            metavar='STRING',
            help="identifier for which objects are checked and deleted. If not given identifiers list must be given by -l option")),
        (('--community', '-c'), dict(
            default=None,
            metavar='STRING',
            help="community for which objects are checked and deleted. If no value given identifiers from -i rsp. -l option is taken.")),
        (('--fromdate',), dict(
            default=None,
            metavar='DATE',
            help="Filter harvested files by date (Format: YYYY-MM-DD).")),
        (('--handle_check',), dict(
            default=None,
            metavar='FILE',
            help="check and generate handles of CKAN datasets in handle server and with credentials as specified in given credstore file")),
        (('--ckan_check',), dict(
            default='False',
            metavar='BOOLEAN',
            help="check existence and checksum against existing datasets in CKAN database")),
        (('--outdir', '-d'), dict(
            default='oaidata',
            metavar='PATH',
            help="The relative root dir in which all harvested files will be saved. The converting and the uploading processes work with the files from this dir. (default is 'oaidata')")),
    ]
    for flags, settings in general_specs:
        parser.add_option(*flags, **settings)

    # Each group spec is (title, description, option specs); the groups are
    # attached to the parser in this order.
    group_specs = [
        ("Multi Mode Options",
         "Use these options if you want to ingest from a list in a file.",
         [
             (('--list', '-l'), dict(
                 default=None,
                 metavar='FILE',
                 help="list of identifiers (-i mode) or communities sources (-c mode, default is ./harvest_list)")),
             (('--parallel',), dict(
                 default='serial',
                 help="[DEPRECATED]")),
         ]),
        ("Single Mode Options",
         "Use these options if you want to ingest from only ONE source.",
         [
             (('--source', '-s'), dict(
                 default=None,
                 metavar='PATH',
                 help="A URL to .xml files which you want to harvest")),
             (('--verb',), dict(
                 default='ListRecords',
                 metavar='STRING',
                 help="Verbs or requests defined in OAI-PMH, can be ListRecords (default) or ListIdentifers here")),
             (('--mdsubset',), dict(
                 default=None,
                 metavar='STRING',
                 help="Subset of harvested meta data")),
             (('--mdprefix',), dict(
                 default=None,
                 metavar='STRING',
                 help="Prefix of harvested meta data")),
         ]),
        ("Upload Options",
         "These options will be required to upload an dataset to a CKAN database.",
         [
             (('--host',), dict(
                 metavar='IP',
                 help="host or IP adress of B2FIND portal (CKAN instance)")),
             (('--auth',), dict(
                 metavar='STRING',
                 help="Authentification for CKAN APIs (API key, by default taken from file $HOME/.netrc)")),
         ]),
    ]
    option_groups = []
    for title, summary, specs in group_specs:
        group = optparse.OptionGroup(parser, title, summary)
        for flags, settings in specs:
            group.add_option(*flags, **settings)
        option_groups.append(group)
    for group in option_groups:
        parser.add_option_group(group)

    return parser
def pstat_init (p,modes,mode,source,host):
    """Validate the run mode and build the process-status dictionary.

    Parameters:
        p: option parser; accepted for interface compatibility, not used.
        modes: list of supported mode strings.
        mode: requested mode; a falsy value selects the default 'x-p'.
        source: accepted for interface compatibility, not used.
        host: CKAN host name, interpolated into the status text for 'c'.

    Returns:
        Tuple ``(mode, pstat)`` where ``pstat`` has the keys
        'status' (process letter -> 'tbd' or 'no'),
        'text' (process letter -> description) and
        'short' (list of [letter, short name] pairs).

    Exits the interpreter with status -1 if ``mode`` is given but is not
    contained in ``modes``. Raises ValueError for a range mode 'a-b' whose
    end points are not among the known process letters.
    """
    if mode:
        if mode not in modes:
            print("[ERROR] Mode " + mode + " is not supported")
            sys.exit(-1)
    else:
        # Default: all processes. The previous fallback 'h-u' names
        # processes that do not exist in this script and made the range
        # expansion below fail with ValueError.
        mode = 'x-p'

    # Known processes in workflow order: (x)ml, (j)son, (c)kan, (p)id.
    plist = ['x', 'j', 'c', 'p']
    pstat = {
        'status' : {},
        'text' : {},
        'short' : [],
    }

    # A process is selected when its letter occurs in the mode string ...
    for proc in plist:
        if proc in mode:
            pstat['status'][proc] = 'tbd'
        else:
            pstat['status'][proc] = 'no'

    # ... or when it lies inside a range mode such as 'x-j' or 'j-p'.
    if len(mode) == 3 and mode[1] == '-':
        first = plist.index(mode[0])
        last = plist.index(mode[2])
        for proc in plist[first:last + 1]:
            pstat['status'][proc] = 'tbd'

    if mode == 'c-p':
        pstat['status']['a'] = 'tbd'

    pstat['text']['x'] = 'Delete XML files from disc '
    pstat['text']['j'] = 'Delete JSON fieles from disc'
    pstat['text']['c'] = 'Delete datasets from CKAN server %s' % host
    pstat['text']['p'] = 'Delete pids from handle server'

    pstat['short'].append(['x', 'XML'])
    pstat['short'].append(['j', 'JSON'])
    pstat['short'].append(['c', 'CKAN'])
    pstat['short'].append(['p', 'PID'])

    return (mode, pstat)
def _netrc_api_key(netrc_path, host):
    """Return the API key stored for *host* in *netrc_path*, or None.

    Each line is read as whitespace-separated fields with the host in the
    first field and the key in the second. Lines with fewer than two
    fields (e.g. blank lines) are skipped.
    """
    with open(netrc_path, 'r') as netrc:
        for line in netrc:
            fields = line.split()
            if len(fields) >= 2 and fields[0] == host:
                return fields[1]
    return None


def _remove_file(path, dry_run):
    """Check *path* and remove it unless *dry_run* is set.

    Returns a ``(status, action)`` pair: status is 'exists' or None,
    action is 'removed' or ''. Errors raised by the removal are logged
    and re-raised.
    """
    if not os.path.isfile(path):
        print("\tWARNING : Can not access %s for removing" % path)
        return None, ''
    if dry_run:
        return 'exists', ''
    try:
        os.remove(path)
    except Exception as err:
        logger.error('[ERROR] Unexpected Error: %s' % err)
        raise
    return 'exists', 'removed'


def main():
    """Check and/or delete B2FIND objects for a list of identifiers.

    The identifiers come from option -i (single identifier), -l (file with
    one entry per line) or -c (all packages of a CKAN community). For each
    entry the XML/JSON files, the handle (PID) and the CKAN dataset are
    checked and - unless --quiet is given - removed, and one table row per
    entry is printed.

    Exits the interpreter on invalid options, missing credentials or
    unexpected handle-server errors.

    All print calls pass exactly one argument, so the output is the same
    under Python 2 (print statement) and Python 3 (print function).
    """
    global logger

    # parse command line options and arguments:
    modes = ['x', 'xmlfiles', 'j', 'jsonfiles', 'c', 'ckandatasets',
             'p', 'pids', 'x-p', 'x-j', 'j-c', 'j-p']
    p = options_parser(modes)
    options, _ = p.parse_args()

    # check option 'mode' and generate process list:
    (_mode, pstat) = pstat_init(p, modes, options.mode, options.source, options.host)

    # quiet mode only announces what would be deleted
    mainmode = 'check' if options.quiet else 'deletion'

    if options.host:
        print("\tCKAN HOST:\t%s" % (options.host))
    if options.handle_check:
        print("\tCREDENTIAL:\t%s" % (options.handle_check))
    print('=' * 90)

    # make jobdir
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    jid = os.getpid()
    print("\tStart of processing:\t%s" % (now))

    OUT = Output(pstat, now, jid, options)
    logger = OUT.setup_custom_logger('root', options.verbose)

    # create credentials if required
    if options.handle_check:
        # Use the credential store given on the command line; fall back to
        # the formerly hard-coded file name when the given value is not an
        # existing file, so that old invocations keep working.
        credstore = options.handle_check
        if not os.path.isfile(credstore):
            credstore = 'credentials_11098'
        try:
            cred = PIDClientCredentials.load_from_JSON(credstore)
        except Exception as err:
            logger.critical("[CRITICAL] %s Could not create credentials from credstore %s" % (err, credstore))
            p.print_help()
            sys.exit(-1)
        else:
            logger.debug("Create EUDATHandleClient instance")
            HandleClient = EUDATHandleClient.instantiate_with_credentials(cred, HTTPS_verify=True)
    else:
        cred = None
        HandleClient = None

    # checking given options:
    if not options.host:
        logger.critical(
            "\033[1m [CRITICAL] " +
            "For CKAN database delete mode valid URL of CKAN instance (option --host) and API key (--auth or read from ~/.netrc) must be given" + "\033[0;0m"
        )
        sys.exit(-1)
    if not options.auth:
        home = os.path.expanduser("~")
        netrc_path = home + '/.netrc'
        if not os.path.isfile(netrc_path):
            logger.critical('[CRITICAL] Can not access job host authentification file %s/.netrc ' % home)
            sys.exit(-1)
        options.auth = _netrc_api_key(netrc_path, options.host)
        if not options.auth:
            logger.error('[ERROR] No API key found for host %s in %s' % (options.host, netrc_path))

    CKAN = CKAN_CLIENT(options.host, options.auth)
    UP = Uploader(CKAN, options.ckan_check, HandleClient, cred, OUT, options.outdir, options.fromdate, options.host)

    # collect the entries to process
    if options.identifier:
        entries = [options.identifier]
    elif options.list:
        with open(options.list, 'r') as listfile:
            # blank lines carry no identifier and are ignored
            entries = [line for line in listfile if line.strip()]
    elif options.community:
        UP.get_packages(options.community)
        print("--- Start get community list from CKAN---\n")
        entries = list(UP.package_list.keys())
        # keep a copy of the identifier list next to the job
        with open('%s-id.list' % options.community, 'w') as cf:
            cf.write("\n".join(entries))
    else:
        print('ERROR : one of the otptions -c COMMUNITY, -i IDENTIFIER or -l LIST must be given')
        sys.exit()

    print("\n=== Start %s processing ===\n\tTIME:\t%s\n\tID LIST:\t%s ... \n\t%s MODE:\t%s" % (mainmode, now, entries[0:100], mainmode.upper(), options.mode))

    print('\n| %-5s | %-35s | %-6s | %-6s | %-6s | %-6s |\n|%s|' % ('#', 'Identifier', 'XML', 'JSON', 'CKAN', 'PID', "-" * 53))

    # NOTE(review): the nesting of the statements below was reconstructed
    # from the statement order; confirm against the project history.
    # NOTE(review): the selected --mode is not consulted here; files are
    # removed whenever the entry is an .xml/.json path, handles when
    # --handle_check is set and datasets when --ckan_check is 'True'.
    for n, entry in enumerate(entries, 1):
        entry_path = entry.rstrip()
        entry_dir = os.path.dirname(entry).rstrip()
        stem, ext = os.path.splitext(os.path.basename(entry_path))
        stem_parts = stem.split()
        if not stem_parts:
            logger.error('[ERROR] Can not derive identifier from list entry %r - skipped' % entry)
            continue
        # identifier = last whitespace-separated token, without enclosing quotes
        ident = re.sub(r'^"|"$', '', stem_parts[-1])

        ### check and remove xml/json files
        xmlstatus = None
        jsonstatus = None
        xmlaction = ''
        jsonaction = ''
        if entry_dir and ext in ('.json', '.xml'):
            # the sibling file is looked up in <parent of entry dir>/xml or /json
            parent = os.path.split(entry_dir)[0]
            if ext == '.json':
                jsonfile = entry_path
                xmlfile = '%s/%s/%s%s' % (parent, 'xml', ident, '.xml')
            else:
                xmlfile = entry_path
                jsonfile = '%s/%s/%s%s' % (parent, 'json', ident, '.json')
            xmlstatus, xmlaction = _remove_file(xmlfile, options.quiet)
            jsonstatus, jsonaction = _remove_file(jsonfile, options.quiet)

        # check and delete dataset and pid, if required
        ckanstatus = None
        handlestatus = None
        ckanaction = ''
        handleaction = ''

        if options.handle_check:
            # check against handle server
            handlestatus = "unknown"
            pid = "11098/eudat-jmd_" + ident.lower()
            checksum2 = None
            b2findversion = None
            try:
                checksum2 = HandleClient.get_value_from_handle(pid, "CHECKSUM")
                b2findversion = HandleClient.get_value_from_handle(pid, "JMDVERSION")
            except (HandleNotFoundException, HandleSyntaxError) as err:
                logger.debug("[DEBUG : %s] in HandleClient.get_value of pid %s" % (err, pid))
            except Exception as err:
                logger.critical("[CRITICAL : %s] in HandleClient.get_value_from_handle of %s" % (err, pid))
                sys.exit()
            else:
                logger.debug(" Get checksum %s from handle %s " % (checksum2, pid))

            if checksum2 is None:
                logger.debug(" |-> Can not access pid %s to get checksum" % (pid))
                handlestatus = "new"
            else:
                logger.debug(" |-> pid %s exists" % (pid))
                print('JMDVERSION %s' % b2findversion)
                handlestatus = "exist"

            if handlestatus == 'exist' and not options.quiet:
                try:
                    HandleClient.delete_handle(pid)
                    handleaction = 'removed'
                except GenericHandleError as err:
                    # best effort: report the failure and go on with the next entry
                    logger.error('[ERROR] Unexpected Error: %s' % err)
                    handleaction = 'failed'
                except Exception as err:
                    logger.error('[ERROR] Unexpected Error: %s' % err)
                    handleaction = 'failed'
                    raise

        ### check for and remove ckan dataset
        if options.ckan_check == 'True':
            ckanstatus = 'unknown'
            if options.community:
                checksum = 'fe5f25c9f6d17ba289d6551afc98a8c3'
                ckanstatus = UP.check_dataset(ident, checksum)

            if ckanstatus in ('unknown', 'changed', 'unchanged') and not options.quiet:
                try:
                    deleted = UP.delete(ident, ckanstatus)
                except Exception as err:
                    logger.error('[ERROR] Unexpected Error: %s' % err)
                    raise
                ckanaction = 'removed' if deleted == 1 else 'failed'

        print('| %-6d | %-35s | %-6s | %-6s | %-6s | %-6s |' % (n, ident, xmlstatus, jsonstatus, ckanstatus, handlestatus))
        if not options.quiet:
            print('--> %-42s | %-6s | %-6s | %-6s | %-6s |' % ('action performed', xmlaction, jsonaction, ckanaction, handleaction))

    logger.info('end of cleaning ...')
# Script entry point: run the cleaner only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()