Skip to content

Commit cd7816a

Browse files
committed
feat: add multiprocessing support and documentation for orthogonal projections
- Add -j/--jobs flag for parallel well processing - Add comprehensive module documentation with usage examples - Document movie generation features (XY, XZ, YZ slice animations) - Document z-gap consistency between figures and movies
1 parent 9e82b3f commit cd7816a

1 file changed

Lines changed: 287 additions & 28 deletions

File tree

  • scripts/figures/orthogonal_projections

scripts/figures/orthogonal_projections/cli.py

Lines changed: 287 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,37 @@
22
Command-line interface for orthogonal projection generation.
33
44
This module provides the main entry point for:
5-
- Processing plates one well at a time
6-
- Generating projections, composites, and mosaics
5+
- Processing plates one well at a time (or in parallel)
6+
- Generating projections, composites, mosaics, and Z-stack slice movies
77
- Tracking progress and handling errors
88
9+
Features:
10+
- Well-by-well memory-efficient processing
11+
- Optional multiprocessing via -j/--jobs flag
12+
- Composite figures with XY/XZ/YZ projections
13+
- Z-stack slice movies (XY through Z, XZ through Y, YZ through X)
14+
- Z-gap stretching for XZ/YZ (consistent between figures and movies)
15+
- Plate mosaics and arbitrary group mosaics
16+
17+
Usage:
18+
# Single process - composites only
19+
python -m scripts.figures.orthogonal_projections.cli \\
20+
--plate-dir /path/to/plate_stitched/ \\
21+
--output-dir /output/path/ \\
22+
--z-gap 4.25
23+
24+
# Parallel processing with movies
25+
python -m scripts.figures.orthogonal_projections.cli \\
26+
--plate-dir /path/to/plate_stitched/ \\
27+
--output-dir /output/path/ \\
28+
--z-gap 4.25 \\
29+
--create-movies \\
30+
--movie-types xy xz yz \\
31+
--movie-fps 10 \\
32+
-j 4
33+
934
Invariants:
10-
- Sequential well processing (memory safety)
35+
- Sequential or parallel well processing
1136
- Progress tracking via logging
1237
- Failures are logged, not silent
1338
- All outputs tracked
@@ -16,7 +41,10 @@
1641
import argparse
1742
import gc
1843
import logging
44+
import os
45+
from concurrent.futures import ProcessPoolExecutor, as_completed
1946
from dataclasses import dataclass, field
47+
from multiprocessing import cpu_count
2048
from pathlib import Path
2149
from typing import Dict, FrozenSet, List, Optional, Tuple
2250

@@ -87,6 +115,18 @@ class ProcessingConfig:
87115
dpi: int = 150
88116
z_gap: float = 1.0
89117
z_aspect: float = 0.1
118+
num_workers: int = 1
119+
120+
121+
@dataclass
122+
class WellResult:
123+
"""Result from processing a single well."""
124+
125+
well_id: str
126+
success: bool
127+
composite_path: Optional[Path] = None
128+
movie_outputs: List[MovieOutput] = field(default_factory=list)
129+
error: Optional[str] = None
90130

91131

