Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions mapillary_tools/exif_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,13 @@ def extract_altitude(self) -> float | None:
altitude_ref = {0: 1, 1: -1}
return altitude * altitude_ref.get(ref, 1)

def extract_gps_accuracy(self) -> float | None:
"""Extract GPS horizontal position accuracy from EXIF GPSHPositioningError tag."""
val = self._extract_alternative_fields(
["GPS GPSHPositioningError", "GPS Tag 0x001F"], float
)
return val

def extract_gps_datetime(self) -> datetime.datetime | None:
"""
Extract timestamp from GPS field.
Expand Down
8 changes: 8 additions & 0 deletions mapillary_tools/exif_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ def add_direction(self, direction: float, ref: str = "T") -> None:
self._ef["GPS"][piexif.GPSIFD.GPSImgDirection],
)

def add_gps_accuracy(self, accuracy: float) -> None:
"""Add GPS horizontal position accuracy in meters (EXIF GPSHPositioningError)."""
if math.isinf(accuracy) or math.isnan(accuracy) or accuracy <= 0:
accuracy = 99999.0
accuracy = min(accuracy, 99999.0)
num = round(accuracy * 100)
self._ef["GPS"][piexif.GPSIFD.GPSHPositioningError] = (num, 100)

def add_make(self, make: str) -> None:
if not make:
raise ValueError("Make cannot be empty")
Expand Down
62 changes: 62 additions & 0 deletions mapillary_tools/geotag/geotag_images_from_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing_extensions import override

from .. import types, utils
from ..gpmf import gps_smoother, gps_weigher
from .base import GeotagImagesFromGeneric
from .geotag_images_from_gpx import GeotagImagesFromGPX
from .geotag_videos_from_video import GeotagVideosFromVideo
Expand Down Expand Up @@ -83,6 +84,17 @@ def to_description(

image_metadatas = geotag.to_description(image_paths)

# If weighted points available, refine positions with weighted median
has_weights = (
video_metadata.point_weights is not None
and video_metadata.point_sigma_xys is not None
)
if has_weights:
_apply_weighted_interpolation(
image_metadatas,
video_metadata,
)

for metadata in image_metadatas:
if isinstance(metadata, types.ImageMetadata):
metadata.MAPDeviceMake = video_metadata.make
Expand Down Expand Up @@ -131,3 +143,53 @@ def to_description(
num_processes=self.num_processes,
)
return geotag.to_description(image_paths)


def _apply_weighted_interpolation(
image_metadatas: list[types.ImageMetadataOrError],
video_metadata: types.VideoMetadata,
) -> None:
"""Refine image positions using weighted median interpolation.

Modifies ImageMetadata objects in-place: updates lat, lon, and sets
MAPGPSAccuracyMeters.

The image metadata times are in image-EXIF time (absolute), but the
video points are in video-relative time (0 to duration). GeotagImagesFromGPX
shifts video points by adding the first image's time. We apply the same
shift here so the query times align.
"""
assert video_metadata.point_weights is not None
assert video_metadata.point_sigma_xys is not None

# Frame-sampling path only: smooth the GPS track (speed-gate + sigma-weighted
# Kalman/RTS) before interpolating image positions. Falls back to the raw points
# if numpy is unavailable. NOTE: this does NOT touch the native CAMM muxing path,
# which still muxes the unsmoothed track (see PR description).
points = gps_smoother.smooth_gps_points(video_metadata.points)
weights = video_metadata.point_weights
sigma_xys = video_metadata.point_sigma_xys

# Find the time shift: first image time = offset added to video-relative times
valid_images = [m for m in image_metadatas if isinstance(m, types.ImageMetadata)]
if not valid_images:
return
first_image_time = min(m.time for m in valid_images)

# Shift video point times to match image time frame
times = [first_image_time + p.time for p in points]
lats = [p.lat for p in points]
lons = [p.lon for p in points]

for metadata in valid_images:
lat, lon, accuracy = gps_weigher.weighted_interpolate(
metadata.time,
times,
lats,
lons,
weights,
sigma_xys,
)
metadata.lat = lat
metadata.lon = lon
metadata.MAPGPSAccuracyMeters = round(accuracy, 2)
1 change: 1 addition & 0 deletions mapillary_tools/geotag/image_extractors/exif.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def extract(self) -> types.ImageMetadata:
MAPOrientation=exif.extract_orientation(),
MAPDeviceMake=exif.extract_make(),
MAPDeviceModel=exif.extract_model(),
MAPGPSAccuracyMeters=exif.extract_gps_accuracy(),
MAPCameraUUID=exif.extract_camera_uuid(),
)

Expand Down
11 changes: 7 additions & 4 deletions mapillary_tools/geotag/video_extractors/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ def extract(self) -> types.VideoMetadata:
if not gps_points:
raise exceptions.MapillaryGPXEmptyError("Empty GPS data found")

gps_points = T.cast(
T.List[telemetry.GPSPoint], gpmf_gps_filter.remove_noisy_points(gps_points)
)
if not gps_points:
gps_points, sigma_xys, weights = gpmf_gps_filter.weight_points(gps_points)
gps_points = T.cast(T.List[telemetry.GPSPoint], gps_points)

n_finite = sum(1 for w in weights if w > 0)
if n_finite == 0:
raise exceptions.MapillaryGPSNoiseError("GPS is too noisy")

video_metadata = types.VideoMetadata(
Expand All @@ -50,6 +51,8 @@ def extract(self) -> types.VideoMetadata:
points=T.cast(T.List[geo.Point], gps_points),
make=gopro_info.make,
model=gopro_info.model,
point_sigma_xys=sigma_xys,
point_weights=weights,
)

return video_metadata
Expand Down
50 changes: 49 additions & 1 deletion mapillary_tools/gpmf/gpmf_gps_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
# LICENSE file in the root directory of this source tree.

import logging
import math
import typing as T

from .. import constants, geo
from ..telemetry import GPSPoint
from . import gps_filter
from . import gps_filter, gps_weigher

"""
This module was originally used for GoPro GPS data (GPMF) filtering,
Expand Down Expand Up @@ -59,6 +60,53 @@ def remove_outliers(
)


def weight_points(
sequence: T.Sequence[GPSPoint],
uere_nominal: float = 3.0,
) -> tuple[T.Sequence[GPSPoint], list[float], list[float]]:
"""Compute per-sample sigma and weight without discarding any points.

Returns (points, sigma_xys, weights). Points with fix=0 get sigma=inf
and weight=0. Outliers detected by speed consistency get sigma=inf.
"""
if not sequence:
return sequence, [], []

times = [p.time for p in sequence]
lats = [p.lat for p in sequence]
lons = [p.lon for p in sequence]
doppler_speeds = [
p.ground_speed if p.ground_speed is not None else 0.0 for p in sequence
]

sigma_xys: list[float] = []
for p in sequence:
hdop = (p.precision / 100.0) if p.precision is not None else 10.0
fix_val = p.fix.value if p.fix is not None else 0
sigma_xy, _ = gps_weigher.compute_sample_sigma(hdop, fix_val, uere_nominal)
sigma_xys.append(sigma_xy)

sigma_xys = gps_weigher.apply_speed_consistency(
sigma_xys,
times,
lats,
lons,
doppler_speeds,
)

weights = [gps_weigher.sample_weight(s) for s in sigma_xys]

n_finite = sum(1 for s in sigma_xys if not math.isinf(s))
LOG.debug(
"weight_points: %d total, %d with finite sigma (%d excluded)",
len(sequence),
n_finite,
len(sequence) - n_finite,
)

return sequence, sigma_xys, weights


def remove_noisy_points(
sequence: T.Sequence[GPSPoint],
) -> T.Sequence[GPSPoint]:
Expand Down
134 changes: 134 additions & 0 deletions mapillary_tools/gpmf/gps_smoother.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.

"""Robust GPS track smoothing: reject physically-impossible jumps, then run a
sigma-weighted constant-velocity Kalman filter + RTS smoother over the whole track.

This is the "middle ground" between the old hard noise filter (which DELETED points
and cut corners) and the soft weight-only approach (which kept all jitter): every
point is replaced by a smoothed estimate that trusts good fixes tightly and bad
fixes loosely, so outlier spikes and jitter are removed while the real shape is kept.

NOTE: uses numpy for the 4x4 linear algebra. Pure-python port is a follow-up before
upstreaming (mapillary_tools core has no numpy dependency).
"""
from __future__ import annotations

import dataclasses
import logging
import math
import typing as T

from .. import geo
from ..telemetry import GPSPoint
from . import gps_weigher

LOG = logging.getLogger(__name__)


