Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c4e12dd
Pin submodule
ladvoc Mar 5, 2026
61f4815
Update proto
ladvoc Mar 5, 2026
159a198
Add initial examples
ladvoc Mar 5, 2026
1c603d5
Initial FFI implementation
ladvoc Mar 5, 2026
00c00ca
Hide proto type
ladvoc Mar 5, 2026
0d4c5d2
Improve error reporting
ladvoc Mar 5, 2026
671777d
More generic type signature for try_push
ladvoc Mar 5, 2026
7bea8cb
Data class for frame
ladvoc Mar 5, 2026
10e6561
Data class for track info
ladvoc Mar 5, 2026
629d405
Frame convenience methods
ladvoc Mar 5, 2026
fb073be
Doc strings
ladvoc Mar 5, 2026
3a0a171
generated protobuf
github-actions[bot] Mar 5, 2026
7707e40
Refine example
ladvoc Mar 11, 2026
869dcbb
Format
ladvoc Mar 11, 2026
50a9a95
Pin RTC
ladvoc Mar 13, 2026
19a592a
End-to-end test for data tracks
ladvoc Mar 13, 2026
96ed624
Patch proto generation script
ladvoc Mar 13, 2026
b98deb3
generated protobuf
github-actions[bot] Mar 13, 2026
dcc4639
Pin submodule
ladvoc Mar 19, 2026
17008fa
Generate proto
ladvoc Mar 19, 2026
f2adba9
Expose buffer size option
ladvoc Mar 19, 2026
9e6784e
Rename event
ladvoc Mar 24, 2026
b54ac64
Expose unpublished event
ladvoc Mar 24, 2026
38a5ebb
Pin submodule
ladvoc Mar 24, 2026
b75cb34
Generate proto
ladvoc Mar 24, 2026
509fe32
Add explicit request to receive next frame for subscription
ladvoc Mar 24, 2026
8d9c838
Format
ladvoc Mar 24, 2026
9a7378d
Pin submodule
ladvoc Mar 27, 2026
d794d98
Generate proto
ladvoc Mar 27, 2026
8fe8de5
Make subscribe method sync
ladvoc Mar 27, 2026
c1e9bfb
Use typed dict for options
ladvoc Mar 27, 2026
cad3c25
Reduce type complexity
ladvoc Mar 27, 2026
8960c9b
Pin submodule
ladvoc Mar 31, 2026
9a360fc
Generate proto
ladvoc Mar 31, 2026
86c6205
Use error message
ladvoc Mar 31, 2026
903268d
Use keyword args for data track options
ladvoc Mar 31, 2026
13e9e90
Pin submodule
ladvoc Apr 1, 2026
2f674bb
Generate proto
ladvoc Apr 1, 2026
56c5f5f
Rename type
ladvoc Apr 1, 2026
170c5f5
Upgrade FFI to v0.12.52
ladvoc Apr 2, 2026
7ef3f20
Generate proto
ladvoc Apr 2, 2026
7365294
Add aclose method to data track stream
ladvoc Apr 2, 2026
789507a
Remove del finalizer
ladvoc Apr 2, 2026
94e88e9
Remove unnecessary utility
ladvoc Apr 2, 2026
e5aa64b
Add standalone read method
ladvoc Apr 2, 2026
618f8e8
Fix elapsed time calculation
ladvoc Apr 2, 2026
acf6c69
Format
ladvoc Apr 2, 2026
a37ce60
Merge remote-tracking branch 'origin/main' into jacobgelman/bot-242-p…
ladvoc Apr 2, 2026
991b0b1
Remove user timestamp helper methods
ladvoc Apr 2, 2026
9e0d018
Make unpublish async for future proofing
ladvoc Apr 2, 2026
aef4689
Format
ladvoc Apr 2, 2026
886495f
Disable data tracks tests in CI for now
ladvoc Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions examples/data_tracks/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os
import logging
import asyncio
import time
from signal import SIGINT, SIGTERM
from livekit import rtc

# Set the following environment variables with your own values
TOKEN = os.environ.get("LIVEKIT_TOKEN")
URL = os.environ.get("LIVEKIT_URL")


async def read_sensor() -> bytes:
# Dynamically read some sensor data...
return bytes([0xFA] * 256)


async def push_frames(track: rtc.LocalDataTrack):
while True:
logging.info("Pushing frame")
data = await read_sensor()
try:
frame = rtc.DataTrackFrame(payload=data, user_timestamp=int(time.time() * 1000))
track.try_push(frame)
except rtc.PushFrameError as e:
logging.error("Failed to push frame: %s", e)
Comment on lines +24 to +26
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From an API perspective, I feel like it's kind of weird that "pushing" can fail? Since it looks like a queue?

Copy link
Copy Markdown
Contributor Author

@ladvoc ladvoc Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V1 only supports lossy delivery, and queuing on the publisher side is minimal; this will error if attempting to push frames too fast (rather than queuing stale data).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be named send?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a discussion about this back during the planning phase; originally it was going to be called "publish," but it was changed to "push" to disambiguate from publishing the track itself. The "try" prefix is to denote this is not guaranteed to succeed and to give us a path to introduce an async variant just called "push" to support reliable delivery in a future version.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Publisher example logs empty string for PushFrameError because exception doesn't pass message to super().init

