Skip to content

Commit 952b302

Browse files
chore: linting and static checks for pynumaflow-lite (#346)
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
1 parent 5c7d07f commit 952b302

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

58 files changed

+763
-447
lines changed

packages/pynumaflow-lite/Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,17 @@ test-rust:
4343
clean:
4444
cargo clean
4545

46+
py-fmt:
47+
uv run black pynumaflow_lite/ tests/ manifests/
48+
49+
py-lint: py-fmt
50+
uv run ruff check --fix .
51+
4652
fmt:
4753
cargo fmt --all
4854

4955
.PHONY: lint
50-
lint: test-fmt clippy
56+
lint: test-fmt clippy py-lint
5157

5258
.PHONY: test-fmt
5359
test-fmt:

packages/pynumaflow-lite/manifests/accumulator/accumulator_stream_sorter.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,18 @@
44
This accumulator buffers incoming data and sorts it by event time,
55
flushing sorted data when the watermark advances.
66
"""
7+
8+
import signal
79
import asyncio
810
from datetime import datetime
911
from typing import AsyncIterator
1012

11-
from pynumaflow_lite.accumulator import Datum, Message, AccumulatorAsyncServer, Accumulator
13+
from pynumaflow_lite.accumulator import (
14+
Datum,
15+
Message,
16+
AccumulatorAsyncServer,
17+
Accumulator,
18+
)
1219

1320

1421
class StreamSorter(Accumulator):
@@ -19,6 +26,7 @@ class StreamSorter(Accumulator):
1926

2027
def __init__(self):
2128
from datetime import timezone
29+
2230
# Initialize with a very old timestamp (timezone-aware)
2331
self.latest_wm = datetime.fromtimestamp(-1, tz=timezone.utc)
2432
self.sorted_buffer: list[Datum] = []
@@ -33,8 +41,10 @@ async def handler(self, datums: AsyncIterator[Datum]) -> AsyncIterator[Message]:
3341

3442
async for datum in datums:
3543
datum_count += 1
36-
print(f"Received datum #{datum_count}: event_time={datum.event_time}, "
37-
f"watermark={datum.watermark}, value={datum.value}")
44+
print(
45+
f"Received datum #{datum_count}: event_time={datum.event_time}, "
46+
f"watermark={datum.watermark}, value={datum.value}"
47+
)
3848

3949
# If watermark has moved forward
4050
if datum.watermark and datum.watermark > self.latest_wm:
@@ -122,7 +132,7 @@ async def main():
122132

123133

124134
# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
125-
import signal
135+
126136
signal.signal(signal.SIGINT, signal.default_int_handler)
127137
try:
128138
signal.signal(signal.SIGTERM, signal.SIG_DFL)

packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99

1010
class SimpleBatchCat(batchmapper.BatchMapper):
11-
async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.BatchResponses:
11+
async def handler(
12+
self, batch: AsyncIterable[batchmapper.Datum]
13+
) -> batchmapper.BatchResponses:
1214
responses = batchmapper.BatchResponses()
1315
async for d in batch:
1416
resp = batchmapper.BatchResponse(d.id)
@@ -29,7 +31,11 @@ async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper.
2931
pass
3032

3133

32-
async def start(f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]):
34+
async def start(
35+
f: Callable[
36+
[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]
37+
],
38+
):
3339
server = batchmapper.BatchMapAsyncServer()
3440

3541
# Register loop-level signal handlers so we control shutdown and avoid asyncio.run

packages/pynumaflow-lite/manifests/map/map_cat.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66

77

88
class SimpleCat(mapper.Mapper):
9-
async def handler(
10-
self, keys: list[str], payload: mapper.Datum
11-
) -> mapper.Messages:
9+
async def handler(self, keys: list[str], payload: mapper.Datum) -> mapper.Messages:
1210

1311
messages = mapper.Messages()
1412

packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99

1010
class SimpleStreamCat(mapstreamer.MapStreamer):
11-
async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncIterator[Message]:
11+
async def handler(
12+
self, keys: list[str], datum: mapstreamer.Datum
13+
) -> AsyncIterator[Message]:
1214
parts = datum.value.decode("utf-8").split(",")
1315
if not parts:
1416
yield Message.to_drop()
@@ -51,4 +53,3 @@ async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Messag
5153
if __name__ == "__main__":
5254
async_handler = SimpleStreamCat()
5355
asyncio.run(start(async_handler))
54-

packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ def __init__(self, initial: int = 0) -> None:
1010
self.counter = initial
1111

1212
async def handler(
13-
self, keys: list[str], datums: AsyncIterable[reducer.Datum], md: reducer.Metadata
13+
self,
14+
keys: list[str],
15+
datums: AsyncIterable[reducer.Datum],
16+
md: reducer.Metadata,
1417
) -> reducer.Messages:
1518
iw = md.interval_window
1619
self.counter = 0
@@ -57,4 +60,3 @@ async def start(creator: type[reducer.Reducer], init_args: tuple):
5760

5861
if __name__ == "__main__":
5962
asyncio.run(start(ReduceCounter, (0,)))
60-

packages/pynumaflow-lite/manifests/reducestream/reducestream_counter.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
The counter increments for each datum and emits a message every 10 items,
88
plus a final message at the end.
99
"""
10+
1011
import asyncio
1112
import signal
1213
from collections.abc import AsyncIterable, AsyncIterator
@@ -17,12 +18,12 @@
1718
class ReduceCounter(reducestreamer.ReduceStreamer):
1819
"""
1920
A reduce streaming counter that emits intermediate results.
20-
21+
2122
This demonstrates the key difference from regular Reducer:
2223
- Regular Reducer: waits for all data, then returns Messages
2324
- ReduceStreamer: yields Message objects incrementally as an async iterator
2425
"""
25-
26+
2627
def __init__(self, initial: int = 0) -> None:
2728
self.counter = initial
2829

@@ -34,21 +35,21 @@ async def handler(
3435
) -> AsyncIterator[reducestreamer.Message]:
3536
"""
3637
Process datums and yield messages incrementally.
37-
38+
3839
Args:
3940
keys: List of keys for this window
4041
datums: Async iterable of incoming data
4142
md: Metadata containing window information
42-
43+
4344
Yields:
4445
Message objects to send to the next vertex
4546
"""
4647
iw = md.interval_window
4748
print(f"Handler started for keys={keys}, window=[{iw.start}, {iw.end}]")
48-
49+
4950
async for _ in datums:
5051
self.counter += 1
51-
52+
5253
# Emit intermediate result every 10 items
5354
if self.counter % 10 == 0:
5455
msg = (
@@ -59,7 +60,7 @@ async def handler(
5960
print(f"Yielding intermediate result: counter={self.counter}")
6061
# Early release of data - this is the key feature of reduce streaming!
6162
yield reducestreamer.Message(msg, keys=keys)
62-
63+
6364
# Emit final result
6465
msg = (
6566
f"counter:{self.counter} (FINAL) "
@@ -105,4 +106,3 @@ async def start(creator: type, init_args: tuple):
105106

106107
if __name__ == "__main__":
107108
asyncio.run(start(ReduceCounter, (0,)))
108-

packages/pynumaflow-lite/manifests/session_reduce/session_reduce_counter_class.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def __init__(self, initial: int = 0) -> None:
2525
self.counter = initial
2626

2727
async def session_reduce(
28-
self, keys: list[str], datums: AsyncIterable[session_reducer.Datum]
28+
self, keys: list[str], datums: AsyncIterable[session_reducer.Datum]
2929
) -> AsyncIterator[session_reducer.Message]:
3030
"""
3131
Count all incoming messages in this session and yield the count.

packages/pynumaflow-lite/manifests/sideinput/sideinput_example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- If MAPPER is set to "true", runs as a Mapper that reads side input files
77
- Otherwise, runs as a SideInput retriever that broadcasts values
88
"""
9+
910
import asyncio
1011
import os
1112
import signal
@@ -142,4 +143,3 @@ async def start_mapper():
142143
else:
143144
print("Starting as SideInput retriever...")
144145
asyncio.run(start_sideinput())
145-

packages/pynumaflow-lite/manifests/sink/sink_log.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import collections
32
import logging
43
import signal
54
from collections.abc import AsyncIterable, AsyncIterator
@@ -36,7 +35,9 @@ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses
3635
pass
3736

3837

39-
async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]]):
38+
async def start(
39+
f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]],
40+
):
4041
server = sinker.SinkAsyncServer()
4142

4243
# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
@@ -61,4 +62,3 @@ async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Resp
6162
if __name__ == "__main__":
6263
async_handler = SimpleLogSink()
6364
asyncio.run(start(async_handler))
64-

0 commit comments

Comments
 (0)