diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index c9a704ded..725fc9e61 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -8,8 +8,9 @@ import numpy as np import pandas as pd import xarray as xr +from scipy.ndimage import convolve1d -from imap_processing.hi.utils import CoincidenceBitmap, parse_sensor_number +from imap_processing.hi.utils import CoincidenceBitmap, HiConstants, parse_sensor_number from imap_processing.quality_flags import ImapHiL1bDeFlags logger = logging.getLogger(__name__) @@ -139,7 +140,7 @@ class GoodtimesAccessor: * cull_flags : xarray.DataArray (met, spin_bin) Cull flags where 0=good time, non-zero=bad time with cull reason code * esa_step : xarray.DataArray (met,) - ESA energy step for each MET timestamp + ESA step for each MET timestamp * Attributes * sensor : str Sensor identifier ('Hi45' or 'Hi90') @@ -301,7 +302,7 @@ def get_good_intervals(self) -> np.ndarray: - spin_bin_low: Lowest good spin bin in interval - spin_bin_high: Highest good spin bin in interval - n_good_bins: Number of good bins - - esa_step: ESA energy step for this MET + - esa_step: ESA step for this MET Notes ----- @@ -353,7 +354,7 @@ def _add_intervals_for_pattern( pattern : numpy.ndarray Cull flags pattern for spin bins. esa_step : int - ESA energy step for this MET. + ESA step for this MET. 
""" good_bins = np.nonzero(pattern == 0)[0] @@ -906,26 +907,27 @@ def _compute_normalized_counts_per_sweep( counts_per_sweep = np.zeros(n_sweeps, dtype=np.int64) np.add.at(counts_per_sweep, event_sweep_idx[is_valid_ab.values], 1) - # Normalize by number of unique ESA steps - n_unique_esa_steps = len(np.unique(l1b_de["esa_step"].values)) - normalized_counts = counts_per_sweep / n_unique_esa_steps + # Normalize by number of unique ESA energy steps + n_unique_esa_energy_steps = len(np.unique(l1b_de["esa_energy_step"].values)) + normalized_counts = counts_per_sweep / n_unique_esa_energy_steps # Remove all variables that depend on event_met dimension ds = l1b_de.drop_dims("event_met", errors="ignore") - # Set esa_sweep and esa_step as a multi-index on epoch dimension - ds = ds.set_index(epoch=["esa_sweep", "esa_step"]) + # Set esa_sweep and esa_energy_step as a multi-index on epoch dimension + ds = ds.set_index(epoch=["esa_sweep", "esa_energy_step"]) - # Drop duplicates, keeping first occurrence of each (esa_sweep, esa_step) pair - # This handles cases where multiple packets have the same esa_sweep and esa_step + # Drop duplicates, keeping first occurrence of each (esa_sweep, esa_energy_step) + # pair. This handles cases where multiple packets have the same esa_sweep + # and esa_energy_step. 
ds = ds.drop_duplicates(dim="epoch", keep="first") - # Unstack to make esa_sweep and esa_step into separate dimensions - # This creates a 2D array with dimensions (esa_sweep, esa_step) + # Unstack to make esa_sweep and esa_energy_step into separate dimensions + # This creates a 2D array with dimensions (esa_sweep, esa_energy_step) ds_reshaped = ds.unstack("epoch") # Add normalized_count as a new variable - # It only has esa_sweep dimension (no esa_step variation within a sweep) + # It only has esa_sweep dimension (no esa_energy_step variation within a sweep) ds_reshaped["normalized_count"] = xr.DataArray( normalized_counts, dims=["esa_sweep"], @@ -939,10 +941,10 @@ def mark_statistical_filter_0( goodtimes_ds: xr.Dataset, l1b_de_datasets: list[xr.Dataset], current_index: int, - threshold_factor: float = 1.5, - tof_ab_limit_ns: int = 15, + threshold_factor: float = HiConstants.STAT_FILTER_0_THRESHOLD_FACTOR, + tof_ab_limit_ns: int = HiConstants.STAT_FILTER_0_TOF_AB_LIMIT_NS, cull_code: int = CullCode.LOOSE, - min_pointings: int = 4, + min_pointings: int = HiConstants.STAT_FILTER_MIN_POINTINGS, ) -> None: """ Apply Statistical Filter 0 to detect drastic penetrating background changes. @@ -965,13 +967,16 @@ def mark_statistical_filter_0( current_index : int Index of the current Pointing in l1b_de_datasets. threshold_factor : float, optional - Multiplier for median comparison. Default is 1.5 (150% of median). + Multiplier for median comparison. + Default is HiConstants.STAT_FILTER_0_THRESHOLD_FACTOR. tof_ab_limit_ns : int, optional - Maximum |tof_ab| in nanoseconds for AB coincidences. Default is 15. + Maximum |tof_ab| in nanoseconds for AB coincidences. + Default is HiConstants.STAT_FILTER_0_TOF_AB_LIMIT_NS. cull_code : int, optional Cull code to use for marking bad times. Default is CullCode.LOOSE. min_pointings : int, optional - Minimum number of Pointings required. Default is 4. + Minimum number of Pointings required. 
+ Default is HiConstants.STAT_FILTER_MIN_POINTINGS. Raises ------ @@ -987,7 +992,7 @@ def mark_statistical_filter_0( Algorithm: 1. For each complete ESA sweep across all Pointings, count AB coincidences - where |tof_ab| <= 15ns and divide by number of ESA steps + where |tof_ab| <= tof_ab_limit_ns and divide by number of ESA steps 2. Calculate median of all normalized sweep counts 3. For each sweep in current Pointing, mark all METs in that sweep as bad if normalized count > threshold_factor * median @@ -1054,7 +1059,7 @@ def mark_statistical_filter_0( # For each bad sweep, mark the time range from first to last ccsds_met for sweep_idx in range(len(bad_sweeps_ds["esa_sweep"])): - # Get all ccsds_met values for this sweep across all esa_steps + # Get all ccsds_met values for this sweep across all esa_energy_steps sweep_mets = bad_sweeps_ds["ccsds_met"].isel(esa_sweep=sweep_idx).values # Get min and max MET values, ignoring NaNs @@ -1073,3 +1078,420 @@ def mark_statistical_filter_0( ) else: logger.info("No bad ESA sweeps identified by Statistical Filter 0") + + +def _compute_qualified_counts_per_sweep( + l1b_de: xr.Dataset, + qualified_coincidence_types: set[int], +) -> xr.Dataset: + """ + Compute qualified calibration product counts per 8-spin interval and reshape. + + Uses the (esa_sweep, esa_energy_step) multi-index to identify unique 8-spin sets, + following the same pattern as _compute_normalized_counts_per_sweep. + + Parameters + ---------- + l1b_de : xarray.Dataset + L1B Direct Event dataset with esa_sweep coordinate on epoch dimension. + qualified_coincidence_types : set[int] + Set of coincidence type integers that qualify for calibration products. 
+ + Returns + ------- + xarray.Dataset + Reshaped dataset with dimensions (esa_sweep, esa_energy_step) containing: + - qualified_count: total qualified counts per 8-spin interval + - ccsds_met: first MET for each 8-spin interval + """ + if "esa_sweep" not in l1b_de.coords: + raise ValueError("Dataset must have esa_sweep coordinate") + + # Get values needed for counting + coincidence_type = l1b_de["coincidence_type"].values + ccsds_index = l1b_de["ccsds_index"].values + esa_sweep = l1b_de.coords["esa_sweep"].values + esa_energy_step = l1b_de["esa_energy_step"].values + + # Identify qualified events + is_qualified = np.isin(coincidence_type, list(qualified_coincidence_types)) + + # Map qualified events to their packet's (esa_sweep, esa_energy_step) + qualified_packet_idx = ccsds_index[is_qualified] + qualified_sweep = esa_sweep[qualified_packet_idx] + qualified_energy_step = esa_energy_step[qualified_packet_idx] + + # Count qualified events per (esa_sweep, esa_energy_step) using 2D array + n_sweeps = int(esa_sweep.max()) + 1 + n_esa_energy_steps = int(esa_energy_step.max()) + 1 + counts_2d = np.zeros((n_sweeps, n_esa_energy_steps), dtype=np.float64) + np.add.at(counts_2d, (qualified_sweep, qualified_energy_step), 1) + + # Remove event_met dimension and reshape using multi-index + ds = l1b_de.drop_dims("event_met", errors="ignore") + ds = ds.set_index(epoch=["esa_sweep", "esa_energy_step"]) + ds = ds.drop_duplicates(dim="epoch", keep="first") + ds_reshaped = ds.unstack("epoch") + + # Add qualified_count - aligns with (esa_sweep, esa_energy_step) coordinates + ds_reshaped["qualified_count"] = xr.DataArray( + counts_2d, + dims=["esa_sweep", "esa_energy_step"], + coords={ + "esa_sweep": np.arange(n_sweeps), + "esa_energy_step": np.arange(n_esa_energy_steps), + }, + ) + + # Set missing (sweep, energy_step) pairs to NaN so they don't affect statistics + missing_mask = ds_reshaped["ccsds_met"].isnull() + ds_reshaped["qualified_count"] = 
ds_reshaped["qualified_count"].where(~missing_mask) + + return ds_reshaped + + +def _build_per_sweep_datasets( + l1b_de_datasets: list[xr.Dataset], + qualified_coincidence_types: set[int], +) -> dict[int, xr.Dataset]: + """ + Build per-sweep datasets with qualified counts for each Pointing. + + Parameters + ---------- + l1b_de_datasets : list[xarray.Dataset] + List of L1B DE datasets for multiple Pointings. + qualified_coincidence_types : set[int] + Set of coincidence type integers that qualify for calibration products. + + Returns + ------- + dict[int, xarray.Dataset] + Dictionary mapping dataset index to 2D Dataset with + (esa_sweep, esa_energy_step) dims. + """ + per_sweep_datasets: dict[int, xr.Dataset] = {} + + for i, l1b_de in enumerate(l1b_de_datasets): + # Add esa_sweep coordinate and compute counts per 8-spin interval + l1b_de_with_sweep = _add_sweep_indices(l1b_de) + per_sweep = _compute_qualified_counts_per_sweep( + l1b_de_with_sweep, qualified_coincidence_types + ) + per_sweep_datasets[i] = per_sweep + + return per_sweep_datasets + + +def _compute_median_and_sigma_per_esa( + per_sweep_datasets: dict[int, xr.Dataset], +) -> tuple[xr.DataArray, xr.DataArray]: + """ + Compute median and sigma for each ESA energy step using xarray. + + Combines all per-sweep datasets and computes the median qualified count + per ESA energy step across all sweeps and pointings. + + Parameters + ---------- + per_sweep_datasets : dict[int, xarray.Dataset] + Dictionary mapping dataset index to 2D Dataset with + (esa_sweep, esa_energy_step) dims. + + Returns + ------- + tuple[xarray.DataArray, xarray.DataArray] + Tuple of (median_per_esa, sigma_per_esa) DataArrays with esa_energy_step + coordinate. ESA energy steps with zero/nan median or esa_energy_step=0 + are set to NaN/0. 
+ """ + if not per_sweep_datasets: + empty = xr.DataArray( + [], dims=["esa_energy_step"], coords={"esa_energy_step": []} + ) + return empty, empty.astype(int) + + # Concatenate datasets along esa_sweep dimension using xarray. This handles + # different esa_energy_step coordinates by aligning and filling with NaN. + combined = xr.concat( + [ds["qualified_count"] for ds in per_sweep_datasets.values()], + dim="esa_sweep", + ) + + # Compute median along esa_sweep dimension using xarray + median_per_esa = combined.median(dim="esa_sweep", skipna=True) + + # Compute sigma: sigma ≈ √(median + 1) rounded to closest integer + sigma_per_esa = np.sqrt(median_per_esa + 1).round().astype(int) + + # Set invalid ESA energy steps (zero/nan median or esa_energy_step=0) to NaN/0 + esa_energy_step_coords = median_per_esa.coords["esa_energy_step"] + invalid_mask = ( + (esa_energy_step_coords == 0) | (median_per_esa <= 0) | median_per_esa.isnull() + ) + median_per_esa = median_per_esa.where(~invalid_mask) + sigma_per_esa = sigma_per_esa.where(~invalid_mask, 0) + + # Log warnings for invalid ESA energy steps (excluding esa_energy_step=0) + invalid_esa_energy_steps = esa_energy_step_coords.values[ + (esa_energy_step_coords != 0).values & invalid_mask.values + ] + for esa in invalid_esa_energy_steps: + logger.warning( + f"Statistical Filter 1: Median is zero/nan for ESA energy step {esa}, " + "skipping this ESA energy step" + ) + + # Log valid ESA energy steps + valid_esa_energy_steps = esa_energy_step_coords.values[~invalid_mask.values] + for esa in valid_esa_energy_steps: + logger.debug( + f"Statistical Filter 1: ESA {esa}: " + f"median={median_per_esa.sel(esa_energy_step=esa).values:.2f}, " + f"sigma={sigma_per_esa.sel(esa_energy_step=esa).values}" + ) + + return median_per_esa, sigma_per_esa + + +def _identify_cull_pattern( + current_counts: xr.DataArray, + median_per_esa: xr.DataArray, + sigma_per_esa: xr.DataArray, + consecutive_threshold_sigma: float = 
HiConstants.STAT_FILTER_1_CONSECUTIVE_SIGMA,
+    extreme_threshold_sigma: float = HiConstants.STAT_FILTER_1_EXTREME_SIGMA,
+    min_consecutive: int = HiConstants.STAT_FILTER_1_MIN_CONSECUTIVE,
+) -> xr.DataArray:
+    """
+    Identify 2D cull pattern for statistical filter 1 using convolution.
+
+    Detects three patterns:
+    1. Consecutive runs: min_consecutive or more consecutive sweeps exceeding
+       threshold with ESA neighbor confirmation (isotropic excursion pattern
+       from C implementation)
+    2. Isolated intervals: Good intervals surrounded by bad on both sides in time
+    3. Extreme outliers: Any position exceeding the extreme_threshold_sigma
+       threshold
+
+    Parameters
+    ----------
+    current_counts : xr.DataArray
+        2D array of qualified counts with dims (esa_sweep, esa_energy_step).
+    median_per_esa : xr.DataArray
+        Median counts per ESA energy step.
+    sigma_per_esa : xr.DataArray
+        Sigma values per ESA energy step.
+    consecutive_threshold_sigma : float
+        Sigma multiplier for consecutive interval check.
+        Default is HiConstants.STAT_FILTER_1_CONSECUTIVE_SIGMA.
+    extreme_threshold_sigma : float
+        Sigma multiplier for extreme outlier check.
+        Default is HiConstants.STAT_FILTER_1_EXTREME_SIGMA.
+    min_consecutive : int
+        Minimum consecutive intervals above threshold.
+        Default is HiConstants.STAT_FILTER_1_MIN_CONSECUTIVE.
+
+    Returns
+    -------
+    xr.DataArray
+        Boolean mask with dims (esa_sweep, esa_energy_step) where
+        True = cull this position.
+ """ + # Compute thresholds using xarray broadcasting + consecutive_threshold = median_per_esa + consecutive_threshold_sigma * sigma_per_esa + extreme_threshold = median_per_esa + extreme_threshold_sigma * sigma_per_esa + + # Compute exceeds masks - handle NaN by treating as False + exceeds_consecutive = (current_counts > consecutive_threshold).fillna(False) + exceeds_extreme = (current_counts > extreme_threshold).fillna(False) + + # Get underlying numpy arrays for convolution (dims: esa_sweep x esa_energy_step) + exceeds_arr = exceeds_consecutive.values.astype(int) + + # Initialize cull mask + cull_arr = np.zeros_like(exceeds_arr, dtype=bool) + + # === Pass 1: Find consecutive runs with ESA neighbor confirmation === + # Use convolution to find runs of min_consecutive in time (axis=0 = esa_sweep) + time_kernel = np.ones(min_consecutive) + consecutive_sum = convolve1d(exceeds_arr, time_kernel, axis=0, mode="constant") + + # Dilate the consecutive detection to mark all positions in runs + # convolve1d centers the kernel, so we dilate to capture run edges + run_kernel = np.ones(min_consecutive) + run_positions = convolve1d( + (consecutive_sum >= min_consecutive).astype(int), + run_kernel, + axis=0, + mode="constant", + ) + in_consecutive_run = (run_positions >= 1) & exceeds_arr.astype(bool) + + # Check ESA neighbors at same time position using convolution along ESA axis + # Kernel [1, 0, 1] sums neighbors without counting self + # Use cval=1 so edges pass the neighbor check (matches C implementation where + # edges are treated as "not good", i.e., the check passes at boundaries) + esa_neighbor_kernel = np.array([1, 0, 1]) + esa_neighbor_exceeds = convolve1d( + exceeds_arr, esa_neighbor_kernel, axis=1, mode="constant", cval=1 + ) + has_esa_neighbor = esa_neighbor_exceeds >= 1 + + # Combine: in a consecutive run AND has ESA neighbor exceeding at same time + cull_arr |= in_consecutive_run & has_esa_neighbor + + # === Pass 2: Mark isolated good intervals (orphans) === + 
# Pattern: [bad, good, bad] in time dimension + # Sum neighbors in time - if both neighbors are bad (in cull_arr), sum = 2 + # Kernel [1, 0, 1] sums neighbors without counting self + neighbor_kernel = np.array([1, 0, 1]) + bad_neighbor_sum = convolve1d( + cull_arr.astype(int), neighbor_kernel, axis=0, mode="constant" + ) + # Current position is good (not in cull_arr) but both time neighbors are bad + isolated = ~cull_arr & (bad_neighbor_sum == 2) + cull_arr |= isolated + + # Log isolated intervals found + n_isolated = int(isolated.sum()) + if n_isolated > 0: + logger.debug(f"Statistical Filter 1: Found {n_isolated} isolated intervals") + + # === Pass 3: Mark extreme outliers (5-sigma) === + extreme_arr = exceeds_extreme.values + n_extreme = int((extreme_arr & ~cull_arr).sum()) + if n_extreme > 0: + logger.debug(f"Statistical Filter 1: Found {n_extreme} extreme outliers") + cull_arr |= extreme_arr + + # Convert back to xarray DataArray with same coordinates + cull_mask = xr.DataArray( + cull_arr, + dims=current_counts.dims, + coords=current_counts.coords, + ) + + return cull_mask + + +def mark_statistical_filter_1( + goodtimes_ds: xr.Dataset, + l1b_de_datasets: list[xr.Dataset], + current_index: int, + qualified_coincidence_types: set[int], + consecutive_threshold_sigma: float = HiConstants.STAT_FILTER_1_CONSECUTIVE_SIGMA, + extreme_threshold_sigma: float = HiConstants.STAT_FILTER_1_EXTREME_SIGMA, + min_consecutive_intervals: int = HiConstants.STAT_FILTER_1_MIN_CONSECUTIVE, + cull_code: int = CullCode.LOOSE, + min_pointings: int = HiConstants.STAT_FILTER_MIN_POINTINGS, +) -> None: + """ + Apply Statistical Filter 1 to detect isotropic count rate increases. + + Statistical Filter 1 from Algorithm Document Section 2.3.2.3 detects times + when qualified calibration product counts increase fairly isotropically for + a limited time. It operates per sensor, per ESA energy step, per 8-spin + interval, summing counts over all angles. 
+
+    The filter applies three passes:
+    1. Mark intervals where counts exceed
+       median + consecutive_threshold_sigma * sigma for at least
+       min_consecutive_intervals AND in at least one adjacent ESA step.
+    2. Remove isolated good intervals (good sandwiched between two bad).
+    3. Mark remaining intervals where counts exceed
+       median + extreme_threshold_sigma * sigma.
+
+    Parameters
+    ----------
+    goodtimes_ds : xarray.Dataset
+        Goodtimes dataset for the current Pointing to update.
+    l1b_de_datasets : list[xarray.Dataset]
+        List of L1B DE datasets for surrounding Pointings. Typically includes
+        current plus 3 preceding and 3 following Pointings.
+    current_index : int
+        Index of the current Pointing in l1b_de_datasets.
+    qualified_coincidence_types : set[int]
+        Set of coincidence type integers that qualify for calibration products.
+    consecutive_threshold_sigma : float, optional
+        Sigma multiplier for consecutive interval check.
+        Default is HiConstants.STAT_FILTER_1_CONSECUTIVE_SIGMA.
+    extreme_threshold_sigma : float, optional
+        Sigma multiplier for extreme outlier check.
+        Default is HiConstants.STAT_FILTER_1_EXTREME_SIGMA.
+    min_consecutive_intervals : int, optional
+        Minimum consecutive intervals above threshold.
+        Default is HiConstants.STAT_FILTER_1_MIN_CONSECUTIVE.
+    cull_code : int, optional
+        Cull code to use for marking bad times. Default is CullCode.LOOSE.
+    min_pointings : int, optional
+        Minimum number of Pointings required.
+        Default is HiConstants.STAT_FILTER_MIN_POINTINGS.
+
+    Raises
+    ------
+    ValueError
+        If current_index is out of range or if fewer than min_pointings
+        datasets are provided.
+
+    Notes
+    -----
+    This function modifies goodtimes_ds in place. Should be called after
+    Statistical Filter 0 and other angle-independent filters.
+ """ + logger.info("Running mark_statistical_filter_1 culling") + + # Validate inputs + if current_index < 0 or current_index >= len(l1b_de_datasets): + raise ValueError( + f"current_index {current_index} out of range for list of " + f"length {len(l1b_de_datasets)}" + ) + + if len(l1b_de_datasets) < min_pointings: + raise ValueError( + f"At least {min_pointings} valid Pointings required, " + f"got {len(l1b_de_datasets)}" + ) + + # Step 1: Build per-sweep datasets with qualified counts for each Pointing + per_sweep_datasets = _build_per_sweep_datasets( + l1b_de_datasets, qualified_coincidence_types + ) + + # Step 2: Compute median and sigma per ESA energy step using xarray + median_per_esa, sigma_per_esa = _compute_median_and_sigma_per_esa( + per_sweep_datasets + ) + + if np.all(np.isnan(median_per_esa.values)): + logger.warning( + "Statistical Filter 1: No valid ESA energy steps " + "with non-zero median, skipping" + ) + return + + # Get current Pointing's per-sweep data (2D: esa_sweep x esa_energy_step) + current_ds = per_sweep_datasets[current_index] + current_counts = current_ds["qualified_count"] + + # Identify cull pattern using convolution-based detection + cull_mask = _identify_cull_pattern( + current_counts, + median_per_esa, + sigma_per_esa, + consecutive_threshold_sigma=consecutive_threshold_sigma, + extreme_threshold_sigma=extreme_threshold_sigma, + min_consecutive=min_consecutive_intervals, + ) + + # Apply culling to goodtimes - get METs where cull_mask is True + if cull_mask.any(): + # Use xarray's where to get METs for culled intervals, then flatten + mets_to_cull = current_ds["ccsds_met"].where(cull_mask).values.ravel() + # Remove NaN values + mets_to_cull = mets_to_cull[~np.isnan(mets_to_cull)] + + if len(mets_to_cull) > 0: + goodtimes_ds.goodtimes.mark_bad_times(met=mets_to_cull, cull=cull_code) + + logger.info( + f"Statistical Filter 1: Marked {len(mets_to_cull)} 8-spin intervals as bad" + ) + else: + logger.info("Statistical Filter 1: No bad 
intervals identified") diff --git a/imap_processing/hi/utils.py b/imap_processing/hi/utils.py index 44aa34ec9..a547a67b1 100644 --- a/imap_processing/hi/utils.py +++ b/imap_processing/hi/utils.py @@ -63,6 +63,19 @@ class HiConstants: Tuple of values indicating TOF2 does not contain a valid time. TOF3_BAD_VALUES : tuple[int] Tuple of values indicating TOF3 does not contain a valid time. + STAT_FILTER_MIN_POINTINGS : int + Minimum number of Pointings required for statistical filters. + STAT_FILTER_0_THRESHOLD_FACTOR : float + Multiplier for median comparison in Statistical Filter 0. + Values exceeding threshold_factor * median are culled. + STAT_FILTER_0_TOF_AB_LIMIT_NS : int + Maximum abs(tof_ab) in nanoseconds for AB coincidences in Filter 0. + STAT_FILTER_1_CONSECUTIVE_SIGMA : float + Sigma multiplier for consecutive interval check in Filter 1. + STAT_FILTER_1_EXTREME_SIGMA : float + Sigma multiplier for extreme outlier check in Filter 1. + STAT_FILTER_1_MIN_CONSECUTIVE : int + Minimum consecutive intervals above threshold in Filter 1. 
""" TOF1_TICK_DUR = 1 # 1 ns @@ -76,6 +89,15 @@ class HiConstants: TOF2_BAD_VALUES = (511,) TOF3_BAD_VALUES = (1023,) + # Statistical Filter tuning parameters + # See IMAP-Hi Algorithm Document Section 2.3.2.3 + STAT_FILTER_MIN_POINTINGS = 4 + STAT_FILTER_0_THRESHOLD_FACTOR = 1.5 + STAT_FILTER_0_TOF_AB_LIMIT_NS = 15 + STAT_FILTER_1_CONSECUTIVE_SIGMA = 1.8 + STAT_FILTER_1_EXTREME_SIGMA = 5.0 + STAT_FILTER_1_MIN_CONSECUTIVE = 3 + def parse_sensor_number(full_string: str) -> int: """ diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py index 5e745dea2..fcb1f8882 100644 --- a/imap_processing/tests/hi/test_hi_goodtimes.py +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -9,13 +9,18 @@ INTERVAL_DTYPE, CullCode, _add_sweep_indices, + _build_per_sweep_datasets, + _compute_median_and_sigma_per_esa, _compute_normalized_counts_per_sweep, + _compute_qualified_counts_per_sweep, _get_sweep_indices, + _identify_cull_pattern, create_goodtimes_dataset, mark_drf_times, mark_incomplete_spin_sets, mark_overflow_packets, mark_statistical_filter_0, + mark_statistical_filter_1, ) from imap_processing.quality_flags import ImapHiL1bDeFlags @@ -1523,6 +1528,10 @@ def _create_test_dataset( np.repeat(np.arange(1, n_esa_steps + 1), packets_per_esa_step), n_sweeps ).astype(np.uint8) + # esa_energy_step same as esa_step for test purposes + # (in real data they can differ) + esa_energy_step = esa_step.copy() + # Create METs with unique incrementing values for each packet ccsds_met = np.arange(1000.0, 1000.0 + n_packets * 60, 60) @@ -1539,6 +1548,7 @@ def _create_test_dataset( { "ccsds_met": (["epoch"], ccsds_met), "esa_step": (["epoch"], esa_step), + "esa_energy_step": (["epoch"], esa_energy_step), "tof_ab": (["event_met"], tof_ab), "coincidence_type": (["event_met"], coincidence_type), "ccsds_index": (["event_met"], ccsds_index), @@ -1560,7 +1570,7 @@ def test_output_dimensions(self): result = _compute_normalized_counts_per_sweep(ds, 
tof_ab_limit_ns=15) assert "esa_sweep" in result.dims - assert "esa_step" in result.dims + assert "esa_energy_step" in result.dims assert "epoch" not in result.dims assert "event_met" not in result.dims @@ -1616,7 +1626,7 @@ def test_preserves_epoch_variables(self): assert "ccsds_met" in result.data_vars # esa_step becomes a coordinate (dimension) after unstack - assert "esa_step" in result.coords + assert "esa_energy_step" in result.coords def test_removes_event_met_variables(self): """Test that event_met dimension variables are removed.""" @@ -1710,6 +1720,8 @@ def _create_l1b_de_dataset( # Create ESA steps cycling through 1-9 for each sweep esa_step = np.tile(np.arange(1, n_esa_steps + 1), n_sweeps).astype(np.uint8) + # esa_energy_step same as esa_step for test purposes + esa_energy_step = esa_step.copy() # Create ccsds_met for packets ccsds_met = np.arange(base_met, base_met + n_packets * 60, 60, dtype=np.float64) @@ -1728,6 +1740,7 @@ def _create_l1b_de_dataset( "ccsds_index": (["event_met"], ccsds_index), "ccsds_met": (["epoch"], ccsds_met), "esa_step": (["epoch"], esa_step, {"FILLVAL": 255}), + "esa_energy_step": (["epoch"], esa_energy_step, {"FILLVAL": 255}), }, coords={ "event_met": np.arange(n_events), @@ -1807,6 +1820,7 @@ def test_partial_sweep_culling(self, goodtimes_for_filter): n_events = events_sweep1 + events_sweep2 esa_step = np.tile(np.arange(1, 10), 2).astype(np.uint8) + esa_energy_step = esa_step.copy() ccsds_met = np.arange(1000.0, 1000.0 + n_packets * 60, 60, dtype=np.float64) # Events for first sweep (packets 0-8) @@ -1827,6 +1841,7 @@ def test_partial_sweep_culling(self, goodtimes_for_filter): "ccsds_index": (["event_met"], ccsds_index), "ccsds_met": (["epoch"], ccsds_met), "esa_step": (["epoch"], esa_step, {"FILLVAL": 255}), + "esa_energy_step": (["epoch"], esa_energy_step, {"FILLVAL": 255}), }, coords={ "event_met": np.arange(n_events), @@ -1853,3 +1868,619 @@ def test_partial_sweep_culling(self, goodtimes_for_filter): assert 
np.all(first_sweep_flags == CullCode.GOOD) assert np.all(second_sweep_flags == CullCode.LOOSE) + + +class TestIdentifyCullPattern: + """Test suite for _identify_cull_pattern() convolution-based pattern detection.""" + + def _create_test_data( + self, n_sweeps: int = 10, n_esa_steps: int = 5 + ) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]: + """Create test counts, median, and sigma DataArrays.""" + # Create counts array (esa_sweep x esa_energy_step) + counts = xr.DataArray( + np.zeros((n_sweeps, n_esa_steps)), + dims=["esa_sweep", "esa_energy_step"], + coords={ + "esa_sweep": np.arange(n_sweeps), + "esa_energy_step": np.arange(1, n_esa_steps + 1), + }, + ) + + # Create median and sigma per ESA energy step (all valid) + median = xr.DataArray( + np.full(n_esa_steps, 10.0), + dims=["esa_energy_step"], + coords={"esa_energy_step": np.arange(1, n_esa_steps + 1)}, + ) + sigma = xr.DataArray( + np.full(n_esa_steps, 3), + dims=["esa_energy_step"], + coords={"esa_energy_step": np.arange(1, n_esa_steps + 1)}, + ) + + return counts, median, sigma + + def test_no_exceedances(self): + """Test with counts below all thresholds.""" + counts, median, sigma = self._create_test_data() + # All counts are 0, well below threshold of ~15 (10 + 1.8*3) + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + assert cull_mask.dims == ("esa_sweep", "esa_energy_step") + assert not cull_mask.any() + + def test_consecutive_run_with_esa_neighbor(self): + """Test finding 3+ consecutive high counts with ESA neighbor confirmation.""" + counts, median, sigma = self._create_test_data(n_sweeps=10, n_esa_steps=5) + # threshold = 10 + 1.8 * 3 = 15.4 + + # Create 4 consecutive high counts at ESA step 3 (sweeps 2-5) + counts.loc[2:5, 3] = 20 + # Also make ESA step 2 high at same time positions (neighbor at same time) + counts.loc[2:5, 2] = 20 + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Sweeps 2-5 at ESA 3 should be marked + # (consecutive run with neighbor at same 
time) + assert cull_mask.sel(esa_sweep=2, esa_energy_step=3).values + assert cull_mask.sel(esa_sweep=3, esa_energy_step=3).values + assert cull_mask.sel(esa_sweep=4, esa_energy_step=3).values + assert cull_mask.sel(esa_sweep=5, esa_energy_step=3).values + # ESA 2 should also be marked (consecutive run with ESA 3 as neighbor) + assert cull_mask.sel(esa_sweep=2, esa_energy_step=2).values + assert cull_mask.sel(esa_sweep=3, esa_energy_step=2).values + assert cull_mask.sel(esa_sweep=4, esa_energy_step=2).values + assert cull_mask.sel(esa_sweep=5, esa_energy_step=2).values + + def test_consecutive_run_no_esa_neighbor(self): + """Test that consecutive run without ESA neighbor is not marked.""" + counts, median, sigma = self._create_test_data(n_sweeps=10, n_esa_steps=5) + + # Create 4 consecutive high counts at ESA step 3, but no neighbors high + counts.loc[2:5, 3] = 20 + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Without ESA neighbor confirmation, consecutive runs alone don't trigger + # (but extreme outliers at 5-sigma would - threshold = 10 + 5*3 = 25) + assert not cull_mask.sel(esa_energy_step=3).any() + + def test_isolated_interval_marked(self): + """Test that good interval surrounded by bad is marked.""" + counts, median, sigma = self._create_test_data(n_sweeps=10, n_esa_steps=5) + + # Create pattern: bad - good - bad at ESA step 3 + # First create a setup that triggers consecutive culling + counts.loc[0:3, 3] = 20 # 4 consecutive high + counts.loc[0:3, 2] = 20 # ESA neighbor + counts.loc[5:8, 3] = 20 # Another 4 consecutive high + counts.loc[5:8, 2] = 20 # ESA neighbor + # Sweep 4 at ESA 3 is good (low count) but surrounded by bad + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Sweep 4 should be marked as isolated + assert cull_mask.sel(esa_sweep=4, esa_energy_step=3).values + + def test_extreme_outlier(self): + """Test detection of extreme outliers (5-sigma).""" + counts, median, sigma = self._create_test_data() + # 
extreme threshold = 10 + 5 * 3 = 25 + + # Single extreme outlier at sweep 5, ESA 3 + counts.loc[5, 3] = 30 + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Only the extreme outlier should be marked + assert cull_mask.sel(esa_sweep=5, esa_energy_step=3).values + # Other positions should not be marked + assert not cull_mask.sel(esa_sweep=4, esa_energy_step=3).values + assert not cull_mask.sel(esa_sweep=6, esa_energy_step=3).values + + def test_nan_handling(self): + """Test that NaN values in counts are handled correctly.""" + counts, median, sigma = self._create_test_data() + + # Set some counts to NaN + counts.loc[3, 2] = np.nan + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # NaN positions should not be marked (treated as not exceeding) + assert not cull_mask.sel(esa_sweep=3, esa_energy_step=2).values + + def test_returns_dataarray_with_correct_coords(self): + """Test that returned mask has correct dimensions and coordinates.""" + counts, median, sigma = self._create_test_data(n_sweeps=8, n_esa_steps=4) + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + assert isinstance(cull_mask, xr.DataArray) + assert cull_mask.dims == counts.dims + np.testing.assert_array_equal( + cull_mask.coords["esa_sweep"].values, counts.coords["esa_sweep"].values + ) + np.testing.assert_array_equal( + cull_mask.coords["esa_energy_step"].values, + counts.coords["esa_energy_step"].values, + ) + + def test_consecutive_run_at_first_esa_edge(self): + """Test that consecutive run at first ESA step passes neighbor check at edge.""" + counts, median, sigma = self._create_test_data(n_sweeps=10, n_esa_steps=5) + # threshold = 10 + 1.8 * 3 = 15.4 + + # Create 4 consecutive high counts at ESA step 1 (first ESA step) + counts.loc[2:5, 1] = 20 + # No ESA neighbor below (edge), but edge should pass the check + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Sweeps 2-5 at ESA 1 should be marked (edge passes neighbor check) + assert 
cull_mask.sel(esa_sweep=2, esa_energy_step=1).values + assert cull_mask.sel(esa_sweep=3, esa_energy_step=1).values + assert cull_mask.sel(esa_sweep=4, esa_energy_step=1).values + assert cull_mask.sel(esa_sweep=5, esa_energy_step=1).values + + def test_consecutive_run_at_last_esa_edge(self): + """Test that consecutive run at last ESA step passes neighbor check at edge.""" + counts, median, sigma = self._create_test_data(n_sweeps=10, n_esa_steps=5) + # threshold = 10 + 1.8 * 3 = 15.4 + + # Create 4 consecutive high counts at ESA step 5 (last ESA step) + counts.loc[2:5, 5] = 20 + # No ESA neighbor above (edge), but edge should pass the check + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Sweeps 2-5 at ESA 5 should be marked (edge passes neighbor check) + assert cull_mask.sel(esa_sweep=2, esa_energy_step=5).values + assert cull_mask.sel(esa_sweep=3, esa_energy_step=5).values + assert cull_mask.sel(esa_sweep=4, esa_energy_step=5).values + assert cull_mask.sel(esa_sweep=5, esa_energy_step=5).values + + def test_orphan_not_marked_at_time_edge(self): + """Test that positions at time edges are not marked as orphans.""" + counts, median, sigma = self._create_test_data(n_sweeps=10, n_esa_steps=5) + + # Create bad intervals at sweeps 1 and 3, leaving sweep 0 as "orphan-like" + # But sweep 0 is at edge and should NOT be marked as orphan + counts.loc[1:3, 3] = 20 # Consecutive run + counts.loc[1:3, 2] = 20 # ESA neighbor + + cull_mask = _identify_cull_pattern(counts, median, sigma) + + # Sweep 0 should NOT be marked (edge, not a true orphan) + assert not cull_mask.sel(esa_sweep=0, esa_energy_step=3).values + # Sweeps 1-3 should be marked (consecutive with neighbor) + assert cull_mask.sel(esa_sweep=1, esa_energy_step=3).values + assert cull_mask.sel(esa_sweep=2, esa_energy_step=3).values + assert cull_mask.sel(esa_sweep=3, esa_energy_step=3).values + + +class TestComputeQualifiedCountsPerSweep: + """Test suite for _compute_qualified_counts_per_sweep() helper 
class TestComputeQualifiedCountsPerSweep:
    """Test suite for the _compute_qualified_counts_per_sweep() helper."""

    def _create_test_dataset(
        self,
        n_packets: int = 10,
        events_per_packet: int = 5,
        coincidence_types: list[int] | None = None,
    ) -> xr.Dataset:
        """Build an L1B DE dataset carrying an esa_sweep coordinate."""
        n_events = n_packets * events_per_packet

        if coincidence_types is None:
            # Default mix repeats [12, 4, 8, 12, 4]; type 12 (AB) is qualified.
            coincidence_types = ([12, 4, 8, 12, 4] * (n_events // 5 + 1))[:n_events]

        packet_of_event = np.repeat(np.arange(n_packets), events_per_packet).astype(
            np.uint16
        )

        # Two packets per ESA step, steps running 1..n_packets//2.
        esa_step = np.repeat(np.arange(1, n_packets // 2 + 1), 2)[:n_packets].astype(
            np.uint8
        )

        ds = xr.Dataset(
            {
                "coincidence_type": (
                    ["event_met"],
                    np.array(coincidence_types, dtype=np.uint8),
                ),
                "ccsds_index": (["event_met"], packet_of_event),
                "ccsds_met": (
                    ["epoch"],
                    np.arange(1000.0, 1000.0 + n_packets * 60, 60),
                ),
                "esa_step": (["epoch"], esa_step),
                # Energy step mirrors the ESA step for test purposes.
                "esa_energy_step": (["epoch"], esa_step.copy()),
            },
            coords={
                "event_met": np.arange(n_events),
                "epoch": np.arange(n_packets),
            },
        )

        # Attach the esa_sweep coordinate required by the helper under test.
        return _add_sweep_indices(ds)

    def test_sums_counts_per_eight_spin(self):
        """Counts are summed per (esa_sweep, esa_energy_step) 8-spin set."""
        # 10 packets, 2 per ESA step -> 5 unique combinations, all within a
        # single sweep (no high-to-low ESA transition), ESA steps 1-5.
        ds = self._create_test_dataset(n_packets=10, events_per_packet=10)

        result = _compute_qualified_counts_per_sweep(ds, {12})

        assert "qualified_count" in result.data_vars
        assert "esa_sweep" in result.dims
        assert "esa_energy_step" in result.dims

        # Each packet carries 10 events of which 4 are type 12 (pattern
        # [12,4,8,12,4] x 2); two packets feed each 8-spin set -> 8 counts.
        for esa in range(1, 6):
            assert (
                result["qualified_count"].sel(esa_sweep=0, esa_energy_step=esa).values
                == 8
            )

    def test_raises_without_coordinate(self):
        """A dataset lacking the esa_sweep coordinate is rejected."""
        ds = xr.Dataset(
            {
                "coincidence_type": (["event_met"], np.array([12, 4], dtype=np.uint8)),
                "ccsds_index": (["event_met"], np.array([0, 0], dtype=np.uint16)),
                "ccsds_met": (["epoch"], np.array([1000.0])),
                "esa_step": (["epoch"], np.array([1], dtype=np.uint8)),
            },
            coords={"event_met": np.arange(2), "epoch": np.arange(1)},
        )

        with pytest.raises(ValueError, match="must have esa_sweep coordinate"):
            _compute_qualified_counts_per_sweep(ds, {12})
test_builds_per_sweep_datasets(self): + """Test that per-sweep datasets are built correctly.""" + ds = self._create_test_dataset() + qualified_types = {12} + + per_sweep_datasets = _build_per_sweep_datasets([ds], qualified_types) + + # Should have per-sweep dataset for index 0 with 2D structure + assert 0 in per_sweep_datasets + assert "esa_sweep" in per_sweep_datasets[0].dims + assert "esa_energy_step" in per_sweep_datasets[0].dims + # 9 ESA energy steps (1-9) in the data + assert len(per_sweep_datasets[0].coords["esa_energy_step"]) == 9 + + # Each (esa_sweep, esa_energy_step) should have 20 qualified counts + # 2 packets per ESA step, 10 events each = 20 qualified counts per 8-spin + for esa in range(1, 10): + count = ( + per_sweep_datasets[0]["qualified_count"] + .sel(esa_sweep=0, esa_energy_step=esa) + .values + ) + assert count == 20 + + def test_multiple_datasets(self): + """Test with multiple datasets.""" + ds1 = self._create_test_dataset(base_met=1000.0) + ds2 = self._create_test_dataset(base_met=2000.0) + qualified_types = {12} + + per_sweep_datasets = _build_per_sweep_datasets([ds1, ds2], qualified_types) + + # Should have per-sweep datasets for both indices + assert 0 in per_sweep_datasets + assert 1 in per_sweep_datasets + + +class TestComputeMedianAndSigmaPerEsa: + """Test suite for _compute_median_and_sigma_per_esa() helper function.""" + + def test_basic_calculation(self): + """Test basic median and sigma calculation.""" + # Create dataset with counts where median is 4 for each ESA energy step + # Using 5 sweeps with counts [2, 4, 6, 4, 4] for ESA energy steps 1-9 + n_sweeps = 5 + counts_per_sweep = [2, 4, 6, 4, 4] + counts_2d = np.zeros((n_sweeps, 10)) # ESA energy steps 0-9 + for sweep_idx, count in enumerate(counts_per_sweep): + for esa in range(1, 10): + counts_2d[sweep_idx, esa] = count + + per_sweep_datasets = { + 0: xr.Dataset( + { + "qualified_count": (["esa_sweep", "esa_energy_step"], counts_2d), + "ccsds_met": ( + ["esa_sweep", 
"esa_energy_step"], + np.full_like(counts_2d, 1000.0), + ), + }, + coords={ + "esa_sweep": np.arange(n_sweeps), + "esa_energy_step": np.arange(10), + }, + ) + } + + median_per_esa, sigma_per_esa = _compute_median_and_sigma_per_esa( + per_sweep_datasets + ) + + for esa in range(1, 10): + assert median_per_esa.sel(esa_energy_step=esa).values == 4.0 + # sigma = round(sqrt(4 + 1)) = round(2.236) = 2 + assert sigma_per_esa.sel(esa_energy_step=esa).values == 2 + + def test_zero_median_excluded(self): + """Test that ESA energy steps with zero median are excluded.""" + # ESA energy step 1: all zeros, ESA energy step 2: median 4 + n_sweeps = 5 + counts_2d = np.zeros((n_sweeps, 10)) + # ESA 1 stays at 0 + # ESA 2 gets counts [2, 4, 6, 4, 4] + for sweep_idx, count in enumerate([2, 4, 6, 4, 4]): + counts_2d[sweep_idx, 2] = count + + per_sweep_datasets = { + 0: xr.Dataset( + { + "qualified_count": (["esa_sweep", "esa_energy_step"], counts_2d), + "ccsds_met": ( + ["esa_sweep", "esa_energy_step"], + np.full_like(counts_2d, 1000.0), + ), + }, + coords={ + "esa_sweep": np.arange(n_sweeps), + "esa_energy_step": np.arange(10), + }, + ) + } + + median_per_esa, sigma_per_esa = _compute_median_and_sigma_per_esa( + per_sweep_datasets + ) + + # ESA energy step 1 should have NaN median (zero counts excluded) + assert np.isnan(median_per_esa.sel(esa_energy_step=1).values) + # ESA energy step 2 should have valid median + assert median_per_esa.sel(esa_energy_step=2).values == 4.0 + + def test_empty_datasets_handled(self): + """Test that empty datasets result in empty DataArrays.""" + # Empty per_sweep_datasets + per_sweep_datasets: dict[int, xr.Dataset] = {} + + median_per_esa, sigma_per_esa = _compute_median_and_sigma_per_esa( + per_sweep_datasets + ) + + assert len(median_per_esa) == 0 + assert len(sigma_per_esa) == 0 + + +class TestStatisticalFilter1: + """Test suite for mark_statistical_filter_1() integration tests.""" + + @pytest.fixture + def goodtimes_for_filter1(self): + """Create a 
class TestStatisticalFilter1:
    """Integration tests for mark_statistical_filter_1()."""

    @pytest.fixture
    def goodtimes_for_filter1(self):
        """Goodtimes dataset spanning 18 METs (two complete ESA sweeps)."""
        n_mets = 18
        met_values = np.arange(1000.0, 1000.0 + n_mets * 60, 60)

        return xr.Dataset(
            {
                "cull_flags": xr.DataArray(
                    np.zeros((n_mets, 90), dtype=np.uint8), dims=["met", "spin_bin"]
                ),
                "esa_step": xr.DataArray(
                    np.tile(np.arange(1, 10), 2).astype(np.uint8), dims=["met"]
                ),
            },
            coords={"met": met_values, "spin_bin": np.arange(90)},
            attrs={"sensor": "Hi45", "pointing": 1},
        )

    def _create_l1b_de_dataset(
        self,
        n_packets: int = 18,
        events_per_packet: int = 10,
        base_met: float = 1000.0,
        coincidence_type: int = 12,
    ) -> xr.Dataset:
        """Build a mock L1B DE dataset with ESA steps cycling 1-9."""
        n_events = n_packets * events_per_packet

        esa_step = np.tile(np.arange(1, 10), n_packets // 9 + 1)[:n_packets].astype(
            np.uint8
        )
        ccsds_met = np.arange(base_met, base_met + n_packets * 60, 60, dtype=np.float64)

        return xr.Dataset(
            data_vars={
                "coincidence_type": (
                    ["event_met"],
                    np.full(n_events, coincidence_type, dtype=np.uint8),
                ),
                "ccsds_index": (
                    ["event_met"],
                    np.repeat(np.arange(n_packets), events_per_packet).astype(
                        np.uint16
                    ),
                ),
                "ccsds_met": (["epoch"], ccsds_met),
                "esa_step": (["epoch"], esa_step),
                # Energy step mirrors the ESA step for test purposes.
                "esa_energy_step": (["epoch"], esa_step.copy()),
            },
            coords={
                "event_met": np.arange(n_events),
                "epoch": np.arange(n_packets),
            },
        )

    def test_passes_normal_data(self, goodtimes_for_filter1):
        """Consistent counts across pointings leave every time good."""
        # The current pointing (index 2) must have METs matching the
        # goodtimes dataset (1000.0-2020.0).
        l1b_de_datasets = [
            self._create_l1b_de_dataset(events_per_packet=10, base_met=met)
            for met in (0.0, 500.0, 1000.0, 2500.0, 3500.0)
        ]

        mark_statistical_filter_1(
            goodtimes_for_filter1,
            l1b_de_datasets,
            current_index=2,
            qualified_coincidence_types={12},
        )

        assert np.all(goodtimes_for_filter1["cull_flags"].values == CullCode.GOOD)

    def test_fails_extreme_outlier(self, goodtimes_for_filter1):
        """A >5-sigma count spike in the current pointing gets culled."""
        # goodtimes spans METs 1000.0 to ~2020.0 (18 METs, 60 s apart);
        # index 2 is the current pointing and must match that range.
        l1b_de_datasets = [
            self._create_l1b_de_dataset(events_per_packet=10, base_met=met)
            for met in (0.0, 500.0, 1000.0, 2500.0, 3500.0)
        ]

        # Pile 100 extra qualified events onto the first packet of the
        # current pointing to create an extreme outlier there.
        current_ds = l1b_de_datasets[2]
        extra_events = 100
        new_coincidence = np.concatenate(
            [
                current_ds["coincidence_type"].values,
                np.full(extra_events, 12, dtype=np.uint8),
            ]
        )
        new_ccsds_index = np.concatenate(
            [
                current_ds["ccsds_index"].values,
                np.zeros(extra_events, dtype=np.uint16),  # all to first packet
            ]
        )

        l1b_de_datasets[2] = xr.Dataset(
            data_vars={
                "coincidence_type": (["event_met"], new_coincidence),
                "ccsds_index": (["event_met"], new_ccsds_index),
                "ccsds_met": current_ds["ccsds_met"],
                "esa_step": current_ds["esa_step"],
                "esa_energy_step": current_ds["esa_energy_step"],
            },
            coords={
                "event_met": np.arange(len(new_coincidence)),
                "epoch": current_ds["epoch"],
            },
        )

        mark_statistical_filter_1(
            goodtimes_for_filter1,
            l1b_de_datasets,
            current_index=2,
            qualified_coincidence_types={12},
        )

        # At minimum the first MET must be flagged as an extreme outlier.
        assert np.any(goodtimes_for_filter1["cull_flags"].values == CullCode.LOOSE)

    def test_insufficient_pointings(self, goodtimes_for_filter1):
        """Fewer than the minimum number of pointings raises ValueError."""
        l1b_de_datasets = [self._create_l1b_de_dataset() for _ in range(3)]

        with pytest.raises(ValueError, match="At least 4 valid Pointings required"):
            mark_statistical_filter_1(
                goodtimes_for_filter1,
                l1b_de_datasets,
                current_index=1,
                qualified_coincidence_types={12},
            )

    def test_current_index_out_of_range(self, goodtimes_for_filter1):
        """An out-of-range current_index raises ValueError."""
        l1b_de_datasets = [self._create_l1b_de_dataset()] * 5

        with pytest.raises(ValueError, match="current_index.*out of range"):
            mark_statistical_filter_1(
                goodtimes_for_filter1,
                l1b_de_datasets,
                current_index=10,
                qualified_coincidence_types={12},
            )