Conversation
|
Some things from talking to claude (maybe you find it useful):
|
| async def __anext__(self) -> DataTrackFrame: | ||
| if self._closed: | ||
| raise StopAsyncIteration | ||
|
|
||
| event: proto_ffi.FfiEvent = await self._queue.get() |
There was a problem hiding this comment.
question: Should the python implementation of data tracks be updated to have the synchronous track.subscribe() behavior which was implemented in javascript, with any subscription errors cascading down into the first iterator __anext__ call?
There was a problem hiding this comment.
Discussed on slack with @ladvoc, we decided this behavior makes sense to do here as well!
| class PublishDataTrackError(Exception): | ||
| def __init__(self, message: str) -> None: | ||
| self.message = message |
There was a problem hiding this comment.
question: Is it worth exposing a more rich set of errors here instead of just the single error with the message, like rust / js have added?
There was a problem hiding this comment.
I debated this and went with a string for now for three reasons: first, we may need to add new cases while this is still under development; second, using strings matches the existing FFI pattern, even though Rust supports error enums; and third, given how this FFI interface works there would be fair amount of boilerplate, including Protobuf definitions and public enums in each client language so we do not expose Protobuf types. What do you think?
There was a problem hiding this comment.
I think that makes sense, yea. Migrating to a more rich error format probably should happen as part of a larger pass over the whole project. I think what you have here is minimal enough too that it could be extended in the future in a backwards compatible way (ie, keep this as a base class and extend with more specific error variants).
| """Subscribes to the data track to receive frames. | ||
|
|
||
| Args: | ||
| buffer_size: Maximum number of received frames to buffer internally. |
There was a problem hiding this comment.
thought: I realize this could get out of date quickly if the default starts changing a bunch, but it might be nice to actually include the default value here (I think it is 16?).
Or maybe another alternative to sidestep that duplication concern could be to link to the future docs page (probably will be https://docs.livekit.io/transport/data/data-tracks/#buffer-size) where this is mentioned.
There was a problem hiding this comment.
Yeah, I left that out here to avoid stale documentation in the future. Will make sure that is clear on the docs page.
There was a problem hiding this comment.
Is there a significant downside to including the docs page in the docstring?
| track.info.name, | ||
| track.publisher_identity, | ||
| ) | ||
| subscription = await track.subscribe() |
There was a problem hiding this comment.
Should we have the same API as AudioTrack/VideoTrack for subscription?
data_track_subscribed event and set_subscribed
|
|
||
|
|
||
| @dataclass | ||
| class DataTrackOptions: |
|
|
||
|
|
||
| @dataclass | ||
| class DataTrackFrame: |
There was a problem hiding this comment.
For consistency should we use dataclass as well on other frames?
| async def __anext__(self) -> DataTrackFrame: | ||
| if self._closed: | ||
| raise StopAsyncIteration | ||
|
|
||
| self._send_read_request() | ||
| event: proto_ffi.FfiEvent = await self._queue.get() | ||
| sub_event = event.data_track_subscription_event | ||
| detail = sub_event.WhichOneof("detail") | ||
|
|
||
| if detail == "frame_received": | ||
| proto_frame = sub_event.frame_received.frame | ||
| user_ts: Optional[int] = None | ||
| if proto_frame.HasField("user_timestamp"): | ||
| user_ts = proto_frame.user_timestamp | ||
| return DataTrackFrame( | ||
| payload=proto_frame.payload, | ||
| user_timestamp=user_ts, | ||
| ) | ||
| elif detail == "eos": | ||
| self._close() | ||
| raise StopAsyncIteration | ||
| else: | ||
| self._close() | ||
| raise StopAsyncIteration |
There was a problem hiding this comment.
I don't think the user should be responsible for pulling here, can we have a main loop like VideoStream/AudioStream?
The risk is that it becomes easier for users to OOM their program
|
|
||
| async def publish_data_track( | ||
| self, | ||
| options: Union[str, DataTrackOptions], |
There was a problem hiding this comment.
The Union is unnecessary complexity? OK if DataTrackOptions is a TypedDict?
| self._close() | ||
| self._ffi_handle.dispose() | ||
|
|
||
| def __del__(self) -> None: |
There was a problem hiding this comment.
No need for a destructor if we follow the main task pattern:
| self._closed = True | ||
| FfiClient.instance.queue.unsubscribe(self._queue) | ||
|
|
||
| def close(self) -> None: |
| from .data_track import ( | ||
| LocalDataTrack, | ||
| RemoteDataTrack, | ||
| DataTrackSubscription, | ||
| DataTrackFrame, | ||
| DataTrackInfo, | ||
| DataTrackOptions, | ||
| PushFrameError, | ||
| ) |
There was a problem hiding this comment.
🟡 SubscribeDataTrackError not exported from public API despite being a documented raised exception
RemoteDataTrack.subscribe() documents that it raises SubscribeDataTrackError (livekit-rtc/livekit/rtc/data_track.py:208), but this exception class is not exported from __init__.py. Users cannot catch this specific error when importing from livekit.rtc. This is inconsistent with PushFrameError, which IS exported at livekit-rtc/livekit/rtc/__init__.py:118 and listed in __all__ at line 204.
| from .data_track import ( | |
| LocalDataTrack, | |
| RemoteDataTrack, | |
| DataTrackSubscription, | |
| DataTrackFrame, | |
| DataTrackInfo, | |
| DataTrackOptions, | |
| PushFrameError, | |
| ) | |
| from .data_track import ( | |
| LocalDataTrack, | |
| RemoteDataTrack, | |
| DataTrackSubscription, | |
| DataTrackFrame, | |
| DataTrackInfo, | |
| DataTrackOptions, | |
| PushFrameError, | |
| SubscribeDataTrackError, | |
| ) |
Was this helpful? React with 👍 or 👎 to provide feedback.
No description provided.