Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ private static String convertToTypeString(Type type) {
case FIXED:
case BINARY:
return "binary";
case VARIANT:
return "unknown";
case DECIMAL:
final Types.DecimalType decimalType = (Types.DecimalType) type;
return String.format("decimal(%s,%s)", decimalType.precision(), decimalType.scale());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ public void testConversionWithoutLastComment() {
assertThat(schema.asStruct()).isEqualTo(expected.asStruct());
}

@Test
public void testVariantTypeConvertToHiveSchema() {
  // Hive has no native variant type, so the converter surfaces VARIANT columns as "unknown".
  Schema icebergSchema = new Schema(optional(0, "variant_field", Types.VariantType.get()));
  FieldSchema expectedField = new FieldSchema("variant_field", "unknown", null);
  List<FieldSchema> converted = HiveSchemaUtil.convert(icebergSchema);
  assertThat(converted).containsExactly(expectedField);
}

protected List<FieldSchema> getSupportedFieldSchemas() {
List<FieldSchema> fields = Lists.newArrayListWithCapacity(10);
fields.add(new FieldSchema("c_float", serdeConstants.FLOAT_TYPE_NAME, "float comment"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,54 @@ public void testSnapshotPartitionedV1() throws IOException {
}
}
}

@TestTemplate
public void testSnapshotWithVariant() {
  // Snapshotting a variant column needs format-version 3 support in the target catalog.
  assumeThat(catalogName)
      .as("Variant type requires Hive 4 which is not yet supported")
      .isNotEqualTo("testhive")
      .isNotEqualTo("spark_catalog");

  // Stage an unpartitioned parquet source table with a single variant row.
  String sourceLocation = Files.createTempDirectory(temp, "junit").toFile().toString();
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, data variant) USING parquet LOCATION '%s'",
      SOURCE_NAME, sourceLocation);
  sql("INSERT INTO TABLE %s VALUES (1, parse_json('{\"key\": 123}'))", SOURCE_NAME);

  // Run the snapshot procedure, forcing a v3 table so the variant type is accepted.
  Object importedFileCount =
      scalarSql(
          "CALL %s.system.snapshot('%s', '%s', properties => map('format-version','3'))",
          catalogName, SOURCE_NAME, tableName);
  assertThat(importedFileCount).as("Should have added one file").isEqualTo(1L);

  // The snapshotted table must yield the original value through variant_get.
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(1L, 123)),
      sql("SELECT id, variant_get(data, '$.key', 'int') FROM %s", tableName));
}

