diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java index a46a33b907..f3ca9a8415 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java @@ -64,6 +64,7 @@ public final class IOReactorConfig { private final SocketAddress socksProxyAddress; private final String socksProxyUsername; private final String socksProxyPassword; + private final int maxCommandsPerSession; IOReactorConfig( final TimeValue selectInterval, @@ -82,7 +83,8 @@ public final class IOReactorConfig { final int tcpKeepCount, final SocketAddress socksProxyAddress, final String socksProxyUsername, - final String socksProxyPassword) { + final String socksProxyPassword, + final int maxCommandsPerSession) { super(); this.selectInterval = selectInterval; this.ioThreadCount = ioThreadCount; @@ -101,6 +103,7 @@ public final class IOReactorConfig { this.socksProxyAddress = socksProxyAddress; this.socksProxyUsername = socksProxyUsername; this.socksProxyPassword = socksProxyPassword; + this.maxCommandsPerSession = maxCommandsPerSession; } /** @@ -240,6 +243,16 @@ public String getSocksProxyPassword() { return this.socksProxyPassword; } + /** + * Maximum number of commands that can be enqueued per I/O session. + * A value of {@code 0} means unlimited. + * + * @since 5.5 + */ + public int getMaxCommandsPerSession() { + return this.maxCommandsPerSession; + } + public static Builder custom() { return new Builder(); } @@ -262,7 +275,9 @@ public static Builder copy(final IOReactorConfig config) { .setTcpKeepCount(config.getTcpKeepCount()) .setSocksProxyAddress(config.getSocksProxyAddress()) .setSocksProxyUsername(config.getSocksProxyUsername()) - .setSocksProxyPassword(config.getSocksProxyPassword()); + .setSocksProxyPassword(config.getSocksProxyPassword()) + .setMaxCommandsPerSession(config.getMaxCommandsPerSession()); + } public static class Builder { @@ -311,6 +326,7 @@ public static void setDefaultMaxIOThreadCount(final int defaultMaxIOThreadCount) private SocketAddress socksProxyAddress; private String socksProxyUsername; private String socksProxyPassword; + private int maxCommandsPerSession; Builder() { this.selectInterval = TimeValue.ofSeconds(1); @@ -330,6 +346,7 @@ public static void setDefaultMaxIOThreadCount(final int defaultMaxIOThreadCount) this.socksProxyAddress = null; this.socksProxyUsername = null; this.socksProxyPassword = null; + this.maxCommandsPerSession = 0; } /** @@ -596,6 +613,17 @@ public Builder setSocksProxyPassword(final String socksProxyPassword) { return this; } + /** + * Sets maximum number of commands enqueued per I/O session. + * A value of {@code 0} means unlimited. + * + * @since 5.5 + */ + public Builder setMaxCommandsPerSession(final int maxCommandsPerSession) { + this.maxCommandsPerSession = Args.notNegative(maxCommandsPerSession, "Max commands per session"); + return this; + } + public IOReactorConfig build() { return new IOReactorConfig( selectInterval != null ? selectInterval : TimeValue.ofSeconds(1), @@ -608,7 +636,7 @@ public IOReactorConfig build() { trafficClass, sndBufSize, rcvBufSize, backlogSize, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, - socksProxyAddress, socksProxyUsername, socksProxyPassword); + socksProxyAddress, socksProxyUsername, socksProxyPassword, maxCommandsPerSession); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java index cf2b6023d3..a213e4eabe 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java @@ -36,6 +36,8 @@ import java.nio.channels.SocketChannel; import java.util.Deque; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -65,9 +67,11 @@ class IOSessionImpl implements IOSession { private volatile long lastReadTime; private volatile long lastWriteTime; private volatile long lastEventTime; + private final int maxCommandsPerSession; + private final AtomicInteger queuedCommands; public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel, - final Callback sessionClosedCallback) { + final Callback sessionClosedCallback, final int maxCommandsPerSession) { super(); this.key = Args.notNull(key, "Selection key"); this.channel = Args.notNull(socketChannel, "Socket channel"); @@ -82,6 +86,8 @@ public IOSessionImpl(final String type, final SelectionKey key, final SocketChan this.lastReadTime = currentTimeMillis; this.lastWriteTime = currentTimeMillis; this.lastEventTime = currentTimeMillis; + this.maxCommandsPerSession = maxCommandsPerSession; + this.queuedCommands = maxCommandsPerSession > 0 ? new AtomicInteger(0) : null; } @Override @@ -104,17 +110,47 @@ public Lock getLock() { return lock; } + private boolean tryIncrementQueuedCommands(final Command command) { + for (;;) { + final int q = queuedCommands.get(); + if (q >= maxCommandsPerSession) { + command.cancel(); + return false; + } + if (queuedCommands.compareAndSet(q, q + 1)) { + return true; + } + } + } + @Override public void enqueue(final Command command, final Command.Priority priority) { - if (priority == Command.Priority.IMMEDIATE) { - commandQueue.addFirst(command); - } else { - commandQueue.add(command); + if (command == null) { + return; + } + if (maxCommandsPerSession > 0 && !tryIncrementQueuedCommands(command)) { + throw new RejectedExecutionException("I/O session command queue limit reached (max=" + maxCommandsPerSession + ")"); } - if (isOpen()) { + if (!isOpen()) { + command.cancel(); + if (maxCommandsPerSession > 0) { + queuedCommands.decrementAndGet(); + } + return; + } + try { + if (priority == Command.Priority.IMMEDIATE) { + commandQueue.addFirst(command); + } else { + commandQueue.add(command); + } setEvent(SelectionKey.OP_WRITE); - } else { + } catch (final RuntimeException ex) { command.cancel(); + if (maxCommandsPerSession > 0) { + queuedCommands.decrementAndGet(); + } + throw ex; } } @@ -125,7 +161,11 @@ public boolean hasCommands() { @Override public Command poll() { - return commandQueue.poll(); + final Command command = commandQueue.poll(); + if (command != null && maxCommandsPerSession > 0) { + queuedCommands.decrementAndGet(); + } + return command; } @Override @@ -211,7 +251,7 @@ public void setSocketTimeout(final Timeout timeout) { @Override public int read(final ByteBuffer dst) throws IOException { - return this.channel.read(dst); + return this.channel.read(dst); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java index 2d7004b77c..4ca5956e76 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java @@ -222,7 +222,7 @@ private void processPendingChannels() throws IOException { } catch (final ClosedChannelException ex) { return; } - final IOSessionImpl ioSession = new IOSessionImpl("a", key, socketChannel, closedSessions::add); + final IOSessionImpl ioSession = new IOSessionImpl("a", key, socketChannel, closedSessions::add, reactorConfig.getMaxCommandsPerSession()); final InternalDataChannel dataChannel = new InternalDataChannel( ioSession, null, @@ -391,7 +391,7 @@ private void processConnectionRequest(final SocketChannel socketChannel, final I validateAddress(remoteAddress); final boolean connected = socketChannel.connect(remoteAddress); final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); - final IOSessionImpl ioSession = new IOSessionImpl("c", key, socketChannel, closedSessions::add); + final IOSessionImpl ioSession = new IOSessionImpl("c", key, socketChannel, closedSessions::add, reactorConfig.getMaxCommandsPerSession()); final InternalDataChannel dataChannel = new InternalDataChannel( ioSession, sessionRequest.remoteEndpoint, diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionWithPerSessionCapExample.java b/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionWithPerSessionCapExample.java new file mode 100644 index 0000000000..bfc6d07775 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionWithPerSessionCapExample.java @@ -0,0 +1,123 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http.examples; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +public class AsyncPipelinedRequestExecutionWithPerSessionCapExample { + + public static void main(final String[] args) throws Exception { + + final int maxCommandsPerSession = 2; + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(5, TimeUnit.SECONDS) + .setMaxCommandsPerSession(maxCommandsPerSession) + .build(); + + final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final HttpHost target = new HttpHost("httpbin.org"); + final String[] requestUris = new String[] {"/delay/3?i=0", "/delay/3?i=1", "/delay/3?i=2", "/delay/3?i=3"}; + + final Future future = requester.connect(target, Timeout.ofSeconds(5)); + final AsyncClientEndpoint clientEndpoint = future.get(); + + final CountDownLatch latch = new CountDownLatch(requestUris.length); + + for (final String requestUri: requestUris) { + try { + clientEndpoint.execute( + AsyncRequestBuilder.get() + .setHttpHost(target) + .setPath(requestUri) + .build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + latch.countDown(); + final HttpResponse response = message.getHead(); + System.out.println(requestUri + "->" + response.getCode()); + } + + @Override + public void failed(final Exception ex) { + latch.countDown(); + System.out.println(requestUri + "->" + ex); + } + + @Override + public void cancelled() { + latch.countDown(); + System.out.println(requestUri + " cancelled"); + } + + }); + } catch (final RejectedExecutionException ex) { + latch.countDown(); + System.out.println(requestUri + "-> rejected: " + ex.getMessage()); + } + } + + latch.await(); + + clientEndpoint.releaseAndDiscard(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + +} diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestMaxCommandsPerSession.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestMaxCommandsPerSession.java new file mode 100644 index 0000000000..37c8411a0d --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestMaxCommandsPerSession.java @@ -0,0 +1,100 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.reactor; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +class TestMaxCommandsPerSession { + + @Test + void testCommandQueueCap() throws Exception { + final ServerSocketChannel server = ServerSocketChannel.open(); + server.bind(new InetSocketAddress("127.0.0.1", 0)); + final InetSocketAddress address = (InetSocketAddress) server.getLocalAddress(); + + final SocketChannel client = SocketChannel.open(address); + client.configureBlocking(false); + + final SocketChannel accepted = server.accept(); + assertNotNull(accepted); + + final Selector selector = Selector.open(); + final SelectionKey key = client.register(selector, 0); + + final IOSessionImpl session = new IOSessionImpl("t", key, client, null, 2); + + final AtomicInteger cancelled = new AtomicInteger(0); + + final Command c1 = () -> { + cancelled.incrementAndGet(); + return true; + }; + final Command c2 = () -> { + cancelled.incrementAndGet(); + return true; + }; + final Command rejected = () -> { + cancelled.incrementAndGet(); + return true; + }; + + assertDoesNotThrow(() -> session.enqueue(c1, Command.Priority.NORMAL)); + assertDoesNotThrow(() -> session.enqueue(c2, Command.Priority.NORMAL)); + + assertThrows(RejectedExecutionException.class, () -> session.enqueue(rejected, Command.Priority.NORMAL)); + assertEquals(1, cancelled.get()); + + assertNotNull(session.poll()); + + final Command retry = () -> { + cancelled.incrementAndGet(); + return true; + }; + + assertDoesNotThrow(() -> session.enqueue(retry, Command.Priority.NORMAL)); + + accepted.close(); + client.close(); + selector.close(); + server.close(); + } + +}