diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java
new file mode 100644
index 0000000000..b31251d417
--- /dev/null
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java
@@ -0,0 +1,152 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureContribution;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExecRuntime {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InternalTestHttpAsyncExecRuntime.class);
+
+ private final AtomicBoolean cancelled;
+
+ public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager,
+ final ConnectionInitiator connectionInitiator,
+ final TlsConfig tlsConfig) {
+ super(LOG, manager, connectionInitiator, null, tlsConfig, -1, new AtomicInteger());
+ this.cancelled = new AtomicBoolean();
+ }
+
+ public Future leaseAndConnect(final HttpHost target, final HttpClientContext context) {
+ final BasicFuture resultFuture = new BasicFuture<>(null);
+ acquireEndpoint("test", new HttpRoute(target), null, context, new FutureContribution(resultFuture) {
+
+ @Override
+ public void completed(final AsyncExecRuntime runtime) {
+ if (!runtime.isEndpointConnected()) {
+ runtime.connectEndpoint(context, new FutureContribution(resultFuture) {
+
+ @Override
+ public void completed(final AsyncExecRuntime runtime) {
+ resultFuture.completed(true);
+ }
+
+ });
+ } else {
+ resultFuture.completed(true);
+ }
+ }
+
+ });
+ return resultFuture;
+ }
+
+ @Override
+ public Cancellable execute(final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
+ return super.execute(id, new AsyncClientExchangeHandler() {
+
+ public void cancel() {
+ if (cancelled.compareAndSet(false, true)) {
+ exchangeHandler.cancel();
+ }
+ }
+
+ public void failed(final Exception cause) {
+ exchangeHandler.failed(cause);
+ }
+
+ public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
+ exchangeHandler.produceRequest(channel, context);
+ }
+
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) throws HttpException, IOException {
+ exchangeHandler.consumeResponse(response, entityDetails, context);
+ }
+
+ public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
+ exchangeHandler.consumeInformation(response, context);
+ }
+
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ exchangeHandler.updateCapacity(capacityChannel);
+ }
+
+ public void consume(final ByteBuffer src) throws IOException {
+ exchangeHandler.consume(src);
+ }
+
+ public void streamEnd(final List extends Header> trailers) throws HttpException, IOException {
+ exchangeHandler.streamEnd(trailers);
+ }
+
+ public void releaseResources() {
+ exchangeHandler.releaseResources();
+ }
+
+ public int available() {
+ return exchangeHandler.available();
+ }
+
+ public void produce(final DataStreamChannel channel) throws IOException {
+ exchangeHandler.produce(channel);
+ }
+
+ }, context);
+ }
+
+ public boolean isAborted() {
+ return cancelled.get();
+ }
+
+}
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java
new file mode 100644
index 0000000000..c29017b41d
--- /dev/null
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java
@@ -0,0 +1,209 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.testing.async;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.impl.async.InternalTestHttpAsyncExecRuntime;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
+import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
+import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
+import org.apache.hc.client5.testing.extension.async.TestAsyncResources;
+import org.apache.hc.client5.testing.extension.async.TestAsyncServer;
+import org.apache.hc.client5.testing.extension.async.TestAsyncServerBootstrap;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.FutureContribution;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.nio.support.AsyncClientPipeline;
+import org.apache.hc.core5.http.support.BasicRequestBuilder;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.pool.PoolStats;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestInternalHttpAsyncExecRuntime {
+
+ public static final Timeout TIMEOUT = Timeout.ofMinutes(1);
+
+ @RegisterExtension
+ private final TestAsyncResources testResources;
+
+ public TestInternalHttpAsyncExecRuntime() {
+ this.testResources = new TestAsyncResources(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD, TIMEOUT);
+ }
+
+ public void configureServer(final Consumer serverCustomizer) {
+ testResources.configureServer(serverCustomizer);
+ }
+
+ public HttpHost startServer() throws Exception {
+ final TestAsyncServer server = testResources.server();
+ final InetSocketAddress inetSocketAddress = server.start();
+ return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort());
+ }
+
+ public TestAsyncClient startClient() throws Exception {
+ final TestAsyncClient client = testResources.client();
+ client.start();
+ return client;
+ }
+
+ static final int REQ_NUM = 5;
+
+ HttpRequest createRequest(final HttpHost target) {
+ return BasicRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/20000")
+ .addHeader(HttpHeaders.HOST, target.toHostString())
+ .build();
+ }
+
+ @Test
+ void testExecutionCancellation_http11HardCancellation_connectionMarkedNonReusable() throws Exception {
+ configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
+ final HttpHost target = startServer();
+
+ final TestAsyncClient client = startClient();
+ final ConnectionInitiator connectionInitiator = client.getImplementation();
+ final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
+ for (int i = 0; i < REQ_NUM; i++) {
+ final HttpClientContext context = HttpClientContext.create();
+
+ final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
+ connectionManager,
+ connectionInitiator,
+ TlsConfig.custom()
+ .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
+ .build());
+ final Future connectFuture = testRuntime.leaseAndConnect(target, context);
+ Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
+
+ final BasicFuture> resultFuture = new BasicFuture<>(null);
+ final Cancellable cancellable = testRuntime.execute(
+ "test-" + i,
+ AsyncClientPipeline.assemble()
+ .request(createRequest(target)).noContent()
+ .response().asByteArray()
+ .result(new FutureContribution>(resultFuture) {
+
+ @Override
+ public void completed(final Message result) {
+ resultFuture.completed(result);
+ }
+
+ })
+ .create(),
+ context);
+ // sleep a bit
+ Thread.sleep(i % 10);
+ cancellable.cancel();
+
+ // The message exchange is expected to get aborted
+ try {
+ resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ } catch (final CancellationException expected) {
+ }
+ Assertions.assertTrue(testRuntime.isAborted());
+ testRuntime.discardEndpoint();
+ }
+ }
+
+ @Test
+ void testExecutionCancellation_http11NoHardCancellation_connectionAlive() throws Exception {
+ configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
+ final HttpHost target = startServer();
+
+ final TestAsyncClient client = startClient();
+ final ConnectionInitiator connectionInitiator = client.getImplementation();
+ final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
+ for (int i = 0; i < REQ_NUM; i++) {
+ final HttpClientContext context = HttpClientContext.create();
+ context.setRequestConfig(RequestConfig.custom()
+ .setHardCancellationEnabled(false)
+ .build());
+
+ final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
+ connectionManager,
+ connectionInitiator,
+ TlsConfig.custom()
+ .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
+ .build());
+ final Future connectFuture = testRuntime.leaseAndConnect(target, context);
+ Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
+
+ final BasicFuture> resultFuture = new BasicFuture<>(null);
+ final Cancellable cancellable = testRuntime.execute(
+ "test-" + i,
+ AsyncClientPipeline.assemble()
+ .request(createRequest(target)).noContent()
+ .response().asByteArray()
+ .result(new FutureContribution>(resultFuture) {
+
+ @Override
+ public void completed(final Message result) {
+ resultFuture.completed(result);
+ }
+
+ })
+ .create(),
+ context);
+ // sleep a bit
+ Thread.sleep(i % 10);
+ cancellable.cancel();
+
+ // The message exchange should not get aborted and is expected to successfully complete
+ final Message message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assertions.assertNotNull(message);
+ Assertions.assertFalse(testRuntime.isAborted());
+ // The underlying connection is expected to stay valid
+ Assertions.assertTrue(testRuntime.isEndpointConnected());
+ testRuntime.markConnectionReusable(null, TimeValue.ofMinutes(1));
+ testRuntime.releaseEndpoint();
+
+ final PoolStats totalStats = connectionManager.getTotalStats();
+ Assertions.assertTrue(totalStats.getAvailable() > 0);
+ }
+ }
+
+}
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java
index 3bb742a6dc..897b702f0e 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java
@@ -66,7 +66,7 @@ public HandlerEntry(final String uriPattern, final T handler) {
}
private final URIScheme scheme;
- private final ServerProtocolLevel serverProtocolLevel;
+ private ServerProtocolLevel serverProtocolLevel;
private final List>> handlerList;
private Timeout timeout;
@@ -79,6 +79,10 @@ public TestAsyncServerBootstrap(final URIScheme scheme, final ServerProtocolLeve
this.handlerList = new ArrayList<>();
}
+ public void setServerProtocolLevel(final ServerProtocolLevel serverProtocolLevel) {
+ this.serverProtocolLevel = serverProtocolLevel;
+ }
+
public ServerProtocolLevel getProtocolLevel() {
return serverProtocolLevel;
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
index 9a7820cb50..da3adf532a 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java
@@ -29,6 +29,7 @@
import java.io.InterruptedIOException;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -44,7 +45,10 @@
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
@@ -327,6 +331,43 @@ private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handle
return new ReleasingAsyncClientExchangeHandler(handler, this::releaseSlot);
}
+ private Cancellable doExecute(
+ final String id,
+ final AsyncConnectionEndpoint endpoint,
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpClientContext context) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
+ }
+ final RequestConfig requestConfig = context.getRequestConfigOrDefault();
+ final Timeout responseTimeout = requestConfig.getResponseTimeout();
+ final EndpointInfo endpointInfo = endpoint.getInfo();
+ final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null;
+ final boolean isH2 = version != null && version.greaterEquals(HttpVersion.HTTP_2);
+ if (!isH2 && responseTimeout != null) {
+ endpoint.setSocketTimeout(responseTimeout);
+ }
+ endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
+ if (isH2 || requestConfig.isHardCancellationEnabled()) {
+ return new Cancellable() {
+
+ private final AtomicBoolean cancelled = new AtomicBoolean();
+
+ @Override
+ public boolean cancel() {
+ if (cancelled.compareAndSet(false, true)) {
+ exchangeHandler.cancel();
+ return true;
+ }
+ return false;
+ }
+
+ };
+ } else {
+ return Operations.nonCancellable();
+ }
+ }
+
@Override
public Cancellable execute(
final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
@@ -336,33 +377,18 @@ public Cancellable execute(
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
return Operations.nonCancellable();
}
- final AsyncClientExchangeHandler actual = guard(exchangeHandler);
+ final AsyncClientExchangeHandler handler = guard(exchangeHandler);
if (endpoint.isConnected()) {
- if (log.isDebugEnabled()) {
- log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
- }
- final RequestConfig requestConfig = context.getRequestConfigOrDefault();
- final Timeout responseTimeout = requestConfig.getResponseTimeout();
- if (responseTimeout != null) {
- endpoint.setSocketTimeout(responseTimeout);
- }
- endpoint.execute(id, actual, pushHandlerFactory, context);
- if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
- return () -> {
- actual.cancel();
- return true;
- };
- }
+ return doExecute(id, endpoint, handler, context);
} else {
- connectEndpoint(context, new FutureCallback() {
+ final ComplexCancellable complexCancellable = new ComplexCancellable();
+ final Cancellable connectCancellable = connectEndpoint(context, new FutureCallback() {
@Override
public void completed(final AsyncExecRuntime runtime) {
- if (log.isDebugEnabled()) {
- log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
- }
try {
- endpoint.execute(id, actual, pushHandlerFactory, context);
+ final Cancellable executeCancellable = doExecute(id, endpoint, handler, context);
+ complexCancellable.setDependency(executeCancellable);
} catch (final RuntimeException ex) {
failed(ex);
}
@@ -370,17 +396,18 @@ public void completed(final AsyncExecRuntime runtime) {
@Override
public void failed(final Exception ex) {
- actual.failed(ex);
+ handler.failed(ex);
}
@Override
public void cancelled() {
- actual.failed(new InterruptedIOException());
+ handler.failed(new InterruptedIOException());
}
});
+ complexCancellable.setDependency(connectCancellable);
+ return complexCancellable;
}
- return Operations.nonCancellable();
}
@Override