Skip to content

Commit a0a3807

Browse files
committed
feat: deterministic time steps
1 parent 64f0f43 commit a0a3807

4 files changed

Lines changed: 26 additions & 19 deletions

File tree

src/duron/_core/session.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,10 +333,15 @@ def __init__(
333333

334334
async def _resume(self) -> bool:
335335
recvd_msgs: set[str] = set()
336+
ws: WaitSet | None = None
336337
async for o, entry in self._log.stream():
337338
ts = entry["ts"]
339+
while ws and ws.timer and ws.timer < ts:
340+
self._now_us = max(self._now_us, ws.timer)
341+
ws = await self._step()
342+
338343
self._now_us = max(self._now_us, ts)
339-
_ = await self._step()
344+
ws = await self._step()
340345
if is_entry(entry):
341346
if entry["source"] == "task":
342347
if not self._handle_message(o, entry):
@@ -345,7 +350,7 @@ async def _resume(self) -> bool:
345350
recvd_msgs.add(entry["id"])
346351
else:
347352
_ = self._handle_message(o, entry)
348-
_ = await self._step()
353+
ws = await self._step()
349354
while self._pending_msg:
350355
id_ = self._pending_msg[-1]["id"]
351356
if id_ not in recvd_msgs:
@@ -402,8 +407,12 @@ async def _run(self) -> _T_co:
402407
await self._send_traces()
403408
else:
404409
await waitset.block(self._now_us)
405-
_ = await self._step()
406-
self._now_us = max(self._now_us + 1, time.time_ns() // 1_000)
410+
waitset = await self._step()
411+
now = time.time_ns() // 1_000
412+
if waitset and waitset.timer and waitset.timer < now:
413+
self._now_us = max(self._now_us, waitset.timer)
414+
else:
415+
self._now_us = max(self._now_us, now)
407416

408417
# cleanup
409418
self._loop.close()

src/duron/_core/signal.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ async def __aenter__(self) -> None:
6868
if task is None:
6969
return
7070
assert task.get_loop() == self._loop
71+
7172
if task not in self._tasks:
7273
val = _SignalState(depth=0, triggered=False)
7374
self._tasks[task] = val

src/duron/loop.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,8 @@ def call_at(
147147
*args: Unpack[_Ts],
148148
context: Context | None = None,
149149
) -> asyncio.TimerHandle:
150-
th = asyncio.TimerHandle(
151-
int(when * 1e6), callback, args, loop=self, context=context
152-
)
150+
t = int(when * 1e6 - self._now_us) + self._now_us
151+
th = asyncio.TimerHandle(t, callback, args, loop=self, context=context)
153152
heappush(self._timers, th)
154153
if asyncio.get_running_loop() is self._host:
155154
self._event.set()
@@ -164,11 +163,7 @@ def call_later(
164163
context: Context | None = None,
165164
) -> asyncio.TimerHandle:
166165
th = asyncio.TimerHandle(
167-
self.time_us() + int(delay * 1e6),
168-
callback,
169-
args,
170-
loop=self,
171-
context=context,
166+
self._now_us + int(delay * 1e6), callback, args, loop=self, context=context
172167
)
173168
heappush(self._timers, th)
174169
if asyncio.get_running_loop() is self._host:
@@ -244,15 +239,14 @@ def _poll(self, now: int) -> int | None:
244239

245240
def poll_completion(self, task: Future[_T]) -> WaitSet | None:
246241
assert asyncio.get_running_loop() is self._host
247-
now = self.time_us()
248242
self._event.clear()
249243

250244
# hot path - inline task context switch
251245
if prev_task := tasks.current_task():
252246
tasks._leave_task(self._host, prev_task) # noqa: SLF001
253247
events._set_running_loop(self) # noqa: SLF001
254248
try:
255-
next_deadline = self._poll(now)
249+
next_deadline = self._poll(self._now_us)
256250
if task.done():
257251
return None
258252
added, self._added = self._added, []
@@ -316,12 +310,12 @@ def close(self) -> None:
316310
tasks._leave_task(self._host, prev_task) # noqa: SLF001
317311
events._set_running_loop(self) # noqa: SLF001
318312
try:
319-
_ = self._poll(self.time_us())
313+
_ = self._poll(self._now_us)
320314
to_cancel = (*tasks.all_tasks(), *self._ops.values())
321315
for t in to_cancel:
322316
t.cancel()
323317
_ = asyncio.gather(*to_cancel, return_exceptions=True)
324-
_ = self._poll(self.time_us())
318+
_ = self._poll(self._now_us)
325319
for task in to_cancel:
326320
if task.cancelled() or not task.done():
327321
continue

tests/test_signal.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,16 @@ async def activity(ctx: Context, s: Signal[int] = Provided) -> list[list[int]]:
6060
async def tracker(signal: Signal[int], t: float) -> list[int]:
6161
values: list[int] = []
6262
while True:
63+
cnt = 0
6364
await asyncio.sleep(t)
6465
try:
6566
async with signal:
66-
await asyncio.sleep(9999)
67+
while True:
68+
cnt += 1
69+
await asyncio.sleep(0.0001)
70+
await ctx.run(asyncio.sleep, 0.001)
6771
except SignalInterrupt as e:
68-
values.append(cast("int", e.value))
72+
values.append(cast("int", e.value) + cnt)
6973
if len(values) > 10:
7074
return values
7175

@@ -110,7 +114,6 @@ async def tracker() -> int:
110114
if sys.version_info >= (3, 11):
111115
async with asyncio.timeout(0.01):
112116
await ctx.run(work)
113-
await ctx.time_ns()
114117
values += 1
115118
return values
116119

0 commit comments

Comments
 (0)