From a195cf44be316bd3799916133a1e75844e508e4e Mon Sep 17 00:00:00 2001 From: Sunghyun Cho Date: Wed, 10 Jun 2026 18:39:12 +0900 Subject: [PATCH] Add defined-tags support for nested Logix UDTs Introduce a generic --defined-tags loader that accepts either a JSON file path or inline JSON text and registers controller tags, nested UDT members, and template metadata against a shared backing buffer. Expose Symbol Object and Template Object responses so clients can enumerate controller tags and expand UDT definitions through @tags and @udt/ style requests. Prefer longest symbolic tag matches during resolution so direct nested member access, including BOOL leaves and atomic arrays, works alongside root STRUCT reads and writes. Support contribution-ready schema inputs by accepting explicit CIP typeCode values, inferring atomic and template metadata when possible, and honoring explicit BOOL bit positions while retaining the compact JSON shorthand. Add focused tests for tag listing, UDT payload generation, shared-buffer nested writes, explicit typeCode schemas, UTF-8 JSON loading, inline JSON loading, and symbol/template object responses. --- server/enip/defined_tags.py | 647 +++++++++++++++++++++++++++++++ server/enip/defined_tags_test.py | 298 ++++++++++++++ server/enip/device.py | 30 ++ server/enip/main.py | 11 +- 4 files changed, 983 insertions(+), 3 deletions(-) create mode 100644 server/enip/defined_tags.py create mode 100644 server/enip/defined_tags_test.py diff --git a/server/enip/defined_tags.py b/server/enip/defined_tags.py new file mode 100644 index 0000000..0a043de --- /dev/null +++ b/server/enip/defined_tags.py @@ -0,0 +1,647 @@ +from __future__ import absolute_import, print_function, division + +import io +import json +import struct + +from ...automata import dfa, state +from ...dotdict import dotdict +from . import device +from . import parser + + +TYPE_IS_STRUCT = 0x8000 +TYPE_IS_ARRAY = 0x2000 + +GET_INSTANCE_ATTRIBUTE_LIST_REQ = 0x55 +GET_INSTANCE_ATTRIBUTE_LIST_RPY = GET_INSTANCE_ATTRIBUTE_LIST_REQ | 0x80 +GET_INSTANCE_ATTRIBUTE_LIST_CTX = "get_instance_attribute_list" +READ_TEMPLATE_REQ = 0x4C +READ_TEMPLATE_RPY = READ_TEMPLATE_REQ | 0x80 + +try: + string_types = (basestring,) +except NameError: + string_types = (str,) + +_registry = None + + +class ControllerTag(object): + def __init__(self, instance_id, name, template_id, size, dimensions=None): + self.instance_id = instance_id + self.name = name + self.template_id = template_id + self.size = size + self.dimensions = (dimensions or [0, 0, 0])[:3] + while len(self.dimensions) < 3: + self.dimensions.append(0) + self.type_code = TYPE_IS_STRUCT | template_id + + +class UdtDefinition(object): + def __init__(self, template_id, name, size, fields): + self.template_id = template_id + self.name = name + self.size = size + self.fields = fields + + +class UdtField(object): + def __init__(self, name, type_code, offset, metadata=0, template_id=None, atomic_type=None, bit=None): + self.name = name + self.type_code = type_code + self.offset = offset + self.metadata = metadata + self.template_id = template_id + self.atomic_type = atomic_type + self.bit = bit + + @property + def is_struct(self): + return (self.type_code & TYPE_IS_STRUCT) != 0 and self.template_id is not None + + @property + def is_array(self): + return (self.type_code & TYPE_IS_ARRAY) != 0 + + +class RawStructParser(parser.STRUCT): + def __init__(self, structure_tag, size): + self.structure_tag = structure_tag + self.struct_calcsize = size + super(RawStructParser, self).__init__(structure_tag=structure_tag) + + +class BackingBuffer(object): + def __init__(self, size): + self.data = bytearray(size) + + def read(self, offset, size): + return bytes(self.data[offset:offset + size]) + + def write(self, offset, payload): + end = offset + len(payload) + assert end <= len(self.data) + self.data[offset:end] = bytearray(payload) + + def read_values(self, offset, count, type_cls): + type_parser = type_cls() + size = type_parser.struct_calcsize + fmt = type_parser.struct_format + return [ + struct.unpack_from(fmt, self.data, offset + (index * size))[0] + for index in range(count) + ] + + def write_values(self, offset, values, type_cls): + type_parser = type_cls() + size = type_parser.struct_calcsize + fmt = type_parser.struct_format + for index, value in enumerate(values): + struct.pack_into(fmt, self.data, offset + (index * size), value) + + def read_bit(self, offset, bit): + return bool(self.data[offset] & (1 << bit)) + + def write_bit(self, offset, bit, value): + if value: + self.data[offset] |= 1 << bit + else: + self.data[offset] &= ~(1 << bit) + + +class StructSliceAttribute(object): + error = 0x00 + + def __init__(self, name, backing, offset, size, structure_tag): + self.name = name + self.backing = backing + self.offset = offset + self.size = size + self.parser = RawStructParser(structure_tag=structure_tag, size=size) + + def __len__(self): + return 1 + + def __getitem__(self, key): + self._validate_key(key) + record = dotdict() + record.data = dotdict() + record.data.input = self.backing.read(self.offset, self.size) + return [record] if isinstance(key, slice) else record + + def __setitem__(self, key, value): + self._validate_key(key) + payload = self._coerce_payload(value) + self.backing.write(self.offset, payload[:self.size]) + + def _validate_key(self, key): + if isinstance(key, slice): + start, stop, stride = key.indices(1) + if start == 0 and stop == 1 and stride == 1: + return + elif key == 0: + return + raise KeyError("Unsupported STRUCT slice for {0}: {1!r}".format(self.name, key)) + + def _coerce_payload(self, value): + if isinstance(value, list): + value = value[0] + if hasattr(value, "input"): + return bytes(bytearray(value.input)) + if hasattr(value, "data") and hasattr(value.data, "input"): + return bytes(bytearray(value.data.input)) + return bytes(bytearray(value)) + + +class AtomicArrayAttribute(object): + error = 0x00 + + def __init__(self, name, backing, offset, count, type_cls): + self.name = name + self.backing = backing + self.offset = offset + self.count = count + self.type_cls = type_cls + self.parser = type_cls() + + def __len__(self): + return self.count + + def __getitem__(self, key): + values = self.backing.read_values(self.offset, self.count, self.type_cls) + return values[key] + + def __setitem__(self, key, value): + values = self.backing.read_values(self.offset, self.count, self.type_cls) + if isinstance(key, slice): + values[key] = list(value) + else: + values[key] = value + self.backing.write_values(self.offset, values, self.type_cls) + + +class BoolAttribute(object): + error = 0x00 + + def __init__(self, name, backing, offset, bit): + self.name = name + self.backing = backing + self.offset = offset + self.bit = bit + self.parser = parser.BOOL() + + def __len__(self): + return 1 + + def __getitem__(self, key): + self._validate_key(key) + value = self.backing.read_bit(self.offset, self.bit) + return [value] if isinstance(key, slice) else value + + def __setitem__(self, key, value): + self._validate_key(key) + if isinstance(key, slice): + value = next(iter(value)) + self.backing.write_bit(self.offset, self.bit, bool(value)) + + def _validate_key(self, key): + if isinstance(key, slice): + start, stop, stride = key.indices(1) + if start == 0 and stop == 1 and stride == 1: + return + elif key == 0: + return + raise KeyError("Unsupported BOOL slice for {0}: {1!r}".format(self.name, key)) + + +class BytesAttribute(object): + error = 0x00 + + def __init__(self, name, payload): + self.name = name + self.payload = list(bytearray(payload)) + self.parser = parser.USINT() + + def __len__(self): + return len(self.payload) + + def __getitem__(self, key): + return self.payload[key] + + def __setitem__(self, key, value): + self.payload[key] = value + + +class DefinedTagsRegistry(object): + def __init__(self, controller_tags, templates): + self.controller_tags = controller_tags + self.templates = templates + + @classmethod + def from_file(cls, path): + with io.open(path, "r", encoding="utf-8") as schema_file: + return cls.from_dict(json.load(schema_file)) + + @classmethod + def from_json(cls, text): + return cls.from_dict(json.loads(text)) + + @classmethod + def from_source(cls, source): + if _looks_like_json_text(source): + return cls.from_json(source) + return cls.from_file(source) + + @classmethod + def from_dict(cls, schema): + templates = _parse_templates(schema["templates"]) + controller_tags = [ + ControllerTag( + _as_int(item.get("id", index + 1)), + item["name"], + _as_int(item.get("templateId", item.get("typeTemplateId"))), + _as_int(item["size"]), + [_as_int(value) for value in item.get("dimensions", [0, 0, 0])], + ) + for index, item in enumerate(schema["controllerTags"]) + ] + return cls(controller_tags, templates) + + def build_tags(self): + tags = dotdict() + for controller_tag in self.controller_tags: + backing = BackingBuffer(controller_tag.size) + root_template = self.templates[controller_tag.template_id] + self._add_struct_tag( + tags, + controller_tag.name, + controller_tag.name, + backing, + 0, + controller_tag.size, + controller_tag.template_id, + ) + self._add_template_members(tags, controller_tag.name, backing, 0, root_template, {}) + + _add_tag(tags, "@tags", BytesAttribute("@tags", self.build_tag_listing_payload())) + for template_id in self.templates: + _add_tag(tags, "@udt/{0}".format(template_id), BytesAttribute( + "@udt/{0}".format(template_id), + self.build_udt_payload(template_id), + )) + return tags + + def build_tag_listing_payload(self): + payload = bytearray() + for controller_tag in self.controller_tags: + payload += _tag_info_record( + controller_tag.instance_id, + controller_tag.type_code, + controller_tag.size, + controller_tag.dimensions, + controller_tag.name, + ) + return bytes(payload) + + def build_udt_payload(self, template_id): + definition = self.templates[int(template_id)] + payload = bytearray() + payload += struct.pack( + "