diff --git a/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpAgentSession.java b/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpAgentSession.java index a0168fd..98e4ad6 100644 --- a/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpAgentSession.java +++ b/acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpAgentSession.java @@ -10,7 +10,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -78,13 +77,14 @@ public class AcpAgentSession implements AcpSession { private final AtomicLong requestCounter = new AtomicLong(0); /** - * Active prompt tracking for single-turn enforcement. - * Only ONE prompt can be active at a time per ACP session. + * Active prompt tracking for single-turn enforcement per session. + * Only ONE prompt can be active at a time per ACP logical session, + * but different sessions can run prompts concurrently. */ - private final AtomicReference activePrompt = new AtomicReference<>(null); + private final ConcurrentHashMap activePrompts = new ConcurrentHashMap<>(); /** - * Represents an active prompt session for single-turn enforcement. + * Represents an active prompt for single-turn enforcement. */ private record ActivePrompt(String sessionId, Object requestId) { } @@ -229,27 +229,25 @@ private Mono handleIncomingRequest(AcpSchema.JSONRPCR new AcpSchema.JSONRPCError(-32601, error.message(), error.data()))); } - // Single-turn enforcement for session/prompt requests + // Per-session single-turn enforcement for prompt requests. + // Different sessions can run prompts concurrently; only reject + // when the same session already has an active prompt. if (AcpSchema.METHOD_SESSION_PROMPT.equals(request.method())) { - // Extract sessionId from params String sessionId = extractSessionId(request.params()); ActivePrompt newPrompt = new ActivePrompt(sessionId, request.id()); - // Try to set as active prompt - fails if another prompt is active - if (!activePrompt.compareAndSet(null, newPrompt)) { - ActivePrompt current = activePrompt.get(); - logger.warn("Rejected concurrent prompt request. Active prompt: sessionId={}, requestId={}", - current != null ? current.sessionId() : "unknown", - current != null ? current.requestId() : "unknown"); + ActivePrompt existing = activePrompts.putIfAbsent(sessionId, newPrompt); + if (existing != null) { + logger.warn("Rejected concurrent prompt request for same session. Active prompt: sessionId={}, requestId={}", + existing.sessionId(), existing.requestId()); return Mono.just(new AcpSchema.JSONRPCResponse(AcpSchema.JSONRPC_VERSION, request.id(), null, - new AcpSchema.JSONRPCError(-32000, "There is already an active prompt execution", null))); + new AcpSchema.JSONRPCError(-32000, "There is already an active prompt execution for this session", null))); } - // Execute handler and clear active prompt when done return handler.handle(request.params()) .map(result -> new AcpSchema.JSONRPCResponse(AcpSchema.JSONRPC_VERSION, request.id(), result, null)) .doFinally(signal -> { - activePrompt.compareAndSet(newPrompt, null); + activePrompts.remove(sessionId, newPrompt); logger.debug("Prompt completed with signal: {}", signal); }); } @@ -289,9 +287,7 @@ private Mono handleIncomingNotification(AcpSchema.JSONRPCNotification noti // Handle cancel notification specially if (AcpSchema.METHOD_SESSION_CANCEL.equals(notification.method())) { String sessionId = extractSessionId(notification.params()); - ActivePrompt current = activePrompt.get(); - if (current != null && sessionId.equals(current.sessionId())) { - activePrompt.compareAndSet(current, null); + if (activePrompts.remove(sessionId) != null) { logger.debug("Cancelled active prompt for session: {}", sessionId); } } @@ -369,18 +365,18 @@ public Mono sendNotification(String method, Object params) { /** * Checks if there is an active prompt being processed. - * @return true if a prompt is currently active + * @return true if any prompt is currently active */ public boolean hasActivePrompt() { - return activePrompt.get() != null; + return !activePrompts.isEmpty(); } /** - * Gets the session ID of the active prompt, if any. - * @return the session ID or null if no prompt is active + * Gets the session ID of an active prompt, if any. + * @return a session ID with an active prompt, or null if none are active */ public String getActivePromptSessionId() { - ActivePrompt current = activePrompt.get(); + ActivePrompt current = activePrompts.values().stream().findFirst().orElse(null); return current != null ? current.sessionId() : null; } @@ -391,7 +387,7 @@ public String getActivePromptSessionId() { @Override public Mono closeGracefully() { return Mono.fromRunnable(() -> { - activePrompt.set(null); + activePrompts.clear(); dismissPendingResponses(); timeoutScheduler.dispose(); }).then(this.transport.closeGracefully()); @@ -402,7 +398,7 @@ public Mono closeGracefully() { */ @Override public void close() { - activePrompt.set(null); + activePrompts.clear(); dismissPendingResponses(); timeoutScheduler.dispose(); transport.close(); diff --git a/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpAgentSessionTest.java b/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpAgentSessionTest.java index 4ce255d..7f830f4 100644 --- a/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpAgentSessionTest.java +++ b/acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpAgentSessionTest.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import com.agentclientprotocol.sdk.test.InMemoryTransportPair; @@ -178,11 +179,11 @@ void singleTurnEnforcementRejectsConcurrentPrompts() throws Exception { Thread.sleep(100); // Manually set active prompt to simulate an in-progress prompt - // We use reflection to access the activePrompt field for testing - java.lang.reflect.Field activePromptField = AcpAgentSession.class.getDeclaredField("activePrompt"); - activePromptField.setAccessible(true); + // We use reflection to access the activePrompts field for testing + java.lang.reflect.Field activePromptsField = AcpAgentSession.class.getDeclaredField("activePrompts"); + activePromptsField.setAccessible(true); @SuppressWarnings("unchecked") - AtomicReference activePromptRef = (AtomicReference) activePromptField.get(session); + ConcurrentHashMap activePromptsRef = (ConcurrentHashMap) activePromptsField.get(session); // Create an ActivePrompt instance using reflection Class activePromptClass = Class.forName( @@ -191,7 +192,7 @@ void singleTurnEnforcementRejectsConcurrentPrompts() throws Exception { Object.class); constructor.setAccessible(true); Object activePrompt = constructor.newInstance("session-1", "existing-request-id"); - activePromptRef.set(activePrompt); + activePromptsRef.put("session-1", activePrompt); // Verify active prompt is set assertThat(session.hasActivePrompt()).isTrue(); @@ -246,10 +247,10 @@ void hasActivePromptReturnsCorrectState() throws Exception { assertThat(session.getActivePromptSessionId()).isNull(); // Manually set active prompt using reflection to test the getter methods - java.lang.reflect.Field activePromptField = AcpAgentSession.class.getDeclaredField("activePrompt"); - activePromptField.setAccessible(true); + java.lang.reflect.Field activePromptsField = AcpAgentSession.class.getDeclaredField("activePrompts"); + activePromptsField.setAccessible(true); @SuppressWarnings("unchecked") - AtomicReference activePromptRef = (AtomicReference) activePromptField.get(session); + ConcurrentHashMap activePromptsRef = (ConcurrentHashMap) activePromptsField.get(session); // Create an ActivePrompt instance using reflection Class activePromptClass = Class.forName( @@ -258,14 +259,14 @@ void hasActivePromptReturnsCorrectState() throws Exception { Object.class); constructor.setAccessible(true); Object activePrompt = constructor.newInstance("session-1", "request-1"); - activePromptRef.set(activePrompt); + activePromptsRef.put("session-1", activePrompt); // Now there should be an active prompt assertThat(session.hasActivePrompt()).isTrue(); assertThat(session.getActivePromptSessionId()).isEqualTo("session-1"); // Clear active prompt - activePromptRef.set(null); + activePromptsRef.clear(); // Active prompt should be cleared assertThat(session.hasActivePrompt()).isFalse();