92132
@dataclass
@@ -236,9 +276,157 @@ def process_well_all_channels(
236276
return all_projections, composite_path, movie_outputs
237277

238278

279+
def _process_well_worker(
280+
well_id: str,
281+
plate_path_str: str,
282+
z_paths_dict: Dict,
283+
config_dict: Dict,
284+
) -> WellResult:
285+
"""
286+
Worker function for parallel well processing.
287+
288+
Args:
289+
well_id: Well ID to process
290+
plate_path_str: String path to plate directory
291+
z_paths_dict: Serialized z_paths_by_channel dict
292+
config_dict: Serialized ProcessingConfig dict
293+
294+
Returns:
295+
WellResult with outputs
296+
"""
297+
from pathlib import Path as PP
298+
299+
from openhcs.core.config import DtypeConfig
300+
from openhcs.constants.constants import DtypeConversion
301+
from openhcs.processing.backends.processors.numpy_processor import (
302+
create_orthogonal_projections,
303+
)
304+
305+
from .composer import create_multi_channel_composite, save_composite_figure
306+
from .constants import ChannelColorMapping, CompositeLayout, DEFAULT_CHANNEL_COLORS
307+
from .discovery import WellChannelKey
308+
from .io_handler import MovieOutput, load_z_stack, save_slice_movies_for_well
309+
from .labeling import get_labeler
310+
311+
try:
312+
plate_path = PP(plate_path_str)
313+
dtype_config = DtypeConfig(
314+
default_dtype_conversion=DtypeConversion.PRESERVE_INPUT
315+
)
316+
317+
channel_colors = tuple(
318+
ChannelColorMapping(
319+
cc["channel_id"], cc["channel_name"], cc["color"], cc["visible"]
320+
)
321+
for cc in config_dict.get("channel_colors", [])
322+
)
323+
324+
output_dir = PP(config_dict["output_dir"])
325+
create_composites = config_dict.get("create_composites", True)
326+
create_movies = config_dict.get("create_movies", False)
327+
movie_types = tuple(config_dict.get("movie_types", ["xy", "xz", "yz"]))
328+
movie_fps = config_dict.get("movie_fps", 10)
329+
z_gap = config_dict.get("z_gap", 1.0)
330+
z_aspect = config_dict.get("z_aspect", 0.1)
331+
dpi = config_dict.get("dpi", 150)
332+
333+
z_paths_by_channel = {
334+
WellChannelKey(
335+
well_id=wk[0],
336+
channel_id=wk[1],
337+
channel_name=wk[2],
338+
): tuple(PP(p) for p in paths)
339+
for wk, paths in z_paths_dict.items()
340+
}
341+
342+
well_channel_keys = [
343+
wk for wk in z_paths_by_channel.keys() if wk.well_id == well_id
344+
]
345+
346+
included_channels = frozenset(config_dict.get("included_channels", []))
347+
excluded_channels = frozenset(config_dict.get("excluded_channels", []))
348+
include_mode = config_dict.get("include_mode", False)
349+
350+
all_z_stacks = {}
351+
all_projections = {}
352+
353+
for wk in well_channel_keys:
354+
if include_mode:
355+
if wk.channel_id not in included_channels:
356+
continue
357+
else:
358+
if wk.channel_id in excluded_channels:
359+
continue
360+
361+
z_paths = z_paths_by_channel.get(wk)
362+
if not z_paths:
363+
continue
364+
365+
z_stack = load_z_stack(z_paths)
366+
all_z_stacks[wk.channel_id] = z_stack
367+
368+
projections = create_orthogonal_projections(
369+
z_stack,
370+
dtype_config=dtype_config,
371+
)
372+
all_projections[wk.channel_id] = projections
373+
374+
composite_path = None
375+
if create_composites and all_projections:
376+
layout = CompositeLayout(z_gap=z_gap, z_aspect=z_aspect)
377+
channel_names = tuple(
378+
wk.channel_name
379+
for wk in well_channel_keys
380+
if wk.channel_id in all_projections
381+
)
382+
title = f"Well {well_id}"
383+
384+
fig = create_multi_channel_composite(
385+
all_projections,
386+
title,
387+
layout=layout,
388+
channel_colors=channel_colors or DEFAULT_CHANNEL_COLORS,
389+
labeler=get_labeler("standard"),
390+
)
391+
392+
composite_dir = output_dir / "composites"
393+
composite_dir.mkdir(parents=True, exist_ok=True)
394+
composite_path = composite_dir / f"{well_id}_composite.png"
395+
save_composite_figure(fig, composite_path, dpi=dpi)
396+
397+
movie_outputs = []
398+
if create_movies and all_z_stacks:
399+
movie_dir = output_dir / "movies"
400+
movie_outputs = save_slice_movies_for_well(
401+
all_z_stacks,
402+
well_id,
403+
movie_dir,
404+
slice_types=movie_types,
405+
fps=movie_fps,
406+
z_gap=z_gap,
407+
channel_colors=channel_colors or DEFAULT_CHANNEL_COLORS,
408+
)
409+
410+
return WellResult(
411+
well_id=well_id,
412+
success=True,
413+
composite_path=composite_path,
414+
movie_outputs=movie_outputs,
415+
)
416+
417+
except Exception as e:
418+
import traceback
419+
420+
return WellResult(
421+
well_id=well_id,
422+
success=False,
423+
error=f"{type(e).__name__}: {e}",
424+
)
425+
426+
239427
def process_plate(config: ProcessingConfig) -> ProcessingResult:
240428
"""
241-
Process entire plate, one well at a time.
429+
Process entire plate, with optional parallel processing.
242430
"""
243431
result = ProcessingResult()
244432

@@ -275,30 +463,91 @@ def process_plate(config: ProcessingConfig) -> ProcessingResult:
275463

276464
composite_paths = {}
277465

278-
for i, well_id in enumerate(wells, 1):
279-
logger.info(f"Processing well {well_id} ({i}/{len(wells)})")
280-
281-
try:
282-
_, composite_path, movie_outputs = process_well_all_channels(
283-
well_id, well_keys, z_paths_by_channel, config, labeler
284-
)
285-
286-
if composite_path:
287-
composite_paths[well_id] = composite_path
288-
result.composite_outputs.append(composite_path)
289-
290-
result.movie_outputs.extend(movie_outputs)
291-
292-
result.processed_wells += 1
293-
294-
except Exception as e:
295-
import traceback
296-
297-
logger.error(f"Failed to process well {well_id}: {e}")
298-
logger.debug(traceback.format_exc())
299-
result.failed_wells.append(well_id)
300-
301-
gc.collect()
466+
if config.num_workers > 1:
467+
logger.info(f"Processing {len(wells)} wells with {config.num_workers} workers")
468+
469+
config_dict = {
470+
"output_dir": str(config.output_dir),
471+
"create_composites": config.create_composites,
472+
"create_movies": config.create_movies,
473+
"movie_types": config.movie_types,
474+
"movie_fps": config.movie_fps,
475+
"z_gap": config.z_gap,
476+
"z_aspect": config.z_aspect,
477+
"dpi": config.dpi,
478+
"included_channels": list(config.included_channels),
479+
"excluded_channels": list(config.excluded_channels),
480+
"include_mode": config.include_mode,
481+
"channel_colors": [
482+
{
483+
"channel_id": cc.channel_id,
484+
"channel_name": cc.channel_name,
485+
"color": cc.color,
486+
"visible": cc.visible,
487+
}
488+
for cc in config.channel_colors
489+
],
490+
}
491+
492+
z_paths_dict = {
493+
(wk.well_id, wk.channel_id, wk.channel_name): [str(p) for p in paths]
494+
for wk, paths in z_paths_by_channel.items()
495+
}
496+
497+
with ProcessPoolExecutor(max_workers=config.num_workers) as executor:
498+
futures = {
499+
executor.submit(
500+
_process_well_worker,
501+
well_id,
502+
str(config.plate_path),
503+
z_paths_dict,
504+
config_dict,
505+
): well_id
506+
for well_id in wells
507+
}
508+
509+
for future in as_completed(futures):
510+
well_id = futures[future]
511+
try:
512+
well_result = future.result()
513+
if well_result.success:
514+
if well_result.composite_path:
515+
composite_paths[well_id] = well_result.composite_path
516+
result.composite_outputs.append(well_result.composite_path)
517+
result.movie_outputs.extend(well_result.movie_outputs)
518+
result.processed_wells += 1
519+
logger.info(f" Completed {well_id}")
520+
else:
521+
result.failed_wells.append(well_id)
522+
logger.error(f" Failed {well_id}: {well_result.error}")
523+
except Exception as e:
524+
result.failed_wells.append(well_id)
525+
logger.error(f" Failed {well_id}: {e}")
526+
else:
527+
for i, well_id in enumerate(wells, 1):
528+
logger.info(f"Processing well {well_id} ({i}/{len(wells)})")
529+
530+
try:
531+
_, composite_path, movie_outputs = process_well_all_channels(
532+
well_id, well_keys, z_paths_by_channel, config, labeler
533+
)
534+
535+
if composite_path:
536+
composite_paths[well_id] = composite_path
537+
result.composite_outputs.append(composite_path)
538+
539+
result.movie_outputs.extend(movie_outputs)
540+
541+
result.processed_wells += 1
542+
543+
except Exception as e:
544+
import traceback
545+
546+
logger.error(f"Failed to process well {well_id}: {e}")
547+
logger.debug(traceback.format_exc())
548+
result.failed_wells.append(well_id)
549+
550+
gc.collect()
302551

303552
if config.create_plate_mosaic and composite_paths:
304553
logger.info("Creating plate mosaic...")
@@ -469,6 +718,14 @@ def main():
469718
"--z-aspect", type=float, default=0.1, help="Aspect ratio for XZ/YZ projections"
470719
)
471720

721+
parser.add_argument(
722+
"-j",
723+
"--jobs",
724+
type=int,
725+
default=1,
726+
help="Number of parallel workers (default: 1)",
727+
)
728+
472729
args = parser.parse_args()
473730

474731
channel_colors = DEFAULT_CHANNEL_COLORS
@@ -504,11 +761,13 @@ def main():
504761
dpi=args.dpi,
505762
z_gap=args.z_gap,
506763
z_aspect=args.z_aspect,
764+
num_workers=args.jobs,
507765
)
508766

509767
logger.info(f"Processing plate: {config.plate_path}")
510768
logger.info(f"Output directory: {config.output_dir}")
511769
logger.info(f"Projections: {config.projections}")
770+
logger.info(f"Workers: {config.num_workers}")
512771

513772
result = process_plate(config)
514773

0 commit comments

Comments
 (0)