"""URDF XML parser to import robot models.
This module implements a robust URDF/XACRO parser that transforms XML robot
descriptions into the internal LinkForge `Robot` model. It is designed for
high-fidelity round-tripping and includes:
- **Comprehensive Parsing**: Full support for all URDF joint types, multiple
visual/collision elements, and complex nested structures.
- **Security & Validation**: Built-in protection against XML attacks (depth
limits), strict validation of mesh paths, numeric values, and rejection of
unresolved XACRO files to maintain model integrity.
- **Error Resilience**: Informative error messages and fallback mechanisms
for malformed or incomplete XML configurations.
- **Fidelity Preservation**: Captures metadata, sensor noise, and ROS 2
control parameters to ensure nothing is lost during the Blender import process.
"""
from __future__ import annotations
import io
import xml.etree.ElementTree as ET
from dataclasses import replace
from pathlib import Path
from typing import Any, cast
from .._utils.math_utils import normalize_vector
from .._utils.xml_utils import (
get_xml_namespace,
parse_float,
parse_int,
parse_optional_bool,
parse_optional_float,
parse_vector3,
strip_xml_namespace,
)
from ..base import IResourceResolver
from ..constants import (
CONTROL_TYPE_SYSTEM,
DEFAULT_CAMERA_FAR,
DEFAULT_CAMERA_FORMAT,
DEFAULT_CAMERA_FOV,
DEFAULT_CAMERA_HEIGHT,
DEFAULT_CAMERA_NEAR,
DEFAULT_CAMERA_WIDTH,
DEFAULT_LIDAR_MAX_ANGLE,
DEFAULT_LIDAR_MIN_ANGLE,
DEFAULT_LIDAR_RANGE_MAX,
DEFAULT_LIDAR_RANGE_MIN,
DEFAULT_LIDAR_SAMPLES,
DEFAULT_UPDATE_RATE,
DEFAULT_URDF_AXIS_XYZ,
DEFAULT_URDF_AXIS_XYZ_STR,
HW_IF_POSITION,
JOINT_CONTINUOUS,
JOINT_FIXED,
JOINT_FLOATING,
JOINT_PLANAR,
JOINT_PRISMATIC,
JOINT_REVOLUTE,
MAX_FILE_SIZE,
MAX_XML_DEPTH,
NOISE_GAUSSIAN,
UNNAMED_JOINT,
UNNAMED_LINK,
XACRO_URIS,
)
from ..exceptions import (
RobotModelError,
RobotParserIOError,
RobotParserUnexpectedError,
RobotParserXMLRootError,
RobotValidationError,
ValidationErrorCode,
XacroDetectedError,
)
from ..logging_config import get_logger
from ..models import (
CameraInfo,
Collision,
ContactInfo,
ForceTorqueInfo,
GazeboElement,
GazeboPlugin,
GPSInfo,
IMUInfo,
Joint,
JointCalibration,
JointDynamics,
JointLimits,
JointMimic,
JointSafetyController,
JointType,
LidarInfo,
Link,
Material,
Robot,
Ros2Control,
Ros2ControlJoint,
Ros2ControlSensor,
Sensor,
SensorNoise,
SensorType,
Transform,
Transmission,
TransmissionActuator,
TransmissionJoint,
Vector3,
Visual,
)
from .xml_base import RobotXMLParser
logger = get_logger(__name__)
[docs]
class URDFParser(RobotXMLParser[Robot]):
"""Refined URDF Parser using a class-based interface."""
[docs]
def __init__(
self,
max_file_size: int = MAX_FILE_SIZE,
sandbox_root: Path | None = None,
resource_resolver: IResourceResolver | None = None,
) -> None:
"""Initialize parser.
Args:
max_file_size: Maximum allowed file size in bytes (default: 100MB)
sandbox_root: Optional root directory for security sandbox
resource_resolver: Optional resolver for URIs
"""
super().__init__(
max_file_size=max_file_size,
sandbox_root=sandbox_root,
resource_resolver=resource_resolver,
)
def _parse_link(
self,
link_elem: ET.Element,
materials: dict[str, Material],
source_directory: Path | None = None,
) -> Link:
"""Parse a <link> element into a Link model.
Args:
link_elem: The XML element for the link.
materials: Dictionary of global materials for resolution.
source_directory: Base directory for relative mesh paths.
Returns:
A populated Link model.
"""
name = link_elem.get("name", UNNAMED_LINK)
# Parse Visuals
visuals = [
v
for visual_elem in link_elem.findall("{*}visual")
if (v := self._parse_visual_element(visual_elem, materials, source_directory))
]
# Parse Collisions
collisions = [
c
for collision_elem in link_elem.findall("{*}collision")
if (c := self._parse_collision_element(collision_elem, source_directory))
]
inertial = self._parse_inertial_element(link_elem.find("{*}inertial"))
return Link(
name=name,
visuals=visuals,
collisions=collisions,
inertial=inertial,
)
def _parse_visual_element(
self,
visual_elem: ET.Element,
materials: dict[str, Material],
source_directory: Path | None = None,
) -> Visual | None:
"""Parse a single <visual> element.
Args:
visual_elem: The visual XML element.
materials: Global materials dictionary.
source_directory: Path for mesh resolution.
Returns:
A Visual model or None if geometry is missing.
"""
origin = self._parse_origin_element(visual_elem.find("{*}origin"))
geom_elem = visual_elem.find("{*}geometry")
if geom_elem is None:
return None
geometry = self._parse_geometry_element(geom_elem, source_directory)
if not geometry:
return None
material = self._parse_material_element(visual_elem.find("{*}material"), materials)
return Visual(
geometry=geometry,
origin=origin,
material=material,
name=visual_elem.get("name"),
)
def _parse_collision_element(
self, collision_elem: ET.Element, source_directory: Path | None = None
) -> Collision | None:
"""Parse a single <collision> element.
Args:
collision_elem: The collision XML element.
source_directory: Path for mesh resolution.
Returns:
A Collision model or None if geometry is missing.
"""
origin = self._parse_origin_element(collision_elem.find("{*}origin"))
geom_elem = collision_elem.find("{*}geometry")
if geom_elem is None:
return None
geometry = self._parse_geometry_element(geom_elem, source_directory)
if not geometry:
return None
return Collision(
geometry=geometry,
origin=origin,
name=collision_elem.get("name"),
)
def _parse_joint(self, joint_elem: ET.Element) -> Joint:
"""Parse a <joint> element into a Joint model.
Args:
joint_elem: The XML element for the joint.
Returns:
A populated Joint model.
"""
name = joint_elem.get("name", UNNAMED_JOINT)
joint_type_str = joint_elem.get("type", JOINT_FIXED)
type_map = {
JOINT_REVOLUTE: JointType.REVOLUTE,
JOINT_CONTINUOUS: JointType.CONTINUOUS,
JOINT_PRISMATIC: JointType.PRISMATIC,
JOINT_FIXED: JointType.FIXED,
JOINT_FLOATING: JointType.FLOATING,
JOINT_PLANAR: JointType.PLANAR,
}
joint_type = type_map.get(joint_type_str.lower(), JointType.FIXED)
parent_elem = joint_elem.find("{*}parent")
child_elem = joint_elem.find("{*}child")
parent = parent_elem.get("link", "") if parent_elem is not None else ""
child = child_elem.get("link", "") if child_elem is not None else ""
origin = self._parse_origin_element(joint_elem.find("{*}origin"))
# Parse Axis
axis = self._parse_joint_axis(joint_elem, joint_type)
# Parse Sub-components
limits = self._parse_joint_limits(joint_elem, joint_type, name)
dynamics = self._parse_joint_dynamics(joint_elem.find("{*}dynamics"))
mimic = self._parse_joint_mimic(joint_elem.find("{*}mimic"))
safety = self._parse_joint_safety(joint_elem.find("{*}safety_controller"))
calibration = self._parse_joint_calibration(joint_elem.find("{*}calibration"))
return Joint(
name=name,
type=joint_type,
parent=parent,
child=child,
origin=origin,
axis=axis,
limits=limits,
dynamics=dynamics,
mimic=mimic,
safety_controller=safety,
calibration=calibration,
)
def _parse_joint_axis(self, joint_elem: ET.Element, joint_type: JointType) -> Vector3 | None:
"""Parse the <axis> element of a joint."""
if joint_type in (
JointType.FIXED,
JointType.FLOATING,
):
return None
# Exclude other types that don't traditionally have an axis but aren't fixed
if joint_type not in (
JointType.REVOLUTE,
JointType.CONTINUOUS,
JointType.PRISMATIC,
JointType.PLANAR,
):
return None
axis_elem = joint_elem.find("{*}axis")
if axis_elem is not None:
axis = parse_vector3(axis_elem.get("xyz", DEFAULT_URDF_AXIS_XYZ_STR))
nx, ny, nz = normalize_vector(axis.x, axis.y, axis.z)
if nx == 0.0 and ny == 0.0 and nz == 0.0:
return Vector3(*DEFAULT_URDF_AXIS_XYZ)
return Vector3(nx, ny, nz)
return Vector3(*DEFAULT_URDF_AXIS_XYZ)
def _parse_joint_limits(
self, joint_elem: ET.Element, joint_type: JointType, joint_name: str
) -> JointLimits | None:
"""Parse the <limit> element for a joint.
Args:
joint_elem: The joint XML element.
joint_type: The type of the joint.
joint_name: The name of the joint for logging.
Returns:
A JointLimits model or None.
"""
if joint_type not in (JointType.REVOLUTE, JointType.PRISMATIC, JointType.CONTINUOUS):
return None
limits_elem = joint_elem.find("{*}limit")
if limits_elem is not None:
lower_str = limits_elem.get("lower")
upper_str = limits_elem.get("upper")
effort = parse_float(limits_elem.get("effort"), check_name="effort", default=0.0)
velocity = parse_float(limits_elem.get("velocity"), check_name="velocity", default=0.0)
# Validation: effort and velocity must be non-negative
if effort < 0:
logger.warning(f"Joint '{joint_name}' effort limit is negative ({effort})")
effort = 0.0
if velocity < 0:
logger.warning(f"Joint '{joint_name}' velocity limit is negative ({velocity})")
velocity = 0.0
return JointLimits(
lower=float(lower_str) if lower_str is not None else None,
upper=float(upper_str) if upper_str is not None else None,
effort=effort,
velocity=velocity,
)
if joint_type in (JointType.REVOLUTE, JointType.PRISMATIC):
return JointLimits(lower=0.0, upper=0.0, effort=0.0, velocity=0.0)
return None
def _parse_joint_dynamics(self, elem: ET.Element | None) -> JointDynamics | None:
"""Parse the <dynamics> element into a JointDynamics model.
Args:
elem: The dynamics XML element.
Returns:
A populated JointDynamics instance or None.
"""
if elem is None:
return None
return JointDynamics(
damping=parse_float(elem.get("damping"), check_name="damping", default=0.0),
friction=parse_float(elem.get("friction"), check_name="friction", default=0.0),
)
def _parse_joint_mimic(self, elem: ET.Element | None) -> JointMimic | None:
"""Parse the <mimic> element into a JointMimic model.
Args:
elem: The mimic XML element.
Returns:
A populated JointMimic instance or None.
"""
if elem is None:
return None
return JointMimic(
joint=elem.get("joint", ""),
multiplier=parse_float(elem.get("multiplier"), check_name="multiplier", default=1.0),
offset=parse_float(elem.get("offset"), check_name="offset", default=0.0),
)
def _parse_joint_safety(self, elem: ET.Element | None) -> JointSafetyController | None:
"""Parse the <safety_controller> element into a JointSafetyController model.
Args:
elem: The safety_controller XML element.
Returns:
A populated JointSafetyController instance or None.
"""
if elem is None:
return None
return JointSafetyController(
soft_lower_limit=parse_float(
elem.get("soft_lower_limit"), check_name="soft_lower_limit"
),
soft_upper_limit=parse_float(
elem.get("soft_upper_limit"), check_name="soft_upper_limit"
),
k_position=parse_float(elem.get("k_position"), check_name="k_position"),
k_velocity=parse_float(elem.get("k_velocity"), check_name="k_velocity", default=0.0),
)
def _parse_joint_calibration(self, elem: ET.Element | None) -> JointCalibration | None:
"""Parse the <calibration> element into a JointCalibration model.
Args:
elem: The calibration XML element.
Returns:
A populated JointCalibration instance or None.
"""
if elem is None:
return None
rising_str = elem.get("rising")
falling_str = elem.get("falling")
return JointCalibration(
rising=parse_float(rising_str, check_name="rising") if rising_str is not None else None,
falling=parse_float(falling_str, check_name="falling")
if falling_str is not None
else None,
)
def _normalize_hardware_interface(self, interface: str) -> str:
"""Normalize hardware interface string (e.g., 'PositionJointInterface' -> 'position').
Args:
interface: The raw hardware interface string from the URDF.
Returns:
The normalized ROS 2 compatible hardware interface string.
"""
clean = interface.lower().replace("hardware_interface/", "").replace("jointinterface", "")
return clean.replace("interface", "")
def _parse_ros2_control(self, rc_elem: ET.Element) -> Ros2Control | None:
"""Parse a <ros2_control> element.
Args:
rc_elem: The XML element for ros2_control.
Returns:
A Ros2Control model or None if parsing fails.
"""
name = rc_elem.get("name", "")
rc_type = rc_elem.get("type", CONTROL_TYPE_SYSTEM)
hw_elem = rc_elem.find("{*}hardware")
plugin_elem = hw_elem.find("{*}plugin") if hw_elem is not None else None
hardware_plugin = (
plugin_elem.text.strip() if plugin_elem is not None and plugin_elem.text else ""
)
parameters: dict[str, str] = {}
if hw_elem is not None:
for param_elem in hw_elem.findall("{*}param"):
p_name = param_elem.get("name")
if p_name and param_elem.text:
parameters[p_name] = param_elem.text.strip()
joints: list[Ros2ControlJoint] = []
for joint_elem in rc_elem.findall("{*}joint"):
joint_name = joint_elem.get("name", "")
command_interfaces = [
self._normalize_hardware_interface(cmd.get("name", HW_IF_POSITION))
for cmd in joint_elem.findall("{*}command_interface")
]
state_interfaces = [
self._normalize_hardware_interface(state.get("name", HW_IF_POSITION))
for state in joint_elem.findall("{*}state_interface")
]
joint_params: dict[str, str] = {
str(param.get("name")): param.text.strip()
for param in joint_elem.findall("{*}param")
if param.get("name") and param.text
}
if command_interfaces or state_interfaces:
joints.append(
Ros2ControlJoint(
name=joint_name,
command_interfaces=command_interfaces,
state_interfaces=state_interfaces,
parameters=joint_params,
)
)
sensors: list[Ros2ControlSensor] = []
for sensor_elem in rc_elem.findall("{*}sensor"):
sensor_name = sensor_elem.get("name", "")
state_interfaces = [
self._normalize_hardware_interface(state.get("name", HW_IF_POSITION))
for state in sensor_elem.findall("{*}state_interface")
]
sensor_params: dict[str, str] = {
str(param.get("name")): param.text.strip()
for param in sensor_elem.findall("{*}param")
if param.get("name") and param.text
}
if state_interfaces or sensor_params:
sensors.append(
Ros2ControlSensor(
name=sensor_name,
state_interfaces=state_interfaces,
parameters=sensor_params,
)
)
# Catch-all for top-level parameters not inside <hardware>
for child in rc_elem:
if strip_xml_namespace(child.tag) not in ("hardware", "joint", "sensor") and child.text:
parameters[strip_xml_namespace(child.tag)] = child.text.strip()
try:
return Ros2Control(
name=name,
type=rc_type,
hardware_plugin=hardware_plugin,
joints=joints,
sensors=sensors,
parameters=parameters,
)
except RobotModelError as e:
logger.warning(f"Invalid ros2_control '{name}' ignored during strict parsing: {e}")
return None
def _parse_transmission_component(
self, elem: ET.Element, tag: str
) -> TransmissionJoint | TransmissionActuator | None:
"""Parse a joint or actuator component within a transmission.
Args:
elem: The XML element (joint or actuator).
tag: The component type ('joint' or 'actuator').
Returns:
The parsed component model or None.
"""
name = elem.get("name", "")
if not name:
return None
# Handle both camelCase and snake_case for hardware interfaces
hw_interfaces = [
self._normalize_hardware_interface(hw.text.strip())
for hw in (elem.findall("{*}hardwareInterface") + elem.findall("{*}hardware_interface"))
if hw.text
]
reduction = parse_float(
elem.findtext("{*}mechanicalReduction") or elem.findtext("{*}mechanical_reduction"),
check_name="mechanicalReduction",
default=1.0,
)
if reduction == 0:
logger.warning(f"Transmission component '{name}' has zero reduction, defaulting to 1.0")
reduction = 1.0
offset = parse_float(elem.findtext("{*}offset"), check_name="offset", default=0.0)
if tag == "joint":
return TransmissionJoint(
name=name,
hardware_interfaces=hw_interfaces or [HW_IF_POSITION],
mechanical_reduction=reduction,
offset=offset,
)
return TransmissionActuator(
name=name,
hardware_interfaces=hw_interfaces or [HW_IF_POSITION],
mechanical_reduction=reduction,
offset=offset,
)
def _parse_transmission(self, trans_elem: ET.Element) -> Transmission | None:
"""Parse a <transmission> element.
Args:
trans_elem: The transmission XML element.
Returns:
A Transmission model or None if parsing fails.
"""
name = trans_elem.get("name", "unnamed_transmission")
trans_type = trans_elem.findtext("{*}type", "")
joints: list[TransmissionJoint] = []
for j_elem in trans_elem.findall("{*}joint"):
comp = self._parse_transmission_component(j_elem, "joint")
if isinstance(comp, TransmissionJoint):
joints.append(comp)
actuators: list[TransmissionActuator] = []
for a_elem in trans_elem.findall("{*}actuator"):
comp = self._parse_transmission_component(a_elem, "actuator")
if isinstance(comp, TransmissionActuator):
actuators.append(comp)
try:
return Transmission(name=name, type=trans_type, joints=joints, actuators=actuators)
except RobotModelError as e:
logger.warning(f"Invalid transmission '{name}' ignored: {e}")
return None
def _parse_sensor_noise(self, noise_elem: ET.Element | None) -> SensorNoise | None:
"""Parse a <noise> element.
Args:
noise_elem: The XML element containing noise parameters.
Returns:
A SensorNoise model or None.
"""
if noise_elem is None:
return None
actual_noise_elem = noise_elem.find("{*}noise") if noise_elem.tag != "noise" else noise_elem
if actual_noise_elem is None:
return None
return SensorNoise(
type=actual_noise_elem.findtext("{*}type", NOISE_GAUSSIAN),
mean=parse_float(
actual_noise_elem.findtext("{*}mean", "0.0"), check_name="mean", default=0.0
),
stddev=parse_float(
actual_noise_elem.findtext("{*}stddev", "0.0"), check_name="stddev", default=0.0
),
)
def _parse_sensor_from_gazebo(self, gazebo_elem: ET.Element) -> Sensor | None:
"""Parse a <sensor> from within a <gazebo> extension element.
Args:
gazebo_elem: The parent Gazebo extension element.
Returns:
A populated Sensor model or None if no sensor is defined.
"""
link_name = gazebo_elem.get("reference", "")
sensor_elem = gazebo_elem.find("{*}sensor")
if not link_name or sensor_elem is None:
return None
sensor_name = sensor_elem.get("name", "")
sensor_type_str = sensor_elem.get("type", "camera")
type_map = {
"camera": SensorType.CAMERA,
"depth": SensorType.DEPTH_CAMERA,
"depth_camera": SensorType.DEPTH_CAMERA,
"multicamera": SensorType.CAMERA,
"ray": SensorType.LIDAR,
"lidar": SensorType.LIDAR,
"gpu_ray": SensorType.GPU_LIDAR,
"gpu_lidar": SensorType.GPU_LIDAR,
"imu": SensorType.IMU,
"gps": SensorType.GPS,
"navsat": SensorType.GPS,
"contact": SensorType.CONTACT,
"force_torque": SensorType.FORCE_TORQUE,
}
sensor_type = type_map.get(sensor_type_str.lower(), SensorType.CAMERA)
update_rate = parse_float(
sensor_elem.findtext("{*}update_rate", str(DEFAULT_UPDATE_RATE)),
check_name="updateRate",
default=DEFAULT_UPDATE_RATE,
)
origin = Transform.identity()
pose_elem = sensor_elem.find("{*}pose")
if pose_elem is not None and pose_elem.text:
parts = pose_elem.text.strip().split()
if len(parts) >= 6:
try:
xyz = parse_vector3(" ".join(parts[0:3]))
rpy = parse_vector3(" ".join(parts[3:6]))
origin = Transform(xyz=xyz, rpy=rpy)
except RobotModelError as e:
logger.warning(f"Invalid sensor pose in Gazebo element: {e}")
# Initialize info objects based on sensor type
camera_info = None
lidar_info = None
imu_info = None
gps_info = None
contact_info = None
force_torque_info = None
if sensor_type in (SensorType.CAMERA, SensorType.DEPTH_CAMERA):
camera_elem = sensor_elem.find("{*}camera")
if camera_elem is not None:
camera_info = CameraInfo(
horizontal_fov=parse_float(
camera_elem.findtext("{*}horizontal_fov"),
check_name="horizontal_fov",
default=DEFAULT_CAMERA_FOV,
),
width=parse_int(
camera_elem.findtext("{*}image/{*}width"), default=DEFAULT_CAMERA_WIDTH
),
height=parse_int(
camera_elem.findtext("{*}image/{*}height"), default=DEFAULT_CAMERA_HEIGHT
),
format=camera_elem.findtext("{*}image/{*}format", DEFAULT_CAMERA_FORMAT),
near_clip=parse_float(
camera_elem.findtext("{*}clip/{*}near"),
check_name="near",
default=DEFAULT_CAMERA_NEAR,
),
far_clip=parse_float(
camera_elem.findtext("{*}clip/{*}far"),
check_name="far",
default=DEFAULT_CAMERA_FAR,
),
noise=self._parse_sensor_noise(camera_elem),
)
else:
camera_info = CameraInfo()
elif sensor_type == SensorType.LIDAR:
ray_elem = sensor_elem.find("{*}ray")
if ray_elem is not None:
lidar_info = LidarInfo(
horizontal_samples=parse_int(
ray_elem.findtext("{*}scan/{*}horizontal/{*}samples"),
default=DEFAULT_LIDAR_SAMPLES,
),
horizontal_min_angle=parse_float(
ray_elem.findtext("{*}scan/{*}horizontal/{*}min_angle"),
check_name="min_angle",
default=DEFAULT_LIDAR_MIN_ANGLE,
),
horizontal_max_angle=parse_float(
ray_elem.findtext("{*}scan/{*}horizontal/{*}max_angle"),
check_name="max_angle",
default=DEFAULT_LIDAR_MAX_ANGLE,
),
range_min=parse_float(
ray_elem.findtext("{*}range/{*}min"),
check_name="range_min",
default=DEFAULT_LIDAR_RANGE_MIN,
),
range_max=parse_float(
ray_elem.findtext("{*}range/{*}max"),
check_name="range_max",
default=DEFAULT_LIDAR_RANGE_MAX,
),
noise=self._parse_sensor_noise(ray_elem),
)
else:
lidar_info = LidarInfo()
elif sensor_type == SensorType.GPS:
gps_elem = sensor_elem.find("{*}gps")
if gps_elem is not None:
gps_info = GPSInfo(
position_sensing_horizontal_noise=self._parse_sensor_noise(
gps_elem.find("{*}position_sensing/{*}horizontal")
),
position_sensing_vertical_noise=self._parse_sensor_noise(
gps_elem.find("{*}position_sensing/{*}vertical")
),
velocity_sensing_horizontal_noise=self._parse_sensor_noise(
gps_elem.find("{*}velocity_sensing/{*}horizontal")
),
velocity_sensing_vertical_noise=self._parse_sensor_noise(
gps_elem.find("{*}velocity_sensing/{*}vertical")
),
)
else:
gps_info = GPSInfo()
elif sensor_type == SensorType.IMU:
imu_elem = sensor_elem.find("{*}imu")
if imu_elem is not None:
imu_info = IMUInfo(
angular_velocity_noise=self._parse_sensor_noise(
imu_elem.find("{*}angular_velocity/{*}x")
),
linear_acceleration_noise=self._parse_sensor_noise(
imu_elem.find("{*}linear_acceleration/{*}x")
),
)
else:
imu_info = IMUInfo()
elif sensor_type == SensorType.CONTACT:
contact_elem = sensor_elem.find("{*}contact")
if contact_elem is not None:
collision = contact_elem.findtext("{*}collision")
if not collision:
raise RobotValidationError(
ValidationErrorCode.VALUE_EMPTY,
f"Sensor '{sensor_name}' is missing collision reference",
target="SensorCollision",
value=sensor_name,
)
contact_info = ContactInfo(
collision=collision, noise=self._parse_sensor_noise(contact_elem)
)
else:
raise RobotValidationError(
ValidationErrorCode.VALUE_EMPTY,
f"Sensor '{sensor_name}' expects contact info but none found",
target="SensorCollision",
value=sensor_name,
)
elif sensor_type == SensorType.FORCE_TORQUE:
ft_elem = sensor_elem.find("{*}force_torque")
force_torque_info = ForceTorqueInfo(
frame=ft_elem.findtext("{*}frame", "child") if ft_elem is not None else "child",
measure_direction=ft_elem.findtext("{*}measure_direction", "child_to_parent")
if ft_elem is not None
else "child_to_parent",
noise=self._parse_sensor_noise(ft_elem),
)
topic = sensor_elem.findtext("{*}topic") or f"/{sensor_name}"
return Sensor(
name=sensor_name,
type=sensor_type,
link_name=link_name,
origin=origin,
update_rate=update_rate,
camera_info=camera_info,
lidar_info=lidar_info,
imu_info=imu_info,
gps_info=gps_info,
contact_info=contact_info,
force_torque_info=force_torque_info,
plugin=self._parse_gazebo_plugin(sensor_elem.find("{*}plugin")),
topic=topic,
)
def _parse_gazebo_plugin(self, plugin_elem: ET.Element | None) -> GazeboPlugin | None:
"""Parse a Gazebo plugin element.
Args:
plugin_elem: The plugin XML element.
Returns:
A populated GazeboPlugin model or None.
"""
if plugin_elem is None:
return None
name = plugin_elem.get("name", "")
filename = plugin_elem.get("filename", "")
parameters = {
child.tag: child.text.strip() for child in plugin_elem if child.text and not len(child)
}
raw_xml = (
"".join(ET.tostring(child, encoding="unicode") for child in plugin_elem)
if len(plugin_elem)
else None
)
return GazeboPlugin(name=name, filename=filename, parameters=parameters, raw_xml=raw_xml)
def _parse_gazebo_element(self, gazebo_elem: ET.Element) -> GazeboElement:
"""Parse a <gazebo> extension element.
Args:
gazebo_elem: The XML element for the Gazebo extension.
Returns:
A populated GazeboElement model.
"""
reference = gazebo_elem.get("reference")
plugins = [
self._parse_gazebo_plugin(p) for p in gazebo_elem.findall("{*}plugin") if p is not None
]
# Filter out None values
valid_plugins = [p for p in plugins if p is not None]
properties = {}
excluded_tags = [
"plugin",
"sensor",
"material",
"selfCollide",
"static",
"gravity",
"provideFeedback",
"implicitSpringDamper",
"mu1",
"mu2",
"kp",
"kd",
"stopCfm",
"stopErp",
]
for child in gazebo_elem:
if child.tag not in excluded_tags and child.text:
properties[child.tag] = child.text.strip()
return GazeboElement(
reference=reference,
properties=properties,
plugins=valid_plugins,
material=gazebo_elem.findtext("material") or gazebo_elem.findtext("{*}material"),
static=parse_optional_bool(gazebo_elem, "static"),
stop_cfm=parse_optional_float(gazebo_elem, "stopCfm"),
stop_erp=parse_optional_float(gazebo_elem, "stopErp"),
provide_feedback=parse_optional_bool(gazebo_elem, "provideFeedback"),
implicit_spring_damper=parse_optional_bool(gazebo_elem, "implicitSpringDamper"),
)
def _detect_xacro_file(self, root: ET.Element, filepath: Path | None = None) -> None:
"""Detect if file is XACRO and raise helpful error."""
is_xacro = False
if filepath:
is_xacro = filepath.suffix.lower() in [".xacro", ".urdf.xacro"]
if not is_xacro:
try:
content = filepath.read_text(encoding="utf-8")
is_xacro = "xmlns:xacro" in content
except (OSError, UnicodeDecodeError):
pass
if not is_xacro:
for child in root:
if get_xml_namespace(child.tag) in XACRO_URIS:
is_xacro = True
break
if not is_xacro:
for elem in root.iter():
if any("${" in v or "$(" in v for v in elem.attrib.values() if isinstance(v, str)):
is_xacro = True
break
if is_xacro:
filename = filepath.name if filepath else "URDF String"
raise XacroDetectedError(filename)
def _parse_from_context(
self, context: Any, root: ET.Element, filepath: Path | None = None
) -> Robot:
"""Process iterative XML parsing internally.
This method orchestrates the element-by-element construction of the
Robot model. It handles depth tracking for security and manages the
processing order of dependent elements (links must exist before joints).
Args:
context: The iterparse context.
root: The root <robot> element.
filepath: Path to the source file for resolution.
Returns:
A fully populated Robot model.
Raises:
RobotParserUnexpectedError: If XML nesting exceeds MAX_XML_DEPTH.
XacroDetectedError: If the parser detects unexpanded XACRO macros.
"""
self._detect_xacro_file(root, filepath)
default_name = filepath.stem if filepath else "unnamed_robot"
kwargs: dict[str, Any] = {"name": root.get("name", default_name)}
if self.resource_resolver is not None:
kwargs["resource_resolver"] = self.resource_resolver
robot = Robot(**kwargs)
materials: dict[str, Material] = {}
depth = 0
# Elements that depend on links (joints, transmissions, etc.) are processed
# in a second pass to ensure the robot's kinematic graph is complete.
delayed_joints: list[Joint] = []
delayed_transmissions: list[Transmission] = []
delayed_ros2_controls: list[Ros2Control] = []
delayed_sensors: list[Sensor] = []
delayed_gazebo_elements: list[tuple[GazeboElement, dict[str, Any]]] = []
# Determine base directory for resolving relative mesh paths.
source_directory = (
filepath.parent if filepath and filepath.is_file() else filepath
) or Path(".")
for event, elem in context:
if event == "start":
depth += 1
if depth > MAX_XML_DEPTH:
raise RobotParserUnexpectedError(
source_area="XML nesting", original_error=depth
)
else:
if depth == 1:
tag = strip_xml_namespace(elem.tag)
if tag == "material":
mat = self._parse_material_element(elem, materials)
if mat:
materials[mat.name] = mat
robot.materials[mat.name] = mat
elif tag == "link":
try:
link = self._parse_link(elem, materials, source_directory)
robot.add_link(link)
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid link '{elem.get('name')}': {e}")
elif tag == "joint":
try:
delayed_joints.append(self._parse_joint(elem))
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid joint '{elem.get('name')}': {e}")
elif tag == "transmission":
try:
trans = self._parse_transmission(elem)
if trans:
delayed_transmissions.append(trans)
except (RobotModelError, ValueError) as e:
logger.warning(
f"Skipping invalid transmission '{elem.get('name')}': {e}"
)
elif tag == "ros2_control":
try:
ros2_ctrl = self._parse_ros2_control(elem)
if ros2_ctrl:
delayed_ros2_controls.append(ros2_ctrl)
except (RobotModelError, ValueError) as e:
logger.warning(
f"Skipping invalid ros2_control '{elem.get('name')}': {e}"
)
elif tag == "gazebo":
try:
# 1. Try to extract a sensor if present
sensor = self._parse_sensor_from_gazebo(elem)
if sensor:
delayed_sensors.append(sensor)
# 2. Extract physics fields (regardless of sensor presence)
physics_data = {
"mu": parse_optional_float(elem, "mu1", default=None),
"mu2": parse_optional_float(elem, "mu2", default=None),
"kp": parse_optional_float(elem, "kp", default=None),
"kd": parse_optional_float(elem, "kd", default=None),
"self_collide": parse_optional_bool(elem, "selfCollide"),
"gravity": parse_optional_bool(elem, "gravity"),
}
# Filter out None values
physics_data = {k: v for k, v in physics_data.items() if v is not None}
# 3. Extract other Gazebo metadata (plugins, material, properties)
gazebo_elem = self._parse_gazebo_element(elem)
delayed_gazebo_elements.append((gazebo_elem, physics_data))
except (RobotModelError, ValueError) as e:
logger.warning(
f"Skipping invalid gazebo element '{elem.get('name') or elem.get('reference')}': {e}"
)
elem.clear()
depth -= 1
# Second Pass: Process dependent elements
for joint in delayed_joints:
try:
robot.add_joint(joint)
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid joint '{joint.name}': {e}")
for trans in delayed_transmissions:
try:
robot.add_transmission(trans)
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid transmission '{trans.name}': {e}")
for ros2_ctrl in delayed_ros2_controls:
try:
robot.add_ros2_control(ros2_ctrl)
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid ros2_control '{ros2_ctrl.name}': {e}")
for sensor in delayed_sensors:
try:
robot.add_sensor(sensor)
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid sensor '{sensor.name}': {e}")
for gazebo_elem, physics_data in delayed_gazebo_elements:
try:
# 1. Apply physics data to link if reference is a link
if gazebo_elem.reference and robot.has_link(gazebo_elem.reference):
link = robot.link(gazebo_elem.reference)
# Create updated physics object (cast for replace compatibility)
new_physics = replace(link.physics, **cast(dict[str, Any], physics_data))
# Replace link with updated physics
robot.add_link(replace(link, physics=new_physics), overwrite=True)
# 2. Add gazebo element if it has unique data (plugins, material, etc.)
# We skip "empty" gazebo elements that only contained physics
if (
gazebo_elem.plugins
or gazebo_elem.material
or gazebo_elem.properties
or gazebo_elem.static is not None
or gazebo_elem.stop_cfm is not None
or gazebo_elem.stop_erp is not None
or gazebo_elem.provide_feedback is not None
or gazebo_elem.implicit_spring_damper is not None
):
robot.add_gazebo_element(gazebo_elem)
except (RobotModelError, ValueError) as e:
logger.warning(f"Skipping invalid gazebo element '{gazebo_elem.reference}': {e}")
return robot
[docs]
def parse(self, filepath: Path, **_kwargs: Any) -> Robot:
"""Parse URDF file into a Robot model using iterative parsing.
This implementation uses iterparse to maintain O(1) memory complexity
even for massive URDF files.
Args:
filepath: Path to the input file
**kwargs: Additional parsing options
Returns:
The generic Robot model (Intermediate Representation)
Raises:
RobotParserIOError: If file cannot be read or exceeds size limit
XacroDetectedError: If a XACRO file is passed instead of URDF
RobotParserXMLRootError: If root tag is not <robot>
RobotParserUnexpectedError: If XML is malformed or internal error occurs
"""
self._validate_file(filepath)
# Check if this is a XACRO file by extension (proactive check)
if filepath.suffix == ".xacro" or filepath.name.endswith(".urdf.xacro"):
raise XacroDetectedError(filepath.name)
try:
# We use iterparse to process elements as they are closed
context = ET.iterparse(str(filepath), events=("start", "end"))
event, root = next(context)
if strip_xml_namespace(root.tag) != "robot":
raise RobotParserXMLRootError(actual_tag=root.tag)
return self._parse_from_context(context, root, filepath)
except ET.ParseError as e:
raise RobotParserUnexpectedError(source_area="URDF XML", original_error=e) from e
except Exception as e:
if isinstance(
e,
(
RobotParserUnexpectedError,
RobotParserIOError,
RobotParserXMLRootError,
XacroDetectedError,
),
):
raise
raise RobotParserUnexpectedError(
source_area="Unexpected URDF parse", original_error=e
) from e
[docs]
def parse_string(
self,
content: str,
source_directory: Path | None = None,
**_kwargs: Any,
) -> Robot:
"""Parse URDF from string.
Args:
content: URDF XML content as string
source_directory: Base directory for relative mesh path resolution
**kwargs: Additional parsing options
Returns:
The generic Robot model (Intermediate Representation)
Raises:
RobotParserIOError: If string exceeds size limit
XacroDetectedError: If string contains XACRO markers
RobotParserXMLRootError: If root tag is not <robot>
RobotParserUnexpectedError: If XML parsing fails
"""
self._validate_content(content)
if "<xacro:" in content:
raise XacroDetectedError(message="URDF String contains XACRO")
try:
stream = io.BytesIO(content.encode("utf-8"))
context = ET.iterparse(stream, events=("start", "end"))
event, root = next(context)
if strip_xml_namespace(root.tag) != "robot":
raise RobotParserXMLRootError(actual_tag=root.tag)
return self._parse_from_context(context, root, source_directory)
except ET.ParseError as e:
raise RobotParserUnexpectedError(source_area="URDF XML", original_error=e) from e
except Exception as e:
if isinstance(
e,
(
RobotParserUnexpectedError,
RobotParserIOError,
RobotParserXMLRootError,
XacroDetectedError,
),
):
raise
raise RobotParserUnexpectedError(
source_area="Unexpected URDF parse", original_error=e
) from e