Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions monitoring/monitorlib/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,28 @@ def from_f3548v21(vol: f3548v21.Polygon | dict) -> Polygon:
vertices=[ImplicitDict.parse(p, LatLngPoint) for p in vol.vertices]
)

def is_equivalent(self, other: Polygon) -> bool:
if "vertices" not in self and "vertices" not in other:
return True
elif "vertices" not in self or "vertices" not in other:
return False

if self.vertices == other.vertices:
# covers both None and exact equality
return True
elif not self.vertices or not other.vertices:
# covers one being None
return False
elif len(self.vertices) != len(other.vertices):
return False

return all(
[
vertices[0].match(vertices[1])
for vertices in zip(self.vertices, other.vertices)
]
)


class Circle(ImplicitDict):
center: LatLngPoint
Expand All @@ -181,6 +203,15 @@ def from_f3548v21(vol: f3548v21.Circle | dict) -> Circle:
radius=ImplicitDict.parse(vol.radius, Radius),
)

def is_equivalent(self, other: Circle) -> bool:
if not self.center.match(other.center):
return False

return (
abs(self.radius.in_meters() - other.radius.in_meters())
<= DISTANCE_TOLERANCE_M
)


class AltitudeDatum(str, Enum):
W84 = "W84"
Expand Down Expand Up @@ -224,6 +255,14 @@ def to_w84_m(self):
def from_f3548v21(vol: f3548v21.Altitude | dict) -> Altitude:
return ImplicitDict.parse(vol, Altitude)

def is_equivalent(self, other: Altitude) -> bool:
if self.reference != other.reference:
return False
return (
abs(self.units.in_meters(self.value) - other.units.in_meters(other.value))
<= DISTANCE_TOLERANCE_M
)


class Volume3D(ImplicitDict):
outline_circle: Optional[Circle] = None
Expand Down Expand Up @@ -447,6 +486,38 @@ def s2_vertices(self) -> list[s2sphere.LatLng]:
else:
return get_latlngrect_vertices(make_latlng_rect(self))

def is_equivalent(
self,
other: Volume3D,
) -> bool:
if self.altitude_lower and other.altitude_lower:
if not self.altitude_lower.is_equivalent(other.altitude_lower):
return False
elif self.altitude_lower or other.altitude_lower:
return False

if self.altitude_upper and other.altitude_upper:
if not self.altitude_upper.is_equivalent(other.altitude_upper):
return False
elif self.altitude_upper or other.altitude_upper:
return False

if self.outline_polygon and other.outline_polygon:
if not self.outline_polygon.is_equivalent(other.outline_polygon):
return False
elif self.outline_circle and other.outline_circle:
if not self.outline_circle.is_equivalent(other.outline_circle):
return False
elif (
self.outline_circle
or self.outline_polygon
or other.outline_circle
or other.outline_polygon
):
return False

return True


def make_latlng_rect(area) -> s2sphere.LatLngRect:
"""Make an S2 LatLngRect from the provided input.
Expand Down
200 changes: 200 additions & 0 deletions monitoring/monitorlib/geo_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import unittest

from s2sphere import LatLng

from monitoring.monitorlib.geo import (
Altitude,
AltitudeDatum,
Circle,
DistanceUnits,
LatLngPoint,
Polygon,
Radius,
Volume3D,
generate_area_in_vicinity,
generate_slight_overlap_area,
)
Expand All @@ -12,6 +22,196 @@ def _points(in_points: list[tuple[float, float]]) -> list[LatLng]:
return [LatLng.from_degrees(*p) for p in in_points]


class AltitudeIsEquivalentTest(unittest.TestCase):
def setUp(self):
self.alt1 = Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
)

def test_equivalent_altitudes(self):
alt2 = Altitude(value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M)
self.assertTrue(self.alt1.is_equivalent(alt2))

def test_equivalent_altitudes_within_tolerance(self):
alt2 = Altitude(
value=100.000001, reference=AltitudeDatum.W84, units=DistanceUnits.M
)
self.assertTrue(self.alt1.is_equivalent(alt2))

def test_equivalent_altitudes_different_units(self):
alt2 = Altitude(
value=328.084, reference=AltitudeDatum.W84, units=DistanceUnits.FT
)
self.assertTrue(self.alt1.is_equivalent(alt2))

def test_nonequivalent_altitudes_different_value(self):
alt2 = Altitude(value=101, reference=AltitudeDatum.W84, units=DistanceUnits.M)
self.assertFalse(self.alt1.is_equivalent(alt2))

def test_nonequivalent_altitudes_different_reference(self):
alt2 = Altitude(value=100, reference=AltitudeDatum.SFC, units=DistanceUnits.M)
self.assertFalse(self.alt1.is_equivalent(alt2))


class PolygonIsEquivalentTest(unittest.TestCase):
def setUp(self):
self.poly1 = Polygon(
vertices=[
LatLngPoint(lat=10, lng=10),
LatLngPoint(lat=11, lng=10),
LatLngPoint(lat=11, lng=11),
LatLngPoint(lat=10, lng=11),
]
)

def test_equivalent_polygons(self):
poly2 = Polygon(
vertices=[
LatLngPoint(lat=10, lng=10),
LatLngPoint(lat=11, lng=10),
LatLngPoint(lat=11, lng=11),
LatLngPoint(lat=10, lng=11),
]
)
self.assertTrue(self.poly1.is_equivalent(poly2))

def test_equivalent_polygons_within_tolerance(self):
poly2 = Polygon(
vertices=[
LatLngPoint(lat=10.00000001, lng=10.00000001),
LatLngPoint(lat=11.00000001, lng=10.00000001),
LatLngPoint(lat=11.00000001, lng=11.00000001),
LatLngPoint(lat=10.00000001, lng=11.00000001),
]
)
self.assertTrue(self.poly1.is_equivalent(poly2))

def test_nonequivalent_polygons(self):
poly2 = Polygon(
vertices=[
LatLngPoint(lat=10, lng=10),
LatLngPoint(lat=12, lng=10),
LatLngPoint(lat=12, lng=11),
LatLngPoint(lat=10, lng=11),
]
)
self.assertFalse(self.poly1.is_equivalent(poly2))

def test_equivalent_polygons_none(self):
poly1 = Polygon(vertices=None)
poly2 = Polygon(vertices=None)
self.assertTrue(poly1.is_equivalent(poly2))

def test_nonequivalent_polygons_one_none(self):
poly1 = Polygon(vertices=[])
poly2 = Polygon(vertices=None)
self.assertFalse(poly1.is_equivalent(poly2))


class Volume3DIsEquivalentTest(unittest.TestCase):
def setUp(self):
self.vol_poly = Volume3D(
outline_polygon=Polygon(
vertices=[
LatLngPoint(lat=10, lng=10),
LatLngPoint(lat=11, lng=10),
LatLngPoint(lat=11, lng=11),
LatLngPoint(lat=10, lng=11),
]
),
altitude_lower=Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
altitude_upper=Altitude(
value=200, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
)
self.vol_circle = Volume3D(
outline_circle=Circle(
center=LatLngPoint(lat=10, lng=10),
radius=Radius(value=100, units=DistanceUnits.M),
),
altitude_lower=Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
altitude_upper=Altitude(
value=200, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
)

def test_equivalent_volumes_polygon(self):
vol2 = Volume3D(
outline_polygon=Polygon(
vertices=[
LatLngPoint(lat=10, lng=10),
LatLngPoint(lat=11, lng=10),
LatLngPoint(lat=11, lng=11),
LatLngPoint(lat=10, lng=11),
]
),
altitude_lower=Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
altitude_upper=Altitude(
value=200, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
)
self.assertTrue(self.vol_poly.is_equivalent(vol2))

def test_equivalent_volumes_circle(self):
vol2 = Volume3D(
outline_circle=Circle(
center=LatLngPoint(lat=10, lng=10),
radius=Radius(value=100, units=DistanceUnits.M),
),
altitude_lower=Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
altitude_upper=Altitude(
value=200, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
)
self.assertTrue(self.vol_circle.is_equivalent(vol2))

def test_nonequivalent_volumes_circle(self):
vol2 = Volume3D(
outline_circle=Circle(
center=LatLngPoint(lat=10, lng=10),
radius=Radius(value=200, units=DistanceUnits.M),
),
altitude_lower=Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
altitude_upper=Altitude(
value=200, reference=AltitudeDatum.W84, units=DistanceUnits.M
),
)
self.assertFalse(self.vol_circle.is_equivalent(vol2))

def test_nonequivalent_volumes_different_shape(self):
vol2 = Volume3D(
outline_circle=Circle(
center=LatLngPoint(lat=10.5, lng=10.5),
radius=Radius(value=50000, units=DistanceUnits.M),
)
)
self.assertFalse(self.vol_poly.is_equivalent(vol2))

def test_equivalent_volumes_none_fields(self):
vol1 = Volume3D()
vol2 = Volume3D()
self.assertTrue(vol1.is_equivalent(vol2))

def test_nonequivalent_volumes_one_none_field(self):
vol1 = Volume3D(
altitude_lower=Altitude(
value=100, reference=AltitudeDatum.W84, units=DistanceUnits.M
)
)
vol2 = Volume3D()
self.assertFalse(vol1.is_equivalent(vol2))


def test_generate_slight_overlap_area():
# Square around 0,0 of edge length 2 -> first corner at 1,1 -> expect a square with overlapping corner at 1,1
assert generate_slight_overlap_area(
Expand Down
46 changes: 46 additions & 0 deletions monitoring/monitorlib/geotemporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
)
from monitoring.monitorlib.transformations import Transformation

TIME_TOLERANCE = timedelta(seconds=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably too lenient since we're just talking about numeric precision/representation errors

Suggested change
TIME_TOLERANCE = timedelta(seconds=1)
TIME_TOLERANCE = timedelta(milliseconds=10)



class Volume4DTemplate(ImplicitDict):
outline_polygon: Optional[Polygon] = None
Expand Down Expand Up @@ -121,6 +123,30 @@ class Volume4D(ImplicitDict):
time_start: Optional[Time] = None
time_end: Optional[Time] = None

def is_equivalent(
self,
other: Volume4D,
) -> bool:
if not self.volume.is_equivalent(other.volume):
return False

if (self.time_start is None) != (other.time_start is None):
return False
if self.time_start and other.time_start:
if (
abs(self.time_start.datetime - other.time_start.datetime)
> TIME_TOLERANCE
):
return False

if (self.time_end is None) != (other.time_end is None):
return False
if self.time_end and other.time_end:
if abs(self.time_end.datetime - other.time_end.datetime) > TIME_TOLERANCE:
return False

return True

def offset_time(self, dt: timedelta) -> Volume4D:
kwargs = {"volume": self.volume}
if self.time_start:
Expand Down Expand Up @@ -288,6 +314,26 @@ def __iadd__(self, other):
f"Cannot iadd {type(other).__name__} to {type(self).__name__}"
)

def is_equivalent(
self,
other: Volume4DCollection,
) -> bool:
if len(self) != len(other):
return False

# different order is acceptable
other_copy = list(other)
for vol in self:
found = False
for i, other_vol in enumerate(other_copy):
if vol.is_equivalent(other_vol):
other_copy.pop(i)
found = True
break
if not found:
return False
return True

@property
def time_start(self) -> Time | None:
return (
Expand Down
Loading
Loading