PushFrameError.__init__ at livekit-rtc/livekit/rtc/data_track.py:42-43 stores the message as self.message but never calls super().__init__(message). When the publisher example formats the exception with %s, Python calls str(e) which returns str(self.args) — an empty tuple () renders as empty string. The log output will be "Failed to push frame: " with no actual error information. The subscriber example at examples/data_tracks/subscriber.py:27 correctly uses e.message, demonstrating the intended access pattern.

Suggested change
logging.error("Failed to push frame: %s", e)
logging.error("Failed to push frame: %s", e.message)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

await asyncio.sleep(0.5)


async def main(room: rtc.Room):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

await room.connect(URL, TOKEN)
logger.info("connected to room %s", room.name)

track = await room.local_participant.publish_data_track(name="my_sensor_data")
await push_frames(track)


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.FileHandler("publisher.log"),
logging.StreamHandler(),
],
)

loop = asyncio.get_event_loop()
room = rtc.Room(loop=loop)

main_task = asyncio.ensure_future(main(room))

async def cleanup():
main_task.cancel()
try:
await main_task
except asyncio.CancelledError:
pass
await room.disconnect()
loop.stop()

for signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup()))

try:
loop.run_forever()
finally:
loop.close()
69 changes: 69 additions & 0 deletions examples/data_tracks/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
import logging
import asyncio
import time
from signal import SIGINT, SIGTERM
from livekit import rtc

# Set the following environment variables with your own values
TOKEN = os.environ.get("LIVEKIT_TOKEN")
URL = os.environ.get("LIVEKIT_URL")


async def subscribe(track: rtc.RemoteDataTrack):
logging.info(
"Subscribing to '%s' published by '%s'",
track.info.name,
track.publisher_identity,
)
try:
async for frame in track.subscribe():
logging.info("Received frame (%d bytes)", len(frame.payload))

if frame.user_timestamp is not None:
latency = (int(time.time() * 1000) - frame.user_timestamp) / 1000.0
logging.info("Latency: %.3f s", latency)
except rtc.SubscribeDataTrackError as e:
logging.error("Failed to subscribe to '%s': %s", track.info.name, e.message)


async def main(room: rtc.Room):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

active_tasks = []

@room.on("data_track_published")
def on_data_track_published(track: rtc.RemoteDataTrack):
task = asyncio.create_task(subscribe(track))
active_tasks.append(task)
task.add_done_callback(lambda _: active_tasks.remove(task))

await room.connect(URL, TOKEN)
logger.info("connected to room %s", room.name)


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.FileHandler("subscriber.log"),
logging.StreamHandler(),
],
)

loop = asyncio.get_event_loop()
room = rtc.Room(loop=loop)

async def cleanup():
await room.disconnect()
loop.stop()

asyncio.ensure_future(main(room))
for signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup()))

try:
loop.run_forever()
finally:
loop.close()
3 changes: 2 additions & 1 deletion livekit-rtc/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ protoc \
$FFI_PROTOCOL/participant.proto \
$FFI_PROTOCOL/room.proto \
$FFI_PROTOCOL/track.proto \
$FFI_PROTOCOL/data_track.proto \
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/e2ee.proto \
$FFI_PROTOCOL/stats.proto \
Expand All @@ -41,5 +42,5 @@ protoc \
touch -a "$FFI_OUT_PYTHON/__init__.py"

for f in "$FFI_OUT_PYTHON"/*.py "$FFI_OUT_PYTHON"/*.pyi; do
perl -i -pe 's|^(import (audio_frame_pb2\|ffi_pb2\|handle_pb2\|participant_pb2\|room_pb2\|track_pb2\|video_frame_pb2\|e2ee_pb2\|stats_pb2\|rpc_pb2\|track_publication_pb2\|data_stream_pb2))|from . $1|g' "$f"
perl -i -pe 's|^(import (audio_frame_pb2\|ffi_pb2\|handle_pb2\|participant_pb2\|room_pb2\|track_pb2\|video_frame_pb2\|e2ee_pb2\|stats_pb2\|rpc_pb2\|track_publication_pb2\|data_stream_pb2\|data_track_pb2))|from . $1|g' "$f"
done
16 changes: 16 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@
ByteStreamWriter,
ByteStreamReader,
)
from .data_track import (
LocalDataTrack,
RemoteDataTrack,
DataTrackStream,
DataTrackFrame,
DataTrackInfo,
PushFrameError,
SubscribeDataTrackError,
)
from .frame_processor import FrameProcessor

__all__ = [
Expand Down Expand Up @@ -186,6 +195,13 @@
"ByteStreamWriter",
"AudioProcessingModule",
"FrameProcessor",
"LocalDataTrack",
"RemoteDataTrack",
"DataTrackStream",
"DataTrackFrame",
"DataTrackInfo",
"PushFrameError",
"SubscribeDataTrackError",
"__version__",
]

Expand Down
10 changes: 10 additions & 0 deletions livekit-rtc/livekit/rtc/_proto/audio_frame_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 92 additions & 0 deletions livekit-rtc/livekit/rtc/_proto/data_track_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading