-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodels.py
1798 lines (1504 loc) · 58.7 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# --- Standard library ---
import io
import inspect
import itertools
import json
import logging  # was previously only available transitively; used by the logger setup below
import os
import uuid
import zipfile
from datetime import datetime, date
from hashlib import md5
from pprint import pprint
from urllib.request import urlopen
from xml.dom import minidom

# --- Third-party ---
import jsonref
import jsonschema
import music21 as m21
import shortuuid
import verovio
# For tree models
from mptt.models import MPTTModel, TreeForeignKey

# --- Django ---
from django.conf import settings
from django.contrib.auth.models import User
from django.core.files import File
from django.core.files.base import ContentFile, File
from django.core.files.temp import NamedTemporaryFile
from django.db import models
from django.db.models import Count
from django.urls import reverse
#from django.urls import reverse_lazy

# --- Project ---
from .utils import OverwriteStorage
# Score model
from lib.music.Score import *
from lib.music.jsonld import JsonLD
import lib.music.annotation as annot_mod
import lib.music.source as source_mod
import lib.music.opusmeta as opusmeta_mod
import lib.music.iiifutils as iiif_mod
# DMOS parser
from lib.collabscore.parser import CollabScoreParser, OmrScore
from lib.collabscore.editions import Edition

# Note : we no longer need sklearn nor scipy
#from lib.neumautils.stats import symetrical_chi_square
#from lib.neumautils.matrix_transform import matrix_transform
#from lib.neumautils.kmedoids import cluster
# Get an instance of a logger
# See https://realpython.com/python-logging/
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# For the console
c_handler = logging.StreamHandler()
c_handler.setLevel(logging.INFO)
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)
logger.addHandler(c_handler)
def set_logging_level(level):
logger.setLevel(level)
class Person (models.Model):
'''Persons (authors, composers, etc)'''
first_name = models.CharField(max_length=100)
last_name = models.CharField(max_length=100)
year_birth = models.IntegerField()
year_death = models.IntegerField()
dbpedia_uri = models.CharField(max_length=255,null=True)
class Meta:
db_table = "Person"
def __str__(self): # __unicode__ on Python 2
return self.first_name + " " + self.last_name
def to_json (self):
return {"first_name": self.first_name,
"last_name": self.last_name,
"year_birth": self.year_birth,
"year_death": self.year_death,
"dbpedia_uri": self.dbpedia_uri
}
class Licence (models.Model):
'''Description of a licence'''
code = models.CharField(max_length=25,primary_key=True)
name = models.CharField(max_length=100)
url = models.CharField(max_length=255,null=True,blank=True)
notice = models.TextField()
full_text = models.TextField(null=True,blank=True)
class Meta:
db_table = "Licence"
def __str__(self): # __unicode__ on Python 2
return "(" + self.code + ") " + self.name
class Corpus(models.Model):
title = models.CharField(max_length=255)
short_title = models.CharField(max_length=255)
description = models.TextField()
short_description = models.TextField()
is_public = models.BooleanField(default=True)
parent = models.ForeignKey('self', null=True,on_delete=models.CASCADE)
composer = models.ForeignKey(Person, null=True,blank=True,on_delete=models.PROTECT)
creation_timestamp = models.DateTimeField('Created',auto_now_add=True)
update_timestamp = models.DateTimeField('Updated',auto_now=True)
ref = models.CharField(max_length=255,unique=True)
licence = models.ForeignKey(Licence, null=True,blank=True,on_delete=models.PROTECT)
copyright = models.CharField(max_length=255,null=True,blank=True)
supervisors = models.CharField(max_length=255,null=True,blank=True)
def upload_path(self, filename):
'''Set the path where corpus-related files must be stored'''
return 'corpora/%s/%s' % (self.ref.replace(settings.NEUMA_ID_SEPARATOR, "/"), filename)
cover = models.FileField(upload_to=upload_path,null=True,storage=OverwriteStorage())
def __init__(self, *args, **kwargs):
super(Corpus, self).__init__(*args, **kwargs)
# Non persistent fields
self.children = []
self.matrix = {}
class Meta:
db_table = "Corpus"
# permissions = (
# ('view_corpus', 'View corpus')
# ('import_corpus', 'Import corpus'),
# )
def __str__(self): # __unicode__ on Python 2
return "(" + self.ref + ") " + self.title
@staticmethod
def local_ref(ref):
"""
Get the local ref from the full corpus ref
"""
if settings.NEUMA_ID_SEPARATOR in ref:
# Find the last occurrence of the separator
last_pos = ref.rfind(settings.NEUMA_ID_SEPARATOR)
return ref[last_pos+1:]
else:
# Top-level corpus
return ref
@staticmethod
def parent_ref(ref):
"""
Get the parent ref from the full corpus ref
"""
if settings.NEUMA_ID_SEPARATOR in ref:
# Find the last occurrence of the separator
last_pos = ref.rfind(settings.NEUMA_ID_SEPARATOR)
return ref[:last_pos]
else:
# Top-level corpus
return ""
@staticmethod
def make_ref_from_local_and_parent(local_ref, parent_ref):
"""
Create the corpus reference from the local reference and parent reference
"""
return parent_ref + settings.NEUMA_ID_SEPARATOR + local_ref
def get_cover(self):
"""
Return the corpus cover if exists, else take the parent cover (recursively)
"""
if self.cover != "":
return self.cover;
elif self.parent is not None:
return self.parent.get_cover()
else:
# Should not happen: a top level corpus without image
return ""
def get_url(self):
"""
Get the URL to the Web corpus page, taken from urls.py
"""
return reverse('home:corpus', args=[self.ref])
def load_from_dict(self, dict_corpus):
"""Load content from a dictionary."""
self.title = dict_corpus["title"]
self.short_title = dict_corpus["short_title"]
self.description = dict_corpus["description"]
self.short_description = dict_corpus["short_description"]
self.is_public = dict_corpus["is_public"]
if "licence_code" in dict_corpus:
try:
self.licence = Licence.objects.get(code=dict_corpus["licence_code"])
except Licence.DoesNotExist:
print ("Unknown licence. Ignored. Did you run setup_neuma?")
if "composer" in dict_corpus:
try:
self.composer = Person.objects.get(dbpedia_uri=dict_corpus["composer"])
except Person.DoesNotExist:
print (f"Unknown composer {dict_corpus['composer']}. Ignored. Did you run setup_neuma?")
if "copyright" in dict_corpus:
self.copyright = dict_corpus["copyright"]
if "supervisors" in dict_corpus:
self.supervisors = dict_corpus["supervisors"]
return
def to_json(self):
"""
Create a dictionary that can be used for JSON exports
"""
if self.licence is not None:
licence_code = self.licence.code
else:
licence_code = None
core = {"ref": Corpus.local_ref(self.ref),
"title": self.title,
"short_title": self.short_title,
"description": self.description,
"is_public": self.is_public,
"short_description": self.short_description,
"licence_code": licence_code,
"copyright": self.copyright,
"supervisors": self.supervisors
}
if self.composer is not None:
core["composer"] = self.composer.to_json()
return core
if self.composer is not None:
core["composer"] = self.composer.dbpedia_uri
return core
def get_children(self, recursive=True):
self.children = Corpus.objects.filter(parent=self).order_by("ref")
for child in self.children:
child.get_children(recursive)
return self.children
def parse_dmos(self):
for opus in Opus.objects.filter(corpus=self).order_by("ref"):
print (f"\n\nProcessing opus {opus.ref}")
try:
opus.parse_dmos()
except Exception as e:
print ("Error when trying to convert DMOS file for opus {opus.ref}:{e}")
def get_direct_children(self):
return self.get_children(False)
def get_nb_children(self):
return Corpus.objects.filter(parent=self).count()
def get_nb_grammars(self):
return transcription.models.Grammar.objects.filter(corpus=self).count()
def get_grammars(self):
return transcription.models.Grammar.objects.filter(corpus=self).order_by('name')
def get_nb_opera(self):
return Opus.objects.filter(corpus=self).count()
def get_nb_opera_and_descendants(self):
return Opus.objects.filter(ref__startswith=self.ref).count()
def get_opera(self):
return Opus.objects.filter(corpus=self).order_by('ref')
def generate_sim_matrix(self):
''' Compute distance matrix and store them in database in form
of triplets (opus,opus,distance) '''
all_pairs = itertools.combinations(self.get_opera(),2)
i,l = 0, len(list(all_pairs))
missing_score = {}
error_status = {}
for pair in itertools.combinations(self.get_opera(),2):
#print(str(i)+'/'+str(l),end=" ")
if pair[0].ref != pair[1].ref:
for crit in SimMeasure.objects.order_by('code'):#does this work without @/map ?
self.matrix[crit] = {} # temp local storage
# print ("Process opus " + matrix.opus1.ref + " and opus " + matrix.opus2.ref)
try:
hist1 = pair[0].get_histograms(crit)#.values()
except LookupError:
missing_score[pair[0].ref] = True
continue
except Exception as e:
error_status[pair[0].ref] = True
continue
try:
hist2 = pair[1].get_histograms(crit)#.values()
except LookupError:
missing_score[pair[1].ref] = True
continue
except Exception as e:
error_status[pair[1].ref] = True
continue
# Make the two histogram have the same keys
# This solves rhythms "problem"
for a in set(hist1.keys()).union(set(hist2.keys())):
if a not in hist1:
hist1[a] = 0
if a not in hist2:
hist2[a] = 0
value = symetrical_chi_square(list(hist1.values()),list(hist2.values()))
# Note : we update or create value to avoid duplication of matrix in DB
if value != 'nan':
SimMatrix.objects.update_or_create(
sim_measure = crit,
opus1 = pair[0],
opus2 = pair[1],
value = value
)
SimMatrix.objects.update_or_create(
sim_measure = crit,
opus1 = pair[1],
opus2 = pair[0],
value = value
)
i+=1
#print("\r",end="")
print(str(l-len(error_status)-len(missing_score))+"/"+str(l)+" combination computed")
print("Missing score for "+str(len(missing_score))+" opera : ")
print(",".join(missing_score.keys()))
print("Error processing "+str(len(error_status))+" opera : ")
print(",".join(error_status.keys()))
def get_matrix_data(self, measure):
# measure = SimMeasure.objects.get(code=measure)
data = SimMatrix.objects.filter(sim_measure=measure,
opus1__corpus=self,
opus2__corpus=self)
return data
def has_matrix(self,measure):
return len(self.get_matrix_data(measure))>0
def generate_kmeans(self,measure_name,nb_class):
# Prevents crash if distances haven't been computed for this corpus
if not self.has_matrix(measure_name):
return []
try:
measure = SimMeasure.objects.get(code=measure_name)
except:
print('Invalid measure given "'+measure_name+'"')
return
q1 = SimMatrix.objects.filter(sim_measure=measure,
opus1__corpus=self).values_list('opus1','opus2','value')
q2 = SimMatrix.objects.filter(sim_measure=measure,
opus2__corpus=self).values_list('opus1','opus2','value')
# x + (y-x) :: note : this is missing on neighbor query !!!
all_distances = list(q1) + list(set(q1) - set(q2))
# Mapping distances to matrix
#distances , map_ids = matrix_transform(all_distances)
# Computing clusters / medoids
#clusters , medoids = cluster(distances,int(nb_class))
# pprint(clusters)
# pprint(medoids)
# pprint(list(map(lambda x:map_ids[x],clusters)))
# pprint(list(map(lambda x:map_ids[x],medoids)))
# SimMatrix.objects.update_or_create(
# sim_measure = crit,
# corpus = self,
# value = value
# )
# ok FIXME : how should we store kmedoids / clusters in DB ?
# --> how to display them on corpus page ?
# Return n medoids
return list(map(lambda x:map_ids[x],medoids))
def get_medoids(self,measure,k):
x = list(map(lambda x:Opus.objects.filter(id=x)[0],self.generate_kmeans(measure,k)))
pprint(x)
return x
def export_as_zip(self, request, mode="json"):
''' Export a corpus, its children and all opuses in
a recursive zip file.
By default, standard JSON files are used to encode corpus and opus
If mode == jsonld, we export as linked data
'''
# Write the ZIP file in memory
s = io.BytesIO()
# The zip compressor
zf = zipfile.ZipFile(s, "w")
# Add a JSON file with meta data
if mode == "jsonld":
zf.writestr("corpus.json", json.dumps(self.to_jsonld()))
else:
zf.writestr("corpus.json", json.dumps(self.to_json()))
# Write the cover file
if self.cover is not None:
try:
with open (self.cover.path, "r") as coverfile:
zf.writestr("cover.jpg", self.cover.read())
except Exception as ex:
print ("Cannot read the cover file ?" + str(ex))
# Add the zip files of the children
for child in self.get_direct_children():
# Composer at the corpus level ? Then each child inherits the composer
if self.composer is not None:
child.composer = self.composer
child.save()
zf.writestr(Corpus.local_ref(child.ref) + ".zip",
child.export_as_zip(request,mode).getvalue())
for opus in self.get_opera():
# Only add files where we are not in momde JSON-LD
if not mode == "jsonld":
# Add MusicXML file
if opus.musicxml:
if os.path.exists(opus.musicxml.path):
zf.write(opus.musicxml.path, opus.local_ref() + ".xml")
if opus.mei:
if os.path.exists(opus.mei.path):
zf.write(opus.mei.path, opus.local_ref() + ".mei")
# Add a sub dir for sources files
source_bytes = io.BytesIO()
# The zip compressor
source_compressor = zipfile.ZipFile(source_bytes, "w")
nb_source_files = 0
for source in opus.opussource_set.all():
if source.source_file:
nb_source_files += 1
source_compressor.write(source.source_file.path,
source.ref + "." + source.source_file.path.split(".")[-1])
if source.manifest:
nb_source_files += 1
source_compressor.write(source.manifest.path,
source.ref + "_mnf." + source.manifest.path.split(".")[-1])
source_compressor.close()
if nb_source_files > 0:
source_file = opus.local_ref() + '.szip'
#source_file = opus.local_ref() + '.source_files.zip'
zf.writestr( source_file, source_bytes.getvalue())
# Composer at the corpus level ? Then each opus inherits the composer
if self.composer is not None:
opus.add_meta(OpusMeta.MK_COMPOSER, self.composer.dbpedia_uri)
opus.save()
# Add a JSON file with meta data
if mode == "jsonld":
opus_json = json.dumps(opus.to_jsonld())
else:
opus_json = json.dumps(opus.to_json(request))
zf.writestr(opus.local_ref() + ".json", opus_json)
zf.close()
return s
def to_jsonld (self):
ontos = {"scorelib": settings.SCORELIB_ONTOLOGY_URI}
jsonld = JsonLD (ontos)
jsonld.add_type("scorelib", "Collection")
jsonld.add_type("scorelib", "Opus")
jsonld.add_type("scorelib", "Score")
dict_corpus = {"@id": self.ref,
"@type": "Collection",
"hasCollectionTitle": self.title,
"hasCollectionCopyright": self.copyright
}
if self.licence is not None:
dict_corpus["hasLicence"] = self.licence.code
if self.parent is not None:
dict_corpus["isInCollection"] = self.parent.ref
tab_opus = []
for opus in self.get_opera():
tab_opus.append(opus.to_jsonld())
has_opus = {"hasOpus": tab_opus}
return jsonld.get_context() | dict_corpus #| has_opus
@staticmethod
def import_from_zip(zfile, parent_corpus, zip_name):
''' Import a corpus from a Neuma zip export. If necessary, the
corpus is created, and its descriptions loaded from the json file
'''
opus_files = {}
children = {}
found_corpus_data = False
found_cover = False
corpus_dict = {}
cover_data = ""
# Scan the content of the ZIP file to find the list of opus
for fname in zfile.namelist():
# Skip files with weird names
base, extension = decompose_zip_name (fname)
if base == "" or base.startswith('_') or base.startswith('.'):
continue
# Look for the corpus data file
if base == "corpus" and extension == ".json":
found_corpus_data = True
corpus_dict = json.loads(zfile.open(fname).read().decode('utf-8'))
elif base == "cover" and extension == ".jpg":
found_cover = True
cover_data = zfile.open(fname).read()
elif extension == ".zip":
# If not a zip of source files: A zip file with a sub corpus
if not base.__contains__ ("source_files"):
children[base] = zipfile.ZipFile(io.BytesIO(zfile.open(fname).read()))
# OK, there is an Opus there
elif (extension ==".json" or extension == ".mei" or extension == ".xml"
or extension == '.mxl' or extension=='.krn' or extension=='.mid'):
opus_files[base] = {"mei": "",
"musicxml": "",
"compressed_xml": "",
"json": "",
"kern": "",
"source_files": ""}
else:
print ("Ignoring file %s%s" % (base, extension))
# Sanity
if not found_corpus_data:
logger.warning ("Missing corpus JSON file. Producing a skeleton with ref %s" % zip_name)
corpus_dict = {"ref": zip_name,
"title": zip_name,
"short_title": zip_name,
"description": zip_name,
"is_public": True,
"short_description": zip_name,
"copyright": "",
"supervisors": ""
}
if not found_cover:
logger.warning ("Missing cover for corpus " + corpus_dict['ref'])
# Get the corpus, or create it
logger.info ("Importing corpus %s in %s" % (corpus_dict['ref'], parent_corpus.ref) )
print ("Importing corpus %s in %s" % (corpus_dict['ref'], parent_corpus.ref) )
full_corpus_ref = Corpus.make_ref_from_local_and_parent(corpus_dict['ref'], parent_corpus.ref)
try:
corpus = Corpus.objects.get(ref=full_corpus_ref)
except Corpus.DoesNotExist as e:
# Create this corpus
corpus = Corpus (parent=parent_corpus, ref=full_corpus_ref)
# Load / replace content from the dictionary
corpus.load_from_dict(corpus_dict)
corpus.save()
# Take the cover image
if found_cover :
corpus.cover.save("cover.jpg", ContentFile(cover_data))
else:
# Good to know: sets the file field to blank string
corpus.cover = None
# Recursive import of the children
for base in children.keys():
print ("*** Importing sub corpus " + base)
corpus.import_from_zip(children[base], corpus, base)
# Second scan: we note the files present for each opus
for fname in zfile.namelist():
(opus_ref, extension) = decompose_zip_name (fname)
if opus_ref in opus_files:
if extension == '.mxl':
opus_files[opus_ref]["compressed_xml"] = fname
elif (extension == '.xml' or extension == '.musicxml'):
opus_files[opus_ref]["musicxml"] = fname
elif extension == '.mei':
opus_files[opus_ref]["mei"] = fname
elif extension == '.json':
opus_files[opus_ref]["json"] = fname
elif extension == '.mid':
opus_files[opus_ref]["midi"] = fname
elif extension == '.krn':
opus_files[opus_ref]["kern"] = fname
elif extension == ".szip":
# If a zip of source files
opus_files[opus_ref]["source_files"] = fname
# OK, now in opus_files, we know whether we have the MusicXML, MEI or any other
list_imported = []
for opus_ref, opus_files_desc in opus_files.items():
full_opus_ref = corpus.ref + settings.NEUMA_ID_SEPARATOR + opus_ref
print ("Import opus with ref " + opus_ref + " in corpus " + corpus_dict['ref'])
try:
opus = Opus.objects.get(ref=full_opus_ref)
except Opus.DoesNotExist as e:
# Create the Opus
opus = Opus(corpus=corpus, ref=full_opus_ref, title=opus_ref)
list_imported.append(opus)
opus.mei = None
# If a json exists, then it should contain the relevant metadata
if opus_files_desc["json"] != "":
logger.info ("Found JSON metadata file %s" % opus_files_desc["json"])
json_file = zfile.open(opus_files_desc["json"])
json_doc = json_file.read()
opus.load_from_dict (corpus, json.loads(json_doc.decode('utf-8')))
# Check whether a source file exists for each source
for source in opus.opussource_set.all():
if opus_files_desc["source_files"] != "":
# Yep we found one
source_zip_content = io.BytesIO(zfile.read(opus_files_desc["source_files"]))
source_zip = zipfile.ZipFile(source_zip_content)
# Check in the zip file for the file that corresponds to the source
for fname in source_zip.namelist():
base, extension = decompose_zip_name (fname)
if base == source.ref:
# The file contains the source itself
sfile_content = source_zip.read(fname)
print (f"Saving source file {fname}")
source.source_file.save(fname, ContentFile(sfile_content))
# In that case the URL is irrelevant
source.url=""
source.save()
if base == source.ref + "_mnf":
# The file contains the source manifest
print ("Import manifest")
manifest_content = source_zip.read(fname)
source.manifest.save(fname, ContentFile(manifest_content))
source.save()
# OK, we loaded metada : save
opus.mei = None
opus.save()
if opus_files_desc["compressed_xml"] != "":
logger.info ("Found compressed MusicXML content")
# Compressed XML
container = io.BytesIO(zfile.read(opus_files_desc["compressed_xml"]))
xmlzip = zipfile.ZipFile(container)
# Keep the file in the container with the same basename
for name2 in xmlzip.namelist():
basename2 = os.path.basename(name2)
ref2 = os.path.splitext(basename2)[0]
if opus_files_desc["opus_ref"] == ref2:
xml_content = xmlzip.read(name2)
opus.musicxml.save("score.xml", ContentFile(xml_content))
if opus_files_desc["musicxml"] != "":
logger.info ("Found MusicXML content")
xml_content = zfile.read(opus_files_desc["musicxml"])
opus.musicxml.save("score.xml", ContentFile(xml_content))
if opus_files_desc["kern"] != "":
logger.info ("Found KERN content")
kern_content = zfile.read(opus_files_desc["kern"])
# We need to write in a tmp file, probably
tmp_file = "/tmp/tmp_kern.txt"
f = open(tmp_file, "w")
lines = kern_content.splitlines()
for line in lines:
if not (line.startswith(b"!!!ARE: ")
or line.startswith(b"!!!AGN: ")
or line.startswith(b"!!!OTL: ")
or line.startswith(b"!!!YOR: ")
or line.startswith(b"!!!SCA: ")
or line.startswith(b"!!!OCY: ")
or line.startswith(b"!! ")
):
f.write (line.decode() + os.linesep)
f.close()
try:
tk = verovio.toolkit()
tk.loadFile(tmp_file)
mei_content = tk.getMEI()
opus.mei.save("mei.xml", ContentFile(mei_content))
doc = minidom.parseString(mei_content)
titles = doc.getElementsByTagName("title")
for title in titles:
for txtnode in title.childNodes:
opus.title = str(txtnode.data)
break
break
except Exception as e:
print ("Exception pendant le traitement d'un fichier Kern: " + str(e))
return
if bool(opus.mei) == False:
if opus_files_desc["mei"] != "":
logger.info ("Load MEI content")
# Add the MEI file
try:
mei_file = zfile.open(opus_files_desc["mei"])
mei_raw = mei_file.read()
encoding = "utf-8"
try:
logger.info("Attempt to read in UTF 8")
mei_raw.decode(encoding)
except Exception as ex:
logger.info("Read in UTF 16")
encoding = "utf-16"
mei_raw.decode(encoding)
logger.info("Correct encoding: " + encoding)
mei_content = mei_raw.decode(encoding)
logger.info ("Save the MEI file.")
opus.mei.save("mei.xml", ContentFile(mei_content))
except Exception as ex:
logger.error ("Error processing MEI " + str(ex))
else:
# Produce the MEI from the MusicXML
if opus_files_desc["musicxml"] != "":
logger.info ("Produce the MEI from MusicXML")
try:
print ("Produce the MEI from MusicXML")
tk = verovio.toolkit()
tk.loadFile(opus.musicxml.path)
mei_content = tk.getMEI()
opus.mei.save("mei.xml", ContentFile(mei_content))
except Exception as e:
print ("Exception : " + str(e))
# Workflow.produce_opus_mei(opus)
else:
logger.warning ("No MEI, no MusicXML: opus %s is incomplete" % opus.ref)
# Now try to obtain metadata
if opus_files_desc["compressed_xml"]!="" or opus_files_desc["musicxml"]!="":
# Get MusicXML metadata
doc = minidom.parseString(xml_content)
titles = doc.getElementsByTagName("movement-title")
for title in titles:
for txtnode in title.childNodes:
opus.title = str(txtnode.data)
break
break
elif opus_files_desc["compressed_xml"]!="" or opus_files_desc["musicxml"]!="":
# Get MusicXML metadata
doc = minidom.parseString(xml_content)
titles = doc.getElementsByTagName("movement-title")
for title in titles:
for txtnode in title.childNodes:
opus.title = str(txtnode.data)
break
break
try:
if opus.title == opus_ref:
#print ("Title = " + opus.title + " Try to obtain metadata")
logger.info ("Try to find metadata in the XML file with music21")
score = opus.get_score()
if score.get_title() != None and len(score.get_title()) > 0:
opus.title = score.get_title()
if score.get_composer() != None and len(score.get_composer()) > 0:
opus.composer = score.get_composer()
opus.save()
except Exception as ex:
print ("Error importing opus " + str(ex))
logger.error ("Error importing opus " + str(ex))
print ("Opus ref " + opus_ref + " imported in corpus " + corpus.ref+ "\n")
return list_imported
####################################################
class Opus(models.Model):
corpus = models.ForeignKey(Corpus,on_delete=models.CASCADE)
title = models.CharField(max_length=255)
lyricist = models.CharField(max_length=255,null=True, blank=True)
composer = models.CharField(max_length=255,null=True, blank=True)
# Linked data to the composer entity
composer_ld = models.ForeignKey(Person, null=True,blank=True,on_delete=models.PROTECT)
ref = models.CharField(max_length=255,unique=True)
external_link = models.CharField(max_length=255,null=True, blank=True)
# .Files names
FILE_NAMES = {"score.xml": "musicxml",
"mei.xml": "mei",
"summary.json": "summary"}
def statsDic(opus):
""" produces a dic with features"""
stats = StatsDesc(opus)
dico = stats.computeStats()
return dico
def upload_path(self, filename):
'''Set the path where opus-related files must be stored'''
return 'corpora/%s/%s' % (self.ref.replace(settings.NEUMA_ID_SEPARATOR, "/"), filename)
def __init__(self, *args, **kwargs):
super(Opus, self).__init__(*args, **kwargs)
# Non persistent fields
# self.stats = self.statsDic()
self.histogram_cache={}
# List of files associated to an Opus
musicxml = models.FileField(upload_to=upload_path,null=True,blank=True,storage=OverwriteStorage())
mei = models.FileField(upload_to=upload_path,null=True,blank=True,storage=OverwriteStorage(), max_length=255)
summary = models.FileField(upload_to=upload_path,null=True,blank=True,storage=OverwriteStorage())
class Meta:
db_table = "Opus"
def get_url(self):
"""
Get the URL to the Web opus page, taken from urls.py
"""
return reverse('home:opus', args=[self.ref])
def local_ref(self):
"""
The ref of the Opus inside its Corpus
"""
last_pos = self.ref.rfind(settings.NEUMA_ID_SEPARATOR)
return self.ref[last_pos+1:]
def add_meta (self, mkey, mvalue):
"""Add a (key, value) pair as an ad-hoc attribute"""
# The key must belongs to the list of pre-deefined accepted values
if mkey not in OpusMeta.META_KEYS:
raise Exception(f"Sorry, the key {mkey} does not belong to the accepted meta keys")
# Search if exists
try:
meta_pair = OpusMeta.objects.get(opus=self,meta_key=mkey)
except OpusMeta.DoesNotExist as e:
meta_pair = OpusMeta(opus=self, meta_key=mkey, meta_value=mvalue)
meta_pair.save()
def get_metas (self):
"""Return the list of key-value pairs"""
metas = []
for m in OpusMeta.objects.filter(opus=self):
m.displayed_label = OpusMeta.META_KEYS[m.meta_key]["displayed_label"]
metas.append (m)
return metas
def add_source (self,source_dict):
"""Add a source to the opus"""
# Search if exists
try:
source = OpusSource.objects.get(opus=self,ref=source_dict["ref"])
source.description = source_dict["description"]
source.url=source_dict["url"]
except OpusSource.DoesNotExist as e:
stype = SourceType.objects.get(code=source_dict["source_type"])
source = OpusSource(opus=self,
ref=source_dict["ref"],
description=source_dict["description"],
source_type=stype,
url=source_dict["url"])
source.save()
return source
def copy_mei_as_source(self):
# Save the MEI file as a reference source
if self.mei:
source_dict = {"ref": "ref_mei",
"source_type": SourceType.STYPE_MEI,
"description": "Référence MEI",
"url": ""}
source = self.add_source (source_dict)
source.source_file.save("ref_mei.xml", File(self.mei))
def load_from_dict(self, corpus, dict_opus, files={}, opus_url=""):
"""Load content from a dictionary.
The dictionary is commonly a decrypted JSON object, coming
either from the Neuma REST API or from ElasticSearch
"""
# The id can be named id or _id
if ("ref" in dict_opus.keys()):
self.ref = Corpus.make_ref_from_local_and_parent(dict_opus["ref"].strip(), corpus.ref)
elif ("_ref" in dict_opus.keys()):
self.ref = Corpus.make_ref_from_local_and_parent(dict_opus["_ref"].strip(), corpus.ref)
else:
raise KeyError('Missing ref field in an Opus dictionary')
self.corpus = corpus
self.title = dict_opus["title"]
if ("lyricist" in dict_opus.keys()):
if (dict_opus["lyricist"] != None):
self.lyricist = dict_opus["lyricist"]
if ("composer" in dict_opus.keys()):
if (dict_opus["composer"] != None):
self.composer = dict_opus["composer"]
# Saving before adding related objects
self.save()
if ("features" in dict_opus.keys()):
if (dict_opus["features"] != None):
for m in dict_opus["features"]:
self.add_meta (m["feature_key"], m["feature_value"])
if ("sources" in dict_opus.keys()
and type(dict_opus["sources"]) in {list} ):
if (dict_opus["sources"] != None):
for source in dict_opus["sources"]:
self.add_source (source)
# Cas ou la source contient une réf Gallica
if ("sources" in dict_opus.keys()
and isinstance(dict_opus["sources"], str)):
source_dict = {"ref": "iiif",
"source_type": "JPEG",
"description": "Lien Gallica",
"url": dict_opus["sources"]}
self.add_source (source_dict)
print ("Source Gallica: " + dict_opus["sources"] )
# Get the Opus files
for fname, desc in files.items():
if (fname in Opus.FILE_NAMES):
print ("Found " + fname + " at URL " + opus_url + fname)
# Take the score
file_temp = NamedTemporaryFile()
f = urlopen(opus_url + fname)
content = f.read()
file_temp.write(content)
file_temp.flush()
getattr(self, Opus.FILE_NAMES[fname]).save(fname, File(file_temp))
# Get the sequence file if any
if opus_url != "":
print ("Try to import sequence file")
try:
f = urlopen(opus_url + "sequence.json")
content = f.read().decode("utf-8")
# test that we got it
jseq = json.loads(content)
if "status" in jseq.keys():
print ("Something wrong. Received message: " + jseq["message"])
else:
self.music_summary = content
except:
print("Something wrong when getting " + opus_url + "sequence.json")
return
def get_score(self):
"""Get a score object from an XML document"""
score = Score()
# Try to obtain the MEI document, which contains IDs
if self.mei:
print ("Load from MEI")
score.load_from_xml(self.mei.path, "mei")
return score
elif self.musicxml:
print ("Load from MusicXML")
score.load_from_xml(self.musicxml.path, "musicxml")
return score
else:
raise LookupError ("Opus " + self.ref + " doesn't have any XML file attached")
def freeze(self,filepath="./"):
# http://web.mit.edu/music21/doc/moduleReference/moduleConverter.html#music21.converter.freeze