diff --git a/AGENTS.md b/AGENTS.md index bb2914b3..e3321431 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -44,13 +44,22 @@ The engine follows the SCXML run-to-completion (RTC) model with two processing l ### Error handling (`error_on_execution`) - `StateChart` has `error_on_execution=True` by default; `StateMachine` has `False`. -- Errors are caught at the **block level** (per onentry/onexit block), not per microstep. -- This means `after` callbacks still run even when an action raises — making `after_()` - a natural **finalize** hook (runs on both success and failure paths). +- Errors are caught at the **block level** (per onentry/onexit/transition `on` block), not per + microstep. This means `after` callbacks still run even when an action raises — making + `after_()` a natural **finalize** hook (runs on both success and failure paths). - `error.execution` is dispatched as an internal event; define transitions for it to handle errors within the statechart. - Error during `error.execution` handling → ignored to prevent infinite loops. +#### `on_error` asymmetry: transition `on` vs onentry/onexit + +Transition `on` content uses `on_error` **only for non-`error.execution` events**. During +`error.execution` processing, `on_error` is disabled for transition `on` content — errors +propagate to `microstep()` where `_send_error_execution` ignores them. This prevents infinite +loops in self-transition error handlers (e.g., `error_execution = s1.to(s1, on="handler")` +where `handler` raises). `onentry`/`onexit` blocks always use `on_error` regardless of the +current event. + ### Eventless transitions - Bare transition statements (not assigned to a variable) are **eventless** — they fire @@ -68,6 +77,21 @@ The engine follows the SCXML run-to-completion (RTC) model with two processing l - `on_error_execution()` works via naming convention but **only** when a transition for `error.execution` is declared — it is NOT a generic callback. +### Invoke (``) + +- `invoke.py` — `InvokeManager` on the engine manages the lifecycle: `mark_for_invoke()`, + `cancel_for_state()`, `spawn_pending_sync/async()`, `send_to_child()`. +- `_cleanup_terminated()` only removes invocations that are both terminated **and** cancelled. + A terminated-but-not-cancelled invocation means the handler's `run()` returned but the owning + state is still active — it must stay in `_active` so `send_to_child()` can still route events. +- **Child machine constructor blocks** in the processing loop. Use a listener pattern (e.g., + `_ChildRefSetter`) to capture the child reference during the first `on_enter_state`, before + the loop spins. +- `#_` send target: routed via `_send_to_invoke()` in `io/scxml/actions.py` → + `InvokeManager.send_to_child()` → handler's `on_event()`. +- **Tests with blocking threads**: use `threading.Event.wait(timeout=)` instead of + `time.sleep()` for interruptible waits — avoids thread leak errors in teardown. + ## Environment setup ```bash @@ -77,11 +101,11 @@ pre-commit install ## Running tests -Always use `uv` to run commands: +Always use `uv` to run commands. Also, use a timeout to avoid being stuck in the case of a leaked thread or infinite loop: ```bash # Run all tests (parallel) -uv run pytest -n auto +timeout 120 uv run pytest -n 4 # Run a specific test file uv run pytest tests/test_signature.py @@ -98,10 +122,24 @@ Don't specify the directory `tests/`, because this will exclude doctests from bo (`--doctest-glob=*.md`) (enabled by default): ```bash -uv run pytest -n auto +timeout 120 uv run pytest -n 4 +``` + +Testes normally run under 60s (~40s on average), so take a closer look if they take longer, it can be a regression. + +Coverage is enabled by default (`--cov` is in `pyproject.toml`'s `addopts`). To generate a +coverage report to a file, pass `--cov-report` **in addition to** `--cov`: + +```bash +# JSON report (machine-readable, includes missing_lines per file) +timeout 120 uv run pytest -n auto --cov=statemachine --cov-report=json:cov.json + +# Terminal report with missing lines +timeout 120 uv run pytest -n auto --cov=statemachine --cov-report=term-missing ``` -Coverage is enabled by default. +Note: `--cov=statemachine` is required to activate coverage collection; `--cov-report` +alone only changes the output format. ### Testing both sync and async engines diff --git a/docs/invoke.md b/docs/invoke.md index 62d01537..a3b162cf 100644 --- a/docs/invoke.md +++ b/docs/invoke.md @@ -456,24 +456,28 @@ is cancelled. Pass a `StateChart` subclass to spawn a child machine: -```python -from statemachine import State, StateChart +```py +>>> class ChildMachine(StateChart): +... start = State(initial=True) +... end = State(final=True) +... go = start.to(end) +... +... def on_enter_start(self, **kwargs): +... self.send("go") + +>>> class ParentMachine(StateChart): +... loading = State(initial=True, invoke=ChildMachine) +... ready = State(final=True) +... done_invoke_loading = loading.to(ready) -class ChildMachine(StateChart): - start = State(initial=True) - end = State(final=True) - go = start.to(end) +>>> sm = ParentMachine() +>>> time.sleep(0.2) - def on_enter_start(self, **kwargs): - self.send("go") +>>> "ready" in sm.configuration_values +True -class ParentMachine(StateChart): - loading = State(initial=True, invoke=ChildMachine) - ready = State(final=True) - done_invoke_loading = loading.to(ready) ``` The child machine is instantiated and run when the parent's `loading` state is entered. When the child terminates (reaches a final state), a `done.invoke` event is sent to the -parent, triggering the `done_invoke_loading` transition. See -`tests/test_invoke.py::TestInvokeStateChartChild` for a working example. +parent, triggering the `done_invoke_loading` transition. diff --git a/docs/processing_model.md b/docs/processing_model.md index f4698afe..9acd1f1a 100644 --- a/docs/processing_model.md +++ b/docs/processing_model.md @@ -111,8 +111,10 @@ and executes them atomically: If an error occurs during steps 1–4 and `error_on_execution` is enabled, the error is caught at the **block level** — meaning remaining actions in that block are skipped, but -the microstep continues and `after` callbacks still run (see -{ref}`cleanup / finalize pattern `). +the microstep continues and `after` callbacks still run. Each phase (exit, `on`, enter) +is an independent block, so an error in the transition `on` action does not prevent target +states from being entered. See {ref}`block-level error catching ` and the +{ref}`cleanup / finalize pattern `. ### Macrostep diff --git a/docs/releases/3.0.0.md b/docs/releases/3.0.0.md index 89973cd7..d315e402 100644 --- a/docs/releases/3.0.0.md +++ b/docs/releases/3.0.0.md @@ -83,6 +83,10 @@ machines can receive context at creation time: ``` +Invoke also supports child state machines (pass a `StateChart` subclass) and SCXML +`` with ``, autoforward, and `#_` / `#_parent` send targets +for parent-child communication. + See {ref}`invoke` for full documentation. ### Compound states @@ -336,6 +340,11 @@ True ``` +Errors are caught at the **block level**: each microstep phase (exit, transition `on`, +enter) is an independent block. An error in one block does not prevent subsequent blocks +from executing — in particular, `after` callbacks always run, making `after_()` a +natural finalize hook. See {ref}`block-level error catching `. + The error object is available as `error` in handler kwargs. See {ref}`error-execution` for full details. @@ -504,11 +513,8 @@ TODO. The following SCXML features are **not yet implemented** and are deferred to a future release: -- `` — invoking external services or sub-machines from within a state -- HTTP and other external communication targets -- `` — processing data returned from invoked services - -These features are tracked for v3.1+. +- HTTP and other external communication targets (only `#_internal`, `#_parent`, and + `#_` send targets are supported) ```{seealso} For a step-by-step migration guide with before/after examples, see diff --git a/docs/statecharts.md b/docs/statecharts.md index b95d5420..e254d345 100644 --- a/docs/statecharts.md +++ b/docs/statecharts.md @@ -213,12 +213,36 @@ If an error occurs while processing the `error.execution` event itself, the engi ignores the second error (logging a warning) to prevent infinite loops. The state machine remains in the configuration it was in before the failed error handler. +### Block-level error catching + +`StateChart` catches errors at the **block level**, not the microstep level. +Each phase of the microstep — `on_exit`, transition `on` content, `on_enter` — is an +independent block. An error in one block: + +- **Stops remaining actions in that block** (per SCXML spec, execution MUST NOT continue + within the same block after an error). +- **Does not affect other blocks** — subsequent phases of the microstep still execute. + In particular, `after` callbacks always run regardless of errors in earlier blocks. + +This means that even if a transition's `on` action raises an exception, the transition +completes: target states are entered and `after_()` callbacks still run. The error +is caught and queued as an `error.execution` internal event, which can be handled by a +separate transition. + +```{note} +During `error.execution` processing, errors in transition `on` content are **not** caught +at block level — they propagate to the microstep, where they are silently ignored. This +prevents infinite loops when an error handler's own action raises (e.g., a self-transition +`error_execution = s1.to(s1, on="handler")` where `handler` raises). Entry/exit blocks +always use block-level error catching regardless of the current event. +``` + ### Cleanup / finalize pattern A common need is to run cleanup code after a transition **regardless of success or failure** — for example, releasing a lock or closing a resource. -Because `StateChart` catches errors at the **block level** (not the microstep level), +Because `StateChart` catches errors at the **block level** (see above), `after_()` callbacks still run even when an action raises an exception. This makes `after_()` a natural **finalize** hook — no need to duplicate cleanup logic in an error handler. diff --git a/pyproject.toml b/pyproject.toml index 7e30186d..39b24f15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,8 @@ python_files = ["tests.py", "test_*.py", "*_tests.py"] xfail_strict = true log_cli = true log_cli_level = "DEBUG" +log_cli_format = "%(relativeCreated)6.0fms %(threadName)-18s %(name)-35s %(message)s" +log_cli_date_format = "%H:%M:%S" asyncio_default_fixture_loop_scope = "module" [tool.coverage.run] diff --git a/statemachine/engines/async_.py b/statemachine/engines/async_.py index 7164d0ba..836dca5b 100644 --- a/statemachine/engines/async_.py +++ b/statemachine/engines/async_.py @@ -13,6 +13,7 @@ from ..exceptions import TransitionNotAllowed from ..orderedset import OrderedSet from ..state import State +from .base import _ERROR_EXECUTION from .base import BaseEngine if TYPE_CHECKING: @@ -178,6 +179,7 @@ async def _exit_states( # type: ignore[override] args, kwargs = await self._get_args_kwargs(info.transition, trigger_data) if info.state is not None: # pragma: no branch + logger.debug("%s Exiting state: %s", self._log_id, info.state) await self.sm._callbacks.async_call( info.state.exit.key, *args, on_error=on_error, **kwargs ) @@ -198,10 +200,24 @@ async def _enter_states( # noqa: C901 self._prepare_entry_states(enabled_transitions, states_to_exit, previous_configuration) ) + # For transition 'on' content, use on_error only for non-error.execution + # events. During error.execution processing, errors in transition content + # must propagate to microstep() where _send_error_execution's guard + # prevents infinite loops (per SCXML spec: errors during error event + # processing are ignored). + on_error_transition = on_error + if ( + on_error is not None + and trigger_data.event + and str(trigger_data.event) == _ERROR_EXECUTION + ): + on_error_transition = None + result = await self._execute_transition_content( enabled_transitions, trigger_data, lambda t: t.on.key, + on_error=on_error_transition, previous_configuration=previous_configuration, new_configuration=new_configuration, ) @@ -218,7 +234,7 @@ async def _enter_states( # noqa: C901 target=target, ) - logger.debug("Entering state: %s", target) + logger.debug("%s Entering state: %s", self._log_id, target) self._add_state_to_configuration(target) on_entry_result = await self.sm._callbacks.async_call( @@ -257,6 +273,14 @@ async def _enter_states( # noqa: C901 return result async def microstep(self, transitions: "List[Transition]", trigger_data: TriggerData): + self._microstep_count += 1 + logger.debug( + "%s macro:%d micro:%d transitions: %s", + self._log_id, + self._macrostep_count, + self._microstep_count, + transitions, + ) previous_configuration = self.sm.configuration try: result = await self._execute_transition_content( @@ -342,18 +366,23 @@ async def processing_loop( # noqa: C901 return None _ctx_token = _in_processing_loop.set(True) - logger.debug("Processing loop started: %s", self.sm.current_state_value) + logger.debug("%s Processing loop started: %s", self._log_id, self.sm.current_state_value) first_result = self._sentinel try: took_events = True - while took_events: + while took_events and self.running: self.clear_cache() took_events = False macrostep_done = False # Phase 1: eventless transitions and internal events while not macrostep_done: - logger.debug("Macrostep: eventless/internal queue") + self._microstep_count = 0 + logger.debug( + "%s Macrostep %d: eventless/internal queue", + self._log_id, + self._macrostep_count, + ) self.clear_cache() internal_event = TriggerData(self.sm, event=None) # null object for eventless @@ -365,7 +394,9 @@ async def processing_loop( # noqa: C901 internal_event = self.internal_queue.pop() enabled_transitions = await self.select_transitions(internal_event) if enabled_transitions: - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug( + "%s Enabled transitions: %s", self._log_id, enabled_transitions + ) took_events = True await self._run_microstep(enabled_transitions, internal_event) @@ -380,7 +411,9 @@ async def processing_loop( # noqa: C901 await self._run_microstep(enabled_transitions, internal_event) # Phase 3: external events - logger.debug("Macrostep: external queue") + logger.debug( + "%s Macrostep %d: external queue", self._log_id, self._macrostep_count + ) while not self.external_queue.is_empty(): self.clear_cache() took_events = True @@ -393,7 +426,14 @@ async def processing_loop( # noqa: C901 # transitions can be processed while we wait. break - logger.debug("External event: %s", external_event.event) + self._macrostep_count += 1 + self._microstep_count = 0 + logger.debug( + "%s macrostep %d: event=%s", + self._log_id, + self._macrostep_count, + external_event.event, + ) # Handle lazy initial state activation. # Break out of phase 3 so the outer loop restarts from phase 1 @@ -406,10 +446,15 @@ async def processing_loop( # noqa: C901 ) break + # Finalize + autoforward for active invocations + self._invoke_manager.handle_external_event(external_event) + event_future = external_event.future try: enabled_transitions = await self.select_transitions(external_event) - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug( + "%s Enabled transitions: %s", self._log_id, enabled_transitions + ) if enabled_transitions: result = await self.microstep( list(enabled_transitions), external_event @@ -448,9 +493,12 @@ async def processing_loop( # noqa: C901 _in_processing_loop.reset(_ctx_token) self._processing.release() + logger.debug("%s Processing loop ended", self._log_id) result = first_result if first_result is not self._sentinel else None # If the caller has a future, await it (already resolved by now). if caller_future is not None: + # Resolve the future if it wasn't processed (e.g. machine terminated). + self._resolve_future(caller_future, result) return await caller_future return result diff --git a/statemachine/engines/base.py b/statemachine/engines/base.py index f1c341d2..0186926b 100644 --- a/statemachine/engines/base.py +++ b/statemachine/engines/base.py @@ -96,6 +96,9 @@ def __init__(self, sm: "StateChart"): self._processing = Lock() self._cache: Dict = {} # Cache for _get_args_kwargs results self._invoke_manager = InvokeManager(self) + self._macrostep_count: int = 0 + self._microstep_count: int = 0 + self._log_id = f"[{type(sm).__name__}]" def empty(self): # pragma: no cover return self.external_queue.is_empty() @@ -122,7 +125,8 @@ def put(self, trigger_data: TriggerData, internal: bool = False, _delayed: bool if not _delayed: logger.debug( - "New event '%s' put on the '%s' queue", + "%s New event '%s' put on the '%s' queue", + self._log_id, trigger_data.event, "internal" if internal else "external", ) @@ -175,7 +179,12 @@ def _send_error_execution(self, error: Exception, trigger_data: TriggerData): If already processing an error.execution event, ignore to avoid infinite loops. """ - logger.debug("Error %s captured while executing event=%s", error, trigger_data.event) + logger.debug( + "%s Error %s captured while executing event=%s", + self._log_id, + error, + trigger_data.event, + ) if trigger_data.event and str(trigger_data.event) == _ERROR_EXECUTION: logger.warning("Error while processing error.execution, ignoring: %s", error) return @@ -371,6 +380,14 @@ def microstep(self, transitions: List[Transition], trigger_data: TriggerData): """Process a single set of transitions in a 'lock step'. This includes exiting states, executing transition content, and entering states. """ + self._microstep_count += 1 + logger.debug( + "%s macro:%d micro:%d transitions: %s", + self._log_id, + self._macrostep_count, + self._microstep_count, + transitions, + ) previous_configuration = self.sm.configuration try: result = self._execute_transition_content( @@ -451,7 +468,7 @@ def _prepare_exit_states( states_to_exit, key=lambda x: x.state and x.state.document_order or 0, reverse=True ) result = OrderedSet([info.state for info in ordered_states if info.state]) - logger.debug("States to exit: %s", result) + logger.debug("%s States to exit: %s", self._log_id, result) # Update history for info in ordered_states: @@ -463,7 +480,8 @@ def _prepare_exit_states( history_value = [s for s in self.sm.configuration if s.parent == state] logger.debug( - "Saving '%s.%s' history state: '%s'", + "%s Saving '%s.%s' history state: '%s'", + self._log_id, state, history, [s.id for s in history_value], @@ -493,6 +511,7 @@ def _exit_states( # Execute `onexit` handlers — same per-block error isolation as onentry. if info.state is not None: # pragma: no branch + logger.debug("%s Exiting state: %s", self._log_id, info.state) self.sm._callbacks.call(info.state.exit.key, *args, on_error=on_error, **kwargs) self._remove_state_from_configuration(info.state) @@ -549,7 +568,7 @@ def _prepare_entry_states( new_configuration = cast( OrderedSet[State], (previous_configuration - states_to_exit) | states_targets_to_enter ) - logger.debug("States to enter: %s", states_targets_to_enter) + logger.debug("%s States to enter: %s", self._log_id, states_targets_to_enter) return ordered_states, states_for_default_entry, default_history_content, new_configuration @@ -558,9 +577,17 @@ def _add_state_to_configuration(self, target: State): if not self.sm.atomic_configuration_update: self.sm.configuration |= {target} + def __del__(self): + try: + self._invoke_manager.cancel_all() + except Exception: + pass + def _handle_final_state(self, target: State, on_entry_result: list): """Handle final state entry: queue done events. No direct callback dispatch.""" + logger.debug("%s Reached final state: %s", self._log_id, target) if target.parent is None: + self._invoke_manager.cancel_all() self.running = False else: parent = target.parent @@ -601,10 +628,24 @@ def _enter_states( # noqa: C901 self._prepare_entry_states(enabled_transitions, states_to_exit, previous_configuration) ) + # For transition 'on' content, use on_error only for non-error.execution + # events. During error.execution processing, errors in transition content + # must propagate to microstep() where _send_error_execution's guard + # prevents infinite loops (per SCXML spec: errors during error event + # processing are ignored). + on_error_transition = on_error + if ( + on_error is not None + and trigger_data.event + and str(trigger_data.event) == _ERROR_EXECUTION + ): + on_error_transition = None + result = self._execute_transition_content( enabled_transitions, trigger_data, lambda t: t.on.key, + on_error=on_error_transition, previous_configuration=previous_configuration, new_configuration=new_configuration, ) @@ -621,7 +662,7 @@ def _enter_states( # noqa: C901 target=target, ) - logger.debug("Entering state: %s", target) + logger.debug("%s Entering state: %s", self._log_id, target) self._add_state_to_configuration(target) # Execute `onentry` handlers — each handler is a separate block per @@ -722,7 +763,8 @@ def add_descendant_states_to_enter( # noqa: C901 default_history_content[parent_id] = [info] if state.id in self.sm.history_values: logger.debug( - "History state '%s.%s' %s restoring: '%s'", + "%s History state '%s.%s' %s restoring: '%s'", + self._log_id, state.parent, state, "deep" if state.deep else "shallow", @@ -751,7 +793,8 @@ def add_descendant_states_to_enter( # noqa: C901 else: # Handle default history content logger.debug( - "History state '%s.%s' default content: %s", + "%s History state '%s.%s' default content: %s", + self._log_id, state.parent, state, [t.target.id for t in state.transitions if t.target], diff --git a/statemachine/engines/sync.py b/statemachine/engines/sync.py index fa5b9c86..d2f97342 100644 --- a/statemachine/engines/sync.py +++ b/statemachine/engines/sync.py @@ -77,11 +77,11 @@ def processing_loop(self, caller_future=None): # noqa: C901 # We will collect the first result as the processing result to keep backwards compatibility # so we need to use a sentinel object instead of `None` because the first result may # be also `None`, and on this case the `first_result` may be overridden by another result. - logger.debug("Processing loop started: %s", self.sm.current_state_value) + logger.debug("%s Processing loop started: %s", self._log_id, self.sm.current_state_value) first_result = self._sentinel try: took_events = True - while took_events: + while took_events and self.running: self.clear_cache() took_events = False # Execute the triggers in the queue in FIFO order until the queue is empty @@ -91,7 +91,12 @@ def processing_loop(self, caller_future=None): # noqa: C901 # handles eventless transitions and internal events while not macrostep_done: - logger.debug("Macrostep: eventless/internal queue") + self._microstep_count = 0 + logger.debug( + "%s Macrostep %d: eventless/internal queue", + self._log_id, + self._macrostep_count, + ) self.clear_cache() internal_event = TriggerData( @@ -105,7 +110,9 @@ def processing_loop(self, caller_future=None): # noqa: C901 internal_event = self.internal_queue.pop() enabled_transitions = self.select_transitions(internal_event) if enabled_transitions: - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug( + "%s Enabled transitions: %s", self._log_id, enabled_transitions + ) took_events = True self._run_microstep(enabled_transitions, internal_event) @@ -122,7 +129,9 @@ def processing_loop(self, caller_future=None): # noqa: C901 self._run_microstep(enabled_transitions, internal_event) # Process external events - logger.debug("Macrostep: external queue") + logger.debug( + "%s Macrostep %d: external queue", self._log_id, self._macrostep_count + ) while not self.external_queue.is_empty(): self.clear_cache() took_events = True @@ -135,22 +144,20 @@ def processing_loop(self, caller_future=None): # noqa: C901 # transitions can be processed while we wait. break - logger.debug("External event: %s", external_event.event) - # # TODO: Handle cancel event - # if self.is_cancel_event(external_event): - # self.running = False - # return + self._macrostep_count += 1 + self._microstep_count = 0 + logger.debug( + "%s macrostep %d: event=%s", + self._log_id, + self._macrostep_count, + external_event.event, + ) - # TODO: Invoke states - # for state in self.configuration: - # for inv in state.invoke: - # if inv.invokeid == external_event.invokeid: - # self.apply_finalize(inv, external_event) - # if inv.autoforward: - # self.send(inv.id, external_event) + # Finalize + autoforward for active invocations + self._invoke_manager.handle_external_event(external_event) enabled_transitions = self.select_transitions(external_event) - logger.debug("Enabled transitions: %s", enabled_transitions) + logger.debug("%s Enabled transitions: %s", self._log_id, enabled_transitions) if enabled_transitions: try: result = self.microstep(list(enabled_transitions), external_event) @@ -169,6 +176,7 @@ def processing_loop(self, caller_future=None): # noqa: C901 finally: self._processing.release() + logger.debug("%s Processing loop ended", self._log_id) return first_result if first_result is not self._sentinel else None def enabled_events(self, *args, **kwargs): diff --git a/statemachine/invoke.py b/statemachine/invoke.py index 3ac34fb8..68eae851 100644 --- a/statemachine/invoke.py +++ b/statemachine/invoke.py @@ -47,6 +47,18 @@ class IInvoke(Protocol): def run(self, ctx: "InvokeContext") -> Any: ... # pragma: no branch +def _stop_child_machine(child: "StateChart | None") -> None: + """Stop a child state machine and cancel all its invocations.""" + if child is None: + return + logger.debug("invoke: stopping child machine %s", type(child).__name__) + try: + child._engine.running = False + child._engine._invoke_manager.cancel_all() + except Exception: + logger.debug("Error stopping child machine", exc_info=True) + + class _InvokeCallableWrapper: """Wraps an IInvoke class/instance or StateChart class for the callback system. @@ -185,8 +197,7 @@ def run(self, _ctx: "InvokeContext") -> Any: return None def on_cancel(self): - # Child machine cleanup — currently a no-op since sync machines - # run to completion in the constructor. + _stop_child_machine(self._child) self._child = None @@ -224,13 +235,17 @@ def run(self, ctx: "InvokeContext") -> "List[Any]": self._cancel_remaining() raise finally: + # Normal exit: all futures completed, safe to shutdown without waiting. self._executor.shutdown(wait=False) return results def on_cancel(self): + # Called from the engine thread — must not block. Cancel pending futures + # and signal shutdown; the invoke thread's run() will detect ctx.cancelled + # and exit, then _cancel()'s thread.join() waits for the actual cleanup. self._cancel_remaining() if self._executor is not None: - self._executor.shutdown(wait=False) + self._executor.shutdown(wait=False, cancel_futures=True) def _cancel_remaining(self): for future in self._futures: @@ -287,20 +302,44 @@ def mark_for_invoke(self, state: "State", event_kwargs: "dict | None" = None): def cancel_for_state(self, state: "State"): """Called by ``_exit_states()`` before exiting a state.""" + logger.debug("invoke cancel_for_state: %s", state.id) for inv_id, inv in list(self._active.items()): - if inv.state_id == state.id and not inv.terminated: + if inv.state_id == state.id and not inv.ctx.cancelled.is_set(): self._cancel(inv_id) - self._pending = [(s, kw) for s, kw in self._pending if s is not state] + self._pending = [(s, kw) for s, kw in self._pending if s.id != state.id] + # Don't cleanup here — terminated invocations must stay in _active + # so that handle_external_event can still run finalize blocks for + # done.invoke events that are already queued. def cancel_all(self): """Cancel all active invocations.""" + logger.debug("invoke cancel_all: %d active", len(self._active)) for inv_id in list(self._active.keys()): self._cancel(inv_id) + self._cleanup_terminated() + + def _cleanup_terminated(self): + """Remove invocations whose threads/tasks have actually finished. + + Only removes invocations that are both terminated AND cancelled. + A terminated-but-not-cancelled invocation means the handler's ``run()`` + has returned but the owning state is still active — the invocation must + stay in ``_active`` so that ``send_to_child()`` can still forward events + to it (e.g. ````). + """ + self._active = { + inv_id: inv + for inv_id, inv in self._active.items() + if not inv.terminated or not inv.ctx.cancelled.is_set() + } # --- Sync spawning --- def spawn_pending_sync(self): """Spawn invoke handlers for all states marked for invocation (sync engine).""" + # Opportunistically clean up finished invocations before spawning new ones. + self._cleanup_terminated() + pending = sorted(self._pending, key=lambda p: p[0].document_order) self._pending.clear() for state, event_kwargs in pending: @@ -314,14 +353,16 @@ def spawn_pending_sync(self): def _spawn_one_sync(self, callback: "CallbackWrapper", **kwargs): state: "State" = kwargs["state"] event_kwargs: dict = kwargs.get("event_kwargs", {}) - ctx = self._make_context(state, event_kwargs) - invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) # Use meta.func to find the original (unwrapped) handler; the callback # system wraps everything in a signature_adapter closure. handler = self._resolve_handler(callback.meta.func) + ctx = self._make_context(state, event_kwargs, handler=handler) + invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) + invocation._handler = handler self._active[ctx.invokeid] = invocation + logger.debug("invoke spawn sync: %s on state %s", ctx.invokeid, state.id) thread = threading.Thread( target=self._run_sync_handler, @@ -347,18 +388,29 @@ def _run_sync_handler( self.sm.send( f"done.invoke.{ctx.invokeid}", data=result, - internal=True, ) except Exception as e: if not ctx.cancelled.is_set(): - self.sm.send("error.execution", error=e, internal=True) + # Intentionally using the external queue (no internal=True): + # This handler runs in a background thread, outside the processing + # loop. Using the internal queue would either contaminate an + # unrelated macrostep in progress, or stall if no macrostep is + # active (the internal queue is only drained within a macrostep). + # This matches done.invoke, which also uses the external queue. + self.sm.send("error.execution", error=e) finally: invocation.terminated = True + logger.debug( + "invoke %s: completed (cancelled=%s)", ctx.invokeid, ctx.cancelled.is_set() + ) # --- Async spawning --- async def spawn_pending_async(self): """Spawn invoke handlers for all states marked for invocation (async engine).""" + # Opportunistically clean up finished invocations before spawning new ones. + self._cleanup_terminated() + pending = sorted(self._pending, key=lambda p: p[0].document_order) self._pending.clear() for state, event_kwargs in pending: @@ -372,12 +424,14 @@ async def spawn_pending_async(self): def _spawn_one_async(self, callback: "CallbackWrapper", **kwargs): state: "State" = kwargs["state"] event_kwargs: dict = kwargs.get("event_kwargs", {}) - ctx = self._make_context(state, event_kwargs) - invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) handler = self._resolve_handler(callback.meta.func) + ctx = self._make_context(state, event_kwargs, handler=handler) + invocation = Invocation(invokeid=ctx.invokeid, state_id=state.id, ctx=ctx) + invocation._handler = handler self._active[ctx.invokeid] = invocation + logger.debug("invoke spawn async: %s on state %s", ctx.invokeid, state.id) loop = asyncio.get_running_loop() task = loop.create_task(self._run_async_handler(callback, handler, ctx, invocation)) @@ -404,7 +458,6 @@ async def _run_async_handler( self.sm.send( f"done.invoke.{ctx.invokeid}", data=result, - internal=True, ) except asyncio.CancelledError: # Intentionally swallowed: the owning state was exited, so this @@ -412,30 +465,114 @@ async def _run_async_handler( return except Exception as e: if not ctx.cancelled.is_set(): - self.sm.send("error.execution", error=e, internal=True) + # External queue — see comment in _run_sync_handler. + self.sm.send("error.execution", error=e) finally: invocation.terminated = True + logger.debug( + "invoke %s: completed (cancelled=%s)", ctx.invokeid, ctx.cancelled.is_set() + ) # --- Cancel --- def _cancel(self, invokeid: str): invocation = self._active.get(invokeid) - if not invocation or invocation.terminated: + if not invocation or invocation.ctx.cancelled.is_set(): return + + logger.debug("invoke cancel: %s", invokeid) + # 1) Signal cancellation so the handler can check and stop early. invocation.ctx.cancelled.set() + + # 2) Notify the handler (may stop child SMs, cancel futures, etc.). handler = invocation._handler if handler is not None and hasattr(handler, "on_cancel"): try: handler.on_cancel() except Exception: logger.debug("Error in on_cancel for %s", invokeid, exc_info=True) + + # 3) Cancel the async task (raises CancelledError at next await). if invocation.task is not None and not invocation.task.done(): invocation.task.cancel() + # 4) Wait for the sync thread to actually finish (skip if we ARE + # that thread — e.g. done.invoke processed from within the handler). + if ( + invocation.thread is not None + and invocation.thread is not threading.current_thread() + and invocation.thread.is_alive() + ): + invocation.thread.join(timeout=2.0) + + def send_to_child(self, invokeid: str, event: str, **data) -> bool: + """Send an event to an invoked child session by its invokeid. + + Returns True if the event was forwarded, False if the invocation was + not found or doesn't support event forwarding. + """ + invocation = self._active.get(invokeid) + if invocation is None: + return False + handler = invocation._handler + if handler is not None and hasattr(handler, "on_event"): + handler.on_event(event, **data) + return True + return False + # --- Helpers --- - def _make_context(self, state: "State", event_kwargs: "dict | None" = None) -> InvokeContext: - invokeid = f"{state.id}.{uuid.uuid4().hex[:8]}" + def handle_external_event(self, trigger_data) -> None: + """Run finalize blocks and autoforward for active invocations. + + Called by the engine before processing each external event. + For each active invocation whose handler has ``on_finalize`` or + ``on_event`` (autoforward), delegate accordingly. + """ + event_name = str(trigger_data.event) if trigger_data.event else None + if event_name is None: + return + + # Tag done.invoke events with the invokeid + if event_name.startswith("done.invoke."): + invokeid = event_name[len("done.invoke.") :] + trigger_data.kwargs.setdefault("_invokeid", invokeid) + + for inv in list(self._active.values()): + handler = inv._handler + if handler is None: + continue + + # Check if event originates from this invocation + is_from_child = trigger_data.kwargs.get( + "_invokeid" + ) == inv.invokeid or event_name.startswith(f"done.invoke.{inv.invokeid}") + + # Finalize: run the finalize block if the event came from this invocation. + # Note: finalize must run even after the invocation terminates, because + # child events may still be queued when the handler thread completes. + if is_from_child and hasattr(handler, "on_finalize"): + handler.on_finalize(trigger_data) + + # Autoforward: forward parent events to child (not events from child itself). + # Only forward if the invocation is still running. + if ( + not inv.terminated + and not inv.ctx.cancelled.is_set() + and not is_from_child + and hasattr(handler, "autoforward") + and handler.autoforward + and hasattr(handler, "on_event") + ): + logger.debug("invoke autoforward: %s -> %s", event_name, inv.invokeid) + handler.on_event(event_name, **trigger_data.kwargs) + + def _make_context( + self, state: "State", event_kwargs: "dict | None" = None, handler: Any = None + ) -> InvokeContext: + # Use static invoke_id from handler if available (SCXML id= attribute) + static_id = getattr(handler, "invoke_id", None) if handler else None + invokeid = static_id or f"{state.id}.{uuid.uuid4().hex[:8]}" return InvokeContext( invokeid=invokeid, state_id=state.id, @@ -453,6 +590,11 @@ def _resolve_handler(underlying: Any) -> "Any | None": inner = underlying._invoke_handler if isinstance(inner, type) and issubclass(inner, StateChart): return StateChartInvoker(inner) + # Return the inner handler directly if it's an IInvoke instance + # (e.g., SCXMLInvoker) so duck-typed attributes like invoke_id are accessible. + # Exclude classes — @runtime_checkable matches classes that define run(). + if not isinstance(inner, type) and isinstance(inner, IInvoke): + return inner return underlying if isinstance(underlying, IInvoke): return underlying diff --git a/statemachine/io/scxml/actions.py b/statemachine/io/scxml/actions.py index 23f57868..5b7cb5ca 100644 --- a/statemachine/io/scxml/actions.py +++ b/statemachine/io/scxml/actions.py @@ -109,11 +109,20 @@ class EventDataWrapper: Otherwise it MUST leave it blank. """ - def __init__(self, event_data): + def __init__(self, event_data=None, *, trigger_data=None): self.event_data = event_data - self.sendid = event_data.trigger_data.send_id - if event_data.trigger_data.event is None or event_data.trigger_data.event.internal: - if "error.execution" == event_data.trigger_data.event: + if trigger_data is not None: + self.trigger_data = trigger_data + elif event_data is not None: + self.trigger_data = event_data.trigger_data + else: + raise ValueError("Either event_data or trigger_data must be provided") + + td = self.trigger_data + self.sendid = td.send_id + self.invokeid = td.kwargs.get("_invokeid", "") + if td.event is None or td.event.internal: + if "error.execution" == td.event: self.type = "platform" else: self.type = "internal" @@ -121,8 +130,15 @@ def __init__(self, event_data): else: self.type = "external" + @classmethod + def from_trigger_data(cls, trigger_data): + """Create an EventDataWrapper directly from a TriggerData (no EventData needed).""" + return cls(trigger_data=trigger_data) + def __getattr__(self, name): - return getattr(self.event_data, name) + if self.event_data is not None: + return getattr(self.event_data, name) + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __eq__(self, value): "This makes SCXML test 329 pass. It assumes that the event is the same instance" @@ -130,17 +146,20 @@ def __eq__(self, value): @property def name(self): - return self.event_data.event + if self.event_data is not None: + return self.event_data.event + return str(self.trigger_data.event) if self.trigger_data.event else None @property def data(self): "Property used by the SCXML namespace" - if self.trigger_data.kwargs: - return _Data(self.trigger_data.kwargs) - elif self.trigger_data.args and len(self.trigger_data.args) == 1: - return self.trigger_data.args[0] - elif self.trigger_data.args: - return self.trigger_data.args + td = self.trigger_data + if td.kwargs: + return _Data(td.kwargs) + elif td.args and len(td.args) == 1: + return td.args[0] + elif td.args: + return td.args else: return None @@ -257,7 +276,10 @@ def __init__(self, action: AssignAction): def __call__(self, *args, **kwargs): machine: StateChart = kwargs["machine"] - value = _eval(self.action.expr, **kwargs) + if self.action.child_xml is not None: + value = self.action.child_xml + else: + value = _eval(self.action.expr, **kwargs) *path, attr = self.action.location.split(".") obj = machine.model @@ -364,6 +386,49 @@ def raise_action(*args, **kwargs): return raise_action +def _send_to_parent(action: SendAction, **kwargs): + """Route a to the parent machine via _invoke_session.""" + machine = kwargs["machine"] + session = getattr(machine, "_invoke_session", None) + if session is None: + logger.warning( + " ignored: machine %r has no _invoke_session", + machine.name, + ) + return + event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] + names = [] + for name in (action.namelist or "").strip().split(): + if not hasattr(machine.model, name): + raise NameError(f"Namelist variable '{name}' not found on model") + names.append(Param(name=name, expr=name)) + params_values = {} + for param in chain(names, action.params): + if param.expr is None: + continue + params_values[param.name] = _eval(param.expr, **kwargs) + session.send_to_parent(event, **params_values) + + +def _send_to_invoke(action: SendAction, invokeid: str, **kwargs): + """Route a to the invoked child session.""" + machine: StateChart = kwargs["machine"] + event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] + names = [] + for name in (action.namelist or "").strip().split(): + if not hasattr(machine.model, name): + raise NameError(f"Namelist variable '{name}' not found on model") + names.append(Param(name=name, expr=name)) + params_values = {} + for param in chain(names, action.params): + if param.expr is None: + continue + params_values[param.name] = _eval(param.expr, **kwargs) + if not machine._engine._invoke_manager.send_to_child(invokeid, event, **params_values): + # Per SCXML spec: if target is not reachable → error.communication + machine.send("error.communication", internal=True) + + def create_send_action_callable(action: SendAction) -> Callable: # noqa: C901 content: Any = () _valid_targets = (None, "#_internal", "internal", "#_parent", "parent") @@ -373,7 +438,7 @@ def create_send_action_callable(action: SendAction) -> Callable: # noqa: C901 except (NameError, SyntaxError, TypeError): content = (action.content,) - def send_action(*args, **kwargs): + def send_action(*args, **kwargs): # noqa: C901 machine: StateChart = kwargs["machine"] event = action.event or _eval(action.eventexpr, **kwargs) # type: ignore[arg-type] target = action.target if action.target else None @@ -388,11 +453,19 @@ def send_action(*args, **kwargs): if target and target.startswith("#_scxml_"): # Valid SCXML session reference but undispatchable → error.communication machine.send("error.communication", internal=True) + elif target and target.startswith("#_"): + # #_ → route to invoked child session + _send_to_invoke(action, target[2:], **kwargs) else: # Invalid target expression → error.execution (raised as exception) raise ValueError(f"Invalid target: {target}. Must be one of {_valid_targets}") return + # Handle #_parent target — route to parent via _invoke_session + if target == "#_parent": + _send_to_parent(action, **kwargs) + return + internal = target in ("#_internal", "internal") send_id = None @@ -460,10 +533,40 @@ def script_action(*args, **kwargs): return script_action +def create_invoke_init_callable() -> Callable: + """Create a callback that extracts invoke-specific kwargs and stores them on the machine. + + This is always inserted at position 0 in the initial state's onentry list by the + SCXML processor, so that ``_invoke_session`` and ``_invoke_params`` are handled + before any other callbacks run — even for SMs without a ````. + """ + initialized = False + + def invoke_init(*args, **kwargs): + nonlocal initialized + if initialized: + return + initialized = True + machine = kwargs.get("machine") + if machine is not None: + # Use get() not pop(): each callback receives a copy of kwargs + # (via EventData.extended_kwargs), so pop would be misleading. + machine._invoke_params = kwargs.get("_invoke_params") + machine._invoke_session = kwargs.get("_invoke_session") + + return invoke_init + + def _create_dataitem_callable(action: DataItem) -> Callable: def data_initializer(**kwargs): machine: StateChart = kwargs["machine"] + # Check for invoke param overrides — params from parent override child defaults + invoke_params = getattr(machine, "_invoke_params", None) + if invoke_params and action.id in invoke_params: + setattr(machine.model, action.id, invoke_params[action.id]) + return + if action.expr: try: value = _eval(action.expr, **kwargs) @@ -498,6 +601,7 @@ def datamodel(*args, **kwargs): if initialized: return initialized = True + for act in data_elements: act(**kwargs) diff --git a/statemachine/io/scxml/invoke.py b/statemachine/io/scxml/invoke.py new file mode 100644 index 00000000..1b4b355b --- /dev/null +++ b/statemachine/io/scxml/invoke.py @@ -0,0 +1,229 @@ +"""SCXML-specific invoke handler. + +Implements the IInvoke protocol by resolving child SCXML content (inline or +via src/srcexpr), evaluating params/namelist in the parent context, and managing +the child machine lifecycle including ``#_parent`` routing, autoforward, and +finalize. +""" + +import logging +from pathlib import Path +from typing import Any +from typing import Callable + +from ...invoke import IInvoke +from ...invoke import InvokeContext +from .actions import ExecuteBlock +from .actions import _eval +from .schema import InvokeDefinition + +logger = logging.getLogger(__name__) + +_VALID_INVOKE_TYPES = { + None, + "scxml", + "http://www.w3.org/TR/scxml", + "http://www.w3.org/TR/scxml/", + "http://www.w3.org/TR/scxml/#SCXMLEventProcessor", +} + + +class SCXMLInvoker: + """SCXML-specific invoke handler implementing the IInvoke protocol. + + Resolves the child SCXML from inline content, src file, or srcexpr, + evaluates params/namelist, and manages the child machine lifecycle. + """ + + def __init__( + self, + definition: InvokeDefinition, + base_dir: str, + register_child: "Callable[[str, str], type]", + ): + self._definition = definition + self._register_child = register_child + self._child: Any = None + self._base_dir: str = base_dir + + # Duck-typed attributes for InvokeManager + self.invoke_id: "str | None" = definition.id + self.idlocation: "str | None" = definition.idlocation + self.autoforward: bool = definition.autoforward + + # Pre-compile finalize block + self._finalize_block: "ExecuteBlock | None" = None + if definition.finalize and not definition.finalize.is_empty: + self._finalize_block = ExecuteBlock(definition.finalize) + + def run(self, ctx: InvokeContext) -> Any: + """Create and run the child state machine.""" + machine = ctx.machine + + # Store invokeid in idlocation if specified + if self.idlocation: + setattr(machine.model, self.idlocation, ctx.invokeid) + + # Resolve invoke type + invoke_type = self._definition.type + if self._definition.typeexpr: + invoke_type = _eval(self._definition.typeexpr, machine=machine) + + if invoke_type not in _VALID_INVOKE_TYPES: + raise ValueError( + f"Unsupported invoke type: {invoke_type}. Supported types: {_VALID_INVOKE_TYPES}" + ) + + # Resolve child SCXML content + scxml_content = self._resolve_content(machine) + if scxml_content is None: + raise ValueError("No content resolved for ") + + # Evaluate params and namelist + invoke_params = self._evaluate_params(machine) + + # Parse and create the child machine + child_cls = self._create_child_class(scxml_content, ctx.invokeid) + + # _invoke_session and _invoke_params are passed as kwargs so that the + # invoke_init callback (inserted at position 0 in the initial state's onentry + # by the processor) can pop them and store them on the machine instance. + # + # The _ChildRefSetter listener captures ``self._child`` during the first + # state entry, before the processing loop blocks. This is necessary + # because the child's ``__init__`` may block for an extended time when + # there are delayed events, and ``on_event()`` needs access to the child + # to forward events from the parent session. + session = _InvokeSession(parent=machine, invokeid=ctx.invokeid) + ref_setter = _ChildRefSetter(self) + self._child = child_cls( + _invoke_params=invoke_params, + _invoke_session=session, + listeners=[ref_setter], + ) + + return None + + def on_cancel(self): + """Cancel the child machine and all its invocations.""" + from ...invoke import _stop_child_machine + + _stop_child_machine(self._child) + self._child = None + + def on_event(self, event_name: str, **data): + """Forward an event to the child machine (autoforward).""" + if self._child is not None and not self._child.is_terminated: + try: + self._child.send(event_name, **data) + except Exception: + logger.debug("Error forwarding event %s to child", event_name, exc_info=True) + + def on_finalize(self, trigger_data): + """Execute the finalize block before the parent processes the event.""" + if self._finalize_block is not None: + machine = trigger_data.machine + kwargs = { + "machine": machine, + "model": machine.model, + } + # Inject SCXML context variables + from .actions import EventDataWrapper + + kwargs.update( + {k: v for k, v in machine.model.__dict__.items() if not k.startswith("_")} + ) + # Build EventDataWrapper from trigger_data's kwargs + kwargs["_event"] = EventDataWrapper.from_trigger_data(trigger_data) + self._finalize_block(**kwargs) + + def _resolve_content(self, machine) -> "str | None": + """Resolve the child SCXML content from content/src/srcexpr.""" + defn = self._definition + + if defn.content: + # Content could be an expr to evaluate or inline SCXML + if defn.content.lstrip().startswith("<"): + return defn.content + # It's an expression — evaluate it + result = _eval(defn.content, machine=machine) + if isinstance(result, str): + return result + return str(result) + + if defn.srcexpr: + src = _eval(defn.srcexpr, machine=machine) + elif defn.src: + src = defn.src + else: + return None + + # Handle file: URIs and relative paths + if src.startswith("file:"): + path = Path(src.removeprefix("file:")) + else: + path = Path(src) + + # Resolve relative to the base directory of the parent SCXML file + if not path.is_absolute(): + path = Path(self._base_dir) / path + + return path.read_text() + + def _evaluate_params(self, machine) -> dict: + """Evaluate params and namelist into a dict of values.""" + defn = self._definition + result = {} + + # Evaluate namelist + if defn.namelist: + for name in defn.namelist.strip().split(): + if hasattr(machine.model, name): + result[name] = getattr(machine.model, name) + + # Evaluate param elements + for param in defn.params: + if param.expr is not None: + result[param.name] = _eval(param.expr, machine=machine) + elif param.location is not None: + result[param.name] = _eval(param.location, machine=machine) + + return result + + def _create_child_class(self, scxml_content: str, invokeid: str): + """Parse the child SCXML and create a machine class.""" + child_name = f"invoke_{invokeid}" + return self._register_child(scxml_content, child_name) + + +class _ChildRefSetter: + """Listener that captures the child machine reference during initialization. + + The child's ``__init__`` blocks inside the processing loop (e.g. when there + are delayed events). By using this listener, ``SCXMLInvoker._child`` is set + during the first state entry — *before* the processing loop starts spinning — + so that ``on_event()`` can forward events to the child immediately. + """ + + def __init__(self, invoker: "SCXMLInvoker"): + self._invoker = invoker + + def on_enter_state(self, machine=None, **kwargs): + if self._invoker._child is None and machine is not None: + self._invoker._child = machine + + +class _InvokeSession: + """Holds the reference to the parent machine for ``#_parent`` routing.""" + + def __init__(self, parent, invokeid: str): + self.parent = parent + self.invokeid = invokeid + + def send_to_parent(self, event: str, **data): + """Send an event to the parent machine's external queue.""" + self.parent.send(event, _invokeid=self.invokeid, **data) + + +# Verify protocol compliance at import time +assert isinstance(SCXMLInvoker.__new__(SCXMLInvoker), IInvoke) diff --git a/statemachine/io/scxml/parser.py b/statemachine/io/scxml/parser.py index 6c42208f..229914aa 100644 --- a/statemachine/io/scxml/parser.py +++ b/statemachine/io/scxml/parser.py @@ -15,6 +15,7 @@ from .schema import HistoryState from .schema import IfAction from .schema import IfBranch +from .schema import InvokeDefinition from .schema import LogAction from .schema import Param from .schema import RaiseAction @@ -84,10 +85,30 @@ def parse_scxml(scxml_content: str) -> StateMachineDefinition: # noqa: C901 return definition +def _find_own_datamodel_elements(root: ET.Element) -> List[ET.Element]: + """Find elements that belong to this SCXML document, not to inline children. + + Skips any nested inside elements (which contain inline + child SCXML documents for ). + """ + result: List[ET.Element] = [] + + def _walk(elem: ET.Element): + for child in elem: + if child.tag == "content": + continue # Skip inline SCXML content + if child.tag == "datamodel": + result.append(child) + _walk(child) + + _walk(root) + return result + + def parse_datamodel(root: ET.Element) -> "DataModel | None": data_model = DataModel() - for datamodel_elem in root.findall(".//datamodel"): + for datamodel_elem in _find_own_datamodel_elements(root): for data_elem in datamodel_elem.findall("data"): content = data_elem.text and re.sub(r"\s+", " ", data_elem.text).strip() or None src = data_elem.attrib.get("src") @@ -139,7 +160,10 @@ def parse_state( # noqa: C901 ) -> State: state_id = state_elem.get("id") if not state_id: - raise ValueError("State must have an 'id' attribute") + # Per SCXML spec, if no id is specified, the processor auto-generates one. + from uuid import uuid4 + + state_id = f"__auto_{uuid4().hex[:8]}" initial = state_id in initial_states state = State(id=state_id, initial=initial, final=is_final, parallel=is_parallel) @@ -192,6 +216,10 @@ def parse_state( # noqa: C901 child_history_state = parse_history(child_state_elem) state.history[child_history_state.id] = child_history_state + # Parse invoke elements + for invoke_elem in state_elem.findall("invoke"): + state.invocations.append(parse_invoke(invoke_elem)) + # Parse donedata (only valid on final states) if is_final: donedata_elem = state_elem.find("donedata") @@ -276,8 +304,16 @@ def parse_raise(element: ET.Element) -> RaiseAction: def parse_assign(element: ET.Element) -> AssignAction: location = element.attrib["location"] - expr = element.attrib["expr"] - return AssignAction(location=location, expr=expr) + expr = element.attrib.get("expr") + child_xml: "str | None" = None + if expr is None: + # Per SCXML spec, can have child content instead of expr + children = list(element) + if children: + child_xml = ET.tostring(children[0], encoding="unicode") + elif element.text: + expr = element.text.strip() + return AssignAction(location=location, expr=expr, child_xml=child_xml) def parse_log(element: ET.Element) -> LogAction: @@ -382,3 +418,53 @@ def parse_cancel(element: ET.Element) -> CancelAction: def parse_script(element: ET.Element) -> ScriptAction: content = element.text.strip() if element.text else "" return ScriptAction(content=content) + + +def parse_invoke(element: ET.Element) -> InvokeDefinition: + """Parse an element into an InvokeDefinition.""" + invoke_type = element.attrib.get("type") + typeexpr = element.attrib.get("typeexpr") + src = element.attrib.get("src") + srcexpr = element.attrib.get("srcexpr") + invoke_id = element.attrib.get("id") + idlocation = element.attrib.get("idlocation") + autoforward = element.attrib.get("autoforward", "false").lower() == "true" + namelist = element.attrib.get("namelist") + + params: List[Param] = [] + content: "str | None" = None + finalize: "ExecutableContent | None" = None + + for child in element: + if child.tag == "param": + name = child.attrib["name"] + expr = child.attrib.get("expr") + location = child.attrib.get("location") + params.append(Param(name=name, expr=expr, location=location)) + elif child.tag == "content": + # Check for inline element (namespaces already stripped) + scxml_child = child.find("scxml") + if scxml_child is not None: + # Serialize the inline SCXML back to string for later parsing + content = ET.tostring(scxml_child, encoding="unicode") + elif child.attrib.get("expr"): + # Dynamic content via expr attribute + content = child.attrib["expr"] + elif child.text: + content = re.sub(r"\s+", " ", child.text).strip() + elif child.tag == "finalize": + finalize = parse_executable_content(child) + + return InvokeDefinition( + type=invoke_type, + typeexpr=typeexpr, + src=src, + srcexpr=srcexpr, + id=invoke_id, + idlocation=idlocation, + autoforward=autoforward, + namelist=namelist, + params=params, + content=content, + finalize=finalize, + ) diff --git a/statemachine/io/scxml/processor.py b/statemachine/io/scxml/processor.py index fb0d6e82..844eb591 100644 --- a/statemachine/io/scxml/processor.py +++ b/statemachine/io/scxml/processor.py @@ -19,8 +19,11 @@ from .actions import EventDataWrapper from .actions import ExecuteBlock from .actions import create_datamodel_action_callable +from .actions import create_invoke_init_callable +from .invoke import SCXMLInvoker from .parser import parse_scxml from .schema import HistoryState +from .schema import InvokeDefinition from .schema import State from .schema import Transition @@ -63,7 +66,7 @@ def __post_init__(self): class SCXMLProcessor: def __init__(self): - self.scs = {} + self.scs: "Dict[str, type[StateChart]]" = {} self.sessions: Dict[str, SessionData] = {} self._ioprocessors = { "http://www.w3.org/TR/scxml/#SCXMLEventProcessor": self, @@ -79,25 +82,34 @@ def parse_scxml(self, sm_name: str, scxml_content: str): definition = parse_scxml(scxml_content) self.process_definition(definition, location=definition.name or sm_name) - def process_definition(self, definition, location: str): + def process_definition(self, definition, location: str, is_invoked: bool = False): states_dict = self._process_states(definition.states) + # Find the initial state for inserting init callbacks + try: + initial_state = next(s for s in iter(states_dict.values()) if s.get("initial")) + except StopIteration: + initial_state = next(iter(states_dict.values())) + + if "enter" not in initial_state: + initial_state["enter"] = [] + + insert_pos = 0 + + # For invoked children, insert invoke_init to pop _invoke_session/_invoke_params + # from kwargs and store them on the machine instance before any other callbacks. + if is_invoked: + initial_state["enter"].insert(0, create_invoke_init_callable()) # type: ignore[union-attr] + insert_pos = 1 + # Process datamodel (initial variables) if definition.datamodel: datamodel = create_datamodel_action_callable(definition.datamodel) if datamodel: # pragma: no branch – parse_datamodel guarantees non-empty - try: - initial_state = next(s for s in iter(states_dict.values()) if s.get("initial")) - except StopIteration: - # If there's no explicit initial state, use the first one - initial_state = next(iter(states_dict.values())) - - if "enter" not in initial_state: - initial_state["enter"] = [] if isinstance( # pragma: no branch – always a list from lines above initial_state["enter"], list ): - initial_state["enter"].insert(0, datamodel) # type: ignore[arg-type] + initial_state["enter"].insert(insert_pos, datamodel) # type: ignore[arg-type] self._add( location, @@ -157,7 +169,7 @@ def _process_states(self, states: Dict[str, State]) -> Dict[str, StateDefinition states_dict[state_id] = self._process_state(state) return states_dict - def _process_state(self, state: State) -> StateDefinition: + def _process_state(self, state: State) -> StateDefinition: # noqa: C901 state_dict = StateDefinition() if state.initial: state_dict["initial"] = True @@ -184,6 +196,11 @@ def _process_state(self, state: State) -> StateDefinition: if state.transitions: state_dict["transitions"] = self._process_transitions(state.transitions) + # Process invoke elements + if state.invocations: + invokers = [self._process_invocation(inv) for inv in state.invocations] + state_dict["invoke"] = invokers # type: ignore[typeddict-unknown-key] + if state.states: state_dict["states"] = self._process_states(state.states) @@ -192,6 +209,20 @@ def _process_state(self, state: State) -> StateDefinition: return state_dict + def _process_invocation(self, invoke_def: InvokeDefinition) -> SCXMLInvoker: + """Convert an InvokeDefinition into an SCXMLInvoker.""" + return SCXMLInvoker( + definition=invoke_def, + base_dir=os.getcwd(), + register_child=self._register_child, + ) + + def _register_child(self, scxml_content: str, child_name: str) -> type: + """Parse SCXML content, register it as a child machine, and return its class.""" + definition = parse_scxml(scxml_content) + self.process_definition(definition, location=child_name, is_invoked=True) + return self.scs[child_name] + def _process_transitions(self, transitions: List[Transition]): result: TransitionsList = [] for transition in transitions: diff --git a/statemachine/io/scxml/schema.py b/statemachine/io/scxml/schema.py index 1ec40018..0b1ec9ca 100644 --- a/statemachine/io/scxml/schema.py +++ b/statemachine/io/scxml/schema.py @@ -31,7 +31,8 @@ class RaiseAction(Action): @dataclass class AssignAction(Action): location: str - expr: str + expr: "str | None" = None + child_xml: "str | None" = None @dataclass @@ -114,6 +115,21 @@ class DoneData: content_expr: "str | None" = None +@dataclass +class InvokeDefinition: + type: "str | None" = None + typeexpr: "str | None" = None + src: "str | None" = None + srcexpr: "str | None" = None + id: "str | None" = None + idlocation: "str | None" = None + autoforward: bool = False + namelist: "str | None" = None + params: List[Param] = field(default_factory=list) + content: "str | None" = None + finalize: "ExecutableContent | None" = None + + @dataclass class State: id: str @@ -126,6 +142,7 @@ class State: states: Dict[str, "State"] = field(default_factory=dict) history: Dict[str, "HistoryState"] = field(default_factory=dict) donedata: "DoneData | None" = None + invocations: List[InvokeDefinition] = field(default_factory=list) @dataclass diff --git a/tests/conftest.py b/tests/conftest.py index 647e811b..9c5e83cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import asyncio +import threading import time from datetime import datetime @@ -291,3 +292,41 @@ async def sleep(self, seconds: float): def sm_runner(request): """Fixture that runs tests on both sync and async engines.""" return SMRunner(is_async=request.param == "async") + + +@pytest.fixture(autouse=True) +def _check_leaked_threads(): + """Detect threads leaked by test cases (e.g. invoke daemon threads). + + Snapshots active threads before the test, yields, then checks for any new + threads still alive after teardown. Leaked threads are joined with a + timeout and reported as a test failure. + """ + before = set(threading.enumerate()) + yield + + new_threads = set(threading.enumerate()) - before + if not new_threads: + return + + # Filter out asyncio event loop threads (managed by pytest-asyncio, not by us). + new_threads = {t for t in new_threads if not t.name.startswith("asyncio_")} + if not new_threads: + return + + # Give ephemeral threads (e.g. executor workers) a chance to finish. + for t in new_threads: + t.join(timeout=2.0) + + leaked = [t for t in new_threads if t.is_alive()] + if not leaked: + return + + details: list[str] = [] + for t in leaked: + details.append(f" - {t.name!r} (daemon={t.daemon}, ident={t.ident})") + + pytest.fail( + f"Test leaked {len(leaked)} thread(s) still alive after join:\n" + "\n".join(details), + pytrace=False, + ) diff --git a/tests/scxml/conftest.py b/tests/scxml/conftest.py index 3fd34ce8..a09fdcce 100644 --- a/tests/scxml/conftest.py +++ b/tests/scxml/conftest.py @@ -5,56 +5,33 @@ CURRENT_DIR = Path(__file__).parent TESTCASES_DIR = CURRENT_DIR -# xfail sets — all tests currently fail identically on both engines +# xfail sets — tests that fail identically on both engines XFAIL_BOTH = { - # mandatory — invoke-related - "test191", - "test192", - "test207", - "test215", - "test216", - "test220", - "test223", - "test224", - "test225", - "test226", - "test228", - "test229", - "test232", - "test233", - "test234", - "test235", - "test236", - "test239", - "test240", - "test241", - "test243", - "test244", - "test245", - "test247", - "test253", - "test276", - "test338", - "test347", - "test422", - "test530", - # optional - "test201", - "test446", - "test509", - "test510", - "test518", - "test519", - "test520", - "test522", - "test531", - "test532", - "test534", - "test557", - "test558", - "test561", - "test567", - "test577", + # mandatory — invoke-related (still failing) + "test187", # delayed cancelled when sending session terminates before delay + "test229", # autoforward: parent forwards events to child automatically + "test236", # done.invoke.id arrives after all other child-generated events + "test240", # datamodel values passed to invoked child via namelist and + "test554", # invocation cancelled when evaluation of invoke arguments errors + # optional — ecmascript/JSON datamodel + "test201", # JSON data in parsed in ecmascript datamodel + "test446", # JSON data loaded via src attribute parsed as array + # optional — Basic HTTP Event I/O Processor + "test509", # basic HTTP event I/O processor: send with target + "test510", # basic HTTP event I/O processor: send without target + "test518", # basic HTTP event I/O processor: event field in POST + "test519", # basic HTTP event I/O processor: namelist data in POST body + "test520", # basic HTTP event I/O processor: data in POST body + "test522", # basic HTTP event I/O processor: in POST body + "test531", # basic HTTP event I/O processor: POST response populates _event.data + "test532", # basic HTTP event I/O processor: error.communication on bad target + "test534", # basic HTTP event I/O processor: #_scxml_sessionid target + # optional — data/content handling + "test557", # XML data in content becomes DOM-like object (python datamodel) + "test558", # text data in preserves string type (python datamodel) + "test561", # XML content in events creates DOM object + "test567", # HTTP message parameters populate _event.data + "test577", # without target causes error.communication } XFAIL_SYNC_ONLY: set[str] = set() XFAIL_ASYNC_ONLY: set[str] = set() diff --git a/tests/scxml/test_scxml_cases.py b/tests/scxml/test_scxml_cases.py index ccb411da..c797b89c 100644 --- a/tests/scxml/test_scxml_cases.py +++ b/tests/scxml/test_scxml_cases.py @@ -1,3 +1,4 @@ +import time from pathlib import Path import pytest @@ -57,15 +58,35 @@ def _assert_passed(sm: StateChart): assert "pass" in {s.id for s in sm.configuration} +def _wait_for_completion(sm: StateChart, timeout_s: float = 5.0): + """Poll the processing loop until the SM reaches a final state or times out.""" + deadline = time.monotonic() + timeout_s + while not sm.is_terminated and time.monotonic() < deadline: + time.sleep(0.02) + # Trigger processing loop to handle events from invoke threads + sm._engine.processing_loop() + + def test_scxml_usecase_sync(testcase_path: Path, should_generate_debug_diagram, caplog): sm = _run_scxml_testcase( testcase_path, should_generate_debug_diagram, async_mode=False, ) + _wait_for_completion(sm) _assert_passed(sm) +async def _async_wait_for_completion(sm: StateChart, timeout_s: float = 5.0): + """Poll the processing loop until the SM reaches a final state or times out.""" + import asyncio + + deadline = time.monotonic() + timeout_s + while not sm.is_terminated and time.monotonic() < deadline: + await asyncio.sleep(0.02) + await sm._engine.processing_loop() + + @pytest.mark.asyncio() async def test_scxml_usecase_async(testcase_path: Path, should_generate_debug_diagram, caplog): sm = _run_scxml_testcase( @@ -76,4 +97,5 @@ async def test_scxml_usecase_async(testcase_path: Path, should_generate_debug_di # In async context, the engine only queued __initial__ during __init__. # Activate now within the running event loop. await sm.activate_initial_state() + await _async_wait_for_completion(sm) _assert_passed(sm) diff --git a/tests/test_async.py b/tests/test_async.py index 326aa977..03a717e5 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -236,11 +236,14 @@ async def test_async_error_on_execution_in_transition(): class SM(StateChart): s1 = State(initial=True) - s2 = State(final=True) + s2 = State() error_state = State(final=True) go = s1.to(s2, on="bad_action") - error_execution = s1.to(error_state) + finish = s2.to(error_state) + # Transition 'on' content error is caught per-block, so the transition + # completes to s2. error.execution fires from s2. + error_execution = s1.to(error_state) | s2.to(error_state) def bad_action(self, **kwargs): raise RuntimeError("Transition boom") @@ -270,6 +273,29 @@ def after_go(self, **kwargs): assert sm.configuration == {sm.error_state} +@pytest.mark.timeout(5) +async def test_async_error_on_execution_in_before(): + """Async engine catches errors in before callbacks with error_on_execution.""" + + class SM(StateChart): + s1 = State(initial=True) + error_state = State(final=True) + + go = s1.to(s1) + error_execution = s1.to(error_state) + + def before_go(self, **kwargs): + raise RuntimeError("Before boom") + + async def on_enter_state(self, **kwargs): + """Async callback to force the async engine.""" + + sm = SM() + await sm.activate_initial_state() + await sm.go() + assert sm.configuration == {sm.error_state} + + @pytest.mark.timeout(5) async def test_async_invalid_definition_in_transition_propagates(): """InvalidDefinition in async transition propagates.""" diff --git a/tests/test_error_execution.py b/tests/test_error_execution.py index c1fd73b0..e99f2eec 100644 --- a/tests/test_error_execution.py +++ b/tests/test_error_execution.py @@ -84,10 +84,15 @@ class ErrorInErrorHandlerSC(StateChart): """Error in error.execution handler should not cause infinite loop.""" s1 = State("s1", initial=True) - s2 = State("s2", final=True) + s2 = State("s2") + s3 = State("s3", final=True) go = s1.to(s2, on="bad_action") - error_execution = Event(s1.to(s1, on="bad_error_handler"), id="error.execution") + finish = s2.to(s3) + error_execution = Event( + s1.to(s1, on="bad_error_handler") | s2.to(s2, on="bad_error_handler"), + id="error.execution", + ) def bad_action(self): raise RuntimeError("action failed") @@ -174,12 +179,15 @@ def test_error_in_error_handler_no_infinite_loop(): sm = ErrorInErrorHandlerSC() assert sm.configuration == {sm.s1} - # bad_action raises -> error.execution fires -> bad_error_handler raises - # Second error during error.execution processing is ignored (logged as warning) + # bad_action raises -> caught per-block, transition completes to s2 -> + # error.execution fires -> bad_error_handler raises during error.execution + # processing -> rolled back, second error ignored (logged as warning) sm.send("go") - # Machine should still be in s1 (rolled back from failed transition) - assert sm.configuration == {sm.s1} + # Transition 'on' content error is caught per-block (SCXML spec), + # so the transition s1->s2 completes. error.execution fires from s2, + # bad_error_handler raises, which is ignored during error.execution. + assert sm.configuration == {sm.s2} def test_statemachine_with_error_on_execution_true(): @@ -456,7 +464,9 @@ def struggle(self): sm = OneRingTemptation() sm.send("tempt") - # Error in error handler is ignored, machine stays in carrying + # resist raises -> caught per-block, self-transition completes (carrying) -> + # error.execution fires -> struggle raises during error.execution -> + # rolled back, second error ignored -> stays in carrying assert sm.configuration == {sm.carrying} def test_multiple_source_states_with_convention(self): @@ -585,12 +595,11 @@ def kindle(self): sm = BeaconOfGondor() sm.send("light_beacon") - # error.communication.failed won't match error.execution, but - # error_communication_failed will match "error_communication_failed" - # The engine sends "error.execution" which does NOT match - # "error_communication_failed" or "error.communication.failed". - # So the error is unhandled and silently ignored (StateChart default). - assert sm.configuration == {sm.waiting} + # Transition 'on' content error is caught per-block (SCXML spec), + # so waiting->lit completes. error.execution fires from lit, but + # error_communication_failed does NOT match error.execution. + # Error is unhandled and silently ignored (StateChart default). + assert sm.configuration == {sm.lit} def test_multiple_errors_sequential(self): """Multiple events that fail are each handled by error.execution.""" @@ -698,7 +707,8 @@ def test_error_in_on_callback_of_error_handler_is_ignored(self): """If the `on` callback of error.execution raises, the second error is ignored. Per SCXML spec: errors during error.execution processing must not recurse. - The machine should roll back to the configuration before the failed error handler. + During error.execution, transition 'on' content errors propagate to + microstep(), which rolls back and ignores the second error. """ class MountDoom(StateChart): @@ -717,7 +727,9 @@ def gollum_intervenes(self): sm = MountDoom() sm.send("ascend") - # Error in error handler is ignored, config rolled back to climbing + # slip raises -> caught per-block, self-transition completes (climbing) -> + # error.execution fires -> gollum_intervenes raises during error.execution -> + # rolled back to climbing, second error ignored assert sm.configuration == {sm.climbing} def test_condition_on_error_transition_routes_to_different_states(self): diff --git a/tests/test_invoke.py b/tests/test_invoke.py index fb2fce31..87d6fa20 100644 --- a/tests/test_invoke.py +++ b/tests/test_invoke.py @@ -420,9 +420,11 @@ def on_enter_error_state(self, **kwargs): async def test_group_cancel_on_exit(self, sm_runner): """Cancellation propagates: exiting state stops the group.""" + cancel_flag = threading.Event() def slow_task(): - time.sleep(5.0) + # Use interruptible wait so thread can exit promptly on cancellation. + cancel_flag.wait(timeout=5.0) return "should not complete" class SM(StateChart): @@ -433,6 +435,7 @@ class SM(StateChart): sm = await sm_runner.start(SM) await sm_runner.sleep(0.05) await sm_runner.send(sm, "cancel") + cancel_flag.set() # Unblock the slow_task thread await sm_runner.sleep(0.1) assert "stopped" in sm.configuration_values @@ -987,3 +990,83 @@ def on_invoke_loading(self, ctx=None, **kwargs): await sm_runner.processing_loop(sm) assert "loading" in sm.configuration_values + + +class TestInvokeManagerUnit: + """Unit tests for InvokeManager methods not exercised by integration tests.""" + + def test_send_to_child_not_found(self): + """send_to_child returns False when invokeid is not in _active.""" + from unittest.mock import Mock + + from statemachine.invoke import InvokeManager + + engine = Mock() + manager = InvokeManager(engine) + + assert manager.send_to_child("nonexistent", "event") is False + + def test_send_to_child_handler_without_on_event(self): + """send_to_child returns False when handler has no on_event.""" + from unittest.mock import Mock + + from statemachine.invoke import Invocation + from statemachine.invoke import InvokeContext + from statemachine.invoke import InvokeManager + + engine = Mock() + manager = InvokeManager(engine) + + handler = Mock(spec=[]) # no on_event + ctx = InvokeContext(invokeid="test_id", state_id="s1", send=Mock(), machine=Mock()) + inv = Invocation(invokeid="test_id", state_id="s1", ctx=ctx, _handler=handler) + manager._active["test_id"] = inv + + assert manager.send_to_child("test_id", "event") is False + + def test_handle_external_event_none_event(self): + """handle_external_event returns early when event is None.""" + from unittest.mock import Mock + + from statemachine.invoke import InvokeManager + + engine = Mock() + manager = InvokeManager(engine) + + trigger_data = Mock(event=None) + # Should not raise + manager.handle_external_event(trigger_data) + + +class TestStopChildMachine: + """Tests for _stop_child_machine.""" + + def test_stop_child_machine_exception_swallowed(self): + """_stop_child_machine swallows exceptions during stop.""" + from unittest.mock import Mock + + from statemachine.invoke import _stop_child_machine + + child = Mock() + child._engine.running = True + child._engine._invoke_manager.cancel_all.side_effect = RuntimeError("boom") + + # Should not raise + _stop_child_machine(child) + + +class TestEngineDelCleanup: + """Test BaseEngine.__del__ cancel_all exception handling.""" + + def test_del_swallows_cancel_all_exception(self): + """__del__ swallows exceptions from cancel_all.""" + + class SM(StateChart): + s1 = State(initial=True, final=True) + + sm = SM() + engine = sm._engine + engine._invoke_manager.cancel_all = lambda: (_ for _ in ()).throw(RuntimeError("boom")) + + # Should not raise + engine.__del__() diff --git a/tests/test_scxml_units.py b/tests/test_scxml_units.py index 66a23a74..31287403 100644 --- a/tests/test_scxml_units.py +++ b/tests/test_scxml_units.py @@ -1,20 +1,25 @@ """Unit tests for SCXML parser, actions, and schema modules.""" +import logging import xml.etree.ElementTree as ET from unittest.mock import Mock import pytest +from statemachine.io.scxml.actions import EventDataWrapper from statemachine.io.scxml.actions import Log from statemachine.io.scxml.actions import ParseTime from statemachine.io.scxml.actions import create_action_callable from statemachine.io.scxml.actions import create_datamodel_action_callable +from statemachine.io.scxml.invoke import SCXMLInvoker from statemachine.io.scxml.parser import parse_element from statemachine.io.scxml.parser import parse_scxml from statemachine.io.scxml.parser import strip_namespaces from statemachine.io.scxml.schema import CancelAction from statemachine.io.scxml.schema import DataModel from statemachine.io.scxml.schema import IfBranch +from statemachine.io.scxml.schema import InvokeDefinition from statemachine.io.scxml.schema import LogAction +from statemachine.io.scxml.schema import Param # --- ParseTime --- @@ -59,11 +64,13 @@ def test_no_scxml_element_raises(self): class TestParseState: - def test_state_without_id_raises(self): - """State element without id attribute raises ValueError.""" + def test_state_without_id_gets_auto_generated(self): + """State element without id attribute gets an auto-generated id.""" xml = '' - with pytest.raises(ValueError, match="State must have an 'id' attribute"): - parse_scxml(xml) + definition = parse_scxml(xml) + state_ids = list(definition.states.keys()) + assert len(state_ids) == 1 + assert state_ids[0].startswith("__auto_") class TestParseHistory: @@ -353,3 +360,677 @@ def test_history_without_transitions(self): processor.parse_scxml("test_history_no_trans", scxml) sm = processor.start() assert sm.states_map["a"] in sm.configuration + + +# --- SCXMLInvoker --- + + +def _make_invoker(definition=None, base_dir=None, register_child=None): + """Helper to create an SCXMLInvoker with sensible defaults.""" + if definition is None: + definition = InvokeDefinition() + if base_dir is None: + base_dir = "" + if register_child is None: + register_child = Mock(return_value=Mock) + return SCXMLInvoker( + definition=definition, + base_dir=base_dir, + register_child=register_child, + ) + + +class TestSCXMLInvoker: + def test_invalid_invoke_type_raises(self): + """run() raises ValueError for unsupported invoke type.""" + defn = InvokeDefinition( + type="http://unsupported/type", + content="", + ) + invoker = _make_invoker(definition=defn) + ctx = Mock() + model = Mock(spec=[]) + ctx.machine = Mock(model=model) + + with pytest.raises(ValueError, match="Unsupported invoke type"): + invoker.run(ctx) + + def test_no_content_resolved_raises(self): + """run() raises ValueError when no src/content/srcexpr is provided.""" + defn = InvokeDefinition() # no content, src, or srcexpr + invoker = _make_invoker(definition=defn) + ctx = Mock() + model = Mock(spec=[]) + ctx.machine = Mock(model=model) + + with pytest.raises(ValueError, match="No content resolved"): + invoker.run(ctx) + + def test_resolve_content_inline_xml(self): + """_resolve_content returns inline XML content directly.""" + xml_content = '' + defn = InvokeDefinition(content=xml_content) + invoker = _make_invoker(definition=defn) + + result = invoker._resolve_content(Mock()) + assert result == xml_content + + def test_resolve_content_from_file(self, tmp_path): + """_resolve_content reads content from src file path.""" + scxml_file = tmp_path / "child.scxml" + scxml_file.write_text("") + + defn = InvokeDefinition(src="child.scxml") + invoker = _make_invoker(definition=defn, base_dir=str(tmp_path)) + + result = invoker._resolve_content(Mock()) + assert result == "" + + def test_evaluate_params_namelist_and_params(self): + """_evaluate_params resolves both namelist variables and param elements.""" + defn = InvokeDefinition( + namelist="var1 var2", + params=[Param(name="p1", expr="42")], + ) + invoker = _make_invoker(definition=defn) + + model = type("Model", (), {"var1": "a", "var2": "b"})() + machine = Mock(model=model) + + result = invoker._evaluate_params(machine) + assert result == {"var1": "a", "var2": "b", "p1": 42} + + def test_on_cancel_clears_child(self): + """on_cancel() sets _child to None.""" + invoker = _make_invoker() + invoker._child = Mock() + + invoker.on_cancel() + assert invoker._child is None + + def test_on_event_skips_terminated_child(self): + """on_event() does not error when child is terminated.""" + invoker = _make_invoker() + child = Mock() + child.is_terminated = True + invoker._child = child + + # Should not raise or call send + invoker.on_event("some.event") + child.send.assert_not_called() + + def test_on_finalize_without_block_is_noop(self): + """on_finalize() does nothing when no finalize block is defined.""" + invoker = _make_invoker() + assert invoker._finalize_block is None + + # Should not raise + trigger_data = Mock() + invoker.on_finalize(trigger_data) + + def test_send_to_parent_warns_without_session(self, caplog): + """_send_to_parent logs a warning when machine has no _invoke_session.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + action = SendAction(event="done", target="#_parent") + machine = Mock(spec=[]) # spec=[] ensures no _invoke_session attribute + machine.name = "test_machine" + + with caplog.at_level(logging.WARNING, logger="statemachine.io.scxml.actions"): + _send_to_parent(action, machine=machine) + + assert "no _invoke_session" in caplog.text + + +# --- _send_to_invoke --- + + +class TestSendToInvoke: + """Unit tests for _send_to_invoke (routes ).""" + + def _make_machine_with_invoke_manager(self, send_to_child_return=True): + """Create a mock machine with an InvokeManager that has send_to_child.""" + machine = Mock() + machine.model = Mock() + machine.model.__dict__ = {} + machine._engine._invoke_manager.send_to_child.return_value = send_to_child_return + return machine + + def test_routes_event_to_child(self): + """_send_to_invoke forwards the event to InvokeManager.send_to_child.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event="childEvent", target="#_child1") + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "childEvent" + ) + machine.send.assert_not_called() + + def test_sends_error_communication_when_child_not_found(self): + """_send_to_invoke sends error.communication when invokeid is not found.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager(send_to_child_return=False) + action = SendAction(event="childEvent", target="#_unknown") + + _send_to_invoke(action, "unknown", machine=machine) + + machine.send.assert_called_once_with("error.communication", internal=True) + + def test_evaluates_eventexpr(self): + """_send_to_invoke evaluates eventexpr when event is None.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event=None, eventexpr="'dynamic_event'", target="#_child1") + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "dynamic_event" + ) + + def test_forwards_params(self): + """_send_to_invoke forwards evaluated params to send_to_child.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction( + event="childEvent", + target="#_child1", + params=[Param(name="x", expr="42"), Param(name="y", expr="'hello'")], + ) + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "childEvent", x=42, y="hello" + ) + + def test_forwards_namelist_variables(self): + """_send_to_invoke resolves namelist variables from model and forwards them.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + model = type("Model", (), {})() + model.var1 = "alpha" + model.var2 = "beta" + machine.model = model + action = SendAction(event="childEvent", target="#_child1", namelist="var1 var2") + + _send_to_invoke(action, "child1", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child1", "childEvent", var1="alpha", var2="beta" + ) + + def test_namelist_missing_variable_raises(self): + """_send_to_invoke raises NameError when namelist variable is not on model.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + machine.model = Mock(spec=[]) # no attributes + action = SendAction(event="childEvent", target="#_child1", namelist="missing_var") + + with pytest.raises(NameError, match="missing_var"): + _send_to_invoke(action, "child1", machine=machine) + + def test_send_action_callable_routes_invoke_target(self): + """create_send_action_callable routes #_ targets to _send_to_invoke.""" + from statemachine.io.scxml.actions import create_send_action_callable + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event="hello", target="#_myinvoke") + send_callable = create_send_action_callable(action) + + send_callable(machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with("myinvoke", "hello") + + def test_send_action_callable_scxml_session_target(self): + """create_send_action_callable sends error.communication for #_scxml_ targets.""" + from statemachine.io.scxml.actions import create_send_action_callable + from statemachine.io.scxml.parser import SendAction + + machine = self._make_machine_with_invoke_manager() + action = SendAction(event="hello", target="#_scxml_session123") + send_callable = create_send_action_callable(action) + + send_callable(machine=machine) + + machine.send.assert_called_once_with("error.communication", internal=True) + machine._engine._invoke_manager.send_to_child.assert_not_called() + + +# --- EventDataWrapper coverage --- + + +class TestEventDataWrapperEdgeCases: + def test_no_event_data_no_trigger_data_raises(self): + """EventDataWrapper raises ValueError when neither is provided.""" + with pytest.raises(ValueError, match="Either event_data or trigger_data"): + EventDataWrapper() + + def test_getattr_with_event_data_delegates(self): + """__getattr__ delegates to event_data when present.""" + event_data = Mock() + event_data.trigger_data = Mock( + kwargs={}, send_id=None, event=Mock(internal=True, __str__=lambda s: "test") + ) + event_data.some_custom_attr = "custom_value" + wrapper = EventDataWrapper(event_data) + assert wrapper.some_custom_attr == "custom_value" + + def test_getattr_without_event_data_raises(self): + """__getattr__ raises AttributeError when event_data is None.""" + trigger_data = Mock(kwargs={}, send_id=None, event=Mock(internal=True)) + trigger_data.event.__str__ = lambda s: "test" + wrapper = EventDataWrapper(trigger_data=trigger_data) + with pytest.raises(AttributeError, match="no attribute 'missing_attr'"): + wrapper.missing_attr # noqa: B018 + + def test_name_via_trigger_data(self): + """name property returns event string from trigger_data when no event_data.""" + trigger_data = Mock(kwargs={}, send_id=None, event=Mock(internal=True)) + trigger_data.event.__str__ = lambda s: "my.event" + wrapper = EventDataWrapper(trigger_data=trigger_data) + assert wrapper.name == "my.event" + + +# --- _send_to_parent coverage --- + + +class TestSendToParentParams: + def test_send_to_parent_with_namelist_and_params(self): + """_send_to_parent resolves namelist and params before sending.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + model = type("Model", (), {})() + model.myvar = "hello" + machine = Mock(model=model) + machine.model.__dict__ = {"myvar": "hello"} + session = Mock() + machine._invoke_session = session + + action = SendAction( + event="childDone", + target="#_parent", + namelist="myvar", + params=[Param(name="extra", expr="42")], + ) + + _send_to_parent(action, machine=machine) + + session.send_to_parent.assert_called_once_with("childDone", myvar="hello", extra=42) + + def test_send_to_parent_namelist_missing_raises(self): + """_send_to_parent raises NameError when namelist variable is missing.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + machine = Mock() + machine.model = Mock(spec=[]) # no attributes + machine._invoke_session = Mock() + + action = SendAction(event="ev", target="#_parent", namelist="missing_var") + + with pytest.raises(NameError, match="missing_var"): + _send_to_parent(action, machine=machine) + + def test_send_to_parent_param_without_expr_skipped(self): + """_send_to_parent skips params where expr is None.""" + from statemachine.io.scxml.actions import _send_to_parent + from statemachine.io.scxml.parser import SendAction + + machine = Mock() + machine.model = Mock() + machine.model.__dict__ = {} + session = Mock() + machine._invoke_session = session + + action = SendAction( + event="ev", + target="#_parent", + params=[ + Param(name="has_expr", expr="1"), + Param(name="no_expr", expr=None), + ], + ) + + _send_to_parent(action, machine=machine) + session.send_to_parent.assert_called_once_with("ev", has_expr=1) + + +# --- _send_to_invoke param skip coverage --- + + +class TestSendToInvokeParamSkip: + def test_param_without_expr_is_skipped(self): + """_send_to_invoke skips params where expr is None.""" + from statemachine.io.scxml.actions import _send_to_invoke + from statemachine.io.scxml.parser import SendAction + + machine = Mock() + machine.model = Mock() + machine.model.__dict__ = {} + machine._engine._invoke_manager.send_to_child.return_value = True + + action = SendAction( + event="ev", + target="#_child", + params=[ + Param(name="with_expr", expr="1"), + Param(name="no_expr", expr=None), + ], + ) + + _send_to_invoke(action, "child", machine=machine) + + machine._engine._invoke_manager.send_to_child.assert_called_once_with( + "child", "ev", with_expr=1 + ) + + +# --- invoke_init coverage --- + + +class TestInvokeInitCallback: + def test_invoke_init_idempotent(self): + """invoke_init only runs once, even if called multiple times.""" + from statemachine.io.scxml.actions import create_invoke_init_callable + + callback = create_invoke_init_callable() + machine = Mock() + + callback(machine=machine) + assert machine._invoke_params is not None or True # first call sets attrs + + # Reset to detect second call + machine._invoke_params = "first" + callback(machine=machine) + # Should NOT have been overwritten + assert machine._invoke_params == "first" + + +# --- SCXMLInvoker edge cases --- + + +class TestSCXMLInvokerEdgeCases: + def test_on_event_exception_in_child_send(self): + """on_event swallows exceptions from child.send().""" + invoker = _make_invoker() + child = Mock() + child.is_terminated = False + child.send.side_effect = RuntimeError("child error") + invoker._child = child + + # Should not raise + invoker.on_event("some.event") + child.send.assert_called_once_with("some.event") + + def test_resolve_content_expr_non_string(self): + """_resolve_content converts non-string eval result to string.""" + defn = InvokeDefinition(content="42") # evaluates to int + invoker = _make_invoker(definition=defn) + machine = Mock() + machine.model.__dict__ = {} + + result = invoker._resolve_content(machine) + assert result == "42" + + def test_evaluate_params_with_location(self): + """_evaluate_params resolves param with location instead of expr.""" + defn = InvokeDefinition( + params=[Param(name="p1", expr=None, location="myvar")], + ) + invoker = _make_invoker(definition=defn) + + model = type("Model", (), {})() + model.myvar = "resolved" + machine = Mock(model=model) + machine.model.__dict__ = {"myvar": "resolved"} + + result = invoker._evaluate_params(machine) + assert result == {"p1": "resolved"} + + +# --- Parser edge cases --- + + +class TestParserAssignChildXml: + def test_assign_with_child_xml_content(self): + """ with child XML content is parsed as child_xml.""" + scxml = """ + + + + + + + + + + + + + """ + # Should parse without error — the child XML is stored in child_xml + definition = parse_scxml(scxml) + # Verify it parsed states correctly + assert "s1" in definition.states + + def test_assign_with_text_content(self): + """ with text content (no expr attr) uses text as expr.""" + scxml = """ + + + + + + + 42 + + + + + + """ + definition = parse_scxml(scxml) + assert "s1" in definition.states + + +class TestParserInvokeContent: + def test_invoke_with_text_content(self): + """ with text body is parsed.""" + scxml = """ + + + + some text content + + + + """ + definition = parse_scxml(scxml) + assert "s1" in definition.states + invoke_def = definition.states["s1"].invocations[0] + assert "some text content" in invoke_def.content + + def test_invoke_with_content_expr(self): + """ is parsed as dynamic content.""" + scxml = """ + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert invoke_def.content == "'dynamic'" + + def test_invoke_with_inline_scxml_no_namespace(self): + """ with inline (no namespace) is parsed.""" + scxml = """ + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert " are silently ignored.""" + scxml = """ + + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert len(invoke_def.params) == 1 + + def test_invoke_with_empty_content(self): + """ with empty results in content=None.""" + scxml = """ + + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert invoke_def.content is None + + def test_invoke_with_finalize_block(self): + """ with block is parsed.""" + scxml = """ + + + + child content + + + + + + + """ + definition = parse_scxml(scxml) + invoke_def = definition.states["s1"].invocations[0] + assert invoke_def.finalize is not None + assert len(invoke_def.finalize.actions) == 1 + + +class TestParserAssignEdgeCases: + def test_assign_without_children_or_text(self): + """ with neither children nor text results in expr=None.""" + scxml = """ + + + + + + + + + + + + + """ + definition = parse_scxml(scxml) + assert "s1" in definition.states + + +class TestSCXMLInvokerResolveContentAbsolutePath: + def test_resolve_content_absolute_path(self, tmp_path): + """_resolve_content with absolute src path doesn't prepend base_dir.""" + scxml_file = tmp_path / "child.scxml" + scxml_file.write_text("") + + defn = InvokeDefinition(src=str(scxml_file)) + invoker = _make_invoker(definition=defn, base_dir="/some/other/dir") + + result = invoker._resolve_content(Mock()) + assert result == "" + + +class TestSCXMLInvokerEvaluateParamsNoExprNoLocation: + def test_param_without_expr_or_location_skipped(self): + """_evaluate_params skips params with neither expr nor location.""" + defn = InvokeDefinition( + params=[Param(name="p1", expr=None, location=None)], + ) + invoker = _make_invoker(definition=defn) + machine = Mock(model=type("M", (), {})()) + machine.model.__dict__ = {} + + result = invoker._evaluate_params(machine) + assert result == {} + + +class TestInvokeInitMachineNone: + def test_invoke_init_without_machine_is_noop(self): + """invoke_init does nothing when machine is not in kwargs.""" + from statemachine.io.scxml.actions import create_invoke_init_callable + + callback = create_invoke_init_callable() + # Call without machine kwarg — should not raise + callback() + + +class TestInvokeCallableWrapperRunInstance: + def test_run_with_instance_not_class(self): + """_InvokeCallableWrapper.run() works with an instance (not a class).""" + from statemachine.invoke import _InvokeCallableWrapper + + class Handler: + def run(self, ctx): + return "result" + + handler_instance = Handler() + wrapper = _InvokeCallableWrapper(handler_instance) + assert not wrapper._is_class + + ctx = Mock() + result = wrapper.run(ctx) + assert result == "result" + assert wrapper._instance is handler_instance + + +class TestOrderedSetStr: + def test_str_representation(self): + """OrderedSet.__str__ returns a set-like string.""" + from statemachine.orderedset import OrderedSet + + os = OrderedSet([1, 2, 3]) + assert str(os) == "{1, 2, 3}"