Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions src/main/java/dev/axme/sdk/AxmeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public final class AxmeClient {
Expand Down Expand Up @@ -733,6 +736,100 @@ public Map<String, Object> mcpCallTool(String name, Map<String, Object> argument
return result instanceof Map ? (Map<String, Object>) result : response;
}

private static final Set<String> TERMINAL_STATUSES =
Set.of("COMPLETED", "FAILED", "CANCELED", "TIMED_OUT");

private static final Set<String> TERMINAL_EVENT_TYPES =
Set.of("intent.completed", "intent.failed", "intent.canceled", "intent.timed_out");

/**
* Polls {@code listIntentEvents} in a loop and returns all events up to and
* including the first terminal event.
*
* <p>A terminal event is one whose {@code status} field is in
* {@code COMPLETED, FAILED, CANCELED, TIMED_OUT} or whose {@code event_type}
* field is {@code intent.completed, intent.failed, intent.canceled, intent.timed_out}.
*
* @param intentId the intent to observe
* @param options polling options (since, interval, timeout); may be {@code null}
* @return list of event maps in arrival order, ending with the terminal event
* @throws AxmeTimeoutException if {@code timeoutSeconds} is set and elapsed
* @throws IOException on HTTP transport errors
* @throws InterruptedException if the polling thread is interrupted
*/
@SuppressWarnings("unchecked")
public List<Map<String, Object>> observe(String intentId, ObserveOptions options)
throws IOException, InterruptedException {
ObserveOptions opts = options != null ? options : ObserveOptions.defaults();
int nextSince = opts.getSince();
long startNanos = System.nanoTime();
List<Map<String, Object>> collected = new ArrayList<>();

while (true) {
if (opts.getTimeoutSeconds() != null) {
double elapsed = (System.nanoTime() - startNanos) / 1_000_000_000.0;
if (elapsed >= opts.getTimeoutSeconds()) {
throw new AxmeTimeoutException(intentId, opts.getTimeoutSeconds());
}
}

Map<String, Object> response = listIntentEvents(intentId, nextSince, RequestOptions.none());
Object eventsObj = response.get("events");
List<Map<String, Object>> events =
eventsObj instanceof List ? (List<Map<String, Object>>) eventsObj : List.of();

for (Map<String, Object> event : events) {
nextSince = maxSeenSeq(nextSince, event);
collected.add(event);
if (isTerminalIntentEvent(event)) {
return collected;
}
}

if (events.isEmpty()) {
long sleepMillis = (long) (opts.getPollIntervalSeconds() * 1000);
Thread.sleep(sleepMillis);
}
}
}

/**
* Polls until a terminal event is seen and returns it.
*
* <p>This is a convenience wrapper around {@link #observe} that discards
* intermediate events and returns only the terminal one.
*
* @param intentId the intent to wait for
* @param options polling options; may be {@code null}
* @return the terminal event map
* @throws AxmeTimeoutException if {@code timeoutSeconds} is set and elapsed
* @throws IOException on HTTP transport errors
* @throws InterruptedException if the polling thread is interrupted
*/
public Map<String, Object> waitFor(String intentId, ObserveOptions options)
throws IOException, InterruptedException {
List<Map<String, Object>> events = observe(intentId, options);
return events.get(events.size() - 1);
}

private static int maxSeenSeq(int currentMax, Map<String, Object> event) {
Object seqObj = event.get("seq");
if (seqObj instanceof Number) {
int seq = ((Number) seqObj).intValue();
return Math.max(currentMax, seq);
}
return currentMax;
}

private static boolean isTerminalIntentEvent(Map<String, Object> event) {
Object status = event.get("status");
if (status instanceof String && TERMINAL_STATUSES.contains(status)) {
return true;
}
Object eventType = event.get("event_type");
return eventType instanceof String && TERMINAL_EVENT_TYPES.contains(eventType);
}

private Map<String, Object> requestJson(
String method,
String path,
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/dev/axme/sdk/AxmeTimeoutException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dev.axme.sdk;

/**
* Thrown when an {@link AxmeClient#observe} or {@link AxmeClient#waitFor} call
* exceeds its configured timeout.
*/
public final class AxmeTimeoutException extends RuntimeException {
private final String intentId;
private final double timeoutSeconds;

public AxmeTimeoutException(String intentId, double timeoutSeconds) {
super("observe timed out after " + timeoutSeconds + "s for intent " + intentId);
this.intentId = intentId;
this.timeoutSeconds = timeoutSeconds;
}

public String getIntentId() {
return intentId;
}

public double getTimeoutSeconds() {
return timeoutSeconds;
}
}
42 changes: 42 additions & 0 deletions src/main/java/dev/axme/sdk/ObserveOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package dev.axme.sdk;

/**
* Options for {@link AxmeClient#observe} and {@link AxmeClient#waitFor} polling methods.
*/
public final class ObserveOptions {
private final int since;
private final double pollIntervalSeconds;
private final Double timeoutSeconds;

public ObserveOptions() {
this(0, 1.0, null);
}

public ObserveOptions(int since, double pollIntervalSeconds, Double timeoutSeconds) {
if (pollIntervalSeconds <= 0) {
throw new IllegalArgumentException("pollIntervalSeconds must be positive");
}
if (timeoutSeconds != null && timeoutSeconds <= 0) {
throw new IllegalArgumentException("timeoutSeconds must be positive");
}
this.since = since;
this.pollIntervalSeconds = pollIntervalSeconds;
this.timeoutSeconds = timeoutSeconds;
}

public int getSince() {
return since;
}

public double getPollIntervalSeconds() {
return pollIntervalSeconds;
}

public Double getTimeoutSeconds() {
return timeoutSeconds;
}

public static ObserveOptions defaults() {
return new ObserveOptions();
}
}
131 changes: 131 additions & 0 deletions src/test/java/dev/axme/sdk/AxmeClientTest.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package dev.axme.sdk;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
Expand Down Expand Up @@ -429,4 +431,133 @@ void organizationRoutingDeliveryAndBillingEndpointsAreReachable() throws Excepti
assertEquals("/v1/billing/invoices?org_id=org_1&workspace_id=ws_1&status=open", server.takeRequest().getPath());
assertEquals("/v1/billing/invoices/inv_1", server.takeRequest().getPath());
}

@Test
void observeCollectsEventsAndStopsAtTerminal() throws Exception {
// First poll: two non-terminal events
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":1,\"event_type\":\"intent.created\",\"status\":\"PENDING\"},"
+ "{\"seq\":2,\"event_type\":\"intent.dispatched\",\"status\":\"IN_PROGRESS\"}"
+ "]}"));
// Second poll: empty (triggers sleep + re-poll)
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":[]}"));
// Third poll: terminal event
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":3,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}"
+ "]}"));

