"""Native XACRO resolver for LinkForge.
This module provides a pure-Python implementation for resolving XACRO macros,
properties, and includes.
"""
from __future__ import annotations
import copy
import json
import math
import os
import re
import sys
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from .._utils.dict_utils import AttrDict
from .._utils.path_utils import resolve_package_path
from .._utils.xml_utils import (
get_xml_namespace,
serialize_xml,
strip_xml_namespace,
)
from ..constants import (
XACRO_PREFIX,
XACRO_URIS,
)
from ..exceptions import (
RobotParserError,
RobotXacroError,
RobotXacroExpressionError,
RobotXacroRecursionError,
)
from ..logging_config import get_logger
try:
import yaml as _yaml_module
yaml: Any = _yaml_module
except ImportError:
yaml = None
logger = get_logger(__name__)
DEFAULT_MAX_DEPTH = 2000 # Increased for extremely complex industrial robots
RECURSION_LIMIT_BOOST = 5000 # Safer limit that prevents C-stack segmentation faults
_DUNDER_PATTERN: re.Pattern[str] = re.compile(r"__\w+__")
# Safe math context for evaluations
MATH_CONTEXT: dict[str, Any] = {
name: getattr(math, name) for name in dir(math) if not name.startswith("__")
}
# Security: Allow safe primitives but no dangerous builtins
MATH_CONTEXT["__builtins__"] = {
"abs": abs,
"float": float,
"int": int,
"len": len,
"max": max,
"min": min,
"round": round,
"str": str,
"list": list,
"dict": dict,
"bool": bool,
}
# Standard XACRO booleans
MATH_CONTEXT["true"] = True
MATH_CONTEXT["false"] = False
# Internal XML tags used for structural processing
_TAG_CONTAINER = "container"
_TAG_SKIP = "skip"
_PREFIX_XACRO = XACRO_PREFIX
@dataclass
class XacroTemplate:
"""A pre-parsed structural template of a XACRO file.
This represents the 'Structural Phase' of XACRO resolution where all
includes are expanded and files are parsed into memory, but no
substitutions or macro calls have been evaluated yet.
"""
filepath: Path
root_tag: str
root_attrib: dict[str, str]
container: ET.Element # Container of pre-resolved elements
macros: dict[str, tuple[list[str], ET.Element]]
# Global cache for structural templates to speed up repeated assembly of identical robots.
TEMPLATE_CACHE: dict[Path, XacroTemplate] = {}
def clear_xacro_cache() -> None:
"""Clear the global XACRO structural template cache."""
TEMPLATE_CACHE.clear()
[docs]
class XacroResolver:
"""Lightweight XACRO resolver with macro and math support."""
[docs]
def __init__(
self,
search_paths: list[Path] | None = None,
max_depth: int = DEFAULT_MAX_DEPTH,
start_dir: Path | None = None,
) -> None:
"""Initialize the XACRO resolver.
Args:
search_paths: List of additional directories to search for includes.
max_depth: Maximum recursion depth for macro expansions and includes.
start_dir: Base directory for resolving package:// URIs and relative paths.
"""
self.search_paths = search_paths or []
self.start_dir = start_dir
self.properties: dict[str, Any] = {}
self.macros: dict[str, tuple[list[str], ET.Element]] = {}
self.args: dict[str, str] = {}
self.max_depth = max_depth or DEFAULT_MAX_DEPTH
self._current_depth = 0
self._ns_stack: list[str] = []
self._file_stack: list[Path] = []
self._block_stack: set[str] = set()
# Shared evaluation context (load functions and standard arg)
self.eval_context = MATH_CONTEXT.copy()
self.eval_context["load_yaml"] = self._handle_load_yaml
self.eval_context["load_json"] = self._handle_load_json
self.eval_context["arg"] = self._handle_arg_eval
# ROS-standard 'xacro' namespace
xacro_ns = SimpleNamespace()
xacro_ns.load_yaml = self._handle_load_yaml
xacro_ns.load_json = self._handle_load_json
xacro_ns.arg = self._handle_arg_eval
xacro_ns.warning = logger.warning
xacro_ns.error = logger.error
xacro_ns.fatal = logger.critical
xacro_ns.message = logger.info
self.eval_context["xacro"] = xacro_ns
def _resolve_children(self, parent_elem: ET.Element, target_elem: ET.Element) -> None:
"""Recursively resolve and append children to a parent element.
Args:
parent_elem: The element to receive the resolved children.
target_elem: The element whose children should be resolved.
"""
for child in target_elem:
resolved = self.resolve_element(child)
self._append_resolved(parent_elem, resolved)
def _append_resolved(self, parent: ET.Element, items: list[ET.Element] | ET.Element) -> None:
"""Append resolved items to a parent, flattening containers and skipping tags.
Args:
parent: The parent XML element.
items: A single element or list of elements to append.
"""
if isinstance(items, ET.Element):
items = [items]
for item in items:
if item.tag == _TAG_CONTAINER:
# Flatten container recursively
self._append_resolved(parent, list(item))
elif item.tag != _TAG_SKIP and not item.tag.startswith(_PREFIX_XACRO):
# Add valid element
parent.append(item)
[docs]
def resolve_file(self, filepath: Path) -> str:
"""Resolve a XACRO file and return the final XML string.
Args:
filepath: Path to the XACRO file to resolve.
Returns:
The fully resolved XML as a string.
Raises:
RobotXacroError: If resolution fails or internal error occurs
RobotXacroRecursionError: If circular dependencies are found
"""
# Boost recursion limit to handle complex modular robot assemblies
old_limit = sys.getrecursionlimit()
sys.setrecursionlimit(max(old_limit, RECURSION_LIMIT_BOOST))
try:
filepath = filepath.resolve()
# Set start_dir from file if not already set
if self.start_dir is None:
self.start_dir = filepath.parent
template = self._get_structural_template(filepath)
# Standard XML requires a root tag
out_root = ET.Element(template.root_tag, template.root_attrib)
# Copy template macros into our active resolver
self.macros.update(copy.deepcopy(template.macros))
# Evaluation Phase: Process the structural container
# We deepcopy the container to ensure this evaluation doesn't mutate the cache.
resolved_container = self.resolve_element(copy.deepcopy(template.container))
# Use helper to flatten container and filter items
self._append_resolved(out_root, list(resolved_container))
# finalize_xml handles cleanup of xacro artifacts
return self._finalize_xml(out_root)
except Exception as e:
if isinstance(e, RobotParserError):
raise
raise RobotXacroError(str(e), context=str(filepath)) from e
finally:
if sys.getrecursionlimit() != old_limit:
sys.setrecursionlimit(old_limit)
[docs]
def resolve_string(self, xml_string: str) -> str:
"""Resolve a XACRO string and return the final XML string.
Args:
xml_string: The XACRO XML content as a string.
Returns:
The fully resolved XML as a string.
Raises:
RobotXacroError: If XML is malformed or resolution fails
"""
# Boost recursion limit to handle complex modular robot assemblies
old_limit = sys.getrecursionlimit()
sys.setrecursionlimit(max(old_limit, RECURSION_LIMIT_BOOST))
try:
root = ET.fromstring(xml_string)
resolved_root = self.resolve_element(root)
# finalize_xml handles cleanup of xacro artifacts
return self._finalize_xml(resolved_root)
except Exception as e:
if isinstance(e, RobotParserError):
raise
raise RobotXacroError(str(e), context="string resolution") from e
finally:
if sys.getrecursionlimit() != old_limit:
sys.setrecursionlimit(old_limit)
def _get_structural_template(self, filepath: Path) -> XacroTemplate:
"""Retrieve or build a structural template for a file.
Args:
filepath: Path to the XACRO file.
Returns:
The cached or newly built structural template.
Raises:
RobotXacroRecursionError: If circular includes are detected
RobotXacroError: If XML parsing fails or files cannot be located
"""
filepath = filepath.resolve()
if filepath in TEMPLATE_CACHE:
return TEMPLATE_CACHE[filepath]
logger.debug(f"XACRO: Building structural template for {filepath.name}")
# Cycle detection for includes during template build
if filepath in self._file_stack:
raise RobotXacroRecursionError(str(filepath.name))
self._file_stack.append(filepath)
try:
tree = ET.parse(filepath)
root = tree.getroot()
# Store search path for relative includes within this file
old_paths = self.search_paths[:]
if filepath.parent not in self.search_paths:
self.search_paths.insert(0, filepath.parent)
structural_macros: dict[str, tuple[list[str], ET.Element]] = {}
container = ET.Element(_TAG_CONTAINER)
for child in root:
# Basic tagging of XACRO elements
ns = get_xml_namespace(child.tag)
tag = child.tag
if ns in XACRO_URIS:
tag = f"{_PREFIX_XACRO}{strip_xml_namespace(tag)}"
if tag == f"{_PREFIX_XACRO}include":
# Handle structural inclusion
# filename may depend on $(arg) or ${}, so we substitute using current context
filename = str(self._substitute(child.get("filename") or ""))
ns = child.get("ns")
inc_path = self._find_file(filename)
if inc_path:
inc_template = self._get_structural_template(inc_path)
structural_macros.update(inc_template.macros)
# If namespaced, we wrap the included elements but keep them structural
if ns:
ns_container = ET.Element(_TAG_CONTAINER, ns=ns)
for sc in inc_template.container:
ns_container.append(sc)
container.append(ns_container)
else:
for sc in inc_template.container:
container.append(sc)
else:
logger.warning(f"XACRO: Could not find included file: '{filename}'")
elif tag == f"{_PREFIX_XACRO}macro":
# Collect macro definition but do not expand
name = child.get("name")
params_str = child.get("params") or ""
# Quick split (the resolver will do the full smart split during evaluation)
params = [p.strip() for p in params_str.split(",") if p.strip()]
if name:
structural_macros[name] = (params, child)
container.append(child)
else:
# Keep all other elements (properties, conditionals, XML tags) as is
container.append(child)
self.search_paths = old_paths
# Clean root attributes (remove xacro namespace)
clean_attrib = {
k: v for k, v in root.attrib.items() if get_xml_namespace(k) not in XACRO_URIS
}
template = XacroTemplate(
filepath=filepath,
root_tag=root.tag,
root_attrib=clean_attrib,
container=container,
macros=structural_macros,
)
TEMPLATE_CACHE[filepath] = template
return template
except ET.ParseError as e:
raise RobotXacroError(str(e), context=str(filepath)) from e
finally:
self._file_stack.pop()
[docs]
def resolve_element(self, element: ET.Element) -> ET.Element:
"""Process a single element recursively, tracking depth.
Args:
element: The XML element to resolve.
Returns:
The resolved XML element or a container.
Raises:
RobotXacroRecursionError: If maximum recursion depth is exceeded
"""
self._current_depth += 1
if self._current_depth > self.max_depth:
raise RobotXacroRecursionError(self.max_depth)
try:
return self._resolve_element_impl(element)
finally:
self._current_depth -= 1
def _resolve_element_impl(self, element: ET.Element) -> ET.Element:
"""Core recursive resolution logic for a single XML element.
Args:
element: The element to dispatch for processing.
Returns:
The resolved XML element.
"""
# Convert any recognized XACRO namespace URI to 'xacro:' prefix
ns = get_xml_namespace(element.tag)
tag = element.tag
if ns in XACRO_URIS:
tag = f"{_PREFIX_XACRO}{strip_xml_namespace(tag)}"
# Dispatch dictionary for known xacro tags
dispatch = {
f"{_PREFIX_XACRO}property": self._handle_property,
f"{_PREFIX_XACRO}arg": self._handle_arg,
f"{_PREFIX_XACRO}include": self._handle_include,
f"{_PREFIX_XACRO}macro": self._handle_macro_def,
f"{_PREFIX_XACRO}if": self._handle_conditional,
f"{_PREFIX_XACRO}unless": self._handle_conditional,
f"{_PREFIX_XACRO}insert_block": self._handle_insert_block,
}
if tag in dispatch:
return dispatch[tag](element)
if tag == _TAG_CONTAINER:
ns = element.get("ns")
if ns:
self._ns_stack.append(ns)
new_container = ET.Element(_TAG_CONTAINER)
self._resolve_children(new_container, element)
if ns:
self._ns_stack.pop()
return new_container
if tag.startswith(_PREFIX_XACRO):
return self._handle_macro_call(tag, element)
return self._handle_regular_element(element)
def _handle_property(self, element: ET.Element) -> ET.Element:
"""Handle property definitions: <xacro:property name="..." value="..."/>.
Args:
element: The XACRO property XML element.
Returns:
A 'skip' element as properties are consumed during resolution.
"""
name = element.get("name")
value = element.get("value")
if name:
# Apply namespace prefix if active
if self._ns_stack:
name = f"{'.'.join(self._ns_stack)}.{name}"
# If no 'value' attribute, check for children (block property)
if value is None and len(element) > 0:
self.properties[name] = list(element)
else:
self.properties[name] = self._try_parse_typed_value(
self._substitute(value or element.text or "")
)
return ET.Element(_TAG_SKIP)
def _handle_arg(self, element: ET.Element) -> ET.Element:
"""Handle argument definitions: <xacro:arg name="..." default="..."/>.
Args:
element: The XACRO arg XML element.
Returns:
A 'skip' element as args are consumed during resolution.
"""
name = element.get("name")
default = element.get("default")
if name and name not in self.args:
self.args[name] = self._try_parse_typed_value(self._substitute(default or ""))
return ET.Element(_TAG_SKIP)
def _handle_include(self, element: ET.Element) -> ET.Element:
"""Handle file includes: <xacro:include filename="..." ns="..."/>.
Args:
element: The XACRO include XML element.
Returns:
A container element containing the resolved included content.
"""
filename = str(self._substitute(element.get("filename") or ""))
ns = element.get("ns")
included_path = self._find_file(filename)
if not included_path:
logger.warning(
f"XACRO: Could not find included file: '{filename}'. Check your paths and $(find ...) usage."
)
return ET.Element(_TAG_SKIP)
template = self._get_structural_template(included_path)
# Merge template macros into current context
# Skip namespaced macros here; they will be handled by the container evaluation phase
if not ns:
self.macros.update(copy.deepcopy(template.macros))
if ns:
self._ns_stack.append(ns)
# Evaluate the template container in current context
# We use resolve_element on a deepcopy to prevent per-instance pollution of the cache.
container = self.resolve_element(copy.deepcopy(template.container))
if ns:
self._ns_stack.pop()
return container
def _handle_macro_def(self, element: ET.Element) -> ET.Element:
"""Handle macro definitions: <xacro:macro name="..." params="...">.
Args:
element: The XACRO macro XML element.
Returns:
A 'skip' element as macro definitions are cached for later calls.
"""
name = element.get("name")
params_str = element.get("params") or ""
params = []
if params_str:
# Smart split that respects nesting in substitutions
current: list[str] = []
nesting = 0
for char in params_str:
if char in "({[":
nesting += 1
elif char in ")}]":
nesting -= 1
if char.isspace() and nesting == 0:
if current:
params.append("".join(current))
current = []
else:
current.append(char)
if current:
params.append("".join(current))
if name:
# Apply namespace prefix if active
if self._ns_stack:
name = f"{'.'.join(self._ns_stack)}.{name}"
self.macros[name] = (params, element)
return ET.Element(_TAG_SKIP)
def _handle_conditional(self, element: ET.Element) -> ET.Element:
"""Handle conditionals: <xacro:if value="..."/> or <xacro:unless value="..."/>.
Args:
element: The XACRO conditional XML element.
Returns:
A container with children if the condition matches, otherwise a 'skip' element.
"""
tag = element.tag
# Check against normal and URI namespaces for the conditional tag
is_unless = f"{_PREFIX_XACRO}unless" in tag or "unless" in tag
condition = element.get("value") or "0"
is_true = self._eval_condition(condition)
if is_unless:
is_true = not is_true
if is_true:
container = ET.Element(_TAG_CONTAINER)
self._resolve_children(container, element)
return container
return ET.Element(_TAG_SKIP)
def _handle_insert_block(self, element: ET.Element) -> ET.Element:
"""Handle block insertion: <xacro:insert_block name="..."/>.
Args:
element: The XACRO insert_block XML element.
Returns:
The resolved XML block if found, otherwise a 'skip' element.
"""
name = str(self._substitute(element.get("name") or ""))
if name and name in self.properties:
# Cycle detection for blocks
if name in self._block_stack:
raise RobotXacroRecursionError(name)
self._block_stack.add(name)
try:
block = self.properties[name]
if isinstance(block, (ET.Element, list)):
# It's an XML block
# CRITICAL: We must deepcopy the block before insertion!
container = ET.Element(_TAG_CONTAINER)
def _process_block_item(item: ET.Element) -> None:
res = self.resolve_element(copy.deepcopy(item))
self._append_resolved(container, res)
if isinstance(block, ET.Element):
# Single element
_process_block_item(block)
else:
# List of elements
for b_elem in block:
_process_block_item(b_elem)
return container
finally:
self._block_stack.remove(name)
return ET.Element(_TAG_SKIP)
def _handle_macro_call(self, tag: str, element: ET.Element) -> ET.Element:
"""Handle macro calls: <xacro:my_macro_name ...>.
Args:
tag: The original XML tag name.
element: The XACRO macro call XML element.
Returns:
A container containing the fully expanded macro body.
Raises:
RobotParserError: If parent scope inheritance fails.
"""
macro_name = tag[len(_PREFIX_XACRO) :]
lookup_name = macro_name
if self._ns_stack and lookup_name not in self.macros:
lookup_name = f"{'.'.join(self._ns_stack)}.{macro_name}"
if lookup_name in self.macros:
params, macro_elem = self.macros[lookup_name]
local_props: dict[str, Any] = {}
# Parse parameters
block_params = [p[1:] for p in params if p.startswith("*")]
regular_params = [p for p in params if not p.startswith("*")]
# Map children to block parameters
resolved_block_content: list[ET.Element] = []
for child in element:
res = self.resolve_element(copy.deepcopy(child))
if isinstance(res, ET.Element) and res.tag == _TAG_CONTAINER:
resolved_block_content.extend(list(res))
elif res.tag != _TAG_SKIP:
resolved_block_content.append(res)
for bp in block_params:
local_props[bp] = resolved_block_content
# Map attributes to regular parameters
for p in regular_params:
bits = p.split(":=")
p_name = bits[0]
default_str = bits[1] if len(bits) > 1 else None
# Handle ^ parent-scope inheritance
if default_str is not None and default_str.startswith("^"):
fallback_str = default_str[2:] if default_str.startswith("^|") else None
if p_name in self.properties:
default = self.properties[p_name]
elif fallback_str is not None:
# Fallback value should be substituted too
default = self._try_parse_typed_value(self._substitute(fallback_str))
else:
raise RobotXacroExpressionError(
p_name, "Outer-scope property not found for '^' inheritance"
)
elif default_str is not None:
default = self._try_parse_typed_value(self._substitute(default_str))
else:
default = ""
raw_val = element.get(p_name)
val = self._substitute(raw_val) if raw_val is not None else default
local_props[p_name] = self._try_parse_typed_value(val)
# Expand macro body
parent_props = copy.deepcopy(self.properties)
self.properties.update(local_props)
# Push namespace from macro name to support local property resolution
ns_parts = macro_name.split(".")
ns_to_push = ns_parts[:-1]
self._ns_stack.extend(ns_to_push)
try:
container = ET.Element(_TAG_CONTAINER)
self._resolve_children(container, macro_elem)
finally:
# Pop pushed namespace parts
for _ in ns_to_push:
self._ns_stack.pop()
self.properties = parent_props
return container
# Unknown xacro tag - report warning and skip it
logger.warning(
f"XACRO: Unknown macro or tag: '{macro_name}'. Did you forget to include the corresponding file?"
)
return ET.Element(_TAG_SKIP)
def _handle_regular_element(self, element: ET.Element) -> ET.Element:
"""Handle substitutions in text and attributes for regular elements.
Args:
element: The XML element to process.
Returns:
A new XML element with all xacro expressions resolved.
"""
new_attrib = {}
for key, val in element.attrib.items():
# Attributes in XML must be strings
new_attrib[key] = str(self._substitute(val))
new_element = ET.Element(element.tag, attrib=new_attrib)
new_element.text = str(self._substitute(element.text or ""))
new_element.tail = str(self._substitute(element.tail or ""))
# Recursively process children
self._resolve_children(new_element, element)
return new_element
def _eval_condition(self, condition: str) -> bool:
"""Evaluate a XACRO condition string."""
res = self._substitute(condition)
if isinstance(res, bool):
return res
condition_str = str(res).lower().strip()
# Safe eval for boolean logic
if condition_str in ("true", "1"):
return True
elif condition_str in ("false", "0"):
return False
else:
try:
# Standard expression evaluation
if _DUNDER_PATTERN.search(condition_str):
raise RobotXacroExpressionError(condition_str, "Forbidden dunder attributes")
ctx = {**self.eval_context, **self.properties, **self.args}
return bool(eval(condition_str, ctx, {}))
except Exception as e:
if isinstance(e, RobotParserError):
raise
return condition_str not in ("", "0", "false")
def _try_parse_typed_value(self, value: Any) -> Any:
"""Attempt to parse a value into a more specific type (int, float, bool, list, dict)."""
if not isinstance(value, str):
return value
# yaml.safe_load("") returns None, so empty strings must be returned as-is
# before YAML parsing to avoid corrupting args with empty defaults.
if value == "":
return value
# Try YAML (most robust and standard compliant)
if yaml is not None:
try:
# safe_load handles ints, floats, bools (true/false), nulls, lists, dicts
parsed = yaml.safe_load(value)
# If it's a primitive type or collection, use it.
if isinstance(parsed, (int, float, bool, list, dict)) or parsed is None:
return AttrDict._wrap(parsed)
except Exception:
pass
# Fallback manual parsing (in case yaml fails or returns string for number)
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
def _substitute(self, text: str) -> Any:
"""Handle ${prop}, $(arg name), and $(find pkg) substitutions with math.
Args:
text: The input string containing xacro expressions.
Returns:
The resolved value, which may be a primitive type if the input was
a single ${} block.
Raises:
RobotXacroExpressionError: If an undefined argument or environment variable is required.
"""
if not text:
return ""
# 1. Handle escaping ($$ -> literal $)
sentinel = "\x00LFDOLLAR\x00"
text = text.replace("$$", sentinel)
# 2. Resolve $(arg ...)
text = self._substitute_args(text)
# 3. Resolve $(env ...) and $(optenv ...)
text = self._substitute_env(text)
# 4. Resolve $(find ...)
text = self._substitute_find(text)
# 5. Resolve $(eval ...) and ${...}
result = self._substitute_math(text, sentinel)
# 6. Restore escaped dollar signs if result is a string
if isinstance(result, str):
result = result.replace(sentinel, "$")
return result
def _substitute_args(self, text: str) -> str:
"""Resolve $(arg name) expressions."""
def _resolve_arg(m: re.Match[str]) -> str:
name = m.group(1).strip()
if name not in self.args:
raise RobotXacroExpressionError(name, "Undefined substitution argument")
return str(self.args[name])
return re.sub(r"\$\(arg (.*?)\)", _resolve_arg, text)
def _substitute_env(self, text: str) -> str:
"""Resolve $(env VAR) and $(optenv VAR default) expressions."""
def _resolve_env(m: re.Match[str]) -> str:
parts = m.group(1).split(None, 1)
var = parts[0]
value = os.environ.get(var)
if value is None:
raise RobotXacroExpressionError(var, "Required environment variable not set")
return value
def _resolve_optenv(m: re.Match[str]) -> str:
parts = m.group(1).split(None, 1)
var = parts[0]
default = parts[1] if len(parts) > 1 else ""
return os.environ.get(var, default)
text = re.sub(r"\$\(env (.*?)\)", _resolve_env, text)
return re.sub(r"\$\(optenv (.*?)\)", _resolve_optenv, text)
def _substitute_find(self, text: str) -> str:
"""Resolve $(find pkg) expressions into package:// URIs."""
# Strip file:// prefix from ROS package finds to prevent double-prefix.
text = re.sub(r"file://\$\(find (.*?)\)", lambda m: f"package://{m.group(1)}", text)
return re.sub(r"\$\(find (.*?)\)", lambda m: f"package://{m.group(1)}", text)
def _substitute_math(self, text: str, sentinel: str) -> Any:
"""Resolve ${expression} and $(eval expression) with math engine."""
# Handle evaluation: $(eval expression) - Standard ROS XACRO feature
# We treat it as an alias for ${...} since our evaluation engine is Python-based.
text = re.sub(r"\$\(eval (.*?)\)", r"${\1}", text)
# Handle properties and math: ${expression}
# If the entire string is a single ${...} block, return the object directly.
# This keeps dicts/lists as real objects for subsequent evaluations.
stripped = text.strip()
is_pure_expression = (
stripped.startswith("${")
and stripped.endswith("}")
and stripped.count("${") == 1
and sentinel not in stripped
)
if is_pure_expression:
expr = stripped[2:-1]
return self._evaluate(expr)
def replace_expr(match: re.Match[str]) -> str:
expr = match.group(1)
res = self._evaluate(expr)
return str(res)
# If we have mixed text and expressions, we must stringify everything.
if "${" in text:
text = re.sub(r"\${(.*?)}", replace_expr, text)
return text
def _evaluate(self, expr: str) -> Any:
"""Evaluate a single XACRO expression with hierarchical namespace support.
Args:
expr: The python-like expression to evaluate.
Returns:
The result of the evaluation.
Raises:
RobotXacroExpressionError: If the expression is invalid or contains forbidden dunder attributes.
"""
if _DUNDER_PATTERN.search(expr):
raise RobotXacroExpressionError(expr, "Forbidden dunder attributes")
try:
# Build nested context for hierarchical namespaces (e.g. arm.mass)
ctx = self.eval_context.copy()
ctx.update(self.args)
for name, val in self.properties.items():
if "." in name:
parts = name.split(".")
curr = ctx
for part in parts[:-1]:
if part not in curr or not isinstance(curr[part], SimpleNamespace):
curr[part] = SimpleNamespace()
curr = curr[part].__dict__
curr[parts[-1]] = val
else:
ctx[name] = val
# Support local lookups if inside a namespace
if self._ns_stack:
current_ns = ".".join(self._ns_stack)
prefix = f"{current_ns}."
for name, val in self.properties.items():
if name.startswith(prefix):
short_name = name[len(prefix) :]
# Inject if it's a direct child of the current namespace
if "." not in short_name:
ctx[short_name] = val
return eval(expr, ctx, {})
except Exception as e:
# CRITICAL: Do not silent-fail! If math fails (e.g. missing variable),
# we must tell the user immediately rather than producing a corrupt output.
raise RobotXacroExpressionError(expr, str(e)) from e
def _handle_arg_eval(self, name: str) -> Any:
"""Access XACRO arguments in evaluation context.
Args:
name: Name of the argument to retrieve.
Returns:
The argument value if found, otherwise an empty string.
"""
return self.args.get(name, "")
def _handle_load_yaml(self, filename: str) -> Any:
"""Helper to load YAML file in XACRO context.
Args:
filename: Path to the YAML file.
Returns:
The parsed YAML data (dict or list).
"""
if yaml is None:
raise RobotXacroError("XACRO: PyYAML is not installed. load_yaml() failed.") # noqa: TRY003
path = self._find_file(filename)
if not path:
raise RobotXacroError(f"XACRO: Could not find YAML file {filename}") # noqa: TRY003
try:
with open(path) as f:
data = yaml.safe_load(f)
return AttrDict._wrap(data)
except Exception as e:
raise RobotXacroError(f"Failed to load YAML {filename}: {e}") from e # noqa: TRY003
def _handle_load_json(self, filename: str) -> Any:
"""Helper to load JSON file in XACRO context.
Args:
filename: Path to the JSON file.
Returns:
The parsed JSON data (dict or list).
"""
path = self._find_file(filename)
if not path:
raise RobotXacroError(f"XACRO: Could not find JSON file {filename}") # noqa: TRY003
try:
with open(path) as f:
data = json.load(f)
return AttrDict._wrap(data)
except Exception as e:
raise RobotXacroError(f"Failed to load JSON {filename}: {e}") from e # noqa: TRY003
def _finalize_xml(self, root: ET.Element) -> str:
"""Strip XACRO artifacts and serialize to XML string.
Args:
root: The root element of the resolved XML.
Returns:
The fully serialized XML string, formatted for readability.
"""
def cleanup(elem: ET.Element) -> None:
# 1. Strip attributes: remove all xacro-specific and internal metadata
for attr in list(elem.attrib.keys()):
attr_ns = get_xml_namespace(attr)
attr_name = strip_xml_namespace(attr)
# Remove if it's a xacro attribute (by URI or prefix or if it's a known internal attr)
if attr_ns in XACRO_URIS or attr_name.startswith(_PREFIX_XACRO):
del elem.attrib[attr]
# 2. Strip namespace from the tag itself to produce clean output
if isinstance(elem.tag, str) and elem.tag.startswith("{"):
elem.tag = strip_xml_namespace(elem.tag)
# 3. Process children: filter out xacro tags and recurse
for child in list(elem):
tag = child.tag
if not isinstance(tag, str):
cleanup(child)
continue
ns = get_xml_namespace(tag)
clean_tag = strip_xml_namespace(tag)
# Identify XACRO elements for removal
is_xacro = (
clean_tag == _TAG_SKIP
or ns in XACRO_URIS
or clean_tag.startswith(_PREFIX_XACRO)
)
if is_xacro:
elem.remove(child)
else:
cleanup(child)
cleanup(root)
return serialize_xml(root)
def _find_file(self, filename: str) -> Path | None:
"""Find file in search paths, supporting both relative paths and package:// URIs.
Args:
filename: The filename or URI to search for.
Returns:
The absolute path to the file if found, otherwise None.
"""
# Handle package:// URIs
if filename.startswith("package:"):
return resolve_package_path(filename, self.start_dir or Path.cwd())
# Handle absolute paths
path = Path(filename)
if path.is_absolute() and path.exists():
return path
# Handle relative paths in search_paths
for search_path in self.search_paths:
candidate = search_path / filename
if candidate.exists():
return candidate
return None
[docs]
class XACROParser:
"""Independent XACRO resolution utility.
This class acts as a pre-processor for URDF/SRDF files that use
the XACRO templating system. It resolves templates into plain XML strings.
"""
[docs]
def resolve(self, filepath: Path, **kwargs: Any) -> str:
"""Resolve a XACRO file into a plain XML string.
Args:
filepath: Path to the XACRO file to resolve.
**kwargs: Custom arguments and properties for resolution.
- search_paths: List of paths to search for includes.
- start_dir: Base directory for relative includes.
- All other keys are passed as $(arg) values.
Returns:
The fully resolved XML as a string.
Raises:
RobotXacroError: If resolution fails.
RobotXacroRecursionError: If circular includes are detected.
"""
resolver = XacroResolver(
search_paths=kwargs.get("search_paths"),
start_dir=kwargs.get("start_dir", filepath.parent),
)
for k, v in kwargs.items():
if k not in ["search_paths", "start_dir"] and v is not None:
resolver.args[k] = resolver._try_parse_typed_value(str(v))
return resolver.resolve_file(filepath)