diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md index 703976857a15..1cc6c0bc691f 100644 --- a/docs/docs/spark/sql-write.md +++ b/docs/docs/spark/sql-write.md @@ -431,7 +431,7 @@ Parquet columns are matched **by column name** (not by position). Extra columns ```sql COPY INTO 'target_path' -FROM table_name +FROM { table_name | (SELECT ...) } FILE_FORMAT = (TYPE = CSV [, option = value, ...]) [OVERWRITE = TRUE|FALSE] ``` @@ -445,11 +445,19 @@ FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',') OVERWRITE = TRUE; ``` +**Write from query:** + +```sql +COPY INTO '/export/active_users/' +FROM (SELECT id, name FROM my_db.users WHERE active = TRUE) +FILE_FORMAT = (TYPE = CSV, HEADER = TRUE); +``` + #### Write JSON Files ```sql COPY INTO 'target_path' -FROM table_name +FROM { table_name | (SELECT ...) } FILE_FORMAT = (TYPE = JSON [, option = value, ...]) [OVERWRITE = TRUE|FALSE] ``` @@ -463,11 +471,19 @@ FILE_FORMAT = (TYPE = JSON) OVERWRITE = TRUE; ``` +**JSON export from query:** + +```sql +COPY INTO '/export/recent_events/' +FROM (SELECT * FROM my_db.events WHERE event_date > '2024-01-01') +FILE_FORMAT = (TYPE = JSON); +``` + #### Write Parquet Files ```sql COPY INTO 'target_path' -FROM table_name +FROM { table_name | (SELECT ...) } FILE_FORMAT = (TYPE = PARQUET [, option = value, ...]) [OVERWRITE = TRUE|FALSE] ``` @@ -490,6 +506,14 @@ FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP) OVERWRITE = TRUE; ``` +**Parquet export from aggregation query:** + +```sql +COPY INTO '/export/summary/' +FROM (SELECT dept, COUNT(*) AS cnt FROM my_db.employees GROUP BY dept) +FILE_FORMAT = (TYPE = PARQUET); +``` + #### FILE_FORMAT Options `FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or `TYPE = PARQUET`. @@ -614,10 +638,11 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfull - **CSV column-count mismatch**: Rows with fewer or more columns than the target schema are treated as malformed records. With `ON_ERROR = CONTINUE`, these rows are skipped and counted as errors. - Only **CSV**, **JSON**, and **Parquet** formats are supported. -- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported. - `SINGLE = TRUE` (single-file output) is not supported. - File format options must be specified inline in `FILE_FORMAT = (...)`. - File listing is **non-recursive**: only direct files under the source path are processed. Subdirectories are ignored. - `PATTERN` matches the **base file name** only (not the full path). - Concurrent COPY INTO commands targeting the same table may produce duplicate data. - `SKIP_HEADER` only supports values `0` or `1`. +- `FROM (...)` accepts any read-only query (e.g. `SELECT`, `WITH ... SELECT`, `VALUES`); statements with side effects (e.g. `INSERT`, `INSERT OVERWRITE DIRECTORY`, DDL) are rejected. +- For a `FROM (...)` export, `rows_written` is an execution-time statistic counted by a separate pass before the files are written. Because the DataFrame is lazy and not cached, writing re-executes the query a second time; if the query is non-deterministic (e.g. uses `rand()`, `current_timestamp()`, or reads a volatile source), the two runs can produce different rows, so `rows_written` may not match the actual file contents. The result is intentionally not staged, so the export does not consume extra executor disk. diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 620a2bb95abc..8c2e45b34ec2 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -86,6 +86,18 @@ statement overwriteClause? #copyIntoLocation | CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier LIKE source=multipartIdentifier ( . )*? #createTableLike + | COPY INTO targetPath=STRING + FROM query=parenBlock + fileFormatClause + overwriteClause? #copyIntoLocationFromQuery + ; + +// A parenthesized block with balanced parentheses, used to capture an inline subquery verbatim, +// e.g. the (SELECT ...) in `COPY INTO FROM (SELECT ...)`. A recursive rule is required +// (rather than '(' .*? ')') so that nested parentheses such as `WHERE x IN (1, 2)` are matched +// correctly. The raw subquery text is later extracted from the token stream by the AST builder. +parenBlock + : '(' ( parenBlock | ~('(' | ')') )* ')' ; callArgument diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala index 8e19c5ddbef2..d60e9ab8f3a4 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala @@ -23,7 +23,7 @@ import org.apache.paimon.spark.SparkTypeUtils import org.apache.paimon.spark.catalog.SupportView import org.apache.paimon.view.View -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{PaimonUtils, SparkSession} import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedRelation, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, UpCast} import org.apache.spark.sql.catalyst.parser.ParseException @@ -91,13 +91,7 @@ case class PaimonViewResolver(spark: SparkSession) ) try { CurrentOrigin.withOrigin(origin) { - try { - spark.sessionState.sqlParser.parseQuery(viewText) - } catch { - // For compatibility with Spark 3.2 and below - case _: NoSuchMethodError => - spark.sessionState.sqlParser.parsePlan(viewText) - } + PaimonUtils.parseQueryCompat(spark.sessionState.sqlParser, viewText) } } catch { case _: ParseException => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala index 3e5d0a066651..dd04754c81ff 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala @@ -23,9 +23,26 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.types.{IntegerType, LongType, StringType} +/** + * The (still unresolved) source to export: either a named table or an inline read-only query. Using + * an ADT keeps the two mutually exclusive, so impossible states such as "table name and query both + * present" cannot be constructed. The table name is resolved to a catalog/identifier later, during + * planning. + */ +sealed trait CopyIntoLocationSource + +object CopyIntoLocationSource { + + /** Export the named table; `nameParts` is the multipart identifier, unresolved at this point. */ + case class TableName(nameParts: Seq[String]) extends CopyIntoLocationSource + + /** Export the result of the inline `FROM ()` read-only query. */ + case class Query(query: String) extends CopyIntoLocationSource +} + case class CopyIntoLocationCommand( targetPath: String, - table: Seq[String], + source: CopyIntoLocationSource, fileFormat: CopyFileFormat, overwrite: Boolean) extends PaimonLeafCommand { @@ -37,6 +54,12 @@ case class CopyIntoLocationCommand( ) override def simpleString(maxFields: Int): String = { - s"CopyIntoLocation: target=$targetPath, source=$table" + val sourceDesc = source match { + case CopyIntoLocationSource.Query(q) => + val truncated = if (q.length > 100) q.take(100) + "..." else q + s"query=$truncated" + case CopyIntoLocationSource.TableName(nameParts) => s"table=$nameParts" + } + s"CopyIntoLocation: target=$targetPath, source=$sourceDesc" } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala index 45471bab8458..76d147162a52 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala @@ -28,10 +28,21 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.unsafe.types.UTF8String +/** The source to export from: either a Paimon table or an inline read-only query. */ +sealed trait CopyIntoSource + +object CopyIntoSource { + + /** Export an existing Paimon table identified by `catalog`/`ident`. */ + case class TableSource(catalog: TableCatalog, ident: Identifier) extends CopyIntoSource + + /** Export the result of an inline `FROM ()` read-only query. */ + case class QuerySource(query: String) extends CopyIntoSource +} + case class CopyIntoLocationExec( spark: SparkSession, - catalog: TableCatalog, - ident: Identifier, + source: CopyIntoSource, targetPath: String, fileFormat: CopyFileFormat, overwrite: Boolean, @@ -43,14 +54,21 @@ case class CopyIntoLocationExec( override protected def run(): Seq[InternalRow] = { fileFormat.validateForExport() - val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident) - val df = spark.table(tableName) - - val rowCount = df.count() + val df = source match { + case CopyIntoSource.QuerySource(query) => CopyIntoUtils.queryToDataFrame(spark, query) + case CopyIntoSource.TableSource(catalog, ident) => + spark.table(CopyIntoUtils.quoteIdentifier(catalog.name(), ident)) + } val writerOptions = fileFormat.toSparkWriterOptions val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists + // `rows_written` is counted by a separate `count()` action before the write. The DataFrame is + // lazy and not cached, so the write re-executes the query a second time; for a non-deterministic + // query (e.g. `rand()`, `current_timestamp()`, or a volatile source) the two runs can yield + // different rows, so this count may not match the files (see the docs). We accept this rather + // than stage the whole result to disk just to make the count exact. + val rowCount = df.count() fileFormat.formatType match { case FileFormatType.JSON => df.write.options(writerOptions).mode(saveMode).json(targetPath) @@ -63,8 +81,9 @@ case class CopyIntoLocationExec( val hadoopConf = spark.sessionState.newHadoopConf() val fsPath = new Path(targetPath) val fs = fsPath.getFileSystem(hadoopConf) + // Count only data files (part-*); committer side files such as _SUCCESS are not data output. val fileCount = if (fs.exists(fsPath)) { - fs.listStatus(fsPath).count(_.isFile) + fs.listStatus(fsPath).count(s => s.isFile && s.getPath.getName.startsWith("part-")) } else { 0 } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala index fdf76b8dc6c5..32aa73078053 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala @@ -18,12 +18,62 @@ package org.apache.paimon.spark.execution -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoDir, LogicalPlan, ParsedStatement} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.functions.col +import org.apache.spark.sql.paimon.shims.SparkShimLoader object CopyIntoUtils { + /** + * Build a [[DataFrame]] from the inline query of `COPY INTO FROM ()`. + * + * The query is parsed with the session parser (the Paimon parser) via `parsePlan`, so it goes + * through the exact same path as `spark.sql()`, including Paimon's parser rules (e.g. the + * v1 function rewrite). Any read-only query is accepted (`SELECT`, `WITH ... SELECT`, `VALUES`, + * etc.); the only restriction is that it must have no side effects, enforced by the + * [[hasSideEffect]] guard on the resulting plan: the Paimon parser does not reject statements at + * parse time (its `parseQuery` just delegates to `parsePlan`), so DDL/DML reaches us as a plan + * and must be rejected by inspecting the tree. + */ + def queryToDataFrame(spark: SparkSession, query: String): DataFrame = { + val plan = + try { + spark.sessionState.sqlParser.parsePlan(query) + } catch { + case e: ParseException => + throw new IllegalArgumentException( + s"COPY INTO FROM () only supports read-only queries: $query", + e) + } + if (hasSideEffect(plan)) { + throw new IllegalArgumentException( + "COPY INTO FROM () only supports read-only queries, " + + s"but got a statement with side effects: $query") + } + SparkShimLoader.shim.classicApi.createDataset(spark, plan) + } + + /** + * Whether `plan` contains a node with side effects anywhere in its tree: + * - `Command` covers resolved commands such as `DROP TABLE`. + * - `ParsedStatement` covers parsed-but-unresolved DDL/DML such as `INSERT`, CTAS, + * `CREATE VIEW` (it is the parent of `InsertIntoStatement`); a pure SELECT never contains + * such a node. + * - `InsertIntoDir` (the `INSERT OVERWRITE DIRECTORY` plan) is neither of the above, so it is + * matched explicitly. + * + * `find` is used rather than `exists` because `TreeNode.exists` is absent on Spark 3.2. + */ + private def hasSideEffect(plan: LogicalPlan): Boolean = { + plan.find { + case _: Command | _: ParsedStatement | _: InsertIntoDir => true + case _ => false + }.isDefined + } + def quoteIdentifier(catalogName: String, ident: Identifier): String = { val parts = Seq(catalogName) ++ ident.namespace().toSeq ++ diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index 6870ce8832a4..321e61f2cb59 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, SparkUtils} import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView} import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView -import org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, TruncatePaimonTableWithFilter} +import org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, CopyIntoLocationSource, CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, TruncatePaimonTableWithFilter} import org.apache.paimon.table.Table import org.apache.spark.sql.SparkSession @@ -162,11 +162,23 @@ case class PaimonStrategy(spark: SparkSession) c.onError, c.output) :: Nil - case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog, ident), _, _) => + case c @ CopyIntoLocationCommand(_, CopyIntoLocationSource.Query(query), _, _) => CopyIntoLocationExec( spark, - catalog, - ident, + CopyIntoSource.QuerySource(query), + c.targetPath, + c.fileFormat, + c.overwrite, + c.output) :: Nil + + case c @ CopyIntoLocationCommand( + _, + CopyIntoLocationSource.TableName(PaimonCatalogAndIdentifier(catalog, ident)), + _, + _) => + CopyIntoLocationExec( + spark, + CopyIntoSource.TableSource(catalog, ident), c.targetPath, c.fileFormat, c.overwrite, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala index 16d34ecf4c4a..18dbdfd498b0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala @@ -23,6 +23,7 @@ import org.apache.spark.rdd.InputFileBlockHolder import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.expressions.FieldReference import org.apache.spark.sql.connector.expressions.filter.Predicate @@ -63,6 +64,21 @@ object PaimonUtils { SparkShimLoader.shim.classicApi.createDataset(sparkSession, logicalPlan) } + /** + * Parse a read-only query, preferring [[ParserInterface.parseQuery]] which rejects non-query + * statements at parse time. `parseQuery` was added in Spark 3.3, so on Spark 3.2 (where it is + * absent) we fall back to [[ParserInterface.parsePlan]]. Callers are responsible for handling any + * [[org.apache.spark.sql.catalyst.parser.ParseException]] and for any further validation of the + * returned plan. + */ + def parseQueryCompat(parser: ParserInterface, sqlText: String): LogicalPlan = { + try { + parser.parseQuery(sqlText) + } catch { + case _: NoSuchMethodError => parser.parsePlan(sqlText) + } + } + def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] = { DataSourceStrategy.normalizeExprs(exprs, attributes) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index 0d54c698e6a7..f5986492b65e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -190,7 +190,46 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) val table = typedVisit[Seq[String]](ctx.multipartIdentifier) val fileFormat = buildFileFormat(ctx.fileFormatClause()) val overwrite = Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null) - logical.CopyIntoLocationCommand(targetPath, table, fileFormat, overwrite) + logical.CopyIntoLocationCommand( + targetPath, + logical.CopyIntoLocationSource.TableName(table), + fileFormat, + overwrite) + } + + /** Create a COPY INTO LOCATION FROM (query) (export) logical command. */ + override def visitCopyIntoLocationFromQuery( + ctx: CopyIntoLocationFromQueryContext): logical.CopyIntoLocationCommand = withOrigin(ctx) { + val targetPath = unquoteString(ctx.targetPath.getText) + val query = extractParenBlockInner(ctx.query) + val fileFormat = buildFileFormat(ctx.fileFormatClause()) + val overwrite = Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null) + logical.CopyIntoLocationCommand( + targetPath, + logical.CopyIntoLocationSource.Query(query), + fileFormat, + overwrite) + } + + /** + * Extract the raw subquery text inside a [[ParenBlockContext]], i.e. the `SELECT ...` between the + * outer parentheses of `FROM (SELECT ...)`. The text is taken verbatim from the original input + * stream (not unquoted) so that the inline query is later re-parsed exactly as the user wrote it. + */ + private def extractParenBlockInner(ctx: ParenBlockContext): String = { + val open = ctx.getStart.getStartIndex // '(' + val close = ctx.getStop.getStopIndex // ')' + val inner = + if (close - 1 < open + 1) { + "" + } else { + ctx.getStart.getInputStream.getText(Interval.of(open + 1, close - 1)).trim + } + if (inner.isEmpty) { + throw new IllegalArgumentException( + "COPY INTO FROM () requires a non-empty query") + } + inner } private def buildFileFormat(ctx: FileFormatClauseContext): CopyFileFormat = { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala index 23365e35e68d..bb127854d76d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala @@ -1422,4 +1422,239 @@ class CopyIntoTestBase extends PaimonSparkTestBase { spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_count") } + + // ==================== FROM (SELECT ...) export tests ==================== + + test("COPY INTO location: FROM (SELECT ...) with CSV") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query") + spark.sql(s"CREATE TABLE $dbName0.copy_export_query (id INT, name STRING, age INT)") + spark.sql( + s"INSERT INTO $dbName0.copy_export_query VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Carol', 35)") + + withCsvDir { + dir => + val exportPath = new File(dir, "query_export").getAbsolutePath + val result = + spark.sql(s"""COPY INTO '$exportPath' + |FROM (SELECT id, name FROM $dbName0.copy_export_query WHERE age > 28) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getInt(1) >= 1, "file_count should count at least one data file") + assert(rows(0).getLong(2) == 2, "Should export 2 rows (Alice age 30 and Carol age 35)") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query") + } + + test("COPY INTO location: FROM (SELECT ...) with JSON") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_json") + spark.sql(s"CREATE TABLE $dbName0.copy_export_query_json (id INT, name STRING)") + spark.sql(s"INSERT INTO $dbName0.copy_export_query_json VALUES (1, 'Alice'), (2, 'Bob')") + + withJsonDir { + dir => + val exportPath = new File(dir, "json_export").getAbsolutePath + val result = + spark.sql(s"""COPY INTO '$exportPath' + |FROM (SELECT * FROM $dbName0.copy_export_query_json WHERE id = 1) + |FILE_FORMAT = (TYPE = JSON) + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getLong(2) == 1, "Should export 1 row") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_json") + } + + test("COPY INTO location: FROM (SELECT ...) with aggregation") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_agg") + spark.sql(s"CREATE TABLE $dbName0.copy_export_query_agg (dept STRING, salary INT)") + spark.sql( + s"INSERT INTO $dbName0.copy_export_query_agg VALUES ('A', 100), ('A', 200), ('B', 150)") + + withCsvDir { + dir => + val exportPath = new File(dir, "agg_export").getAbsolutePath + val result = spark.sql( + s"""COPY INTO '$exportPath' + |FROM (SELECT dept, SUM(salary) AS total FROM $dbName0.copy_export_query_agg GROUP BY dept) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getLong(2) == 2, "Should export 2 aggregated rows") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_agg") + } + + test("COPY INTO location: FROM (SELECT ...) with nested parentheses") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_nested") + spark.sql(s"CREATE TABLE $dbName0.copy_export_nested (id INT, name STRING)") + spark.sql( + s"INSERT INTO $dbName0.copy_export_nested VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol')") + + withCsvDir { + dir => + val exportPath = new File(dir, "nested_export").getAbsolutePath + // The subquery contains nested parentheses (IN (...)), which only parse correctly with the + // balanced-paren grammar rule. + val result = + spark.sql(s"""COPY INTO '$exportPath' + |FROM (SELECT id, name FROM $dbName0.copy_export_nested WHERE id IN (1, 3)) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getLong(2) == 2, "Should export 2 rows (id 1 and 3)") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_nested") + } + + test("COPY INTO location: FROM (SELECT ...) with Parquet") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_parquet") + spark.sql(s"CREATE TABLE $dbName0.copy_export_parquet (dept STRING, salary INT)") + spark.sql(s"INSERT INTO $dbName0.copy_export_parquet VALUES ('A', 100), ('A', 200), ('B', 150)") + + withParquetDir { + dir => + val exportPath = new File(dir, "parquet_export").getAbsolutePath + val result = spark.sql( + s"""COPY INTO '$exportPath' + |FROM (SELECT dept, COUNT(*) AS cnt FROM $dbName0.copy_export_parquet GROUP BY dept) + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getLong(2) == 2, "Should export 2 aggregated rows") + + val exported = spark.read.parquet(exportPath).collect() + assert(exported.length == 2, "Parquet output should contain the 2 aggregated rows") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_parquet") + } + + test("COPY INTO location: FROM (SELECT ...) with OVERWRITE = TRUE") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_ow") + spark.sql(s"CREATE TABLE $dbName0.copy_export_query_ow (id INT)") + spark.sql(s"INSERT INTO $dbName0.copy_export_query_ow VALUES (1), (2), (3)") + + withCsvDir { + dir => + val exportPath = new File(dir, "ow_export").getAbsolutePath + + // First export writes 3 rows. + spark.sql(s"""COPY INTO '$exportPath' + |FROM (SELECT id FROM $dbName0.copy_export_query_ow) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + // OVERWRITE = TRUE replaces the previous output with a 1-row result. + val result = spark.sql(s"""COPY INTO '$exportPath' + |FROM (SELECT id FROM $dbName0.copy_export_query_ow WHERE id = 1) + |FILE_FORMAT = (TYPE = CSV) + |OVERWRITE = TRUE + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getLong(2) == 1, "Overwrite export should report 1 row") + + val exported = spark.read.csv(exportPath).collect() + assert(exported.length == 1, "Overwrite must replace the previous 3-row output") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_ow") + } + + test("COPY INTO location: FROM () empty query is rejected") { + withCsvDir { + dir => + val exportPath = new File(dir, "empty_export").getAbsolutePath + val error = intercept[IllegalArgumentException] { + spark.sql(s"""COPY INTO '$exportPath' + |FROM () + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + } + assert(error.getMessage.contains("requires a non-empty query")) + } + } + + test("COPY INTO location: FROM (query) accepts read-only non-SELECT queries (VALUES)") { + withCsvDir { + dir => + val exportPath = new File(dir, "values_export").getAbsolutePath + // A read-only query that is not a plain SELECT (here a VALUES list) is a valid export + // source: the only restriction is "no side effects", not "must be a SELECT". + val result = + spark.sql(s"""COPY INTO '$exportPath' + |FROM (VALUES (1, 'a'), (2, 'b')) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + val rows = result.collect() + assert(rows.length == 1) + assert(rows(0).getLong(2) == 2, "Should export 2 rows from the VALUES list") + } + } + + test("COPY INTO location: FROM (query) rejects statements with side effects") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_reject") + spark.sql(s"CREATE TABLE $dbName0.copy_export_reject (id INT)") + spark.sql(s"INSERT INTO $dbName0.copy_export_reject VALUES (1), (2)") + + withCsvDir { + dir => + val exportPath = new File(dir, "reject_export").getAbsolutePath + + // A DDL statement must be rejected and must NOT actually drop the table. + val dropError = intercept[IllegalArgumentException] { + spark.sql(s"""COPY INTO '$exportPath' + |FROM (DROP TABLE $dbName0.copy_export_reject) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + } + assert(dropError.getMessage.contains("only supports read-only queries")) + + // An INSERT statement must be rejected as well. + val insertError = intercept[IllegalArgumentException] { + spark.sql(s"""COPY INTO '$exportPath' + |FROM (INSERT INTO $dbName0.copy_export_reject VALUES (3)) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + } + assert(insertError.getMessage.contains("only supports read-only queries")) + + // INSERT OVERWRITE DIRECTORY must be rejected (on Spark 3.2 it parses to InsertIntoDir, + // which is neither a Command nor a ParsedStatement, so it is rejected explicitly) and must + // NOT write any files. + val outDir = new File(dir, "evil_dir").getAbsolutePath + val dirError = intercept[IllegalArgumentException] { + spark.sql( + s"""COPY INTO '$exportPath' + |FROM (INSERT OVERWRITE DIRECTORY '$outDir' USING csv SELECT * FROM $dbName0.copy_export_reject) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + } + assert(dirError.getMessage.contains("only supports read-only queries")) + assert(!new File(outDir).exists(), "INSERT OVERWRITE DIRECTORY must not have written files") + + // The table and its original contents are untouched. + val rows = spark.sql(s"SELECT * FROM $dbName0.copy_export_reject").collect() + assert(rows.length == 2, "Source table must be unchanged by the rejected statements") + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_reject") + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala index 09aa98d93afa..e96a32e65679 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala @@ -24,6 +24,8 @@ import org.apache.paimon.spark.function.FunctionResources._ import org.apache.spark.SparkConf import org.apache.spark.sql.Row +import java.io.File + abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBase { test("Paimon V1 Function: create or replace function") { @@ -266,6 +268,35 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa }.getMessage.contains("udf_add2 is a built-in/temporary function")) } } + + test("Paimon V1 Function: COPY INTO location FROM (SELECT udf(...))") { + withUserDefinedFunction("udf_add2" -> false) { + sql(s""" + |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class' + |USING JAR '$testUDFJarPath' + |""".stripMargin) + withTable("t") { + sql("CREATE TABLE t (a INT, b INT)") + sql("INSERT INTO t VALUES (1, 2), (3, 4)") + + withTempDir { + dir => + val exportPath = new File(dir, "udf_export").getAbsolutePath + // The inline query references a Paimon v1 function. This only resolves if the query is + // parsed through the session (Paimon) parser, which applies the v1 function rewrite. + val result = sql(s""" + |COPY INTO '$exportPath' + |FROM (SELECT udf_add2(a, b) AS c FROM t) + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + checkAnswer(result.selectExpr("rows_written"), Row(2L)) + checkAnswer( + spark.read.csv(exportPath).selectExpr("CAST(_c0 AS INT)"), + Seq(Row(3), Row(7))) + } + } + } + } } class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {