diff --git a/mapillary_tools/exif_read.py b/mapillary_tools/exif_read.py index 94266570..d874f5f6 100644 --- a/mapillary_tools/exif_read.py +++ b/mapillary_tools/exif_read.py @@ -679,6 +679,13 @@ def extract_altitude(self) -> float | None: altitude_ref = {0: 1, 1: -1} return altitude * altitude_ref.get(ref, 1) + def extract_gps_accuracy(self) -> float | None: + """Extract GPS horizontal position accuracy from EXIF GPSHPositioningError tag.""" + val = self._extract_alternative_fields( + ["GPS GPSHPositioningError", "GPS Tag 0x001F"], float + ) + return val + def extract_gps_datetime(self) -> datetime.datetime | None: """ Extract timestamp from GPS field. diff --git a/mapillary_tools/exif_write.py b/mapillary_tools/exif_write.py index cd90aed9..3c823bf0 100644 --- a/mapillary_tools/exif_write.py +++ b/mapillary_tools/exif_write.py @@ -156,6 +156,14 @@ def add_direction(self, direction: float, ref: str = "T") -> None: self._ef["GPS"][piexif.GPSIFD.GPSImgDirection], ) + def add_gps_accuracy(self, accuracy: float) -> None: + """Add GPS horizontal position accuracy in meters (EXIF GPSHPositioningError).""" + if math.isinf(accuracy) or math.isnan(accuracy) or accuracy <= 0: + accuracy = 99999.0 + accuracy = min(accuracy, 99999.0) + num = round(accuracy * 100) + self._ef["GPS"][piexif.GPSIFD.GPSHPositioningError] = (num, 100) + def add_make(self, make: str) -> None: if not make: raise ValueError("Make cannot be empty") diff --git a/mapillary_tools/geotag/geotag_images_from_video.py b/mapillary_tools/geotag/geotag_images_from_video.py index c20d9f1a..fd457118 100644 --- a/mapillary_tools/geotag/geotag_images_from_video.py +++ b/mapillary_tools/geotag/geotag_images_from_video.py @@ -16,6 +16,7 @@ from typing_extensions import override from .. import types, utils +from ..gpmf import gps_smoother, gps_weigher from .base import GeotagImagesFromGeneric from .geotag_images_from_gpx import GeotagImagesFromGPX from .geotag_videos_from_video import GeotagVideosFromVideo @@ -83,6 +84,17 @@ def to_description( image_metadatas = geotag.to_description(image_paths) + # If weighted points available, refine positions with weighted median + has_weights = ( + video_metadata.point_weights is not None + and video_metadata.point_sigma_xys is not None + ) + if has_weights: + _apply_weighted_interpolation( + image_metadatas, + video_metadata, + ) + for metadata in image_metadatas: if isinstance(metadata, types.ImageMetadata): metadata.MAPDeviceMake = video_metadata.make @@ -131,3 +143,53 @@ def to_description( num_processes=self.num_processes, ) return geotag.to_description(image_paths) + + +def _apply_weighted_interpolation( + image_metadatas: list[types.ImageMetadataOrError], + video_metadata: types.VideoMetadata, +) -> None: + """Refine image positions using weighted median interpolation. + + Modifies ImageMetadata objects in-place: updates lat, lon, and sets + MAPGPSAccuracyMeters. + + The image metadata times are in image-EXIF time (absolute), but the + video points are in video-relative time (0 to duration). GeotagImagesFromGPX + shifts video points by adding the first image's time. We apply the same + shift here so the query times align. + """ + assert video_metadata.point_weights is not None + assert video_metadata.point_sigma_xys is not None + + # Frame-sampling path only: smooth the GPS track (speed-gate + sigma-weighted + # Kalman/RTS) before interpolating image positions. Falls back to the raw points + # if numpy is unavailable. NOTE: this does NOT touch the native CAMM muxing path, + # which still muxes the unsmoothed track (see PR description). + points = gps_smoother.smooth_gps_points(video_metadata.points) + weights = video_metadata.point_weights + sigma_xys = video_metadata.point_sigma_xys + + # Find the time shift: first image time = offset added to video-relative times + valid_images = [m for m in image_metadatas if isinstance(m, types.ImageMetadata)] + if not valid_images: + return + first_image_time = min(m.time for m in valid_images) + + # Shift video point times to match image time frame + times = [first_image_time + p.time for p in points] + lats = [p.lat for p in points] + lons = [p.lon for p in points] + + for metadata in valid_images: + lat, lon, accuracy = gps_weigher.weighted_interpolate( + metadata.time, + times, + lats, + lons, + weights, + sigma_xys, + ) + metadata.lat = lat + metadata.lon = lon + metadata.MAPGPSAccuracyMeters = round(accuracy, 2) diff --git a/mapillary_tools/geotag/image_extractors/exif.py b/mapillary_tools/geotag/image_extractors/exif.py index 01470964..eba1d214 100644 --- a/mapillary_tools/geotag/image_extractors/exif.py +++ b/mapillary_tools/geotag/image_extractors/exif.py @@ -60,6 +60,7 @@ def extract(self) -> types.ImageMetadata: MAPOrientation=exif.extract_orientation(), MAPDeviceMake=exif.extract_make(), MAPDeviceModel=exif.extract_model(), + MAPGPSAccuracyMeters=exif.extract_gps_accuracy(), MAPCameraUUID=exif.extract_camera_uuid(), ) diff --git a/mapillary_tools/geotag/video_extractors/native.py b/mapillary_tools/geotag/video_extractors/native.py index a4a329e7..91f2d385 100644 --- a/mapillary_tools/geotag/video_extractors/native.py +++ b/mapillary_tools/geotag/video_extractors/native.py @@ -37,10 +37,11 @@ def extract(self) -> types.VideoMetadata: if not gps_points: raise exceptions.MapillaryGPXEmptyError("Empty GPS data found") - gps_points = T.cast( - T.List[telemetry.GPSPoint], gpmf_gps_filter.remove_noisy_points(gps_points) - ) - if not gps_points: + gps_points, sigma_xys, weights = gpmf_gps_filter.weight_points(gps_points) + gps_points = T.cast(T.List[telemetry.GPSPoint], gps_points) + + n_finite = sum(1 for w in weights if w > 0) + if n_finite == 0: raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") video_metadata = types.VideoMetadata( @@ -50,6 +51,8 @@ def extract(self) -> types.VideoMetadata: points=T.cast(T.List[geo.Point], gps_points), make=gopro_info.make, model=gopro_info.model, + point_sigma_xys=sigma_xys, + point_weights=weights, ) return video_metadata diff --git a/mapillary_tools/gpmf/gpmf_gps_filter.py b/mapillary_tools/gpmf/gpmf_gps_filter.py index 61f2ff90..eea9212e 100644 --- a/mapillary_tools/gpmf/gpmf_gps_filter.py +++ b/mapillary_tools/gpmf/gpmf_gps_filter.py @@ -4,11 +4,12 @@ # LICENSE file in the root directory of this source tree. import logging +import math import typing as T from .. import constants, geo from ..telemetry import GPSPoint -from . import gps_filter +from . import gps_filter, gps_weigher """ This module was originally used for GoPro GPS data (GPMF) filtering, @@ -59,6 +60,53 @@ def remove_outliers( ) +def weight_points( + sequence: T.Sequence[GPSPoint], + uere_nominal: float = 3.0, +) -> tuple[T.Sequence[GPSPoint], list[float], list[float]]: + """Compute per-sample sigma and weight without discarding any points. + + Returns (points, sigma_xys, weights). Points with fix=0 get sigma=inf + and weight=0. Outliers detected by speed consistency get sigma=inf. + """ + if not sequence: + return sequence, [], [] + + times = [p.time for p in sequence] + lats = [p.lat for p in sequence] + lons = [p.lon for p in sequence] + doppler_speeds = [ + p.ground_speed if p.ground_speed is not None else 0.0 for p in sequence + ] + + sigma_xys: list[float] = [] + for p in sequence: + hdop = (p.precision / 100.0) if p.precision is not None else 10.0 + fix_val = p.fix.value if p.fix is not None else 0 + sigma_xy, _ = gps_weigher.compute_sample_sigma(hdop, fix_val, uere_nominal) + sigma_xys.append(sigma_xy) + + sigma_xys = gps_weigher.apply_speed_consistency( + sigma_xys, + times, + lats, + lons, + doppler_speeds, + ) + + weights = [gps_weigher.sample_weight(s) for s in sigma_xys] + + n_finite = sum(1 for s in sigma_xys if not math.isinf(s)) + LOG.debug( + "weight_points: %d total, %d with finite sigma (%d excluded)", + len(sequence), + n_finite, + len(sequence) - n_finite, + ) + + return sequence, sigma_xys, weights + + def remove_noisy_points( sequence: T.Sequence[GPSPoint], ) -> T.Sequence[GPSPoint]: diff --git a/mapillary_tools/gpmf/gps_smoother.py b/mapillary_tools/gpmf/gps_smoother.py new file mode 100644 index 00000000..c292a04c --- /dev/null +++ b/mapillary_tools/gpmf/gps_smoother.py @@ -0,0 +1,134 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +"""Robust GPS track smoothing: reject physically-impossible jumps, then run a +sigma-weighted constant-velocity Kalman filter + RTS smoother over the whole track. + +This is the "middle ground" between the old hard noise filter (which DELETED points +and cut corners) and the soft weight-only approach (which kept all jitter): every +point is replaced by a smoothed estimate that trusts good fixes tightly and bad +fixes loosely, so outlier spikes and jitter are removed while the real shape is kept. + +NOTE: uses numpy for the 4x4 linear algebra. Pure-python port is a follow-up before +upstreaming (mapillary_tools core has no numpy dependency). +""" +from __future__ import annotations + +import dataclasses +import logging +import math +import typing as T + +from .. import geo +from ..telemetry import GPSPoint +from . import gps_weigher + +LOG = logging.getLogger(__name__) + + +def _speed_gate(lats: list[float], lons: list[float], times: list[float], cap_mps: float) -> list[bool]: + ok = [True] * len(times) + last = 0 + for i in range(1, len(times)): + dt = times[i] - times[last] + if dt <= 0: + ok[i] = False + continue + d = geo.gps_distance((lats[last], lons[last]), (lats[i], lons[i])) + if d / dt > cap_mps: + ok[i] = False + else: + last = i + return ok + + +def _kalman_rts(xy, t, sig, q_accel): + import numpy as np + + n = len(xy) + H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]], float) + x = np.zeros((n, 4)); P = np.zeros((n, 4, 4)) + xp = np.zeros((n, 4)); Pp = np.zeros((n, 4, 4)) + Fs = [np.eye(4) for _ in range(n)] + x[0] = [xy[0][0], 0.0, xy[0][1], 0.0] + P[0] = np.diag([100.0, 100.0, 100.0, 100.0]) + xp[0] = x[0]; Pp[0] = P[0] + for i in range(1, n): + dt = max(t[i] - t[i - 1], 1e-3) + F = np.array([[1, dt, 0, 0], [0, 1, 0, 0], [0, 0, 1, dt], [0, 0, 0, 1]], float) + Fs[i] = F + qb = q_accel * np.array([[dt**3 / 3, dt**2 / 2], [dt**2 / 2, dt]]) + Q = np.zeros((4, 4)); Q[:2, :2] = qb; Q[2:, 2:] = qb + xpred = F @ x[i - 1]; Ppred = F @ P[i - 1] @ F.T + Q + xp[i] = xpred; Pp[i] = Ppred + s = sig[i] + if s is None or math.isinf(s): + x[i] = xpred; P[i] = Ppred + else: + R = np.diag([s * s, s * s]) + y = np.array([xy[i][0], xy[i][1]]) - H @ xpred + S = H @ Ppred @ H.T + R + K = Ppred @ H.T @ np.linalg.inv(S) + x[i] = xpred + K @ y + P[i] = (np.eye(4) - K @ H) @ Ppred + xs = x.copy() + for i in range(n - 2, -1, -1): + F = Fs[i + 1] + C = P[i] @ F.T @ np.linalg.inv(Pp[i + 1]) + xs[i] = x[i] + C @ (xs[i + 1] - xp[i + 1]) + return [(float(xs[i, 0]), float(xs[i, 2])) for i in range(n)] + + +def smooth_gps_points( + sequence: T.Sequence[GPSPoint], + q_accel: float = 4.0, + uere_nominal: float = 3.0, + cap_mps: float | None = None, +) -> list[GPSPoint]: + """Return a new GPSPoint list with smoothed lat/lon (same length, same times).""" + if len(sequence) < 3: + return list(sequence) + + lats = [p.lat for p in sequence] + lons = [p.lon for p in sequence] + times = [p.time for p in sequence] + + # per-sample sigma; fix==0 -> inf (followed by dynamics only, not trusted) + sig = [] + for p in sequence: + hdop = (p.precision / 100.0) if p.precision is not None else 10.0 + fix_val = p.fix.value if p.fix is not None else 0 + s, _ = gps_weigher.compute_sample_sigma(hdop, fix_val, uere_nominal) + sig.append(s) + + # auto speed cap: 5x median ground speed, floor 15 m/s + if cap_mps is None: + spds = sorted(p.ground_speed for p in sequence if p.ground_speed and p.ground_speed > 0) + med = spds[len(spds) // 2] if spds else 3.0 + cap_mps = max(med * 5.0, 15.0) + + ok = _speed_gate(lats, lons, times, cap_mps) + for i in range(len(sig)): + if not ok[i]: + sig[i] = float("inf") # gated points: dynamics-only + + # project to local ENU meters + lat0, lon0 = lats[0], lons[0] + mlat = 111320.0 + mlon = 111320.0 * math.cos(math.radians(lat0)) + xy = [((lo - lon0) * mlon, (la - lat0) * mlat) for la, lo in zip(lats, lons)] + + try: + sm = _kalman_rts(xy, times, sig, q_accel) + except ImportError: + # numpy is not a hard dependency of mapillary_tools; if it is unavailable, + # degrade gracefully to the unsmoothed points rather than failing. + LOG.warning("gps_smoother: numpy unavailable; returning unsmoothed GPS points") + return list(sequence) + + out: list[GPSPoint] = [] + for p, (x, y) in zip(sequence, sm): + out.append(dataclasses.replace(p, lat=y / mlat + lat0, lon=x / mlon + lon0)) + return out diff --git a/mapillary_tools/gpmf/gps_weigher.py b/mapillary_tools/gpmf/gps_weigher.py new file mode 100644 index 00000000..af074554 --- /dev/null +++ b/mapillary_tools/gpmf/gps_weigher.py @@ -0,0 +1,239 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +"""Pluggable GPS weight computation and weighted median interpolation. + +This module implements the soft-weighting GPS interpolation algorithm +described in the design doc. Swap weight functions here without touching +the interpolation kernel or the pipeline glue. + +All functions are pure — no I/O, no global state. +""" + +from __future__ import annotations + +import bisect +import math + +MIN_SIGMA_XY = 0.5 +DOPPLER_TOLERANCE_MPS = 2.0 +MAX_SPEED_DISC = 20.0 +TYPICAL_SPEED_MPS = 1.5 + +from .. import geo + + +def gps_distance(p1: tuple[float, float], p2: tuple[float, float]) -> float: + return geo.gps_distance(p1, p2) + + +def compute_sample_sigma( + hdop: float, + fix: int, + uere_nominal: float, +) -> tuple[float, float]: + if fix == 0: + return float("inf"), float("inf") + if hdop <= 0: + hdop = 10.0 + uere_eff = uere_nominal * (1.0 + 0.12 * hdop**1.5) + sigma_xy = hdop * uere_eff + sigma_z = 1.8 * hdop * uere_eff + if fix == 2: + sigma_xy *= 4.0 + sigma_z = float("inf") + sigma_xy = max(sigma_xy, MIN_SIGMA_XY) + return sigma_xy, sigma_z + + +def sample_weight(sigma_xy: float) -> float: + if math.isinf(sigma_xy): + return 0.0 + return 1.0 / (sigma_xy * sigma_xy) + + +def apply_speed_consistency( + sigma_xys: list[float], + times: list[float], + lats: list[float], + lons: list[float], + doppler_speeds: list[float], + max_abs_speed: float = 0.0, +) -> list[float]: + """Penalize samples with implausible speed. + + Two checks: + 1. Absolute speed cap: if the GPS-reported Doppler speed exceeds + max_abs_speed, the sample is almost certainly a multipath glitch. + Set max_abs_speed=0 to auto-detect from median track speed × 5. + 2. Doppler discrepancy: if implied speed between consecutive samples + exceeds the reported Doppler speed, inflate sigma. + """ + adjusted = list(sigma_xys) + + # Auto-detect max plausible speed from the track itself + if max_abs_speed <= 0: + valid_speeds = [s for s in doppler_speeds if 0 < s < 50] + if valid_speeds: + med_speed = sorted(valid_speeds)[len(valid_speeds) // 2] + max_abs_speed = max(med_speed * 5.0, 10.0) + else: + max_abs_speed = 10.0 + + # Two-pass: forward then backward, using last accepted sample as reference + for direction in (1, -1): + rng = range(len(times)) if direction == 1 else range(len(times) - 1, -1, -1) + last_good_i = -1 + for i in rng: + if math.isinf(adjusted[i]): + continue + + # Check 1: absolute speed cap on Doppler + if doppler_speeds[i] > max_abs_speed: + adjusted[i] = float("inf") + continue + + # Check 2: implied speed from last accepted sample + if last_good_i >= 0: + dt = abs(times[i] - times[last_good_i]) + if dt > 0: + v_implied = ( + gps_distance( + (lats[last_good_i], lons[last_good_i]), + (lats[i], lons[i]), + ) + / dt + ) + if v_implied > max_abs_speed: + adjusted[i] = float("inf") + continue + + v_doppler = doppler_speeds[i] + disc = min( + max(0.0, v_implied - v_doppler - DOPPLER_TOLERANCE_MPS), + MAX_SPEED_DISC, + ) + if disc > 0: + adjusted[i] *= 1.0 + 1.5 * disc + + last_good_i = i + return adjusted + + +def weighted_median( + values: list[float], + lo: int, + hi: int, + weights: list[float], +) -> float: + n = hi - lo + if n <= 0: + raise ValueError("empty range") + paired = sorted( + ((values[lo + i], weights[i]) for i in range(n)), + key=lambda x: x[0], + ) + total = sum(weights) + if total <= 0: + return values[lo] + half = total / 2.0 + cumsum = 0.0 + for val, w in paired: + cumsum += w + if cumsum >= half: + return val + return paired[-1][0] + + +def weighted_median_unwrapped( + lons: list[float], + lo: int, + hi: int, + weights: list[float], +) -> float: + n = hi - lo + raw = [lons[lo + i] for i in range(n)] + if max(raw) - min(raw) > 180.0: + raw = [v + 360.0 if v < 0 else v for v in raw] + paired = sorted(zip(raw, weights), key=lambda x: x[0]) + total = sum(weights) + if total <= 0: + return lons[lo] + half = total / 2.0 + cumsum = 0.0 + for val, w in paired: + cumsum += w + if cumsum >= half: + return ((val + 180.0) % 360.0) - 180.0 + result = paired[-1][0] + return ((result + 180.0) % 360.0) - 180.0 + + +def _find_nearest_valid( + weights: list[float], start: int, direction: int, limit: int = -1 +) -> int: + """Find the nearest index with positive weight, searching in direction.""" + i = start + while 0 <= i < len(weights) and (limit < 0 or i != limit): + if weights[i] > 0: + return i + i += direction + return -1 + + +def weighted_interpolate( + t: float, + times: list[float], + lats: list[float], + lons: list[float], + weights: list[float], + sigma_xys: list[float], + tau: float = 1.0, +) -> tuple[float, float, float]: + window = 3.0 * tau + inv_2tau2 = 1.0 / (2.0 * tau * tau) + + lo = bisect.bisect_left(times, t - window) + hi = bisect.bisect_right(times, t + window) + + if lo >= hi: + # Find nearest points with positive weight on each side + left_i = _find_nearest_valid(weights, lo - 1, -1) + right_i = _find_nearest_valid(weights, hi, 1, len(times)) + if left_i < 0 and right_i < 0: + return 0.0, 0.0, float("inf") + if left_i < 0: + return lats[right_i], lons[right_i], sigma_xys[right_i] + if right_i < 0: + return lats[left_i], lons[left_i], sigma_xys[left_i] + dt_total = times[right_i] - times[left_i] + w = (t - times[left_i]) / dt_total if dt_total > 0 else 0.5 + lat = lats[left_i] + (lats[right_i] - lats[left_i]) * w + lon = lons[left_i] + (lons[right_i] - lons[left_i]) * w + nearest_dt = min(abs(t - times[left_i]), abs(t - times[right_i])) + accuracy = ( + max(sigma_xys[left_i], sigma_xys[right_i]) + nearest_dt * TYPICAL_SPEED_MPS + ) + return lat, lon, max(accuracy, MIN_SIGMA_XY) + + combined_w: list[float] = [] + for i in range(lo, hi): + dt = t - times[i] + k = math.exp(-dt * dt * inv_2tau2) + combined_w.append(weights[i] * k) + + total_w = sum(combined_w) + if total_w <= 0: + valid = _find_nearest_valid(weights, lo, 1, hi) + if valid < 0: + valid = _find_nearest_valid(weights, lo - 1, -1) + if valid < 0: + return 0.0, 0.0, float("inf") + return lats[valid], lons[valid], sigma_xys[valid] + + lat = weighted_median(lats, lo, hi, combined_w) + lon = weighted_median_unwrapped(lons, lo, hi, combined_w) + accuracy = 1.0 / math.sqrt(total_w) + return lat, lon, max(accuracy, MIN_SIGMA_XY) diff --git a/mapillary_tools/sample_video.py b/mapillary_tools/sample_video.py index 12978718..188540b8 100644 --- a/mapillary_tools/sample_video.py +++ b/mapillary_tools/sample_video.py @@ -7,8 +7,10 @@ import datetime import logging +import math import os import shutil +import struct import time import typing as T from contextlib import contextmanager @@ -17,6 +19,8 @@ from . import constants, exceptions, ffmpeg as ffmpeglib, geo, types, utils from .exif_write import ExifEdit from .geotag import geotag_videos_from_video +from .gpmf.gps_smoother import smooth_gps_points +from .gpmf.gps_weigher import weighted_interpolate from .mp4 import mp4_sample_parser from .serializer.description import parse_capture_time @@ -316,8 +320,22 @@ def _sample_single_video_by_distance( video_stream_idx = video_stream["index"] moov_parser = mp4_sample_parser.MovieBoxParser.parse_file(video_path) video_track_parser = moov_parser.extract_track_at(video_stream_idx) + + # Frame-sampling path only: smooth the GoPro GPS track once (speed-gate + + # sigma-weighted Kalman/RTS) and use it for BOTH frame selection and per-frame + # positions, so sampling follows the cleaned track rather than the raw/jittery one. + # Falls back to the raw points if numpy is unavailable. This does NOT change the + # native CAMM muxing path, which still muxes video_metadata.points (see PR description). + if ( + video_metadata.point_weights is not None + and video_metadata.point_sigma_xys is not None + ): + sampling_points = smooth_gps_points(video_metadata.points) + else: + sampling_points = video_metadata.points + sample_points_by_frame_idx = _sample_video_stream_by_distance( - video_metadata.points, video_track_parser, sample_distance + sampling_points, video_track_parser, sample_distance ) sorted_sample_indices = sorted(sample_points_by_frame_idx.keys()) @@ -362,11 +380,35 @@ def _sample_single_video_by_distance( ) else: timestamp = start_time + datetime.timedelta(seconds=interp.time) + lat, lon = interp.lat, interp.lon + accuracy: float | None = None + + # Refine with weighted interpolation if weights available + if ( + video_metadata.point_weights is not None + and video_metadata.point_sigma_xys is not None + ): + w_times = [p.time for p in sampling_points] + w_lats = [p.lat for p in sampling_points] + w_lons = [p.lon for p in sampling_points] + lat, lon, accuracy = weighted_interpolate( + interp.time, + w_times, + w_lats, + w_lons, + video_metadata.point_weights, + video_metadata.point_sigma_xys, + ) + + # Skip frames with no valid position (inf accuracy = no GPS data) + if accuracy is not None and math.isinf(accuracy): + continue + exif_edit = ExifEdit(sample_paths[0]) exif_edit.add_date_time_original(timestamp) exif_edit.add_gps_datetime(timestamp) - exif_edit.add_lat_lon(interp.lat, interp.lon) - if interp.alt is not None: + exif_edit.add_lat_lon(lat, lon) + if interp.alt is not None and abs(interp.alt) < 100000: exif_edit.add_altitude(interp.alt) if interp.angle is not None: exif_edit.add_direction(interp.angle) @@ -374,4 +416,20 @@ def _sample_single_video_by_distance( exif_edit.add_make(video_metadata.make) if video_metadata.model: exif_edit.add_model(video_metadata.model) - exif_edit.write() + if ( + accuracy is not None + and not math.isinf(accuracy) + and not math.isnan(accuracy) + ): + exif_edit.add_gps_accuracy(accuracy) + try: + exif_edit.write() + except (struct.error, OverflowError, ValueError): + LOG.warning( + "Skipping frame %s: EXIF write failed (lat=%.4f lon=%.4f alt=%s acc=%s)", + sample_paths[0].name if sample_paths[0] else "?", + lat, + lon, + interp.alt, + accuracy, + ) diff --git a/mapillary_tools/types.py b/mapillary_tools/types.py index a57d5481..249cd2f9 100644 --- a/mapillary_tools/types.py +++ b/mapillary_tools/types.py @@ -82,6 +82,8 @@ def update_md5sum(self) -> None: if self.md5sum is None: with self.filename.open("rb") as fp: self.md5sum = utils.md5sum_fp(fp).hexdigest() + point_sigma_xys: list[float] | None = None + point_weights: list[float] | None = None @dataclasses.dataclass