Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/cdc-ingestion/kafka-cdc.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ To use this feature through `flink run`, run the following shell command.
This action will build a single combined sink for all tables. For each Kafka topic's table to be synchronized, if the
corresponding Paimon table does not exist, this action will automatically create the table, and its schema will be derived
from all specified Kafka topic's tables. If the Paimon table already exists and its schema is different from that parsed
from Kafka record, this action will try to preform schema evolution.
from Kafka record, this action will try to perform schema evolution.

Example

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/cdc-ingestion/pulsar-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Only tables with primary keys will be synchronized.
This action will build a single combined sink for all tables. For each Pulsar topic's table to be synchronized, if the
corresponding Paimon table does not exist, this action will automatically create the table, and its schema will be derived
from all specified Pulsar topic's tables. If the Paimon table already exists and its schema is different from that parsed
from Pulsar record, this action will try to preform schema evolution.
from Pulsar record, this action will try to perform schema evolution.

Example

Expand Down Expand Up @@ -312,7 +312,7 @@ There are some useful options to build Flink Pulsar Source, but they are not pro
<tr>
<td>pulsar.startCursor.fromMessageId</td>
<td>EARLIEST</td>
<td>Sting</td>
<td>String</td>
<td>Using a unique identifier of a single message to seek the start position. The common format is a triple
'&ltlong&gtledgerId,&ltlong&gtentryId,&ltint&gtpartitionIndex'. Specially, you can set it to
EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/iceberg/rest-catalog.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ the query results:
*/
```

**Schema compatabilty and Partition evolution:**
**Schema compatibility and Partition evolution:**

There is a fundamental difference between Paimon and Iceberg regarding the starting fieldId. Paimon uses fieldId 0, while Iceberg uses fieldId 1. If we create an Iceberg table using a Paimon schema directly, it will shift all fieldIds by +1, causing field disorder. However, it is possible to update the schema after table creation and start the schema from fieldId 0.

Expand Down
2 changes: 1 addition & 1 deletion docs/docs/primary-key-table/pk-clustering-override.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,4 @@ values, so queries like `WHERE id = 12345` remain efficient.

- Merge engine: `partial-update` or `aggregation`.
- Changelog producer: `lookup` or `full-compaction`.
- Configue: `sequence.fields` or `record-level.expire-time`.
- Configure: `sequence.fields` or `record-level.expire-time`.
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public InlineElement getDescription() {
.booleanType()
.defaultValue(true)
.withDescription(
"The legacy partition name is using `toString` fpr all types. If false, using "
"The legacy partition name is using `toString` for all types. If false, using "
+ "cast to string for all types.");

public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public final class DataTypeCasts {

/**
* allowExplicit false : Returns whether the source type can be safely cast to the target type
* without loosing information. Implicit casts are used for type widening and type
* without losing information. Implicit casts are used for type widening and type
* generalization (finding a common supertype for a set of types). Implicit casts are similar to
* the Java semantics (e.g. this is not possible: {@code int x = (String) z}).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void cleanUnusedManifests(Changelog changelog, Set<String> skippingSet) {
}
}

// the index and statics manifest list should handle by snapshot deletion.
// the index and statistics manifest list should handle by snapshot deletion.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ protected void updateLastColumn(
&& CastExecutors.resolve(sourceRootType, targetRootType)
!= null,
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
"Column type %s[%s] cannot be converted to %s without losing information.",
field.name(), sourceRootType, targetRootType));
return new DataField(
field.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public boolean earliestFileNotExists() {
}

/**
* Returns a {@link Snapshot} whoes commit time is later than or equal to given timestamp mills.
* Returns a {@link Snapshot} whose commit time is later than or equal to given timestamp mills.
* If there is no such a snapshot, returns null.
*/
public @Nullable Snapshot laterOrEqualTimeMills(long timestampMills) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2136,7 +2136,7 @@ private void baseAlterTable(Map<String, String> initOptions) throws Exception {
.satisfies(
anyCauseMatches(
IllegalStateException.class,
"Column type col1[DOUBLE] cannot be converted to DATE without loosing information."));
"Column type col1[DOUBLE] cannot be converted to DATE without losing information."));

// Alter table update a column type throws ColumnNotExistException when column does not
// exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public void testUpdateFieldType() throws Exception {
"f0", DataTypes.STRING()))))
.isInstanceOf(IllegalStateException.class)
.hasMessage(
"Column type f0[BIGINT] cannot be converted to STRING without loosing information.");
"Column type f0[BIGINT] cannot be converted to STRING without losing information.");
schemaManager.commitChanges(
Collections.singletonList(
SchemaChange.setOption("disable-explicit-type-casting", "false")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected void runSingleTableSchemaEvolutionWithSchemaIncludeRecord(
"+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]");
waitForResult(expected, table, rowType, primaryKeys);

// column type covert exception (int64 -> string)
// column type convert exception (int64 -> string)
writeRecordsToKafka(
topic, "kafka/%s/table/schema/%s/%s-data-5.txt", format, sourceDir, format);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testModifyColumnTypeBooleanAndNumeric() {
assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY (f BOOLEAN)"))
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Column type f[DOUBLE] cannot be converted to BOOLEAN without loosing information.");
"Column type f[DOUBLE] cannot be converted to BOOLEAN without losing information.");
}

@Test
Expand Down Expand Up @@ -1605,13 +1605,13 @@ public void testDisableExplicitTypeCasting(String formatType) {
assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20));
assertThatCode(() -> sql("ALTER TABLE T MODIFY v SMALLINT"))
.hasStackTraceContaining(
"Column type v[INT] cannot be converted to SMALLINT without loosing information");
"Column type v[INT] cannot be converted to SMALLINT without losing information");
sql("ALTER TABLE T MODIFY v BIGINT");
assertThat(sql("SELECT * FROM T"))
.containsExactlyInAnyOrder(Row.of(1, 10L), Row.of(2, 20L));
assertThatCode(() -> sql("ALTER TABLE T MODIFY v INT"))
.hasStackTraceContaining(
"Column type v[BIGINT] cannot be converted to INT without loosing information");
"Column type v[BIGINT] cannot be converted to INT without losing information");
// disable explicit type casting
sql("ALTER TABLE T SET ('disable-explicit-type-casting' = 'false')");
sql("ALTER TABLE T MODIFY v INT");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testPrimaryKeyTable() throws Exception {
.toString()
.contains("iceberg/test_db"));

// specify a dedicated hive database and table for paimon iceberg commiter
// specify a dedicated hive database and table for paimon iceberg committer
tEnv.executeSql(
"CREATE TABLE my_paimon.test_db.t1 ( pt INT, id INT, data STRING, PRIMARY KEY (pt, id) NOT ENFORCED ) "
+ "PARTITIONED BY (pt) WITH "
Expand Down Expand Up @@ -216,7 +216,7 @@ public void testAppendOnlyTable() throws Exception {
tEnv.executeSql(
"SELECT data, id, pt FROM my_iceberg.test_db.t WHERE id > 1 ORDER BY pt, id")));

// specify a dedicated hive database and table for paimon iceberg commiter
// specify a dedicated hive database and table for paimon iceberg committer
tEnv.executeSql(
"CREATE TABLE my_paimon.test_db.t1 ( pt INT, id INT, data STRING ) PARTITIONED BY (pt) WITH "
+ "( 'metadata.iceberg.storage' = 'hive-catalog', 'metadata.iceberg.uri' = '', 'file.format' = 'avro', "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ case class PaimonSparkWriter(
postCommit(commitMessages)
}

/** Boostrap and repartition for cross partition mode. */
/** Bootstrap and repartition for cross partition mode. */
private def bootstrapAndRepartitionByKeyHash(
data: DataFrame,
parallelism: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
ifExists,
replace) =>
if (isPaimonBuildInFunction(funcIdent)) {
throw new UnsupportedOperationException(s"Can't create build-in function: $funcIdent")
throw new UnsupportedOperationException(s"Can't create built-in function: $funcIdent")
}
val v1Function = CatalogFunction(funcIdent, className, resources)
CreatePaimonV1FunctionCommand(v1FunctionCatalog, v1Function, ifExists, replace)
Expand All @@ -77,7 +77,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent, false),
ifExists) =>
if (isPaimonBuildInFunction(funcIdent)) {
throw new UnsupportedOperationException(s"Can't drop build-in function: $funcIdent")
throw new UnsupportedOperationException(s"Can't drop built-in function: $funcIdent")
}
// The function may be v1 function or not, anyway it can be safely deleted here.
DropPaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, ifExists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase {
.option("write.merge-schema", "true")
.save(location)

// Case 2: colum b and d are absent in the coming data
// Case 2: column b and d are absent in the coming data
val df3 = Seq((4, 45.6d), (5, 56.7d))
.toDF("a", "c")
df3.write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
}
}

test("Paimon V1 Function: select with build-in function") {
test("Paimon V1 Function: select with built-in function") {
withUserDefinedFunction("udf_add2" -> false) {
sql(s"""
|CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
Expand Down Expand Up @@ -144,18 +144,18 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
}

test("Paimon V1 Function: unsupported operation") {
// create a build-in function
// create a built-in function
assert(intercept[Exception] {
sql(s"""
|CREATE FUNCTION sys.max_pt AS '$UDFExampleAdd2Class'
|USING JAR '$testUDFJarPath'
|""".stripMargin)
}.getMessage.contains("Can't create build-in function"))
}.getMessage.contains("Can't create built-in function"))

// drop a build-in function
// drop a built-in function
assert(intercept[Exception] {
sql("DROP FUNCTION sys.max_pt")
}.getMessage.contains("Can't drop build-in function"))
}.getMessage.contains("Can't drop built-in function"))
}

test("Paimon V1 Function: user defined aggregate function") {
Expand Down