Claude/fix utc tzdata timestamp [ENG-225] #75
Conversation
Pull request overview
This PR fixes two bugs: (1) a `ZoneInfoNotFoundError` when converting UTC-aware PyArrow timestamps on minimal systems lacking a timezone database, and (2) internal meta columns (e.g. `__computed`, `__pod_ts`) leaking into downstream streams, which caused `DuplicateError` in multi-input Join operations.
Changes:
- Added `tzdata>=2024.1` as a runtime dependency to ensure timezone data is always available for PyArrow UTC timestamp conversion.
- Added `ColumnConfig.meta` filtering to `FunctionNode.as_table()`, so meta columns are properly excluded (or selectively included) when building output tables.
- Changed `iter_packets()` and `async_execute()` to drop all meta-prefixed columns (not just `__input_packet_hash`) from cached-results tables before constructing replay streams.
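The tzdata fix can be illustrated with the standard-library `zoneinfo` lookup that fails on minimal systems; a minimal sketch, assuming PyArrow's UTC timestamp conversion consults the same IANA database:

```python
# On a minimal image with no /usr/share/zoneinfo, this lookup raises
# ZoneInfoNotFoundError unless the `tzdata` PyPI package is installed;
# adding tzdata>=2024.1 as a dependency makes the lookup self-contained.
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

try:
    utc = ZoneInfo("UTC")
    print(str(utc))  # the key round-trips as "UTC"
except ZoneInfoNotFoundError:
    print("no timezone database found -- add the `tzdata` package")
```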
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `pyproject.toml` | Added `tzdata>=2024.1` runtime dependency for portable timezone support |
| `src/orcapod/core/nodes/function_node.py` | Added meta column filtering in `as_table()`, broadened meta column stripping in `iter_packets()` and `async_execute()` |
```python
# Collection[str]: keep only meta columns matching the specified prefixes
drop_columns.extend(
    c
    for c in self._cached_output_table.column_names
    if c.startswith(constants.META_PREFIX)
    and not any(c.startswith(p) for p in column_config.meta)
```
The `Collection[str]` branch does not normalize user-supplied prefixes by prepending `constants.META_PREFIX` when needed. In contrast, `datagram.py` (lines 483–488) normalizes prefixes so that e.g. `meta=["pipeline"]` correctly matches `"__pipeline"`.
As written, if a user passes `meta=["pipeline"]`, the `c.startswith(p)` check compares column names (which start with `"__"`) against the bare prefix `"pipeline"`, so no columns will match and all meta columns will be dropped.
The prefixes in `column_config.meta` should be normalized the same way `datagram.py` does it: prepend `constants.META_PREFIX` to any prefix that doesn't already start with it.
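A hedged sketch of the normalization the comment asks for (`META_PREFIX` and the helper name are assumptions for illustration, mirroring what `datagram.py` reportedly does):

```python
META_PREFIX = "__"  # assumed value of constants.META_PREFIX

def normalize_meta_prefixes(prefixes):
    # Prepend META_PREFIX to any prefix that doesn't already start with it
    return [p if p.startswith(META_PREFIX) else f"{META_PREFIX}{p}" for p in prefixes]

# A bare prefix never matches a dunder-prefixed column name...
assert not "__pipeline_run".startswith("pipeline")
# ...but a normalized one does:
assert "__pipeline_run".startswith(normalize_meta_prefixes(["pipeline"])[0])
```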
Suggested change:

```python
# Collection[str]: keep only meta columns matching the specified prefixes.
# Normalize user-supplied prefixes to ensure they include META_PREFIX,
# so that e.g. "pipeline" matches "__pipeline".
meta_prefixes = [
    p
    if p.startswith(constants.META_PREFIX)
    else f"{constants.META_PREFIX}{p}"
    for p in column_config.meta
]
drop_columns.extend(
    c
    for c in self._cached_output_table.column_names
    if c.startswith(constants.META_PREFIX)
    and not any(c.startswith(p) for p in meta_prefixes)
```
```python
if not column_config.meta:
    drop_columns.extend(
        c
        for c in self._cached_output_table.column_names
        if c.startswith(constants.META_PREFIX)
    )
```
There are no tests covering the new meta filtering logic added to `as_table`. The existing test file (`tests/test_core/function_pod/test_function_pod_node.py`) has thorough tests for `get_all_records` meta column behavior (e.g. `TestGetAllRecordsMetaColumns`) but none for `as_table` with `meta=False`, `meta=True`, or `meta=Collection[str]`. Similarly, the change in `iter_packets` and `async_execute` to drop all meta-prefixed columns (instead of just the hash column) is not covered by any test verifying that cached replay streams exclude meta columns. Adding tests for these paths would help prevent regressions.
```python
drop_columns.extend(f"{constants.SOURCE_PREFIX}{c}" for c in self.keys()[1])
if not column_config.context:
    drop_columns.append(constants.CONTEXT_KEY)
if not column_config.meta:
```
We shouldn't be blindly dropping metadata -- it needs to be maintained. The comment suggests this was done to handle the column-name collision when joining 3 or more streams. What's happening is that when a join finds two streams have a colliding metadata column, it renames the second column by appending `_right`. But when the next join with the 3rd stream hits the same name collision (imagine all 3 streams had `__meta_x`), the first join produced `__meta_x` and `__meta_x_right`, so when the third stream's `__meta_x` collides with the `__meta_x` from the first stream, the rename to `__meta_x_right` collides again. The proper solution is a stream-index-based suffix (instead of the current `_right`), yielding e.g. `__meta_x`, `__meta_x_1`, `__meta_x_2`.
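The index-based suffix idea can be sketched as follows (a hypothetical helper for illustration, not the actual orcapod or Polars rename logic):

```python
# Colliding columns get the joining stream's index as a suffix, so the
# same base name from a third (or nth) stream can never re-collide the
# way a fixed "_right" suffix does.
def resolve_collision(name: str, existing: set[str], stream_index: int) -> str:
    return name if name not in existing else f"{name}_{stream_index}"

seen = {"__meta_x"}                              # columns from stream 0
second = resolve_collision("__meta_x", seen, 1)  # renamed to "__meta_x_1"
seen.add(second)
third = resolve_collision("__meta_x", seen, 2)   # "__meta_x_2" -- no collision
seen.add(third)
assert sorted(seen) == ["__meta_x", "__meta_x_1", "__meta_x_2"]
```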
Codecov Report
❌ Patch coverage is
eywalker left a comment
Turns out the very fact meta was trying to be dropped is already a bug...
src/orcapod/core/operators/join.py
Outdated
```python
table = stream.as_table(all_info=True)
```
Bug Log
Missing tzdata dependency causes UTC timestamp conversion failure
Problem: Converting PyArrow `timestamp("us", tz="UTC")` columns to Python objects via `to_pylist()` or `as_py()` failed with `ZoneInfoNotFoundError: 'No time zone found with key UTC'` on minimal systems lacking a timezone database. This affected any pipeline that read back cached results from the result database, since the `POD_TIMESTAMP` column is stored as a UTC-aware timestamp.

Fix: Added `tzdata>=2024.1` as a runtime dependency in `orcapod-python/pyproject.toml`. The `tzdata` package ships the full IANA timezone database as a Python package, making timezone lookups self-contained and independent of the host system's timezone data.

Internal meta columns leak into downstream streams, breaking multi-input Join
Problem: When a `Join` operator with 3+ inputs was run, a `polars.exceptions.DuplicateError: column with name '__computed_right' already exists` error was raised. The root cause was that `FunctionNode.as_table()` did not respect the `ColumnConfig.meta` flag: it built its output table using `packet.as_dict(all_info=True)` (which includes internal meta columns such as `__computed`, `__pod_ts`, `__pod_id_*`) but never filtered them out when `meta=False` (the default). As a result, these internal columns leaked into the tables passed to `Join.static_process()`. Polars' join renamed duplicates with a `_right` suffix, but on the third join iteration the renamed column already existed, causing a collision. A secondary instance of the same issue existed in `FunctionNode.iter_packets()` and `async_execute()`, where only the input hash column (`__input_packet_hash`) was stripped from the cached-results table before constructing a replay `ArrowTableStream`, leaving other meta columns in the stream.

Fix: In `src/orcapod/core/nodes/function_node.py` (`as_table`): Added `ColumnConfig.meta` filtering to the drop-columns logic, consistent with how `system_tags`, `source`, and `context` are already filtered. Meta columns (those starting with `constants.META_PREFIX = "__"`) are now excluded when `meta=False` and selectively filtered when `meta` is a `Collection[str]`. In `iter_packets` and `async_execute`: Changed `existing.drop([hash_col])` to drop all meta-prefixed columns, completing what the existing comment ("strip the meta column") intended.
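A minimal sketch of the `as_table()` meta handling described above, with `META_PREFIX` and the helper name assumed for illustration (the real code operates on a PyArrow table's `column_names`):

```python
META_PREFIX = "__"  # assumed value of constants.META_PREFIX

def columns_to_drop(column_names, meta):
    # meta=True keeps all meta columns; meta=False drops them all;
    # a collection of prefixes keeps only matching meta columns,
    # normalizing bare prefixes so "pipeline" matches "__pipeline".
    if meta is True:
        return []
    meta_cols = [c for c in column_names if c.startswith(META_PREFIX)]
    if not meta:
        return meta_cols
    prefixes = [p if p.startswith(META_PREFIX) else f"{META_PREFIX}{p}" for p in meta]
    return [c for c in meta_cols if not any(c.startswith(p) for p in prefixes)]

cols = ["x", "__computed", "__pod_ts", "__pipeline_run"]
assert columns_to_drop(cols, False) == ["__computed", "__pod_ts", "__pipeline_run"]
assert columns_to_drop(cols, True) == []
assert columns_to_drop(cols, ["pipeline"]) == ["__computed", "__pod_ts"]
```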