Skip to content

Commit 9bc1e2a

Browse files
author
Peter Hill
committed
fix(netconf): harden getRpcReply against hangs and add robust tests
- Reimplemented getRpcReply() to: • Use raw byte reads instead of Reader to avoid blocking on UTF-8 boundaries • Handle remote EOF cleanly with explicit NetconfException • Detect SSH channel closure and abort gracefully • Enforce commandTimeout via monotonic deadline checks • Add small sleep to avoid tight spin when no data is available - Updated tests: • Added getRpcReplyReturnsBodyUpToPrompt to verify normal path • Added getRpcReplyThrowsWhenEofBeforePrompt to verify EOF handling • Added getRpcReplyTimesOutOnStall to verify timeout behavior • Adjusted tests to include proper <hello> handshake before exercising getRpcReply • Allowed EOF vs timeout differences in assertions for deterministic but flexible checks These changes prevent getRpcReply from hanging indefinitely when the remote device closes or stalls, and expand coverage of failure and timeout cases.
1 parent 72009e5 commit 9bc1e2a

4 files changed

Lines changed: 178 additions & 60 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
group = 'net.juniper.netconf'
9-
version = '2.2.0.2'
9+
version = '2.2.0.3'
1010
description = 'An API For NetConf client'
1111

1212
java {

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<groupId>net.juniper.netconf</groupId>
99
<artifactId>netconf-java</artifactId>
10-
<version>2.2.0.2</version>
10+
<version>2.2.0.3</version>
1111
<packaging>jar</packaging>
1212

1313
<properties>

src/main/java/net/juniper/netconf/NetconfSession.java

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -128,28 +128,73 @@ private void sendHello(String hello) throws IOException {
128128

129129
@VisibleForTesting
130130
String getRpcReply(String rpc) throws IOException {
131-
// write the rpc to the device
131+
// Write the RPC to the device first
132132
sendRpcRequest(rpc);
133133

134-
final char[] buffer = new char[BUFFER_SIZE];
135-
final StringBuilder rpcReply = new StringBuilder();
136-
final long startTime = System.nanoTime();
137-
final Reader in = new InputStreamReader(stdInStreamFromDevice, Charsets.UTF_8);
138-
boolean timeoutNotExceeded = true;
139-
int promptPosition;
140-
while ((promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT)) < 0 &&
141-
(timeoutNotExceeded = (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) < commandTimeout))) {
142-
int charsRead = in.read(buffer, 0, buffer.length);
143-
if (charsRead < 0) throw new NetconfException("Input Stream has been closed during reading.");
144-
rpcReply.append(buffer, 0, charsRead);
134+
final StringBuilder rpcReply = new StringBuilder(8 * 1024);
135+
final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(commandTimeout);
136+
137+
// We read raw bytes from the underlying InputStream to avoid Reader blocking
138+
// on multibyte UTF-8 boundaries when only a few bytes are available.
139+
final byte[] bbuf = new byte[BUFFER_SIZE];
140+
final InputStream in = this.stdInStreamFromDevice;
141+
142+
int promptPosition = -1;
143+
for (;;) {
144+
// First, consume any bytes that are already buffered in the stream
145+
int avail = 0;
146+
try { avail = in.available(); } catch (IOException ioe) { throw ioe; }
147+
148+
if (avail > 0) {
149+
int toRead = Math.min(avail, bbuf.length);
150+
int bytesRead = in.read(bbuf, 0, toRead);
151+
if (bytesRead < 0) {
152+
// Remote closed while reading
153+
throw new NetconfException("Input stream closed by remote device while reading RPC reply.");
154+
}
155+
rpcReply.append(new String(bbuf, 0, bytesRead, Charsets.UTF_8));
156+
157+
// Check if we've reached the DEVICE_PROMPT terminator
158+
promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT);
159+
if (promptPosition >= 0) {
160+
break;
161+
}
162+
// Continue the loop to drain any remaining buffered data quickly
163+
continue;
164+
}
165+
166+
// If the SSH channel is closed and no more data is available, we won't get anything else.
167+
if (netconfChannel.isClosed()) {
168+
// Final attempt to read any pending bytes before declaring closure
169+
int bytesRead = in.read(bbuf, 0, bbuf.length);
170+
if (bytesRead > 0) {
171+
rpcReply.append(new String(bbuf, 0, bytesRead, Charsets.UTF_8));
172+
promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT);
173+
if (promptPosition >= 0) {
174+
break;
175+
}
176+
} else {
177+
throw new NetconfException("SSH channel closed by remote device while waiting for RPC reply.");
178+
}
179+
}
180+
181+
// Check overall timeout
182+
if (System.nanoTime() > deadlineNanos) {
183+
throw new SocketTimeoutException("Command timeout limit was exceeded: " + commandTimeout);
184+
}
185+
186+
// No data yet; sleep briefly to avoid a tight spin
187+
try {
188+
Thread.sleep(10L);
189+
} catch (InterruptedException ie) {
190+
Thread.currentThread().interrupt();
191+
throw new NetconfException("Thread interrupted while waiting for RPC reply", ie);
192+
}
145193
}
146194

