-
Notifications
You must be signed in to change notification settings - Fork 113
Data tracks support #586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data tracks support #586
Changes from all commits
c4e12dd
61f4815
159a198
1c603d5
00c00ca
0d4c5d2
671777d
7bea8cb
10e6561
629d405
fb073be
3a0a171
7707e40
869dcbb
50a9a95
19a592a
96ed624
b98deb3
dcc4639
17008fa
f2adba9
9e6784e
b54ac64
38a5ebb
b75cb34
509fe32
8d9c838
9a7378d
d794d98
8fe8de5
c1e9bfb
cad3c25
8960c9b
9a360fc
86c6205
903268d
13e9e90
2f674bb
56c5f5f
170c5f5
7ef3f20
7365294
789507a
94e88e9
e5aa64b
618f8e8
acf6c69
a37ce60
991b0b1
9e0d018
aef4689
886495f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,70 @@ | ||||||
| import os | ||||||
| import logging | ||||||
| import asyncio | ||||||
| import time | ||||||
| from signal import SIGINT, SIGTERM | ||||||
| from livekit import rtc | ||||||
|
|
||||||
| # Set the following environment variables with your own values | ||||||
| TOKEN = os.environ.get("LIVEKIT_TOKEN") | ||||||
| URL = os.environ.get("LIVEKIT_URL") | ||||||
|
|
||||||
|
|
||||||
| async def read_sensor() -> bytes: | ||||||
| # Dynamically read some sensor data... | ||||||
| return bytes([0xFA] * 256) | ||||||
|
|
||||||
|
|
||||||
| async def push_frames(track: rtc.LocalDataTrack): | ||||||
| while True: | ||||||
| logging.info("Pushing frame") | ||||||
| data = await read_sensor() | ||||||
| try: | ||||||
| frame = rtc.DataTrackFrame(payload=data, user_timestamp=int(time.time() * 1000)) | ||||||
| track.try_push(frame) | ||||||
| except rtc.PushFrameError as e: | ||||||
| logging.error("Failed to push frame: %s", e) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Publisher example logs empty string for PushFrameError because exception doesn't pass message to super().init
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||
| await asyncio.sleep(0.5) | ||||||
|
|
||||||
|
|
||||||
| async def main(room: rtc.Room): | ||||||
| logging.basicConfig(level=logging.INFO) | ||||||
| logger = logging.getLogger(__name__) | ||||||
|
|
||||||
| await room.connect(URL, TOKEN) | ||||||
| logger.info("connected to room %s", room.name) | ||||||
|
|
||||||
| track = await room.local_participant.publish_data_track(name="my_sensor_data") | ||||||
| await push_frames(track) | ||||||
|
|
||||||
|
|
||||||
| if __name__ == "__main__": | ||||||
| logging.basicConfig( | ||||||
| level=logging.INFO, | ||||||
| handlers=[ | ||||||
| logging.FileHandler("publisher.log"), | ||||||
| logging.StreamHandler(), | ||||||
| ], | ||||||
| ) | ||||||
|
|
||||||
| loop = asyncio.get_event_loop() | ||||||
| room = rtc.Room(loop=loop) | ||||||
|
|
||||||
| main_task = asyncio.ensure_future(main(room)) | ||||||
|
|
||||||
| async def cleanup(): | ||||||
| main_task.cancel() | ||||||
| try: | ||||||
| await main_task | ||||||
| except asyncio.CancelledError: | ||||||
| pass | ||||||
| await room.disconnect() | ||||||
| loop.stop() | ||||||
|
|
||||||
| for signal in [SIGINT, SIGTERM]: | ||||||
| loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup())) | ||||||
|
|
||||||
| try: | ||||||
| loop.run_forever() | ||||||
| finally: | ||||||
| loop.close() | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| import os | ||
| import logging | ||
| import asyncio | ||
| import time | ||
| from signal import SIGINT, SIGTERM | ||
| from livekit import rtc | ||
|
|
||
| # Set the following environment variables with your own values | ||
| TOKEN = os.environ.get("LIVEKIT_TOKEN") | ||
| URL = os.environ.get("LIVEKIT_URL") | ||
|
|
||
|
|
||
| async def subscribe(track: rtc.RemoteDataTrack): | ||
| logging.info( | ||
| "Subscribing to '%s' published by '%s'", | ||
| track.info.name, | ||
| track.publisher_identity, | ||
| ) | ||
| try: | ||
| async for frame in track.subscribe(): | ||
| logging.info("Received frame (%d bytes)", len(frame.payload)) | ||
|
|
||
| if frame.user_timestamp is not None: | ||
| latency = (int(time.time() * 1000) - frame.user_timestamp) / 1000.0 | ||
| logging.info("Latency: %.3f s", latency) | ||
| except rtc.SubscribeDataTrackError as e: | ||
| logging.error("Failed to subscribe to '%s': %s", track.info.name, e.message) | ||
|
|
||
|
|
||
| async def main(room: rtc.Room): | ||
| logging.basicConfig(level=logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
| active_tasks = [] | ||
|
|
||
| @room.on("data_track_published") | ||
| def on_data_track_published(track: rtc.RemoteDataTrack): | ||
| task = asyncio.create_task(subscribe(track)) | ||
| active_tasks.append(task) | ||
| task.add_done_callback(lambda _: active_tasks.remove(task)) | ||
|
|
||
| await room.connect(URL, TOKEN) | ||
| logger.info("connected to room %s", room.name) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| logging.basicConfig( | ||
| level=logging.INFO, | ||
| handlers=[ | ||
| logging.FileHandler("subscriber.log"), | ||
| logging.StreamHandler(), | ||
| ], | ||
| ) | ||
|
|
||
| loop = asyncio.get_event_loop() | ||
| room = rtc.Room(loop=loop) | ||
|
|
||
| async def cleanup(): | ||
| await room.disconnect() | ||
| loop.stop() | ||
|
|
||
| asyncio.ensure_future(main(room)) | ||
| for signal in [SIGINT, SIGTERM]: | ||
| loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup())) | ||
|
|
||
| try: | ||
| loop.run_forever() | ||
| finally: | ||
| loop.close() |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From an API perspective, I feel like it's kind of weird that "pushing" can fail? Since it looks like a queue?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
V1 only supports lossy delivery, and queuing on the publisher side is minimal; this will error if attempting to push frames too fast (rather than queuing stale data).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be named send?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a discussion about this back during the planning phase; originally it was going to be called "publish," but it was changed to "push" to disambiguate from publishing the track itself. The "try" prefix is to denote this is not guaranteed to succeed and to give us a path to introduce an async variant just called "push" to support reliable delivery in a future version.