Skip to content

Commit ca08834

Browse files
committed
Make a single and required frame handler in physicalLayer for enforcing that the stack exists (Setting physicalLayer.onReceivedFrame to CanLink's receiveListener during CanLink constructor is enforced).
1 parent 62aff80 commit ca08834

17 files changed

+283
-165
lines changed

examples/example_tcp_message_interface.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from logging import getLogger
1515
# region same code as other examples
1616
from examples_settings import Settings
17-
from openlcb.rawphysicallayer import RealtimeRawPhysicalLayer # do 1st to fix path if no pip install
17+
from openlcb.realtimephysicallayer import RealtimePhysicalLayer # do 1st to fix path if no pip install
1818
settings = Settings()
1919

2020
if __name__ == "__main__":
@@ -56,14 +56,14 @@
5656
# assert isinstance(data, (bytes, bytearray))
5757
# print(" SR: {}".format(data))
5858
# sock.send(data)
59-
# ^ Moved to RealtimeRawPhysicalLayer sendFrameAfter override
59+
# ^ Moved to RealtimePhysicalLayer sendFrameAfter override
6060

6161

6262
def printMessage(msg):
6363
print("RM: {} from {}".format(msg, msg.source))
6464

6565

66-
physicalLayer = RealtimeRawPhysicalLayer(sock)
66+
physicalLayer = RealtimePhysicalLayer(sock)
6767
# ^ this was not in the example before
6868
# (just gave sendToSocket to TcpLink)
6969

examples/examples_gui.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
file=sys.stderr)
2929
raise
3030
from tkinter import ttk
31-
from collections import OrderedDict
31+
from collections import OrderedDict, deque
3232

3333
from examples_settings import Settings
3434
# ^ adds parent of module to sys.path, so openlcb imports *after* this
@@ -131,7 +131,7 @@ def __init__(self, parent):
131131
self.zeroconf = None
132132
self.listener = None
133133
self.browser = None
134-
self.errors = []
134+
self.errors = deque()
135135
self.root = parent
136136
self._connect_thread = None
137137
try:
@@ -170,7 +170,7 @@ def on_form_loaded(self):
170170
def show_next_error(self):
171171
if not self.errors:
172172
return 0
173-
error = self.errors.pop(0)
173+
error = self.errors.popleft()
174174
if not error:
175175
return 0
176176
self.set_status(error)

openlcb/canbus/canlink.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ class CanLink(LinkLayer):
8282

8383
# MIN_STATE_VALUE & MAX_STATE_VALUE are set statically below the
8484
# State class declaration:
85-
ALIAS_RESPONSE_DELAY = .2 # See docstring.
85+
STANDARD_ALIAS_RESPONSE_DELAY = .2
86+
ALIAS_RESPONSE_DELAY = 20 # See docstring.
8687

8788
class State(Enum):
8889
"""Used as a linux-like "runlevel"
@@ -104,7 +105,7 @@ class State(Enum):
104105
determined to be success, this state triggers
105106
_enqueueReserveID.
106107
"""
107-
Initial = LinkLayer.State.Undefined.value # special case of .Inhibited
108+
Initial = 1 # special case of .Inhibited
108109
# where init hasn't started.
109110
Inhibited = 2
110111
EnqueueAliasAllocationRequest = 3
@@ -126,6 +127,9 @@ class State(Enum):
126127
Permitted = 20 # formerly 3. queued via frame
127128
# (formerly set at end of _notifyReservation code)
128129

130+
InitialState = State.Initial
131+
DisconnectedState = State.Inhibited
132+
129133
MIN_STATE_VALUE = min(entry.value for entry in State)
130134
MAX_STATE_VALUE = max(entry.value for entry in State)
131135

@@ -230,7 +234,8 @@ def isDuplicateAlias(self, alias):
230234

231235
def _onStateChanged(self, oldState, newState):
232236
# return super()._onStateChanged(oldState, newState)
233-
assert isinstance(newState, CanLink.State)
237+
assert isinstance(newState, CanLink.State), \
238+
"expected a CanLink.State, got {}".format(emit_cast(newState))
234239
if newState == CanLink.State.EnqueueAliasAllocationRequest:
235240
self._enqueueCIDSequence()
236241
# - sets state to BusyLocalCIDSequence

