Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}

ext {
rlibVersion = "10.0.alpha3"
rlibVersion = "10.0.alpha4"
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
rootProject.version = "10.0.alpha3"
rootProject.version = "10.0.alpha4"
group = 'javasabr.rlib'

allprojects {
Expand Down
10 changes: 9 additions & 1 deletion buildSrc/settings.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")

rootProject.name = 'rlib-build-configuration'
rootProject.name = 'build-configuration'

dependencyResolutionManagement {
versionCatalogs {
libs {
from(files("../gradle/libs.versions.toml"))
}
}
}
3 changes: 3 additions & 0 deletions buildSrc/src/main/groovy/configure-java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ javadoc {
test {
useJUnitPlatform()
failOnNoDiscoveredTests = false
jvmArgs("-XX:+EnableDynamicAgentLoading")
}

tasks.register("loadTest", Test) {
Expand All @@ -52,6 +53,8 @@ dependencies {
compileOnly libs.lombok
annotationProcessor libs.lombok

testImplementation libs.mockito.core
testImplementation libs.mockito.junit.jupiter
testImplementation libs.junit.api
testImplementation libs.junit.jupiter.params

Expand Down
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ lombok = "1.18.38"
jspecify = "1.0.0"
# https://mvnrepository.com/artifact/org.junit.platform/junit-platform-launcher
junit-platform-launcher = "1.13.4"
# https://mvnrepository.com/artifact/org.mockito/mockito-core
mockito = "5.20.0"

[libraries]
project-reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "project-reactor" }
Expand All @@ -32,6 +34,8 @@ slf4j-ext = { module = "org.slf4j:slf4j-ext", version.ref = "slf4j" }
jakarta-mail-api = { module = "jakarta.mail:jakarta.mail-api", version.ref = "jakarta-mail" }
angus-mail = { module = "org.eclipse.angus:angus-mail", version.ref = "angus-mail" }
testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" }

[bundles]
mail = ["jakarta-mail-api", "angus-mail"]
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void testServerWithMultiplyClients() {
var serverAllocator = new DefaultBufferAllocator(serverConfig);
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

int clientCount = 200;
int clientCount = 500;
int messagesPerIteration = 3_000;
int expectedMessages = clientCount * messagesPerIteration * MAX_ITERATIONS;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class SimpleNetworkConfig implements NetworkConfig {
private int writeBufferSize = 2048;
@Builder.Default
private int retryDelayInMs = 1000;
@Builder.Default
private int maxPacketSize = 5 * 1024 * 1024;
@Builder.Default
private int maxEmptyReadsBeforeClose = 3;
@Builder.Default
private boolean useDirectByteBuffer = false;
}

NetworkConfig DEFAULT_CLIENT = new NetworkConfig() {
Expand Down Expand Up @@ -76,6 +82,13 @@ default int writeBufferSize() {
return 2048;
}

/**
* Gets the max size of one single network packet.
*/
default int maxPacketSize() {
return 5 * 1024 * 1024;
}

/**
* Gets a timeout for retry read/write operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class SimpleServerNetworkConfig implements ServerNetworkConfig {
@Builder.Default
private int retryDelayInMs = 1000;
@Builder.Default
private int maxPacketSize = 5 * 1024 * 1024;
@Builder.Default
private int maxEmptyReadsBeforeClose = 3;
@Builder.Default
private boolean useDirectByteBuffer = false;
@Builder.Default
private int threadGroupMinSize = 1;
@Builder.Default
private int threadGroupMaxSize = 1;
Expand Down Expand Up @@ -77,6 +83,7 @@ default int scheduledThreadGroupSize() {
return 1;
}


/**
* Get a thread constructor which should be used to create network threads.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package javasabr.rlib.network.exception;

public class MalformedProtocolException extends RuntimeException {
public MalformedProtocolException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javasabr.rlib.network.Network;
import javasabr.rlib.network.NetworkConfig;
import javasabr.rlib.network.UnsafeConnection;
import javasabr.rlib.network.exception.MalformedProtocolException;
import javasabr.rlib.network.packet.NetworkPacketReader;
import javasabr.rlib.network.packet.ReadableNetworkPacket;
import lombok.AccessLevel;
Expand Down Expand Up @@ -151,11 +152,15 @@ protected int readPackets(ByteBuffer receivedBuffer) {
*/
protected int readPackets(ByteBuffer receivedBuffer, ByteBuffer pendingBuffer) {
String remoteAddress = remoteAddress();
NetworkConfig networkConfig = connection
.network()
.config();

log.debug(remoteAddress, receivedBuffer,
"[%s] Start reading packets from received buffer:[%s]"::formatted);

int waitedBytes = pendingBuffer.position();

ByteBuffer bufferToRead = receivedBuffer;
ByteBuffer tempBigBuffer = tempBigBuffer();

Expand Down Expand Up @@ -212,6 +217,11 @@ else if (waitedBytes > 0) {

int positionBeforeRead = endPosition;
int packetFullLength = readFullPacketLength(bufferToRead);
if (packetFullLength > networkConfig.maxPacketSize()) {
throw new MalformedProtocolException(
"Received to big packet:[" + packetFullLength + ">" + networkConfig.maxPacketSize() + "]");
}

int alreadyReadBytes = bufferToRead.position() - endPosition;
int packetDataLength = calculatePacketDataLength(packetFullLength, alreadyReadBytes, bufferToRead);

Expand Down Expand Up @@ -401,8 +411,9 @@ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) {
emptyReadsCounter.set(0);
try {
readPackets(readingBuffer);
} catch (Exception e) {
log.error(e);
} catch (MalformedProtocolException e) {
handleFailedReceiving(e, readingBuffer);
return;
}
startReadImpl();
}
Expand Down Expand Up @@ -440,19 +451,20 @@ protected void handleEmptyReadFromChannel() {
* @param readingBuffer the currently reading buffer.
*/
protected void handleFailedReceiving(Throwable exception, ByteBuffer readingBuffer) {
if (exception instanceof InterruptedByTimeoutException) {
if (reading.compareAndSet(true, false)) {
retryReadLater();
switch (exception) {
case InterruptedByTimeoutException ex -> {
if (reading.compareAndSet(true, false)) {
retryReadLater();
}
}
case AsynchronousCloseException ex ->
log.info(remoteAddress(), "[%s] Connection was closed"::formatted);
case ClosedChannelException ex ->
log.info(remoteAddress(), "[%s] Connection was closed"::formatted);
default -> {
log.error(exception);
connection.close();
}
return;
}
if (exception instanceof AsynchronousCloseException) {
log.info(remoteAddress(), "[%s] Connection was closed"::formatted);
} else if (exception instanceof ClosedChannelException) {
log.info(remoteAddress(), "[%s] Connection was closed"::formatted);
} else {
log.error(exception);
connection.close();
}
}

Expand All @@ -464,16 +476,12 @@ protected int maxPacketsByRead() {
}

protected int readHeader(ByteBuffer buffer, int headerSize) {
switch (headerSize) {
case 1:
return buffer.get() & 0xFF;
case 2:
return buffer.getShort() & 0xFFFF;
case 4:
return buffer.getInt();
default:
throw new IllegalStateException("Wrong packet's header size: " + headerSize);
}
return switch (headerSize) {
case 1 -> buffer.get() & 0xFF;
case 2 -> buffer.getShort() & 0xFFFF;
case 4 -> buffer.getInt();
default -> throw new MalformedProtocolException("Wrong packet's header size:" + headerSize);
};
}

/**
Expand Down
Loading
Loading