2828package org .apache .hc .client5 .http .impl .async ;
2929
3030import java .io .InterruptedIOException ;
31+ import java .lang .reflect .Proxy ;
32+ import java .util .concurrent .ConcurrentHashMap ;
33+ import java .util .concurrent .ConcurrentMap ;
34+ import java .util .concurrent .RejectedExecutionException ;
35+ import java .util .concurrent .atomic .AtomicBoolean ;
36+ import java .util .concurrent .atomic .AtomicInteger ;
3137import java .util .concurrent .atomic .AtomicReference ;
3238
3339import org .apache .hc .client5 .http .EndpointInfo ;
@@ -66,6 +72,8 @@ static class ReUseData {
6672
6773 }
6874
75+ private static final ConcurrentMap <AsyncClientConnectionManager , AtomicInteger > QUEUE_COUNTERS = new ConcurrentHashMap <>();
76+
6977 private final Logger log ;
7078 private final AsyncClientConnectionManager manager ;
7179 private final ConnectionInitiator connectionInitiator ;
@@ -77,13 +85,25 @@ static class ReUseData {
7785 private final TlsConfig tlsConfig ;
7886 private final AtomicReference <AsyncConnectionEndpoint > endpointRef ;
7987 private final AtomicReference <ReUseData > reuseDataRef ;
88+ private final int maxQueued ;
89+ private final AtomicInteger sharedQueued ;
8090
8191 InternalHttpAsyncExecRuntime (
8292 final Logger log ,
8393 final AsyncClientConnectionManager manager ,
8494 final ConnectionInitiator connectionInitiator ,
8595 final HandlerFactory <AsyncPushConsumer > pushHandlerFactory ,
8696 final TlsConfig tlsConfig ) {
97+ this (log , manager , connectionInitiator , pushHandlerFactory , tlsConfig , -1 );
98+ }
99+
100+ InternalHttpAsyncExecRuntime (
101+ final Logger log ,
102+ final AsyncClientConnectionManager manager ,
103+ final ConnectionInitiator connectionInitiator ,
104+ final HandlerFactory <AsyncPushConsumer > pushHandlerFactory ,
105+ final TlsConfig tlsConfig ,
106+ final int maxQueued ) {
87107 super ();
88108 this .log = log ;
89109 this .manager = manager ;
@@ -92,6 +112,8 @@ static class ReUseData {
92112 this .tlsConfig = tlsConfig ;
93113 this .endpointRef = new AtomicReference <>();
94114 this .reuseDataRef = new AtomicReference <>();
115+ this .maxQueued = maxQueued ;
116+ this .sharedQueued = maxQueued > 0 ? QUEUE_COUNTERS .computeIfAbsent (manager , m -> new AtomicInteger (0 )) : null ;
95117 }
96118
97119 @ Override
@@ -282,10 +304,60 @@ public EndpointInfo getEndpointInfo() {
282304 return endpoint != null ? endpoint .getInfo () : null ;
283305 }
284306
307+ private boolean tryAcquireSlot () {
308+ if (sharedQueued == null ) {
309+ return true ;
310+ }
311+ for (; ; ) {
312+ final int q = sharedQueued .get ();
313+ if (q >= maxQueued ) {
314+ return false ;
315+ }
316+ if (sharedQueued .compareAndSet (q , q + 1 )) {
317+ return true ;
318+ }
319+ }
320+ }
321+
322+ private void releaseSlot () {
323+ if (sharedQueued != null ) {
324+ sharedQueued .decrementAndGet ();
325+ }
326+ }
327+
328+ private AsyncClientExchangeHandler guard (final AsyncClientExchangeHandler handler ) {
329+ if (sharedQueued == null ) {
330+ return handler ;
331+ }
332+ final AtomicBoolean released = new AtomicBoolean (false );
333+ return (AsyncClientExchangeHandler ) Proxy .newProxyInstance (
334+ AsyncClientExchangeHandler .class .getClassLoader (),
335+ new Class <?>[]{AsyncClientExchangeHandler .class },
336+ (proxy , method , args ) -> {
337+ if ("releaseResources" .equals (method .getName ())
338+ && method .getParameterCount () == 0 ) {
339+ try {
340+ return method .invoke (handler , args );
341+ } finally {
342+ if (released .compareAndSet (false , true )) {
343+ releaseSlot ();
344+ }
345+ }
346+ }
347+ return method .invoke (handler , args );
348+ });
349+ }
350+
285351 @ Override
286352 public Cancellable execute (
287353 final String id , final AsyncClientExchangeHandler exchangeHandler , final HttpClientContext context ) {
288354 final AsyncConnectionEndpoint endpoint = ensureValid ();
355+ if (sharedQueued != null && !tryAcquireSlot ()) {
356+ exchangeHandler .failed (new RejectedExecutionException (
357+ "Execution pipeline queue limit reached (max=" + maxQueued + ")" ));
358+ return Operations .nonCancellable ();
359+ }
360+ final AsyncClientExchangeHandler actual = sharedQueued != null ? guard (exchangeHandler ) : exchangeHandler ;
289361 if (endpoint .isConnected ()) {
290362 if (log .isDebugEnabled ()) {
291363 log .debug ("{} start execution {}" , ConnPoolSupport .getId (endpoint ), id );
@@ -295,10 +367,10 @@ public Cancellable execute(
295367 if (responseTimeout != null ) {
296368 endpoint .setSocketTimeout (responseTimeout );
297369 }
298- endpoint .execute (id , exchangeHandler , pushHandlerFactory , context );
370+ endpoint .execute (id , actual , pushHandlerFactory , context );
299371 if (context .getRequestConfigOrDefault ().isHardCancellationEnabled ()) {
300372 return () -> {
301- exchangeHandler .cancel ();
373+ actual .cancel ();
302374 return true ;
303375 };
304376 }
@@ -311,20 +383,20 @@ public void completed(final AsyncExecRuntime runtime) {
311383 log .debug ("{} start execution {}" , ConnPoolSupport .getId (endpoint ), id );
312384 }
313385 try {
314- endpoint .execute (id , exchangeHandler , pushHandlerFactory , context );
386+ endpoint .execute (id , actual , pushHandlerFactory , context );
315387 } catch (final RuntimeException ex ) {
316388 failed (ex );
317389 }
318390 }
319391
320392 @ Override
321393 public void failed (final Exception ex ) {
322- exchangeHandler .failed (ex );
394+ actual .failed (ex );
323395 }
324396
325397 @ Override
326398 public void cancelled () {
327- exchangeHandler .failed (new InterruptedIOException ());
399+ actual .failed (new InterruptedIOException ());
328400 }
329401
330402 });
@@ -344,7 +416,7 @@ public void markConnectionNonReusable() {
344416
345417 @ Override
346418 public AsyncExecRuntime fork () {
347- return new InternalHttpAsyncExecRuntime (log , manager , connectionInitiator , pushHandlerFactory , tlsConfig );
419+ return new InternalHttpAsyncExecRuntime (log , manager , connectionInitiator , pushHandlerFactory , tlsConfig , maxQueued );
348420 }
349421
350422}
0 commit comments