diff --git a/src/main/java/dev/axme/sdk/AxmeClient.java b/src/main/java/dev/axme/sdk/AxmeClient.java index da794a0..1585201 100644 --- a/src/main/java/dev/axme/sdk/AxmeClient.java +++ b/src/main/java/dev/axme/sdk/AxmeClient.java @@ -9,8 +9,11 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; public final class AxmeClient { @@ -733,6 +736,100 @@ public Map mcpCallTool(String name, Map argument return result instanceof Map ? (Map) result : response; } + private static final Set TERMINAL_STATUSES = + Set.of("COMPLETED", "FAILED", "CANCELED", "TIMED_OUT"); + + private static final Set TERMINAL_EVENT_TYPES = + Set.of("intent.completed", "intent.failed", "intent.canceled", "intent.timed_out"); + + /** + * Polls {@code listIntentEvents} in a loop and returns all events up to and + * including the first terminal event. + * + *

A terminal event is one whose {@code status} field is in + * {@code COMPLETED, FAILED, CANCELED, TIMED_OUT} or whose {@code event_type} + * field is {@code intent.completed, intent.failed, intent.canceled, intent.timed_out}. + * + * @param intentId the intent to observe + * @param options polling options (since, interval, timeout); may be {@code null} + * @return list of event maps in arrival order, ending with the terminal event + * @throws AxmeTimeoutException if {@code timeoutSeconds} is set and elapsed + * @throws IOException on HTTP transport errors + * @throws InterruptedException if the polling thread is interrupted + */ + @SuppressWarnings("unchecked") + public List> observe(String intentId, ObserveOptions options) + throws IOException, InterruptedException { + ObserveOptions opts = options != null ? options : ObserveOptions.defaults(); + int nextSince = opts.getSince(); + long startNanos = System.nanoTime(); + List> collected = new ArrayList<>(); + + while (true) { + if (opts.getTimeoutSeconds() != null) { + double elapsed = (System.nanoTime() - startNanos) / 1_000_000_000.0; + if (elapsed >= opts.getTimeoutSeconds()) { + throw new AxmeTimeoutException(intentId, opts.getTimeoutSeconds()); + } + } + + Map response = listIntentEvents(intentId, nextSince, RequestOptions.none()); + Object eventsObj = response.get("events"); + List> events = + eventsObj instanceof List ? (List>) eventsObj : List.of(); + + for (Map event : events) { + nextSince = maxSeenSeq(nextSince, event); + collected.add(event); + if (isTerminalIntentEvent(event)) { + return collected; + } + } + + if (events.isEmpty()) { + long sleepMillis = (long) (opts.getPollIntervalSeconds() * 1000); + Thread.sleep(sleepMillis); + } + } + } + + /** + * Polls until a terminal event is seen and returns it. + * + *

This is a convenience wrapper around {@link #observe} that discards + * intermediate events and returns only the terminal one. + * + * @param intentId the intent to wait for + * @param options polling options; may be {@code null} + * @return the terminal event map + * @throws AxmeTimeoutException if {@code timeoutSeconds} is set and elapsed + * @throws IOException on HTTP transport errors + * @throws InterruptedException if the polling thread is interrupted + */ + public Map waitFor(String intentId, ObserveOptions options) + throws IOException, InterruptedException { + List> events = observe(intentId, options); + return events.get(events.size() - 1); + } + + private static int maxSeenSeq(int currentMax, Map event) { + Object seqObj = event.get("seq"); + if (seqObj instanceof Number) { + int seq = ((Number) seqObj).intValue(); + return Math.max(currentMax, seq); + } + return currentMax; + } + + private static boolean isTerminalIntentEvent(Map event) { + Object status = event.get("status"); + if (status instanceof String && TERMINAL_STATUSES.contains(status)) { + return true; + } + Object eventType = event.get("event_type"); + return eventType instanceof String && TERMINAL_EVENT_TYPES.contains(eventType); + } + private Map requestJson( String method, String path, diff --git a/src/main/java/dev/axme/sdk/AxmeTimeoutException.java b/src/main/java/dev/axme/sdk/AxmeTimeoutException.java new file mode 100644 index 0000000..d8d95fe --- /dev/null +++ b/src/main/java/dev/axme/sdk/AxmeTimeoutException.java @@ -0,0 +1,24 @@ +package dev.axme.sdk; + +/** + * Thrown when an {@link AxmeClient#observe} or {@link AxmeClient#waitFor} call + * exceeds its configured timeout. + */ +public final class AxmeTimeoutException extends RuntimeException { + private final String intentId; + private final double timeoutSeconds; + + public AxmeTimeoutException(String intentId, double timeoutSeconds) { + super("observe timed out after " + timeoutSeconds + "s for intent " + intentId); + this.intentId = intentId; + this.timeoutSeconds = timeoutSeconds; + } + + public String getIntentId() { + return intentId; + } + + public double getTimeoutSeconds() { + return timeoutSeconds; + } +} diff --git a/src/main/java/dev/axme/sdk/ObserveOptions.java b/src/main/java/dev/axme/sdk/ObserveOptions.java new file mode 100644 index 0000000..9d9ff4e --- /dev/null +++ b/src/main/java/dev/axme/sdk/ObserveOptions.java @@ -0,0 +1,42 @@ +package dev.axme.sdk; + +/** + * Options for {@link AxmeClient#observe} and {@link AxmeClient#waitFor} polling methods. + */ +public final class ObserveOptions { + private final int since; + private final double pollIntervalSeconds; + private final Double timeoutSeconds; + + public ObserveOptions() { + this(0, 1.0, null); + } + + public ObserveOptions(int since, double pollIntervalSeconds, Double timeoutSeconds) { + if (pollIntervalSeconds <= 0) { + throw new IllegalArgumentException("pollIntervalSeconds must be positive"); + } + if (timeoutSeconds != null && timeoutSeconds <= 0) { + throw new IllegalArgumentException("timeoutSeconds must be positive"); + } + this.since = since; + this.pollIntervalSeconds = pollIntervalSeconds; + this.timeoutSeconds = timeoutSeconds; + } + + public int getSince() { + return since; + } + + public double getPollIntervalSeconds() { + return pollIntervalSeconds; + } + + public Double getTimeoutSeconds() { + return timeoutSeconds; + } + + public static ObserveOptions defaults() { + return new ObserveOptions(); + } +} diff --git a/src/test/java/dev/axme/sdk/AxmeClientTest.java b/src/test/java/dev/axme/sdk/AxmeClientTest.java index ead1665..ee92524 100644 --- a/src/test/java/dev/axme/sdk/AxmeClientTest.java +++ b/src/test/java/dev/axme/sdk/AxmeClientTest.java @@ -1,11 +1,13 @@ package dev.axme.sdk; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; import java.util.Map; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -429,4 +431,133 @@ void organizationRoutingDeliveryAndBillingEndpointsAreReachable() throws Excepti assertEquals("/v1/billing/invoices?org_id=org_1&workspace_id=ws_1&status=open", server.takeRequest().getPath()); assertEquals("/v1/billing/invoices/inv_1", server.takeRequest().getPath()); } + + @Test + void observeCollectsEventsAndStopsAtTerminal() throws Exception { + // First poll: two non-terminal events + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":1,\"event_type\":\"intent.created\",\"status\":\"PENDING\"}," + + "{\"seq\":2,\"event_type\":\"intent.dispatched\",\"status\":\"IN_PROGRESS\"}" + + "]}")); + // Second poll: empty (triggers sleep + re-poll) + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[]}")); + // Third poll: terminal event + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":3,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}" + + "]}")); + + List> events = client.observe("it_obs_1", + new ObserveOptions(0, 0.05, null)); + + assertEquals(3, events.size()); + assertEquals("intent.created", events.get(0).get("event_type")); + assertEquals("intent.dispatched", events.get(1).get("event_type")); + assertEquals("intent.completed", events.get(2).get("event_type")); + assertEquals("COMPLETED", events.get(2).get("status")); + + // Verify since parameter advances: first call since=0, third since=2 + RecordedRequest req1 = server.takeRequest(); + assertEquals("/v1/intents/it_obs_1/events?since=0", req1.getPath()); + RecordedRequest req2 = server.takeRequest(); + assertEquals("/v1/intents/it_obs_1/events?since=2", req2.getPath()); + RecordedRequest req3 = server.takeRequest(); + assertEquals("/v1/intents/it_obs_1/events?since=2", req3.getPath()); + } + + @Test + void observeStopsOnEventTypeAlone() throws Exception { + // Terminal via event_type only (no status field) + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":1,\"event_type\":\"intent.failed\"}" + + "]}")); + + List> events = client.observe("it_obs_2", + new ObserveOptions(0, 0.05, null)); + + assertEquals(1, events.size()); + assertEquals("intent.failed", events.get(0).get("event_type")); + } + + @Test + void observeStopsOnStatusAlone() throws Exception { + // Terminal via status only (no event_type field) + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":1,\"status\":\"CANCELED\"}" + + "]}")); + + List> events = client.observe("it_obs_3", + new ObserveOptions(0, 0.05, null)); + + assertEquals(1, events.size()); + assertEquals("CANCELED", events.get(0).get("status")); + } + + @Test + void observeTimesOut() { + // Enqueue enough empty responses to keep polling + for (int i = 0; i < 20; i++) { + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[]}")); + } + + AxmeTimeoutException ex = assertThrows(AxmeTimeoutException.class, () -> + client.observe("it_timeout", new ObserveOptions(0, 0.05, 0.1))); + + assertEquals("it_timeout", ex.getIntentId()); + assertTrue(ex.getTimeoutSeconds() > 0); + } + + @Test + void waitForReturnsTerminalEvent() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":1,\"event_type\":\"intent.created\",\"status\":\"PENDING\"}" + + "]}")); + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":2,\"event_type\":\"intent.timed_out\",\"status\":\"TIMED_OUT\"}" + + "]}")); + + Map terminal = client.waitFor("it_wait_1", + new ObserveOptions(0, 0.05, null)); + + assertNotNull(terminal); + assertEquals("TIMED_OUT", terminal.get("status")); + assertEquals("intent.timed_out", terminal.get("event_type")); + } + + @Test + void observeWithSinceParameter() throws Exception { + // Start from since=5, verify first request uses since=5 + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":6,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}" + + "]}")); + + List> events = client.observe("it_since", + new ObserveOptions(5, 0.05, null)); + + assertEquals(1, events.size()); + RecordedRequest req = server.takeRequest(); + assertEquals("/v1/intents/it_since/events?since=5", req.getPath()); + } + + @Test + void observeWithNullOptionsUsesDefaults() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200).setBody( + "{\"ok\":true,\"events\":[" + + "{\"seq\":1,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}" + + "]}")); + + List> events = client.observe("it_null_opts", null); + + assertEquals(1, events.size()); + RecordedRequest req = server.takeRequest(); + assertEquals("/v1/intents/it_null_opts/events?since=0", req.getPath()); + } }