Skip to content

Commit

Permalink
Dev pascal (#14)
Browse files Browse the repository at this point in the history
* change to all extref attributes

* fix tests get extrefs

* add get_inputs_goose_extrefs to LD
add get_children_LDs to IED

* Adding test for getters functions

* Ajout de get_IP_adr

* Ajout de get_IP_adr, correction sauts de lignes

* Corrections, remise à zéro de self.tIED

* Ajout de commentaires,

* get LD and get LN

* fix get_all_ieds

* intégration du get_IP_Adr et fixes

Co-authored-by: Pascal Senizergues <pascal.senizergues@rte-france.com>
Co-authored-by: gilles.guillet <gilles.guillet@rte-france.com>
  • Loading branch information
3 people authored Jun 23, 2021
1 parent f0f4c30 commit b645f84
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 137 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ ENV/
env.bak/
venv.bak/
my_venv/
venv395-32/

# IDE - Visual Code
.vscode/
Expand Down
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
setuptools
bumpversion
flake8
twine
wheel
pycodestyle
pytest
pytest-cov
pytest-dependency
-r requirements.txt
-r requirements.txt
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
lxml
lxml~=4.6.3
pytest~=6.2.3
setuptools~=41.2.0
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
"""

# Always prefer setuptools over distutils
import setuptools
from os import path
from setuptools import setup, find_packages
from setuptools import setup

HERE = path.abspath(path.dirname(__file__))

Expand All @@ -26,7 +25,7 @@
# Fields marked as "Optional" may be commented out.
setup(
name='scl_loader', # Required
version='1.8.0', # Required
version='1.8.1', # Required
description='Outil de manipulation de SCD', # Required
long_description=LONG_DESCRIPTION, # Optional
long_description_content_type='text/markdown', # Optional (see note above)
Expand Down
100 changes: 66 additions & 34 deletions src/scl_loader/scl_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import weakref
from copy import deepcopy
from lxml import etree
from xml.etree import ElementTree as XmlET

HERE = os.path.abspath(os.path.dirname(__file__))
XSD_PATH = os.path.join(HERE, 'resources', 'SCL_Schema', 'SCL.xsd')
Expand All @@ -21,6 +20,7 @@
REG_ARRAY_TAGS = r'(?:\{.+\})?(?:FCDA|ClientLN|IEDName|FIP|BAP|ExtRef|Terminal|P)' # |Server)'
REG_DT_NODE = r'(?:.*\})?((?:[BS]?D[AO])|(?:LN0?))'
REF_SCL_NODES = r'(?:\{.+\})?(?:Header|Substation|Private|Communication)'
DEFAULT_AP = 'PROCESS_AP'
SEP1 = '$' # Standard MMS separator.
SEP2 = '/' # MMS Separator for system testing

Expand Down Expand Up @@ -218,7 +218,7 @@ def get_Data_Type_Definitions(self) -> dict:
A dictionnary of the DataTypes definitions grouped by tag
"""
tags = {'LNodeType':[], 'DOType':[], 'DAType':[], 'EnumType':[]}
tags = {'LNodeType': [], 'DOType': [], 'DAType': [], 'EnumType': []}

for tag_key in tags.keys():
item_xpath = 'child::iec61850:{}'.format(tag_key)
Expand Down Expand Up @@ -816,10 +816,17 @@ def get_inputs_goose_extrefs(self) -> list:

return extrefs

def get_LN_by_name(self, ln_Name: str) -> LN:
if hasattr(self, ln_Name):
return getattr(self, ln_Name)


class IED(SCDNode):
"""
Class to manage an IED
"""
DEFAULT_AP = 'PROCESS_AP'

def __init__(self, datatypes: DataTypeTemplates, node_elem: etree.Element = None, fullattrs: bool = False, **kwargs: dict):
"""
Constructor
Expand All @@ -843,6 +850,7 @@ def __init__(self, datatypes: DataTypeTemplates, node_elem: etree.Element = None
"""
self._all_attributes = []
self._all_attributes.extend(NODES_ATTRS['IED'])
self._LDs = {}
super().__init__(datatypes, node_elem, fullattrs, **kwargs)

def get_inputs_goose_extrefs(self) -> list:
Expand All @@ -865,8 +873,30 @@ def get_inputs_goose_extrefs(self) -> list:

return extrefs

def get_children_LDs(self) -> list:
return self.PROCESS_AP.Server.get_children('LDevice')
def get_children_LDs(self, ap_name: str = DEFAULT_AP) -> list:
ap = self._get_ap_by_name(ap_name)
if ap:
return ap.Server.get_children('LDevice')

def get_LD_by_inst(self, ld_inst: str, ap_name: str = DEFAULT_AP) -> LD:
if hasattr(self._LDs, ld_inst):
return self._LDs[ld_inst]

ap = self._get_ap_by_name(ap_name)
if ap and hasattr(ap.Server, ld_inst):
result = getattr(ap.Server, ld_inst)
self._LDs[ld_inst] = result
return self._LDs[ld_inst]

def get_LN_by_name(self, ld_inst: str, ln_Name: str, ap_name: str = DEFAULT_AP) -> LN:
ld = self.get_LD_by_inst(ld_inst, ap_name)
ln = ld.get_LN_by_name(ln_Name)
return ln

def _get_ap_by_name(self, ap_name):
if hasattr(self, ap_name):
return getattr(self, ap_name)


class SCD_handler():
"""
Expand All @@ -890,6 +920,7 @@ def __init__(self, scd_path: str, fullattrs: bool = False):
schema_doc = etree.parse(XSD_PATH)
self._schema = etree.XMLSchema(schema_doc)
self._fullattrs = fullattrs
self._IEDs = {}

is_valid, error = self._check_scd_file()
if not is_valid:
Expand Down Expand Up @@ -934,7 +965,7 @@ def extract_sub_SCD(self, ied_name_list: list) -> str:
for child in scl_children:
newroot.append(deepcopy(child))

#Clean not needed ConnectedAP
# Clean not needed ConnectedAP
connected_aps = newroot.xpath('child::iec61850:Communication/*/iec61850:ConnectedAP', namespaces=NS)
connected_aps = [itm for itm in connected_aps if itm.get('iedName') not in ied_name_list]
for c_ap in connected_aps:
Expand All @@ -960,15 +991,14 @@ def get_all_IEDs(self) -> list:
`[IED]`
An array of the loaded IED objects
"""
ieds = []
if self._scl_root is not None:
ieds = self._scl_root.xpath('child::iec61850:IED', namespaces=NS)
else:
ieds = self._iter_get_all_IEDs()

ied_names = self.get_IED_names_list()

tIED = []
for ied in ieds:
tIED.append(IED(self.datatypes, ied, self._fullattrs))
for ied_name in ied_names:
if ied_name not in self._IEDs.keys():
self._IEDs[ied_name] = self.get_IED_by_name(ied_name)
tIED.append(self._IEDs[ied_name])

return tIED

Expand All @@ -986,9 +1016,13 @@ def get_IED_by_name(self, ied_name: str) -> IED:
`IED`
The loaded IED object
"""
if hasattr(self._IEDs, ied_name):
return self._IEDs[ied_name]

ied_elems = self._get_IED_elems_by_names([ied_name])
if len(ied_elems) > 0:
return IED(self.datatypes, ied_elems[0], self._fullattrs)
self._IEDs[ied_name] = IED(self.datatypes, ied_elems[0], self._fullattrs)
return self._IEDs[ied_name]

def get_IED_names_list(self) -> list:
"""
Expand All @@ -1009,6 +1043,20 @@ def get_IED_names_list(self) -> list:

return result

def get_IP_Adr(self, ied_name: str):
for iComm in self.Communication.get_children('SubNetwork'): # browse all iED SubNetWork
if iComm.type != "8-MMS": # IP can be found only in MMS access point '.
continue
for iCnxAP in iComm.get_children('ConnectedAP'): # browse all Access Point(s) of the iED
if iCnxAP.iedName == ied_name: # and iCnxAP.apName == apNAme:
for iAdr in iCnxAP.get_children('Address'):

for iP in iAdr.P:
if iP.type == "IP": # Look for IP address
if iP.Val is not None:
return iP.Val, iCnxAP.apName
return None, None # Not found

def _check_scd_file(self) -> tuple:
"""
/!\\ PRIVATE : do not use /!\\
Expand All @@ -1025,15 +1073,15 @@ def _check_scd_file(self) -> tuple:
The boolean is True if the xml is valid
If the xml is not valid the str is the error message
"""
file_size = os.path.getsize(self._scd_path) // (1024*1024)
file_size = os.path.getsize(self._scd_path) // (1024 * 1024)
if file_size < MAX_VALIDATION_SIZE:
tree = etree.parse(self._scd_path)
is_valid = self._schema.validate(tree)
if not FORCE_ITER_MODE:
self._scl_root = tree.getroot()
return (is_valid, self._schema.error_log.last_error)
else:
LOGGER.warn('XSD validation skipped due to file size over {} Mo' % MAX_VALIDATION_SIZE)
LOGGER.warning('XSD validation skipped due to file size over {} Mo' % MAX_VALIDATION_SIZE)
return True

def _get_IED_elems_by_names(self, ied_names_list: list) -> list:
Expand Down Expand Up @@ -1089,7 +1137,7 @@ def _get_SCL_elems(self) -> list:

return result

def _get_all_elem_by_tag(self, tag:str) -> list:
def _get_all_elem_by_tag(self, tag: str) -> list:
"""
/!\\ PRIVATE : do not use /!\\
Expand All @@ -1113,7 +1161,7 @@ def _get_all_elem_by_tag(self, tag:str) -> list:

return elem_list

def _iter_get_all_elem_by_tag(self, tag:str) -> list:
def _iter_get_all_elem_by_tag(self, tag: str) -> list:
"""
/!\\ PRIVATE : do not use /!\\
Expand Down Expand Up @@ -1167,7 +1215,7 @@ def _iter_get_SCL_elems(self, tags: list) -> list:
result.append(elem)
elif elem.tag.split('}')[-1] == 'Private' \
and (elem.xpath('following-sibling::iec61850:Header', namespaces=NS)
or elem.xpath('preceding-sibling::iec61850:Header', namespaces=NS)):
or elem.xpath('preceding-sibling::iec61850:Header', namespaces=NS)):
result.append(elem)
else:
elem.clear()
Expand Down Expand Up @@ -1217,19 +1265,3 @@ def _iter_get_IED_names_list(self) -> list:
ied.clear()

return result

def _iter_get_all_IEDs(self) -> list:
"""
Load all the IEDs from the SCD/SCL file
Returns
-------
`[IED]`
An array of the loaded IED objects
"""
context = etree.iterparse(self._scd_path, events=("end",), tag='{}IED'.format(SCL_NAMESPACE))
tIED = []
for _, ied in context:
tIED.append(ied)

return tIED
Loading

0 comments on commit b645f84

Please sign in to comment.