Skip to content

Data tracks support#586

Open
ladvoc wants to merge 27 commits intomainfrom
jacobgelman/bot-242-python-client-implementation
Open

Data tracks support#586
ladvoc wants to merge 27 commits intomainfrom
jacobgelman/bot-242-python-client-implementation

Conversation

@ladvoc
Copy link
Contributor

@ladvoc ladvoc commented Mar 5, 2026

No description provided.

@pblazej
Copy link

pblazej commented Mar 16, 2026

Some things from talking to claude (maybe you find it useful):

  • SubscribeDataTrackError / PublishDataTrackError not exported
  • existing streams use aclose() (async), while DataTrackSubscription uses close() (sync); all of them can probably leverage https://www.geeksforgeeks.org/python/aenter-in-python/
  • no remote_data_track_unpublished event - just mirroring rust comment here
  • try_push should probably be push in python
  • remote_data_track_published should be data_track_published as other tracks
  • DataTrackFrame(payload=...) may validate the input type (what happens if you pass a string etc.)

Comment on lines +270 to +274
async def __anext__(self) -> DataTrackFrame:
if self._closed:
raise StopAsyncIteration

event: proto_ffi.FfiEvent = await self._queue.get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Should the python implementation of data tracks be updated to have the synchronous track.subscribe() behavior which was implemented in javascript, with any subscription errors cascading down into the first iterator __anext__ call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed on slack with @ladvoc, we decided this behavior makes sense to do here as well!

Comment on lines +85 to +87
class PublishDataTrackError(Exception):
def __init__(self, message: str) -> None:
self.message = message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is it worth exposing a more rich set of errors here instead of just the single error with the message, like rust / js have added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated this and went with a string for now for three reasons: first, we may need to add new cases while this is still under development; second, using strings matches the existing FFI pattern, even though Rust supports error enums; and third, given how this FFI interface works there would be fair amount of boilerplate, including Protobuf definitions and public enums in each client language so we do not expose Protobuf types. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense, yea. Migrating to a more rich error format probably should happen as part of a larger pass over the whole project. I think what you have here is minimal enough too that it could be extended in the future in a backwards compatible way (ie, keep this as a base class and extend with more specific error variants).

"""Subscribes to the data track to receive frames.

Args:
buffer_size: Maximum number of received frames to buffer internally.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I realize this could get out of date quickly if the default starts changing a bunch, but it might be nice to actually include the default value here (I think it is 16?).

Or maybe another alternative to sidestep that duplication concern could be to link to the future docs page (probably will be https://docs.livekit.io/transport/data/data-tracks/#buffer-size) where this is mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I left that out here to avoid stale documentation in the future. Will make sure that is clear on the docs page.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a significant downside to including the docs page in the docstring?

track.info.name,
track.publisher_identity,
)
subscription = await track.subscribe()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have the same API as AudioTrack/VideoTrack for subscription?

data_track_subscribed event and set_subscribed



@dataclass
class DataTrackOptions:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use TypedDict



@dataclass
class DataTrackFrame:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency should we use dataclass as well on other frames?

Comment on lines +274 to +297
async def __anext__(self) -> DataTrackFrame:
if self._closed:
raise StopAsyncIteration

self._send_read_request()
event: proto_ffi.FfiEvent = await self._queue.get()
sub_event = event.data_track_subscription_event
detail = sub_event.WhichOneof("detail")

if detail == "frame_received":
proto_frame = sub_event.frame_received.frame
user_ts: Optional[int] = None
if proto_frame.HasField("user_timestamp"):
user_ts = proto_frame.user_timestamp
return DataTrackFrame(
payload=proto_frame.payload,
user_timestamp=user_ts,
)
elif detail == "eos":
self._close()
raise StopAsyncIteration
else:
self._close()
raise StopAsyncIteration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the user should be responsible for pulling here, can we have a main loop like VideoStream/AudioStream?

The risk is that it becomes easier for users to OOM their program


async def publish_data_track(
self,
options: Union[str, DataTrackOptions],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Union is unnecessary complexity? OK if DataTrackOptions is a TypedDict?

self._close()
self._ffi_handle.dispose()

def __del__(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a destructor if we follow the main task pattern:

async def _run(self) -> None:

self._closed = True
FfiClient.instance.queue.unsubscribe(self._queue)

def close(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close becomes async

@ladvoc ladvoc marked this pull request as ready for review March 26, 2026 18:42
Copy link
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 5 additional findings in Devin Review.

Open in Devin Review

Comment on lines +111 to +119
from .data_track import (
LocalDataTrack,
RemoteDataTrack,
DataTrackSubscription,
DataTrackFrame,
DataTrackInfo,
DataTrackOptions,
PushFrameError,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 SubscribeDataTrackError not exported from public API despite being a documented raised exception

RemoteDataTrack.subscribe() documents that it raises SubscribeDataTrackError (livekit-rtc/livekit/rtc/data_track.py:208), but this exception class is not exported from __init__.py. Users cannot catch this specific error when importing from livekit.rtc. This is inconsistent with PushFrameError, which IS exported at livekit-rtc/livekit/rtc/__init__.py:118 and listed in __all__ at line 204.

Suggested change
from .data_track import (
LocalDataTrack,
RemoteDataTrack,
DataTrackSubscription,
DataTrackFrame,
DataTrackInfo,
DataTrackOptions,
PushFrameError,
)
from .data_track import (
LocalDataTrack,
RemoteDataTrack,
DataTrackSubscription,
DataTrackFrame,
DataTrackInfo,
DataTrackOptions,
PushFrameError,
SubscribeDataTrackError,
)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants