Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS CRT Async HTTP Client",
"contributor": "",
"description": "Add HTTP/2 support in the AWS CRT Async HTTP Client."
}
25 changes: 25 additions & 0 deletions http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,31 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
Expand Down Expand Up @@ -91,15 +92,15 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()))) {
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(asyncRequest)
.build();

return new CrtAsyncRequestExecutor().execute(context);
}
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(asyncRequest.request()));
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(asyncRequest)
.protocol(this.protocol)
.build();

return new CrtAsyncRequestExecutor().execute(context);
}

/**
Expand Down Expand Up @@ -224,6 +225,14 @@ AwsCrtAsyncHttpClient.Builder connectionHealthConfiguration(Consumer<ConnectionH
AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder);

/**
* Configure the HTTP protocol version to use for connections.
*
* @param protocol the HTTP protocol version
* @return The builder for method chaining.
*/
AwsCrtAsyncHttpClient.Builder protocol(Protocol protocol);

/**
* Configure whether to enable a hybrid post-quantum key exchange option for the Transport Layer Security (TLS) network
* encryption protocol when communicating with services that support Post Quantum TLS. If Post Quantum cipher suites are
Expand All @@ -248,6 +257,13 @@ AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveCon
private static final class DefaultAsyncBuilder
extends AwsCrtClientBuilderBase<AwsCrtAsyncHttpClient.Builder> implements Builder {


@Override
public Builder protocol(Protocol protocol) {
getAttributeMap().put(SdkHttpConfigurationOption.PROTOCOL, protocol);
return this;
}

@Override
public SdkAsyncHttpClient build() {
return new AwsCrtAsyncHttpClient(this, getAttributeMap().build()
Expand All @@ -260,5 +276,6 @@ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
.merge(serviceDefaults)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
Expand Down Expand Up @@ -56,6 +57,11 @@ public final class AwsCrtHttpClient extends AwsCrtHttpClientBase implements SdkH

private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) {
super(builder, config);
if (this.protocol == Protocol.HTTP2) {
throw new UnsupportedOperationException(
"HTTP/2 is not supported for sync HTTP clients. Either use HTTP/1.1 (the default) or use an async "
+ "HTTP client (e.g., AwsCrtAsyncHttpClient).");
}
}

public static AwsCrtHttpClient.Builder builder() {
Expand Down Expand Up @@ -91,14 +97,13 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()))) {
CrtRequestContext context = CrtRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(request)
.build();
return new CrtHttpRequest(context);
}
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(request.httpRequest()));
CrtRequestContext context = CrtRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
.build();
return new CrtHttpRequest(context);
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
Expand Down Expand Up @@ -140,7 +145,7 @@ public HttpExecuteResponse call() throws IOException {
@Override
public void abort() {
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request ws cancelled"));
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.Http2StreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions;
import software.amazon.awssdk.crt.http.HttpMonitoringOptions;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.crt.http.HttpStreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpVersion;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
Expand All @@ -58,46 +61,48 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private static final long DEFAULT_STREAM_WINDOW_SIZE = 16L * 1024L * 1024L; // 16 MB

protected final long readBufferSize;
private final Map<URI, HttpClientConnectionManager> connectionPools = new ConcurrentHashMap<>();
protected final Protocol protocol;
private final Map<URI, HttpStreamManager> connectionPools = new ConcurrentHashMap<>();
private final LinkedList<CrtResource> ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
private final SocketOptions socketOptions;
private final TlsContext tlsContext;
private final HttpProxyOptions proxyOptions;
private final HttpMonitoringOptions monitoringOptions;
private final long maxConnectionIdleInMilliseconds;
private final int maxConnectionsPerEndpoint;
private final int maxStreamsPerEndpoint;
private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;

AwsCrtHttpClientBase(AwsCrtClientBuilderBase builder, AttributeMap config) {
if (config.get(PROTOCOL) == Protocol.HTTP2) {
throw new UnsupportedOperationException("HTTP/2 is not supported in AwsCrtHttpClient yet. Use "
+ "NettyNioAsyncHttpClient instead.");
ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
TlsContextOptions clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
this.protocol = config.get(PROTOCOL);
if (protocol == Protocol.HTTP2) {
clientTlsContextOptions = clientTlsContextOptions.withAlpnList("h2");
}

try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
TlsContextOptions clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions)) {

this.bootstrap = registerOwnedResource(clientBootstrap);
this.socketOptions = registerOwnedResource(clientSocketOptions);
this.tlsContext = registerOwnedResource(clientTlsContext);
this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
this.monitoringOptions =
resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration())
.orElseGet(() -> defaultConnectionHealthConfiguration(config));
this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}
this.tlsContextOptions = registerOwnedResource(clientTlsContextOptions);
TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions);

this.bootstrap = registerOwnedResource(clientBootstrap);
this.socketOptions = registerOwnedResource(clientSocketOptions);
this.tlsContext = registerOwnedResource(clientTlsContext);
this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
this.maxStreamsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
this.monitoringOptions =
resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration())
.orElseGet(() -> defaultConnectionHealthConfiguration(config));
this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}

/**
Expand All @@ -109,7 +114,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
*/
private <T extends CrtResource> T registerOwnedResource(T subresource) {
if (subresource != null) {
subresource.addRef();
ownedSubResources.push(subresource);
}
return subresource;
Expand All @@ -119,23 +123,46 @@ String clientName() {
return AWS_COMMON_RUNTIME;
}

private HttpClientConnectionManager createConnectionPool(URI uri) {
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);
private HttpStreamManager createConnectionPool(URI uri) {
log.debug(() ->
String.format("Creating ConnectionPool for: URI:%s, MaxConns: %d, MaxStreams: %d",
uri, maxStreamsPerEndpoint, maxStreamsPerEndpoint));

boolean isHttps = "https".equalsIgnoreCase(uri.getScheme());
TlsContext poolTlsContext = isHttps ? tlsContext : null;

HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions()
HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions()
.withClientBootstrap(bootstrap)
.withSocketOptions(socketOptions)
.withTlsContext(tlsContext)
.withTlsContext(poolTlsContext)
.withUri(uri)
.withWindowSize(readBufferSize)
.withMaxConnections(maxConnectionsPerEndpoint)
.withMaxConnections(maxStreamsPerEndpoint)
.withManualWindowManagement(true)
.withProxyOptions(proxyOptions)
.withMonitoringOptions(monitoringOptions)
.withMaxConnectionIdleInMilliseconds(maxConnectionIdleInMilliseconds)
.withConnectionAcquisitionTimeoutInMilliseconds(connectionAcquisitionTimeout);

return HttpClientConnectionManager.create(options);
HttpStreamManagerOptions options = new HttpStreamManagerOptions()
.withHTTP1ConnectionManagerOptions(h1Options);

if (protocol == Protocol.HTTP2) {
Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions()
.withMaxConcurrentStreams(maxStreamsPerEndpoint)
.withConnectionManagerOptions(h1Options);

if (!isHttps) {
h2Options.withPriorKnowledge(true);
}

options.withHTTP2StreamManagerOptions(h2Options);
options.withExpectedProtocol(HttpVersion.HTTP_2);
} else {
options.withExpectedProtocol(HttpVersion.HTTP_1_1);
}

return HttpStreamManager.create(options);
}

/*
Expand All @@ -153,14 +180,13 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
* existing pool. If we add all of execute() to the scope, we include, at minimum a JNI call to the native
* pool implementation.
*/
HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
HttpStreamManager getOrCreateConnectionPool(URI uri) {
synchronized (this) {
if (isClosed) {
throw new IllegalStateException("Client is closed. No more requests can be made with this client.");
}

HttpClientConnectionManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
connPool.addRef();
HttpStreamManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
return connPool;
}
}
Expand Down
Loading
Loading