20 changes: 20 additions & 0 deletions .github/workflows/pytest.yml
@@ -40,6 +40,26 @@ jobs:
working-directory: waveform-controller
run: uv pip install '.[dev]'

- name: Prepare config env files for compose
working-directory: waveform-controller
run: |
mkdir -p ../config
cp config.EXAMPLE/exporter.env.EXAMPLE ../config/exporter.env
cp config.EXAMPLE/hasher.env.EXAMPLE ../config/hasher.env
cp config.EXAMPLE/controller.env.EXAMPLE ../config/controller.env

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Build exporter image (cached)
uses: docker/build-push-action@v6
with:
context: .
file: waveform-controller/Dockerfile
target: waveform_exporter
cache-from: type=gha
cache-to: type=gha,mode=max

- name: Run the tests
working-directory: waveform-controller
run: uv run pytest tests
4 changes: 4 additions & 0 deletions .gitignore
@@ -14,3 +14,7 @@ wheels/

# settings files (should not be in the source tree anyway, but just in case)
*.env


# pytest tmp paths
.pytest_tmp
6 changes: 6 additions & 0 deletions README.md
@@ -1,6 +1,12 @@
A controller for reading waveform data from a RabbitMQ queue and processing it.

# Running the Code

## Pre-reqs

Up-to-date Docker and Docker Compose. We have seen config bugs when using old Docker Compose versions,
such as the version packaged with recent Ubuntu LTS releases. Docker Compose v5.0.1 and Docker 29.1.5 are known to work.

## 1 Install and deploy EMAP
Follow the emap development [instructions](https://github.com/SAFEHR-data/emap/blob/main/docs/dev/core.md#deploying-a-live-version "Instructions for deploying a live version of EMAP") to configure and deploy a version of EMAP. To run a local version you'll need to set

1 change: 1 addition & 0 deletions exporter-scripts/scheduled-script.sh
@@ -30,3 +30,4 @@ snakemake --snakefile /app/src/pipeline/Snakefile \
ret_code=$?
set -e
echo "$0: snakemake exited with return code $ret_code"
exit $ret_code
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -25,5 +25,10 @@ emap-extract-waveform = "controller:receiver"
emap-csv-pseudon = "pseudon.pseudon:psudon_cli"
emap-send-ftps = "exporter.ftps:do_upload_cli"

[tool.pytest.ini_options]
# Force temp dirs under the repo so Docker can mount them on macOS.
# The default under /private/var/folders seems to silently fail (gives you an empty directory)
addopts = "--basetemp=.pytest_tmp"

[tool.mypy]
ignore_missing_imports = true
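
A minimal sketch of what the basetemp setting above buys you (a hypothetical test, not part of this change, assuming pytest is invoked from the project directory): tmp_path then resolves beneath the checkout rather than under /private/var/folders, so Docker Desktop on macOS can bind-mount it.

from pathlib import Path

def test_tmp_path_is_under_repo(tmp_path: Path) -> None:
    # With addopts = "--basetemp=.pytest_tmp", pytest creates per-test dirs such as
    # <invocation dir>/.pytest_tmp/test_tmp_path_is_under_repo0
    assert ".pytest_tmp" in str(tmp_path.resolve())
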
1 change: 1 addition & 0 deletions src/locations.py
@@ -3,6 +3,7 @@
WAVEFORM_EXPORT_BASE = Path("/waveform-export")
WAVEFORM_ORIGINAL_CSV = WAVEFORM_EXPORT_BASE / "original-csv"
WAVEFORM_ORIGINAL_PARQUET = WAVEFORM_EXPORT_BASE / "original-parquet"
WAVEFORM_HASH_LOOKUPS = WAVEFORM_EXPORT_BASE / "hash-lookups"
WAVEFORM_PSEUDONYMISED_PARQUET = WAVEFORM_EXPORT_BASE / "pseudonymised"
WAVEFORM_SNAKEMAKE_LOGS = WAVEFORM_EXPORT_BASE / "snakemake-logs"
WAVEFORM_FTPS_LOGS = WAVEFORM_EXPORT_BASE / "ftps-logs"
166 changes: 151 additions & 15 deletions src/pipeline/Snakefile
@@ -1,22 +1,28 @@
import json
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pyarrow.parquet as pq
from snakemake.io import glob_wildcards

from exporter.ftps import do_upload
from locations import (
WAVEFORM_HASH_LOOKUPS,
WAVEFORM_ORIGINAL_CSV,
WAVEFORM_SNAKEMAKE_LOGS,
WAVEFORM_PSEUDONYMISED_PARQUET,
WAVEFORM_FTPS_LOGS,
ORIGINAL_PARQUET_PATTERN,
FILE_STEM_PATTERN,
FILE_STEM_PATTERN_HASHED,
CSV_PATTERN,
make_file_name,
)
from pathlib import Path
from pseudon.hashing import do_hash
from pseudon.pseudon import csv_to_parquets


def get_file_age(file_path: Path) -> timedelta:
# need to use UTC to avoid DST issues
file_time_utc = datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc)
@@ -43,8 +49,53 @@ CSV_WAIT_TIME = timedelta(minutes=5)
# Therefore, look at the input files and work out the eventual output files so they can
# be fed into snakemake.


class InputCsvFile:
"""Represent the different files in the pipeline from the point of view of one
csn + day + variable + channel combination (i.e. one "original CSV" file).
These files are glued together by the Snakemake rules.
"""
def __init__(self,
date: str,
csn: str,
variable_id: str,
channel_id: str,
units: str):
self.date = date
self.csn = csn
self.hashed_csn = hash_csn(csn)
self.variable_id = variable_id
self.channel_id = channel_id
self.units = units
self._subs_dict = dict(
date=self.date,
csn=self.csn,
hashed_csn=self.hashed_csn,
variable_id=self.variable_id,
channel_id=self.channel_id,
units=self.units
)

def get_original_csv_path(self) -> Path:
return Path(make_file_name(str(CSV_PATTERN), self._subs_dict))

def get_original_parquet_path(self) -> Path:
return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict))

def get_pseudonymised_parquet_path(self) -> Path:
final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict)
return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet"

def get_ftps_uploaded_file(self) -> Path:
final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict)
return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json")

def get_daily_hash_lookup(self) -> Path:
return WAVEFORM_HASH_LOOKUPS / f"{self.date}.hashes.json"


def determine_eventual_outputs():
# Parse all CSVs using the basic file name pattern
# Discover all CSVs using the basic file name pattern
before = time.perf_counter()
all_wc = glob_wildcards(CSV_PATTERN)

@@ -59,31 +110,24 @@ def determine_eventual_outputs():
_all_outputs = []
for date, csn, variable_id, channel_id, units \
in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units):
subs_dict = dict(
date = date,
csn = csn,
hashed_csn = hash_csn(csn),
variable_id = variable_id,
channel_id = channel_id,
units = units
)
orig_file = Path(make_file_name(str(CSV_PATTERN), subs_dict))
input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units)
orig_file = input_file_obj.get_original_csv_path()
if csn == 'unmatched_csn':
print(f"Skipping file with unmatched CSN: {orig_file}")
continue
file_age = get_file_age(orig_file)
if file_age < CSV_WAIT_TIME:
print(f"File too new (age={file_age}): {orig_file}")
continue
final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, subs_dict)
final_output_file = WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json")
_all_outputs.append(final_output_file)
_all_outputs.append(input_file_obj)
after = time.perf_counter()
print(f"Calculated output files using newness threshold {CSV_WAIT_TIME} in {after - before} seconds")
return _all_outputs, _hash_to_csn


all_outputs, hash_to_csn = determine_eventual_outputs()
ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs]
ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs})

def configure_file_logging(log_file):
import logging
@@ -98,9 +142,51 @@ def configure_file_logging(log_file):
return logger


def parquet_min_max_value(parquet_path: Path, column_name):
"""By the magic of parquet files we can get the min/max timestamps without loading it all into
memory or even reading every row."""
parquet_file = pq.ParquetFile(parquet_path)
column_index = parquet_file.schema_arrow.get_field_index(column_name)
if column_index == -1:
raise ValueError(f"Column '{column_name}' not found in {parquet_path}")

lowest_min = None
highest_max = None

metadata = parquet_file.metadata
if metadata.num_rows == 0:
return None, None


# each row group will have its own min/max, so take the min of mins and the max of maxes
for row_group_index in range(metadata.num_row_groups):
column_meta = metadata.row_group(row_group_index).column(column_index)
column_stats = column_meta.statistics
# We created the parquets so we know they have up-to-date statistics.
# We have already checked the file is not empty (which causes empty stats), so treat missing
# statistics as an invalid file.
if column_stats is None or not column_stats.has_min_max:
raise ValueError(f"columns stats missing or min_max missing: {column_stats}")
if lowest_min is None or column_stats.min < lowest_min:
lowest_min = column_stats.min
if highest_max is None or column_stats.max > highest_max:
highest_max = column_stats.max

return lowest_min, highest_max
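

# Illustrative sketch, not used by any rule: a hypothetical self-check showing that the
# row-group statistics alone recover a column's min/max, which is what parquet_min_max_value
# relies on (pq and Path come from the imports at the top of this Snakefile).
def _illustrate_row_group_statistics(tmp_dir: Path) -> None:
    import datetime
    import pyarrow as pa
    table = pa.table({"timestamp": [datetime.datetime(2024, 1, 1, hour) for hour in range(3)]})
    example_path = tmp_dir / "example.parquet"
    pq.write_table(table, example_path)
    stats = pq.ParquetFile(example_path).metadata.row_group(0).column(0).statistics
    # min/max come straight from the file metadata; no rows are scanned
    assert stats.has_min_max
    assert stats.min == datetime.datetime(2024, 1, 1, 0)
    assert stats.max == datetime.datetime(2024, 1, 1, 2)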


rule all:
input:
all_outputs
ftps_uploaded = ALL_FTPS_UPLOADED,
daily_hash_lookups = ALL_DAILY_HASH_LOOKUPS

rule all_ftps_uploaded:
input:
ALL_FTPS_UPLOADED

rule all_daily_hash_lookups:
input:
ALL_DAILY_HASH_LOOKUPS

def input_file_maker(wc):
unhashed_csn = hash_to_csn[wc.hashed_csn]
@@ -139,6 +225,56 @@ rule csv_to_parquet:
units=wildcards.units)


def pseudonymised_parquet_files_for_date(wc):
return [ao.get_pseudonymised_parquet_path() for ao in all_outputs if ao.date == wc.date]


rule make_daily_hash_lookup:
input:
# Because we don't declare the original parquets in the output of csv_to_parquet,
# they are not present in the Snakemake dependency DAG. Therefore, we can't reference them
# here either.
# We lie to Snakemake that the input is the pseudon parquets, but then we use InputCsvFile
# to determine the actual input file (the original parquets).
pseudonymised_parquets = pseudonymised_parquet_files_for_date
output:
hash_lookup_json = WAVEFORM_HASH_LOOKUPS / "{date}.hashes.json"
run:
daily_files = [ao for ao in all_outputs if ao.date == wildcards.date]
print(
f"Making daily hash lookup {output.hash_lookup_json} from {len(daily_files)} files: "
f"{input.pseudonymised_parquets}"
)
min_timestamp_key = 'min_timestamp'
max_timestamp_key = 'max_timestamp'
hash_summary_by_csn = {}
for daily_file in daily_files:
entry = {}
original_parquet = daily_file.get_original_parquet_path()
entry["csn"] = daily_file.csn
entry["hashed_csn"] = daily_file.hashed_csn
min_timestamp, max_timestamp = parquet_min_max_value(original_parquet, "timestamp")
if min_timestamp is None or max_timestamp is None:
# this file does not contribute to stats; skip it
print(f"Parquet does not have a min/max value, assumed to be empty: {original_parquet}")
continue
entry[min_timestamp_key] = min_timestamp
entry[max_timestamp_key] = max_timestamp
existing_entry = hash_summary_by_csn.get(daily_file.csn)
if existing_entry is None:
hash_summary_by_csn[daily_file.csn] = entry
else:
# update the limits (there can be multiple files for the same CSN because each variable/channel
# is in its own file)
existing_entry[min_timestamp_key] = min(min_timestamp, existing_entry[min_timestamp_key])
existing_entry[max_timestamp_key] = max(max_timestamp, existing_entry[max_timestamp_key])

hash_summary = list(hash_summary_by_csn.values())

with open(output.hash_lookup_json, "w") as fh:
json.dump(hash_summary, fh, indent=0)


rule send_ftps:
input:
WAVEFORM_PSEUDONYMISED_PARQUET / (FILE_STEM_PATTERN_HASHED + ".parquet")