diff --git a/.gitignore b/.gitignore index 15c803a..172e2f6 100644 --- a/.gitignore +++ b/.gitignore @@ -83,6 +83,7 @@ ENV/ env.bak/ venv.bak/ my_venv/ +venv395-32/ # IDE - Visual Code .vscode/ diff --git a/requirements-dev.txt b/requirements-dev.txt index 2b95cdc..c7c3549 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,9 +1,10 @@ setuptools bumpversion +flake8 twine wheel pycodestyle pytest pytest-cov pytest-dependency --r requirements.txt \ No newline at end of file +-r requirements.txt diff --git a/requirements.txt b/requirements.txt index 86c871e..b8cda2e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -lxml \ No newline at end of file +lxml~=4.6.3 +pytest~=6.2.3 +setuptools~=41.2.0 \ No newline at end of file diff --git a/setup.py b/setup.py index d3b8c75..26e00a1 100644 --- a/setup.py +++ b/setup.py @@ -9,9 +9,8 @@ """ # Always prefer setuptools over distutils -import setuptools from os import path -from setuptools import setup, find_packages +from setuptools import setup HERE = path.abspath(path.dirname(__file__)) @@ -26,7 +25,7 @@ # Fields marked as "Optional" may be commented out. setup( name='scl_loader', # Required - version='1.8.0', # Required + version='1.8.1', # Required description='Outil de manipulation de SCD', # Required long_description=LONG_DESCRIPTION, # Optional long_description_content_type='text/markdown', # Optional (see note above) diff --git a/src/scl_loader/scl_loader.py b/src/scl_loader/scl_loader.py index 910b2bf..201c6b7 100644 --- a/src/scl_loader/scl_loader.py +++ b/src/scl_loader/scl_loader.py @@ -7,7 +7,6 @@ import weakref from copy import deepcopy from lxml import etree -from xml.etree import ElementTree as XmlET HERE = os.path.abspath(os.path.dirname(__file__)) XSD_PATH = os.path.join(HERE, 'resources', 'SCL_Schema', 'SCL.xsd') @@ -21,6 +20,7 @@ REG_ARRAY_TAGS = r'(?:\{.+\})?(?:FCDA|ClientLN|IEDName|FIP|BAP|ExtRef|Terminal|P)' # |Server)' REG_DT_NODE = r'(?:.*\})?((?:[BS]?D[AO])|(?:LN0?))' REF_SCL_NODES = r'(?:\{.+\})?(?:Header|Substation|Private|Communication)' +DEFAULT_AP = 'PROCESS_AP' SEP1 = '$' # Standard MMS separator. SEP2 = '/' # MMS Separator for system testing @@ -218,7 +218,7 @@ def get_Data_Type_Definitions(self) -> dict: A dictionnary of the DataTypes definitions grouped by tag """ - tags = {'LNodeType':[], 'DOType':[], 'DAType':[], 'EnumType':[]} + tags = {'LNodeType': [], 'DOType': [], 'DAType': [], 'EnumType': []} for tag_key in tags.keys(): item_xpath = 'child::iec61850:{}'.format(tag_key) @@ -816,10 +816,17 @@ def get_inputs_goose_extrefs(self) -> list: return extrefs + def get_LN_by_name(self, ln_Name: str) -> LN: + if hasattr(self, ln_Name): + return getattr(self, ln_Name) + + class IED(SCDNode): """ Class to manage an IED """ + DEFAULT_AP = 'PROCESS_AP' + def __init__(self, datatypes: DataTypeTemplates, node_elem: etree.Element = None, fullattrs: bool = False, **kwargs: dict): """ Constructor @@ -843,6 +850,7 @@ def __init__(self, datatypes: DataTypeTemplates, node_elem: etree.Element = None """ self._all_attributes = [] self._all_attributes.extend(NODES_ATTRS['IED']) + self._LDs = {} super().__init__(datatypes, node_elem, fullattrs, **kwargs) def get_inputs_goose_extrefs(self) -> list: @@ -865,8 +873,30 @@ def get_inputs_goose_extrefs(self) -> list: return extrefs - def get_children_LDs(self) -> list: - return self.PROCESS_AP.Server.get_children('LDevice') + def get_children_LDs(self, ap_name: str = DEFAULT_AP) -> list: + ap = self._get_ap_by_name(ap_name) + if ap: + return ap.Server.get_children('LDevice') + + def get_LD_by_inst(self, ld_inst: str, ap_name: str = DEFAULT_AP) -> LD: + if hasattr(self._LDs, ld_inst): + return self._LDs[ld_inst] + + ap = self._get_ap_by_name(ap_name) + if ap and hasattr(ap.Server, ld_inst): + result = getattr(ap.Server, ld_inst) + self._LDs[ld_inst] = result + return self._LDs[ld_inst] + + def get_LN_by_name(self, ld_inst: str, ln_Name: str, ap_name: str = DEFAULT_AP) -> LN: + ld = self.get_LD_by_inst(ld_inst, ap_name) + ln = ld.get_LN_by_name(ln_Name) + return ln + + def _get_ap_by_name(self, ap_name): + if hasattr(self, ap_name): + return getattr(self, ap_name) + class SCD_handler(): """ @@ -890,6 +920,7 @@ def __init__(self, scd_path: str, fullattrs: bool = False): schema_doc = etree.parse(XSD_PATH) self._schema = etree.XMLSchema(schema_doc) self._fullattrs = fullattrs + self._IEDs = {} is_valid, error = self._check_scd_file() if not is_valid: @@ -934,7 +965,7 @@ def extract_sub_SCD(self, ied_name_list: list) -> str: for child in scl_children: newroot.append(deepcopy(child)) - #Clean not needed ConnectedAP + # Clean not needed ConnectedAP connected_aps = newroot.xpath('child::iec61850:Communication/*/iec61850:ConnectedAP', namespaces=NS) connected_aps = [itm for itm in connected_aps if itm.get('iedName') not in ied_name_list] for c_ap in connected_aps: @@ -960,15 +991,14 @@ def get_all_IEDs(self) -> list: `[IED]` An array of the loaded IED objects """ - ieds = [] - if self._scl_root is not None: - ieds = self._scl_root.xpath('child::iec61850:IED', namespaces=NS) - else: - ieds = self._iter_get_all_IEDs() + + ied_names = self.get_IED_names_list() tIED = [] - for ied in ieds: - tIED.append(IED(self.datatypes, ied, self._fullattrs)) + for ied_name in ied_names: + if ied_name not in self._IEDs.keys(): + self._IEDs[ied_name] = self.get_IED_by_name(ied_name) + tIED.append(self._IEDs[ied_name]) return tIED @@ -986,9 +1016,13 @@ def get_IED_by_name(self, ied_name: str) -> IED: `IED` The loaded IED object """ + if hasattr(self._IEDs, ied_name): + return self._IEDs[ied_name] + ied_elems = self._get_IED_elems_by_names([ied_name]) if len(ied_elems) > 0: - return IED(self.datatypes, ied_elems[0], self._fullattrs) + self._IEDs[ied_name] = IED(self.datatypes, ied_elems[0], self._fullattrs) + return self._IEDs[ied_name] def get_IED_names_list(self) -> list: """ @@ -1009,6 +1043,20 @@ def get_IED_names_list(self) -> list: return result + def get_IP_Adr(self, ied_name: str): + for iComm in self.Communication.get_children('SubNetwork'): # browse all iED SubNetWork + if iComm.type != "8-MMS": # IP can be found only in MMS access point '. + continue + for iCnxAP in iComm.get_children('ConnectedAP'): # browse all Access Point(s) of the iED + if iCnxAP.iedName == ied_name: # and iCnxAP.apName == apNAme: + for iAdr in iCnxAP.get_children('Address'): + + for iP in iAdr.P: + if iP.type == "IP": # Look for IP address + if iP.Val is not None: + return iP.Val, iCnxAP.apName + return None, None # Not found + def _check_scd_file(self) -> tuple: """ /!\\ PRIVATE : do not use /!\\ @@ -1025,7 +1073,7 @@ def _check_scd_file(self) -> tuple: The boolean is True if the xml is valid If the xml is not valid the str is the error message """ - file_size = os.path.getsize(self._scd_path) // (1024*1024) + file_size = os.path.getsize(self._scd_path) // (1024 * 1024) if file_size < MAX_VALIDATION_SIZE: tree = etree.parse(self._scd_path) is_valid = self._schema.validate(tree) @@ -1033,7 +1081,7 @@ def _check_scd_file(self) -> tuple: self._scl_root = tree.getroot() return (is_valid, self._schema.error_log.last_error) else: - LOGGER.warn('XSD validation skipped due to file size over {} Mo' % MAX_VALIDATION_SIZE) + LOGGER.warning('XSD validation skipped due to file size over {} Mo' % MAX_VALIDATION_SIZE) return True def _get_IED_elems_by_names(self, ied_names_list: list) -> list: @@ -1089,7 +1137,7 @@ def _get_SCL_elems(self) -> list: return result - def _get_all_elem_by_tag(self, tag:str) -> list: + def _get_all_elem_by_tag(self, tag: str) -> list: """ /!\\ PRIVATE : do not use /!\\ @@ -1113,7 +1161,7 @@ def _get_all_elem_by_tag(self, tag:str) -> list: return elem_list - def _iter_get_all_elem_by_tag(self, tag:str) -> list: + def _iter_get_all_elem_by_tag(self, tag: str) -> list: """ /!\\ PRIVATE : do not use /!\\ @@ -1167,7 +1215,7 @@ def _iter_get_SCL_elems(self, tags: list) -> list: result.append(elem) elif elem.tag.split('}')[-1] == 'Private' \ and (elem.xpath('following-sibling::iec61850:Header', namespaces=NS) - or elem.xpath('preceding-sibling::iec61850:Header', namespaces=NS)): + or elem.xpath('preceding-sibling::iec61850:Header', namespaces=NS)): result.append(elem) else: elem.clear() @@ -1217,19 +1265,3 @@ def _iter_get_IED_names_list(self) -> list: ied.clear() return result - - def _iter_get_all_IEDs(self) -> list: - """ - Load all the IEDs from the SCD/SCL file - - Returns - ------- - `[IED]` - An array of the loaded IED objects - """ - context = etree.iterparse(self._scd_path, events=("end",), tag='{}IED'.format(SCL_NAMESPACE)) - tIED = [] - for _, ied in context: - tIED.append(ied) - - return tIED diff --git a/tests/test_scd_manager.py b/tests/test_scd_manager.py index 6a5173d..46a0041 100644 --- a/tests/test_scd_manager.py +++ b/tests/test_scd_manager.py @@ -8,7 +8,9 @@ import logging import pytest import hashlib -import cProfile, pstats, io +import cProfile +import pstats +import io from pstats import SortKey from lxml import etree @@ -32,6 +34,7 @@ SCD_OPEN_IOP_PATH = os.path.join(HERE, 'resources', SCD_OPEN_IOP_NAME) SCD_WRONG_PATH = os.path.join(HERE, 'resources', SCD_WRONG_NAME) + def hashfile(file): # A arbitrary (but fixed) buffer @@ -62,7 +65,6 @@ def hashfile(file): # that data) sha256.update(data) - # sha256.hexdigest() hashes all the input # data passed to the sha256() via sha256.update() # Acts as a finalize method, after which @@ -71,13 +73,15 @@ def hashfile(file): # in hexadecimal format return sha256.hexdigest() -def _get_node_list_by_tag(scd, tag:str) -> list: + +def _get_node_list_by_tag(scd, tag: str) -> list: result = [] context = etree.iterparse(scd._scd_path, events=("end",), tag=r'{http://www.iec.ch/61850/2003/SCL}' + tag) for _, ied in context: result.append(ied) return result + def test_safe_convert_value(): """ I should be able to convert a value in @@ -85,23 +89,25 @@ def test_safe_convert_value(): Typed formats supported : bool, int, float """ assert scdl._safe_convert_value('abc123') == 'abc123' - assert scdl._safe_convert_value('false') == False - assert scdl._safe_convert_value('False') == False - assert scdl._safe_convert_value('true') == True - assert scdl._safe_convert_value('TRUE') == True - assert scdl._safe_convert_value('123') == 123 - assert scdl._safe_convert_value('-123') == -123 - assert scdl._safe_convert_value('.123') == '.123' - assert scdl._safe_convert_value('01.23') == 1.23 - assert scdl._safe_convert_value('-1.23') == -1.23 - assert scdl._safe_convert_value('01b23') == '01b23' - assert scdl._safe_convert_value('#{~[~]{@^|`@`~\\/') == '#{~[~]{@^|`@`~\\/' + assert scdl._safe_convert_value('false') is False + assert scdl._safe_convert_value('False') is False + assert scdl._safe_convert_value('true') is True + assert scdl._safe_convert_value('TRUE') is True + assert scdl._safe_convert_value('123') == 123 + assert scdl._safe_convert_value('-123') == -123 + assert scdl._safe_convert_value('.123') == '.123' + assert scdl._safe_convert_value('01.23') == 1.23 + assert scdl._safe_convert_value('-1.23') == -1.23 + assert scdl._safe_convert_value('01b23') == '01b23' + assert scdl._safe_convert_value('#{~[~]{@^|`@`~\\/') == '#{~[~]{@^|`@`~\\/' + def test_valid_scd(): assert SCD_handler(SCD_OPEN_PATH) with pytest.raises(AttributeError): SCD_handler(SCD_WRONG_PATH) + def test_datatypes_get_by_id(): """ I should be able to load a Datatype by id @@ -112,13 +118,16 @@ def test_datatypes_get_by_id(): do_type = datatypes.get_type_by_id('ENC') assert do_type.get('cdc') == 'ENC' + class TestSCD_OPEN(): def setup_method(self): self.scd = SCD_handler(SCD_OPEN_PATH) + self.tIED = self.scd.get_all_IEDs() def teardown_method(self): del self.scd + del self.tIED def _start_perfo_stats(self): self.pr = cProfile.Profile() @@ -135,56 +144,56 @@ def test_create_scd_object(self): """ I Should be able to create a SCL object with its children (except IEDs and Datatype) """ - assert self.scd.Header.toolID == 'ggu' # pylint: disable=maybe-no-member - assert self.scd.Communication.Net1.LD_All.GSE.Address.P[0].type == 'VLAN-ID' # pylint: disable=maybe-no-member + assert self.scd.Header.toolID == 'ggu' # pylint: disable=maybe-no-member + assert self.scd.Communication.Net1.LD_All.GSE.Address.P[0].type == 'VLAN-ID' # pylint: disable=maybe-no-member def test_create_DA_by_kwargs(self): """ I should be able to create a DA object with kwargs """ - simple_da = {'fc':'ST', 'dchg':'false', 'qchg':'true', 'dupd':'false', 'name':'q', 'bType':'Quality'} - simple2_da = {'fc':'DC', 'dchg':'false', 'qchg':'false', 'dupd':'false', 'name':'d', 'bType':'VisString255', 'valKind':'RO', 'valImport':'false'} - enum_da = {'fc':'CF', 'dchg':'true', 'qchg':'false', 'dupd':'false', 'name':'ctlModel', 'bType':'Enum', 'valKind':'RO', 'type':'CtlModelKind', 'valImport':'false'} + simple_da = {'fc': 'ST', 'dchg': 'false', 'qchg': 'true', 'dupd': 'false', 'name': 'q', 'bType': 'Quality'} + simple2_da = {'fc': 'DC', 'dchg': 'false', 'qchg': 'false', 'dupd': 'false', 'name': 'd', 'bType': 'VisString255', 'valKind': 'RO', 'valImport': 'false'} + enum_da = {'fc': 'CF', 'dchg': 'true', 'qchg': 'false', 'dupd': 'false', 'name': 'ctlModel', 'bType': 'Enum', 'valKind': 'RO', 'type': 'CtlModelKind', 'valImport': 'false'} with pytest.raises(AttributeError): DA(self.scd.datatypes) simple_da_inst = DA(self.scd.datatypes, None, None, **simple_da) - assert getattr(simple_da_inst,'fc') == 'ST' - assert getattr(simple_da_inst,'dchg') == False - assert getattr(simple_da_inst,'qchg') == True - assert getattr(simple_da_inst,'dupd') == False - assert getattr(simple_da_inst,'name') == 'q' - assert getattr(simple_da_inst,'bType') == 'Quality' + assert getattr(simple_da_inst, 'fc') == 'ST' + assert getattr(simple_da_inst, 'dchg') is False + assert getattr(simple_da_inst, 'qchg') is True + assert getattr(simple_da_inst, 'dupd') is False + assert getattr(simple_da_inst, 'name') == 'q' + assert getattr(simple_da_inst, 'bType') == 'Quality' simple2_da_inst = DA(self.scd.datatypes, None, None, **simple2_da) - assert getattr(simple2_da_inst,'fc') == 'DC' - assert getattr(simple2_da_inst,'dchg') == False - assert getattr(simple2_da_inst,'qchg') == False - assert getattr(simple2_da_inst,'dupd') == False - assert getattr(simple2_da_inst,'name') == 'd' - assert getattr(simple2_da_inst,'bType') == 'VisString255' - assert getattr(simple2_da_inst,'valKind') == 'RO' - assert getattr(simple2_da_inst,'valImport') == False + assert getattr(simple2_da_inst, 'fc') == 'DC' + assert getattr(simple2_da_inst, 'dchg') is False + assert getattr(simple2_da_inst, 'qchg') is False + assert getattr(simple2_da_inst, 'dupd') is False + assert getattr(simple2_da_inst, 'name') == 'd' + assert getattr(simple2_da_inst, 'bType') == 'VisString255' + assert getattr(simple2_da_inst, 'valKind') == 'RO' + assert getattr(simple2_da_inst, 'valImport') is False enum_da_inst = DA(self.scd.datatypes, None, None, **enum_da) - assert getattr(enum_da_inst,'fc') == 'CF' - assert getattr(enum_da_inst,'dchg') == True - assert getattr(enum_da_inst,'qchg') == False - assert getattr(enum_da_inst,'dupd') == False - assert getattr(enum_da_inst,'name') == 'ctlModel' - assert getattr(enum_da_inst,'bType') == 'Enum' - assert getattr(enum_da_inst,'valKind') == 'RO' - assert getattr(enum_da_inst,'type') == 'CtlModelKind' - assert getattr(enum_da_inst,'valImport') == False + assert getattr(enum_da_inst, 'fc') == 'CF' + assert getattr(enum_da_inst, 'dchg') is True + assert getattr(enum_da_inst, 'qchg') is False + assert getattr(enum_da_inst, 'dupd') is False + assert getattr(enum_da_inst, 'name') == 'ctlModel' + assert getattr(enum_da_inst, 'bType') == 'Enum' + assert getattr(enum_da_inst, 'valKind') == 'RO' + assert getattr(enum_da_inst, 'type') == 'CtlModelKind' + assert getattr(enum_da_inst, 'valImport') is False def test_create_struct_DA_by_kwargs(self): - struct_da = {'name':'originSrc', 'fc':'ST', 'bType':'Struct', 'type':'Originator'} + struct_da = {'name': 'originSrc', 'fc': 'ST', 'bType': 'Struct', 'type': 'Originator'} struct_da_inst = DA(self.scd.datatypes, None, **struct_da) - assert getattr(struct_da_inst,'fc') == 'ST' - assert getattr(struct_da_inst,'name') == 'originSrc' - assert getattr(struct_da_inst,'bType') == 'Struct' - assert getattr(struct_da_inst,'type') == 'Originator' + assert getattr(struct_da_inst, 'fc') == 'ST' + assert getattr(struct_da_inst, 'name') == 'originSrc' + assert getattr(struct_da_inst, 'bType') == 'Struct' + assert getattr(struct_da_inst, 'type') == 'Originator' assert struct_da_inst.orCat.tag == 'BDA' # pylint: disable=maybe-no-member assert struct_da_inst.orIdent.bType == 'Octet64' # pylint: disable=maybe-no-member @@ -194,55 +203,55 @@ def test_create_DO_by_dtid(self): """ input_node = etree.Element('DO') input_node.attrib['id'] = 'ENC' - simple_do_inst = DO(self.scd.datatypes, input_node, **{'name':'DO_RTE_1'}) - assert getattr(simple_do_inst,'id') == 'ENC' - assert getattr(simple_do_inst,'cdc') == 'ENC' - assert getattr(simple_do_inst,'parent') == None - assert isinstance(getattr(simple_do_inst,'ctlModel') , DA) + simple_do_inst = DO(self.scd.datatypes, input_node, **{'name': 'DO_RTE_1'}) + assert getattr(simple_do_inst, 'id') == 'ENC' + assert getattr(simple_do_inst, 'cdc') == 'ENC' + assert getattr(simple_do_inst, 'parent') is None + assert isinstance(getattr(simple_do_inst, 'ctlModel'), DA) assert simple_do_inst.ctlModel.type == 'CtlModels' # pylint: disable=maybe-no-member - assert isinstance(getattr(simple_do_inst,'blkEna') , DA) - assert isinstance(getattr(simple_do_inst,'ctlNum') , DA) - assert isinstance(getattr(simple_do_inst,'d') , DA) - assert isinstance(getattr(simple_do_inst,'dU') , DA) - assert isinstance(getattr(simple_do_inst,'dataNs') , DA) - assert isinstance(getattr(simple_do_inst,'opOk') , DA) - assert isinstance(getattr(simple_do_inst,'opRcvd') , DA) - assert isinstance(getattr(simple_do_inst,'operTimeout') , DA) - assert isinstance(getattr(simple_do_inst,'origin') , DA) - assert isinstance(getattr(simple_do_inst,'q') , DA) - assert isinstance(getattr(simple_do_inst,'stVal') , DA) - assert isinstance(getattr(simple_do_inst,'subEna') , DA) - assert isinstance(getattr(simple_do_inst,'subID') , DA) - assert isinstance(getattr(simple_do_inst,'subQ') , DA) - assert isinstance(getattr(simple_do_inst,'subVal') , DA) - assert isinstance(getattr(simple_do_inst,'t') , DA) - assert isinstance(getattr(simple_do_inst,'tOpOk') , DA) + assert isinstance(getattr(simple_do_inst, 'blkEna'), DA) + assert isinstance(getattr(simple_do_inst, 'ctlNum'), DA) + assert isinstance(getattr(simple_do_inst, 'd'), DA) + assert isinstance(getattr(simple_do_inst, 'dU'), DA) + assert isinstance(getattr(simple_do_inst, 'dataNs'), DA) + assert isinstance(getattr(simple_do_inst, 'opOk'), DA) + assert isinstance(getattr(simple_do_inst, 'opRcvd'), DA) + assert isinstance(getattr(simple_do_inst, 'operTimeout'), DA) + assert isinstance(getattr(simple_do_inst, 'origin'), DA) + assert isinstance(getattr(simple_do_inst, 'q'), DA) + assert isinstance(getattr(simple_do_inst, 'stVal'), DA) + assert isinstance(getattr(simple_do_inst, 'subEna'), DA) + assert isinstance(getattr(simple_do_inst, 'subID'), DA) + assert isinstance(getattr(simple_do_inst, 'subQ'), DA) + assert isinstance(getattr(simple_do_inst, 'subVal'), DA) + assert isinstance(getattr(simple_do_inst, 't'), DA) + assert isinstance(getattr(simple_do_inst, 'tOpOk'), DA) def test_create_LN_by_dtid(self): """ I should be able to create a LN object """ - kwargs = {'lnClass':'GAPC', 'inst':'0', 'lnType':'GAPC', 'desc':'This is a GAPC'} + kwargs = {'lnClass': 'GAPC', 'inst': '0', 'lnType': 'GAPC', 'desc': 'This is a GAPC'} ln_inst = LN(self.scd.datatypes, None, **kwargs) - assert getattr(ln_inst,'lnType') == 'GAPC' - assert getattr(ln_inst,'lnClass') == 'GAPC' - assert getattr(ln_inst,'inst') == 0 - assert getattr(ln_inst,'name') == 'GAPC0' - assert getattr(ln_inst,'desc') == 'This is a GAPC' - assert isinstance(getattr(ln_inst,'Alm1') , DO) - assert isinstance(getattr(ln_inst,'Auto') , DO) - assert isinstance(getattr(ln_inst,'DPCSO1') , DO) - assert isinstance(getattr(ln_inst,'ISCSO1') , DO) - assert isinstance(getattr(ln_inst,'Ind1') , DO) - assert isinstance(getattr(ln_inst,'Loc') , DO) - assert isinstance(getattr(ln_inst,'LocKey') , DO) - assert isinstance(getattr(ln_inst,'LocSta') , DO) - assert isinstance(getattr(ln_inst,'Op1') , DO) - assert isinstance(getattr(ln_inst,'OpCntRs'), DO) - assert isinstance(getattr(ln_inst,'SPCSO1') , DO) - assert isinstance(getattr(ln_inst,'Str1') , DO) - assert isinstance(getattr(ln_inst,'StrVal1'), DO) - assert isinstance(getattr(ln_inst,'Wrn1') , DO) # pylint: disable=maybe-no-member + assert getattr(ln_inst, 'lnType') == 'GAPC' + assert getattr(ln_inst, 'lnClass') == 'GAPC' + assert getattr(ln_inst, 'inst') == 0 + assert getattr(ln_inst, 'name') == 'GAPC0' + assert getattr(ln_inst, 'desc') == 'This is a GAPC' + assert isinstance(getattr(ln_inst, 'Alm1'), DO) + assert isinstance(getattr(ln_inst, 'Auto'), DO) + assert isinstance(getattr(ln_inst, 'DPCSO1'), DO) + assert isinstance(getattr(ln_inst, 'ISCSO1'), DO) + assert isinstance(getattr(ln_inst, 'Ind1'), DO) + assert isinstance(getattr(ln_inst, 'Loc'), DO) + assert isinstance(getattr(ln_inst, 'LocKey'), DO) + assert isinstance(getattr(ln_inst, 'LocSta'), DO) + assert isinstance(getattr(ln_inst, 'Op1'), DO) + assert isinstance(getattr(ln_inst, 'OpCntRs'), DO) + assert isinstance(getattr(ln_inst, 'SPCSO1'), DO) + assert isinstance(getattr(ln_inst, 'Str1'), DO) + assert isinstance(getattr(ln_inst, 'StrVal1'), DO) + assert isinstance(getattr(ln_inst, 'Wrn1'), DO) # pylint: disable=maybe-no-member assert ln_inst.LocKey.dU.bType == 'Unicode255' # pylint: disable=maybe-no-member assert ln_inst.DPCSO1.ctlModel.fc == 'CF' # pylint: disable=maybe-no-member @@ -253,18 +262,18 @@ def test_create_LN0_by_dtid(self): ln0s = _get_node_list_by_tag(self.scd, 'LN0') assert len(ln0s) > 0 ln0 = LN0(self.scd.datatypes, ln0s[0]) - assert getattr(ln0,'lnClass') == 'LLN0' - assert getattr(ln0,'name') == 'LLN0' + assert getattr(ln0, 'lnClass') == 'LLN0' + assert getattr(ln0, 'name') == 'LLN0' datasets = ln0.get_children('DataSet') assert len(datasets) == 157 assert datasets[0] - assert getattr(datasets[0],'name') == 'DS_LPHD' - assert isinstance(datasets[0] , SCDNode) + assert getattr(datasets[0], 'name') == 'DS_LPHD' + assert isinstance(datasets[0], SCDNode) assert len(ln0.get_children('ReportControl')) == 157 assert len(ln0.get_children('GSEControl')) == 157 assert len(ln0.get_children('ReportControl')) == 157 assert len(ln0.get_children('DO')) == 8 - assert isinstance(getattr(ln0,'Diag') , DO) + assert isinstance(getattr(ln0, 'Diag'), DO) def test_create_LD(self): """ @@ -326,11 +335,12 @@ def test_get_all_ied(self): def test_open_iop(): scd = SCD_handler(SCD_OPEN_IOP_PATH) assert scd - assert scd.Header.toolID == 'PVR GEN TOOL' # pylint: disable=maybe-no-member - assert scd.Communication.RSPACE_PROCESS_NETWORK.AUT1A_SITE_1.GSE.Address.P[0].type == 'VLAN-PRIORITY' # pylint: disable=maybe-no-member - assert scd.Substation[0].SITEP41.name == 'SITEP41' # pylint: disable=maybe-no-member + assert scd.Header.toolID == 'PVR GEN TOOL' # pylint: disable=maybe-no-member + assert scd.Communication.RSPACE_PROCESS_NETWORK.AUT1A_SITE_1.GSE.Address.P[0].type == 'VLAN-PRIORITY' # pylint: disable=maybe-no-member + assert scd.Substation[0].SITEP41.name == 'SITEP41' # pylint: disable=maybe-no-member del scd + def test_get_Data_Type_Definition(): scd = SCD_handler(SCD_OPEN_IOP_PATH) datatype_defs = scd.datatypes.get_Data_Type_Definitions() @@ -341,6 +351,7 @@ def test_get_Data_Type_Definition(): assert len(datatype_defs['DAType']) == 16 assert len(datatype_defs['EnumType']) == 40 + def test_extract_sub_SCD(): ref_scd2_hash = '517e42b4be8a4e23e51621a4248418182ff4097cfb4d8f8177e5e1b90c8f3057' scd = SCD_handler(SCD_OPEN_IOP_PATH) @@ -352,13 +363,26 @@ def test_extract_sub_SCD(): assert ref_scd2_hash == hashfile(dest_path) -class TestSCD_IOP(): +def test_get_IP_Adr(): + scd = SCD_handler(SCD_OPEN_IOP_PATH) + (ip1, apName1) = scd.get_IP_Adr('AUT1A_SITE_1') + assert ip1 == '127.0.0.1' + assert apName1 == "PROCESS_AP" + + (ip2, apName2) = scd.get_IP_Adr('IEDTEST_SITE_1') + assert ip2 == '127.0.0.1' + assert apName2 == "PROCESS_AP" + return True + +class TestSCD_IOP(): def setup_method(self): self.scd = SCD_handler(SCD_OPEN_IOP_PATH) + self.tIED = self.scd.get_all_IEDs() def teardown_method(self): del self.scd + del self.tIED def test_get_ied_extrefs(self): ied = self.scd.get_IED_by_name('AUT1A_SITE_1') @@ -383,3 +407,29 @@ def test_get_ld_extrefs(self): result = ld.get_inputs_goose_extrefs() assert len(result) == 42 assert result[0] == {'desc': 'DYN_LDASLD_Position filtree sectionneur_5_Dbpos_1_stVal_3', 'doName': 'Pos', 'iedName': 'IEDTEST_SITE_1', 'intAddr': 'VDF', 'ldInst': 'XX_BCU_4LINE2_1_LDCMDSL_1', 'lnClass': 'CSWI', 'lnInst': '0', 'pDO': 'Pos', 'pLN': 'CSWI', 'pServT': 'GOOSE', 'serviceType': 'GOOSE', 'srcCBName': 'PVR_LLN0_CB_GSE_EXT', 'srcLDInst': 'XX_BCU_4LINE2_1_LDCMDSL_1'} + + def test_LD_get_LN_by_name(self): + ied = self.scd.get_IED_by_name('AUT1A_SITE_1') + ld = ied.get_children_LDs()[0] + ln_name = 'PTRC1' + + result = ld.get_LN_by_name(ln_name) + assert isinstance(result, LN) + assert result.name == ln_name + + def test_IED_get_LN_by_name(self): + ied = self.scd.get_IED_by_name('AUT1A_SITE_1') + ld_inst = 'LDASLD' + ln_name = 'PTRC1' + + result = ied.get_LN_by_name(ld_inst, ln_name) + assert isinstance(result, LN) + assert result.name == ln_name + + def test_IED_get_LD_by_inst(self): + ied = self.scd.get_IED_by_name('AUT1A_SITE_1') + ld_inst = 'LDASLD' + + result = ied.get_LD_by_inst(ld_inst) + assert isinstance(result, LD) + assert result.name == ld_inst