Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies = [
"gitpython>=3.1.45",
"starfix>=0.1.3",
"pygraphviz>=1.14",
"tzdata>=2024.1",
"uuid-utils>=0.11.1",
"s3fs>=2025.12.0",
"pymongo>=4.15.5",
Expand Down
15 changes: 14 additions & 1 deletion src/orcapod/core/nodes/function_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,20 @@ def as_table(
drop_columns.extend(f"{constants.SOURCE_PREFIX}{c}" for c in self.keys()[1])
if not column_config.context:
drop_columns.append(constants.CONTEXT_KEY)

if not column_config.meta:
drop_columns.extend(
c
for c in self._cached_output_table.column_names
if c.startswith(constants.META_PREFIX)
)
elif not isinstance(column_config.meta, bool):
# Collection[str]: keep only meta columns matching the specified prefixes
drop_columns.extend(
c
for c in self._cached_output_table.column_names
if c.startswith(constants.META_PREFIX)
and not any(c.startswith(p) for p in column_config.meta)
)
output_table = self._cached_output_table.drop(
[c for c in drop_columns if c in self._cached_output_table.column_names]
)
Expand Down
31 changes: 27 additions & 4 deletions src/orcapod/core/operators/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def static_process(self, *streams: StreamProtocol) -> StreamProtocol:
stream = ordered_streams[0]

tag_keys, _ = [set(k) for k in stream.keys()]
table = stream.as_table(columns={"source": True, "system_tags": True})
table = stream.as_table(columns={"source": True, "system_tags": True, "meta": True})
# trick to get cartesian product
table = table.add_column(0, COMMON_JOIN_KEY, pa.array([0] * len(table)))
table = arrow_data_utils.append_to_system_tags(
Expand All @@ -139,9 +139,7 @@ def static_process(self, *streams: StreamProtocol) -> StreamProtocol:

for idx, next_stream in enumerate(ordered_streams[1:], start=1):
next_tag_keys, _ = next_stream.keys()
next_table = next_stream.as_table(
columns={"source": True, "system_tags": True}
)
next_table = next_stream.as_table(columns={"source": True, "system_tags": True, "meta": True})
next_table = arrow_data_utils.append_to_system_tags(
next_table,
f"{next_stream.pipeline_hash().to_hex(n_char)}:{idx}",
Expand All @@ -151,6 +149,31 @@ def static_process(self, *streams: StreamProtocol) -> StreamProtocol:
next_table = next_table.add_column(
0, COMMON_JOIN_KEY, pa.array([0] * len(next_table))
)

# Rename any non-key columns in next_table that would collide with
# the accumulated table, using stream-index-based suffixes instead of
# Polars' default ``_right`` suffix which causes cascading collisions
# on 3+ stream joins. The only legitimately shared column names are
# the tag join keys; everything else (meta columns, their derived
# source-info columns, etc.) must be unique.
join_key_set = tag_keys.intersection(next_tag_keys) | {COMMON_JOIN_KEY}
existing_names = set(table.column_names)
rename_map = {}
for col in next_table.column_names:
if col not in join_key_set and col in existing_names:
new_name = f"{col}_{idx}"
counter = idx
while new_name in existing_names or new_name in rename_map.values():
counter += 1
new_name = f"{col}_{counter}"
rename_map[col] = new_name
if rename_map:
next_table = (
pl.DataFrame(next_table)
.rename(rename_map)
.to_arrow()
)

common_tag_keys = tag_keys.intersection(next_tag_keys)
common_tag_keys.add(COMMON_JOIN_KEY)

Expand Down
44 changes: 44 additions & 0 deletions tests/test_core/operators/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,50 @@ def test_join_is_commutative(self, simple_stream, disjoint_stream):
assert isinstance(sym, frozenset)


class TestJoinMetaColumnCollision:
    """A three-way join where every input carries the same ``__computed`` meta
    column must not raise a DuplicateError; instead, the colliding columns are
    renamed with stream-index-based suffixes (``__computed_1``, ``__computed_2``)."""

    @staticmethod
    def _make_stream(id_vals, pkt_col, pkt_vals, meta_val):
        """Build a stream with a shared ``id`` tag column, one packet column
        named *pkt_col*, and a ``__computed`` meta column filled with *meta_val*."""
        n_rows = len(id_vals)
        columns = {
            "id": pa.array(id_vals, type=pa.int64()),
            pkt_col: pa.array(pkt_vals, type=pa.int64()),
            "__computed": pa.array([meta_val] * n_rows, type=pa.bool_()),
        }
        return ArrowTableStream(pa.table(columns), tag_columns=["id"])

    def test_three_way_join_with_shared_meta_column_succeeds(self):
        """Three streams each carrying ``__computed`` should join without DuplicateError."""
        streams = [
            self._make_stream([1, 2], "alpha", [10, 20], True),
            self._make_stream([1, 2], "beta", [100, 200], True),
            self._make_stream([1, 2], "gamma", [1000, 2000], True),
        ]

        joined = Join().static_process(*streams).as_table()

        assert len(joined) == 2
        assert {"id", "alpha", "beta", "gamma"} <= set(joined.column_names)

    def test_three_way_join_meta_columns_renamed_with_index_suffix(self):
        """Colliding meta columns from streams 2+ get an index-based suffix."""
        streams = [
            self._make_stream([1, 2], "alpha", [10, 20], True),
            self._make_stream([1, 2], "beta", [100, 200], False),
            self._make_stream([1, 2], "gamma", [1000, 2000], True),
        ]

        joined = Join().static_process(*streams).as_table()
        col_names = set(joined.column_names)

        # The first stream's meta column keeps its original name; the second
        # and third streams' copies are suffixed with their stream index.
        for expected in ("__computed", "__computed_1", "__computed_2"):
            assert expected in col_names


class TestJoinOutputSchemaSystemTags:
"""Verify that Join.output_schema correctly predicts system tag columns."""

Expand Down
4 changes: 3 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.