diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index cc5aa34..eb2a872 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -40,6 +40,26 @@ jobs: working-directory: waveform-controller run: uv pip install '.[dev]' + - name: Prepare config env files for compose + working-directory: waveform-controller + run: | + mkdir -p ../config + cp config.EXAMPLE/exporter.env.EXAMPLE ../config/exporter.env + cp config.EXAMPLE/hasher.env.EXAMPLE ../config/hasher.env + cp config.EXAMPLE/controller.env.EXAMPLE ../config/controller.env + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build exporter image (cached) + uses: docker/build-push-action@v6 + with: + context: . + file: waveform-controller/Dockerfile + target: waveform_exporter + cache-from: type=gha + cache-to: type=gha,mode=max + - name: Run the tests working-directory: waveform-controller run: uv run pytest tests diff --git a/.gitignore b/.gitignore index be3f5ce..cc2903b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,7 @@ wheels/ # settings files (should not be in the source tree anyway, but just in case) *.env + + +# pytest tmp paths +.pytest_tmp diff --git a/README.md b/README.md index 4743482..5f24fc9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,12 @@ A controller for reading waveform data from a rabbitmq queue and processing it. # Running the Code + +## Pre-reqs + +Up-to-date Docker and Docker Compose. We have seen config bugs when using old Docker Compose versions, +such as that packaged with recent Ubuntu LTS. Docker Compose v5.0.1 and Docker 29.1.5 are known to work. + ## 1 Install and deploy EMAP Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") configure and deploy a version of EMAP. To run a local version you'll need to set diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh index 6faa7be..40b2887 100755 --- a/exporter-scripts/scheduled-script.sh +++ b/exporter-scripts/scheduled-script.sh @@ -30,3 +30,4 @@ snakemake --snakefile /app/src/pipeline/Snakefile \ ret_code=$? set -e echo "$0: snakemake exited with return code $ret_code" +exit $ret_code diff --git a/pyproject.toml b/pyproject.toml index 7ee53db..17f395e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,5 +25,10 @@ emap-extract-waveform = "controller:receiver" emap-csv-pseudon = "pseudon.pseudon:psudon_cli" emap-send-ftps = "exporter.ftps:do_upload_cli" +[tool.pytest.ini_options] +# Force temp dirs under the repo so Docker can mount them on macOS. 
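+# (tmp_path fixtures then resolve to directories under <repo>/.pytest_tmp/, which is listed in .gitignore.)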
+# The default under /private/var/folders seems to silently fail (gives you an empty directory) +addopts = "--basetemp=.pytest_tmp" + [tool.mypy] ignore_missing_imports = true diff --git a/src/locations.py b/src/locations.py index a961a17..0e9d12b 100644 --- a/src/locations.py +++ b/src/locations.py @@ -3,6 +3,7 @@ WAVEFORM_EXPORT_BASE = Path("/waveform-export") WAVEFORM_ORIGINAL_CSV = WAVEFORM_EXPORT_BASE / "original-csv" WAVEFORM_ORIGINAL_PARQUET = WAVEFORM_EXPORT_BASE / "original-parquet" +WAVEFORM_HASH_LOOKUPS = WAVEFORM_EXPORT_BASE / "hash-lookups" WAVEFORM_PSEUDONYMISED_PARQUET = WAVEFORM_EXPORT_BASE / "pseudonymised" WAVEFORM_SNAKEMAKE_LOGS = WAVEFORM_EXPORT_BASE / "snakemake-logs" WAVEFORM_FTPS_LOGS = WAVEFORM_EXPORT_BASE / "ftps-logs" diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 8d88fa7..2775b22 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -1,22 +1,28 @@ import json import time from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pyarrow.parquet as pq from snakemake.io import glob_wildcards + from exporter.ftps import do_upload from locations import ( + WAVEFORM_HASH_LOOKUPS, WAVEFORM_ORIGINAL_CSV, WAVEFORM_SNAKEMAKE_LOGS, WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, + ORIGINAL_PARQUET_PATTERN, FILE_STEM_PATTERN, FILE_STEM_PATTERN_HASHED, CSV_PATTERN, make_file_name, ) -from pathlib import Path from pseudon.hashing import do_hash from pseudon.pseudon import csv_to_parquets + def get_file_age(file_path: Path) -> timedelta: # need to use UTC to avoid DST issues file_time_utc = datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc) @@ -43,8 +49,53 @@ CSV_WAIT_TIME = timedelta(minutes=5) # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. + +class InputCsvFile: + """Represent the different files in the pipeline from the point of view of one + csn + day + variable + channel combination (ie. one "original CSV" file). + These files are glued together by the Snakemake rules. 
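+
+    For example (illustrative values only), an original CSV named
+    2025-01-01.1234567.11.3.uV.csv gets a matching original parquet, a
+    pseudonymised parquet named with the hashed CSN in place of 1234567,
+    an FTPS upload marker file, and a shared daily hash lookup 2025-01-01.hashes.json.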
+ """ + def __init__(self, + date: str, + csn: str, + variable_id: str, + channel_id: str, + units: str): + self.date = date + self.csn = csn + self.hashed_csn = hash_csn(csn) + self.variable_id = variable_id + self.channel_id = channel_id + self.units = units + self._subs_dict = dict( + date=self.date, + csn=self.csn, + hashed_csn=self.hashed_csn, + variable_id=self.variable_id, + channel_id=self.channel_id, + units=self.units + ) + + def get_original_csv_path(self) -> Path: + return Path(make_file_name(str(CSV_PATTERN), self._subs_dict)) + + def get_original_parquet_path(self) -> Path: + return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict)) + + def get_pseudonymised_parquet_path(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) + return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" + + def get_ftps_uploaded_file(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) + return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") + + def get_daily_hash_lookup(self) -> Path: + return WAVEFORM_HASH_LOOKUPS / f"{self.date}.hashes.json" + + def determine_eventual_outputs(): - # Parse all CSVs using the basic file name pattern + # Discover all CSVs using the basic file name pattern before = time.perf_counter() all_wc = glob_wildcards(CSV_PATTERN) @@ -59,15 +110,8 @@ def determine_eventual_outputs(): _all_outputs = [] for date, csn, variable_id, channel_id, units \ in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units): - subs_dict = dict( - date = date, - csn = csn, - hashed_csn = hash_csn(csn), - variable_id = variable_id, - channel_id = channel_id, - units = units - ) - orig_file = Path(make_file_name(str(CSV_PATTERN), subs_dict)) + input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) + orig_file = input_file_obj.get_original_csv_path() if csn == 'unmatched_csn': print(f"Skipping file with unmatched CSN: {orig_file}") continue @@ -75,15 +119,15 @@ def determine_eventual_outputs(): if file_age < CSV_WAIT_TIME: print(f"File too new (age={file_age}): {orig_file}") continue - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, subs_dict) - final_output_file = WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") - _all_outputs.append(final_output_file) + _all_outputs.append(input_file_obj) after = time.perf_counter() print(f"Calculated output files using newness threshold {CSV_WAIT_TIME} in {after - before} seconds") return _all_outputs, _hash_to_csn all_outputs, hash_to_csn = determine_eventual_outputs() +ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] +ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) def configure_file_logging(log_file): import logging @@ -98,9 +142,51 @@ def configure_file_logging(log_file): return logger +def parquet_min_max_value(parquet_path: Path, column_name): + """By the magic of parquet files we can get the min/max timestamps without loading it all into + memory or even reading every row.""" + parquet_file = pq.ParquetFile(parquet_path) + column_index = parquet_file.schema_arrow.get_field_index(column_name) + if column_index == -1: + raise ValueError(f"Column '{column_name}' not found in {parquet_path}") + + lowest_min = None + highest_max = None + + metadata = parquet_file.metadata + if metadata.num_rows == 0: + return None, None + + + # each row group will have its own min/max, so take the min of mins and the max of maxes + for row_group_index in 
range(metadata.num_row_groups): + column_meta = metadata.row_group(row_group_index).column(column_index) + column_stats = column_meta.statistics + # We created the parquets so we know they have up-to-date statistics. + # We have already checked the file is not empty (which causes empty stats), so treat missing + # statistics as an invalid file. + if column_stats is None or not column_stats.has_min_max: + raise ValueError(f"columns stats missing or min_max missing: {column_stats}") + if lowest_min is None or column_stats.min < lowest_min: + lowest_min = column_stats.min + if highest_max is None or column_stats.max > highest_max: + highest_max = column_stats.max + + return lowest_min, highest_max + + rule all: input: - all_outputs + ftps_uploaded = ALL_FTPS_UPLOADED, + daily_hash_lookups = ALL_DAILY_HASH_LOOKUPS + +rule all_ftps_uploaded: + input: + ALL_FTPS_UPLOADED + +rule all_daily_hash_lookups: + input: + ALL_DAILY_HASH_LOOKUPS def input_file_maker(wc): unhashed_csn = hash_to_csn[wc.hashed_csn] @@ -139,6 +225,56 @@ rule csv_to_parquet: units=wildcards.units) +def pseudonymised_parquet_files_for_date(wc): + return [ao.get_pseudonymised_parquet_path() for ao in all_outputs if ao.date == wc.date] + + +rule make_daily_hash_lookup: + input: + # Because we don't declare the original parquets in the output of csv_to_parquet, + # they are not present in the Snakemake dependency DAG. Therefore, we can't reference them + # here either. + # We lie to Snakemake that the input is the pseudon parquets, but then we use InputCsvFile + # to determine the actual input file (the original parquets). + pseudonymised_parquets = pseudonymised_parquet_files_for_date + output: + hash_lookup_json = WAVEFORM_HASH_LOOKUPS / "{date}.hashes.json" + run: + daily_files = [ao for ao in all_outputs if ao.date == wildcards.date] + print( + f"Making daily hash lookup {output.hash_lookup_json} from {len(daily_files)} files: " + f"{input.pseudonymised_parquets}" + ) + min_timestamp_key = 'min_timestamp' + max_timestamp_key = 'max_timestamp' + hash_summary_by_csn = {} + for daily_file in daily_files: + entry = {} + original_parquet = daily_file.get_original_parquet_path() + entry["csn"] = daily_file.csn + entry["hashed_csn"] = daily_file.hashed_csn + min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp") + if min_timestamp is None or max_timestamp is None: + # do not contribute to stats + print(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}") + break + entry[min_timestamp_key] = min_timestamp + entry[max_timestamp_key] = max_timestamp + existing_entry = hash_summary_by_csn.get(daily_file.csn) + if existing_entry is None: + hash_summary_by_csn[daily_file.csn] = entry + else: + # update the limits (there can be multiple files for the same CSN because each variable/channel + # is in its own file) + existing_entry[min_timestamp_key] = min(min_timestamp, existing_entry[min_timestamp_key]) + existing_entry[max_timestamp_key] = max(max_timestamp, existing_entry[max_timestamp_key]) + + hash_summary = list(hash_summary_by_csn.values()) + + with open(output.hash_lookup_json, "w") as fh: + json.dump(hash_summary, fh, indent=0) + + rule send_ftps: input: WAVEFORM_PSEUDONYMISED_PARQUET / (FILE_STEM_PATTERN_HASHED + ".parquet") diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py new file mode 100644 index 0000000..59b56b2 --- /dev/null +++ b/tests/test_snakemake_integration.py @@ -0,0 +1,339 @@ +import json +import math +import os +import re 
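+
+# These tests exercise the pipeline end to end: they generate synthetic waveform CSVs (with a
+# SECRET_* marker in every field that must be de-identified), run the Snakemake pipeline inside
+# the exporter container via docker compose, and then check the original parquets, pseudonymised
+# parquets and daily hash lookup files that come out the other end.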
+from dataclasses import dataclass +from decimal import Decimal +import random + +import pyarrow as pa +import pyarrow.parquet as pq +import subprocess +import time +from pathlib import Path + +import pytest +from stablehash import stablehash + +from src.pseudon.hashing import do_hash + + +def _run_compose( + compose_file: Path, args: list[str], cwd: Path +) -> subprocess.CompletedProcess: + cmd = ["docker", "compose", "-f", str(compose_file), *args] + return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True) + + +EXPECTED_COLUMN_NAMES = [ + "csn", + "mrn", + "source_variable_id", + "source_channel_id", + "units", + "sampling_rate", + "timestamp", + "location", + "values", +] + + +@pytest.fixture(scope="session", autouse=True) +def build_exporter_image(): + repo_root = Path(__file__).resolve().parents[1] + compose_file = repo_root / "docker-compose.yml" + result = _run_compose(compose_file, ["build", "waveform-exporter"], cwd=repo_root) + print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") + result.check_returncode() + + +@dataclass +class TestFileDescription: + date: str + start_timestamp: float + csn: str + mrn: str + location: str + variable_id: str + channel_id: str + sampling_rate: int + units: str + num_rows: int + _test_values: list = None + + def get_hashed_csn(self): + return do_hash("csn", self.csn) + + def get_orig_csv(self): + return f"{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.csv" + + def get_orig_parquet(self): + return f"{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_pseudon_parquet(self): + return f"{self.date}.{self.get_hashed_csn()}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_hashes(self): + return f"{self.date}.hashes.json" + + def get_stable_hash(self): + """To aid in generating different but repeatable test data for each file.""" + return stablehash( + ( + self.date, + self.csn, + self.mrn, + self.location, + self.variable_id, + self.channel_id, + ) + ) + + def get_stable_seed(self): + byte_hash = self.get_stable_hash().digest()[:4] + return int.from_bytes(byte_hash) + + def generate_data(self, vals_per_row: int) -> list[list[Decimal]]: + if self._test_values is None: + seed = self.get_stable_seed() + rng = random.Random(seed) + base_ampl = rng.normalvariate(1, 0.2) + base_offset = rng.normalvariate(0, 0.2) + self._test_values = [] + for row_num in range(self.num_rows): + values_row = [ + Decimal.from_float( + base_ampl * math.sin(base_offset + row_num * vals_per_row + i) + ).quantize(Decimal("1.0000")) + for i in range(vals_per_row) + ] + self._test_values.append(values_row) + # return as string but keep the numerical representation for comparison to parquet later + return self._test_values + + +def _make_test_input_csv(tmp_path, t: TestFileDescription) -> list[list[Decimal]]: + original_csv_dir = tmp_path / "original-csv" + original_csv_dir.mkdir(parents=True, exist_ok=True) + csv_path = original_csv_dir / t.get_orig_csv() + secs_per_row = 1 + vals_per_row = t.sampling_rate * secs_per_row + test_data = t.generate_data(vals_per_row) + with open(csv_path, "w") as f: + f.write(",".join(EXPECTED_COLUMN_NAMES) + "\n") + start_time = t.start_timestamp + row_time = start_time + for td in test_data: + row_values_str = ", ".join(str(v) for v in td) + f.write( + f'{t.csn},{t.mrn},{t.variable_id},{t.channel_id},{t.units},{t.sampling_rate},{row_time},{t.location},"[{row_values_str}]"\n' + ) + row_time += secs_per_row + # The test input CSV file 
needs to be old enough so that snakemake doesn't skip it + old_time = time.time() - (10 * 60) + os.utime(csv_path, (old_time, old_time)) + return test_data + + +def test_snakemake_pipeline_runs_via_exporter_wrapper(tmp_path: Path): + # ARRANGE + + # all fields that need to be de-IDed should contain the string "SECRET" so we can search for it later + file1 = TestFileDescription( + "2025-01-01", + 1735740780.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "11", + "3", + 100, + "uV", + 5, + ) + # same day, same CSN, earlier time + file2 = TestFileDescription( + "2025-01-01", + 1735740765.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 2, + ) + # same day, different CSN + file3 = TestFileDescription( + "2025-01-01", + 1735740783.0, + "SECRET_CSN_1235", + "SECRET_MRN_12346", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 4, + ) + # new day, first CSN again + file4 = TestFileDescription( + "2025-01-02", + 1735801965.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 5, + ) + test_data_files = [] + for f in [file1, file2, file3, file4]: + test_data_values = _make_test_input_csv(tmp_path, f) + test_data_files.append((f, test_data_values)) + + expected_hash_summaries = { + "2025-01-01": [ + { + "csn": file1.csn, + "hashed_csn": file1.get_hashed_csn(), + "min_timestamp": file2.start_timestamp, + "max_timestamp": ( + file1.start_timestamp + file1.num_rows - 1 + ), # one sec per row + }, + { + "csn": file3.csn, + "hashed_csn": file3.get_hashed_csn(), + "min_timestamp": file3.start_timestamp, + "max_timestamp": ( + file3.start_timestamp + file3.num_rows - 1 + ), # one sec per row + }, + ], + "2025-01-02": [ + { + "csn": file4.csn, + "hashed_csn": file4.get_hashed_csn(), + "min_timestamp": file4.start_timestamp, + "max_timestamp": ( + file4.start_timestamp + file4.num_rows - 1 + ), # one sec per row + } + ], + } + + # ACT + run_snakemake(tmp_path) + + # ASSERT (data files) + for filename, expected_data in test_data_files: + original_parquet_path = ( + tmp_path / "original-parquet" / filename.get_orig_parquet() + ) + pseudon_path = tmp_path / "pseudonymised" / filename.get_pseudon_parquet() + + assert original_parquet_path.exists() + assert pseudon_path.exists() + + _compare_original_parquet_to_expected(original_parquet_path, expected_data) + _compare_parquets(expected_data, original_parquet_path, pseudon_path) + + # ASSERT (hash summaries) + # Hash summaries are one per day, not per input file + for datestr, expected_summary in expected_hash_summaries.items(): + expected_path = tmp_path / "hash-lookups" / f"{datestr}.hashes.json" + actual_hash_lookup_data = json.loads(expected_path.read_text()) + assert isinstance(actual_hash_lookup_data, list) + # sort order to match expected + actual_hash_lookup_data.sort(key=lambda x: x["csn"]) + assert expected_summary == actual_hash_lookup_data + + # check no extraneous files + assert 4 == len(list((tmp_path / "original-csv").iterdir())) + assert 4 == len(list((tmp_path / "original-parquet").iterdir())) + assert 4 == len(list((tmp_path / "pseudonymised").iterdir())) + assert 2 == len(list((tmp_path / "hash-lookups").iterdir())) + + +def run_snakemake(tmp_path): + repo_root = Path(__file__).resolve().parents[1] + compose_file = repo_root / "docker-compose.yml" + + compose_args = [ + "run", + "--rm", + # we override the volume defined in the compose file to be the pytest tmp path + "-v", + f"{tmp_path}:/waveform-export", 
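+        # override the entrypoint to run the scheduled wrapper script directly; the env vars below
+        # are expected to be passed through to snakemake by that script, stopping the run after the
+        # daily hash lookups rule on a single core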
+ "--entrypoint", + "/app/exporter-scripts/scheduled-script.sh", + "-e", + "SNAKEMAKE_RULE_UNTIL=all_daily_hash_lookups", + "-e", + "SNAKEMAKE_CORES=1", + "waveform-exporter", + ] + result = _run_compose( + compose_file, + compose_args, + cwd=repo_root, + ) + # for convenience print the snakemake log files if they exist (on success or error) + outer_logs_dir = tmp_path / "snakemake-logs" + outer_logs = sorted(outer_logs_dir.glob("snakemake-outer-log*.log")) + if not outer_logs: + print("No outer logs found") + for ol in outer_logs: + print(f"Log file {ol}:") + print(ol.read_text()) + # print all output then raise if there was an error + print(f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}") + result.check_returncode() + + +def _compare_original_parquet_to_expected(original_parquet: Path, expected_test_values): + # CSV should always match original parquet + orig_parquet_file = pq.ParquetFile(original_parquet) + orig_reader = orig_parquet_file.read() + orig_all_values = orig_reader["values"].combine_chunks() + expected_pa = pa.array(expected_test_values, type=orig_all_values.type) + assert orig_all_values == expected_pa + + +def _compare_parquets( + expected_test_values, original_parquet_path: Path, pseudon_parquet_path: Path +): + # columns where we expect the values to differ due to pseudonymisation + COLUMN_EXPECT_DIFFERENT = ["csn", "mrn", "location"] + orig_parquet_file = pq.ParquetFile(original_parquet_path) + pseudon_parquet_file = pq.ParquetFile(pseudon_parquet_path) + column_names = orig_parquet_file.schema_arrow.names + assert column_names == EXPECTED_COLUMN_NAMES + assert column_names == pseudon_parquet_file.schema_arrow.names + orig_reader = orig_parquet_file.read() + pseudon_reader = pseudon_parquet_file.read() + for column_name in column_names: + orig_all_values = orig_reader[column_name].combine_chunks() + pseudon_all_values = pseudon_reader[column_name].combine_chunks() + # pseudonymised contains no secrets + assert not any( + ("SECRET" in str(v) for v in pseudon_all_values) + ), f"{pseudon_all_values} in column {column_name} contains SECRET string" + if column_name not in COLUMN_EXPECT_DIFFERENT: + # no pseudon expected, should be identical + assert orig_all_values == pseudon_all_values + else: + # pseudon expected, check that it looks like a hash + assert all( + # will need lengthening when we use real hashes! + re.match(r"[a-f0-9]{8}$", str(v)) + for v in pseudon_all_values + ), f"{pseudon_all_values} in column {column_name} does not appear to be a hash" + # orig, all sensitive values contain SECRET + assert all( + "SECRET" in str(v) for v in orig_all_values + ), f"{orig_all_values} in column {column_name} contains SECRET string"
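+
+
+# Running these tests locally is expected to look roughly like the CI job (a sketch, assuming the
+# compose/config setup described in the README is already in place):
+#
+#   uv pip install '.[dev]'
+#   uv run pytest tests/test_snakemake_integration.py
+#
+# The autouse session fixture builds the exporter image via docker compose before any test runs,
+# and --basetemp=.pytest_tmp (set in pyproject.toml) keeps the bind-mounted temp directories inside
+# the repository so the container can see them.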