Spark: Support writing shredded variant in Iceberg-Spark#14297
Spark: Support writing shredded variant in Iceberg-Spark#14297huaxingao merged 34 commits intoapache:mainfrom
Conversation
16b7a09 to
dc4f72e
Compare
97851f0 to
b87e999
Compare
|
@amogh-jahagirdar @Fokko @huaxingao Can you help take a look at this PR and if we have better approach for this? |
|
cc @RussellSpitzer, @pvary and @rdblue Seems it's better to have the implementation with new File Format proposal but want to check if this is acceptable approach as an interim solution or you see a better alternative. |
|
@aihuaxu: Don't we want to do the same but instead of wrapping the Would this be prohibitively complex? |
|
In Spark DSv2, planning/validation happens on the driver. For shredded variant, we don’t know the shredded schema at planning time. We have to inspect some records to derive it. Doing a read on the driver during Because of that, the current proposed Spark approach is: put the logical variant in the writer factory, on the executor, buffer the first N rows, infer the shredded schema from data, then initialize the concrete writer and flush the buffer. I believe this PR follow the same approach, which seems like a practical solution to me given DSV2's constraints. |
|
Thanks for the explanation, @huaxingao! I see several possible workarounds for the DataWriterFactory serialization issue, but I have some more fundamental concerns about the overall approach. Even if we accept that the written data should dictate the shredding logic, Spark’s implementation—while dependent on input order—is at least somewhat stable. It drops rarely used fields, handles inconsistent types, and limits the number of columns. |
|
Thanks @huaxingao and @pvary for reviewing, and thanks to Huaxin for explaining how the writer works in Spark. Regarding the concern about unstable schemas, Spark's approach makes sense:
We could implement similar heuristics. Additionally, making the shredded schema configurable would allow users to choose which fields to shred at write time based on their read patterns. For this POC, I'd like any feedback on whether there are any significant high-level design options to consider first and if this approach is acceptable. This seems hacky. I may have missed big picture on how the writers work across Spark + Iceberg + Parquet and we may have better way. |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
This PR caught my eye, as I've implemented the equivalent in DuckDB: duckdb/duckdb#19336 The PR description doesn't give much away, but I think the approach is similar to the proposed (interim) solution here: buffer the first rowgroup, infer the shredded schema from this, then finalize the file schema and start writing data. We've opted to create a We've also added a copy option to force the shredded schema, for debugging purposes and for power users. As for DECIMAL, it's kind of a special case in the shredding inference. We only shred on a DECIMAL type if all the decimal values we've seen for a column/field have the same width+scale, if any decimal value differs, DECIMAL won't be considered anymore when determining the shredded type of the column/field |
|
This PR is super exciting! Regarding the heuristics - I'd like to propose adding table properties as hints for variant shredding. |
That is correct.
I'm still trying to improve the heuristics to use the most common one as shredding type rather than the first one and probably cap the number of shredded fields, etc. but it doesn't need 100% consistent type to be shredded.
Yeah. I think that makes sense for advanced user to determine the shredded schema since they may know the read pattern.
Why is DECIMAL special here? If we determine DECIMAL4 to be shredded type, then we may shred as DECIMAL4 or not shred if they cannot fit in DECIMAL4, right? |
Yeah. I'm also thinking of that too. Will address that separately. Basically based on read pattern, the user can specify the shredding schema. |
gkpanda4
left a comment
There was a problem hiding this comment.
When processing JSON objects containing null field values (e.g., {"field": null}), the variant shredding creates schema columns for these null fields instead of omitting them entirely. This would cause schema bloat.
Adding a null check in ParquetVariantUtil.java:386 in the object() method should fix it.
2e81d79 to
7e1b608
Compare
I addressed this null value check in VariantShreddingAnalyzer.java instead. If it's NULL, then we will not add the shredded field. |
7c805f6 to
67dbe97
Compare
|
I looked at VariantShreddingAnalyzer and SparkVariantShreddingAnalyzer, implementation looks good, just minor nit. The current strategy is to shred aggressively, including fields with multiple incompatible types by picking the most common one. When a field has mixed types, the shredded typed_value is only populated for rows whose value matches the chosen type; other rows still carry the full binary value. This means bounded column reads are not available for mixed-type fields, and the performance gain relative to the added column overhead is not clear. I am not suggesting the current design is flawed. Shredding parameters like MIN_FIELD_FREQUENCY and MAX_SHREDDED_FIELDS can be tuned or new strategies introduced in follow-ups without breaking existing files. But more performance testing on real query patterns would help inform whether these thresholds need to be user-tunable. I would not block merge on this, assuming the community agrees. |
| private static final String VALUE = "value"; | ||
| private static final String ELEMENT = "element"; | ||
| private static final double MIN_FIELD_FREQUENCY = 0.10; | ||
| private static final int MAX_SHREDDED_FIELDS = 300; |
There was a problem hiding this comment.
Maybe we can make those shredding params configurable in the future, or after more performance testing?
|
|
||
| private static class PathNode { | ||
| private final String fieldName; | ||
| private final Map<String, PathNode> objectChildren = Maps.newTreeMap(); |
There was a problem hiding this comment.
Performance nit: this map is on the hot path, tree map requires string comparison. Maybe change it to hashmap since ordering is not important until after pruning createObjectTypedValue? We do sort there:
// createObjectTypedValue: sort once here
private static Type createObjectTypedValue(PathNode node) {
List<PathNode> sorted = Lists.newArrayList(node.objectChildren.values());
sorted.sort(Comparator.comparing(child -> child.fieldName));
...
}
|
Thanks for the reviews @steveloughran @qlong - all great points. I'd like to land this PR as-is and I can follow up with a PR to address these since the PR is already large. I summarized here:
None of these affect correctness. Happy to open the follow-up immediately after merge if there is agreement. |
qlong
left a comment
There was a problem hiding this comment.
I focused on shredding analyzer and it looks good to me
|
Will address @huaxingao's comments in an upcoming commit. I also realized that this PR was originally only on Spark 4.1. I'll can add the changes to Spark 4.0 too. Or should I do that in a follow up PR after this is merged?
|
|
|
||
| GroupType typedValue = variantGroup.getType("typed_value").asGroupType(); | ||
| assertThat(typedValue.containsField("a")).isTrue(); | ||
| assertThat(typedValue.containsField("b")).isTrue(); |
There was a problem hiding this comment.
The test verifies the shredded schema and the data round-trip. Should we also verify the data is in the typed columns to prove the data is really shredded?
There was a problem hiding this comment.
Updated the test with check for the data in the typed_value
| // Verify data is in typed columns by reading raw Parquet groups | ||
| try (ParquetReader<Group> rawReader = | ||
| ParquetReader.builder( | ||
| new GroupReadSupport(), new org.apache.hadoop.fs.Path(outputFile.location())) |
There was a problem hiding this comment.
nit: import org.apache.hadoop.fs.Path. You can fix this in the followup PR.
There was a problem hiding this comment.
I saw that in another test too here and the TestParquetDataWriter has import java.nio.file.Path so it would conflict. I'm not sure if there is a better way.
|
I'll open a follow-up PR to address the pending items here after @pvary's backport PR goes in for Spark 4.0. |
What it does
This PR adds support for writing shredded variants from Spark into Iceberg tables. Variant shredding extracts commonly-typed fields from semi-structured VARIANT columns into dedicated typed Parquet columns (typed_value), enabling predicate pushdown, column pruning, and better read performance.
Key design: Buffered schema inference
Because the shredded schema isn't known at Spark's planning time (DSv2 creates DataWriterFactory on the driver before seeing data), the PR uses a lazy/buffered approach:
Shredding heuristics
Co-Authored by: @nssalian