From 91b30c5d2ec3f708e3fe609554a60aed0a8e940f Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 06:41:20 +1100 Subject: [PATCH 1/5] feat: enhance transfer process to include profiling artifacts and handle empty uploads --- transfers/profiling.py | 4 +++- transfers/transfer.py | 45 +++++++++++++++++++++++++++--------------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/transfers/profiling.py b/transfers/profiling.py index f3ec5048..934f3b47 100644 --- a/transfers/profiling.py +++ b/transfers/profiling.py @@ -87,10 +87,12 @@ def run( def upload_profile_artifacts(artifacts: Iterable[ProfileArtifact]) -> None: """Upload generated profiling artifacts to the configured storage bucket.""" - artifacts = list(artifacts) if not artifacts: + logger.info("No profiling artifacts to upload") return + artifacts = list(artifacts) + bucket = get_storage_bucket() for artifact in artifacts: for path in (artifact.stats_path, artifact.report_path): diff --git a/transfers/transfer.py b/transfers/transfer.py index 33767bd2..c7e052ca 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -296,6 +296,8 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): transfer_minor_trace_chemistry, transfer_nma_stratigraphy, transfer_associated_data, + profile_waterlevels, + profile_artifacts, ) else: _transfer_sequential( @@ -329,6 +331,8 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): profile_artifacts, ) + return profile_artifacts + def _transfer_parallel( metrics, @@ -357,6 +361,8 @@ def _transfer_parallel( transfer_minor_trace_chemistry, transfer_nma_stratigraphy, transfer_associated_data, + profile_waterlevels, + profile_artifacts, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") @@ -548,24 +554,31 @@ def _transfer_parallel( ("Acoustic", WaterLevelsContinuousAcousticTransferer, flags) ) - with ThreadPoolExecutor(max_workers=2) as executor: - futures = {} + if profile_waterlevels: for name, klass, task_flags in parallel_tasks_2: - future = executor.submit( - _execute_transfer_with_timing, name, klass, task_flags - ) - futures[future] = name - - for future in as_completed(futures): - name = futures[future] - try: - result_name, result, elapsed = future.result() - results_map[result_name] = result - logger.info( - f"Parallel task {result_name} completed in {elapsed:.2f}s" + profiler = TransferProfiler(f"waterlevels_continuous_{name.lower()}") + results, artifact = profiler.run(_execute_transfer, klass, task_flags) + profile_artifacts.append(artifact) + results_map[name] = results + else: + with ThreadPoolExecutor(max_workers=2) as executor: + futures = {} + for name, klass, task_flags in parallel_tasks_2: + future = executor.submit( + _execute_transfer_with_timing, name, klass, task_flags ) - except Exception as e: - logger.critical(f"Parallel task {name} failed: {e}") + futures[future] = name + + for future in as_completed(futures): + name = futures[future] + try: + result_name, result, elapsed = future.result() + results_map[result_name] = result + logger.info( + f"Parallel task {result_name} completed in {elapsed:.2f}s" + ) + except Exception as e: + logger.critical(f"Parallel task {name} failed: {e}") if "Pressure" in results_map and results_map["Pressure"]: metrics.pressure_metrics(*results_map["Pressure"]) From 811594080162c61621d7ab533ed0aefd1e044dbe Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 07:07:51 +1100 Subject: [PATCH 2/5] feat: implement transfer options management and logging context for transfers --- transfers/transfer.py | 370 ++++++++++++++++++++---------------------- 1 file changed, 175 insertions(+), 195 deletions(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index c7e052ca..25851c60 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -16,6 +16,7 @@ import os import time from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass from dotenv import load_dotenv @@ -86,6 +87,8 @@ from transfers.soil_rock_results import SoilRockResultsTransferer from transfers.surface_water_data import SurfaceWaterDataTransferer from transfers.surface_water_photos import SurfaceWaterPhotosTransferer +from contextlib import contextmanager + from transfers.util import timeit from transfers.waterlevelscontinuous_pressure_daily import ( NMAWaterLevelsContinuousPressureDailyTransferer, @@ -95,6 +98,71 @@ from transfers.logger import logger, save_log_to_bucket +@dataclass +class TransferOptions: + transfer_screens: bool + transfer_sensors: bool + transfer_contacts: bool + transfer_waterlevels: bool + transfer_pressure: bool + transfer_acoustic: bool + transfer_link_ids: bool + transfer_groups: bool + transfer_assets: bool + transfer_surface_water_photos: bool + transfer_soil_rock_results: bool + transfer_surface_water_data: bool + transfer_hydraulics_data: bool + transfer_chemistry_sampleinfo: bool + transfer_major_chemistry: bool + transfer_radionuclides: bool + transfer_ngwmn_views: bool + transfer_pressure_daily: bool + transfer_weather_data: bool + transfer_weather_photos: bool + transfer_minor_trace_chemistry: bool + transfer_nma_stratigraphy: bool + transfer_associated_data: bool + + +def load_transfer_options() -> TransferOptions: + """Read boolean toggles for each transfer from the environment.""" + + return TransferOptions( + transfer_screens=get_bool_env("TRANSFER_WELL_SCREENS", True), + transfer_sensors=get_bool_env("TRANSFER_SENSORS", True), + transfer_contacts=get_bool_env("TRANSFER_CONTACTS", True), + transfer_waterlevels=get_bool_env("TRANSFER_WATERLEVELS", True), + transfer_pressure=get_bool_env("TRANSFER_WATERLEVELS_PRESSURE", True), + transfer_acoustic=get_bool_env("TRANSFER_WATERLEVELS_ACOUSTIC", True), + transfer_link_ids=get_bool_env("TRANSFER_LINK_IDS", True), + transfer_groups=get_bool_env("TRANSFER_GROUPS", True), + transfer_assets=get_bool_env("TRANSFER_ASSETS", False), + transfer_surface_water_photos=get_bool_env( + "TRANSFER_SURFACE_WATER_PHOTOS", True + ), + transfer_soil_rock_results=get_bool_env("TRANSFER_SOIL_ROCK_RESULTS", True), + transfer_surface_water_data=get_bool_env("TRANSFER_SURFACE_WATER_DATA", True), + transfer_hydraulics_data=get_bool_env("TRANSFER_HYDRAULICS_DATA", True), + transfer_chemistry_sampleinfo=get_bool_env( + "TRANSFER_CHEMISTRY_SAMPLEINFO", True + ), + transfer_major_chemistry=get_bool_env("TRANSFER_MAJOR_CHEMISTRY", True), + transfer_radionuclides=get_bool_env("TRANSFER_RADIONUCLIDES", True), + transfer_ngwmn_views=get_bool_env("TRANSFER_NGWMN_VIEWS", True), + transfer_pressure_daily=get_bool_env( + "TRANSFER_WATERLEVELS_PRESSURE_DAILY", True + ), + transfer_weather_data=get_bool_env("TRANSFER_WEATHER_DATA", True), + transfer_weather_photos=get_bool_env("TRANSFER_WEATHER_PHOTOS", True), + transfer_minor_trace_chemistry=get_bool_env( + "TRANSFER_MINOR_TRACE_CHEMISTRY", True + ), + transfer_nma_stratigraphy=get_bool_env("TRANSFER_NMA_STRATIGRAPHY", True), + transfer_associated_data=get_bool_env("TRANSFER_ASSOCIATED_DATA", True), + ) + + def message(msg, pad=10, new_line_at_top=True): pad = "*" * pad if new_line_at_top: @@ -102,6 +170,28 @@ def message(msg, pad=10, new_line_at_top=True): logger.info(f"{pad} {msg} {pad}") +def log_transfer_start(name: str) -> None: + logger.info("Starting transfer: %s", name) + + +def log_transfer_end(name: str, extra: str | None = None) -> None: + if extra: + logger.info("Completed transfer: %s (%s)", name, extra) + else: + logger.info("Completed transfer: %s", name) + + +@contextmanager +def transfer_context(name: str, *, pad: int = 10): + """Context manager to log start/end markers for a transfer block.""" + + message(f"TRANSFERRING {name}", pad=pad) + try: + yield + finally: + logger.info("Finished %s", name) + + def _execute_transfer(klass, flags: dict = None): """Execute a single transfer class. Thread-safe since each creates its own session.""" pointids = None @@ -241,31 +331,7 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): metrics.well_metrics(*results) # Get transfer flags - transfer_screens = get_bool_env("TRANSFER_WELL_SCREENS", True) - transfer_sensors = get_bool_env("TRANSFER_SENSORS", True) - transfer_contacts = get_bool_env("TRANSFER_CONTACTS", True) - transfer_waterlevels = get_bool_env("TRANSFER_WATERLEVELS", True) - transfer_pressure = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE", True) - transfer_acoustic = get_bool_env("TRANSFER_WATERLEVELS_ACOUSTIC", True) - transfer_link_ids = get_bool_env("TRANSFER_LINK_IDS", True) - transfer_groups = get_bool_env("TRANSFER_GROUPS", True) - transfer_assets = get_bool_env("TRANSFER_ASSETS", False) - transfer_surface_water_photos = get_bool_env("TRANSFER_SURFACE_WATER_PHOTOS", True) - transfer_soil_rock_results = get_bool_env("TRANSFER_SOIL_ROCK_RESULTS", True) - transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) - transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) - transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) - transfer_major_chemistry = get_bool_env("TRANSFER_MAJOR_CHEMISTRY", True) - transfer_radionuclides = get_bool_env("TRANSFER_RADIONUCLIDES", True) - transfer_ngwmn_views = get_bool_env("TRANSFER_NGWMN_VIEWS", True) - transfer_pressure_daily = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE_DAILY", True) - transfer_weather_data = get_bool_env("TRANSFER_WEATHER_DATA", True) - transfer_weather_photos = get_bool_env("TRANSFER_WEATHER_PHOTOS", True) - transfer_minor_trace_chemistry = get_bool_env( - "TRANSFER_MINOR_TRACE_CHEMISTRY", True - ) - transfer_nma_stratigraphy = get_bool_env("TRANSFER_NMA_STRATIGRAPHY", True) - transfer_associated_data = get_bool_env("TRANSFER_ASSOCIATED_DATA", True) + transfer_options = load_transfer_options() use_parallel = get_bool_env("TRANSFER_PARALLEL", True) if use_parallel: @@ -273,29 +339,7 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): metrics, flags, limit, - transfer_screens, - transfer_sensors, - transfer_contacts, - transfer_waterlevels, - transfer_pressure, - transfer_acoustic, - transfer_link_ids, - transfer_groups, - transfer_assets, - transfer_surface_water_photos, - transfer_soil_rock_results, - transfer_surface_water_data, - transfer_hydraulics_data, - transfer_chemistry_sampleinfo, - transfer_major_chemistry, - transfer_radionuclides, - transfer_ngwmn_views, - transfer_pressure_daily, - transfer_weather_data, - transfer_weather_photos, - transfer_minor_trace_chemistry, - transfer_nma_stratigraphy, - transfer_associated_data, + transfer_options, profile_waterlevels, profile_artifacts, ) @@ -304,29 +348,7 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): metrics, flags, limit, - transfer_screens, - transfer_sensors, - transfer_contacts, - transfer_waterlevels, - transfer_pressure, - transfer_acoustic, - transfer_link_ids, - transfer_groups, - transfer_assets, - transfer_surface_water_photos, - transfer_soil_rock_results, - transfer_surface_water_data, - transfer_hydraulics_data, - transfer_chemistry_sampleinfo, - transfer_major_chemistry, - transfer_radionuclides, - transfer_ngwmn_views, - transfer_pressure_daily, - transfer_weather_data, - transfer_weather_photos, - transfer_minor_trace_chemistry, - transfer_nma_stratigraphy, - transfer_associated_data, + transfer_options, profile_waterlevels, profile_artifacts, ) @@ -338,80 +360,59 @@ def _transfer_parallel( metrics, flags, limit, - transfer_screens, - transfer_sensors, - transfer_contacts, - transfer_waterlevels, - transfer_pressure, - transfer_acoustic, - transfer_link_ids, - transfer_groups, - transfer_assets, - transfer_surface_water_photos, - transfer_soil_rock_results, - transfer_surface_water_data, - transfer_hydraulics_data, - transfer_chemistry_sampleinfo, - transfer_major_chemistry, - transfer_radionuclides, - transfer_ngwmn_views, - transfer_pressure_daily, - transfer_weather_data, - transfer_weather_photos, - transfer_minor_trace_chemistry, - transfer_nma_stratigraphy, - transfer_associated_data, - profile_waterlevels, + transfer_options: TransferOptions, + profile_waterlevels: bool, profile_artifacts, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") + opts = transfer_options # ========================================================================= # PHASE 2: Parallel Group 1 (Independent transfers after wells) # ========================================================================= parallel_tasks_1 = [] - if transfer_screens: + if opts.transfer_screens: parallel_tasks_1.append(("WellScreens", WellScreenTransferer, flags)) - if transfer_contacts: + if opts.transfer_contacts: parallel_tasks_1.append(("Contacts", ContactTransfer, flags)) - if transfer_waterlevels: + if opts.transfer_waterlevels: parallel_tasks_1.append(("WaterLevels", WaterLevelTransferer, flags)) - if transfer_link_ids: + if opts.transfer_link_ids: parallel_tasks_1.append(("LinkIdsWellData", LinkIdsWellDataTransferer, flags)) parallel_tasks_1.append( ("LinkIdsLocation", LinkIdsLocationDataTransferer, flags) ) - if transfer_groups: + if opts.transfer_groups: parallel_tasks_1.append(("Groups", ProjectGroupTransferer, flags)) - if transfer_surface_water_photos: + if opts.transfer_surface_water_photos: parallel_tasks_1.append( ("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer, flags) ) - if transfer_soil_rock_results: + if opts.transfer_soil_rock_results: parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer, flags)) - if transfer_weather_photos: + if opts.transfer_weather_photos: parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) - if transfer_assets: + if opts.transfer_assets: parallel_tasks_1.append(("Assets", AssetTransferer, flags)) - if transfer_associated_data: + if opts.transfer_associated_data: parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer, flags)) - if transfer_surface_water_data: + if opts.transfer_surface_water_data: parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer, flags)) - if transfer_hydraulics_data: + if opts.transfer_hydraulics_data: parallel_tasks_1.append(("HydraulicsData", HydraulicsDataTransferer, flags)) - if transfer_chemistry_sampleinfo: + if opts.transfer_chemistry_sampleinfo: parallel_tasks_1.append( ("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags) ) - if transfer_ngwmn_views: + if opts.transfer_ngwmn_views: parallel_tasks_1.append( ("NGWMNWellConstruction", NGWMNWellConstructionTransferer, flags) ) parallel_tasks_1.append(("NGWMNWaterLevels", NGWMNWaterLevelsTransferer, flags)) parallel_tasks_1.append(("NGWMNLithology", NGWMNLithologyTransferer, flags)) - if transfer_pressure_daily: + if opts.transfer_pressure_daily: parallel_tasks_1.append( ( "WaterLevelsPressureDaily", @@ -419,7 +420,7 @@ def _transfer_parallel( flags, ) ) - if transfer_weather_data: + if opts.transfer_weather_data: parallel_tasks_1.append(("WeatherData", WeatherDataTransferer, flags)) # Track results for metrics @@ -437,7 +438,7 @@ def _transfer_parallel( futures[future] = name # Submit session-based transfers - if transfer_nma_stratigraphy: + if opts.transfer_nma_stratigraphy: future = executor.submit( _execute_transfer_with_timing, "Stratigraphy", @@ -515,17 +516,17 @@ def _transfer_parallel( metrics.weather_data_metrics(*results_map["WeatherData"]) if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) - if transfer_major_chemistry: + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) metrics.major_chemistry_metrics(*results) - if transfer_radionuclides: + if opts.transfer_radionuclides: message("TRANSFERRING RADIONUCLIDES") results = _execute_transfer(RadionuclidesTransferer, flags=flags) metrics.radionuclides_metrics(*results) - if transfer_minor_trace_chemistry: + if opts.transfer_minor_trace_chemistry: message("TRANSFERRING MINOR TRACE CHEMISTRY") results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags) metrics.minor_trace_chemistry_metrics(*results) @@ -533,7 +534,7 @@ def _transfer_parallel( # ========================================================================= # PHASE 3: Sensors (Sequential - required before continuous water levels) # ========================================================================= - if transfer_sensors: + if opts.transfer_sensors: message("TRANSFERRING SENSORS") results = _execute_transfer(SensorTransferer, flags=flags) metrics.sensor_metrics(*results) @@ -541,15 +542,15 @@ def _transfer_parallel( # ========================================================================= # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors) # ========================================================================= - if transfer_pressure or transfer_acoustic: + if opts.transfer_pressure or opts.transfer_acoustic: message("PARALLEL TRANSFER GROUP 2 (Continuous Water Levels)") parallel_tasks_2 = [] - if transfer_pressure: + if opts.transfer_pressure: parallel_tasks_2.append( ("Pressure", WaterLevelsContinuousPressureTransferer, flags) ) - if transfer_acoustic: + if opts.transfer_acoustic: parallel_tasks_2.append( ("Acoustic", WaterLevelsContinuousAcousticTransferer, flags) ) @@ -590,130 +591,109 @@ def _transfer_sequential( metrics, flags, limit, - transfer_screens, - transfer_sensors, - transfer_contacts, - transfer_waterlevels, - transfer_pressure, - transfer_acoustic, - transfer_link_ids, - transfer_groups, - transfer_assets, - transfer_surface_water_photos, - transfer_soil_rock_results, - transfer_surface_water_data, - transfer_hydraulics_data, - transfer_chemistry_sampleinfo, - transfer_major_chemistry, - transfer_radionuclides, - transfer_ngwmn_views, - transfer_pressure_daily, - transfer_weather_data, - transfer_weather_photos, - transfer_minor_trace_chemistry, - transfer_nma_stratigraphy, - transfer_associated_data, - profile_waterlevels, + transfer_options: TransferOptions, + profile_waterlevels: bool, profile_artifacts, ): """Original sequential transfer logic.""" - if transfer_screens: - message("TRANSFERRING WELL SCREENS") - results = _execute_transfer(WellScreenTransferer, flags=flags) - metrics.well_screen_metrics(*results) - - if transfer_sensors: - message("TRANSFERRING SENSORS") - results = _execute_transfer(SensorTransferer, flags=flags) - metrics.sensor_metrics(*results) - - if transfer_contacts: - message("TRANSFERRING CONTACTS") - results = _execute_transfer(ContactTransfer, flags=flags) - metrics.contact_metrics(*results) - - message("TRANSFERRING PERMISSIONS") - with session_ctx() as session: - transfer_permissions(session) - - if transfer_nma_stratigraphy: - message("TRANSFERRING NMA STRATIGRAPHY") - results = _execute_transfer(StratigraphyLegacyTransferer, flags=flags) - metrics.nma_stratigraphy_metrics(*results) - - message("TRANSFERRING STRATIGRAPHY") - with session_ctx() as session: - results = transfer_stratigraphy(session, limit=limit) - metrics.stratigraphy_metrics(*results) - - if transfer_waterlevels: - message("TRANSFERRING WATER LEVELS") - results = _execute_transfer(WaterLevelTransferer, flags=flags) - metrics.water_level_metrics(*results) - - if transfer_link_ids: + opts = transfer_options + if opts.transfer_screens: + with transfer_context("WELL SCREENS"): + results = _execute_transfer(WellScreenTransferer, flags=flags) + metrics.well_screen_metrics(*results) + + if opts.transfer_sensors: + with transfer_context("SENSORS"): + results = _execute_transfer(SensorTransferer, flags=flags) + metrics.sensor_metrics(*results) + + if opts.transfer_contacts: + with transfer_context("CONTACTS"): + results = _execute_transfer(ContactTransfer, flags=flags) + metrics.contact_metrics(*results) + + with transfer_context("PERMISSIONS"): + with session_ctx() as session: + transfer_permissions(session) + + if opts.transfer_nma_stratigraphy: + with transfer_context("NMA STRATIGRAPHY"): + results = _execute_transfer(StratigraphyLegacyTransferer, flags=flags) + metrics.nma_stratigraphy_metrics(*results) + + with transfer_context("STRATIGRAPHY"): + with session_ctx() as session: + results = transfer_stratigraphy(session, limit=limit) + metrics.stratigraphy_metrics(*results) + + if opts.transfer_waterlevels: + with transfer_context("WATER LEVELS"): + results = _execute_transfer(WaterLevelTransferer, flags=flags) + metrics.water_level_metrics(*results) + + if opts.transfer_link_ids: message("TRANSFERRING LINK IDS") results = _execute_transfer(LinkIdsWellDataTransferer, flags=flags) metrics.welldata_link_ids_metrics(*results) results = _execute_transfer(LinkIdsLocationDataTransferer, flags=flags) metrics.location_link_ids_metrics(*results) - if transfer_groups: + if opts.transfer_groups: message("TRANSFERRING GROUPS") results = _execute_transfer(ProjectGroupTransferer, flags=flags) metrics.group_metrics(*results) - if transfer_surface_water_photos: + if opts.transfer_surface_water_photos: message("TRANSFERRING SURFACE WATER PHOTOS") results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) metrics.surface_water_photos_metrics(*results) - if transfer_soil_rock_results: + if opts.transfer_soil_rock_results: message("TRANSFERRING SOIL ROCK RESULTS") results = _execute_transfer(SoilRockResultsTransferer, flags=flags) metrics.soil_rock_results_metrics(*results) - if transfer_weather_photos: + if opts.transfer_weather_photos: message("TRANSFERRING WEATHER PHOTOS") results = _execute_transfer(WeatherPhotosTransferer, flags=flags) metrics.weather_photos_metrics(*results) - if transfer_assets: + if opts.transfer_assets: message("TRANSFERRING ASSETS") results = _execute_transfer(AssetTransferer, flags=flags) metrics.asset_metrics(*results) - if transfer_associated_data: + if opts.transfer_associated_data: message("TRANSFERRING ASSOCIATED DATA") results = _execute_transfer(AssociatedDataTransferer, flags=flags) metrics.associated_data_metrics(*results) - if transfer_surface_water_data: + if opts.transfer_surface_water_data: message("TRANSFERRING SURFACE WATER DATA") results = _execute_transfer(SurfaceWaterDataTransferer, flags=flags) metrics.surface_water_data_metrics(*results) - if transfer_hydraulics_data: + if opts.transfer_hydraulics_data: message("TRANSFERRING HYDRAULICS DATA") results = _execute_transfer(HydraulicsDataTransferer, flags=flags) metrics.hydraulics_data_metrics(*results) - if transfer_chemistry_sampleinfo: + if opts.transfer_chemistry_sampleinfo: message("TRANSFERRING CHEMISTRY SAMPLEINFO") results = _execute_transfer(ChemistrySampleInfoTransferer, flags=flags) metrics.chemistry_sampleinfo_metrics(*results) - if transfer_major_chemistry: + if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) metrics.major_chemistry_metrics(*results) - if transfer_radionuclides: + if opts.transfer_radionuclides: message("TRANSFERRING RADIONUCLIDES") results = _execute_transfer(RadionuclidesTransferer, flags=flags) metrics.radionuclides_metrics(*results) - if transfer_ngwmn_views: + if opts.transfer_ngwmn_views: message("TRANSFERRING NGWMN WELL CONSTRUCTION") results = _execute_transfer(NGWMNWellConstructionTransferer, flags=flags) metrics.ngwmn_well_construction_metrics(*results) @@ -724,24 +704,24 @@ def _transfer_sequential( results = _execute_transfer(NGWMNLithologyTransferer, flags=flags) metrics.ngwmn_lithology_metrics(*results) - if transfer_pressure_daily: + if opts.transfer_pressure_daily: message("TRANSFERRING WATER LEVELS PRESSURE DAILY") results = _execute_transfer( NMAWaterLevelsContinuousPressureDailyTransferer, flags=flags ) metrics.waterlevels_pressure_daily_metrics(*results) - if transfer_weather_data: + if opts.transfer_weather_data: message("TRANSFERRING WEATHER DATA") results = _execute_transfer(WeatherDataTransferer, flags=flags) metrics.weather_data_metrics(*results) - if transfer_minor_trace_chemistry: + if opts.transfer_minor_trace_chemistry: message("TRANSFERRING MINOR TRACE CHEMISTRY") results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags) metrics.minor_trace_chemistry_metrics(*results) - if transfer_pressure: + if opts.transfer_pressure: message("TRANSFERRING WATER LEVELS PRESSURE") if profile_waterlevels: profiler = TransferProfiler("waterlevels_continuous_pressure") @@ -755,7 +735,7 @@ def _transfer_sequential( ) metrics.pressure_metrics(*results) - if transfer_acoustic: + if opts.transfer_acoustic: message("TRANSFERRING WATER LEVELS ACOUSTIC") if profile_waterlevels: profiler = TransferProfiler("waterlevels_continuous_acoustic") From 7423bf787dcd14a8e5b9264e03fdc7fd5ee0b326 Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 24 Jan 2026 07:09:20 +1100 Subject: [PATCH 3/5] Update transfers/profiling.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/profiling.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transfers/profiling.py b/transfers/profiling.py index 934f3b47..a5235724 100644 --- a/transfers/profiling.py +++ b/transfers/profiling.py @@ -87,12 +87,10 @@ def run( def upload_profile_artifacts(artifacts: Iterable[ProfileArtifact]) -> None: """Upload generated profiling artifacts to the configured storage bucket.""" + artifacts = list(artifacts) if not artifacts: logger.info("No profiling artifacts to upload") return - - artifacts = list(artifacts) - bucket = get_storage_bucket() for artifact in artifacts: for path in (artifact.stats_path, artifact.report_path): From 075bf24b3b4ba6d5343077751ba1b150e87dc28a Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Sat, 24 Jan 2026 07:10:46 +1100 Subject: [PATCH 4/5] Update transfers/transfer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/transfer.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index 25851c60..45d47258 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -170,17 +170,6 @@ def message(msg, pad=10, new_line_at_top=True): logger.info(f"{pad} {msg} {pad}") -def log_transfer_start(name: str) -> None: - logger.info("Starting transfer: %s", name) - - -def log_transfer_end(name: str, extra: str | None = None) -> None: - if extra: - logger.info("Completed transfer: %s (%s)", name, extra) - else: - logger.info("Completed transfer: %s", name) - - @contextmanager def transfer_context(name: str, *, pad: int = 10): """Context manager to log start/end markers for a transfer block.""" From d03eeb2e3424c2ed6bef4434189c7b6547b06773 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 24 Jan 2026 07:22:06 +1100 Subject: [PATCH 5/5] refactor: remove redundant safety check for test database in transfer script --- transfers/transfer.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index 45d47258..86604a55 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -16,6 +16,7 @@ import os import time from concurrent.futures import ThreadPoolExecutor, as_completed +from contextlib import contextmanager from dataclasses import dataclass from dotenv import load_dotenv @@ -35,16 +36,6 @@ from transfers.stratigraphy_legacy import StratigraphyLegacyTransferer from transfers.stratigraphy_transfer import transfer_stratigraphy -# Safety check: Ensure we're not writing to the test database -if ( - os.getenv("POSTGRES_DB") == "ocotilloapi_test" - or os.getenv("POSTGRES_DB") == "nmsamplelocations_test" -): - raise ValueError( - "ERROR: Transfer script is configured to write to test database! " - "Set POSTGRES_DB=ocotilloapi_dev in .env file" - ) - from transfers.waterlevels_transducer_transfer import ( WaterLevelsContinuousPressureTransferer, WaterLevelsContinuousAcousticTransferer, @@ -87,7 +78,6 @@ from transfers.soil_rock_results import SoilRockResultsTransferer from transfers.surface_water_data import SurfaceWaterDataTransferer from transfers.surface_water_photos import SurfaceWaterPhotosTransferer -from contextlib import contextmanager from transfers.util import timeit from transfers.waterlevelscontinuous_pressure_daily import (