openlcb/canbus/canphysicallayer.py

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
This is a class because it represents a single physical connection to a layout
55
and is subclassed.
66
'''
7-
import sys
87
from logging import getLogger
8+
import warnings
99

1010
from openlcb.canbus.canframe import CanFrame
1111
from openlcb.canbus.controlframe import ControlFrame
@@ -24,37 +24,61 @@ def __init__(self,):
2424
PhysicalLayer.__init__(self)
2525
self.listeners = []
2626

27+
def onReceivedFrame(self):
28+
raise NotImplementedError(
29+
"Your LinkLayer/subclass must set this manually (monkeypatch)"
30+
" to the CanLink instance's receiveListener method.")
31+
2732
def sendFrameAfter(self, frame: CanFrame):
28-
"""See sendFrameAfter documentation in PhysicalLayer.
29-
This implementation behaves the same except requires
30-
a specific type (CanFrame).
33+
"""Enqueue: *IMPORTANT* Main/other thread may have
34+
called this. Any other thread sending other than the _listen
35+
thread is bad, since overlapping calls to socket cause undefined
36+
behavior, so this just adds to a deque (double ended queue, used
37+
as FIFO).
38+
- CanPhysicalLayerGridConnect formerly had canSendCallback
39+
but now it uses its own frame deque, and the socket code pops
40+
and sends the frames.
41+
(formerly canSendCallback was set to a sendToPort function
42+
which was formerly a direct call to a port which was not
43+
thread-safe and could be called from anywhere in the
44+
openlcb stack)
45+
- Add a generalized LocalEvent queue avoid deep callstack?
46+
- See issue #62 comment about a local event queue.
47+
For now, CanFrame is used (improved since issue #62
48+
was solved by adding more states to CanLink so it
49+
can have incremental states instead of requiring two-way
50+
communication [race condition] during a single
51+
blocking call to defineAndReserveAlias)
3152
"""
32-
# formerly sendCan Frame, but now behavior is defined by superclass
33-
# (regardless of frame type, it is just added to self._sends)
3453
assert isinstance(frame, CanFrame)
54+
frame.encoder = self
3555
PhysicalLayer.sendFrameAfter(self, frame)
3656

37-
def pollFrame(self) -> CanFrame: # overloaded for type hinting.
38-
"""Check if there is another frame queued and get it.
39-
Returns:
40-
CanFrame: next frame in FIFO buffer (_sends).
41-
"""
42-
return PhysicalLayer.pollFrame(self)
57+
def pollFrame(self) -> CanFrame:
58+
frame = super().pollFrame()
59+
if frame is None:
60+
return None
61+
assert isinstance(frame, CanFrame)
62+
return frame
4363

4464
def encode(self, frame) -> str:
4565
'''abstract interface (encode frame to string)'''
4666
raise NotImplementedError("Each subclass must implement this.")
4767

4868
def registerFrameReceivedListener(self, listener):
69+
assert listener is not None
70+
warnings.warn(
71+
"You don't really need to listen to packets."
72+
" Use pollFrame instead, which will collect and decode"
73+
" packets into frames (this layer communicates to upper layers"
74+
" using self.onReceivedFrame set in LinkLayer/subclass"
75+
" constructor).")
4976
self.listeners.append(listener)
5077

5178
def fireListeners(self, frame):
52-
if not self.listeners:
53-
logger.warning(
54-
"No listeners for frame received."
55-
" CanLink (see LinkLayer superclass constructor)"
56-
" should at least register its receiveFrame method"
57-
" with a physical layer implementation.")
79+
"""At least the LinkLayer (CanLink in this case)
80+
should register one listener."""
81+
5882
for listener in self.listeners:
5983
listener(frame)
6084

openlcb/canbus/canphysicallayergridconnect.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@
1212

1313
from collections import deque
1414
from typing import Union
15-
from openlcb.canbus.canlink import CanLink
1615
from openlcb.canbus.canphysicallayer import CanPhysicalLayer
1716
from openlcb.canbus.canframe import CanFrame
17+
from openlcb.frameencoder import FrameEncoder
1818

1919
GC_START_BYTE = 0x3a # :
2020
GC_END_BYTE = 0x3b # ;
2121

2222

23-
class CanPhysicalLayerGridConnect(CanPhysicalLayer):
23+
class CanPhysicalLayerGridConnect(CanPhysicalLayer, FrameEncoder):
2424
"""CAN physical layer subclass for GridConnect
2525
2626
This acts as frame.encoder for canLink, and manages the packet
27-
_sends queue (deque is used for speed; defined & managed in base
27+
_send_frames queue (deque is used for speed; defined & managed in base
2828
class: PhysicalLayer)
2929
3030
Args:
@@ -39,7 +39,7 @@ def __init__(self):
3939
# ^ A CanLink requires a physical layer to operate,
4040
# so CanLink now requires a PhysicalLayer instance
4141
# such as this in its constructor.
42-
CanPhysicalLayer.__init__(self)
42+
CanPhysicalLayer.__init__(self) # creates self._send_frames
4343

4444
# region moved to CanLink constructor
4545
# from canLink.linkPhysicalLayer(self) # self.setCallBack(callback):
@@ -53,10 +53,6 @@ def __init__(self):
5353
# assert callable(callback)
5454
# self.canSendCallback = callback
5555

56-
def sendFrameAfter(self, frame: CanFrame) -> None:
57-
frame.encoder = self
58-
self._sends.appendleft(frame) # self.canSendCallback(frame)
59-
6056
def encodeFrameAsString(self, frame) -> str:
6157
'''Encode frame to string.'''
6258
output = ":X{:08X}N".format(frame.header) # at least 8 chars, hex
@@ -65,6 +61,11 @@ def encodeFrameAsString(self, frame) -> str:
6561
output += ";\n"
6662
return output
6763

64+
def encodeFrameAsData(self, frame) -> Union[bytearray, bytes]:
65+
# TODO: Consider doing this manually (in Python 3,
66+
# bytes/bytearray has no attribute 'format')
67+
return self.encodeFrameAsString(frame).encode("utf-8")
68+
6869
def handleDataString(self, string: str):
6970
'''Provide string from the outside link to be parsed
7071

