Skip to content

Commit 41017de

Browse files
committed
websocket sep
1 parent a8203d9 commit 41017de

File tree

4 files changed

+338
-337
lines changed

4 files changed

+338
-337
lines changed

thingsdb/client/baseprotocol.py

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
import enum
2+
import asyncio
3+
import logging
4+
import msgpack
5+
from abc import abstractmethod
6+
from typing import Any, Callable
7+
from .package import Package
8+
from ..exceptions import AssertionError
9+
from ..exceptions import AuthError
10+
from ..exceptions import BadDataError
11+
from ..exceptions import CancelledError
12+
from ..exceptions import CustomError
13+
from ..exceptions import ForbiddenError
14+
from ..exceptions import InternalError
15+
from ..exceptions import LookupError
16+
from ..exceptions import MaxQuotaError
17+
from ..exceptions import MemoryError
18+
from ..exceptions import NodeError
19+
from ..exceptions import NumArgumentsError
20+
from ..exceptions import OperationError
21+
from ..exceptions import OverflowError
22+
from ..exceptions import RequestCancelError
23+
from ..exceptions import RequestTimeoutError
24+
from ..exceptions import ResultTooLargeError
25+
from ..exceptions import SyntaxError
26+
from ..exceptions import TypeError
27+
from ..exceptions import ValueError
28+
from ..exceptions import WriteUVError
29+
from ..exceptions import ZeroDivisionError
30+
31+
32+
class Proto(enum.IntEnum):
33+
# Events
34+
ON_NODE_STATUS = 0x00
35+
ON_WARN = 0x05
36+
ON_ROOM_JOIN = 0x06
37+
ON_ROOM_LEAVE = 0x07
38+
ON_ROOM_EMIT = 0x08
39+
ON_ROOM_DELETE = 0x09
40+
41+
# Responses
42+
RES_PING = 0x10
43+
RES_OK = 0x11
44+
RES_DATA = 0x12
45+
RES_ERROR = 0x13
46+
47+
# Requests (initiated by the client)
48+
REQ_PING = 0x20
49+
REQ_AUTH = 0x21
50+
REQ_QUERY = 0x22
51+
REQ_RUN = 0x25
52+
REQ_JOIN = 0x26
53+
REQ_LEAVE = 0x27
54+
REQ_EMIT = 0x28
55+
56+
57+
class Err(enum.IntEnum):
58+
"""ThingsDB error codes."""
59+
60+
# ThingsDB build-in errors
61+
EX_CANCELLED = -64
62+
EX_OPERATION_ERROR = -63
63+
EX_NUM_ARGUMENTS = -62
64+
EX_TYPE_ERROR = -61
65+
EX_VALUE_ERROR = -60
66+
EX_OVERFLOW = -59
67+
EX_ZERO_DIV = -58
68+
EX_MAX_QUOTA = -57
69+
EX_AUTH_ERROR = -56
70+
EX_FORBIDDEN = -55
71+
EX_LOOKUP_ERROR = -54
72+
EX_BAD_DATA = -53
73+
EX_SYNTAX_ERROR = -52
74+
EX_NODE_ERROR = -51
75+
EX_ASSERT_ERROR = -50
76+
77+
# ThingsDB internal errors
78+
EX_TOO_LARGE_X = -6
79+
EX_REQUEST_TIMEOUT = -5
80+
EX_REQUEST_CANCEL = -4
81+
EX_WRITE_UV = -3
82+
EX_MEMORY = -2
83+
EX_INTERNAL = -1
84+
85+
86+
_ERRMAP = {
87+
Err.EX_CANCELLED: CancelledError,
88+
Err.EX_OPERATION_ERROR: OperationError,
89+
Err.EX_NUM_ARGUMENTS: NumArgumentsError,
90+
Err.EX_TYPE_ERROR: TypeError,
91+
Err.EX_VALUE_ERROR: ValueError,
92+
Err.EX_OVERFLOW: OverflowError,
93+
Err.EX_ZERO_DIV: ZeroDivisionError,
94+
Err.EX_MAX_QUOTA: MaxQuotaError,
95+
Err.EX_AUTH_ERROR: AuthError,
96+
Err.EX_FORBIDDEN: ForbiddenError,
97+
Err.EX_LOOKUP_ERROR: LookupError,
98+
Err.EX_BAD_DATA: BadDataError,
99+
Err.EX_SYNTAX_ERROR: SyntaxError,
100+
Err.EX_NODE_ERROR: NodeError,
101+
Err.EX_ASSERT_ERROR: AssertionError,
102+
Err.EX_TOO_LARGE_X: ResultTooLargeError,
103+
Err.EX_REQUEST_TIMEOUT: RequestTimeoutError,
104+
Err.EX_REQUEST_CANCEL: RequestCancelError,
105+
Err.EX_WRITE_UV: WriteUVError,
106+
Err.EX_MEMORY: MemoryError,
107+
Err.EX_INTERNAL: InternalError,
108+
}
109+
110+
_PROTO_RESPONSE_MAP = {
111+
Proto.RES_PING: lambda f, d: f.set_result(None),
112+
Proto.RES_OK: lambda f, d: f.set_result(None),
113+
Proto.RES_DATA: lambda f, d: f.set_result(d),
114+
Proto.RES_ERROR: lambda f, d: f.set_exception(_ERRMAP.get(
115+
d['error_code'],
116+
CustomError)(errdata=d)),
117+
}
118+
119+
_PROTO_EVENTS = (
120+
Proto.ON_NODE_STATUS,
121+
Proto.ON_WARN,
122+
Proto.ON_ROOM_JOIN,
123+
Proto.ON_ROOM_LEAVE,
124+
Proto.ON_ROOM_EMIT,
125+
Proto.ON_ROOM_DELETE,
126+
)
127+
128+
129+
def proto_unknown(f, d):
130+
f.set_exception(TypeError('unknown package type received ({})'.format(d)))
131+
132+
133+
class BaseProtocol:
134+
def __init__(
135+
self,
136+
on_connection_lost: Callable[[asyncio.Protocol, Exception], None],
137+
on_event: Callable[[Package], None],):
138+
self._requests = {}
139+
self._pid = 0
140+
self._on_connection_lost = on_connection_lost
141+
self._on_event = on_event
142+
143+
async def _timer(self, pid: int, timeout: int) -> None:
144+
await asyncio.sleep(timeout)
145+
try:
146+
future, task = self._requests.pop(pid)
147+
except KeyError:
148+
logging.error(f'Timed out package Id not found: {pid}')
149+
return None
150+
151+
future.set_exception(TimeoutError(
152+
f'request timed out on package Id {pid}'))
153+
154+
def _on_response(self, pkg: Package) -> None:
155+
try:
156+
future, task = self._requests.pop(pkg.pid)
157+
except KeyError:
158+
logging.error(f'Received package id not found: {pkg.pid}')
159+
return None
160+
161+
# cancel the timeout task
162+
if task is not None:
163+
task.cancel()
164+
165+
if future.cancelled():
166+
return
167+
168+
_PROTO_RESPONSE_MAP.get(pkg.tp, proto_unknown)(future, pkg.data)
169+
170+
def _handle_package(self, pkg: Package):
171+
tp = pkg.tp
172+
if tp in _PROTO_RESPONSE_MAP:
173+
self._on_response(pkg)
174+
elif tp in _PROTO_EVENTS:
175+
try:
176+
self._on_event(pkg)
177+
except Exception:
178+
logging.exception('')
179+
else:
180+
logging.error(f'Unsupported package type received: {tp}')
181+
182+
def write(
183+
self,
184+
tp: Proto,
185+
data: Any = None,
186+
is_bin: bool = False,
187+
timeout: int | None = None
188+
) -> asyncio.Future[Any]:
189+
"""Write data to ThingsDB.
190+
This will create a new PID and returns a Future which will be
191+
set when a response is received from ThingsDB, or time-out is reached.
192+
"""
193+
self._pid += 1
194+
self._pid %= 0x10000 # pid is handled as uint16_t
195+
196+
data = data if is_bin else b'' if data is None else \
197+
msgpack.packb(data, use_bin_type=True)
198+
199+
header = Package.st_package.pack(
200+
len(data),
201+
self._pid,
202+
tp,
203+
tp ^ 0xff)
204+
205+
self._write(header + data)
206+
207+
task = asyncio.ensure_future(
208+
self._timer(self._pid, timeout)) if timeout else None
209+
210+
future = asyncio.Future()
211+
self._requests[self._pid] = (future, task)
212+
return future
213+
214+
def cancel_requests(self):
215+
if self._requests:
216+
logging.error(
217+
f'Canceling {len(self._requests)} requests '
218+
'due to a lost connection'
219+
)
220+
while self._requests:
221+
_key, (future, task) = self._requests.popitem()
222+
if task is not None:
223+
task.cancel()
224+
if not future.cancelled():
225+
future.cancel()
226+
227+
@abstractmethod
228+
def _write(self, data: Any):
229+
...
230+
231+
@abstractmethod
232+
def close(self):
233+
...
234+
235+
@abstractmethod
236+
def is_closing(self) -> bool:
237+
...
238+
239+
@abstractmethod
240+
async def wait_closed(self):
241+
...
242+
243+
@abstractmethod
244+
async def close_and_wait(self):
245+
...

thingsdb/client/package.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class Package(object):
2626

2727
st_package = struct.Struct('<IHBB')
2828

29-
def __init__(self, barray: bytearray) -> None:
29+
def __init__(self, barray: bytearray | bytes) -> None:
3030
self.length, self.pid, self.tp, self.checkbit = \
3131
self.__class__.st_package.unpack_from(barray, offset=0)
3232
self.total = self.__class__.st_package.size + self.length

0 commit comments

Comments
 (0)