Skip to content

Commit d1b9dae

Browse files
committed
added interface based calling pattern and entity re-entrancy
1 parent 55ae3bb commit d1b9dae

File tree

8 files changed

+1255
-0
lines changed

8 files changed

+1255
-0
lines changed
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask;
4+
5+
import javax.annotation.Nonnull;
6+
import java.lang.reflect.InvocationHandler;
7+
import java.lang.reflect.Method;
8+
import java.lang.reflect.ParameterizedType;
9+
import java.lang.reflect.Proxy;
10+
import java.lang.reflect.Type;
11+
12+
/**
13+
* Creates type-safe proxies for interacting with durable entities from orchestrations.
14+
* <p>
15+
* A typed entity proxy is a JDK dynamic proxy that implements a user-defined interface.
16+
* Method calls on the proxy are translated into entity operations:
17+
* <ul>
18+
* <li>{@code void} methods become fire-and-forget signals via
19+
* {@link TaskOrchestrationContext#signalEntity}</li>
20+
* <li>Methods returning {@link Task}{@code <V>} become two-way calls via
21+
* {@link TaskOrchestrationContext#callEntity}</li>
22+
* </ul>
23+
* <p>
24+
* The method name is used as the entity operation name (case-insensitive matching on the
25+
* entity side). Methods must accept 0 or 1 parameters; the single parameter is passed as
26+
* the operation input.
27+
*
28+
* <p>Example:
29+
* <pre>{@code
30+
* // Define entity operations as an interface
31+
* public interface ICounter {
32+
* void add(int amount); // fire-and-forget signal
33+
* void reset(); // fire-and-forget signal
34+
* Task<Integer> get(); // two-way call returning a result
35+
* }
36+
*
37+
* // Use in an orchestration
38+
* ICounter counter = ctx.createEntityProxy(entityId, ICounter.class);
39+
* counter.add(5);
40+
* counter.reset();
41+
* int value = counter.get().await();
42+
* }</pre>
43+
*
44+
* @see TaskOrchestrationContext#createEntityProxy(EntityInstanceId, Class)
45+
* @see TaskOrchestrationEntityFeature#createProxy(EntityInstanceId, Class)
46+
*/
47+
public final class EntityProxy {
48+
49+
private EntityProxy() {
50+
// Utility class — not instantiable
51+
}
52+
53+
/**
54+
* Creates a typed entity proxy for the given entity instance.
55+
*
56+
* @param ctx the orchestration context (used to send signals and calls)
57+
* @param entityId the target entity's instance ID
58+
* @param proxyInterface the interface whose methods map to entity operations
59+
* @param <T> the proxy interface type
60+
* @return a proxy instance that implements {@code proxyInterface}
61+
* @throws IllegalArgumentException if {@code proxyInterface} is not an interface
62+
*/
63+
@SuppressWarnings("unchecked")
64+
public static <T> T create(
65+
@Nonnull TaskOrchestrationContext ctx,
66+
@Nonnull EntityInstanceId entityId,
67+
@Nonnull Class<T> proxyInterface) {
68+
if (ctx == null) {
69+
throw new IllegalArgumentException("ctx must not be null");
70+
}
71+
if (entityId == null) {
72+
throw new IllegalArgumentException("entityId must not be null");
73+
}
74+
if (proxyInterface == null) {
75+
throw new IllegalArgumentException("proxyInterface must not be null");
76+
}
77+
if (!proxyInterface.isInterface()) {
78+
throw new IllegalArgumentException(
79+
"proxyInterface must be an interface, got: " + proxyInterface.getName());
80+
}
81+
82+
return (T) Proxy.newProxyInstance(
83+
proxyInterface.getClassLoader(),
84+
new Class<?>[]{ proxyInterface },
85+
new EntityInvocationHandler(ctx, entityId));
86+
}
87+
88+
/**
89+
* Invocation handler that translates interface method calls into entity operations.
90+
*/
91+
private static final class EntityInvocationHandler implements InvocationHandler {
92+
private final TaskOrchestrationContext ctx;
93+
private final EntityInstanceId entityId;
94+
95+
EntityInvocationHandler(TaskOrchestrationContext ctx, EntityInstanceId entityId) {
96+
this.ctx = ctx;
97+
this.entityId = entityId;
98+
}
99+
100+
@Override
101+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
102+
// Handle java.lang.Object methods
103+
if (method.getDeclaringClass() == Object.class) {
104+
switch (method.getName()) {
105+
case "toString":
106+
return "EntityProxy[" + entityId + "]";
107+
case "hashCode":
108+
return entityId.hashCode();
109+
case "equals":
110+
if (args[0] == proxy) {
111+
return true;
112+
}
113+
if (args[0] == null || !Proxy.isProxyClass(args[0].getClass())) {
114+
return false;
115+
}
116+
InvocationHandler otherHandler = Proxy.getInvocationHandler(args[0]);
117+
if (otherHandler instanceof EntityInvocationHandler) {
118+
return entityId.equals(((EntityInvocationHandler) otherHandler).entityId);
119+
}
120+
return false;
121+
default:
122+
return method.invoke(this, args);
123+
}
124+
}
125+
126+
String operationName = method.getName();
127+
Object input = (args != null && args.length == 1) ? args[0] : null;
128+
129+
if (args != null && args.length > 1) {
130+
throw new UnsupportedOperationException(
131+
"Entity proxy methods must have 0 or 1 parameters. " +
132+
"Method '" + operationName + "' has " + args.length + " parameters. " +
133+
"Use a single wrapper object to pass multiple values.");
134+
}
135+
136+
Class<?> returnType = method.getReturnType();
137+
138+
if (returnType == void.class) {
139+
// Fire-and-forget signal
140+
ctx.signalEntity(entityId, operationName, input);
141+
return null;
142+
} else if (Task.class.isAssignableFrom(returnType)) {
143+
// Two-way entity call — extract the Task<V> type parameter
144+
Class<?> resultType = extractTaskTypeParameter(method);
145+
return ctx.callEntity(entityId, operationName, input, resultType);
146+
} else {
147+
throw new UnsupportedOperationException(
148+
"Entity proxy methods must return void (for signals) or Task<V> (for calls). " +
149+
"Method '" + operationName + "' returns " + returnType.getName() + ".");
150+
}
151+
}
152+
153+
/**
154+
* Extracts the generic type parameter from a method returning {@code Task<V>}.
155+
* Falls back to {@code Void.class} if the type cannot be determined.
156+
*/
157+
private static Class<?> extractTaskTypeParameter(Method method) {
158+
Type genericReturnType = method.getGenericReturnType();
159+
if (genericReturnType instanceof ParameterizedType) {
160+
ParameterizedType pt = (ParameterizedType) genericReturnType;
161+
Type[] typeArgs = pt.getActualTypeArguments();
162+
if (typeArgs.length > 0) {
163+
return getRawClass(typeArgs[0]);
164+
}
165+
}
166+
return Void.class;
167+
}
168+
169+
/**
170+
* Resolves a {@link Type} to its raw {@link Class}, handling parameterized types
171+
* and wildcard types.
172+
*/
173+
private static Class<?> getRawClass(Type type) {
174+
if (type instanceof Class) {
175+
return (Class<?>) type;
176+
} else if (type instanceof ParameterizedType) {
177+
return getRawClass(((ParameterizedType) type).getRawType());
178+
}
179+
return Object.class;
180+
}
181+
}
182+
}