openlcb/canbus/canphysicallayersimulation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
class CanPhysicalLayerSimulation(CanPhysicalLayer):
99

1010
def __init__(self):
11-
self.receivedFrames = []
11+
self.receivedPackets = []
1212
CanPhysicalLayer.__init__(self)
1313

14-
def sendFrameAfter(self, frame):
15-
self.receivedFrames.append(frame)
14+
def handlePacket(self, frame):
15+
self.receivedPackets.append(frame)

openlcb/dispatcher.py

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def start_listening(self, connected_port, localNodeID):
171171
# self._physicalLayer.registerFrameReceivedListener(
172172
# self._printFrame
173173
# )
174-
# ^ Commented since canlink already adds CanLink's default
174+
# ^ Commented since CanLink constructor now registers its default
175175
# receiveListener to CanLinkPhysicalLayer & that's all we need
176176
# for this application.
177177

@@ -203,18 +203,21 @@ def start_listening(self, connected_port, localNodeID):
203203
self.listen() # Must listen for alias reservation responses
204204
# (sendAliasConnectionSequence will occur for another 200ms
205205
# once, then another 200ms on each alias collision if any)
206+
# - must also keep doing frame = pollFrame() and sending
207+
# if not None.
206208

207209
self._callback_status("physicalLayerUp...")
208210
self._physicalLayer.physicalLayerUp()
209-
while canLink.pollState() != CanLink.State.Permitted:
211+
self._callback_status("Waiting for alias reservation...")
212+
while self._canLink.pollState() != CanLink.State.Permitted:
210213
precise_sleep(.02)
211-
212214
# ^ triggers fireListeners which calls CanLink's default
213215
# receiveListener by default since added on CanPhysicalLayer
214216
# arg of linkPhysicalLayer.
215217
# - Must happen *after* listen thread starts, since
216218
# generates ControlFrame.LinkUp and calls fireListeners
217219
# which calls sendAliasConnectionSequence on this thread!
220+
self._callback_status("Alias reservation complete.")
218221

219222
def listen(self):
220223
self._listen_thread = threading.Thread(
@@ -244,19 +247,49 @@ def _listen(self):
244247
# the alias from each node (collision or not)
245248
# to has to get the expected replies to the alias
246249
# reservation sequence below.
247-
precise_sleep(.05) # Wait for physicalLayerUp to complete
250+
precise_sleep(.05) # Wait for physicalLayerUp non-network Message
248251
while True:
249252
# Wait 200 ms for all nodes to announce (and for alias
250253
# reservation to complete), as per section 6.2.1 of CAN
251254
# Frame Transfer Standard (sendMessage requires )
252255
logger.debug("[_listen] _receive...")
253256
try:
254-
sends = self._physicalLayer.popFrames()
255-
while sends:
257+
# Receive mode (switches to write mode on BlockingIOError
258+
# which is expected and used on purpose)
259+
# print("Waiting for _receive")
260+
received = self._receive() # requires setblocking(False)
261+
print("[_listen] received {} byte(s)".format(len(received)),
262+
file=sys.stderr)
263+
# print(" RR: {}".format(received.strip()))
264+
# pass to link processor
265+
self._physicalLayer.handleData(received)
266+
# ^ will trigger self._printFrame if that was added
267+
# via registerFrameReceivedListener during connect.
268+
precise_sleep(.01) # let processor sleep before read
269+
if time.perf_counter() - self._connecting_t > .2:
270+
if self._canLink._state != CanLink.State.Permitted:
271+
if ((self._message_t is None)
272+
or (time.perf_counter() - self._message_t
273+
> 1)):
274+
logger.warning(
275+
"CanLink is not ready yet."
276+
" There must have been a collision"
277+
"--processCollision increments node alias"
278+
" in this case and tries again.")
279+
# else _on_link_state_change will be called
280+
# TODO: move *all* send calls to this loop.
281+
except BlockingIOError:
282+
# Nothing to receive right now, so perform all sends
283+
# This *must* occur (require socket.setblocking(False))
284+
# sends = self._physicalLayer.popFrames()
285+
# while sends:
286+
while True:
256287
# *Always* do send in the receive thread to
257288
# avoid overlapping calls to socket
258289
# (causes undefined behavior)!
259-
frame = sends.pop()
290+
frame = self._physicalLayer.pollFrame()
291+
if frame is None:
292+
break # allow receive to run!
260293
if isinstance(frame, CanFrame):
261294
if self._canLink.isDuplicateAlias(frame.alias):
262295
logger.warning(
@@ -266,43 +299,23 @@ def _listen(self):
266299
.format(frame.alias))
267300
continue
268301
logger.debug("[_listen] _sendString...")
269-
self._port.sendString(frame.encodeAsString())
302+
packet = frame.encodeAsString()
303+
assert isinstance(packet, str)
304+
print("Sending {}".format(packet))
305+
self._port.sendString(packet)
270306
if frame.afterSendState:
271307
self._canLink.setState(frame.afterSendState)
272308
else:
273309
raise NotImplementedError(
274310
"Event type {} is not handled."
275311
.format(type(frame).__name__))
276-
received = self._receive() # requires setblocking(False)
277312
# so that it doesn't block (or occur during) recv
278313
# (overlapping calls would cause undefined behavior)!
279-
# TODO: move *all* send calls to this loop.
280-
except BlockingIOError:
281314
# delay = random.uniform(.005,.02)
282315
# ^ random delay may help if send is on another thread
283316
# (but avoid that for stability and speed)
284317
precise_sleep(.01)
285-
continue
286-
print("[_listen] received {} byte(s)".format(len(received)),
287-
file=sys.stderr)
288-
# print(" RR: {}".format(received.strip()))
289-
# pass to link processor
290-
self._physicalLayer.handleData(received)
291-
# ^ will trigger self._printFrame if that was added
292-
# via registerFrameReceivedListener during connect.
293-
precise_sleep(.01) # let processor sleep briefly before read
294-
if time.perf_counter() - self._connecting_t > .2:
295-
if self._canLink._state != CanLink.State.Permitted:
296-
if ((self._message_t is None)
297-
or (time.perf_counter() - self._message_t
298-
> 1)):
299-
logger.warning(
300-
"CanLink is not ready yet."
301-
" There must have been a collision"
302-
"--processCollision increments node alias"
303-
" in this case and tries again.")
304-
# else _on_link_state_change will be called
305-
318+
# raise RuntimeError("We should never get here")
306319
except RuntimeError as ex:
307320
caught_ex = ex
308321
# If _port is a TcpSocket:

openlcb/frameencoder.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from typing import Union
2+
3+
4+
class FrameEncoder:
5+
def encodeFrameAsString(self, frame) -> str:
6+
'''Encode frame to string.'''
7+
raise NotImplementedError("Implement this in each subclass.")
8+
9+
def encodeFrameAsData(self, frame) -> Union[bytearray, bytes]:
10+
raise NotImplementedError("Implement this in each subclass.")

0 commit comments

Comments
 (0)