Skip to content

Commit

Permalink
- changed the driver to require a single path argument now, inste…
Browse files Browse the repository at this point in the history
…ad of ``ip_address`` and optionally ``slot``

- added support for full CIP routing in the ``path`` arg
- removed direct connection support, unsure of what exactly it is or how to support it
  • Loading branch information
ottowayi authored and ottowayi committed Mar 10, 2020
1 parent cd38ccf commit e2e36f2
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 72 deletions.
15 changes: 11 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,21 @@ PIP:
Basic Usage
-----------

Connect to a PLC and get some basic information,
use the ``slot`` kwarg if the PLC is not in slot 0. CompactLogix leave ``slot=0``.
Connect to a PLC and get some basic information about it. The ``path`` argument is the only one required, and it
has 3 forms:

- IP Address Only (``10.20.30.100``) - Use if PLC is in slot 0 or if connecting to CompactLogix
- IP Address/Slot (``10.20.30.100/1``) - Use if PLC is not in slot 0
- CIP Routing Path (``10.20.30.100/backplane/3/enet/10.20.40.100/backplane/0``) - Use if needing to route thru a backplane
- first 2 examples will replaced with the full path automatically, they're there for convenience.
- ``enet``/``backplane`` (or ``bp``) are for port selection, standard CIP routing but without having to remember
which port is what value.

::

from pycomm3 import LogixDriver

with LogixDriver('10.20.30.100', slot=1) as plc:
with LogixDriver('10.20.30.100/1') as plc:
print(plc)
# OUTPUT:
# Program Name: PLCA, Device: 1756-L83E/B, Revision: 28.13
Expand Down Expand Up @@ -238,7 +245,7 @@ Unit Testing

``pytest`` is used for unit testing. The ``tests`` directory contains an L5X export of the ``Pycomm3_Testing`` program
that contains all tags necessary for testing. The only requirement for testing (besides a running PLC with the testing
program) is the environment variables ``IP`` and ``SLOT`` for the PLC defined.
program) is the environment variable ``PLCPATH`` for the PLC defined.

.. Note::
Test coverage is not complete, pull requests are very much welcome to cover all combinations for reading and writing tags.
Expand Down
2 changes: 1 addition & 1 deletion pycomm3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
# SOFTWARE.
#

__version_info__ = (0, 4, 4)
__version_info__ = (0, 5, 0)
__version__ = '.'.join(f'{x}' for x in __version_info__)

