Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -78,13 +77,14 @@ public class AcpAgentSession implements AcpSession {
private final AtomicLong requestCounter = new AtomicLong(0);

/**
* Active prompt tracking for single-turn enforcement.
* Only ONE prompt can be active at a time per ACP session.
* Active prompt tracking for single-turn enforcement per session.
* Only ONE prompt can be active at a time per ACP logical session,
* but different sessions can run prompts concurrently.
*/
private final AtomicReference<ActivePrompt> activePrompt = new AtomicReference<>(null);
private final ConcurrentHashMap<String, ActivePrompt> activePrompts = new ConcurrentHashMap<>();

/**
* Represents an active prompt session for single-turn enforcement.
* Represents an active prompt for single-turn enforcement.
*/
private record ActivePrompt(String sessionId, Object requestId) {
}
Expand Down Expand Up @@ -229,27 +229,25 @@ private Mono<AcpSchema.JSONRPCResponse> handleIncomingRequest(AcpSchema.JSONRPCR
new AcpSchema.JSONRPCError(-32601, error.message(), error.data())));
}

// Single-turn enforcement for session/prompt requests
// Per-session single-turn enforcement for prompt requests.
// Different sessions can run prompts concurrently; only reject
// when the same session already has an active prompt.
if (AcpSchema.METHOD_SESSION_PROMPT.equals(request.method())) {
// Extract sessionId from params
String sessionId = extractSessionId(request.params());
ActivePrompt newPrompt = new ActivePrompt(sessionId, request.id());

// Try to set as active prompt - fails if another prompt is active
if (!activePrompt.compareAndSet(null, newPrompt)) {
ActivePrompt current = activePrompt.get();
logger.warn("Rejected concurrent prompt request. Active prompt: sessionId={}, requestId={}",
current != null ? current.sessionId() : "unknown",
current != null ? current.requestId() : "unknown");
ActivePrompt existing = activePrompts.putIfAbsent(sessionId, newPrompt);
if (existing != null) {
logger.warn("Rejected concurrent prompt request for same session. Active prompt: sessionId={}, requestId={}",
existing.sessionId(), existing.requestId());
return Mono.just(new AcpSchema.JSONRPCResponse(AcpSchema.JSONRPC_VERSION, request.id(), null,
new AcpSchema.JSONRPCError(-32000, "There is already an active prompt execution", null)));
new AcpSchema.JSONRPCError(-32000, "There is already an active prompt execution for this session", null)));
}