client/src/main/java/com/microsoft/durabletask/TaskEntity.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,136 @@ private Object handleImplicitOperations(TaskEntityOperation operation) {
191191
operation.getName() + "'.");
192192
}
193193

194+
// region Re-entrant self-dispatch
195+
196+
/**
197+
* Dispatches an operation to this entity instance synchronously (re-entrant self-call).
198+
* <p>
199+
* This allows one entity operation to invoke another operation on the same entity,
200+
* reusing the same state and context. The dispatched operation executes inline — state
201+
* mutations are visible immediately to the caller.
202+
* <p>
203+
* The operation is resolved using the same case-insensitive reflection dispatch as external
204+
* operations: entity class methods are tried first, then state object methods (if
205+
* {@linkplain #setAllowStateDispatch state dispatch} is enabled), then implicit operations.
206+
*
207+
* <p>Example:
208+
* <pre>{@code
209+
* public class BankAccount extends TaskEntity<BankAccountState> {
210+
* public void deposit(int amount) {
211+
* this.state.balance += amount;
212+
* }
213+
*
214+
* public void depositWithBonus(int amount) {
215+
* dispatch("deposit", amount); // re-entrant call
216+
* dispatch("deposit", amount / 10); // 10% bonus
217+
* }
218+
* }
219+
* }</pre>
220+
*
221+
* @param operationName the name of the operation to dispatch (case-insensitive)
222+
* @return the operation result, or {@code null} for void operations
223+
* @throws IllegalStateException if called outside of entity execution
224+
* @throws UnsupportedOperationException if no matching operation is found
225+
*/
226+
protected Object dispatch(String operationName) {
227+
return dispatch(operationName, null);
228+
}
229+
230+
/**
231+
* Dispatches an operation with input to this entity instance synchronously (re-entrant self-call).
232+
*
233+
* @param operationName the name of the operation to dispatch (case-insensitive)
234+
* @param input the input value to pass to the operation, or {@code null}
235+
* @return the operation result, or {@code null} for void operations
236+
* @throws IllegalStateException if called outside of entity execution
237+
* @throws UnsupportedOperationException if no matching operation is found
238+
* @see #dispatch(String)
239+
*/
240+
protected Object dispatch(String operationName, Object input) {
241+
if (this.context == null) {
242+
throw new IllegalStateException(
243+
"dispatch() can only be called during entity operation execution.");
244+
}
245+
246+
Method method = findMethod(this.getClass(), operationName);
247+
Object target = this;
248+
249+
if (method == null && this.allowStateDispatch && this.state != null) {
250+
method = findMethod(this.state.getClass(), operationName);
251+
target = this.state;
252+
}
253+
254+
if (method == null) {
255+
if ("delete".equalsIgnoreCase(operationName)) {
256+
this.state = null;
257+
return null;
258+
}
259+
throw new UnsupportedOperationException(
260+
"Entity '" + this.getClass().getSimpleName() +
261+
"' does not support operation '" + operationName + "'.");
262+
}
263+
264+
return invokeMethodDirect(method, target, input, this.context);
265+
}
266+
267+
/**
268+
* Dispatches an operation with input and a typed return value (re-entrant self-call).
269+
*
270+
* @param operationName the name of the operation to dispatch (case-insensitive)
271+
* @param input the input value to pass to the operation, or {@code null}
272+
* @param returnType the expected return type
273+
* @param <V> the return type
274+
* @return the operation result cast to {@code V}
275+
* @throws IllegalStateException if called outside of entity execution
276+
* @throws UnsupportedOperationException if no matching operation is found
277+
* @throws ClassCastException if the result cannot be cast to {@code returnType}
278+
* @see #dispatch(String, Object)
279+
*/
280+
protected <V> V dispatch(String operationName, Object input, Class<V> returnType) {
281+
return returnType.cast(dispatch(operationName, input));
282+
}
283+
284+
/**
285+
* Invokes a method with a direct (already-deserialized) input value, without requiring
286+
* a {@link TaskEntityOperation}. Used by {@link #dispatch} for re-entrant self-calls.
287+
*/
288+
private static Object invokeMethodDirect(
289+
Method method, Object target, Object input, TaskEntityContext context) {
290+
Class<?>[] paramTypes = method.getParameterTypes();
291+
Object[] args = new Object[paramTypes.length];
292+
293+
for (int i = 0; i < paramTypes.length; i++) {
294+
if (TaskEntityContext.class.isAssignableFrom(paramTypes[i])) {
295+
args[i] = context;
296+
} else if (TaskEntityOperation.class.isAssignableFrom(paramTypes[i])) {
297+
throw new UnsupportedOperationException(
298+
"Cannot dispatch to method '" + method.getName() +
299+
"' that accepts TaskEntityOperation. Use a simpler signature for " +
300+
"operations that support re-entrant dispatch.");
301+
} else {
302+
args[i] = input;
303+
}
304+
}
305+
306+
try {
307+
return method.invoke(target, args);
308+
} catch (InvocationTargetException e) {
309+
Throwable cause = e.getTargetException();
310+
if (cause instanceof RuntimeException) {
311+
throw (RuntimeException) cause;
312+
}
313+
if (cause instanceof Error) {
314+
throw (Error) cause;
315+
}
316+
throw new RuntimeException(cause);
317+
} catch (IllegalAccessException e) {
318+
throw new RuntimeException(e);
319+
}
320+
}
321+
322+
// endregion
323+
194324
/**
195325
* Finds a public method on the target class matching the operation name (case-insensitive).
196326
* Methods inherited from {@code Object} and from {@code TaskEntity} itself are excluded.

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,24 @@ default <V> Task<V> waitForExternalEvent(String name, Class<V> dataType) {
559559
}
560560
}
561561

562+
/**
563+
* Creates a typed entity proxy that maps interface method calls to entity operations.
564+
* <p>
565+
* This is a convenience method equivalent to calling
566+
* {@code EntityProxy.create(this, entityId, proxyInterface)}.
567+
*
568+
* @param entityId the target entity's instance ID
569+
* @param proxyInterface the interface whose methods map to entity operations;
570+
* {@code void} methods become signals, {@code Task<V>} methods become calls
571+
* @param <T> the proxy interface type
572+
* @return a proxy instance that implements {@code proxyInterface}
573+
* @throws IllegalArgumentException if {@code proxyInterface} is not an interface
574+
* @see EntityProxy#create(TaskOrchestrationContext, EntityInstanceId, Class)
575+
*/
576+
default <T> T createEntityProxy(@Nonnull EntityInstanceId entityId, @Nonnull Class<T> proxyInterface) {
577+
return EntityProxy.create(this, entityId, proxyInterface);
578+
}
579+
562580
/**
563581
* Gets the durable entity feature for this orchestration context.
564582
* <p>

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationEntityFeature.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,22 @@ public void signalEntity(
130130
* @return the list of currently locked entities, or an empty list
131131
*/
132132
public abstract List<EntityInstanceId> getLockedEntities();
133+
134+
/**
135+
* Creates a typed entity proxy that maps interface method calls to entity operations.
136+
* <p>
137+
* This is the feature-based equivalent of
138+
* {@link TaskOrchestrationContext#createEntityProxy(EntityInstanceId, Class)}.
139+
*
140+
* @param entityId the target entity's instance ID
141+
* @param proxyInterface the interface whose methods map to entity operations;
142+
* {@code void} methods become signals, {@code Task<V>} methods become calls
143+
* @param <T> the proxy interface type
144+
* @return a proxy instance that implements {@code proxyInterface}
145+
* @throws IllegalArgumentException if {@code proxyInterface} is not an interface
146+
* @see EntityProxy#create(TaskOrchestrationContext, EntityInstanceId, Class)
147+
*/
148+
public abstract <T> T createProxy(@Nonnull EntityInstanceId entityId, @Nonnull Class<T> proxyInterface);
133149
}
134150

135151
final class ContextBackedTaskOrchestrationEntityFeature extends TaskOrchestrationEntityFeature {
@@ -186,4 +202,9 @@ public boolean isInCriticalSection() {
186202
public List<EntityInstanceId> getLockedEntities() {
187203
return this.context.getLockedEntities();
188204
}
205+
206+
@Override
207+
public <T> T createProxy(@Nonnull EntityInstanceId entityId, @Nonnull Class<T> proxyInterface) {
208+
return EntityProxy.create(this.context, entityId, proxyInterface);
209+
}
189210
}

0 commit comments

Comments
 (0)