Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package javasabr.rlib.network;

import java.nio.channels.AsynchronousSocketChannel;

public interface UnsafeConnection<C extends UnsafeConnection<C>> extends Connection<C> {

Network<?> network();

BufferAllocator bufferAllocator();

AsynchronousSocketChannel channel();

void onConnected();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ public WritablePacketWithFeedback(CompletableFuture<Boolean> attachment, Writabl
final String remoteAddress;
@Getter
final Network<C> network;

@Getter
final BufferAllocator bufferAllocator;
@Getter
final AsynchronousSocketChannel channel;

final Deque<WritableNetworkPacket<C>> pendingPackets;
final StampedLock lock;

final AtomicBoolean isWriting;
final AtomicBoolean closed;

final MutableArray<BiConsumer<C, ? super ReadableNetworkPacket<C>>> subscribers;
Expand All @@ -80,7 +80,6 @@ public AbstractConnection(
this.channel = channel;
this.pendingPackets = DequeFactory.arrayBasedBased(WritableNetworkPacket.class);
this.network = network;
this.isWriting = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
this.subscribers = ArrayFactory.copyOnModifyArray(BiConsumer.class);
this.remoteAddress = String.valueOf(NetworkUtils.getRemoteAddress(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
@Getter(AccessLevel.PROTECTED)
@Accessors(fluent = true, chain = false)
@FieldDefaults(level = AccessLevel.PROTECTED)
public abstract class DefaultDataConnection<C extends DefaultDataConnection<C>> extends AbstractConnection<C> {
public abstract class DefaultDataConnection<C extends DefaultDataConnection<C>>
extends AbstractConnection<C> {

final NetworkPacketReader packetReader;
final NetworkPacketWriter packetWriter;
Expand All @@ -39,10 +40,9 @@ public DefaultDataConnection(
}

protected NetworkPacketReader createPacketReader() {
//noinspection unchecked
return new DefaultNetworkPacketReader<>(
(C) this,
channel,
bufferAllocator,
this::updateLastActivity,
this::handleReceivedPacket,
value -> createReadablePacket(),
Expand All @@ -51,10 +51,9 @@ protected NetworkPacketReader createPacketReader() {
}

protected NetworkPacketWriter createPacketWriter() {
//noinspection unchecked
return new DefaultNetworkPacketWriter<>(
(C) this,
channel,
bufferAllocator,
this::updateLastActivity,
this::nextPacketToWrite,
this::serializedPacket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public DefaultDataSslConnection(
protected NetworkPacketReader createPacketReader() {
return new DefaultSslNetworkPacketReader<>(
(C) this,
channel,
bufferAllocator,
this::updateLastActivity,
this::handleReceivedPacket,
value -> createReadablePacket(),
Expand All @@ -58,8 +56,6 @@ protected NetworkPacketReader createPacketReader() {
protected NetworkPacketWriter createPacketWriter() {
return new DefaultSslNetworkPacketWriter<>(
(C) this,
channel,
bufferAllocator,
this::updateLastActivity,
this::nextPacketToWrite,
this::serializedPacket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ public IdBasedPacketConnection(
}

protected NetworkPacketReader createPacketReader() {
//noinspection unchecked
return new IdBasedNetworkPacketReader<>(
(C) this,
channel,
bufferAllocator,
this::updateLastActivity,
this::handleReceivedPacket,
packetLengthHeaderSize,
Expand All @@ -59,12 +58,11 @@ protected NetworkPacketReader createPacketReader() {
}

protected NetworkPacketWriter createPacketWriter() {
//noinspection unchecked
return new IdBasedNetworkPacketWriter<>(
(C) this,
channel,
bufferAllocator,
this::updateLastActivity,
() -> nextPacketToWrite(),
this::nextPacketToWrite,
this::serializedPacket,
this::handleSentPacket,
packetLengthHeaderSize,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
Expand All @@ -13,7 +12,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javasabr.rlib.common.util.BufferUtils;
import javasabr.rlib.network.BufferAllocator;
import javasabr.rlib.network.Network;
import javasabr.rlib.network.NetworkConfig;
import javasabr.rlib.network.UnsafeConnection;
Expand Down Expand Up @@ -59,8 +57,6 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) {
final AtomicInteger emptyReadsCounter = new AtomicInteger(0);

final C connection;
final AsynchronousSocketChannel socketChannel;
final BufferAllocator bufferAllocator;

final ByteBuffer readBuffer;
final ByteBuffer pendingBuffer;
Expand All @@ -77,16 +73,12 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) {

protected AbstractNetworkPacketReader(
C connection,
AsynchronousSocketChannel socketChannel,
BufferAllocator bufferAllocator,
Runnable updateActivityFunction,
Consumer<? super R> packetHandler,
int maxPacketsByRead) {
this.connection = connection;
this.socketChannel = socketChannel;
this.bufferAllocator = bufferAllocator;
this.readBuffer = bufferAllocator.takeReadBuffer();
this.pendingBuffer = bufferAllocator.takePendingBuffer();
this.readBuffer = connection.bufferAllocator().takeReadBuffer();
this.pendingBuffer = connection.bufferAllocator().takePendingBuffer();
this.updateActivityFunction = updateActivityFunction;
this.packetHandler = packetHandler;
this.maxPacketsByRead = maxPacketsByRead;
Expand Down Expand Up @@ -116,7 +108,7 @@ protected void startReadImpl() {
log.debug(remoteAddress(), "[%s] Start waiting for new data from channel..."::formatted);
ByteBuffer buffer = bufferToReadFromChannel();
try {
socketChannel.read(buffer, buffer, readChannelHandler);
connection.channel().read(buffer, buffer, readChannelHandler);
} catch (RuntimeException ex) {
log.error(ex);
if (reading.compareAndSet(true, false)) {
Expand Down Expand Up @@ -367,9 +359,13 @@ protected void reAllocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLeng
log.debug(remoteAddress(), sourceBuffer.capacity(), fullPacketLength,
"[%s] Resize temp big buffer from:[%s] to:[%s]"::formatted);

var newTempBuffer = bufferAllocator.takeBuffer(fullPacketLength + readBuffer.capacity());
ByteBuffer newTempBuffer = connection
.bufferAllocator()
.takeBuffer(fullPacketLength + readBuffer.capacity());

log.debug(remoteAddress(), sourceBuffer, newTempBuffer,
"[%s] Moved data from old temp big buffer:[%s] to new:[%s]"::formatted);

newTempBuffer.put(sourceBuffer);

freeTempBigBuffers();
Expand All @@ -378,23 +374,29 @@ protected void reAllocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLeng

protected void allocTempBigBuffers(ByteBuffer sourceBuffer, int fullPacketLength) {
int notConsumeBytes = sourceBuffer.remaining();

log.debug(
notConsumeBytes,
fullPacketLength,
"Request temp big buffer to store part:[%s] of big packet with length:[%s]"::formatted);

var tempBigBuffer = bufferAllocator.takeBuffer(fullPacketLength + readBuffer.capacity());
ByteBuffer tempBigBuffer = connection
.bufferAllocator()
.takeBuffer(fullPacketLength + readBuffer.capacity());

log.debug(sourceBuffer, tempBigBuffer, "Put data from old temp big buffer:[%s] to new:[%s]"::formatted);
tempBigBuffer.put(sourceBuffer);

tempBigBuffer.put(sourceBuffer);
this.tempBigBuffer = tempBigBuffer;
}

protected void freeTempBigBuffers() {
ByteBuffer tempBuffer = tempBigBuffer();
if (tempBuffer != null) {
tempBigBuffer(null);
bufferAllocator.putBuffer(tempBuffer);
connection
.bufferAllocator()
.putBuffer(tempBuffer);
}
}

Expand Down Expand Up @@ -425,7 +427,7 @@ protected void handleEmptyReadFromChannel() {
if (connection.closed()) {
reading.compareAndSet(true, false);
return;
} else if (!socketChannel.isOpen()) {
} else if (!connection.channel().isOpen()) {
connection.close();
return;
}
Expand Down Expand Up @@ -502,7 +504,8 @@ protected abstract R createPacketFor(

@Override
public void close() {
bufferAllocator
connection
.bufferAllocator()
.putReadBuffer(readBuffer)
.putPendingBuffer(pendingBuffer);
freeTempBigBuffers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javasabr.rlib.functions.ObjBoolConsumer;
import javasabr.rlib.network.BufferAllocator;
import javasabr.rlib.network.Connection;
import javasabr.rlib.network.NetworkConfig;
import javasabr.rlib.network.UnsafeConnection;
import javasabr.rlib.network.exception.MalformedProtocolException;
import javasabr.rlib.network.packet.NetworkPacketWriter;
import javasabr.rlib.network.packet.WritableNetworkPacket;
import lombok.AccessLevel;
Expand All @@ -33,7 +34,7 @@
@FieldDefaults(level = AccessLevel.PROTECTED)
public abstract class AbstractNetworkPacketWriter<
W extends WritableNetworkPacket<C>,
C extends Connection<C>> implements NetworkPacketWriter {
C extends UnsafeConnection<C>> implements NetworkPacketWriter {

final CompletionHandler<Integer, @Nullable WritableNetworkPacket<C>> writeHandler = new CompletionHandler<>() {

Expand All @@ -51,8 +52,6 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket<C> packet) {
final AtomicBoolean writing = new AtomicBoolean();

final C connection;
final AsynchronousSocketChannel socketChannel;
final BufferAllocator bufferAllocator;
final ByteBuffer writeBuffer;

@Nullable
Expand All @@ -67,16 +66,12 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket<C> packet) {

public AbstractNetworkPacketWriter(
C connection,
AsynchronousSocketChannel socketChannel,
BufferAllocator bufferAllocator,
Runnable updateActivityFunction,
Supplier<@Nullable WritableNetworkPacket<C>> packetProvider,
Consumer<WritableNetworkPacket<C>> serializedToChannelPacketHandler,
ObjBoolConsumer<WritableNetworkPacket<C>> sentPacketHandler) {
this.connection = connection;
this.socketChannel = socketChannel;
this.bufferAllocator = bufferAllocator;
this.writeBuffer = bufferAllocator.takeWriteBuffer();
this.writeBuffer = connection.bufferAllocator().takeWriteBuffer();
this.updateActivityFunction = updateActivityFunction;
this.writablePacketProvider = packetProvider;
this.serializedToChannelPacketHandler = serializedToChannelPacketHandler;
Expand Down Expand Up @@ -129,7 +124,7 @@ protected boolean writeBuffer(
}
writingBuffer = resultBuffer;
log.debug(remoteAddress(), resultBuffer, (address, buff) -> "[%s] Write to channel data:\n" + hexDump(buff));
socketChannel.write(resultBuffer, nextPacket, writeHandler);
connection.channel().write(resultBuffer, nextPacket, writeHandler);
return true;
}

Expand All @@ -148,10 +143,17 @@ protected ByteBuffer serialize(WritableNetworkPacket<C> packet) {
W resultPacket = (W) packet;

int expectedLength = packet.expectedLength(connection);
int totalSize = expectedLength == -1 ? -1 : totalSize(packet, expectedLength);
int totalSize = expectedLength == WritableNetworkPacket.UNKNOWN_EXPECTED_BYTES
? WritableNetworkPacket.UNKNOWN_EXPECTED_BYTES
: totalSize(packet, expectedLength);

if (totalSize > 0) {
validateMaxPacketSize(packet, totalSize);
}

// if the packet is too big to use a write buffer
if (expectedLength != -1 && totalSize > writeBuffer.capacity()) {
BufferAllocator bufferAllocator = connection.bufferAllocator();
ByteBuffer tempBuffer = bufferAllocator.takeBuffer(totalSize);
try {
ByteBuffer serialized = serialize(resultPacket, expectedLength, totalSize, tempBuffer);
Expand All @@ -173,6 +175,17 @@ protected ByteBuffer serialize(WritableNetworkPacket<C> packet) {
}
}

protected void validateMaxPacketSize(WritableNetworkPacket<C> packet, int totalSize) {
NetworkConfig networkConfig = connection
.network()
.config();

if (totalSize > networkConfig.maxPacketSize()) {
throw new MalformedProtocolException(
"Writable packet:[" + packet + "] is too big:[" + totalSize + ">" + networkConfig.maxPacketSize() + "]");
}
}

/**
* Gets total size of the packet if it's possible.
*
Expand Down Expand Up @@ -341,7 +354,7 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable Writabl
log.debug(remoteAddress(), writingBuffer,
"[%s] Buffer was not consumed fully, try to write else [%s] bytes to channel"::formatted);
try {
socketChannel.write(writingBuffer, packet, writeHandler);
connection.channel().write(writingBuffer, packet, writeHandler);
} catch (RuntimeException ex) {
log.error(ex);
if (writing.compareAndSet(true, false)) {
Expand Down Expand Up @@ -385,7 +398,9 @@ protected void handleFailedWritingData(Throwable exception, @Nullable WritableNe

@Override
public void close() {
bufferAllocator.putWriteBuffer(writeBuffer);
connection
.bufferAllocator()
.putWriteBuffer(writeBuffer);
clearTempBuffers();
writingBuffer = EMPTY_BUFFER;
writing.compareAndSet(true, false);
Expand All @@ -396,7 +411,9 @@ protected void clearTempBuffers() {
var writeTempBuffer = this.writeTempBuffer;
if (writeTempBuffer != null) {
this.writeTempBuffer = null;
bufferAllocator.putBuffer(writeTempBuffer);
connection
.bufferAllocator()
.putBuffer(writeTempBuffer);
}
}
}
Loading
Loading