// Execute handler and clear active prompt when done
return handler.handle(request.params())
.map(result -> new AcpSchema.JSONRPCResponse(AcpSchema.JSONRPC_VERSION, request.id(), result, null))
.doFinally(signal -> {
activePrompt.compareAndSet(newPrompt, null);
activePrompts.remove(sessionId, newPrompt);
logger.debug("Prompt completed with signal: {}", signal);
});
}
Expand Down Expand Up @@ -289,9 +287,7 @@ private Mono<Void> handleIncomingNotification(AcpSchema.JSONRPCNotification noti
// Handle cancel notification specially
if (AcpSchema.METHOD_SESSION_CANCEL.equals(notification.method())) {
String sessionId = extractSessionId(notification.params());
ActivePrompt current = activePrompt.get();
if (current != null && sessionId.equals(current.sessionId())) {
activePrompt.compareAndSet(current, null);
if (activePrompts.remove(sessionId) != null) {
logger.debug("Cancelled active prompt for session: {}", sessionId);
}
}
Expand Down Expand Up @@ -369,18 +365,18 @@ public Mono<Void> sendNotification(String method, Object params) {

/**
* Checks if there is an active prompt being processed.
* @return true if a prompt is currently active
* @return true if any prompt is currently active
*/
public boolean hasActivePrompt() {
return activePrompt.get() != null;
return !activePrompts.isEmpty();
}

/**
* Gets the session ID of the active prompt, if any.
* @return the session ID or null if no prompt is active
* Gets the session ID of an active prompt, if any.
* @return a session ID with an active prompt, or null if none are active
*/
public String getActivePromptSessionId() {
ActivePrompt current = activePrompt.get();
ActivePrompt current = activePrompts.values().stream().findFirst().orElse(null);
return current != null ? current.sessionId() : null;
}

Expand All @@ -391,7 +387,7 @@ public String getActivePromptSessionId() {
@Override
public Mono<Void> closeGracefully() {
return Mono.fromRunnable(() -> {
activePrompt.set(null);
activePrompts.clear();
dismissPendingResponses();
timeoutScheduler.dispose();
}).then(this.transport.closeGracefully());
Expand All @@ -402,7 +398,7 @@ public Mono<Void> closeGracefully() {
*/
@Override
public void close() {
activePrompt.set(null);
activePrompts.clear();
dismissPendingResponses();
timeoutScheduler.dispose();
transport.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import com.agentclientprotocol.sdk.test.InMemoryTransportPair;
Expand Down Expand Up @@ -178,11 +179,11 @@ void singleTurnEnforcementRejectsConcurrentPrompts() throws Exception {
Thread.sleep(100);

// Manually set active prompt to simulate an in-progress prompt
// We use reflection to access the activePrompt field for testing
java.lang.reflect.Field activePromptField = AcpAgentSession.class.getDeclaredField("activePrompt");
activePromptField.setAccessible(true);
// We use reflection to access the activePrompts field for testing
java.lang.reflect.Field activePromptsField = AcpAgentSession.class.getDeclaredField("activePrompts");
activePromptsField.setAccessible(true);
@SuppressWarnings("unchecked")
AtomicReference<Object> activePromptRef = (AtomicReference<Object>) activePromptField.get(session);
ConcurrentHashMap<String, Object> activePromptsRef = (ConcurrentHashMap<String, Object>) activePromptsField.get(session);

// Create an ActivePrompt instance using reflection
Class<?> activePromptClass = Class.forName(
Expand All @@ -191,7 +192,7 @@ void singleTurnEnforcementRejectsConcurrentPrompts() throws Exception {
Object.class);
constructor.setAccessible(true);
Object activePrompt = constructor.newInstance("session-1", "existing-request-id");
activePromptRef.set(activePrompt);
activePromptsRef.put("session-1", activePrompt);

// Verify active prompt is set
assertThat(session.hasActivePrompt()).isTrue();
Expand Down Expand Up @@ -246,10 +247,10 @@ void hasActivePromptReturnsCorrectState() throws Exception {
assertThat(session.getActivePromptSessionId()).isNull();

// Manually set active prompt using reflection to test the getter methods
java.lang.reflect.Field activePromptField = AcpAgentSession.class.getDeclaredField("activePrompt");
activePromptField.setAccessible(true);
java.lang.reflect.Field activePromptsField = AcpAgentSession.class.getDeclaredField("activePrompts");
activePromptsField.setAccessible(true);
@SuppressWarnings("unchecked")
AtomicReference<Object> activePromptRef = (AtomicReference<Object>) activePromptField.get(session);
ConcurrentHashMap<String, Object> activePromptsRef = (ConcurrentHashMap<String, Object>) activePromptsField.get(session);

// Create an ActivePrompt instance using reflection
Class<?> activePromptClass = Class.forName(
Expand All @@ -258,14 +259,14 @@ void hasActivePromptReturnsCorrectState() throws Exception {
Object.class);
constructor.setAccessible(true);
Object activePrompt = constructor.newInstance("session-1", "request-1");
activePromptRef.set(activePrompt);
activePromptsRef.put("session-1", activePrompt);

// Now there should be an active prompt
assertThat(session.hasActivePrompt()).isTrue();
assertThat(session.getActivePromptSessionId()).isEqualTo("session-1");

// Clear active prompt
activePromptRef.set(null);
activePromptsRef.clear();

// Active prompt should be cleared
assertThat(session.hasActivePrompt()).isFalse();
Expand Down