@TestTemplate
public void testSnapshotPartitionedWithVariant() {
  // Snapshotting a variant column needs format-version 3 support in the target catalog.
  assumeThat(catalogName)
      .as("Variant type requires Hive 4 which is not yet supported")
      .isNotEqualTo("testhive")
      .isNotEqualTo("spark_catalog");

  // Stage a parquet source table partitioned by id with one variant row per partition.
  String sourceLocation = Files.createTempDirectory(temp, "junit").toFile().toString();
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, data variant) USING parquet PARTITIONED BY (id) LOCATION '%s'",
      SOURCE_NAME, sourceLocation);
  sql(
      "INSERT INTO TABLE %s (id, data) VALUES (1, parse_json('{\"key\": 123}')), (2, parse_json('{\"key\": 456}'))",
      SOURCE_NAME);

  // Run the snapshot procedure, forcing a v3 table so the variant type is accepted.
  Object importedFileCount =
      scalarSql(
          "CALL %s.system.snapshot('%s', '%s', properties => map('format-version','3'))",
          catalogName, SOURCE_NAME, tableName);
  assertThat(importedFileCount).as("Should have added two files").isEqualTo(2L);

  // Both partitions must survive the snapshot and read back through variant_get.
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(123, 1L), row(456, 2L)),
      sql("SELECT variant_get(data, '$.key', 'int'), id FROM %s ORDER BY id", tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private static SparkPartition toSparkPartition(
serde.nonEmpty() || table.provider().nonEmpty(), "Partition format should be defined");

String uri = Util.uriToString(locationUri.get());
String format = serde.nonEmpty() ? serde.get() : table.provider().get();
String format = resolveFileFormat(table);

Map<String, String> partitionSpec =
JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
Expand Down Expand Up @@ -683,11 +683,7 @@ private static void importUnpartitionedSparkTable(
ExecutorService service) {
try {
CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
Option<String> format =
sourceTable.storage().serde().nonEmpty()
? sourceTable.storage().serde()
: sourceTable.provider();
Preconditions.checkArgument(format.nonEmpty(), "Could not determine table format");
String format = resolveFileFormat(sourceTable);

Map<String, String> partition = Collections.emptyMap();
PartitionSpec spec = PartitionSpec.unpartitioned();
Expand All @@ -701,7 +697,7 @@ private static void importUnpartitionedSparkTable(
TableMigrationUtil.listPartition(
partition,
Util.uriToString(sourceTable.location()),
format.get(),
format,
spec,
conf,
metricsConfig,
Expand Down Expand Up @@ -1051,6 +1047,22 @@ public static boolean wapEnabled(Table table) {
Boolean.parseBoolean(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
}

/**
 * Resolves the file format for a source table.
 *
 * <p>The Hive serde string wins when it clearly names a supported file format (parquet, avro, or
 * orc); otherwise the Spark table provider is used.
 *
 * @param table source table metadata from the session catalog
 * @return the serde string or, failing that, the provider string
 * @throws IllegalArgumentException if the serde does not name a format and no provider is set
 */
private static String resolveFileFormat(CatalogTable table) {
  Option<String> serde = table.storage().serde();

  boolean serdeNamesFileFormat = false;
  if (serde.nonEmpty()) {
    // Compare case-insensitively; serde class names vary in casing.
    String normalized = serde.get().toLowerCase(Locale.ROOT);
    serdeNamesFileFormat =
        normalized.contains("parquet") || normalized.contains("avro") || normalized.contains("orc");
  }

  if (serdeNamesFileFormat) {
    return serde.get();
  }

  // Fall back to the provider; fail loudly when neither source identifies a format.
  Preconditions.checkArgument(
      table.provider().nonEmpty(),
      "Could not determine table format from serde %s and no provider set",
      serde.getOrElse(() -> "unknown"));
  return table.provider().get();
}

/** Class representing a table partition. */
public static class SparkPartition implements Serializable {
private final Map<String, String> values;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,54 @@ public void testSnapshotPartitionedV1() throws IOException {
}
}
}

@TestTemplate
public void testSnapshotWithVariant() {
  // Snapshotting a variant column needs format-version 3 support in the target catalog.
  assumeThat(catalogName)
      .as("Variant type requires Hive 4 which is not yet supported")
      .isNotEqualTo("testhive")
      .isNotEqualTo("spark_catalog");

  // Stage an unpartitioned parquet source table with a single variant row.
  String sourceLocation = Files.createTempDirectory(temp, "junit").toFile().toString();
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, data variant) USING parquet LOCATION '%s'",
      SOURCE_NAME, sourceLocation);
  sql("INSERT INTO TABLE %s VALUES (1, parse_json('{\"key\": 123}'))", SOURCE_NAME);

  // Run the snapshot procedure, forcing a v3 table so the variant type is accepted.
  Object importedFileCount =
      scalarSql(
          "CALL %s.system.snapshot('%s', '%s', properties => map('format-version','3'))",
          catalogName, SOURCE_NAME, tableName);
  assertThat(importedFileCount).as("Should have added one file").isEqualTo(1L);

  // The snapshotted table must yield the original value through variant_get.
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(1L, 123)),
      sql("SELECT id, variant_get(data, '$.key', 'int') FROM %s", tableName));
}

@TestTemplate
public void testSnapshotPartitionedWithVariant() {
  // Snapshotting a variant column needs format-version 3 support in the target catalog.
  assumeThat(catalogName)
      .as("Variant type requires Hive 4 which is not yet supported")
      .isNotEqualTo("testhive")
      .isNotEqualTo("spark_catalog");

  // Stage a parquet source table partitioned by id with one variant row per partition.
  String sourceLocation = Files.createTempDirectory(temp, "junit").toFile().toString();
  sql(
      "CREATE TABLE %s (id bigint NOT NULL, data variant) USING parquet PARTITIONED BY (id) LOCATION '%s'",
      SOURCE_NAME, sourceLocation);
  sql(
      "INSERT INTO TABLE %s (id, data) VALUES (1, parse_json('{\"key\": 123}')), (2, parse_json('{\"key\": 456}'))",
      SOURCE_NAME);

  // Run the snapshot procedure, forcing a v3 table so the variant type is accepted.
  Object importedFileCount =
      scalarSql(
          "CALL %s.system.snapshot('%s', '%s', properties => map('format-version','3'))",
          catalogName, SOURCE_NAME, tableName);
  assertThat(importedFileCount).as("Should have added two files").isEqualTo(2L);

  // Both partitions must survive the snapshot and read back through variant_get.
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(123, 1L), row(456, 2L)),
      sql("SELECT variant_get(data, '$.key', 'int'), id FROM %s ORDER BY id", tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private static SparkPartition toSparkPartition(
serde.nonEmpty() || table.provider().nonEmpty(), "Partition format should be defined");

String uri = Util.uriToString(locationUri.get());
String format = serde.nonEmpty() ? serde.get() : table.provider().get();
String format = resolveFileFormat(table);

Map<String, String> partitionSpec =
JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
Expand Down Expand Up @@ -682,11 +682,7 @@ private static void importUnpartitionedSparkTable(
ExecutorService service) {
try {
CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
Option<String> format =
sourceTable.storage().serde().nonEmpty()
? sourceTable.storage().serde()
: sourceTable.provider();
Preconditions.checkArgument(format.nonEmpty(), "Could not determine table format");
String format = resolveFileFormat(sourceTable);

Map<String, String> partition = Collections.emptyMap();
PartitionSpec spec = PartitionSpec.unpartitioned();
Expand All @@ -700,7 +696,7 @@ private static void importUnpartitionedSparkTable(
TableMigrationUtil.listPartition(
partition,
Util.uriToString(sourceTable.location()),
format.get(),
format,
spec,
conf,
metricsConfig,
Expand Down Expand Up @@ -1143,6 +1139,22 @@ private static boolean wapEnabled(Table table) {
Boolean.parseBoolean(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
}

/**
 * Resolves the file format for a source table.
 *
 * <p>The Hive serde string wins when it clearly names a supported file format (parquet, avro, or
 * orc); otherwise the Spark table provider is used.
 *
 * @param table source table metadata from the session catalog
 * @return the serde string or, failing that, the provider string
 * @throws IllegalArgumentException if the serde does not name a format and no provider is set
 */
private static String resolveFileFormat(CatalogTable table) {
  Option<String> serde = table.storage().serde();

  boolean serdeNamesFileFormat = false;
  if (serde.nonEmpty()) {
    // Compare case-insensitively; serde class names vary in casing.
    String normalized = serde.get().toLowerCase(Locale.ROOT);
    serdeNamesFileFormat =
        normalized.contains("parquet") || normalized.contains("avro") || normalized.contains("orc");
  }

  if (serdeNamesFileFormat) {
    return serde.get();
  }

  // Fall back to the provider; fail loudly when neither source identifies a format.
  Preconditions.checkArgument(
      table.provider().nonEmpty(),
      "Could not determine table format from serde %s and no provider set",
      serde.getOrElse(() -> "unknown"));
  return table.provider().get();
}

/** Class representing a table partition. */
public static class SparkPartition implements Serializable {
private final Map<String, String> values;
Expand Down
Loading