diff --git a/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java index 2f8108ff..46f37847 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java @@ -1,8 +1,14 @@ package javasabr.rlib.network; +import java.nio.channels.AsynchronousSocketChannel; + public interface UnsafeConnection> extends Connection { Network network(); + BufferAllocator bufferAllocator(); + + AsynchronousSocketChannel channel(); + void onConnected(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java index b5615cf5..21b5c67e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java @@ -53,13 +53,13 @@ public WritablePacketWithFeedback(CompletableFuture attachment, Writabl final String remoteAddress; @Getter final Network network; - + @Getter final BufferAllocator bufferAllocator; + @Getter final AsynchronousSocketChannel channel; + final Deque> pendingPackets; final StampedLock lock; - - final AtomicBoolean isWriting; final AtomicBoolean closed; final MutableArray>> subscribers; @@ -80,7 +80,6 @@ public AbstractConnection( this.channel = channel; this.pendingPackets = DequeFactory.arrayBasedBased(WritableNetworkPacket.class); this.network = network; - this.isWriting = new AtomicBoolean(false); this.closed = new AtomicBoolean(false); this.subscribers = ArrayFactory.copyOnModifyArray(BiConsumer.class); this.remoteAddress = String.valueOf(NetworkUtils.getRemoteAddress(channel)); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java index e15bd66d..d59701d9 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java @@ -19,7 +19,8 @@ @Getter(AccessLevel.PROTECTED) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class DefaultDataConnection> extends AbstractConnection { +public abstract class DefaultDataConnection> + extends AbstractConnection { final NetworkPacketReader packetReader; final NetworkPacketWriter packetWriter; @@ -39,10 +40,9 @@ public DefaultDataConnection( } protected NetworkPacketReader createPacketReader() { + //noinspection unchecked return new DefaultNetworkPacketReader<>( (C) this, - channel, - bufferAllocator, this::updateLastActivity, this::handleReceivedPacket, value -> createReadablePacket(), @@ -51,10 +51,9 @@ protected NetworkPacketReader createPacketReader() { } protected NetworkPacketWriter createPacketWriter() { + //noinspection unchecked return new DefaultNetworkPacketWriter<>( (C) this, - channel, - bufferAllocator, this::updateLastActivity, this::nextPacketToWrite, this::serializedPacket, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java index 30ac62be..facfe8ba 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java @@ -44,8 +44,6 @@ public DefaultDataSslConnection( protected NetworkPacketReader createPacketReader() { return new DefaultSslNetworkPacketReader<>( (C) this, - channel, - bufferAllocator, this::updateLastActivity, this::handleReceivedPacket, value -> createReadablePacket(), @@ -58,8 +56,6 @@ protected NetworkPacketReader createPacketReader() { protected NetworkPacketWriter createPacketWriter() { return new DefaultSslNetworkPacketWriter<>( (C) this, - channel, - bufferAllocator, this::updateLastActivity, this::nextPacketToWrite, this::serializedPacket, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java index 31ec2de4..7bf9be7e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java @@ -46,10 +46,9 @@ public IdBasedPacketConnection( } protected NetworkPacketReader createPacketReader() { + //noinspection unchecked return new IdBasedNetworkPacketReader<>( (C) this, - channel, - bufferAllocator, this::updateLastActivity, this::handleReceivedPacket, packetLengthHeaderSize, @@ -59,12 +58,11 @@ protected NetworkPacketReader createPacketReader() { } protected NetworkPacketWriter createPacketWriter() { + //noinspection unchecked return new IdBasedNetworkPacketWriter<>( (C) this, - channel, - bufferAllocator, this::updateLastActivity, - () -> nextPacketToWrite(), + this::nextPacketToWrite, this::serializedPacket, this::handleSentPacket, packetLengthHeaderSize, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java deleted file mode 100644 index d172d847..00000000 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java +++ /dev/null @@ -1,25 +0,0 @@ -package javasabr.rlib.network.impl; - -import javasabr.rlib.network.Connection; - -/** - * The interface to implement a packet id based async connection. - * - * @author JavaSaBr - */ -public interface PackedIdBasedConnection> extends Connection { - - /** - * Get length of packet's header with packet's data length. - * - * @return the length of packet's header with packet's data length. - */ - int getPacketLengthHeaderSize(); - - /** - * Get length of packet's header with packet's id. - * - * @return the length of packet's header with packet's id. - */ - int getPacketIdHeaderSize(); -} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java index 724c733e..cba2669d 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java @@ -4,7 +4,6 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.channels.InterruptedByTimeoutException; @@ -13,7 +12,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import javasabr.rlib.common.util.BufferUtils; -import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.Network; import javasabr.rlib.network.NetworkConfig; import javasabr.rlib.network.UnsafeConnection; @@ -59,8 +57,6 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) { final AtomicInteger emptyReadsCounter = new AtomicInteger(0); final C connection; - final AsynchronousSocketChannel socketChannel; - final BufferAllocator bufferAllocator; final ByteBuffer readBuffer; final ByteBuffer pendingBuffer; @@ -77,16 +73,12 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) { protected AbstractNetworkPacketReader( C connection, - AsynchronousSocketChannel socketChannel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Consumer packetHandler, int maxPacketsByRead) { this.connection = connection; - this.socketChannel = socketChannel; - this.bufferAllocator = bufferAllocator; - this.readBuffer = bufferAllocator.takeReadBuffer(); - this.pendingBuffer = bufferAllocator.takePendingBuffer(); + this.readBuffer = connection.bufferAllocator().takeReadBuffer(); + this.pendingBuffer = connection.bufferAllocator().takePendingBuffer(); this.updateActivityFunction = updateActivityFunction; this.packetHandler = packetHandler; this.maxPacketsByRead = maxPacketsByRead; @@ -116,7 +108,7 @@ protected void startReadImpl() { log.debug(remoteAddress(), "[%s] Start waiting for new data from channel..."::formatted); ByteBuffer buffer = bufferToReadFromChannel(); try { - socketChannel.read(buffer, buffer, readChannelHandler); + connection.channel().read(buffer, buffer, readChannelHandler); } catch (RuntimeException ex) { log.error(ex); if (reading.compareAndSet(true, false)) { @@ -367,9 +359,13 @@ protected void reAllocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLeng log.debug(remoteAddress(), sourceBuffer.capacity(), fullPacketLength, "[%s] Resize temp big buffer from:[%s] to:[%s]"::formatted); - var newTempBuffer = bufferAllocator.takeBuffer(fullPacketLength + readBuffer.capacity()); + ByteBuffer newTempBuffer = connection + .bufferAllocator() + .takeBuffer(fullPacketLength + readBuffer.capacity()); + log.debug(remoteAddress(), sourceBuffer, newTempBuffer, "[%s] Moved data from old temp big buffer:[%s] to new:[%s]"::formatted); + newTempBuffer.put(sourceBuffer); freeTempBigBuffers(); @@ -378,15 +374,19 @@ protected void reAllocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLeng protected void allocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLength) { int notConsumeBytes = sourceBuffer.remaining(); + log.debug( notConsumeBytes, fullPacketLength, "Request temp big buffer to store part:[%s] of big packet with length:[%s]"::formatted); - var tempBigBuffer = bufferAllocator.takeBuffer(fullPacketLength + readBuffer.capacity()); + ByteBuffer tempBigBuffer = connection + .bufferAllocator() + .takeBuffer(fullPacketLength + readBuffer.capacity()); + log.debug(sourceBuffer, tempBigBuffer, "Put data from old temp big buffer:[%s] to new:[%s]"::formatted); - tempBigBuffer.put(sourceBuffer); + tempBigBuffer.put(sourceBuffer); this.tempBigBuffer = tempBigBuffer; } @@ -394,7 +394,9 @@ protected void freeTempBigBuffers() { ByteBuffer tempBuffer = tempBigBuffer(); if (tempBuffer != null) { tempBigBuffer(null); - bufferAllocator.putBuffer(tempBuffer); + connection + .bufferAllocator() + .putBuffer(tempBuffer); } } @@ -425,7 +427,7 @@ protected void handleEmptyReadFromChannel() { if (connection.closed()) { reading.compareAndSet(true, false); return; - } else if (!socketChannel.isOpen()) { + } else if (!connection.channel().isOpen()) { connection.close(); return; } @@ -502,7 +504,8 @@ protected abstract R createPacketFor( @Override public void close() { - bufferAllocator + connection + .bufferAllocator() .putReadBuffer(readBuffer) .putPendingBuffer(pendingBuffer); freeTempBigBuffers(); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java index 82b7fb0f..21f1264d 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java @@ -6,14 +6,15 @@ import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; import javasabr.rlib.functions.ObjBoolConsumer; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.NetworkConfig; +import javasabr.rlib.network.UnsafeConnection; +import javasabr.rlib.network.exception.MalformedProtocolException; import javasabr.rlib.network.packet.NetworkPacketWriter; import javasabr.rlib.network.packet.WritableNetworkPacket; import lombok.AccessLevel; @@ -33,7 +34,7 @@ @FieldDefaults(level = AccessLevel.PROTECTED) public abstract class AbstractNetworkPacketWriter< W extends WritableNetworkPacket, - C extends Connection> implements NetworkPacketWriter { + C extends UnsafeConnection> implements NetworkPacketWriter { final CompletionHandler> writeHandler = new CompletionHandler<>() { @@ -51,8 +52,6 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) { final AtomicBoolean writing = new AtomicBoolean(); final C connection; - final AsynchronousSocketChannel socketChannel; - final BufferAllocator bufferAllocator; final ByteBuffer writeBuffer; @Nullable @@ -67,16 +66,12 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) { public AbstractNetworkPacketWriter( C connection, - AsynchronousSocketChannel socketChannel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Supplier<@Nullable WritableNetworkPacket> packetProvider, Consumer> serializedToChannelPacketHandler, ObjBoolConsumer> sentPacketHandler) { this.connection = connection; - this.socketChannel = socketChannel; - this.bufferAllocator = bufferAllocator; - this.writeBuffer = bufferAllocator.takeWriteBuffer(); + this.writeBuffer = connection.bufferAllocator().takeWriteBuffer(); this.updateActivityFunction = updateActivityFunction; this.writablePacketProvider = packetProvider; this.serializedToChannelPacketHandler = serializedToChannelPacketHandler; @@ -129,7 +124,7 @@ protected boolean writeBuffer( } writingBuffer = resultBuffer; log.debug(remoteAddress(), resultBuffer, (address, buff) -> "[%s] Write to channel data:\n" + hexDump(buff)); - socketChannel.write(resultBuffer, nextPacket, writeHandler); + connection.channel().write(resultBuffer, nextPacket, writeHandler); return true; } @@ -148,10 +143,17 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) { W resultPacket = (W) packet; int expectedLength = packet.expectedLength(connection); - int totalSize = expectedLength == -1 ? -1 : totalSize(packet, expectedLength); + int totalSize = expectedLength == WritableNetworkPacket.UNKNOWN_EXPECTED_BYTES + ? WritableNetworkPacket.UNKNOWN_EXPECTED_BYTES + : totalSize(packet, expectedLength); + + if (totalSize > 0) { + validateMaxPacketSize(packet, totalSize); + } // if the packet is too big to use a write buffer if (expectedLength != -1 && totalSize > writeBuffer.capacity()) { + BufferAllocator bufferAllocator = connection.bufferAllocator(); ByteBuffer tempBuffer = bufferAllocator.takeBuffer(totalSize); try { ByteBuffer serialized = serialize(resultPacket, expectedLength, totalSize, tempBuffer); @@ -173,6 +175,17 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) { } } + protected void validateMaxPacketSize(WritableNetworkPacket packet, int totalSize) { + NetworkConfig networkConfig = connection + .network() + .config(); + + if (totalSize > networkConfig.maxPacketSize()) { + throw new MalformedProtocolException( + "Writable packet:[" + packet + "] is too big:[" + totalSize + ">" + networkConfig.maxPacketSize() + "]"); + } + } + /** * Gets total size of the packet if it's possible. * @@ -341,7 +354,7 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable Writabl log.debug(remoteAddress(), writingBuffer, "[%s] Buffer was not consumed fully, try to write else [%s] bytes to channel"::formatted); try { - socketChannel.write(writingBuffer, packet, writeHandler); + connection.channel().write(writingBuffer, packet, writeHandler); } catch (RuntimeException ex) { log.error(ex); if (writing.compareAndSet(true, false)) { @@ -385,7 +398,9 @@ protected void handleFailedWritingData(Throwable exception, @Nullable WritableNe @Override public void close() { - bufferAllocator.putWriteBuffer(writeBuffer); + connection + .bufferAllocator() + .putWriteBuffer(writeBuffer); clearTempBuffers(); writingBuffer = EMPTY_BUFFER; writing.compareAndSet(true, false); @@ -396,7 +411,9 @@ protected void clearTempBuffers() { var writeTempBuffer = this.writeTempBuffer; if (writeTempBuffer != null) { this.writeTempBuffer = null; - bufferAllocator.putBuffer(writeTempBuffer); + connection + .bufferAllocator() + .putBuffer(writeTempBuffer); } } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java index 89b3ec68..0c8b88b3 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java @@ -3,7 +3,6 @@ import static javasabr.rlib.network.util.NetworkUtils.hexDump; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import javasabr.rlib.common.util.BufferUtils; import javasabr.rlib.network.BufferAllocator; @@ -50,14 +49,13 @@ public abstract class AbstractSslNetworkPacketReader< protected AbstractSslNetworkPacketReader( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Consumer readPacketHandler, SSLEngine sslEngine, Consumer> packetWriter, int maxPacketsByRead) { - super(connection, channel, bufferAllocator, updateActivityFunction, readPacketHandler, maxPacketsByRead); + super(connection, updateActivityFunction, readPacketHandler, maxPacketsByRead); + BufferAllocator bufferAllocator = connection.bufferAllocator(); this.sslEngine = sslEngine; this.sslDataBuffer = bufferAllocator.takeBuffer(sslEngine .getSession() @@ -65,7 +63,8 @@ protected AbstractSslNetworkPacketReader( this.sslDataPendingBuffer = bufferAllocator.takeBuffer(sslEngine .getSession() .getApplicationBufferSize() * 2); - this.sslNetworkBuffer = bufferAllocator.takeBuffer(sslEngine + this.sslNetworkBuffer = bufferAllocator + .takeBuffer(sslEngine .getSession() .getPacketBufferSize()) .limit(0); @@ -251,6 +250,7 @@ protected ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) { } protected synchronized ByteBuffer increaseNetworkBuffer(int extra) { + BufferAllocator bufferAllocator = connection.bufferAllocator(); ByteBuffer current = sslNetworkBuffer(); int newSize = (int) Math.max(current.capacity() * 1.3, current.capacity() + extra); sslNetworkBuffer = NetworkUtils @@ -260,21 +260,23 @@ protected synchronized ByteBuffer increaseNetworkBuffer(int extra) { } protected synchronized void increaseDataBuffer() { + BufferAllocator allocator = connection.bufferAllocator(); int newSize = sslEngine .getSession() .getApplicationBufferSize(); sslDataBuffer = NetworkUtils - .increaseBuffer(sslDataBuffer, bufferAllocator, newSize) + .increaseBuffer(sslDataBuffer, allocator, newSize) .flip(); sslDataPendingBuffer = NetworkUtils - .increaseBuffer(sslDataPendingBuffer, bufferAllocator, newSize * 2) + .increaseBuffer(sslDataPendingBuffer, allocator, newSize * 2) .flip(); } @Override public void close() { sslEngine.closeOutbound(); - bufferAllocator + connection + .bufferAllocator() .putBuffer(sslDataBuffer) .putBuffer(sslDataPendingBuffer) .putBuffer(sslNetworkBuffer); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java index b759aea6..f15f9aee 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java @@ -4,12 +4,10 @@ import static javasabr.rlib.network.util.NetworkUtils.hexDump; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.Supplier; import javasabr.rlib.functions.ObjBoolConsumer; -import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.WritableNetworkPacket; import javasabr.rlib.network.util.NetworkUtils; import javasabr.rlib.network.util.SslUtils; @@ -29,7 +27,7 @@ @FieldDefaults(level = AccessLevel.PROTECTED) public abstract class AbstractSslNetworkPacketWriter< W extends WritableNetworkPacket, - C extends Connection> extends AbstractNetworkPacketWriter { + C extends UnsafeConnection> extends AbstractNetworkPacketWriter { private static final ByteBuffer[] EMPTY_BUFFERS = { NetworkUtils.EMPTY_BUFFER @@ -47,8 +45,6 @@ public abstract class AbstractSslNetworkPacketWriter< public AbstractSslNetworkPacketWriter( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Supplier> packetProvider, Consumer> serializedToChannelPacketHandler, @@ -57,17 +53,17 @@ public AbstractSslNetworkPacketWriter( Consumer> queueAtFirst) { super( connection, - channel, - bufferAllocator, updateActivityFunction, packetProvider, serializedToChannelPacketHandler, sentPacketHandler); this.sslEngine = sslEngine; this.queueAtFirst = queueAtFirst; - this.sslNetworkBuffer = bufferAllocator.takeBuffer(sslEngine - .getSession() - .getPacketBufferSize()); + this.sslNetworkBuffer = connection + .bufferAllocator() + .takeBuffer(sslEngine + .getSession() + .getPacketBufferSize()); } @Override @@ -112,7 +108,9 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) { "[%s] Has remaining [%s] bytes after encrypting, will create temp big buffer"::formatted); int tempBufferSize = (int) ((bufferToSend.limit() + serialized.remaining()) * 1.2); - ByteBuffer tempBuffer = bufferAllocator.takeBuffer(tempBufferSize); + ByteBuffer tempBuffer = connection + .bufferAllocator() + .takeBuffer(tempBufferSize); tempBuffer.put(bufferToSend.flip()); while (serialized.hasRemaining()) { @@ -215,7 +213,9 @@ protected void clearTempBuffers() { super.clearTempBuffers(); ByteBuffer sslTempBuffer = this.sslTempBuffer; if (sslTempBuffer != null) { - bufferAllocator.putBuffer(sslTempBuffer); + connection + .bufferAllocator() + .putBuffer(sslTempBuffer); this.sslTempBuffer = null; } } @@ -223,7 +223,9 @@ protected void clearTempBuffers() { @Override public void close() { sslEngine.closeOutbound(); - bufferAllocator.putBuffer(sslNetworkBuffer); + connection + .bufferAllocator() + .putBuffer(sslNetworkBuffer); super.close(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java index 4fd8b975..f5f0b320 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java @@ -1,10 +1,8 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.IntFunction; -import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.ReadableNetworkPacket; import lombok.AccessLevel; @@ -24,14 +22,12 @@ public class DefaultNetworkPacketReader< public DefaultNetworkPacketReader( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Consumer packetHandler, IntFunction readablePacketFactory, int packetLengthHeaderSize, int maxPacketsByRead) { - super(connection, channel, bufferAllocator, updateActivityFunction, packetHandler, maxPacketsByRead); + super(connection, updateActivityFunction, packetHandler, maxPacketsByRead); this.readablePacketFactory = readablePacketFactory; this.packetLengthHeaderSize = packetLengthHeaderSize; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java index 69098115..ef9e9323 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java @@ -1,12 +1,10 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.Supplier; import javasabr.rlib.functions.ObjBoolConsumer; -import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.WritableNetworkPacket; import lombok.AccessLevel; import lombok.experimental.FieldDefaults; @@ -18,14 +16,12 @@ @FieldDefaults(level = AccessLevel.PROTECTED) public class DefaultNetworkPacketWriter< W extends WritableNetworkPacket, - C extends Connection> extends AbstractNetworkPacketWriter { + C extends UnsafeConnection> extends AbstractNetworkPacketWriter { final int packetLengthHeaderSize; public DefaultNetworkPacketWriter( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Supplier<@Nullable WritableNetworkPacket> packetProvider, Consumer> serializedToChannelPacketHandler, @@ -33,8 +29,6 @@ public DefaultNetworkPacketWriter( int packetLengthHeaderSize) { super( connection, - channel, - bufferAllocator, updateActivityFunction, packetProvider, serializedToChannelPacketHandler, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java index f7c15fd7..461aa78b 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java @@ -1,10 +1,8 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.IntFunction; -import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.ReadableNetworkPacket; import javasabr.rlib.network.packet.WritableNetworkPacket; @@ -26,8 +24,6 @@ public class DefaultSslNetworkPacketReader< public DefaultSslNetworkPacketReader( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Consumer packetHandler, IntFunction packetResolver, @@ -37,8 +33,6 @@ public DefaultSslNetworkPacketReader( int maxPacketsByRead) { super( connection, - channel, - bufferAllocator, updateActivityFunction, packetHandler, sslEngine, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java index cec55033..abd961cb 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java @@ -1,12 +1,10 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.Supplier; import javasabr.rlib.functions.ObjBoolConsumer; -import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.WritableNetworkPacket; import javax.net.ssl.SSLEngine; import lombok.AccessLevel; @@ -19,14 +17,12 @@ @FieldDefaults(level = AccessLevel.PROTECTED) public class DefaultSslNetworkPacketWriter< W extends WritableNetworkPacket, - C extends Connection> extends AbstractSslNetworkPacketWriter { + C extends UnsafeConnection> extends AbstractSslNetworkPacketWriter { final int packetLengthHeaderSize; public DefaultSslNetworkPacketWriter( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Supplier<@Nullable WritableNetworkPacket> nextWritePacketSupplier, Consumer> serializedToChannelPacketHandler, @@ -36,8 +32,6 @@ public DefaultSslNetworkPacketWriter( int packetLengthHeaderSize) { super( connection, - channel, - bufferAllocator, updateActivityFunction, nextWritePacketSupplier, serializedToChannelPacketHandler, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java index e1bee478..55c34ff9 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java @@ -1,9 +1,7 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; -import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; @@ -25,15 +23,13 @@ public class IdBasedNetworkPacketReader< public IdBasedNetworkPacketReader( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Consumer packetHandler, int packetLengthHeaderSize, int maxPacketsByRead, int packetIdHeaderSize, ReadableNetworkPacketRegistry packetRegistry) { - super(connection, channel, bufferAllocator, updateActivityFunction, packetHandler, maxPacketsByRead); + super(connection, updateActivityFunction, packetHandler, maxPacketsByRead); this.packetLengthHeaderSize = packetLengthHeaderSize; this.packetIdHeaderSize = packetIdHeaderSize; this.packetRegistry = packetRegistry; diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java index 7d738b77..4b30eb00 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java @@ -1,12 +1,10 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.Supplier; import javasabr.rlib.functions.ObjBoolConsumer; -import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.IdBasedWritableNetworkPacket; import javasabr.rlib.network.packet.WritableNetworkPacket; import lombok.AccessLevel; @@ -19,14 +17,12 @@ @FieldDefaults(level = AccessLevel.PROTECTED) public class IdBasedNetworkPacketWriter< W extends IdBasedWritableNetworkPacket, - C extends Connection> extends DefaultNetworkPacketWriter { + C extends UnsafeConnection> extends DefaultNetworkPacketWriter { final int packetIdHeaderSize; public IdBasedNetworkPacketWriter( C connection, - AsynchronousSocketChannel channel, - BufferAllocator bufferAllocator, Runnable updateActivityFunction, Supplier<@Nullable WritableNetworkPacket> packetProvider, Consumer> serializedToChannelPacketHandler, @@ -35,8 +31,6 @@ public IdBasedNetworkPacketWriter( int packetIdHeaderSize) { super( connection, - channel, - bufferAllocator, updateActivityFunction, packetProvider, serializedToChannelPacketHandler, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java index 311cf7a1..c523443d 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java @@ -14,7 +14,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction;