From c416ce2e3843f32f60a6bd763388929249bf5a8b Mon Sep 17 00:00:00 2001 From: javasabr Date: Sun, 19 Oct 2025 18:40:06 +0200 Subject: [PATCH 1/4] drop using double write buffer --- README.md | 2 +- build.gradle | 2 +- .../rlib/network/StringNetworkLoadTest.java | 2 +- .../rlib/network/ServerNetworkConfig.java | 1 + .../impl/AbstractNetworkPacketReader.java | 2 + .../impl/AbstractNetworkPacketWriter.java | 111 +++++++----------- .../impl/DefaultNetworkPacketWriter.java | 10 +- .../impl/DefaultSslNetworkPacketWriter.java | 11 +- .../impl/IdBasedNetworkPacketWriter.java | 7 +- .../server/impl/DefaultServerNetwork.java | 5 +- 10 files changed, 62 insertions(+), 91 deletions(-) diff --git a/README.md b/README.md index b338bf1c..ad8a3a9c 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } ext { - rlibVersion = "10.0.alpha3" + rlibVersion = "10.0.alpha4" } dependencies { diff --git a/build.gradle b/build.gradle index 33804a0e..def2e707 100644 --- a/build.gradle +++ b/build.gradle @@ -1,4 +1,4 @@ -rootProject.version = "10.0.alpha3" +rootProject.version = "10.0.alpha4" group = 'javasabr.rlib' allprojects { diff --git a/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java b/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java index 44a14e15..5a1b262b 100644 --- a/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java +++ b/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java @@ -130,7 +130,7 @@ void testServerWithMultiplyClients() { var serverAllocator = new DefaultBufferAllocator(serverConfig); ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - int clientCount = 200; + int clientCount = 500; int messagesPerIteration = 3_000; int expectedMessages = clientCount * messagesPerIteration * MAX_ITERATIONS; diff --git a/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java b/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java index 36855877..e84af743 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java @@ -77,6 +77,7 @@ default int scheduledThreadGroupSize() { return 1; } + /** * Get a thread constructor which should be used to create network threads. */ diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java index aae4fd2d..883eed7a 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java @@ -121,6 +121,8 @@ protected void startReadImpl() { if (reading.compareAndSet(true, false)) { retryReadLater(); } + } catch (Error error) { + throw error; } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java index 214568ae..82b7fb0f 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java @@ -53,14 +53,10 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) { final C connection; final AsynchronousSocketChannel socketChannel; final BufferAllocator bufferAllocator; - final ByteBuffer firstWriteBuffer; - final ByteBuffer secondWriteBuffer; + final ByteBuffer writeBuffer; @Nullable - volatile ByteBuffer firstWriteTempBuffer; - @Nullable - volatile ByteBuffer secondWriteTempBuffer; - + volatile ByteBuffer writeTempBuffer; @Getter(AccessLevel.PROTECTED) volatile ByteBuffer writingBuffer = EMPTY_BUFFER; @@ -80,8 +76,7 @@ public AbstractNetworkPacketWriter( this.connection = connection; this.socketChannel = socketChannel; this.bufferAllocator = bufferAllocator; - this.firstWriteBuffer = bufferAllocator.takeWriteBuffer(); - this.secondWriteBuffer = bufferAllocator.takeWriteBuffer(); + this.writeBuffer = bufferAllocator.takeWriteBuffer(); this.updateActivityFunction = updateActivityFunction; this.writablePacketProvider = packetProvider; this.serializedToChannelPacketHandler = serializedToChannelPacketHandler; @@ -156,28 +151,23 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) { int totalSize = expectedLength == -1 ? -1 : totalSize(packet, expectedLength); // if the packet is too big to use a write buffer - if (expectedLength != -1 && totalSize > firstWriteBuffer.capacity()) { - ByteBuffer first = bufferAllocator.takeBuffer(totalSize); - ByteBuffer second = bufferAllocator.takeBuffer(totalSize); - firstWriteTempBuffer = first; - secondWriteTempBuffer = second; + if (expectedLength != -1 && totalSize > writeBuffer.capacity()) { + ByteBuffer tempBuffer = bufferAllocator.takeBuffer(totalSize); try { - return serialize(resultPacket, expectedLength, totalSize, first, second); + ByteBuffer serialized = serialize(resultPacket, expectedLength, totalSize, tempBuffer); + writeTempBuffer = tempBuffer; + return serialized; } catch (BufferOverflowException ex) { log.error(ex); - bufferAllocator.putBuffer(first); - bufferAllocator.putBuffer(second); - firstWriteTempBuffer = null; - secondWriteTempBuffer = null; + bufferAllocator.putBuffer(tempBuffer); + writeTempBuffer = null; throw new RuntimeException(ex); } } else { try { - return serialize(resultPacket, expectedLength, totalSize, firstWriteBuffer, secondWriteBuffer); + return serialize(resultPacket, expectedLength, totalSize, writeBuffer); } catch (BufferOverflowException ex) { log.error(ex); - firstWriteBuffer.clear(); - secondWriteBuffer.clear(); throw new RuntimeException(ex); } } @@ -196,84 +186,76 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) { * @param packet the network packet to serialize. * @param expectedLength the packet's expected size. * @param totalSize the packet's total size. - * @param firstBuffer the first buffer. - * @param secondBuffer the second buffer. + * @param writeBuffer the write buffer. * @return the final buffer to write to channel. */ protected ByteBuffer serialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - - if (!onBeforeSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer)) { - return firstBuffer.clear().limit(0); - } else if (!doSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer)) { - return firstBuffer.clear().limit(0); - } else if (!onAfterSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer)) { - return firstBuffer.clear().limit(0); + ByteBuffer writeBuffer) { + + if (!onBeforeSerialize(packet, expectedLength, totalSize, writeBuffer)) { + return writeBuffer.clear().limit(0); + } else if (!doSerialize(packet, expectedLength, totalSize, writeBuffer)) { + return writeBuffer.clear().limit(0); + } else if (!onAfterSerialize(packet, expectedLength, totalSize, writeBuffer)) { + return writeBuffer.clear().limit(0); } - return onSerializeResult(packet, expectedLength, totalSize, firstBuffer, secondBuffer); + return onSerializeResult(packet, expectedLength, totalSize, writeBuffer); } /** - * Handles the buffers before serializing packet's data. + * Handles the buffer before serializing packet's data. * * @param packet the network packet. * @param expectedLength the packet's expected size. * @param totalSize the packet's total size. - * @param firstBuffer the first buffer. - * @param secondBuffer the second buffer. + * @param writeBuffer the write buffer. * @return true if handling was successful. */ protected boolean onBeforeSerialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - firstBuffer.clear(); + ByteBuffer writeBuffer) { + writeBuffer.clear(); return true; } /** - * Serializes the network packet data to the buffers. + * Serializes the network packet data to the buffer. * * @param packet the network packet. * @param expectedLength the packet's expected size. * @param totalSize the packet's total size. - * @param firstBuffer the first buffer. - * @param secondBuffer the second buffer. + * @param writeBuffer the first buffer. * @return true if writing was successful. */ protected boolean doSerialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - return packet.write(connection, firstBuffer); + ByteBuffer writeBuffer) { + return packet.write(connection, writeBuffer); } /** - * Handles the buffers after serializing packet's data. + * Handles the buffer after serializing packet's data. * * @param packet the network packet. * @param expectedLength the packet's expected size. * @param totalSize the packet's total size. - * @param firstBuffer the first buffer. - * @param secondBuffer the second buffer. + * @param writeBuffer the write buffer. * @return true if handling was successful. */ protected boolean onAfterSerialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - firstBuffer.flip(); + ByteBuffer writeBuffer) { + writeBuffer.flip(); return true; } @@ -283,17 +265,15 @@ protected boolean onAfterSerialize( * @param packet the network packet. * @param expectedLength the packet's expected size. * @param totalSize the packet's total size. - * @param firstBuffer the first buffer. - * @param secondBuffer the second buffer. + * @param writeBuffer the write buffer. * @return the result buffer. */ protected ByteBuffer onSerializeResult( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - return firstBuffer.position(0); + ByteBuffer writeBuffer) { + return writeBuffer; } protected ByteBuffer writeHeader(ByteBuffer buffer, int position, int value, int headerSize) { @@ -405,9 +385,7 @@ protected void handleFailedWritingData(Throwable exception, @Nullable WritableNe @Override public void close() { - bufferAllocator - .putWriteBuffer(firstWriteBuffer) - .putWriteBuffer(secondWriteBuffer); + bufferAllocator.putWriteBuffer(writeBuffer); clearTempBuffers(); writingBuffer = EMPTY_BUFFER; writing.compareAndSet(true, false); @@ -415,17 +393,10 @@ public void close() { protected void clearTempBuffers() { this.writingBuffer = EMPTY_BUFFER; - - var firstWriteTempBuffer = this.firstWriteTempBuffer; - if (firstWriteTempBuffer != null) { - this.firstWriteTempBuffer = null; - bufferAllocator.putBuffer(firstWriteTempBuffer); - } - - var secondWriteTempBuffer = this.secondWriteTempBuffer; - if (secondWriteTempBuffer != null) { - this.secondWriteTempBuffer = null; - bufferAllocator.putBuffer(secondWriteTempBuffer); + var writeTempBuffer = this.writeTempBuffer; + if (writeTempBuffer != null) { + this.writeTempBuffer = null; + bufferAllocator.putBuffer(writeTempBuffer); } } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java index 9234df50..69098115 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java @@ -52,9 +52,8 @@ protected boolean onBeforeSerialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - firstBuffer + ByteBuffer writeBuffer) { + writeBuffer .clear() .position(packetLengthHeaderSize); return true; @@ -65,9 +64,8 @@ protected ByteBuffer onSerializeResult( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - return writePacketLength(firstBuffer, firstBuffer.limit()) + ByteBuffer writeBuffer) { + return writePacketLength(writeBuffer, writeBuffer.limit()) .position(0); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java index 44f93818..cec55033 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java @@ -57,9 +57,8 @@ protected boolean onBeforeSerialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - firstBuffer + ByteBuffer writeBuffer) { + writeBuffer .clear() .position(packetLengthHeaderSize); return true; @@ -70,9 +69,9 @@ protected ByteBuffer onSerializeResult( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - return writePacketLength(firstBuffer, firstBuffer.limit()).position(0); + ByteBuffer writeBuffer) { + return writePacketLength(writeBuffer, writeBuffer.limit()) + .position(0); } protected ByteBuffer writePacketLength(ByteBuffer buffer, int packetLength) { diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java index a8e9d767..7d738b77 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java @@ -50,9 +50,8 @@ protected boolean doSerialize( W packet, int expectedLength, int totalSize, - ByteBuffer firstBuffer, - ByteBuffer secondBuffer) { - writeHeader(firstBuffer, packet.packetId(), packetIdHeaderSize); - return super.doSerialize(packet, expectedLength, totalSize, firstBuffer, secondBuffer); + ByteBuffer writeBuffer) { + writeHeader(writeBuffer, packet.packetId(), packetIdHeaderSize); + return super.doSerialize(packet, expectedLength, totalSize, writeBuffer); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java index 3a2ef320..311cf7a1 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java @@ -11,6 +11,7 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; @@ -180,9 +181,9 @@ protected ExecutorService buildExecutor(ServerNetworkConfig config) { config.threadGroupMaxSize(), 120, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingQueue<>(), threadFactory, - new ThreadPoolExecutor.CallerRunsPolicy()); + new ThreadPoolExecutor.AbortPolicy()); } else { executorService = Executors.newFixedThreadPool(config.threadGroupMinSize(), threadFactory); } From 0d3e61fd054a17d305af0c87f18a8e9000ce1f21 Mon Sep 17 00:00:00 2001 From: javasabr Date: Sun, 19 Oct 2025 20:18:13 +0200 Subject: [PATCH 2/4] fix libs catalog for root plugin --- buildSrc/settings.gradle | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/buildSrc/settings.gradle b/buildSrc/settings.gradle index 717dd794..64b18218 100644 --- a/buildSrc/settings.gradle +++ b/buildSrc/settings.gradle @@ -1,3 +1,11 @@ enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS") -rootProject.name = 'rlib-build-configuration' \ No newline at end of file +rootProject.name = 'build-configuration' + +dependencyResolutionManagement { + versionCatalogs { + libs { + from(files("../gradle/libs.versions.toml")) + } + } +} \ No newline at end of file From a8bfba5eafc048447f8b627c74d6b247ded7d023 Mon Sep 17 00:00:00 2001 From: javasabr Date: Mon, 20 Oct 2025 06:05:31 +0200 Subject: [PATCH 3/4] add configuration of max packet size for network --- .../java/javasabr/rlib/network/NetworkConfig.java | 13 +++++++++++++ .../javasabr/rlib/network/ServerNetworkConfig.java | 6 ++++++ .../packet/impl/AbstractNetworkPacketReader.java | 9 +++++++++ 3 files changed, 28 insertions(+) diff --git a/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java b/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java index ace95432..05db4ee1 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java @@ -30,6 +30,12 @@ class SimpleNetworkConfig implements NetworkConfig { private int writeBufferSize = 2048; @Builder.Default private int retryDelayInMs = 1000; + @Builder.Default + private int maxPacketSize = 5 * 1024 * 1024; + @Builder.Default + private int maxEmptyReadsBeforeClose = 3; + @Builder.Default + private boolean useDirectByteBuffer = false; } NetworkConfig DEFAULT_CLIENT = new NetworkConfig() { @@ -76,6 +82,13 @@ default int writeBufferSize() { return 2048; } + /** + * Gets the max size of one single network packet. + */ + default int maxPacketSize() { + return 5 * 1024 * 1024; + } + /** * Gets a timeout for retry read/write operation. */ diff --git a/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java b/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java index e84af743..56576270 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java @@ -34,6 +34,12 @@ class SimpleServerNetworkConfig implements ServerNetworkConfig { @Builder.Default private int retryDelayInMs = 1000; @Builder.Default + private int maxPacketSize = 5 * 1024 * 1024; + @Builder.Default + private int maxEmptyReadsBeforeClose = 3; + @Builder.Default + private boolean useDirectByteBuffer = false; + @Builder.Default private int threadGroupMinSize = 1; @Builder.Default private int threadGroupMaxSize = 1; diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java index 883eed7a..5b832b3c 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java @@ -153,11 +153,15 @@ protected int readPackets(ByteBuffer receivedBuffer) { */ protected int readPackets(ByteBuffer receivedBuffer, ByteBuffer pendingBuffer) { String remoteAddress = remoteAddress(); + NetworkConfig networkConfig = connection + .network() + .config(); log.debug(remoteAddress, receivedBuffer, "[%s] Start reading packets from received buffer:[%s]"::formatted); int waitedBytes = pendingBuffer.position(); + ByteBuffer bufferToRead = receivedBuffer; ByteBuffer tempBigBuffer = tempBigBuffer(); @@ -214,6 +218,11 @@ else if (waitedBytes > 0) { int positionBeforeRead = endPosition; int packetFullLength = readFullPacketLength(bufferToRead); + if (packetFullLength > networkConfig.maxPacketSize()) { + throw new IllegalStateException( + "Received to big packet:[" + packetFullLength + ">" + networkConfig.maxPacketSize() + "]"); + } + int alreadyReadBytes = bufferToRead.position() - endPosition; int packetDataLength = calculatePacketDataLength(packetFullLength, alreadyReadBytes, bufferToRead); From ea0611d8b58e3f80bdca17091c66d353faaac7f1 Mon Sep 17 00:00:00 2001 From: javasabr Date: Mon, 20 Oct 2025 18:37:23 +0200 Subject: [PATCH 4/4] add test --- .../src/main/groovy/configure-java.gradle | 3 + gradle/libs.versions.toml | 4 + .../exception/MalformedProtocolException.java | 7 ++ .../impl/AbstractNetworkPacketReader.java | 51 +++++------ .../impl/DefaultDataConnectionTest.java | 88 +++++++++++++++++++ 5 files changed, 126 insertions(+), 27 deletions(-) create mode 100644 rlib-network/src/main/java/javasabr/rlib/network/exception/MalformedProtocolException.java create mode 100644 rlib-network/src/test/java/javasabr/rlib/network/impl/DefaultDataConnectionTest.java diff --git a/buildSrc/src/main/groovy/configure-java.gradle b/buildSrc/src/main/groovy/configure-java.gradle index f59e6e5e..eb80fb22 100644 --- a/buildSrc/src/main/groovy/configure-java.gradle +++ b/buildSrc/src/main/groovy/configure-java.gradle @@ -35,6 +35,7 @@ javadoc { test { useJUnitPlatform() failOnNoDiscoveredTests = false + jvmArgs("-XX:+EnableDynamicAgentLoading") } tasks.register("loadTest", Test) { @@ -52,6 +53,8 @@ dependencies { compileOnly libs.lombok annotationProcessor libs.lombok + testImplementation libs.mockito.core + testImplementation libs.mockito.junit.jupiter testImplementation libs.junit.api testImplementation libs.junit.jupiter.params diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1b63cd29..befd7903 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,6 +17,8 @@ lombok = "1.18.38" jspecify = "1.0.0" # https://mvnrepository.com/artifact/org.junit.platform/junit-platform-launcher junit-platform-launcher = "1.13.4" +# https://mvnrepository.com/artifact/org.mockito/mockito-core +mockito = "5.20.0" [libraries] project-reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "project-reactor" } @@ -32,6 +34,8 @@ slf4j-ext = { module = "org.slf4j:slf4j-ext", version.ref = "slf4j" } jakarta-mail-api = { module = "jakarta.mail:jakarta.mail-api", version.ref = "jakarta-mail" } angus-mail = { module = "org.eclipse.angus:angus-mail", version.ref = "angus-mail" } testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" } +mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" } +mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" } [bundles] mail = ["jakarta-mail-api", "angus-mail"] \ No newline at end of file diff --git a/rlib-network/src/main/java/javasabr/rlib/network/exception/MalformedProtocolException.java b/rlib-network/src/main/java/javasabr/rlib/network/exception/MalformedProtocolException.java new file mode 100644 index 00000000..fdd1f74b --- /dev/null +++ b/rlib-network/src/main/java/javasabr/rlib/network/exception/MalformedProtocolException.java @@ -0,0 +1,7 @@ +package javasabr.rlib.network.exception; + +public class MalformedProtocolException extends RuntimeException { + public MalformedProtocolException(String message) { + super(message); + } +} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java index 5b832b3c..724c733e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java @@ -17,6 +17,7 @@ import javasabr.rlib.network.Network; import javasabr.rlib.network.NetworkConfig; import javasabr.rlib.network.UnsafeConnection; +import javasabr.rlib.network.exception.MalformedProtocolException; import javasabr.rlib.network.packet.NetworkPacketReader; import javasabr.rlib.network.packet.ReadableNetworkPacket; import lombok.AccessLevel; @@ -121,8 +122,6 @@ protected void startReadImpl() { if (reading.compareAndSet(true, false)) { retryReadLater(); } - } catch (Error error) { - throw error; } } @@ -219,7 +218,7 @@ else if (waitedBytes > 0) { int positionBeforeRead = endPosition; int packetFullLength = readFullPacketLength(bufferToRead); if (packetFullLength > networkConfig.maxPacketSize()) { - throw new IllegalStateException( + throw new MalformedProtocolException( "Received to big packet:[" + packetFullLength + ">" + networkConfig.maxPacketSize() + "]"); } @@ -412,8 +411,9 @@ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) { emptyReadsCounter.set(0); try { readPackets(readingBuffer); - } catch (Exception e) { - log.error(e); + } catch (MalformedProtocolException e) { + handleFailedReceiving(e, readingBuffer); + return; } startReadImpl(); } @@ -451,19 +451,20 @@ protected void handleEmptyReadFromChannel() { * @param readingBuffer the currently reading buffer. */ protected void handleFailedReceiving(Throwable exception, ByteBuffer readingBuffer) { - if (exception instanceof InterruptedByTimeoutException) { - if (reading.compareAndSet(true, false)) { - retryReadLater(); + switch (exception) { + case InterruptedByTimeoutException ex -> { + if (reading.compareAndSet(true, false)) { + retryReadLater(); + } + } + case AsynchronousCloseException ex -> + log.info(remoteAddress(), "[%s] Connection was closed"::formatted); + case ClosedChannelException ex -> + log.info(remoteAddress(), "[%s] Connection was closed"::formatted); + default -> { + log.error(exception); + connection.close(); } - return; - } - if (exception instanceof AsynchronousCloseException) { - log.info(remoteAddress(), "[%s] Connection was closed"::formatted); - } else if (exception instanceof ClosedChannelException) { - log.info(remoteAddress(), "[%s] Connection was closed"::formatted); - } else { - log.error(exception); - connection.close(); } } @@ -475,16 +476,12 @@ protected int maxPacketsByRead() { } protected int readHeader(ByteBuffer buffer, int headerSize) { - switch (headerSize) { - case 1: - return buffer.get() & 0xFF; - case 2: - return buffer.getShort() & 0xFFFF; - case 4: - return buffer.getInt(); - default: - throw new IllegalStateException("Wrong packet's header size: " + headerSize); - } + return switch (headerSize) { + case 1 -> buffer.get() & 0xFF; + case 2 -> buffer.getShort() & 0xFFFF; + case 4 -> buffer.getInt(); + default -> throw new MalformedProtocolException("Wrong packet's header size:" + headerSize); + }; } /** diff --git a/rlib-network/src/test/java/javasabr/rlib/network/impl/DefaultDataConnectionTest.java b/rlib-network/src/test/java/javasabr/rlib/network/impl/DefaultDataConnectionTest.java new file mode 100644 index 00000000..209840b0 --- /dev/null +++ b/rlib-network/src/test/java/javasabr/rlib/network/impl/DefaultDataConnectionTest.java @@ -0,0 +1,88 @@ +package javasabr.rlib.network.impl; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.nio.charset.StandardCharsets; +import javasabr.rlib.network.BufferAllocator; +import javasabr.rlib.network.Network; +import javasabr.rlib.network.NetworkConfig; +import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig; +import javasabr.rlib.network.packet.ReadableNetworkPacket; +import javasabr.rlib.network.packet.impl.DefaultNetworkPacketReader; +import org.jspecify.annotations.NonNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DefaultDataConnectionTest { + + private static class TestDataConnection extends DefaultDataConnection { + + public TestDataConnection( + Network network, + AsynchronousSocketChannel channel, + BufferAllocator bufferAllocator, + int maxPacketsByRead, + int packetLengthHeaderSize) { + super(network, channel, bufferAllocator, maxPacketsByRead, packetLengthHeaderSize); + } + + @Override + protected ReadableNetworkPacket createReadablePacket() { + throw new UnsupportedOperationException(); + } + } + + @Mock + Network<@NonNull TestDataConnection> network; + + @Mock + AsynchronousSocketChannel channel; + + @Test + @DisplayName("should not allow to read too big packet") + void shouldNotAllowToReadTooBigPacket() { + // given: + NetworkConfig networkConfig = SimpleServerNetworkConfig + .builder() + .build(); + + var bufferAllocator = new DefaultBufferAllocator(networkConfig); + var connection = new TestDataConnection(network, channel, bufferAllocator, 100, 4); + var packetReader = (DefaultNetworkPacketReader) connection.packetReader; + + int nextPacketSize = 250_000_000; + + var incomingBuffer = ByteBuffer.allocate(100); + incomingBuffer.putInt(nextPacketSize); + incomingBuffer.put("Test data".getBytes(StandardCharsets.UTF_8)); + + Mockito + .when(network.config()) + .thenReturn(networkConfig); + + Mockito + .doAnswer(invocationOnMock -> { + CompletionHandler readChannelHandler = invocationOnMock.getArgument(2); + readChannelHandler.completed(incomingBuffer.remaining(), incomingBuffer); + return null; + }) + .when(channel) + .read(Mockito.any(), Mockito.any(), Mockito.any()); + + // then: + Assertions.assertFalse(connection.closed()); + + // when: + packetReader.startRead(); + + // then: + Assertions.assertTrue(connection.closed()); + } +}