diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 55a84f9b1eb..49b7d4fe0e4 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -212,7 +212,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
- ./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-117 ${MAVEN_ARGS}
+ ./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-119 ${MAVEN_ARGS}
./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python 3.9 and R
uses: conda-incubator/setup-miniconda@v3
@@ -239,12 +239,7 @@ jobs:
fail-fast: false
matrix:
python: [ 3.9 ]
- flink: [116, 117]
- include:
- # Flink 1.15 supports Python 3.6, 3.7, and 3.8
- # https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/
- - python: 3.8
- flink: 115
+ flink: [119, 120]
steps:
- name: Checkout
uses: actions/checkout@v4
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index df272cbdb9b..7f2763a558f 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -27,7 +27,7 @@ limitations under the License.
[Apache Flink](https://flink.apache.org) is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
-In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Currently, only Flink 1.15+ is supported, old versions of flink won't work.**
+In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Currently, only Flink 1.19+ is supported, old versions of flink won't work.**
Apache Flink is supported in Zeppelin with the Flink interpreter group which consists of the five interpreters listed below.
@@ -138,16 +138,15 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep
## Prerequisites
-Download Flink 1.15 or afterwards (Only Scala 2.12 is supported)
+Download Flink 1.19 or afterwards (Only Scala 2.12 is supported)
### Version-specific notes for Flink
-Flink 1.15 is scala free and has changed its binary distribution, the following extra steps is required.
-* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
-* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
-* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib
-
-Flink 1.16 introduces new `ClientResourceManager` for sql client, you need to move `FLINK_HOME/opt/flink-sql-client-1.16.0.jar` to `FLINK_HOME/lib`
+Flink 1.19+ is scala free and has changed its binary distribution, the following extra steps is required.
+* Move FLINK_HOME/opt/flink-table-planner_2.12-1.19.3.jar to FLINK_HOME/lib
+* Move FLINK_HOME/lib/flink-table-planner-loader-1.19.3.jar to FLINK_HOME/opt
+* Download flink-table-api-scala-bridge_2.12-1.19.3.jar and flink-table-api-scala_2.12-1.19.3.jar to FLINK_HOME/lib
+* Move FLINK_HOME/opt/flink-sql-client-1.19.3.jar to FLINK_HOME/lib
## Flink on Zeppelin Architecture
diff --git a/docs/setup/deployment/flink_and_spark_cluster.md b/docs/setup/deployment/flink_and_spark_cluster.md
index 070b2af0f59..cc506a54628 100644
--- a/docs/setup/deployment/flink_and_spark_cluster.md
+++ b/docs/setup/deployment/flink_and_spark_cluster.md
@@ -85,14 +85,14 @@ cd zeppelin
Package Zeppelin.
```bash
-./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-1.17
+./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-119
```
`-DskipTests` skips build tests- you're not developing (yet), so you don't need to do tests, the clone version *should* build.
`-Pspark-3.5` tells maven to build a Zeppelin with Spark 3.5. This is important because Zeppelin has its own Spark interpreter and the versions must be the same.
-`-Pflink-1.17` tells maven to build a Zeppelin with Flink 1.17.
+`-Pflink-119` tells maven to build a Zeppelin with Flink 1.19.
**Note:** You can build against any version of Spark that has a Zeppelin build profile available. The key is to make sure you check out the matching version of Spark to build. At the time of this writing, Spark 3.5 was the most recent Spark version available.
@@ -215,16 +215,16 @@ Building from source is recommended where possible, for simplicity in this tuto
To download the Flink Binary use `wget`
```bash
-wget -O flink-1.17.1-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz?action=download"
-tar -xzvf flink-1.17.1-bin-scala_2.12.tgz
+wget -O flink-1.19.3-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz?action=download"
+tar -xzvf flink-1.19.3-bin-scala_2.12.tgz
```
-This will download Flink 1.17.1.
+This will download Flink 1.19.3.
Start the Flink Cluster.
```bash
-flink-1.17.1/bin/start-cluster.sh
+flink-1.19.3/bin/start-cluster.sh
```
###### Building From source
@@ -233,13 +233,13 @@ If you wish to build Flink from source, the following will be instructive. Note
See the [Flink Installation guide](https://github.com/apache/flink/blob/master/README.md) for more detailed instructions.
-Return to the directory where you have been downloading, this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.17.1, and build.
+Return to the directory where you have been downloading, this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.19.3, and build.
```bash
cd $HOME
git clone https://github.com/apache/flink.git
cd flink
-git checkout release-1.17.1
+git checkout release-1.19.3
mvn clean install -DskipTests
```
@@ -261,8 +261,8 @@ If no task managers are present, restart the Flink cluster with the following co
(if binaries)
```bash
-flink-1.17.1/bin/stop-cluster.sh
-flink-1.17.1/bin/start-cluster.sh
+flink-1.19.3/bin/stop-cluster.sh
+flink-1.19.3/bin/start-cluster.sh
```
diff --git a/flink/README.md b/flink/README.md
index 3b120bf3140..7cd4a273c00 100644
--- a/flink/README.md
+++ b/flink/README.md
@@ -10,9 +10,7 @@ Flink interpreter is more complex than other interpreter (such as jdbc, shell).
Currently, it has the following modules clustered into two groups:
* flink-shims
-* flink1.15-shims
-* flink1.16-shims
-* flink1.17-shims
+* flink1.19-shims (shared by Flink 1.19 and 1.20)
* flink-scala-2.12
diff --git a/flink/flink-scala-2.12/pom.xml b/flink/flink-scala-2.12/pom.xml
index 0a1d26e277c..42f41ac77af 100644
--- a/flink/flink-scala-2.12/pom.xml
+++ b/flink/flink-scala-2.12/pom.xml
@@ -33,7 +33,7 @@
- ${flink1.17.version}
+ ${flink1.19.version}2.12.72.12${flink.scala.version}
@@ -55,19 +55,7 @@
org.apache.zeppelin
- flink1.15-shims
- ${project.version}
-
-
-
- org.apache.zeppelin
- flink1.16-shims
- ${project.version}
-
-
-
- org.apache.zeppelin
- flink1.17-shims
+ flink1.19-shims${project.version}
@@ -1203,39 +1191,10 @@
- flink-115
-
- ${flink1.15.version}
- 2.12.7
- 2.12
-
-
-
- org.apache.flink
- flink-runtime
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-table-planner_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-python_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
-
-
- flink-116
+ flink-119
- ${flink1.16.version}
- 2.12.7
+ ${flink1.19.version}
+ 2.12.182.12
@@ -1267,10 +1226,10 @@
- flink-117
+ flink-120
- ${flink1.17.version}
- 2.12.7
+ ${flink1.20.version}
+ 2.12.182.12
diff --git a/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index 087fa3a208e..c8372dcc70d 100644
--- a/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.flink;
+import org.apache.zeppelin.flink.sql.AbstractStreamSqlJob;
import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
@@ -53,48 +54,45 @@ public void open() throws InterpreterException {
public void callInnerSelect(String sql) {
InterpreterContext context = InterpreterContext.get();
String streamType = context.getLocalProperties().getOrDefault("type", "update");
+ AbstractStreamSqlJob streamJob;
if (streamType.equalsIgnoreCase("single")) {
- SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
+ streamJob = new SingleRowStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getJavaStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
- try {
- streamJob.run(sql);
- } catch (IOException e) {
- throw new RuntimeException("Fail to run single type stream job", e);
- }
} else if (streamType.equalsIgnoreCase("append")) {
- AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
+ streamJob = new AppendStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
- try {
- streamJob.run(sql);
- } catch (IOException e) {
- throw new RuntimeException("Fail to run append type stream job", e);
- }
} else if (streamType.equalsIgnoreCase("update")) {
- UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
+ streamJob = new UpdateStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
- try {
- streamJob.run(sql);
- } catch (IOException e) {
- throw new RuntimeException("Fail to run update type stream job", e);
- }
} else {
throw new RuntimeException("Unrecognized stream type: " + streamType);
}
+
+ FlinkZeppelinContext z =
+ (FlinkZeppelinContext) flinkInterpreter.getZeppelinContext();
+ z.setCurrentStreamJob(streamJob);
+ try {
+ streamJob.run(sql);
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to run " + streamType + " type stream job", e);
+ } finally {
+ z.clearCurrentStreamJob();
+ }
}
@Override
diff --git a/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index e5399865bfb..9f8c72749ed 100644
--- a/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -28,6 +28,7 @@
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;
@@ -65,6 +66,9 @@ public abstract class AbstractStreamSqlJob {
protected InterpreterContext context;
protected TableSchema schema;
protected SocketStreamIterator> iterator;
+ private volatile TableResult insertResult;
+ private volatile boolean cancelled = false;
+ private volatile boolean cancelledWithSavepoint = false;
protected Object resultLock = new Object();
protected volatile boolean enableToRefresh = true;
protected int defaultParallelism;
@@ -151,7 +155,38 @@ public String run(Table table, String tableName) throws IOException {
LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism);
String jobName = context.getStringLocalProperty("jobName", tableName);
- table.executeInsert(tableName).await();
+ this.insertResult = table.executeInsert(tableName);
+ // Register the job with JobManager so that cancel (with savepoint) works properly
+ if (insertResult.getJobClient().isPresent()) {
+ jobManager.addJob(context, insertResult.getJobClient().get());
+ }
+ // Use a CountDownLatch to wait for job completion while supporting cancellation
+ java.util.concurrent.CountDownLatch jobDone = new java.util.concurrent.CountDownLatch(1);
+ Thread jobThread = new Thread(() -> {
+ try {
+ insertResult.await();
+ } catch (Exception e) {
+ LOGGER.debug("Job await interrupted or failed", e);
+ } finally {
+ jobDone.countDown();
+ }
+ }, "flink-job-await");
+ jobThread.setDaemon(true);
+ jobThread.start();
+
+ // Wait for either job completion or cancellation
+ while (!cancelled && !jobDone.await(1, java.util.concurrent.TimeUnit.SECONDS)) {
+ // keep waiting
+ }
+ if (cancelled) {
+ // Wait briefly for the job to finish (e.g. stopped with savepoint)
+ jobDone.await(10, java.util.concurrent.TimeUnit.SECONDS);
+ if (cancelledWithSavepoint) {
+ LOGGER.info("Stream sql job stopped with savepoint, jobName: {}", jobName);
+ return buildResult();
+ }
+ throw new InterruptedException("Job was cancelled");
+ }
LOGGER.info("Flink Job is finished, jobName: {}", jobName);
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
@@ -162,7 +197,7 @@ public String run(Table table, String tableName) throws IOException {
return finalResult;
} catch (Exception e) {
LOGGER.error("Fail to run stream sql job", e);
- throw new IOException("Fail to run stream sql job", e);
+ throw new IOException("Job was cancelled", e);
} finally {
refreshScheduler.shutdownNow();
}
@@ -238,6 +273,12 @@ public void cancel() {
}
}
+ public void cancel(boolean withSavepoint) {
+ LOGGER.info("Canceling stream sql job, withSavepoint={}", withSavepoint);
+ this.cancelledWithSavepoint = withSavepoint;
+ this.cancelled = true;
+ }
+
protected abstract void refresh(InterpreterContext context) throws Exception;
private class RefreshTask implements Runnable {
diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index a11003b8bc2..c9200a1a83b 100644
--- a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -423,6 +423,26 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
setAsContext()
}
+ private def bindWithRetry(name: String, tpe: String, value: AnyRef, modifiers: List[String]): Unit = {
+ // Workaround for Scala reflection issue with ImplicitExpressionConversions in Flink 1.19+.
+ // First bind attempt may fail due to unpickling error, but subsequent attempts succeed
+ // because the Scala reflection cache resolves the error state.
+ var success = false
+ for (attempt <- 1 to 2 if !success) {
+ try {
+ flinkILoop.bind(name, tpe, value, modifiers: List[String])
+ success = true
+ } catch {
+ case e: Throwable =>
+ if (attempt == 1) {
+ LOGGER.warn("Retrying bind for " + name + " due to Scala reflection issue: " + e.getMessage)
+ } else {
+ throw new InterpreterException(s"Failed to bind $name after retry", e)
+ }
+ }
+ }
+ }
+
private def createTableEnvs(): Unit = {
val originalClassLoader = Thread.currentThread().getContextClassLoader
try {
@@ -438,8 +458,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
.asInstanceOf[EnvironmentSettings.Builder]
.inBatchMode()
.build()
- this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader);
- flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
+ this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader)
+ bindWithRetry("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
this.java_btenv = this.btenv
val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
@@ -447,14 +467,11 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
.inStreamingMode()
.build()
this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader)
- flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
+ bindWithRetry("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader)
- if (!flinkVersion.isAfterFlink114()) {
- // flink planner is not supported after flink 1.14
- this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
- this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
- }
+ this.btenv_2 = null
+ this.java_btenv_2 = null
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
}
@@ -718,6 +735,11 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
def cancel(context: InterpreterContext): Unit = {
jobManager.cancelJob(context)
+ if (z != null) {
+ val savepointDir = context.getLocalProperties.get(JobManager.SAVEPOINT_DIR)
+ val withSavepoint = savepointDir != null && !savepointDir.isEmpty
+ z.asInstanceOf[FlinkZeppelinContext].cancelCurrentStreamJob(withSavepoint)
+ }
}
def getProgress(context: InterpreterContext): Int = {
diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index b5dba22307c..44c6f646a13 100644
--- a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -29,7 +29,7 @@ import org.apache.flink.util.StringUtils
import org.apache.zeppelin.annotation.ZeppelinApi
import org.apache.zeppelin.display.AngularObjectWatcher
import org.apache.zeppelin.display.ui.OptionInput.ParamOption
-import org.apache.zeppelin.flink.sql.{AppendStreamSqlJob, SingleRowStreamSqlJob, UpdateStreamSqlJob}
+import org.apache.zeppelin.flink.sql.{AbstractStreamSqlJob, AppendStreamSqlJob, SingleRowStreamSqlJob, UpdateStreamSqlJob}
import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterHookRegistry, ResultMessages, ZeppelinContext}
import org.apache.zeppelin.tabledata.TableDataUtils
@@ -129,29 +129,46 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
showTable(columnNames, rows)
}
+ @volatile private var currentStreamJob: AbstractStreamSqlJob = _
+
+ def cancelCurrentStreamJob(withSavepoint: Boolean): Unit = {
+ val job = currentStreamJob
+ if (job != null) job.cancel(withSavepoint)
+ }
+
+ def setCurrentStreamJob(job: AbstractStreamSqlJob): Unit = {
+ currentStreamJob = job
+ }
+
+ def clearCurrentStreamJob(): Unit = {
+ currentStreamJob = null
+ }
+
def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = {
val context = InterpreterContext.get()
configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
val tableName = "UnnamedTable_" + context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
- if (streamType.equalsIgnoreCase("single")) {
- val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+ val streamJob: AbstractStreamSqlJob = if (streamType.equalsIgnoreCase("single")) {
+ new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
table.asInstanceOf[TableImpl].getTableEnvironment,
flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims)
- streamJob.run(table, tableName)
- }
- else if (streamType.equalsIgnoreCase("append")) {
- val streamJob = new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+ } else if (streamType.equalsIgnoreCase("append")) {
+ new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
table.asInstanceOf[TableImpl].getTableEnvironment,
flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims)
- streamJob.run(table, tableName)
- }
- else if (streamType.equalsIgnoreCase("update")) {
- val streamJob = new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+ } else if (streamType.equalsIgnoreCase("update")) {
+ new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
table.asInstanceOf[TableImpl].getTableEnvironment,
flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims)
+ } else {
+ throw new IOException("Unrecognized stream type: " + streamType)
+ }
+ currentStreamJob = streamJob
+ try {
streamJob.run(table, tableName)
+ } finally {
+ currentStreamJob = null
}
- else throw new IOException("Unrecognized stream type: " + streamType)
}
/**
diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index b50135b91ed..b2a3d2d23b9 100644
--- a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -145,8 +145,7 @@ class FlinkILoop(
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
"org.apache.flink.streaming.api.windowing.time._",
- "org.apache.flink.table.api._",
- "org.apache.flink.table.api.bridge.scala._",
+ "org.apache.flink.table.api.{TableEnvironment, EnvironmentSettings, Table, TableResult, Schema, DataTypes, Expressions, FormatDescriptor, TableDescriptor}",
"org.apache.flink.types.Row"
)
diff --git a/flink/flink-scala-2.12/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-2.12/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index af2e7db1d30..876d9c01ef3 100644
--- a/flink/flink-scala-2.12/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-2.12/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -264,13 +264,11 @@ void testBatchWordCount() throws InterpreterException, IOException {
" .print()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), context.out.toString());
- String[] expectedCounts = {"(hello,3)", "(world,1)", "(flink,1)", "(hadoop,1)"};
- Arrays.sort(expectedCounts);
-
- String[] counts = context.out.toInterpreterResultMessage().get(0).getData().split("\n");
- Arrays.sort(counts);
-
- assertArrayEquals(expectedCounts, counts);
+ String output = context.out.toInterpreterResultMessage().get(0).getData();
+ assertTrue(output.contains("(hello,3)"), output);
+ assertTrue(output.contains("(world,1)"), output);
+ assertTrue(output.contains("(flink,1)"), output);
+ assertTrue(output.contains("(hadoop,1)"), output);
}
@Test
@@ -312,13 +310,10 @@ void testCancelStreamSql()
InterpreterResult result2 = interpreter.interpret(
"val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
"log group by url\")\nz.show(table, streamType=\"update\")", context);
- LOGGER.info("---------------" + context.out.toString());
- LOGGER.info("---------------" + result2);
waiter.assertTrue(context.out.toString().contains("Job was cancelled"));
waiter.assertEquals(InterpreterResult.Code.ERROR, result2.code());
} catch (Exception e) {
- e.printStackTrace();
- waiter.fail("Should not fail here");
+ waiter.fail("Should not fail here: " + e.getClass().getName() + ": " + e.getMessage());
}
waiter.resume();
});
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 11de5bd3b7e..515c10bd75b 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -54,21 +54,10 @@ private static FlinkShims loadShims(FlinkVersion flinkVersion,
Properties properties)
throws Exception {
Class> flinkShimsClass;
- if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
- LOGGER.info("Initializing shims for Flink 1.13");
- flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims");
- } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) {
- LOGGER.info("Initializing shims for Flink 1.14");
- flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink114Shims");
- } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 15) {
- LOGGER.info("Initializing shims for Flink 1.15");
- flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink115Shims");
- } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 16) {
- LOGGER.info("Initializing shims for Flink 1.16");
- flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink116Shims");
- } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 17) {
- LOGGER.info("Initializing shims for Flink 1.17");
- flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink117Shims");
+ if (flinkVersion.getMajorVersion() == 1
+ && (flinkVersion.getMinorVersion() == 19 || flinkVersion.getMinorVersion() == 20)) {
+ LOGGER.info("Initializing shims for Flink {}", flinkVersion);
+ flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink119Shims");
} else {
throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet");
}
diff --git a/flink/flink1.15-shims/pom.xml b/flink/flink1.15-shims/pom.xml
deleted file mode 100644
index b8aab01b3a1..00000000000
--- a/flink/flink1.15-shims/pom.xml
+++ /dev/null
@@ -1,198 +0,0 @@
-
-
-
-
-
- flink-parent
- org.apache.zeppelin
- 0.13.0-SNAPSHOT
- ../pom.xml
-
-
- 4.0.0
- flink1.15-shims
- jar
- Zeppelin: Flink1.15 Shims
-
-
- ${flink1.15.version}
- 2.12
-
-
-
-
-
- org.apache.zeppelin
- flink-shims
- ${project.version}
-
-
-
- org.apache.flink
- flink-core
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-clients
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-runtime
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-table-api-scala_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-table-api-scala-bridge_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-table-api-java-bridge
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-scala_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-streaming-java
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-streaming-scala_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-java
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-table-planner_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-python_${flink.scala.binary.version}
- ${flink.version}
- provided
-
-
-
-
-
-
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
- eclipse-add-source
-
- add-source
-
-
-
- scala-compile-first
- process-resources
-
- compile
-
-
-
- scala-test-compile-first
- process-test-resources
-
- testCompile
-
-
-
-
- ${flink.scala.version}
-
- -unchecked
- -deprecation
- -feature
- -nobootcp
-
-
- -Xms1024m
- -Xmx1024m
- -XX:MaxMetaspaceSize=${MaxMetaspace}
-
-
- -source
- ${java.version}
- -target
- ${java.version}
- -Xlint:all,-serial,-path,-options
-
-
-
-
-
- maven-resources-plugin
-
-
- copy-interpreter-setting
- none
-
- true
-
-
-
-
-
-
-
-
diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
deleted file mode 100644
index 4ed8abf3afe..00000000000
--- a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.PlannerFactoryUtil;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims115.CollectStreamTableSink;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.URL;
-import java.time.ZoneId;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * Shims for flink 1.15
- */
-public class Flink115Shims extends FlinkShims {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Flink115Shims.class);
-
- private Flink115SqlInterpreter batchSqlInterpreter;
- private Flink115SqlInterpreter streamSqlInterpreter;
-
- public Flink115Shims(FlinkVersion flinkVersion, Properties properties) {
- super(flinkVersion, properties);
- }
-
- public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
- this.batchSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, true);
- }
-
- public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
- this.streamSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, false);
- }
-
- @Override
- public Object createResourceManager(List jars, Object tableConfig) {
- return null;
- }
-
- @Override
- public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List jars) {
- return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
- }
-
- @Override
- public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
- // do nothing
- }
-
- @Override
- public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
- Object senvObj,
- Object tableConfigObj,
- Object moduleManagerObj,
- Object functionCatalogObj,
- Object catalogManagerObj,
- List jars,
- ClassLoader classLoader) {
- EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
- StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
- TableConfig tableConfig = (TableConfig) tableConfigObj;
- ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
- FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
- CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
- ImmutablePair