Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
8ea1c09
refactor(dag): pass frame index through frame graph without changing …
harryswift01 May 19, 2026
e258de3
refactor(parallel): set up `frame_indicies` within `shared_data`
harryswift01 May 20, 2026
145e1ab
refactor(frame): use python slicing semantics
harryswift01 May 20, 2026
cfb5b82
refactor(frame): update range index frame loop within `ComputeNeighbo…
harryswift01 May 20, 2026
9ab66c6
refactor(trajectory): introduce explicit frame source for DAG execution
harryswift01 May 20, 2026
edb302f
refactor(conformations): use frame selection for dihedral analysis
harryswift01 May 20, 2026
bd412c9
refactor(trajectory): standardise explicit frame-index execution
harryswift01 May 21, 2026
4c3601c
tests(unit): fix and update unit tests in relation to frame indexing …
harryswift01 May 21, 2026
062d376
tests(unit): update `_extract_timeseries` to ensure it follows valid …
harryswift01 May 21, 2026
b9db3c0
chore(workflow): remove duplicated type checks on inputs
harryswift01 May 21, 2026
7d28f9a
tests(unit): additonal test units due to frame index order
harryswift01 May 22, 2026
071fa9a
feat(mda): move `mda.py` to new `trajectory` folder
harryswift01 May 22, 2026
81b0def
docs(api): regenerate autodoc stubs after trajectory refactor
harryswift01 May 22, 2026
48c46cb
chore(comments): tidy up comments within frame index refactor
harryswift01 May 22, 2026
6989466
test(regression): update baselines
harryswift01 May 22, 2026
da30e1b
Merge remote-tracking branch 'origin/main' into 306-frame-index-refactor
harryswift01 May 22, 2026
25f0edf
test(regression): update baselines
harryswift01 May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CodeEntropy/config/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
from CodeEntropy.core.logging import LoggingConfig
from CodeEntropy.entropy.workflow import EntropyWorkflow
from CodeEntropy.levels.dihedrals import ConformationStateBuilder
from CodeEntropy.levels.mda import UniverseOperations
from CodeEntropy.molecules.grouping import MoleculeGrouper
from CodeEntropy.results.reporter import ResultsReporter
from CodeEntropy.trajectory.mda import UniverseOperations

logger = logging.getLogger(__name__)
console = LoggingConfig.get_console()
Expand Down
180 changes: 95 additions & 85 deletions CodeEntropy/entropy/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
from __future__ import annotations

import logging
import math
from collections import defaultdict
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any

import pandas as pd
Expand All @@ -29,30 +27,15 @@
from CodeEntropy.entropy.water import WaterEntropy
from CodeEntropy.levels.hierarchy import HierarchyBuilder
from CodeEntropy.levels.level_dag import LevelDAG
from CodeEntropy.trajectory.frames import FrameSelection
from CodeEntropy.trajectory.source import FrameSource

logger = logging.getLogger(__name__)
console = LoggingConfig.get_console()

SharedData = dict[str, Any]


@dataclass(frozen=True)
class TrajectorySlice:
"""Trajectory slicing parameters.

Attributes:
start: Inclusive start frame index.
end: Exclusive end frame index (or a concrete index derived from args).
step: Step size between frames.
n_frames: Number of frames in the slice.
"""

start: int
end: int
step: int
n_frames: int


class EntropyWorkflow:
"""Coordinate entropy calculations across structural levels.

Expand Down Expand Up @@ -95,21 +78,22 @@ def execute(self) -> None:
"""Run the full entropy workflow and emit results.

This orchestrates the complete entropy pipeline:
1. Build trajectory slice.
2. Apply atom and frame selection to create a reduced universe.
1. Build trajectory frame selection.
2. Apply atom/frame selection to create the current analysis universe.
3. Detect hierarchy levels.
4. Group molecules.
5. Split groups into water and non-water.
6. Optionally compute water entropy (only if solute exists).
6. Optionally compute water entropy.
7. Run level DAG and entropy graph.
8. Finalize and persist results.
"""
traj = self._build_trajectory_slice()
frame_selection = self._build_frame_selection()
console.print(
f"Analyzing a total of {traj.n_frames} frames in this calculation."
f"Analyzing a total of {frame_selection.n_frames} "
f"frames in this calculation."
)

reduced_universe = self._build_reduced_universe()
reduced_universe = self._build_reduced_universe(frame_selection)

levels = self._detect_levels(reduced_universe)
groups = self._group_molecules.grouping_molecules(
Expand All @@ -121,15 +105,15 @@ def execute(self) -> None:
)

if self._args.water_entropy and water_groups and nonwater_groups:
self._compute_water_entropy(traj, water_groups)
self._compute_water_entropy(frame_selection, water_groups)
else:
nonwater_groups.update(water_groups)

shared_data = self._build_shared_data(
reduced_universe=reduced_universe,
levels=levels,
groups=nonwater_groups,
traj=traj,
frame_selection=frame_selection,
)

with self._reporter.progress(transient=False) as p:
Expand All @@ -139,24 +123,47 @@ def execute(self) -> None:
self._finalize_molecule_results()
self._reporter.log_tables()

def _build_frame_selection(self) -> FrameSelection:
"""Build the workflow frame selection.

Returns:
FrameSelection containing absolute source-trajectory frame indices.

Notes:
Physical frame slicing is not used. The selected frame indices are the
global workflow frame contract and are consumed by FrameSource.
"""
start, end, step = self._get_trajectory_bounds()
return FrameSelection.from_bounds(
start=start,
stop=end,
step=step,
)

def _build_shared_data(
self,
reduced_universe: Any,
levels: Any,
groups: Mapping[int, Any],
traj: TrajectorySlice,
frame_selection: FrameSelection,
) -> SharedData:
"""Build the shared_data dict used by nodes and graphs.

Args:
reduced_universe: Universe after applying selection.
reduced_universe: Active analysis universe after atom selection.
The trajectory is not physically frame-sliced.
levels: Level definition per molecule id.
groups: Mapping of group id -> list of molecule ids.
traj: Trajectory slice parameters.
groups: Mapping of group id to molecule ids.
frame_selection: Explicit absolute workflow frame selection.

Returns:
Shared data dictionary for DAG/graph execution.
"""
frame_source = FrameSource(
universe=reduced_universe,
selection=frame_selection,
)

shared_data: SharedData = {
"entropy_manager": self,
"run_manager": self._run_manager,
Expand All @@ -166,10 +173,14 @@ def _build_shared_data(
"reduced_universe": reduced_universe,
"levels": levels,
"groups": dict(groups),
"start": traj.start,
"end": traj.end,
"step": traj.step,
"n_frames": traj.n_frames,
"start": frame_selection.source_start,
"end": frame_selection.source_stop_exclusive,
"step": frame_selection.infer_source_step(),
"n_frames": frame_selection.n_frames,
"frame_selection": frame_selection,
"frame_source": frame_source,
"frame_indices": list(frame_selection.indices),
"source_frame_indices": list(frame_selection.indices),
}
return shared_data

Expand Down Expand Up @@ -198,66 +209,56 @@ def _run_entropy_graph(
entropy_results = EntropyGraph().build().execute(shared_data, progress=progress)
shared_data.update(entropy_results)

def _build_trajectory_slice(self) -> TrajectorySlice:
"""Compute trajectory slicing parameters from args.
def _get_trajectory_bounds(self) -> tuple[int, int, int]:
"""Return validated start, end, and step frame indices from args.

Returns:
A TrajectorySlice describing the frames to analyze.
Tuple of ``(start, end, step)``.

Raises:
ValueError: If the frame window is invalid.
"""
start, end, step = self._get_trajectory_bounds()
n_frames = self._get_number_frames(start, end, step)
return TrajectorySlice(start=start, end=end, step=step, n_frames=n_frames)
n_total = len(self._universe.trajectory)

def _get_trajectory_bounds(self) -> tuple[int, int, int]:
"""Return start, end, and step frame indices from args.
start = 0 if self._args.start is None else int(self._args.start)
end = (
n_total
if self._args.end is None or int(self._args.end) == -1
else int(self._args.end)
)
step = 1 if self._args.step is None else int(self._args.step)

Returns:
Tuple of (start, end, step).
"""
start = self._args.start or 0
end = len(self._universe.trajectory) if self._args.end == -1 else self._args.end
step = self._args.step or 1
return start, end, step

def _get_number_frames(self, start: int, end: int, step: int) -> int:
"""Compute the number of frames in a trajectory slice.
def _build_reduced_universe(self, frame_selection: FrameSelection) -> Any:
"""Apply atom selection and return the active analysis universe.

Args:
start: Inclusive start frame index.
end: Exclusive end frame index.
step: Step between frames.
frame_selection: Workflow frame selection. Used for validation.

Returns:
Number of frames processed.
"""
return math.floor((end - start) / step)
MDAnalysis Universe after atom selection. Frames are not physically
sliced; selected-frame access is handled by FrameSource.

def _build_reduced_universe(self) -> Any:
"""Apply atom and frame selection and return the reduced universe.

Returns:
MDAnalysis Universe (reduced according to user selections).
Raises:
ValueError: If no frames are selected.
"""
if frame_selection.n_frames == 0:
raise ValueError("Frame selection is empty.")

selection = self._args.selection_string
start = self._args.start
end = len(self._universe.trajectory) if self._args.end == -1 else self._args.end
step = self._args.step

if selection == "all":
reduced_atoms = self._universe
else:
reduced_atoms = self._universe_operations.select_atoms(
self._universe, selection
)
name = f"{len(reduced_atoms.trajectory)}_frame_dump_atom_selection"
self._run_manager.write_universe(reduced_atoms, name)
return self._universe

reduced_frames = self._universe_operations.select_frames(
reduced_atoms, start, end, step
reduced_atoms = self._universe_operations.select_atoms(
self._universe,
selection,
)
name = f"{len(reduced_frames.trajectory)}_frame_dump_frame_selection"
self._run_manager.write_universe(reduced_frames, name)
name = f"{len(reduced_atoms.trajectory)}_frame_dump_atom_selection"
self._run_manager.write_universe(reduced_atoms, name)

return reduced_frames
return reduced_atoms

def _detect_levels(self, reduced_universe: Any) -> Any:
"""Detect hierarchy levels for each molecule in the reduced universe.
Expand Down Expand Up @@ -316,25 +317,34 @@ def _split_water_groups(
return nonwater_groups, water_groups

def _compute_water_entropy(
self, traj: TrajectorySlice, water_groups: Mapping[int, Any]
self,
frame_selection: FrameSelection,
water_groups: Mapping[int, Any],
) -> None:
"""Compute water entropy for each water group and adjust selection string.

Args:
traj: Trajectory slice parameters.
water_groups: Mapping of group id -> molecule ids for waters.
frame_selection: Workflow frame selection.
water_groups: Mapping of group id to molecule ids for waters.
"""
if not water_groups or not self._args.water_entropy:
return

start = frame_selection.source_start
end = frame_selection.source_stop_exclusive
step = frame_selection.infer_source_step()

if start is None or end is None:
return

water_entropy = WaterEntropy(self._args, self._reporter)

for group_id in water_groups.keys():
water_entropy.calculate_and_log(
universe=self._universe,
start=traj.start,
end=traj.end,
step=traj.step,
start=start,
end=end,
step=step,
group_id=group_id,
)

Expand All @@ -344,8 +354,8 @@ def _compute_water_entropy(
else "not water"
)

logger.debug(f"WaterEntropy: molecule_data= {self._reporter.molecule_data}")
logger.debug(f"WaterEntropy: residue_data= {self._reporter.residue_data}")
logger.debug("WaterEntropy: molecule_data= %s", self._reporter.molecule_data)
logger.debug("WaterEntropy: residue_data= %s", self._reporter.residue_data)

def _finalize_molecule_results(self) -> None:
"""Aggregate group totals and persist results to JSON.
Expand Down
Loading
Loading