def _speed_gate(lats: list[float], lons: list[float], times: list[float], cap_mps: float) -> list[bool]:
ok = [True] * len(times)
last = 0
for i in range(1, len(times)):
dt = times[i] - times[last]
if dt <= 0:
ok[i] = False
continue
d = geo.gps_distance((lats[last], lons[last]), (lats[i], lons[i]))
if d / dt > cap_mps:
ok[i] = False
else:
last = i
return ok


def _kalman_rts(xy, t, sig, q_accel):
import numpy as np

n = len(xy)
H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]], float)
x = np.zeros((n, 4)); P = np.zeros((n, 4, 4))
xp = np.zeros((n, 4)); Pp = np.zeros((n, 4, 4))
Fs = [np.eye(4) for _ in range(n)]
x[0] = [xy[0][0], 0.0, xy[0][1], 0.0]
P[0] = np.diag([100.0, 100.0, 100.0, 100.0])
xp[0] = x[0]; Pp[0] = P[0]
for i in range(1, n):
dt = max(t[i] - t[i - 1], 1e-3)
F = np.array([[1, dt, 0, 0], [0, 1, 0, 0], [0, 0, 1, dt], [0, 0, 0, 1]], float)
Fs[i] = F
qb = q_accel * np.array([[dt**3 / 3, dt**2 / 2], [dt**2 / 2, dt]])
Q = np.zeros((4, 4)); Q[:2, :2] = qb; Q[2:, 2:] = qb
xpred = F @ x[i - 1]; Ppred = F @ P[i - 1] @ F.T + Q
xp[i] = xpred; Pp[i] = Ppred
s = sig[i]
if s is None or math.isinf(s):
x[i] = xpred; P[i] = Ppred
else:
R = np.diag([s * s, s * s])
y = np.array([xy[i][0], xy[i][1]]) - H @ xpred
S = H @ Ppred @ H.T + R
K = Ppred @ H.T @ np.linalg.inv(S)
x[i] = xpred + K @ y
P[i] = (np.eye(4) - K @ H) @ Ppred
xs = x.copy()
for i in range(n - 2, -1, -1):
F = Fs[i + 1]
C = P[i] @ F.T @ np.linalg.inv(Pp[i + 1])
xs[i] = x[i] + C @ (xs[i + 1] - xp[i + 1])
return [(float(xs[i, 0]), float(xs[i, 2])) for i in range(n)]


def smooth_gps_points(
sequence: T.Sequence[GPSPoint],
q_accel: float = 4.0,
uere_nominal: float = 3.0,
cap_mps: float | None = None,
) -> list[GPSPoint]:
"""Return a new GPSPoint list with smoothed lat/lon (same length, same times)."""
if len(sequence) < 3:
return list(sequence)

lats = [p.lat for p in sequence]
lons = [p.lon for p in sequence]
times = [p.time for p in sequence]

# per-sample sigma; fix==0 -> inf (followed by dynamics only, not trusted)
sig = []
for p in sequence:
hdop = (p.precision / 100.0) if p.precision is not None else 10.0
fix_val = p.fix.value if p.fix is not None else 0
s, _ = gps_weigher.compute_sample_sigma(hdop, fix_val, uere_nominal)
sig.append(s)

# auto speed cap: 5x median ground speed, floor 15 m/s
if cap_mps is None:
spds = sorted(p.ground_speed for p in sequence if p.ground_speed and p.ground_speed > 0)
med = spds[len(spds) // 2] if spds else 3.0
cap_mps = max(med * 5.0, 15.0)

ok = _speed_gate(lats, lons, times, cap_mps)
for i in range(len(sig)):
if not ok[i]:
sig[i] = float("inf") # gated points: dynamics-only

# project to local ENU meters
lat0, lon0 = lats[0], lons[0]
mlat = 111320.0
mlon = 111320.0 * math.cos(math.radians(lat0))
xy = [((lo - lon0) * mlon, (la - lat0) * mlat) for la, lo in zip(lats, lons)]

try:
sm = _kalman_rts(xy, times, sig, q_accel)
except ImportError:
# numpy is not a hard dependency of mapillary_tools; if it is unavailable,
# degrade gracefully to the unsmoothed points rather than failing.
LOG.warning("gps_smoother: numpy unavailable; returning unsmoothed GPS points")
return list(sequence)

out: list[GPSPoint] = []
for p, (x, y) in zip(sequence, sm):
out.append(dataclasses.replace(p, lat=y / mlat + lat0, lon=x / mlon + lon0))
return out
Loading
Loading