-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.py
More file actions
68 lines (57 loc) · 1.73 KB
/
helpers.py
File metadata and controls
68 lines (57 loc) · 1.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""
Helper classes and functions that other PyAerial modules use.
"""
import sys
import threading
import operator
class Datum:
"""
Store a value/time pair.
"""
def __init__(self, value, timestamp):
self.value: float = value
self.time: float = timestamp
def __str__(self):
return f"Datum({self.value}, {self.time})"
def __repr__(self):
return self.__str__()
def __eq__(self, other):
if other is None:
return False
return self.value == other.value and self.time == other.time
class ImprovedThread(threading.Thread):
"""
A very similar version of threading.Thread that returns the value of the thread process
with Thread.join()
It also prints exceptions when they are thrown.
Ripped from finiteCraft lmao
"""
def __init__(self, *args, **kwargs):
"""
Initialize the ImprovedThread
:param args:
:param kwargs:
"""
super().__init__(*args, **kwargs)
self.result = None
def run(self) -> None:
"""
Run the ImprovedThread.
:return:
"""
if self._target is None:
return # could alternatively raise an exception, depends on the use case
try:
self.result = self._target(*self._args, **self._kwargs)
except Exception as exc:
print(f'{type(exc).__name__}: {exc}', file=sys.stderr) # properly handle the exception
raise exc
def join(self, *args, **kwargs) -> dict:
"""
The highlight of the class. Returns the thread result upon ending.
:param args:
:param kwargs:
:return:
"""
super().join(*args, **kwargs)
return self.result