Source code for linkforge.core.parsers.srdf_parser

"""SRDF XML parser for LinkForge.

This module implements a robust SRDF (Semantic Robot Description Format) parser
that supports MoveIt-style tags and native XACRO resolution.
"""

from __future__ import annotations

import io
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, TypeVar

from .._utils.xml_utils import (
    get_xml_namespace,
    parse_float,
    strip_xml_namespace,
)
from ..base import IResourceResolver
from ..constants import (
    MAX_FILE_SIZE,
    MAX_XML_DEPTH,
    XACRO_URIS,
)
from ..exceptions import (
    RobotParserError,
    RobotParserIOError,
    RobotParserUnexpectedError,
    RobotParserXMLRootError,
)
from ..logging_config import get_logger
from ..models.srdf import (
    Chain,
    CollisionPair,
    EndEffector,
    GroupState,
    JointProperty,
    LinkSphereApproximation,
    PassiveJoint,
    PlanningGroup,
    SemanticRobotDescription,
    SrdfSphere,
    VirtualJoint,
)
from .xml_base import RobotXMLParser

# Define a TypeVar for generic collection parsing
T = TypeVar("T")

logger = get_logger(__name__)


class SRDFParser(RobotXMLParser[SemanticRobotDescription]):
    """Semantic Robot Description Format (SRDF) Parser.

    This parser converts SRDF XML content into a structured
    ``SemanticRobotDescription`` model. It supports MoveIt-specific tags such
    as planning groups, end effectors, and collision disabling.

    Malformed individual entries (missing attributes, values rejected by the
    model constructors) are logged as warnings and skipped; structural
    problems (bad XML, wrong root tag, unexpanded XACRO, excessive nesting)
    raise a ``RobotParserError`` subclass.
    """

    def __init__(
        self,
        max_file_size: int = MAX_FILE_SIZE,
        sandbox_root: Path | None = None,
        resource_resolver: IResourceResolver | None = None,
    ) -> None:
        """Initialize SRDF parser.

        Args:
            max_file_size: Maximum allowed file size in bytes.
            sandbox_root: Optional root directory for security sandbox.
            resource_resolver: Optional resolver for URIs.
        """
        super().__init__(
            max_file_size=max_file_size,
            sandbox_root=sandbox_root,
            resource_resolver=resource_resolver,
        )

    # ------------------------------------------------------------------
    # XACRO detection
    # ------------------------------------------------------------------

    @staticmethod
    def _has_xacro_substitution(elem: ET.Element) -> bool:
        """Return True if any attribute value contains ``${`` or ``$(``."""
        return any(
            "${" in value or "$(" in value
            for value in elem.attrib.values()
            if isinstance(value, str)
        )

    @staticmethod
    def _xacro_error() -> RobotParserUnexpectedError:
        """Build the error raised whenever unexpanded XACRO is found."""
        return RobotParserUnexpectedError(
            source_area="SRDF Parser",
            original_error="Unexpanded XACRO detected in SRDF. Please use parse_xacro().",
        )

    def _detect_xacro_content(self, root: ET.Element) -> None:
        """Detect if the XML contains unexpanded XACRO macros.

        Two markers are checked: a direct child of ``root`` whose tag lives in
        one of the ``XACRO_URIS`` namespaces, and any element (including
        ``root`` itself) with an attribute value containing ``${`` or ``$(``.

        Only the part of the tree that exists when this method is called is
        inspected. When ``root`` comes from a streaming parse, elements that
        have not been read yet are checked later by ``_parse_from_context``.

        Args:
            root: The XML root element.

        Raises:
            RobotParserUnexpectedError: If XACRO is detected.
        """
        is_xacro = any(get_xml_namespace(child.tag) in XACRO_URIS for child in root)
        if not is_xacro:
            is_xacro = any(self._has_xacro_substitution(elem) for elem in root.iter())
        if is_xacro:
            raise self._xacro_error()

    # ------------------------------------------------------------------
    # Element parsers
    # ------------------------------------------------------------------

    @staticmethod
    def _named_children(parent: ET.Element, pattern: str) -> tuple[str, ...]:
        """Collect the non-empty ``name`` attributes of children matching ``pattern``."""
        names = (child.get("name") for child in parent.findall(pattern))
        return tuple(name for name in names if name)

    def _parse_planning_group(self, group_elem: ET.Element) -> PlanningGroup | None:
        """Parse a <group> element into a PlanningGroup model.

        Child ``<link>``, ``<joint>`` and ``<group>`` entries without a name,
        and ``<chain>`` entries without both ``base_link`` and ``tip_link``,
        are ignored.

        Args:
            group_elem: The XML element for the group.

        Returns:
            A populated PlanningGroup instance or None if invalid.
        """
        name = group_elem.get("name")
        if not name:
            logger.warning("SRDF: Planning group missing name attribute, skipping")
            return None

        chains: list[Chain] = []
        for chain_elem in group_elem.findall("{*}chain"):
            base = chain_elem.get("base_link")
            tip = chain_elem.get("tip_link")
            if base and tip:
                chains.append(Chain(base_link=base, tip_link=tip))

        try:
            return PlanningGroup(
                name=name,
                links=self._named_children(group_elem, "{*}link"),
                joints=self._named_children(group_elem, "{*}joint"),
                chains=tuple(chains),
                subgroups=self._named_children(group_elem, "{*}group"),
            )
        except Exception as e:
            # Best effort: a group rejected by the model must not abort the parse.
            logger.warning(f"SRDF: Skipping planning group '{name}': {e}")
            return None

    def _parse_group_state(self, state_elem: ET.Element) -> GroupState | None:
        """Parse a <group_state> element into a GroupState model.

        Each child ``<joint>`` contributes one entry mapping the joint name to
        a tuple of floats parsed from its whitespace-separated ``value``
        attribute. Joints with missing or unparsable data are skipped.

        Args:
            state_elem: The XML element for the group state.

        Returns:
            A populated GroupState instance, or None if invalid.
        """
        name = state_elem.get("name")
        group = state_elem.get("group")
        if not name or not group:
            logger.warning("SRDF: Group state missing name or group attribute, skipping")
            return None

        joint_values: dict[str, tuple[float, ...]] = {}
        for joint_elem in state_elem.findall("{*}joint"):
            j_name = joint_elem.get("name")
            j_val_str = joint_elem.get("value")
            if not j_name or j_val_str is None:
                logger.warning(
                    f"SRDF: Joint in group state '{name}' missing name or value, skipping"
                )
                continue
            try:
                # Parse space-separated floats
                vals = tuple(parse_float(v, f"joint {j_name} value") for v in j_val_str.split())
                if vals:
                    joint_values[j_name] = vals
            except Exception as e:
                logger.warning(
                    f"SRDF: Invalid joint value for '{j_name}' in group state '{name}': {e}"
                )

        try:
            return GroupState(name=name, group=group, joint_values=joint_values)
        except Exception as e:
            logger.warning(f"SRDF: Skipping group state '{name}': {e}")
            return None

    # ------------------------------------------------------------------
    # Public entry point: string input
    # ------------------------------------------------------------------

    def parse_string(
        self,
        content: str,
        **_kwargs: Any,
    ) -> SemanticRobotDescription:
        """Parse SRDF content from a string.

        Args:
            content: The raw SRDF XML string.
            **_kwargs: Additional options for future extensions (unused).

        Returns:
            A SemanticRobotDescription model representing the SRDF.

        Raises:
            RobotParserUnexpectedError: If the XML is malformed, contains
                unexpanded XACRO, or is nested too deeply.
            RobotParserXMLRootError: If the root tag is not <robot>.
        """
        self._validate_content(content)

        f = io.StringIO(content)
        try:
            context = ET.iterparse(f, events=("start", "end"))
            # The first event is the "start" of the root element.
            _, root = next(context)
        except ET.ParseError as e:
            raise RobotParserUnexpectedError(source_area="SRDF parse", original_error=e) from e
        except StopIteration:
            raise RobotParserUnexpectedError(
                source_area="SRDF parse", original_error="Empty or truncated XML"
            ) from None
        except Exception as e:
            raise RobotParserUnexpectedError(
                source_area="Unexpected SRDF parse", original_error=e
            ) from e

        if strip_xml_namespace(root.tag) != "robot":
            raise RobotParserXMLRootError(root.tag)

        try:
            return self._parse_from_context(context, root)
        except ET.ParseError as e:
            raise RobotParserUnexpectedError(source_area="SRDF parse", original_error=e) from e
        except RobotParserError:
            raise
        except Exception as e:
            raise RobotParserUnexpectedError(
                source_area="Unexpected SRDF parse", original_error=e
            ) from e

    # ------------------------------------------------------------------
    # Streaming core
    # ------------------------------------------------------------------

    def _parse_from_context(self, context: Any, root: ET.Element) -> SemanticRobotDescription:
        """Consume the remaining iterparse events and build the model.

        The caller must already have pulled the root's "start" event from
        ``context``. Direct children of ``root`` are converted to models on
        their "end" event, after which ``root`` is cleared so processed
        elements are not retained in the XML tree.

        Args:
            context: The iterparse context, yielding ``(event, element)`` pairs.
            root: The root <robot> element.

        Returns:
            A fully populated SemanticRobotDescription model.

        Raises:
            RobotParserUnexpectedError: If XML nesting exceeds MAX_XML_DEPTH or
                unexpanded XACRO is detected.
        """
        # Early check on whatever part of the tree has been built so far.
        self._detect_xacro_content(root)

        robot_name = root.get("name", "")

        virtual_joints: list[VirtualJoint] = []
        groups: list[PlanningGroup] = []
        group_states: list[GroupState] = []
        end_effectors: list[EndEffector] = []
        passive_joints: list[PassiveJoint] = []
        disabled_collisions: list[CollisionPair] = []
        enabled_collisions: list[CollisionPair] = []
        no_default_collision_links: list[str] = []
        link_sphere_approximations: list[LinkSphereApproximation] = []
        joint_properties: list[JointProperty] = []

        # Depth relative to root: direct children of <robot> are at depth 1.
        depth = 0
        for event, elem in context:
            if event == "start":
                depth += 1
                if depth > MAX_XML_DEPTH:
                    raise RobotParserUnexpectedError(
                        source_area="XML nesting", original_error=depth
                    )
                # The up-front check only saw the portion of the document that
                # had been read when the root was obtained; re-apply the same
                # rules to every element as it streams in so that XACRO later
                # in a large document is not silently accepted.
                if depth == 1 and get_xml_namespace(elem.tag) in XACRO_URIS:
                    raise self._xacro_error()
                if self._has_xacro_substitution(elem):
                    raise self._xacro_error()
                continue

            # "end" event
            if depth == 1:
                tag = strip_xml_namespace(elem.tag)
                if tag == "virtual_joint":
                    vj = self._parse_virtual_joint_elem(elem)
                    if vj:
                        virtual_joints.append(vj)
                elif tag == "group":
                    g = self._parse_planning_group(elem)
                    if g:
                        groups.append(g)
                elif tag == "group_state":
                    gs = self._parse_group_state(elem)
                    if gs:
                        group_states.append(gs)
                elif tag == "end_effector":
                    ee = self._parse_end_effector_elem(elem)
                    if ee:
                        end_effectors.append(ee)
                elif tag == "disable_collisions":
                    cp = self._parse_collision_pair_elem(elem)
                    if cp:
                        disabled_collisions.append(cp)
                elif tag == "enable_collisions":
                    cp = self._parse_collision_pair_elem(elem)
                    if cp:
                        enabled_collisions.append(cp)
                elif tag == "passive_joint":
                    pj_name = elem.get("name")
                    if pj_name:
                        passive_joints.append(PassiveJoint(name=pj_name))
                elif tag == "disable_default_collisions":
                    link = elem.get("link")
                    if link:
                        no_default_collision_links.append(link)
                elif tag == "link_sphere_approximation":
                    lsa = self._parse_link_sphere_approximation_elem(elem)
                    if lsa:
                        link_sphere_approximations.append(lsa)
                elif tag == "joint_property":
                    jp = self._parse_joint_property_elem(elem)
                    if jp:
                        joint_properties.append(jp)

                # Drop the processed top-level element from the tree to free memory.
                root.clear()
            depth -= 1

        # Final cross-reference validation
        self._validate_cross_references(groups, group_states, end_effectors)

        return SemanticRobotDescription(
            robot_name=robot_name,
            virtual_joints=tuple(virtual_joints),
            groups=tuple(groups),
            group_states=tuple(group_states),
            end_effectors=tuple(end_effectors),
            passive_joints=tuple(passive_joints),
            disabled_collisions=tuple(disabled_collisions),
            enabled_collisions=tuple(enabled_collisions),
            no_default_collision_links=tuple(no_default_collision_links),
            link_sphere_approximations=tuple(link_sphere_approximations),
            joint_properties=tuple(joint_properties),
        )

    def _validate_cross_references(
        self,
        groups: list[PlanningGroup],
        group_states: list[GroupState],
        end_effectors: list[EndEffector],
    ) -> None:
        """Validate that group states and end effectors refer to existing groups.

        Dangling references are reported as warnings only; nothing is removed
        and no exception is raised.

        Args:
            groups: List of parsed planning groups.
            group_states: List of parsed group states.
            end_effectors: List of parsed end effectors.
        """
        group_names = {g.name for g in groups}

        for gs in group_states:
            if gs.group not in group_names:
                logger.warning(
                    f"SRDF: Group state '{gs.name}' refers to unknown group '{gs.group}'"
                )

        for ee in end_effectors:
            if ee.group not in group_names:
                logger.warning(
                    f"SRDF: End effector '{ee.name}' refers to unknown group '{ee.group}'"
                )

    def _parse_virtual_joint_elem(self, elem: ET.Element) -> VirtualJoint | None:
        """Parse a <virtual_joint> element into a VirtualJoint model.

        Args:
            elem: The XML element for the virtual joint.

        Returns:
            A populated VirtualJoint instance, or None if invalid.
        """
        name = elem.get("name")
        vtype = elem.get("type")
        parent = elem.get("parent_frame")
        child = elem.get("child_link")
        if not name or not vtype or not parent or not child:
            logger.warning("SRDF: Virtual joint missing required attributes, skipping")
            return None

        try:
            return VirtualJoint(
                name=name,
                type=vtype,
                parent_frame=parent,
                child_link=child,
            )
        except Exception as e:
            logger.warning(f"SRDF: Skipping virtual joint '{name}': {e}")
            return None

    def _parse_end_effector_elem(self, elem: ET.Element) -> EndEffector | None:
        """Parse an <end_effector> element into an EndEffector model.

        ``parent_group`` is optional and passed through as-is (None if absent).

        Args:
            elem: The XML element for the end effector.

        Returns:
            A populated EndEffector instance, or None if invalid.
        """
        name = elem.get("name")
        group = elem.get("group")
        parent = elem.get("parent_link")
        if not name or not group or not parent:
            logger.warning("SRDF: End effector missing required attributes, skipping")
            return None

        try:
            return EndEffector(
                name=name,
                group=group,
                parent_link=parent,
                parent_group=elem.get("parent_group"),
            )
        except Exception as e:
            logger.warning(f"SRDF: Skipping end effector '{name}': {e}")
            return None

    def _parse_collision_pair_elem(self, elem: ET.Element) -> CollisionPair | None:
        """Parse a collision rule element into a CollisionPair model.

        Used for both ``<disable_collisions>`` and ``<enable_collisions>``;
        ``reason`` is optional.

        Args:
            elem: The XML element for the collision pair (disable/enable).

        Returns:
            A populated CollisionPair instance, or None if invalid.
        """
        link1 = elem.get("link1")
        link2 = elem.get("link2")
        if not link1 or not link2:
            logger.warning("SRDF: Collision pair missing link1 or link2, skipping")
            return None

        try:
            return CollisionPair(
                link1=link1,
                link2=link2,
                reason=elem.get("reason"),
            )
        except Exception as e:
            logger.warning(f"SRDF: Skipping collision pair '{link1}/{link2}': {e}")
            return None

    def _parse_link_sphere_approximation_elem(
        self, elem: ET.Element
    ) -> LinkSphereApproximation | None:
        """Parse a <link_sphere_approximation> element into a model.

        Each child ``<sphere>`` needs a ``center`` made of exactly three
        whitespace-separated floats and a ``radius``; invalid spheres are
        logged and skipped.

        Args:
            elem: The XML element for the sphere approximation.

        Returns:
            A populated LinkSphereApproximation instance, or None if invalid.
        """
        link = elem.get("link")
        if not link:
            logger.warning("SRDF: Link sphere approximation missing link attribute, skipping")
            return None

        spheres: list[SrdfSphere] = []
        for sphere_elem in elem.findall("{*}sphere"):
            center_str = sphere_elem.get("center")
            radius_str = sphere_elem.get("radius")
            if not center_str or not radius_str:
                logger.warning(f"SRDF: Sphere in link '{link}' missing center or radius, skipping")
                continue
            try:
                # Unpacking raises ValueError unless exactly three components exist.
                cx, cy, cz = (parse_float(v, "sphere center") for v in center_str.split())
                r = parse_float(radius_str, "sphere radius")
                spheres.append(SrdfSphere(center_x=cx, center_y=cy, center_z=cz, radius=r))
            except Exception as e:
                logger.warning(f"SRDF: Invalid sphere in link '{link}': {e}")

        try:
            return LinkSphereApproximation(link=link, spheres=tuple(spheres))
        except Exception as e:
            logger.warning(f"SRDF: Skipping sphere approximation for link '{link}': {e}")
            return None

    def _parse_joint_property_elem(self, elem: ET.Element) -> JointProperty | None:
        """Parse a <joint_property> element into a JointProperty model.

        Args:
            elem: The XML element for the joint property.

        Returns:
            A populated JointProperty instance, or None if invalid.
        """
        joint_name = elem.get("joint_name")
        property_name = elem.get("property_name")
        value = elem.get("value")
        if not joint_name or not property_name or not value:
            logger.warning("SRDF: Joint property missing required attributes, skipping")
            return None

        try:
            return JointProperty(
                joint_name=joint_name,
                property_name=property_name,
                value=value,
            )
        except Exception as e:
            logger.warning(f"SRDF: Skipping joint property for '{joint_name}': {e}")
            return None

    # ------------------------------------------------------------------
    # Public entry point: file input
    # ------------------------------------------------------------------

    def parse(self, filepath: Path, **_kwargs: Any) -> SemanticRobotDescription:
        """Load and parse an SRDF file from disk.

        Args:
            filepath: Path to the .srdf file.
            **_kwargs: Additional options (unused).

        Returns:
            A SemanticRobotDescription model.

        Raises:
            RobotParserIOError: If the file cannot be read, or on any other
                unexpected non-parser error while processing it.
            RobotParserUnexpectedError: If the XML is malformed, contains
                unexpanded XACRO, or is nested too deeply.
            RobotParserXMLRootError: If the root tag is not <robot>.
        """
        self._validate_file(filepath)

        try:
            # Open the file ourselves: when iterparse is given a path it only
            # closes the handle once the iterator is exhausted, so any early
            # exit (wrong root, XACRO, parse error) would leave it open.
            with open(filepath, "rb") as stream:
                context = ET.iterparse(stream, events=("start", "end"))
                _, root = next(context)

                if strip_xml_namespace(root.tag) != "robot":
                    raise RobotParserXMLRootError(root.tag)

                return self._parse_from_context(context, root)
        except ET.ParseError as e:
            raise RobotParserUnexpectedError(source_area="SRDF file parse", original_error=e) from e
        except StopIteration:
            raise RobotParserUnexpectedError(
                source_area="SRDF file parse", original_error="Empty or truncated XML"
            ) from None
        except RobotParserError:
            # Parser-domain errors already carry the right type; pass through.
            raise
        except Exception as e:
            raise RobotParserIOError(filepath=filepath, reason=str(e)) from e