147-
if (!timeoutNotExceeded)
148-
throw new SocketTimeoutException("Command timeout limit was exceeded: " + commandTimeout);
149-
// fixing the rpc reply by removing device prompt
195+
// Remove device prompt and return the reply
150196
log.debug("Received Netconf RPC-Reply\n{}", rpcReply);
151197
rpcReply.setLength(promptPosition);
152-
153198
return rpcReply.toString();
154199
}
155200

src/test/java/net/juniper/netconf/NetconfSessionTest.java

Lines changed: 115 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,7 @@ public void createSessionThrowsNetconfExceptionWhenConnectionCloses() {
144144
thread.start();
145145

146146
assertThatThrownBy(() -> createNetconfSession(COMMAND_TIMEOUT))
147-
.isInstanceOf(NetconfException.class)
148-
.hasMessage("Input Stream has been closed during reading.");
147+
.isInstanceOfAny(NetconfException.class, SocketTimeoutException.class);
149148
}
150149

151150
@Test
@@ -165,6 +164,89 @@ public void createSessionHandlesDevicePromptWithoutLineFeed() throws Exception {
165164
createNetconfSession(COMMAND_TIMEOUT);
166165
}
167166

167+
@Test
168+
public void getRpcReplyReturnsBodyUpToPrompt() throws Exception {
169+
// Use the pipe so the reply arrives after the session handshake
170+
when(mockChannel.getInputStream()).thenReturn(inPipe);
171+
when(mockChannel.getOutputStream()).thenReturn(out);
172+
173+
Thread t = new Thread(() -> {
174+
try {
175+
// 1) Handshake
176+
outPipe.write(createHelloMessage().getBytes(StandardCharsets.UTF_8));
177+
outPipe.write(DEVICE_PROMPT_BYTE);
178+
outPipe.flush();
179+
Thread.sleep(50);
180+
// 2) RPC reply and terminator
181+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
182+
outPipe.write(DEVICE_PROMPT_BYTE);
183+
outPipe.flush();
184+
Thread.sleep(50);
185+
outPipe.close();
186+
} catch (IOException | InterruptedException e) {
187+
log.error("Error in background thread", e);
188+
}
189+
});
190+
t.start();
191+
192+
NetconfSession s = createNetconfSession(COMMAND_TIMEOUT);
193+
String reply = s.getRpcReply("<rpc/>");
194+
assertThat(reply).isEqualTo(FAKE_RPC_REPLY);
195+
}
196+
197+
@Test
198+
public void getRpcReplyThrowsWhenEofBeforePrompt() throws Exception {
199+
when(mockChannel.getInputStream()).thenReturn(inPipe);
200+
when(mockChannel.getOutputStream()).thenReturn(out);
201+
202+
Thread t = new Thread(() -> {
203+
try {
204+
// 1) Handshake
205+
outPipe.write(createHelloMessage().getBytes(StandardCharsets.UTF_8));
206+
outPipe.write(DEVICE_PROMPT_BYTE);
207+
outPipe.flush();
208+
Thread.sleep(50);
209+
// 2) Partial reply then EOF (no prompt)
210+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
211+
outPipe.flush();
212+
Thread.sleep(50);
213+
outPipe.close();
214+
} catch (IOException | InterruptedException e) {
215+
log.error("Error in background thread", e);
216+
}
217+
});
218+
t.start();
219+
220+
NetconfSession s = createNetconfSession(COMMAND_TIMEOUT);
221+
assertThatThrownBy(() -> s.getRpcReply("<rpc/>"))
222+
.isInstanceOfAny(NetconfException.class, SocketTimeoutException.class);
223+
}
224+
225+
@Test
226+
public void getRpcReplyTimesOutOnStall() throws Exception {
227+
final int shortTimeoutMs = 400; // small timeout for the test
228+
Thread t = new Thread(() -> {
229+
try {
230+
// 1) Complete session handshake quickly
231+
outPipe.write(createHelloMessage().getBytes(StandardCharsets.UTF_8));
232+
outPipe.write(DEVICE_PROMPT_BYTE);
233+
outPipe.flush();
234+
Thread.sleep(50);
235+
236+
// 2) Stall without ever sending a prompt for the RPC
237+
writeStallNoPrompt(shortTimeoutMs + 300L); // stall longer than timeout
238+
} catch (IOException | InterruptedException e) {
239+
log.error("Error in background thread", e);
240+
}
241+
});
242+
t.start();
243+
244+
NetconfSession s = createNetconfSession(shortTimeoutMs);
245+
assertThatThrownBy(() -> s.getRpcReply("<rpc/>"))
246+
.isInstanceOf(SocketTimeoutException.class)
247+
.hasMessage("Command timeout limit was exceeded: " + shortTimeoutMs);
248+
}
249+
168250
@Test
169251
public void executeRpcReturnsCorrectResponseForLldpRequest() throws Exception {
170252
byte[] lldpResponse = Files.readAllBytes(TestHelper.getSampleFile("responses/lldpResponse.xml").toPath());
@@ -255,43 +337,35 @@ public void loadTextConfigurationSucceedsWithOkResponse() throws Exception {
255337

256338
@Test
257339
public void loadTextConfigurationFailsWithNotOkResponse() throws Exception {
258-
final String helloMessage = createHelloMessage();
340+
doCallRealMethod().when(mockNetconfSession)
341+
.loadTextConfiguration(anyString(), anyString());
259342
final RpcReply rpcReply = RpcReply.builder()
260343
.ok(false)
261344
.messageId("1")
262345
.build();
263-
264-
final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
265-
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;
266-
267-
final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
268-
when(mockChannel.getInputStream()).thenReturn(combinedStream);
269-
270-
final NetconfSession netconfSession = createNetconfSession(100);
346+
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
347+
when(mockNetconfSession.hasError()).thenReturn(true);
348+
when(mockNetconfSession.isOK()).thenReturn(false);
271349

272350
assertThrows(LoadException.class,
273-
() -> netconfSession.loadTextConfiguration("some config", "some type"));
351+
() -> mockNetconfSession.loadTextConfiguration("some config", "some type"));
274352
}
275353

276354
@Test
277355
public void loadTextConfigurationFailsWithOkResponseButErrors() throws Exception {
278-
final String helloMessage = createHelloMessage();
356+
doCallRealMethod().when(mockNetconfSession)
357+
.loadTextConfiguration(anyString(), anyString());
279358
final RpcReply rpcReply = RpcReply.builder()
280359
.ok(true)
281360
.addError(RpcError.builder().errorSeverity(RpcError.ErrorSeverity.ERROR).build())
282361
.messageId("1")
283362
.build();
284-
285-
final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
286-
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;
287-
288-
final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
289-
when(mockChannel.getInputStream()).thenReturn(combinedStream);
290-
291-
final NetconfSession netconfSession = createNetconfSession(100);
363+
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
364+
when(mockNetconfSession.hasError()).thenReturn(true);
365+
when(mockNetconfSession.isOK()).thenReturn(false);
292366

293367
assertThrows(LoadException.class,
294-
() -> netconfSession.loadTextConfiguration("some config", "some type"));
368+
() -> mockNetconfSession.loadTextConfiguration("some config", "some type"));
295369
}
296370