List<Map<String, Object>> events = client.observe("it_obs_1",
new ObserveOptions(0, 0.05, null));

assertEquals(3, events.size());
assertEquals("intent.created", events.get(0).get("event_type"));
assertEquals("intent.dispatched", events.get(1).get("event_type"));
assertEquals("intent.completed", events.get(2).get("event_type"));
assertEquals("COMPLETED", events.get(2).get("status"));

// Verify since parameter advances: first call since=0, third since=2
RecordedRequest req1 = server.takeRequest();
assertEquals("/v1/intents/it_obs_1/events?since=0", req1.getPath());
RecordedRequest req2 = server.takeRequest();
assertEquals("/v1/intents/it_obs_1/events?since=2", req2.getPath());
RecordedRequest req3 = server.takeRequest();
assertEquals("/v1/intents/it_obs_1/events?since=2", req3.getPath());
}

@Test
void observeStopsOnEventTypeAlone() throws Exception {
// Terminal via event_type only (no status field)
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":1,\"event_type\":\"intent.failed\"}"
+ "]}"));

List<Map<String, Object>> events = client.observe("it_obs_2",
new ObserveOptions(0, 0.05, null));

assertEquals(1, events.size());
assertEquals("intent.failed", events.get(0).get("event_type"));
}

@Test
void observeStopsOnStatusAlone() throws Exception {
// Terminal via status only (no event_type field)
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":1,\"status\":\"CANCELED\"}"
+ "]}"));

List<Map<String, Object>> events = client.observe("it_obs_3",
new ObserveOptions(0, 0.05, null));

assertEquals(1, events.size());
assertEquals("CANCELED", events.get(0).get("status"));
}

@Test
void observeTimesOut() {
// Enqueue enough empty responses to keep polling
for (int i = 0; i < 20; i++) {
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":[]}"));
}

AxmeTimeoutException ex = assertThrows(AxmeTimeoutException.class, () ->
client.observe("it_timeout", new ObserveOptions(0, 0.05, 0.1)));

assertEquals("it_timeout", ex.getIntentId());
assertTrue(ex.getTimeoutSeconds() > 0);
}

@Test
void waitForReturnsTerminalEvent() throws Exception {
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":1,\"event_type\":\"intent.created\",\"status\":\"PENDING\"}"
+ "]}"));
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":2,\"event_type\":\"intent.timed_out\",\"status\":\"TIMED_OUT\"}"
+ "]}"));

Map<String, Object> terminal = client.waitFor("it_wait_1",
new ObserveOptions(0, 0.05, null));

assertNotNull(terminal);
assertEquals("TIMED_OUT", terminal.get("status"));
assertEquals("intent.timed_out", terminal.get("event_type"));
}

@Test
void observeWithSinceParameter() throws Exception {
// Start from since=5, verify first request uses since=5
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":6,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}"
+ "]}"));

List<Map<String, Object>> events = client.observe("it_since",
new ObserveOptions(5, 0.05, null));

assertEquals(1, events.size());
RecordedRequest req = server.takeRequest();
assertEquals("/v1/intents/it_since/events?since=5", req.getPath());
}

@Test
void observeWithNullOptionsUsesDefaults() throws Exception {
server.enqueue(new MockResponse().setResponseCode(200).setBody(
"{\"ok\":true,\"events\":["
+ "{\"seq\":1,\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\"}"
+ "]}"));

List<Map<String, Object>> events = client.observe("it_null_opts", null);

assertEquals(1, events.size());
RecordedRequest req = server.takeRequest();
assertEquals("/v1/intents/it_null_opts/events?since=0", req.getPath());
}
}
Loading