diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java new file mode 100644 index 0000000000..b31251d417 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java @@ -0,0 +1,152 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.async.AsyncExecRuntime; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.BasicFuture; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureContribution; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.reactor.ConnectionInitiator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExecRuntime { + + private static final Logger LOG = LoggerFactory.getLogger(InternalTestHttpAsyncExecRuntime.class); + + private final AtomicBoolean cancelled; + + public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager, + final ConnectionInitiator connectionInitiator, + final TlsConfig tlsConfig) { + super(LOG, manager, connectionInitiator, null, tlsConfig, -1, new AtomicInteger()); + this.cancelled = new AtomicBoolean(); + } + + public Future leaseAndConnect(final HttpHost target, final HttpClientContext context) { + final BasicFuture resultFuture = new BasicFuture<>(null); + acquireEndpoint("test", new HttpRoute(target), null, context, new FutureContribution(resultFuture) { + + @Override + public void completed(final AsyncExecRuntime runtime) { + if (!runtime.isEndpointConnected()) { + runtime.connectEndpoint(context, new FutureContribution(resultFuture) { + + @Override + public void completed(final AsyncExecRuntime runtime) { + resultFuture.completed(true); + } + + }); + } else { + resultFuture.completed(true); + } + } + + }); + return resultFuture; + } + + @Override + public Cancellable execute(final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { + return super.execute(id, new AsyncClientExchangeHandler() { + + public void cancel() { + if (cancelled.compareAndSet(false, true)) { + exchangeHandler.cancel(); + } + } + + public void failed(final Exception cause) { + exchangeHandler.failed(cause); + } + + public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException { + exchangeHandler.produceRequest(channel, context); + } + + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) throws HttpException, IOException { + exchangeHandler.consumeResponse(response, entityDetails, context); + } + + public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException { + exchangeHandler.consumeInformation(response, context); + } + + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + exchangeHandler.updateCapacity(capacityChannel); + } + + public void consume(final ByteBuffer src) throws IOException { + exchangeHandler.consume(src); + } + + public void streamEnd(final List trailers) throws HttpException, IOException { + exchangeHandler.streamEnd(trailers); + } + + public void releaseResources() { + exchangeHandler.releaseResources(); + } + + public int available() { + return exchangeHandler.available(); + } + + public void produce(final DataStreamChannel channel) throws IOException { + exchangeHandler.produce(channel); + } + + }, context); + } + + public boolean isAborted() { + return cancelled.get(); + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java new file mode 100644 index 0000000000..c29017b41d --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java @@ -0,0 +1,209 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.net.InetSocketAddress; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.async.InternalTestHttpAsyncExecRuntime; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.client5.testing.extension.async.TestAsyncClient; +import org.apache.hc.client5.testing.extension.async.TestAsyncResources; +import org.apache.hc.client5.testing.extension.async.TestAsyncServer; +import org.apache.hc.client5.testing.extension.async.TestAsyncServerBootstrap; +import org.apache.hc.core5.concurrent.BasicFuture; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureContribution; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.support.AsyncClientPipeline; +import org.apache.hc.core5.http.support.BasicRequestBuilder; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.pool.PoolStats; +import org.apache.hc.core5.reactor.ConnectionInitiator; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TestInternalHttpAsyncExecRuntime { + + public static final Timeout TIMEOUT = Timeout.ofMinutes(1); + + @RegisterExtension + private final TestAsyncResources testResources; + + public TestInternalHttpAsyncExecRuntime() { + this.testResources = new TestAsyncResources(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD, TIMEOUT); + } + + public void configureServer(final Consumer serverCustomizer) { + testResources.configureServer(serverCustomizer); + } + + public HttpHost startServer() throws Exception { + final TestAsyncServer server = testResources.server(); + final InetSocketAddress inetSocketAddress = server.start(); + return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort()); + } + + public TestAsyncClient startClient() throws Exception { + final TestAsyncClient client = testResources.client(); + client.start(); + return client; + } + + static final int REQ_NUM = 5; + + HttpRequest createRequest(final HttpHost target) { + return BasicRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/20000") + .addHeader(HttpHeaders.HOST, target.toHostString()) + .build(); + } + + @Test + void testExecutionCancellation_http11HardCancellation_connectionMarkedNonReusable() throws Exception { + configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + + final TestAsyncClient client = startClient(); + final ConnectionInitiator connectionInitiator = client.getImplementation(); + final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager(); + for (int i = 0; i < REQ_NUM; i++) { + final HttpClientContext context = HttpClientContext.create(); + + final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime( + connectionManager, + connectionInitiator, + TlsConfig.custom() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1) + .build()); + final Future connectFuture = testRuntime.leaseAndConnect(target, context); + Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + + final BasicFuture> resultFuture = new BasicFuture<>(null); + final Cancellable cancellable = testRuntime.execute( + "test-" + i, + AsyncClientPipeline.assemble() + .request(createRequest(target)).noContent() + .response().asByteArray() + .result(new FutureContribution>(resultFuture) { + + @Override + public void completed(final Message result) { + resultFuture.completed(result); + } + + }) + .create(), + context); + // sleep a bit + Thread.sleep(i % 10); + cancellable.cancel(); + + // The message exchange is expected to get aborted + try { + resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + } catch (final CancellationException expected) { + } + Assertions.assertTrue(testRuntime.isAborted()); + testRuntime.discardEndpoint(); + } + } + + @Test + void testExecutionCancellation_http11NoHardCancellation_connectionAlive() throws Exception { + configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + + final TestAsyncClient client = startClient(); + final ConnectionInitiator connectionInitiator = client.getImplementation(); + final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager(); + for (int i = 0; i < REQ_NUM; i++) { + final HttpClientContext context = HttpClientContext.create(); + context.setRequestConfig(RequestConfig.custom() + .setHardCancellationEnabled(false) + .build()); + + final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime( + connectionManager, + connectionInitiator, + TlsConfig.custom() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1) + .build()); + final Future connectFuture = testRuntime.leaseAndConnect(target, context); + Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + + final BasicFuture> resultFuture = new BasicFuture<>(null); + final Cancellable cancellable = testRuntime.execute( + "test-" + i, + AsyncClientPipeline.assemble() + .request(createRequest(target)).noContent() + .response().asByteArray() + .result(new FutureContribution>(resultFuture) { + + @Override + public void completed(final Message result) { + resultFuture.completed(result); + } + + }) + .create(), + context); + // sleep a bit + Thread.sleep(i % 10); + cancellable.cancel(); + + // The message exchange should not get aborted and is expected to successfully complete + final Message message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + Assertions.assertNotNull(message); + Assertions.assertFalse(testRuntime.isAborted()); + // The underlying connection is expected to stay valid + Assertions.assertTrue(testRuntime.isEndpointConnected()); + testRuntime.markConnectionReusable(null, TimeValue.ofMinutes(1)); + testRuntime.releaseEndpoint(); + + final PoolStats totalStats = connectionManager.getTotalStats(); + Assertions.assertTrue(totalStats.getAvailable() > 0); + } + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java index 3bb742a6dc..897b702f0e 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java @@ -66,7 +66,7 @@ public HandlerEntry(final String uriPattern, final T handler) { } private final URIScheme scheme; - private final ServerProtocolLevel serverProtocolLevel; + private ServerProtocolLevel serverProtocolLevel; private final List>> handlerList; private Timeout timeout; @@ -79,6 +79,10 @@ public TestAsyncServerBootstrap(final URIScheme scheme, final ServerProtocolLeve this.handlerList = new ArrayList<>(); } + public void setServerProtocolLevel(final ServerProtocolLevel serverProtocolLevel) { + this.serverProtocolLevel = serverProtocolLevel; + } + public ServerProtocolLevel getProtocolLevel() { return serverProtocolLevel; } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java index 9a7820cb50..da3adf532a 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java @@ -29,6 +29,7 @@ import java.io.InterruptedIOException; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -44,7 +45,10 @@ import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.concurrent.CallbackContribution; import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.ComplexCancellable; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.HandlerFactory; @@ -327,6 +331,43 @@ private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handle return new ReleasingAsyncClientExchangeHandler(handler, this::releaseSlot); } + private Cancellable doExecute( + final String id, + final AsyncConnectionEndpoint endpoint, + final AsyncClientExchangeHandler exchangeHandler, + final HttpClientContext context) { + if (log.isDebugEnabled()) { + log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); + } + final RequestConfig requestConfig = context.getRequestConfigOrDefault(); + final Timeout responseTimeout = requestConfig.getResponseTimeout(); + final EndpointInfo endpointInfo = endpoint.getInfo(); + final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null; + final boolean isH2 = version != null && version.greaterEquals(HttpVersion.HTTP_2); + if (!isH2 && responseTimeout != null) { + endpoint.setSocketTimeout(responseTimeout); + } + endpoint.execute(id, exchangeHandler, pushHandlerFactory, context); + if (isH2 || requestConfig.isHardCancellationEnabled()) { + return new Cancellable() { + + private final AtomicBoolean cancelled = new AtomicBoolean(); + + @Override + public boolean cancel() { + if (cancelled.compareAndSet(false, true)) { + exchangeHandler.cancel(); + return true; + } + return false; + } + + }; + } else { + return Operations.nonCancellable(); + } + } + @Override public Cancellable execute( final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { @@ -336,33 +377,18 @@ public Cancellable execute( "Execution pipeline queue limit reached (max=" + maxQueued + ")")); return Operations.nonCancellable(); } - final AsyncClientExchangeHandler actual = guard(exchangeHandler); + final AsyncClientExchangeHandler handler = guard(exchangeHandler); if (endpoint.isConnected()) { - if (log.isDebugEnabled()) { - log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); - } - final RequestConfig requestConfig = context.getRequestConfigOrDefault(); - final Timeout responseTimeout = requestConfig.getResponseTimeout(); - if (responseTimeout != null) { - endpoint.setSocketTimeout(responseTimeout); - } - endpoint.execute(id, actual, pushHandlerFactory, context); - if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) { - return () -> { - actual.cancel(); - return true; - }; - } + return doExecute(id, endpoint, handler, context); } else { - connectEndpoint(context, new FutureCallback() { + final ComplexCancellable complexCancellable = new ComplexCancellable(); + final Cancellable connectCancellable = connectEndpoint(context, new FutureCallback() { @Override public void completed(final AsyncExecRuntime runtime) { - if (log.isDebugEnabled()) { - log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); - } try { - endpoint.execute(id, actual, pushHandlerFactory, context); + final Cancellable executeCancellable = doExecute(id, endpoint, handler, context); + complexCancellable.setDependency(executeCancellable); } catch (final RuntimeException ex) { failed(ex); } @@ -370,17 +396,18 @@ public void completed(final AsyncExecRuntime runtime) { @Override public void failed(final Exception ex) { - actual.failed(ex); + handler.failed(ex); } @Override public void cancelled() { - actual.failed(new InterruptedIOException()); + handler.failed(new InterruptedIOException()); } }); + complexCancellable.setDependency(connectCancellable); + return complexCancellable; } - return Operations.nonCancellable(); } @Override