diff --git a/README.rst b/README.rst index bc2753b..b6e534e 100644 --- a/README.rst +++ b/README.rst @@ -55,23 +55,30 @@ PIP: Basic Usage ----------- -Connect to a PLC and get some basic information, -use the ``slot`` kwarg if the PLC is not in slot 0. CompactLogix leave ``slot=0``. +Connect to a PLC and get some basic information about it. The ``path`` argument is the only one required, and it +has 3 forms: + + - IP Address Only (``10.20.30.100``) - Use if PLC is in slot 0 or if connecting to CompactLogix + - IP Address/Slot (``10.20.30.100/1``) - Use if PLC is not in slot 0 + - CIP Routing Path (``10.20.30.100/backplane/3/enet/10.20.40.100/backplane/0``) - Use if needing to route thru a backplane + - first 2 examples will replaced with the full path automatically, they're there for convenience. + - ``enet``/``backplane`` (or ``bp``) are for port selection, standard CIP routing but without having to remember + which port is what value. :: from pycomm3 import LogixDriver - with LogixDriver('10.20.30.100', slot=1) as plc: + with LogixDriver('10.20.30.100/1') as plc: print(plc) # OUTPUT: - # Program Name: PLCA, Device: 1756-L74/A LOGIX5574, Revision: 31.11 + # Program Name: PLCA, Device: 1756-L83E/B, Revision: 28.13 print(plc.info) # OUTPUT: # {'vendor': 'Rockwell Automation/Allen-Bradley', 'product_type': 'Programmable Logic Controller', - # 'product_code': 55, 'version_major': 20, 'version_minor': 12, 'revision': '20.12', 'serial': '004b8fe0', - # 'device_type': '1756-L62/B LOGIX5562', 'keyswitch': 'REMOTE RUN', 'name': 'PLCA'} + # 'product_code': 166, 'version_major': 28, 'version_minor': 13, 'revision': '28.13', 'serial': 'FFFFFFFF', + # 'device_type': '1756-L83E/B', 'keyswitch': 'REMOTE RUN', 'name': 'PLCA'} @@ -104,7 +111,7 @@ Both methods will return ``Tag`` objects to reflect the success or failure of th class Tag(NamedTuple): tag: str value: Any - type: Union[str, None] + type: Optional[str] = None error: Optional[str] = None ``Tag`` objects are considered successful if the value is not None and the error is None. Otherwise, the error will @@ -238,7 +245,7 @@ Unit Testing ``pytest`` is used for unit testing. The ``tests`` directory contains an L5X export of the ``Pycomm3_Testing`` program that contains all tags necessary for testing. The only requirement for testing (besides a running PLC with the testing -program) is the environment variables ``IP`` and ``SLOT`` for the PLC defined. +program) is the environment variable ``PLCPATH`` for the PLC defined. .. Note:: Test coverage is not complete, pull requests are very much welcome to cover all combinations for reading and writing tags. diff --git a/pycomm3/__init__.py b/pycomm3/__init__.py index 7c1672c..d7e3d4a 100644 --- a/pycomm3/__init__.py +++ b/pycomm3/__init__.py @@ -24,7 +24,7 @@ # SOFTWARE. # -__version_info__ = (0, 4, 4) +__version_info__ = (0, 5, 0) __version__ = '.'.join(f'{x}' for x in __version_info__) from typing import NamedTuple, Any, Union, Optional diff --git a/pycomm3/base.py b/pycomm3/base.py index 051ebf8..b707e95 100644 --- a/pycomm3/base.py +++ b/pycomm3/base.py @@ -31,26 +31,82 @@ from . import DataError, CommError from .bytes_ import (pack_usint, pack_udint, pack_uint, pack_dint, unpack_uint, unpack_usint, unpack_udint, print_bytes_msg, DATA_FUNCTION_SIZE, UNPACK_DATA_FUNCTION) -from .const import (DATA_TYPE, TAG_SERVICES_REQUEST, ENCAPSULATION_COMMAND, EXTENDED_SYMBOL, +from .const import (DATA_TYPE, TAG_SERVICES_REQUEST, ENCAPSULATION_COMMAND, EXTENDED_SYMBOL, PATH_SEGMENTS, ELEMENT_ID, CLASS_CODE, PADDING_BYTE, CONNECTION_SIZE, CLASS_ID, INSTANCE_ID, FORWARD_CLOSE, FORWARD_OPEN, LARGE_FORWARD_OPEN, CONNECTION_MANAGER_INSTANCE, PRIORITY, TIMEOUT_MULTIPLIER, TIMEOUT_TICKS, TRANSPORT_CLASS, UNCONNECTED_SEND, PRODUCT_TYPES, VENDORS, STATES) from .packets import REQUEST_MAP from .socket_ import Socket +import socket + + +def _parse_connection_path(path): + ip, *segments = path.split('/') + if not socket.inet_aton(ip): + raise ValueError('Invalid IP Address', ip) + segments = [_parse_path_segment(s) for s in segments] + + if not segments: + _path = [pack_usint(PATH_SEGMENTS['backplane']), b'\x00'] # default backplane/0 + elif len(segments) == 1: + _path = [pack_usint(PATH_SEGMENTS['backplane']), pack_usint(segments[0])] + else: + pairs = (segments[i:i + 2] for i in range(0, len(segments), 2)) + _path = [] + for port, dest in pairs: + if isinstance(dest, bytes): + port |= 1 << 4 # set Extended Link Address bit, CIP Vol 1 C-1.3 + dest_len = len(dest) + if dest_len % 2: + dest += b'\x00' + _path.extend([pack_usint(port), pack_usint(dest_len), dest]) + else: + _path.extend([pack_usint(port), pack_usint(dest)]) + + _path += [ + CLASS_ID['8-bit'], + CLASS_CODE['Message Router'], + INSTANCE_ID['8-bit'], + b'\x01' + ] + + _path_bytes = b''.join(_path) + + if len(_path_bytes) % 2: + _path_bytes += b'\x00' + + return ip, pack_usint(len(_path_bytes) // 2) + _path_bytes + + +def _parse_path_segment(segment: str): + try: + if segment.isnumeric(): + return int(segment) + else: + tmp = PATH_SEGMENTS.get(segment.lower()) + if tmp: + return tmp + else: + if socket.inet_aton(segment): + return b''.join(pack_usint(ord(c)) for c in segment) + except: + raise ValueError(f'Failed to parse path segment', segment) + + raise ValueError(f'Path segment is invalid', segment) @logged class Base: _sequence = 0 - def __init__(self, direct_connection=False, debug=False): + def __init__(self, path, debug=False): if Base._sequence == 0: Base._sequence = getpid() else: Base._sequence = Base._get_sequence() self._sock = None - self.__direct_connections = direct_connection + # self.__direct_connections = direct_connection self._debug = debug self._session = 0 self._connection_opened = False @@ -60,21 +116,22 @@ def __init__(self, direct_connection=False, debug=False): self._last_tag_write = () self._info = {} self.connection_size = 500 + ip, _path = _parse_connection_path(path) + self.attribs = { 'context': b'_pycomm_', 'protocol version': b'\x01\x00', 'rpi': 5000, 'port': 0xAF12, # 44818 'timeout': 10, - 'backplane': 1, - 'cpu slot': 0, + 'ip address': ip, + 'cip_path': _path, 'option': 0, 'cid': b'\x27\x04\x19\x71', 'csn': b'\x27\x04', 'vid': b'\x09\x10', 'vsn': b'\x09\x10\x19\x71', 'name': 'Base', - 'ip address': None, 'extended forward open': False} def __len__(self): @@ -207,19 +264,10 @@ def forward_open(self): init_net_params = (True << 9) | (0 << 10) | (2 << 13) | (False << 15) if self.attribs['extended forward open']: - connection_size = 4002 net_params = pack_udint((self.connection_size & 0xFFFF) | init_net_params << 16) else: - connection_size = 500 net_params = pack_uint((self.connection_size & 0x01FF) | init_net_params) - if self.__direct_connections: - connection_params = [CONNECTION_SIZE['Direct Network'], CLASS_ID["8-bit"], CLASS_CODE["Message Router"]] - else: - connection_params = [ - CONNECTION_SIZE['Backplane'], - ] - forward_open_msg = [ FORWARD_OPEN if not self.attribs['extended forward open'] else LARGE_FORWARD_OPEN, b'\x02', # CIP Path size @@ -241,12 +289,7 @@ def forward_open(self): b'\x01\x40\x20\x00', net_params, TRANSPORT_CLASS, - *connection_params, - pack_usint(self.attribs['backplane']), - pack_usint(self.attribs['cpu slot']), - b'\x20\x02', - INSTANCE_ID["8-bit"], - b'\x01' + self.attribs['cip_path'] ] request = self.new_request('send_rr_data') request.add(*forward_open_msg) @@ -271,6 +314,8 @@ def forward_close(self): raise CommError("A session need to be registered before to call forward_close.") request = self.new_request('send_rr_data') + path_size, *path = self.attribs['cip_path'] # for some reason we need to add a 0x00 between these? CIP Vol 1 + forward_close_msg = [ FORWARD_CLOSE, b'\x02', @@ -283,25 +328,9 @@ def forward_close(self): self.attribs['csn'], self.attribs['vid'], self.attribs['vsn'], - CLASS_ID["8-bit"], - CLASS_CODE["Message Router"], - INSTANCE_ID["8-bit"], - b'\x01' + bytes([path_size, 0, *path]) ] - if self.__direct_connections: - forward_close_msg[11:2] = [ - CONNECTION_SIZE['Direct Network'], - b'\x00' - ] - else: - forward_close_msg[11:4] = [ - CONNECTION_SIZE['Backplane'], - b'\x00', - pack_usint(self.attribs['backplane']), - pack_usint(self.attribs['cpu slot']) - ] - request.add(*forward_close_msg) response = request.send() if response: diff --git a/pycomm3/clx.py b/pycomm3/clx.py index 28c17d1..c95aa78 100644 --- a/pycomm3/clx.py +++ b/pycomm3/clx.py @@ -27,10 +27,10 @@ import struct from collections import defaultdict from types import GeneratorType -from typing import Union, List, Sequence, Tuple, Optional, Any +from typing import Union, List, Sequence, Tuple, Optional from autologging import logged - +from functools import wraps from . import DataError, Tag, RequestError from .base import Base from .bytes_ import (pack_dint, pack_uint, pack_udint, pack_usint, unpack_usint, unpack_uint, unpack_dint, unpack_udint, @@ -48,14 +48,14 @@ def with_forward_open(func): """Decorator to ensure a forward open request has been completed with the plc""" - def forward_open_decorator(self, *args, **kwargs): + @wraps(func) + def wrapped(self, *args, **kwargs): if not self.forward_open(): msg = f'Target did not connected. {func.__name__} will not be executed.' - self.__log.warning(msg) raise DataError(msg) return func(self, *args, **kwargs) - return forward_open_decorator + return wrapped @logged @@ -73,7 +73,7 @@ class LogixDriver(Base): """ - def __init__(self, ip_address: str, *args, slot: int = 0, large_packets: bool = True, + def __init__(self, path: str, *args, large_packets: bool = True, init_info: bool = True, init_tags: bool = True, init_program_tags: bool = False, **kwargs): """ :param ip_address: IP address of PLC @@ -83,15 +83,13 @@ def __init__(self, ip_address: str, *args, slot: int = 0, large_packets: bool = :param init_tags: if True, uploads all controller-scoped tag definitions on connect :param init_program_tags: if True, uploads all program-scoped tag definitions on connect """ - super().__init__(*args, **kwargs) + super().__init__(path, *args, **kwargs) self._cache = None self._data_types = {} self._program_names = set() self._tags = {} - self.attribs['ip address'] = ip_address - self.attribs['cpu slot'] = slot self.attribs['extended forward open'] = large_packets self.connection_size = 4000 if large_packets else 500 self.use_instance_ids = True diff --git a/pycomm3/const.py b/pycomm3/const.py index 40799e3..746a40d 100644 --- a/pycomm3/const.py +++ b/pycomm3/const.py @@ -85,16 +85,16 @@ "16-bit": b'\x31', } -# Path are combined as: -# CLASS_ID + PATHS -# For example PCCC path is CLASS_ID["8-bit"]+PATH["PCCC"] -> 0x20, 0x67, 0x24, 0x01. -PATH = { - 'Connection Manager': b'\x06\x24\x01', - 'Router': b'\x02\x24\x01', - 'Backplane Data Type': b'\x66\x24\x01', - 'PCCC': b'\x67\x24\x01', - 'DHCP Channel A': b'\xa6\x24\x01\x01\x2c\x01', - 'DHCP Channel B': b'\xa6\x24\x01\x02\x2c\x01' +PATH_SEGMENTS = { + 'backplane': 0x01, + 'bp': 0x01, + 'enet': 0x02, + 'dhrio-a': 0x02, + 'dhrio-b': 0x03, + 'dnet': 0x02, + 'cnet': 0x02, + 'dh485-a': 0x02, + 'dh485-b': 0x03, } ENCAPSULATION_COMMAND = { # Volume 2: 2-3.2 Command Field UINT 2 byte diff --git a/pycomm3/packets/requests.py b/pycomm3/packets/requests.py index 8529173..60045d7 100644 --- a/pycomm3/packets/requests.py +++ b/pycomm3/packets/requests.py @@ -447,8 +447,6 @@ def _make_write_data_tag(tag_info, value, elements, request_path, fragmented=Fal else: _dt_value = pack_uint(DATA_TYPE[data_type]) - # _val = writable_value(value, elements, data_type) - service = bytes([TAG_SERVICES_REQUEST['Write Tag Fragmented' if fragmented else 'Write Tag']]) request_path = b''.join((service, diff --git a/tests/conftest.py b/tests/conftest.py index fe650c3..30d4436 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,11 +3,10 @@ import os -SLOT = int(os.environ['slot']) -IP_ADDR = os.environ['ip'] +PATH = os.environ['PLCPATH'] @pytest.fixture(scope='module', autouse=True) def plc(): - with LogixDriver(IP_ADDR, slot=SLOT) as plc_: + with LogixDriver(PATH) as plc_: yield plc_ diff --git a/tests/test_plc_init.py b/tests/test_plc_init.py index d6541a4..eea4335 100644 --- a/tests/test_plc_init.py +++ b/tests/test_plc_init.py @@ -2,12 +2,11 @@ import os -SLOT = int(os.environ['slot']) -IP_ADDR = os.environ['ip'] +PATH = os.environ['PLCPATH'] def test_connect_init_none(): - with LogixDriver(IP_ADDR, slot=SLOT, init_info=False, init_tags=False) as plc: + with LogixDriver(PATH, init_info=False, init_tags=False) as plc: assert plc.name is None assert not plc.info assert plc.connected @@ -15,7 +14,7 @@ def test_connect_init_none(): def test_connect_init_info(): - with LogixDriver(IP_ADDR, slot=SLOT, init_info=True, init_tags=False) as plc: + with LogixDriver(PATH, init_info=True, init_tags=False) as plc: # assert plc.name == 'PLCA' assert plc.info['vendor'] == 'Rockwell Automation/Allen-Bradley' assert plc.info['keyswitch'] == 'REMOTE RUN' @@ -24,7 +23,7 @@ def test_connect_init_info(): def test_connect_init_tags(): - with LogixDriver(IP_ADDR, slot=SLOT, init_info=False, init_tags=True) as plc: + with LogixDriver(PATH, init_info=False, init_tags=True) as plc: assert len(plc.tags) > 0 assert isinstance(plc.tags, dict)