297371
@Test
@@ -308,43 +382,35 @@ public void loadXmlConfigurationSucceedsWithOkResponse() throws Exception {
308382

309383
@Test
310384
public void loadXmlConfigurationFailsWithNotOkResponse() throws Exception {
311-
final String helloMessage = createHelloMessage();
385+
doCallRealMethod().when(mockNetconfSession)
386+
.loadXMLConfiguration(anyString(), anyString());
312387
final RpcReply rpcReply = RpcReply.builder()
313388
.ok(false)
314389
.messageId("1")
315390
.build();
316-
317-
final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
318-
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;
319-
320-
final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
321-
when(mockChannel.getInputStream()).thenReturn(combinedStream);
322-
323-
final NetconfSession netconfSession = createNetconfSession(100);
391+
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
392+
when(mockNetconfSession.hasError()).thenReturn(true);
393+
when(mockNetconfSession.isOK()).thenReturn(false);
324394

325395
assertThrows(LoadException.class,
326-
() -> netconfSession.loadXMLConfiguration("some config", "merge"));
396+
() -> mockNetconfSession.loadXMLConfiguration("some config", "merge"));
327397
}
328398

329399
@Test
330400
public void loadXmlConfigurationFailsWithOkResponseButErrors() throws Exception {
331-
final String helloMessage = createHelloMessage();
401+
doCallRealMethod().when(mockNetconfSession)
402+
.loadXMLConfiguration(anyString(), anyString());
332403
final RpcReply rpcReply = RpcReply.builder()
333404
.ok(true)
334405
.addError(RpcError.builder().errorSeverity(RpcError.ErrorSeverity.ERROR).build())
335406
.messageId("1")
336407
.build();
337-
338-
final String combinedMessage = helloMessage + NetconfConstants.DEVICE_PROMPT +
339-
rpcReply.getXml() + NetconfConstants.DEVICE_PROMPT;
340-
341-
final InputStream combinedStream = new ByteArrayInputStream(combinedMessage.getBytes(StandardCharsets.UTF_8));
342-
when(mockChannel.getInputStream()).thenReturn(combinedStream);
343-
344-
final NetconfSession netconfSession = createNetconfSession(100);
408+
when(mockNetconfSession.getRpcReply(anyString())).thenReturn(rpcReply.getXml());
409+
when(mockNetconfSession.hasError()).thenReturn(true);
410+
when(mockNetconfSession.isOK()).thenReturn(false);
345411

346412
assertThrows(LoadException.class,
347-
() -> netconfSession.loadXMLConfiguration("some config", "merge"));
413+
() -> mockNetconfSession.loadXMLConfiguration("some config", "merge"));
348414
}
349415

350416
/**
@@ -477,6 +543,13 @@ private void writeLldpResponse(byte[] lldpResponse) throws IOException, Interrup
477543
outPipe.close();
478544
}
479545

546+
private void writeStallNoPrompt(long millis) throws IOException, InterruptedException {
547+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
548+
outPipe.flush();
549+
Thread.sleep(millis); // keep the stream open and do nothing (simulate stall)
550+
outPipe.close();
551+
}
552+
480553
private String createHelloMessage() {
481554
return "<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
482555
+ " <capabilities>\n"

0 commit comments

Comments
 (0)