Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions docs/docs/spark/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ Parquet columns are matched **by column name** (not by position). Extra columns

```sql
COPY INTO 'target_path'
FROM table_name
FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
Expand All @@ -445,11 +445,19 @@ FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',')
OVERWRITE = TRUE;
```

**Write from query:**

```sql
COPY INTO '/export/active_users/'
FROM (SELECT id, name FROM my_db.users WHERE active = TRUE)
FILE_FORMAT = (TYPE = CSV, HEADER = TRUE);
```

#### Write JSON Files

```sql
COPY INTO 'target_path'
FROM table_name
FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = JSON [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
Expand All @@ -463,11 +471,19 @@ FILE_FORMAT = (TYPE = JSON)
OVERWRITE = TRUE;
```

**JSON export from query:**

```sql
COPY INTO '/export/recent_events/'
FROM (SELECT * FROM my_db.events WHERE event_date > '2024-01-01')
FILE_FORMAT = (TYPE = JSON);
```

#### Write Parquet Files

```sql
COPY INTO 'target_path'
FROM table_name
FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
Expand All @@ -490,6 +506,14 @@ FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
OVERWRITE = TRUE;
```

**Parquet export from aggregation query:**

```sql
COPY INTO '/export/summary/'
FROM (SELECT dept, COUNT(*) AS cnt FROM my_db.employees GROUP BY dept)
FILE_FORMAT = (TYPE = PARQUET);
```

#### FILE_FORMAT Options

`FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or `TYPE = PARQUET`.
Expand Down Expand Up @@ -614,10 +638,11 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfull

- **CSV column-count mismatch**: Rows with fewer or more columns than the target schema are treated as malformed records. With `ON_ERROR = CONTINUE`, these rows are skipped and counted as errors.
- Only **CSV**, **JSON**, and **Parquet** formats are supported.
- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported.
- `SINGLE = TRUE` (single-file output) is not supported.
- File format options must be specified inline in `FILE_FORMAT = (...)`.
- File listing is **non-recursive**: only direct files under the source path are processed. Subdirectories are ignored.
- `PATTERN` matches the **base file name** only (not the full path).
- Concurrent COPY INTO commands targeting the same table may produce duplicate data.
- `SKIP_HEADER` only supports values `0` or `1`.
- `FROM (...)` accepts any read-only query (e.g. `SELECT`, `WITH ... SELECT`, `VALUES`); statements with side effects (e.g. `INSERT`, `INSERT OVERWRITE DIRECTORY`, DDL) are rejected.
- For a `FROM (...)` export, `rows_written` is an execution-time statistic computed by a separate counting pass before the files are written. Because the DataFrame is lazy and not cached, the write executes the query a second time; if the query is non-deterministic (e.g. uses `rand()`, `current_timestamp()`, or reads a volatile source), the two runs can produce different rows, so `rows_written` may not match the actual file contents. The result is intentionally not staged, so the export does not consume extra executor disk.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ statement
overwriteClause? #copyIntoLocation
| CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier
LIKE source=multipartIdentifier ( . )*? #createTableLike
| COPY INTO targetPath=STRING
FROM query=parenBlock
fileFormatClause
overwriteClause? #copyIntoLocationFromQuery
;

// parenBlock: a parenthesized block with balanced parentheses.
//
// Used to capture an inline subquery verbatim, e.g. the (SELECT ...) in
// `COPY INTO <location> FROM (SELECT ...)`. Between the outer '(' and ')' the rule accepts any
// number of items, each being either a nested parenBlock or a single token that is not a
// parenthesis.
//
// A recursive rule is required (rather than '(' .*? ')') so that nested parentheses such as
// `WHERE x IN (1, 2)` are matched correctly instead of ending the block at the first ')'.
//
// The rule does not interpret the content; the raw subquery text is later extracted from the
// token stream by the AST builder.
parenBlock
: '(' ( parenBlock | ~('(' | ')') )* ')'
;

callArgument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.catalog.SupportView
import org.apache.paimon.view.View

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{PaimonUtils, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedRelation, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, UpCast}
import org.apache.spark.sql.catalyst.parser.ParseException
Expand Down Expand Up @@ -91,13 +91,7 @@ case class PaimonViewResolver(spark: SparkSession)
)
try {
CurrentOrigin.withOrigin(origin) {
try {
spark.sessionState.sqlParser.parseQuery(viewText)
} catch {
// For compatibility with Spark 3.2 and below
case _: NoSuchMethodError =>
spark.sessionState.sqlParser.parsePlan(viewText)
}
PaimonUtils.parseQueryCompat(spark.sessionState.sqlParser, viewText)
}
} catch {
case _: ParseException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,26 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

/**
 * The (still unresolved) source of a `COPY INTO <location>` export: either a named table or an
 * inline read-only query.
 *
 * Modelling the source as a sealed ADT keeps the two alternatives mutually exclusive, so
 * impossible states such as "table name and query both present" cannot be constructed. The
 * variants are `final` so the set of alternatives stays closed and case-class equality cannot be
 * broken by subclassing. The table name is resolved to a catalog/identifier later, during
 * planning.
 */
sealed trait CopyIntoLocationSource

object CopyIntoLocationSource {

  /**
   * Export the named table.
   *
   * @param nameParts
   *   the multipart identifier of the table, unresolved at this point
   */
  final case class TableName(nameParts: Seq[String]) extends CopyIntoLocationSource

  /**
   * Export the result of the inline `FROM (<query>)` read-only query.
   *
   * @param query
   *   the SQL text of the inline query
   */
  final case class Query(query: String) extends CopyIntoLocationSource
}

case class CopyIntoLocationCommand(
targetPath: String,
table: Seq[String],
source: CopyIntoLocationSource,
fileFormat: CopyFileFormat,
overwrite: Boolean)
extends PaimonLeafCommand {
Expand All @@ -37,6 +54,12 @@ case class CopyIntoLocationCommand(
)

/**
 * One-line description of this command: the target path followed by the source.
 *
 * A query source is shown as `query=<text>`, where the text is cut to its first 100 characters
 * and suffixed with "..." when it is longer, so that a long inline query does not flood the
 * description. A table source is shown as `table=<nameParts>`.
 *
 * @param maxFields
 *   not consulted by this implementation
 */
override def simpleString(maxFields: Int): String = {
  val sourceDesc = source match {
    case CopyIntoLocationSource.Query(q) =>
      val truncated = if (q.length > 100) q.take(100) + "..." else q
      s"query=$truncated"
    case CopyIntoLocationSource.TableName(nameParts) => s"table=$nameParts"
  }
  s"CopyIntoLocation: target=$targetPath, source=$sourceDesc"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,21 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.unsafe.types.UTF8String

/**
 * The source to export from: either a Paimon table or an inline read-only query.
 *
 * The variants are `final` so the set of alternatives stays closed and case-class equality cannot
 * be broken by subclassing.
 */
sealed trait CopyIntoSource

object CopyIntoSource {

  /**
   * Export an existing Paimon table.
   *
   * @param catalog
   *   the catalog that holds the table
   * @param ident
   *   the identifier of the table within `catalog`
   */
  final case class TableSource(catalog: TableCatalog, ident: Identifier) extends CopyIntoSource

  /**
   * Export the result of an inline `FROM (<query>)` read-only query.
   *
   * @param query
   *   the SQL text of the inline query
   */
  final case class QuerySource(query: String) extends CopyIntoSource
}

case class CopyIntoLocationExec(
spark: SparkSession,
catalog: TableCatalog,
ident: Identifier,
source: CopyIntoSource,
targetPath: String,
fileFormat: CopyFileFormat,
overwrite: Boolean,
Expand All @@ -43,14 +54,21 @@ case class CopyIntoLocationExec(
override protected def run(): Seq[InternalRow] = {
fileFormat.validateForExport()

val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
val df = spark.table(tableName)

val rowCount = df.count()
val df = source match {
case CopyIntoSource.QuerySource(query) => CopyIntoUtils.queryToDataFrame(spark, query)
case CopyIntoSource.TableSource(catalog, ident) =>
spark.table(CopyIntoUtils.quoteIdentifier(catalog.name(), ident))
}

val writerOptions = fileFormat.toSparkWriterOptions
val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists

// `rows_written` is counted by a separate `count()` action before the write. The DataFrame is
// lazy and not cached, so the write re-executes the query a second time; for a non-deterministic
// query (e.g. `rand()`, `current_timestamp()`, or a volatile source) the two runs can yield
// different rows, so this count may not match the files (see the docs). We accept this rather
// than stage the whole result to disk just to make the count exact.
val rowCount = df.count()
fileFormat.formatType match {
case FileFormatType.JSON =>
df.write.options(writerOptions).mode(saveMode).json(targetPath)
Expand All @@ -63,8 +81,9 @@ case class CopyIntoLocationExec(
val hadoopConf = spark.sessionState.newHadoopConf()
val fsPath = new Path(targetPath)
val fs = fsPath.getFileSystem(hadoopConf)
// Count only data files (part-*); committer side files such as _SUCCESS are not data output.
val fileCount = if (fs.exists(fsPath)) {
fs.listStatus(fsPath).count(_.isFile)
fs.listStatus(fsPath).count(s => s.isFile && s.getPath.getName.startsWith("part-"))
} else {
0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,62 @@

package org.apache.paimon.spark.execution

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoDir, LogicalPlan, ParsedStatement}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.paimon.shims.SparkShimLoader

object CopyIntoUtils {

/**
 * Turn the inline query of `COPY INTO <location> FROM (<query>)` into a [[DataFrame]].
 *
 * The text is handed to the session parser (the Paimon parser) through `parsePlan`, so it takes
 * the same path as `spark.sql(<query>)`, including Paimon's parser rules (e.g. the v1 function
 * rewrite). Any read-only query is accepted (`SELECT`, `WITH ... SELECT`, `VALUES`, etc.).
 *
 * The Paimon parser does not reject statements at parse time (its `parseQuery` just delegates to
 * `parsePlan`), so DDL/DML arrives here as a plan. It is rejected by inspecting the parsed tree
 * with [[hasSideEffect]].
 *
 * @param spark
 *   session whose parser is used and in which the DataFrame is created
 * @param query
 *   SQL text of the inline query
 * @return
 *   a DataFrame over the parsed plan
 * @throws IllegalArgumentException
 *   if the text fails to parse (the `ParseException` is kept as the cause) or if the parsed plan
 *   contains a node with side effects
 */
def queryToDataFrame(spark: SparkSession, query: String): DataFrame = {
  val parsed: LogicalPlan =
    try spark.sessionState.sqlParser.parsePlan(query)
    catch {
      case cause: ParseException =>
        throw new IllegalArgumentException(
          s"COPY INTO <location> FROM (<query>) only supports read-only queries: $query",
          cause)
    }

  if (!hasSideEffect(parsed)) {
    SparkShimLoader.shim.classicApi.createDataset(spark, parsed)
  } else {
    throw new IllegalArgumentException(
      "COPY INTO <location> FROM (<query>) only supports read-only queries, " +
        s"but got a statement with side effects: $query")
  }
}

/**
 * Tell whether any node in the tree of `plan` has side effects. Three node kinds count:
 *   - `Command`: resolved commands such as `DROP TABLE`.
 *   - `ParsedStatement`: parsed-but-unresolved DDL/DML such as `INSERT`, CTAS, `CREATE VIEW` (it
 *     is the parent of `InsertIntoStatement`); a pure SELECT never contains such a node.
 *   - `InsertIntoDir`: the `INSERT OVERWRITE DIRECTORY` plan, which is neither of the above and
 *     is therefore listed explicitly.
 *
 * The tree is searched with `find` rather than `exists` because `TreeNode.exists` is absent on
 * Spark 3.2.
 *
 * @param plan
 *   the parsed logical plan to inspect
 * @return
 *   `true` if at least one node of the tree is one of the kinds above
 */
private def hasSideEffect(plan: LogicalPlan): Boolean = {
  def isSideEffecting(node: LogicalPlan): Boolean = node match {
    case _: Command => true
    case _: ParsedStatement => true
    case _: InsertIntoDir => true
    case _ => false
  }

  plan.find(isSideEffecting).nonEmpty
}

def quoteIdentifier(catalogName: String, ident: Identifier): String = {
val parts = Seq(catalogName) ++
ident.namespace().toSeq ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, SparkUtils}
import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
import org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, TruncatePaimonTableWithFilter}
import org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, CopyIntoLocationSource, CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, TruncatePaimonTableWithFilter}
import org.apache.paimon.table.Table

import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -162,11 +162,23 @@ case class PaimonStrategy(spark: SparkSession)
c.onError,
c.output) :: Nil

case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog, ident), _, _) =>
case c @ CopyIntoLocationCommand(_, CopyIntoLocationSource.Query(query), _, _) =>
CopyIntoLocationExec(
spark,
catalog,
ident,
CopyIntoSource.QuerySource(query),
c.targetPath,
c.fileFormat,
c.overwrite,
c.output) :: Nil

case c @ CopyIntoLocationCommand(
_,
CopyIntoLocationSource.TableName(PaimonCatalogAndIdentifier(catalog, ident)),
_,
_) =>
CopyIntoLocationExec(
spark,
CopyIntoSource.TableSource(catalog, ident),
c.targetPath,
c.fileFormat,
c.overwrite,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.rdd.InputFileBlockHolder
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.filter.Predicate
Expand Down Expand Up @@ -63,6 +64,21 @@ object PaimonUtils {
SparkShimLoader.shim.classicApi.createDataset(sparkSession, logicalPlan)
}

/**
 * Parse `sqlText` as a read-only query in a way that works across Spark versions.
 *
 * [[ParserInterface.parseQuery]] is tried first because it rejects non-query statements at parse
 * time. That method was added in Spark 3.3; on Spark 3.2, where calling it raises
 * `NoSuchMethodError`, [[ParserInterface.parsePlan]] is used instead.
 *
 * Handling any [[org.apache.spark.sql.catalyst.parser.ParseException]] and any further validation
 * of the returned plan are left to the caller.
 *
 * @param parser
 *   the parser to use
 * @param sqlText
 *   the SQL text to parse
 * @return
 *   the parsed logical plan
 */
def parseQueryCompat(parser: ParserInterface, sqlText: String): LogicalPlan =
  try parser.parseQuery(sqlText)
  catch {
    // `parseQuery` does not exist on this Spark version: fall back to `parsePlan`.
    case _: NoSuchMethodError => parser.parsePlan(sqlText)
  }

def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] = {
DataSourceStrategy.normalizeExprs(exprs, attributes)
}
Expand Down
Loading
Loading