Skip to content

Commit d630013

Browse files
committed
test(log_uploader): cover cross-thread emit during active flush
Adds a deterministic regression test that parks the uploader thread inside _flush() via a threading.Event, emits a real log record from the main thread while _FLUSH_GUARD.active is set on the uploader thread, and asserts the record lands in the next batch (not dropped). Documents that the thread-local guard only blocks recursive emits on the uploader thread itself.
1 parent 900f74b commit d630013

1 file changed

Lines changed: 50 additions & 0 deletions

File tree

tests/unit/test_log_uploader.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import threading
34
import time
45
from unittest.mock import Mock
56

@@ -132,6 +133,55 @@ def test_levels_map_correctly():
132133
assert levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
133134

134135

136+
def test_emit_during_active_flush_on_another_thread_is_not_dropped():
137+
# Race scenario: the uploader thread is mid-flush (request in flight, _FLUSH_GUARD
138+
# active on the uploader thread) while a real log record is emitted from a different
139+
# thread. The thread-local guard must NOT block the other thread's emit; the record
140+
# must land in the new buffer and ship on the next flush.
141+
client = Mock(spec=CliClient)
142+
u = BatchedLogUploader(client, "run-race", flush_interval=10)
143+
h = UploadingLogHandler(u)
144+
145+
flush_in_flight = threading.Event()
146+
release_flush = threading.Event()
147+
request_bodies: list = []
148+
149+
def blocking_request(**kwargs):
150+
request_bodies.append(json.loads(kwargs["payload"]))
151+
flush_in_flight.set()
152+
release_flush.wait(timeout=2.0)
153+
return Mock()
154+
155+
client.request.side_effect = blocking_request
156+
157+
# Seed the first batch and start a flush on a worker thread so the main thread can
158+
# observe + interleave with it.
159+
u.add({"timestamp": "t", "level": "INFO", "message": "first", "context": "c"})
160+
flusher = threading.Thread(target=u._flush, daemon=True)
161+
flusher.start()
162+
assert flush_in_flight.wait(timeout=2.0), "uploader never entered _flush"
163+
164+
# While the uploader is parked inside _flush() with _FLUSH_GUARD.active set on its
165+
# own thread, emit a real log from the main thread. The guard is thread-local, so
166+
# this emit must proceed and land in the freshly-swapped buffer.
167+
rec = logging.LogRecord(
168+
name="socketcli", level=logging.INFO, pathname=__file__,
169+
lineno=1, msg="emitted-during-flush", args=(), exc_info=None,
170+
)
171+
h.emit(rec)
172+
assert len(u._buf) == 1
173+
assert u._buf[0]["message"] == "emitted-during-flush"
174+
175+
# Let the in-flight flush finish, then drain — the record must ship.
176+
release_flush.set()
177+
flusher.join(timeout=2.0)
178+
u._flush()
179+
180+
assert len(request_bodies) == 2
181+
assert request_bodies[0]["logs"][0]["message"] == "first"
182+
assert request_bodies[1]["logs"][0]["message"] == "emitted-during-flush"
183+
184+
135185
def test_run_thread_flushes_periodically_then_exits():
136186
client = Mock(spec=CliClient)
137187
u = BatchedLogUploader(client, "run-t", flush_interval=0.05)

0 commit comments

Comments
 (0)