Skip to content

Commit c4f5db7

Browse files
authored
Merge pull request #298 from bobjacobsen/stream-update
Update streams code to as-observed and WN formats
2 parents 4565a99 + de44bb8 commit c4f5db7

14 files changed

Lines changed: 89 additions & 36 deletions

src/org/openlcb/LoaderClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ public void handleFailure(String where, int errorCode) {
222222
private void sendStream() {
223223
// System.out.println("lSend Stream ");
224224
// @todo the destStreamID is probably bogus at this point. Check why it is needed here.
225-
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(src, dest, bufferSize, SRC_STREAM_ID, destStreamID);
225+
// flags are 0 here by default
226+
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(src, dest, 0, bufferSize, SRC_STREAM_ID, destStreamID);
226227
connection.put(m, this);
227228
startTimeout(STREAM_INIT_TIMEOUT_MSEC);
228229
}

src/org/openlcb/StreamDataProceedMessage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
public class StreamDataProceedMessage extends AddressedPayloadMessage {
1616

1717
public StreamDataProceedMessage(NodeID source, NodeID dest,
18-
byte sourceStreamID, byte destStreamID) {
19-
super(source, dest, new byte[] {sourceStreamID, destStreamID});
18+
byte sourceStreamID, byte destStreamID, int flags) {
19+
super(source, dest, new byte[] {sourceStreamID, destStreamID, (byte)((flags>>8)&0xFF), (byte)(flags&0xFF)});
2020
this.sourceStreamID = sourceStreamID;
2121
this.destStreamID = destStreamID;
22+
this.flags = flags;
2223
}
2324

2425
byte sourceStreamID;
2526
byte destStreamID;
27+
int flags;
2628

2729
public byte getSourceStreamID() { return sourceStreamID; }
2830
public byte getDestinationStreamID() { return destStreamID; }

src/org/openlcb/StreamInitiateReplyMessage.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
public class StreamInitiateReplyMessage extends AddressedPayloadMessage {
1616

1717
public StreamInitiateReplyMessage(NodeID source, NodeID dest,
18-
int bufferSize, byte sourceStreamID, byte destStreamID) {
19-
super(source, dest, toPayload(bufferSize, sourceStreamID, destStreamID));
18+
int flags, int bufferSize, byte sourceStreamID, byte destStreamID) {
19+
super(source, dest, toPayload(flags, bufferSize, sourceStreamID, destStreamID));
20+
this.flags = flags;
2021
this.bufferSize = bufferSize;
2122
this.sourceStreamID = sourceStreamID;
2223
this.destStreamID = destStreamID;
2324
}
2425

26+
int flags;
2527
int bufferSize;
2628
byte sourceStreamID;
2729
byte destStreamID;
@@ -30,8 +32,9 @@ public StreamInitiateReplyMessage(NodeID source, NodeID dest,
3032
public byte getDestinationStreamID() { return destStreamID; }
3133
public byte getSourceStreamID() { return sourceStreamID; } //dph 20151229
3234

33-
static byte[] toPayload(int bufferSize, byte sourceStreamID, byte destStreamID) {
35+
static byte[] toPayload(int flags, int bufferSize, byte sourceStreamID, byte destStreamID) {
3436
byte[] b = new byte[]{0, 0, 0, 0, sourceStreamID, destStreamID};
37+
Utilities.HostToNetworkUint16(b, 2, flags);
3538
Utilities.HostToNetworkUint16(b, 0, bufferSize);
3639
return b;
3740
}

src/org/openlcb/StreamInitiateRequestMessage.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,23 @@
77
/**
88
* Stream Initialization Request message implementation
99
*
10-
* @author Bob Jacobsen Copyright 2009
10+
* @author Bob Jacobsen Copyright 2009, 2026
1111
* @version $Revision$
1212
*/
1313
@Immutable
1414
@ThreadSafe
1515
public class StreamInitiateRequestMessage extends AddressedPayloadMessage {
1616

1717
public StreamInitiateRequestMessage(NodeID source, NodeID dest,
18-
int bufferSize, byte sourceStreamID, byte destinationStreamID) {
19-
super(source, dest, toPayload(bufferSize, sourceStreamID, destinationStreamID));
18+
int flags, int bufferSize, byte sourceStreamID, byte destinationStreamID) {
19+
super(source, dest, toPayload(flags, bufferSize, sourceStreamID, destinationStreamID));
20+
this.flags = flags;
2021
this.bufferSize = bufferSize;
2122
this.sourceStreamID = sourceStreamID;
2223
this.destinationStreamID = destinationStreamID;
2324
}
2425

26+
int flags;
2527
int bufferSize;
2628
byte sourceStreamID;
2729
byte destinationStreamID;
@@ -30,9 +32,10 @@ public StreamInitiateRequestMessage(NodeID source, NodeID dest,
3032
public byte getSourceStreamID() { return sourceStreamID; }
3133
public byte getDestinationStreamID() { return destinationStreamID; }
3234

33-
static byte[] toPayload(int bufferSize, byte sourceStreamID, byte destStreamID) {
35+
static byte[] toPayload(int flags, int bufferSize, byte sourceStreamID, byte destStreamID) {
3436
byte[] b = new byte[]{0, 0, 0, 0, sourceStreamID, destStreamID};
3537
Utilities.HostToNetworkUint16(b, 0, bufferSize);
38+
Utilities.HostToNetworkUint16(b, 2, flags);
3639
return b;
3740
}
3841

src/org/openlcb/can/MessageBuilder.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,15 +376,20 @@ List<Message> processFormat1(CanFrame f) {
376376
return retlist;
377377
// add all stream messages reply and proceed.
378378
case StreamInitiateRequest:
379-
retlist.add(new StreamInitiateRequestMessage(source,dest,Utilities.NetworkToHostUint16(content, 2),content[4],
379+
retlist.add(new StreamInitiateRequestMessage(source,dest,
380+
Utilities.NetworkToHostUint16(content, 2),
381+
Utilities.NetworkToHostUint16(content, 0),content[4],
380382
(content.length > 5 ? content[5] : -1)));
381383
return retlist;
382384
case StreamInitiateReply:
383-
retlist.add(new StreamInitiateReplyMessage(source,dest,Utilities.NetworkToHostUint16(content, 0),content[4], content[5]));
385+
retlist.add(new StreamInitiateReplyMessage(source,dest,
386+
Utilities.NetworkToHostUint16(content, 2),Utilities.NetworkToHostUint16(content, 0),
387+
content[4], content[5]));
384388
return retlist;
385389
// case StreamData is Format 7
386390
case StreamDataProceed:
387-
retlist.add(new StreamDataProceedMessage(source,dest,content[2], content[3]));
391+
int sdpflags = (int)(content[2])<<8+(int)content[3];
392+
retlist.add(new StreamDataProceedMessage(source,dest,content[0], content[1], sdpflags));
388393
return retlist;
389394
case StreamDataComplete:
390395
retlist.add(new StreamDataCompleteMessage(source,dest,content.length > 2 ?

src/org/openlcb/implementations/StreamReceiver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public void handleStreamInitiateRequest(StreamInitiateRequestMessage msg, Connec
2929
int len = msg.getBufferSize();
3030
sourceStreamID = msg.getSourceStreamID();
3131

32-
Message m = new StreamInitiateReplyMessage(here, far, len, sourceStreamID, destStreamID);
32+
// flags are 0
33+
Message m = new StreamInitiateReplyMessage(here, far, 0, len, sourceStreamID, destStreamID);
3334
connection.put(m, this);
3435
}
3536

@@ -38,7 +39,7 @@ public void handleStreamInitiateRequest(StreamInitiateRequestMessage msg, Connec
3839
*/
3940
public void handleStreamDataSend(StreamDataSendMessage msg, Connection sender){
4041
// send proceed reply
41-
Message m = new StreamDataProceedMessage(here, far, sourceStreamID, destStreamID);
42+
Message m = new StreamDataProceedMessage(here, far, sourceStreamID, destStreamID, 0);
4243
connection.put(m, this);
4344
}
4445

src/org/openlcb/implementations/StreamTransmitter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ public StreamTransmitter(NodeID here, NodeID far, int bufferSize, int[] bytes, C
2828
// not be right, but we don't have a value passed to this method.
2929
destStreamID = 0;
3030

31-
// start negotiation
32-
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(here, far, bufferSize, sourceStreamID, destStreamID);
31+
// start negotiation - flags are 0 here
32+
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(here, far, 0, bufferSize, sourceStreamID, destStreamID);
3333
connection.put(m, this);
3434
}
3535

test/org/openlcb/LoaderClientTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -277,17 +277,17 @@ public void onDone(int errorCode, String errorString) {
277277
dcs.put(m, null);
278278
// Stream Setup
279279
Assert.assertEquals("StreamSetup", 1, messagesReceived.size());
280-
Assert.assertTrue(messagesReceived.get(0).equals(new StreamInitiateRequestMessage(hereID,farID,16384,(byte)4,(byte)0))); // Stream negn
280+
Assert.assertTrue(messagesReceived.get(0).equals(new StreamInitiateRequestMessage(hereID,farID,0,16384,(byte)4,(byte)0))); // Stream negn
281281
messagesReceived.clear();
282282
// *********** note small buffersize! **********
283-
xmt.put(new StreamInitiateReplyMessage(farID,hereID,6,(byte)4,(byte)6), null);
283+
xmt.put(new StreamInitiateReplyMessage(farID,hereID,0,6,(byte)4,(byte)6), null);
284284
// Stream Data
285285

286286
Assert.assertEquals("stream data", 1, messagesReceived.size());
287287
//System.out.println("Msg0: "+(messagesReceived.get(0) != null ? messagesReceived.get(0).toString() : " == null"));
288288
Assert.assertTrue(messagesReceived.get(0).equals(new StreamDataSendMessage(hereID,farID,(byte)6,new int[]{'a','b','c','d','e','f'})));
289289
messagesReceived.clear();
290-
xmt.put(new StreamDataProceedMessage(farID,hereID,(byte)4,(byte)6),null);
290+
xmt.put(new StreamDataProceedMessage(farID,hereID,(byte)4,(byte)6,0),null);
291291
Assert.assertEquals("second stream data, stream complete, unfreeze", 3, messagesReceived.size());
292292
Assert.assertTrue(messagesReceived.get(0).equals(new StreamDataSendMessage(hereID,farID,(byte)6,new int[]{'g','h','i','j'})));
293293
Assert.assertTrue(messagesReceived.get(1).equals(new StreamDataCompleteMessage(hereID,farID,(byte)4,(byte)6)));
@@ -331,10 +331,10 @@ public void onDone(int errorCode, String errorString) {
331331
Message m = new DatagramAcknowledgedMessage(farID,hereID);
332332
dcs.put(m, null);
333333
// Stream Setup
334-
Assert.assertEquals(new StreamInitiateRequestMessage(hereID, farID, 16384, (byte)4, (byte)0), messagesReceived.get(0));
334+
Assert.assertEquals(new StreamInitiateRequestMessage(hereID, farID, 0, 16384, (byte)4, (byte)0), messagesReceived.get(0));
335335
messagesReceived.clear();
336336
// *********** note larger buffersize! **********
337-
xmt.put(new StreamInitiateReplyMessage(farID,hereID,64,(byte)4,(byte)6), null);
337+
xmt.put(new StreamInitiateReplyMessage(farID,hereID,0,64,(byte)4,(byte)6), null);
338338
// Stream Data
339339
Assert.assertEquals("stream data", 3, messagesReceived.size());
340340
Assert.assertTrue(messagesReceived.get(0).equals(new StreamDataSendMessage(hereID,farID,(byte)6,new int[]{'a','b','c','d','e','f','g','h','i','j'})));

test/org/openlcb/StreamDataProceedMessageTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class StreamDataProceedMessageTest {
1818
public void testCTor() {
1919
NodeID id1 = new NodeID(new byte[]{1, 1, 0, 0, 0, 4});
2020
NodeID id2 = new NodeID(new byte[]{1, 1, 0, 0, 4, 4});
21-
StreamDataProceedMessage t = new StreamDataProceedMessage(id1,id2,(byte)0x00,(byte)0x00);
21+
StreamDataProceedMessage t = new StreamDataProceedMessage(id1,id2,(byte)0x00,(byte)0x00,0);
2222
Assert.assertNotNull("exists",t);
2323
}
2424

test/org/openlcb/StreamInitiateReplyMessageTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class StreamInitiateReplyMessageTest {
1818
public void testCTor() {
1919
NodeID id1 = new NodeID(new byte[]{1, 1, 0, 0, 0, 4});
2020
NodeID id2 = new NodeID(new byte[]{1, 1, 0, 0, 4, 4});
21-
StreamInitiateReplyMessage t = new StreamInitiateReplyMessage(id1,id2,0,(byte)0x00,(byte)0x00);
21+
StreamInitiateReplyMessage t = new StreamInitiateReplyMessage(id1,id2,0,0,(byte)0x00,(byte)0x00);
2222
Assert.assertNotNull("exists",t);
2323
}
2424

0 commit comments

Comments
 (0)