Data: Add TCK tests for Schema Evolution in BaseFormatModelTests #15843
Guosmilesmile wants to merge 11 commits into apache:main
Conversation
 * column). The NameMapping maps file column name "col_b" to old field ID 2. Since the read schema
 * expects field ID 6, the new col_b should be null.
The NameMapping is a different feature: it allows the user to read files written outside Iceberg by mapping the columns to the Iceberg field IDs, so this last sentence is not needed and it is incorrect.
OTOH we need a test for NameMapping where we read a Parquet/ORC/Avro file without Iceberg metadata (not written by Iceberg writers), and we should be able to map the column names to the Iceberg fields using the NameMapping.
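For illustration, a minimal sketch of such a NameMapping-based read with the generic Parquet readers (not the final TCK code; "noIdFile" and "icebergSchema" are placeholders for whatever the test sets up):

// Sketch: read a Parquet file that carries no Iceberg field IDs by mapping its column names
// onto the Iceberg schema with a NameMapping.
// Classes assumed: org.apache.iceberg.mapping.{MappingUtil, NameMapping},
// org.apache.iceberg.parquet.Parquet, org.apache.iceberg.data.parquet.GenericParquetReaders,
// org.apache.iceberg.io.CloseableIterable, org.apache.iceberg.data.Record.
NameMapping mapping = MappingUtil.create(icebergSchema);

try (CloseableIterable<Record> reader =
    Parquet.read(org.apache.iceberg.Files.localInput(noIdFile))
        .project(icebergSchema)
        .withNameMapping(mapping)
        .createReaderFunc(fileType -> GenericParquetReaders.buildReader(icebergSchema, fileType))
        .build()) {
  for (Record record : reader) {
    // assert that the values line up with the Iceberg fields resolved through the mapping
  }
}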
I feel the same. We should have separate tests which use name mapping as part of the TCK.
This part is complex, and the validation needs to write the file externally, without the field IDs. I have added a UT, please take a look when you have time.
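For reference, a hedged sketch of producing such a file with plain parquet-avro, so that no Iceberg field-id metadata is written (file and column names are illustrative, not the code in this PR):

// Sketch: write a Parquet file outside Iceberg using parquet-avro. The Avro schema carries
// no "field-id" properties, so the resulting file has no Iceberg field IDs.
// Classes assumed: org.apache.avro.SchemaBuilder, org.apache.avro.generic.{GenericData, GenericRecord},
// org.apache.parquet.avro.AvroParquetWriter, org.apache.parquet.hadoop.ParquetWriter.
org.apache.avro.Schema avroSchema =
    SchemaBuilder.record("external").fields()
        .requiredInt("col_a")
        .requiredString("col_b")
        .endRecord();

try (ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(noIdFile.toString()))
        .withSchema(avroSchema)
        .build()) {
  GenericRecord record = new GenericData.Record(avroSchema);
  record.put("col_a", 1);
  record.put("col_b", "value");
  writer.write(record);
}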
 */

@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testSchemaEvolutionTypePromotion(FileFormat fileFormat) throws IOException {
We should create different tests for the different allowed type promotions:
Integer to Long: int → long
Float to Double: float → double
Decimal precision: decimal(P, S) → decimal(P', S), where P' > P (only the total precision increases, not the scale)
Date to Timestamp (v3+): date → timestamp or timestamp_ns
Again, don't duplicate code, just use a private method
If problematic, maybe use a single column schema and use RandomGenericData.generate?
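For illustration, one possible shape for that shared helper (a sketch only; the actual write/read plumbing would reuse the existing TCK utilities in BaseFormatModelTests, and the helper name is hypothetical):

// Sketch: write a single column with the old type, then read it back projecting the promoted
// type under the same field ID, and assert the values survive the promotion.
private void assertTypePromotion(FileFormat fileFormat, Type writeType, Type readType)
    throws IOException {
  Schema writeSchema = new Schema(Types.NestedField.required(1, "col", writeType));
  Schema readSchema = new Schema(Types.NestedField.required(1, "col", readType));

  List<Record> records = RandomGenericData.generate(writeSchema, 100, 42L);
  // write `records` with writeSchema in fileFormat, read back with readSchema,
  // and assert the values are equal after the promotion (helpers omitted here)
}

// Each promotion then becomes a small parameterized test, e.g.:
// assertTypePromotion(fileFormat, Types.IntegerType.get(), Types.LongType.get());
// assertTypePromotion(fileFormat, Types.FloatType.get(), Types.DoubleType.get());
// assertTypePromotion(fileFormat, Types.DecimalType.of(9, 2), Types.DecimalType.of(18, 2));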
Date to Timestamp (v3+): date → timestamp or timestamp_ns
Added testSchemaEvolutionTypePromotionDateToTimestamp and testSchemaEvolutionTypePromotionDateToTimestampNano. Currently these two test cases are failing, and the behavior of Flink and Spark is completely different. Moreover, this type of conversion is not supported by TypeUtil.isPromotionAllowed, even though V3 types do support it, so I suspect it is implemented at the engine layer rather than at the file format layer.
Still investigating; feel free to add any additional information if you have any.
  }

-  static boolean hasIds(Schema schema) {
+  public static boolean hasIds(Schema schema) {
I don't like these changes. We are adding public methods for testing purposes only.
Not elegant, but a possible solution is to create a util class in the data/src/test/java/org/apache/iceberg/avro directory which is public, but only in the test package.
This way we don't expose these methods, but could still use them in our tests.
@nastra: Any better ideas?
I guess the other alternative could be to move writeAvroWithoutFieldIds into a separate class under the avro package, so that you could access hasIds() from there. Same for the ORC case.
I saw that many of the methods in these utility classes are static, which made me think I could just add them directly. I've now revised my approach by adding these methods to the utility classes under the test package of the corresponding module, avoiding any changes to the production code.
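Roughly the resulting pattern (a sketch with hypothetical class and method names; the real helper sits under src/test in the same package as the production class):

// Sketch: a helper in src/test/java under the same package as the production code, so it can
// reach the package-private hasIds(...) without widening its visibility. Names are illustrative.
package org.apache.iceberg.avro;

public final class AvroFieldIdTestUtil {
  private AvroFieldIdTestUtil() {}

  // visible to tests in other modules via the test artifacts/fixtures jar
  public static boolean hasIds(org.apache.avro.Schema schema) {
    return AvroSchemaUtil.hasIds(schema); // hypothetical package-private method being wrapped
  }
}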
@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testSchemaEvolutionTypePromotionDateToTimestamp(FileFormat fileFormat) throws IOException {
  assumeThat(TypeUtil.isPromotionAllowed(Types.DateType.get(), Types.TimestampType.withoutZone()))
Why do we have this here?
The spec says that this promotion should be allowed, so we should fail if it is not.
And when we change the spec, then we should change the tests too. Using assume could easily hide the issue if the isPromotionAllowed implementation is changed.
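In other words, something along these lines (an illustrative sketch using AssertJ, not the exact test code):

// Sketch: assert the spec-level expectation instead of assuming it, so a regression in
// TypeUtil.isPromotionAllowed fails the test rather than silently skipping it.
assertThat(TypeUtil.isPromotionAllowed(Types.DateType.get(), Types.TimestampType.withoutZone()))
    .as("date -> timestamp promotion should be allowed for v3 tables")
    .isTrue();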
I tried it myself, and currently neither Flink nor Spark supports this feature. Below are my test cases and the results I got.
Flink:

@TestTemplate
public void testDateToTimestamp() throws Exception {
  String tableName = "test_insert_into_timestamp";
  sql(
      "CREATE TABLE %s(id INT, data date) WITH ('write.format.default'='%s','format-version'='3')",
      tableName, format.name());
  try {
    Table table =
        validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
    sql("INSERT INTO %s SELECT 1, DATE '2024-01-01'", tableName);
    sql("INSERT INTO %s SELECT 2, DATE '2024-01-02'", tableName);
    sql("INSERT INTO %s SELECT 3, DATE '2024-01-03'", tableName);
    table
        .updateSchema()
        .updateColumn("data", Types.TimestampType.withoutZone())
        .commit();
    table.refresh();
    sql("SELECT * FROM %s ORDER BY id", tableName);
  } finally {
    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
  }
}

Cannot change column type: data: date -> timestamp

java.lang.IllegalArgumentException: Cannot change column type: data: date -> timestamp
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:463)
    at org.apache.iceberg.SchemaUpdate.updateColumn(SchemaUpdate.java:285)
    at org.apache.iceberg.flink.TestFlinkTableSink.testDateToTimestamp(TestFlinkTableSink.java:341)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at java.base/java.util.Optional.ifPresent(Optional.java:178)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
Spark:

@TestTemplate
public void testDateToTimestamp() {
  String tableName = tableName("date_to_timestamp_table");
  sql(
      "CREATE TABLE %s (id INT NOT NULL, dt DATE) "
          + "USING iceberg TBLPROPERTIES ('format-version'='3')",
      tableName);
  sql("INSERT INTO %s VALUES (1, DATE '2023-01-01')", tableName);
  sql("ALTER TABLE %s ALTER COLUMN dt TYPE timestamp_ntz", tableName);
  List<Object[]> sql = sql("select * from %s", tableName);
  System.out.println(sql);
}

Cannot change column type: dt: date -> timestamptz

java.lang.IllegalArgumentException: Cannot change column type: dt: date -> timestamptz
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:463)
    at org.apache.iceberg.SchemaUpdate.updateColumn(SchemaUpdate.java:285)
    at org.apache.iceberg.spark.Spark3Util.applySchemaChanges(Spark3Util.java:179)
    at org.apache.iceberg.spark.SparkCatalog.commitChanges(SparkCatalog.java:781)
    at org.apache.iceberg.spark.SparkCatalog.alterTable(SparkCatalog.java:294)
    at org.apache.spark.sql.execution.datasources.v2.AlterTableExec.run(AlterTableExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)

Path tempFile = Files.createTempFile("iceberg-test-no-ids-", ".parquet");
Files.deleteIfExists(tempFile);
Why do we do this double writing? Could we just write the result to the encryptedFile immediately?
Good point. I didn't find a suitable API at first, but after searching again, I've refactored this part.
configurations {
  parquetTestArtifactsRuntime {
    canBeConsumed = false
    canBeResolved = true
    transitive = false
  }
}
Could you please help me understand why we need this?
When I add testImplementation(project(path: ':iceberg-parquet', configuration: 'testFixturesRuntimeElements')) in Spark, I get an error:
Cannot select module with conflict on capability 'org.apache.iceberg:iceberg-parquet-test-fixtures:1.10.0-SNAPSHOT' also provided by ['project :iceberg-parquet' (testFixturesApiElements), 'project :iceberg-parquet' (testFixturesRuntimeElements)]
Spark tests already compile against the main :iceberg-parquet artifact and its testFixtures, but some test code/resources from parquet are only needed at execution time. Keeping them in a separate runtime-only configuration avoids exposing parquet's internal test classes on the test compile classpath, and transitive = false makes sure we only pull in that test jar itself rather than an extra dependency graph.
parquetTestArtifactsRuntime is an internal, resolvable-only configuration. canBeConsumed = false means it is not published for other projects to consume, canBeResolved = true allows this project to resolve it into files and append them to the test classpath, and transitive = false ensures we only pull the :iceberg-parquet testArtifacts jar itself rather than its full transitive dependency graph.
Could we just exclude transitivity when adding the dependency?
testRuntimeOnly(project(path: ':iceberg-parquet', configuration: 'testArtifacts')) {
transitive = false
}
I replaced the code with the line above and got the same error.
Ok. I get it now, and I don't have a better solution.
@huaxingao: Do you think this change could be ok for the Spark code? I don't see any issues, but I would like to have someone with more Spark knowledge take a look.
Thanks,
Peter
Could we move ParquetWritingTestUtils from parquet/src/test/java/ to parquet/src/testFixtures/java/? That would let us drop the parquetTestArtifactsRuntime workaround in all 4 Spark builds, since the existing testFixtures(project(':iceberg-parquet')) dependency would already provide it.
@huaxingao @pvary Thanks for the suggestion, I have made the change. If you have time, please help review it again.
testImplementation project(path: ':iceberg-orc', configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements'))
testImplementation libs.awaitility
testImplementation(testFixtures(project(':iceberg-parquet')))
Only 3.4 needs this. The other versions already have this code.
import org.apache.parquet.io.OutputFile;

/** Utilities for tests that need to write Parquet files. */
public class TestParquetWritingUtils {
nit: maybe ParquetWritingTestUtils? Test* prefix is more for JUnit test classes.
Since we already have a class named ParquetWritingTestUtils in the test package, I renamed this one to ParquetFileTestUtils.
Added schema evolution tests to BaseFormatModelTests:
Part of #15415