from typing import NamedTuple, Any, Union, Optional
Expand Down
105 changes: 67 additions & 38 deletions pycomm3/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,82 @@
from . import DataError, CommError
from .bytes_ import (pack_usint, pack_udint, pack_uint, pack_dint, unpack_uint, unpack_usint, unpack_udint,
print_bytes_msg, DATA_FUNCTION_SIZE, UNPACK_DATA_FUNCTION)
from .const import (DATA_TYPE, TAG_SERVICES_REQUEST, ENCAPSULATION_COMMAND, EXTENDED_SYMBOL,
from .const import (DATA_TYPE, TAG_SERVICES_REQUEST, ENCAPSULATION_COMMAND, EXTENDED_SYMBOL, PATH_SEGMENTS,
ELEMENT_ID, CLASS_CODE, PADDING_BYTE, CONNECTION_SIZE, CLASS_ID, INSTANCE_ID, FORWARD_CLOSE,
FORWARD_OPEN, LARGE_FORWARD_OPEN, CONNECTION_MANAGER_INSTANCE, PRIORITY, TIMEOUT_MULTIPLIER,
TIMEOUT_TICKS, TRANSPORT_CLASS, UNCONNECTED_SEND, PRODUCT_TYPES, VENDORS, STATES)
from .packets import REQUEST_MAP
from .socket_ import Socket
import socket


def _parse_connection_path(path):
ip, *segments = path.split('/')
if not socket.inet_aton(ip):
raise ValueError('Invalid IP Address', ip)
segments = [_parse_path_segment(s) for s in segments]

if not segments:
_path = [pack_usint(PATH_SEGMENTS['backplane']), b'\x00'] # default backplane/0
elif len(segments) == 1:
_path = [pack_usint(PATH_SEGMENTS['backplane']), pack_usint(segments[0])]
else:
pairs = (segments[i:i + 2] for i in range(0, len(segments), 2))
_path = []
for port, dest in pairs:
if isinstance(dest, bytes):
port |= 1 << 4 # set Extended Link Address bit, CIP Vol 1 C-1.3
dest_len = len(dest)
if dest_len % 2:
dest += b'\x00'
_path.extend([pack_usint(port), pack_usint(dest_len), dest])
else:
_path.extend([pack_usint(port), pack_usint(dest)])

_path += [
CLASS_ID['8-bit'],
CLASS_CODE['Message Router'],
INSTANCE_ID['8-bit'],
b'\x01'
]

_path_bytes = b''.join(_path)

if len(_path_bytes) % 2:
_path_bytes += b'\x00'

return ip, pack_usint(len(_path_bytes) // 2) + _path_bytes


def _parse_path_segment(segment: str):
try:
if segment.isnumeric():
return int(segment)
else:
tmp = PATH_SEGMENTS.get(segment.lower())
if tmp:
return tmp
else:
if socket.inet_aton(segment):
return b''.join(pack_usint(ord(c)) for c in segment)
except:
raise ValueError(f'Failed to parse path segment', segment)

raise ValueError(f'Path segment is invalid', segment)


@logged
class Base:
_sequence = 0

def __init__(self, direct_connection=False, debug=False):
def __init__(self, path, debug=False):
if Base._sequence == 0:
Base._sequence = getpid()
else:
Base._sequence = Base._get_sequence()

self._sock = None
self.__direct_connections = direct_connection
# self.__direct_connections = direct_connection
self._debug = debug
self._session = 0
self._connection_opened = False
Expand All @@ -60,21 +116,22 @@ def __init__(self, direct_connection=False, debug=False):
self._last_tag_write = ()
self._info = {}
self.connection_size = 500
ip, _path = _parse_connection_path(path)

self.attribs = {
'context': b'_pycomm_',
'protocol version': b'\x01\x00',
'rpi': 5000,
'port': 0xAF12, # 44818
'timeout': 10,
'backplane': 1,
'cpu slot': 0,
'ip address': ip,
'cip_path': _path,
'option': 0,
'cid': b'\x27\x04\x19\x71',
'csn': b'\x27\x04',
'vid': b'\x09\x10',
'vsn': b'\x09\x10\x19\x71',
'name': 'Base',
'ip address': None,
'extended forward open': False}

def __len__(self):
Expand Down Expand Up @@ -207,19 +264,10 @@ def forward_open(self):

init_net_params = (True << 9) | (0 << 10) | (2 << 13) | (False << 15)
if self.attribs['extended forward open']:
connection_size = 4002
net_params = pack_udint((self.connection_size & 0xFFFF) | init_net_params << 16)
else:
connection_size = 500
net_params = pack_uint((self.connection_size & 0x01FF) | init_net_params)

if self.__direct_connections:
connection_params = [CONNECTION_SIZE['Direct Network'], CLASS_ID["8-bit"], CLASS_CODE["Message Router"]]
else:
connection_params = [
CONNECTION_SIZE['Backplane'],
]

forward_open_msg = [
FORWARD_OPEN if not self.attribs['extended forward open'] else LARGE_FORWARD_OPEN,
b'\x02', # CIP Path size
Expand All @@ -241,12 +289,7 @@ def forward_open(self):
b'\x01\x40\x20\x00',
net_params,
TRANSPORT_CLASS,
*connection_params,
pack_usint(self.attribs['backplane']),
pack_usint(self.attribs['cpu slot']),
b'\x20\x02',
INSTANCE_ID["8-bit"],
b'\x01'
self.attribs['cip_path']
]
request = self.new_request('send_rr_data')
request.add(*forward_open_msg)
Expand All @@ -271,6 +314,8 @@ def forward_close(self):
raise CommError("A session need to be registered before to call forward_close.")
request = self.new_request('send_rr_data')

path_size, *path = self.attribs['cip_path'] # for some reason we need to add a 0x00 between these? CIP Vol 1

forward_close_msg = [
FORWARD_CLOSE,
b'\x02',
Expand All @@ -283,25 +328,9 @@ def forward_close(self):
self.attribs['csn'],
self.attribs['vid'],
self.attribs['vsn'],
CLASS_ID["8-bit"],
CLASS_CODE["Message Router"],
INSTANCE_ID["8-bit"],
b'\x01'
bytes([path_size, 0, *path])
]

if self.__direct_connections:
forward_close_msg[11:2] = [
CONNECTION_SIZE['Direct Network'],
b'\x00'
]
else:
forward_close_msg[11:4] = [
CONNECTION_SIZE['Backplane'],
b'\x00',
pack_usint(self.attribs['backplane']),
pack_usint(self.attribs['cpu slot'])
]

request.add(*forward_close_msg)
response = request.send()
if response:
Expand Down
16 changes: 7 additions & 9 deletions pycomm3/clx.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import struct
from collections import defaultdict
from types import GeneratorType
from typing import Union, List, Sequence, Tuple, Optional, Any
from typing import Union, List, Sequence, Tuple, Optional

from autologging import logged

from functools import wraps
from . import DataError, Tag, RequestError
from .base import Base
from .bytes_ import (pack_dint, pack_uint, pack_udint, pack_usint, unpack_usint, unpack_uint, unpack_dint, unpack_udint,
Expand All @@ -48,14 +48,14 @@
def with_forward_open(func):
"""Decorator to ensure a forward open request has been completed with the plc"""

def forward_open_decorator(self, *args, **kwargs):
@wraps(func)
def wrapped(self, *args, **kwargs):
if not self.forward_open():
msg = f'Target did not connected. {func.__name__} will not be executed.'
self.__log.warning(msg)
raise DataError(msg)
return func(self, *args, **kwargs)

return forward_open_decorator
return wrapped


@logged
Expand All @@ -73,7 +73,7 @@ class LogixDriver(Base):
"""

def __init__(self, ip_address: str, *args, slot: int = 0, large_packets: bool = True,
def __init__(self, path: str, *args, large_packets: bool = True,
init_info: bool = True, init_tags: bool = True, init_program_tags: bool = False, **kwargs):
"""
:param ip_address: IP address of PLC
Expand All @@ -83,15 +83,13 @@ def __init__(self, ip_address: str, *args, slot: int = 0, large_packets: bool =
:param init_tags: if True, uploads all controller-scoped tag definitions on connect
:param init_program_tags: if True, uploads all program-scoped tag definitions on connect
"""
super().__init__(*args, **kwargs)
super().__init__(path, *args, **kwargs)
self._cache = None

self._data_types = {}
self._program_names = set()
self._tags = {}

self.attribs['ip address'] = ip_address
self.attribs['cpu slot'] = slot
self.attribs['extended forward open'] = large_packets
self.connection_size = 4000 if large_packets else 500
self.use_instance_ids = True
Expand Down
20 changes: 10 additions & 10 deletions pycomm3/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@
"16-bit": b'\x31',
}

# Path are combined as:
# CLASS_ID + PATHS
# For example PCCC path is CLASS_ID["8-bit"]+PATH["PCCC"] -> 0x20, 0x67, 0x24, 0x01.
PATH = {
'Connection Manager': b'\x06\x24\x01',
'Router': b'\x02\x24\x01',
'Backplane Data Type': b'\x66\x24\x01',
'PCCC': b'\x67\x24\x01',
'DHCP Channel A': b'\xa6\x24\x01\x01\x2c\x01',
'DHCP Channel B': b'\xa6\x24\x01\x02\x2c\x01'
PATH_SEGMENTS = {
'backplane': 0x01,
'bp': 0x01,
'enet': 0x02,
'dhrio-a': 0x02,
'dhrio-b': 0x03,
'dnet': 0x02,
'cnet': 0x02,
'dh485-a': 0x02,
'dh485-b': 0x03,
}

ENCAPSULATION_COMMAND = { # Volume 2: 2-3.2 Command Field UINT 2 byte
Expand Down
2 changes: 0 additions & 2 deletions pycomm3/packets/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,6 @@ def _make_write_data_tag(tag_info, value, elements, request_path, fragmented=Fal
else:
_dt_value = pack_uint(DATA_TYPE[data_type])

# _val = writable_value(value, elements, data_type)

service = bytes([TAG_SERVICES_REQUEST['Write Tag Fragmented' if fragmented else 'Write Tag']])

request_path = b''.join((service,
Expand Down
5 changes: 2 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import os


SLOT = int(os.environ['slot'])
IP_ADDR = os.environ['ip']
PATH = os.environ['PLCPATH']


@pytest.fixture(scope='module', autouse=True)
def plc():
with LogixDriver(IP_ADDR, slot=SLOT) as plc_:
with LogixDriver(PATH) as plc_:
yield plc_
9 changes: 4 additions & 5 deletions tests/test_plc_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
import os


SLOT = int(os.environ['slot'])
IP_ADDR = os.environ['ip']
PATH = os.environ['PLCPATH']


def test_connect_init_none():
with LogixDriver(IP_ADDR, slot=SLOT, init_info=False, init_tags=False) as plc:
with LogixDriver(PATH, init_info=False, init_tags=False) as plc:
assert plc.name is None
assert not plc.info
assert plc.connected
assert plc._session != 0


def test_connect_init_info():
with LogixDriver(IP_ADDR, slot=SLOT, init_info=True, init_tags=False) as plc:
with LogixDriver(PATH, init_info=True, init_tags=False) as plc:
# assert plc.name == 'PLCA'
assert plc.info['vendor'] == 'Rockwell Automation/Allen-Bradley'
assert plc.info['keyswitch'] == 'REMOTE RUN'
Expand All @@ -24,7 +23,7 @@ def test_connect_init_info():


def test_connect_init_tags():
with LogixDriver(IP_ADDR, slot=SLOT, init_info=False, init_tags=True) as plc:
with LogixDriver(PATH, init_info=False, init_tags=True) as plc:
assert len(plc.tags) > 0
assert isinstance(plc.tags, dict)

0 comments on commit e2e36f2

Please sign in to comment.