9 changes: 2 additions & 7 deletions .github/workflows/core.yml
@@ -212,7 +212,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-117 ${MAVEN_ARGS}
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-119 ${MAVEN_ARGS}
./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python 3.9 and R
uses: conda-incubator/setup-miniconda@v3
@@ -239,12 +239,7 @@ jobs:
fail-fast: false
matrix:
python: [ 3.9 ]
flink: [116, 117]
include:
# Flink 1.15 supports Python 3.6, 3.7, and 3.8
# https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/
- python: 3.8
flink: 115
flink: [119, 120]
steps:
- name: Checkout
uses: actions/checkout@v4
15 changes: 7 additions & 8 deletions docs/interpreter/flink.md
@@ -27,7 +27,7 @@ limitations under the License.
[Apache Flink](https://flink.apache.org) is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Currently, only Flink 1.15+ is supported, old versions of flink won't work.**
In Zeppelin 0.9, we refactored the Flink interpreter to support the latest version of Flink. **Currently, only Flink 1.19+ is supported; older versions of Flink won't work.**
Apache Flink is supported in Zeppelin with the Flink interpreter group which consists of the five interpreters listed below.

<table class="table-configuration">
@@ -138,16 +138,15 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep

## Prerequisites

Download Flink 1.15 or afterwards (Only Scala 2.12 is supported)
Download Flink 1.19 or later (only Scala 2.12 is supported)

### Version-specific notes for Flink

Flink 1.15 is scala free and has changed its binary distribution, the following extra steps is required.
* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib

Flink 1.16 introduces new `ClientResourceManager` for sql client, you need to move `FLINK_HOME/opt/flink-sql-client-1.16.0.jar` to `FLINK_HOME/lib`
Flink 1.19+ is Scala-free and has changed its binary distribution; the following extra steps are required.
* Move FLINK_HOME/opt/flink-table-planner_2.12-1.19.3.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-1.19.3.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-1.19.3.jar and flink-table-api-scala_2.12-1.19.3.jar to FLINK_HOME/lib
* Move FLINK_HOME/opt/flink-sql-client-1.19.3.jar to FLINK_HOME/lib
Comment on lines +145 to +149
Copilot AI, Apr 4, 2026:
These "version-specific notes" hard-code patch-level jar names (1.19.3) even though the page states Flink 1.19+ is supported. This will be incorrect for 1.20.x (and even other 1.19.x patch levels). Consider using a ${FLINK_VERSION} placeholder and/or wording that instructs users to substitute their installed Flink version.

Suggested change
Flink 1.19+ is scala free and has changed its binary distribution, the following extra steps is required.
* Move FLINK_HOME/opt/flink-table-planner_2.12-1.19.3.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-1.19.3.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-1.19.3.jar and flink-table-api-scala_2.12-1.19.3.jar to FLINK_HOME/lib
* Move FLINK_HOME/opt/flink-sql-client-1.19.3.jar to FLINK_HOME/lib
Flink 1.19+ is scala free and has changed its binary distribution, the following extra steps is required. Replace `${FLINK_VERSION}` below with the version of Flink you installed.
* Move FLINK_HOME/opt/flink-table-planner_2.12-${FLINK_VERSION}.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-${FLINK_VERSION}.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-${FLINK_VERSION}.jar and flink-table-api-scala_2.12-${FLINK_VERSION}.jar to FLINK_HOME/lib
* Move FLINK_HOME/opt/flink-sql-client-${FLINK_VERSION}.jar to FLINK_HOME/lib

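The reviewer's `${FLINK_VERSION}` substitution can be scripted. A minimal sketch, not part of the PR, assuming `FLINK_HOME` points at an unpacked distribution and the version string matches the jar names inside it:

```shell
#!/bin/sh
set -eu

# Hypothetical helpers: derive the jar names for a given Flink version.
planner_jar()        { echo "flink-table-planner_2.12-$1.jar"; }
planner_loader_jar() { echo "flink-table-planner-loader-$1.jar"; }
sql_client_jar()     { echo "flink-sql-client-$1.jar"; }

# Mirrors the doc steps above: swap planner for planner-loader,
# and move the sql-client jar into lib.
move_jars() {
  FLINK_VERSION="$1"
  mv "$FLINK_HOME/opt/$(planner_jar "$FLINK_VERSION")" "$FLINK_HOME/lib/"
  mv "$FLINK_HOME/lib/$(planner_loader_jar "$FLINK_VERSION")" "$FLINK_HOME/opt/"
  mv "$FLINK_HOME/opt/$(sql_client_jar "$FLINK_VERSION")" "$FLINK_HOME/lib/"
  # The two flink-table-api-scala jars are not shipped in the binary
  # distribution; they still need to be downloaded into $FLINK_HOME/lib.
}
```

`move_jars` is only defined here, not invoked, so sourcing the script is safe on a machine without Flink installed.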

## Flink on Zeppelin Architecture

20 changes: 10 additions & 10 deletions docs/setup/deployment/flink_and_spark_cluster.md
@@ -85,14 +85,14 @@ cd zeppelin
Package Zeppelin.

```bash
./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-1.17
./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-119
```

`-DskipTests` skips the build tests; you're not developing (yet), so you don't need to run them, and the cloned version *should* build.

`-Pspark-3.5` tells maven to build a Zeppelin with Spark 3.5. This is important because Zeppelin has its own Spark interpreter and the versions must be the same.

`-Pflink-1.17` tells maven to build a Zeppelin with Flink 1.17.
`-Pflink-119` tells maven to build a Zeppelin with Flink 1.19.

**Note:** You can build against any version of Spark that has a Zeppelin build profile available. The key is to make sure you check out the matching version of Spark to build. At the time of this writing, Spark 3.5 was the most recent Spark version available.

@@ -215,16 +215,16 @@ Building from source is recommended where possible, for simplicity in this tuto
To download the Flink Binary use `wget`

```bash
wget -O flink-1.17.1-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz?action=download"
tar -xzvf flink-1.17.1-bin-scala_2.12.tgz
wget -O flink-1.19.3-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz?action=download"
tar -xzvf flink-1.19.3-bin-scala_2.12.tgz
```

This will download Flink 1.17.1.
This will download Flink 1.19.3.

Start the Flink Cluster.

```bash
flink-1.17.1/bin/start-cluster.sh
flink-1.19.3/bin/start-cluster.sh
```

###### Building From source
@@ -233,13 +233,13 @@ If you wish to build Flink from source, the following will be instructive. Note

See the [Flink Installation guide](https://github.com/apache/flink/blob/master/README.md) for more detailed instructions.

Return to the directory where you have been downloading, this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.17.1, and build.
Return to the directory where you have been downloading; this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.19.3, and build.

```bash
cd $HOME
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.17.1
git checkout release-1.19.3
mvn clean install -DskipTests
```

@@ -261,8 +261,8 @@ If no task managers are present, restart the Flink cluster with the following co
(if binaries)

```bash
flink-1.17.1/bin/stop-cluster.sh
flink-1.17.1/bin/start-cluster.sh
flink-1.19.3/bin/stop-cluster.sh
flink-1.19.3/bin/start-cluster.sh
```
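After a restart it is worth confirming that the task managers re-registered. A small sketch, assuming the default REST endpoint on localhost:8081 (adjust if you changed `rest.port`) and that `curl` and `python3` are available:

```shell
#!/bin/sh
# Build the URL of the cluster overview REST endpoint.
overview_url() { echo "$1/overview"; }

# Print the number of registered task managers (requires a running cluster).
taskmanager_count() {
  curl -sf "$(overview_url "$1")" |
    python3 -c 'import json, sys; print(json.load(sys.stdin)["taskmanagers"])'
}

# Example (only meaningful while the cluster is up):
# taskmanager_count http://localhost:8081
```

A non-zero count here corresponds to the task managers listed in the web UI check described above.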


4 changes: 1 addition & 3 deletions flink/README.md
@@ -10,9 +10,7 @@ Flink interpreter is more complex than other interpreter (such as jdbc, shell).
Currently, it has the following modules clustered into two groups:

* flink-shims
* flink1.15-shims
* flink1.16-shims
* flink1.17-shims
* flink1.19-shims (shared by Flink 1.19 and 1.20)

* flink-scala-2.12

57 changes: 8 additions & 49 deletions flink/flink-scala-2.12/pom.xml
@@ -33,7 +33,7 @@

<properties>
<!--library versions-->
<flink.version>${flink1.17.version}</flink.version>
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
<flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
@@ -55,19 +55,7 @@

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.15-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.16-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.17-shims</artifactId>
<artifactId>flink1.19-shims</artifactId>
<version>${project.version}</version>
</dependency>

@@ -1203,39 +1191,10 @@

<profiles>
<profile>
<id>flink-115</id>
<properties>
<flink.version>${flink1.15.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<profile>
<id>flink-116</id>
<id>flink-119</id>
<properties>
<flink.version>${flink1.16.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.version>2.12.18</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
@@ -1267,10 +1226,10 @@
</profile>

<profile>
<id>flink-117</id>
<id>flink-120</id>
<properties>
<flink.version>${flink1.17.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.version>${flink1.20.version}</flink.version>
<flink.scala.version>2.12.18</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
@@ -18,6 +18,7 @@

package org.apache.zeppelin.flink;

import org.apache.zeppelin.flink.sql.AbstractStreamSqlJob;
import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
@@ -53,48 +54,45 @@ public void open() throws InterpreterException {
public void callInnerSelect(String sql) {
InterpreterContext context = InterpreterContext.get();
String streamType = context.getLocalProperties().getOrDefault("type", "update");
AbstractStreamSqlJob streamJob;
if (streamType.equalsIgnoreCase("single")) {
SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
streamJob = new SingleRowStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getJavaStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run single type stream job", e);
}
} else if (streamType.equalsIgnoreCase("append")) {
AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
streamJob = new AppendStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run append type stream job", e);
}
} else if (streamType.equalsIgnoreCase("update")) {
UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
streamJob = new UpdateStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run update type stream job", e);
}
} else {
throw new RuntimeException("Unrecognized stream type: " + streamType);
}

FlinkZeppelinContext z =
(FlinkZeppelinContext) flinkInterpreter.getZeppelinContext();
z.setCurrentStreamJob(streamJob);
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run " + streamType + " type stream job", e);
} finally {
z.clearCurrentStreamJob();
}
}

@Override
@@ -28,6 +28,7 @@
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;
@@ -65,6 +66,9 @@ public abstract class AbstractStreamSqlJob {
protected InterpreterContext context;
protected TableSchema schema;
protected SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
private volatile TableResult insertResult;
private volatile boolean cancelled = false;
private volatile boolean cancelledWithSavepoint = false;
protected Object resultLock = new Object();
protected volatile boolean enableToRefresh = true;
protected int defaultParallelism;
@@ -151,7 +155,38 @@ public String run(Table table, String tableName) throws IOException {

LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism);
String jobName = context.getStringLocalProperty("jobName", tableName);
table.executeInsert(tableName).await();
this.insertResult = table.executeInsert(tableName);
// Register the job with JobManager so that cancel (with savepoint) works properly
if (insertResult.getJobClient().isPresent()) {
jobManager.addJob(context, insertResult.getJobClient().get());
}
Comment on lines +158 to +162
Copilot AI, Apr 4, 2026:
jobManager.addJob(...) is called for the insert job, but this method stores a JobClient/poller keyed by paragraphId and expects removeJob(...) to be called to avoid stale entries. Since run(...) never removes the job on completion/cancellation, subsequent stream runs in the same paragraph can leave orphaned pollers and can skip starting a new poller due to the existing mapping. Add a jobManager.removeJob(context.getParagraphId()) in a finally block (guarded by JobClient presence) once the job is done.

// Use a CountDownLatch to wait for job completion while supporting cancellation
java.util.concurrent.CountDownLatch jobDone = new java.util.concurrent.CountDownLatch(1);
Thread jobThread = new Thread(() -> {
try {
insertResult.await();
} catch (Exception e) {
LOGGER.debug("Job await interrupted or failed", e);
} finally {
jobDone.countDown();
}
}, "flink-job-await");
jobThread.setDaemon(true);
jobThread.start();

// Wait for either job completion or cancellation
while (!cancelled && !jobDone.await(1, java.util.concurrent.TimeUnit.SECONDS)) {
// keep waiting
}
if (cancelled) {
// Wait briefly for the job to finish (e.g. stopped with savepoint)
jobDone.await(10, java.util.concurrent.TimeUnit.SECONDS);
if (cancelledWithSavepoint) {
LOGGER.info("Stream sql job stopped with savepoint, jobName: {}", jobName);
return buildResult();
}
throw new InterruptedException("Job was cancelled");
}
Comment on lines +181 to +189
Copilot AI, Apr 4, 2026:
On the cancellation path (if (cancelled) { ... }), run(...) exits (either returning or throwing) without stopping/joining retrievalThread. Since the thread loops on iterator.hasNext() it may keep running after the paragraph is cancelled/stopped, leading to leaked threads/sockets and continued background work. Consider cancelling the retrieval thread (and closing the SocketStreamIterator if possible) and joining it in the cancellation branch and/or in finally.

LOGGER.info("Flink Job is finished, jobName: {}", jobName);
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
@@ -162,7 +197,7 @@ public String run(Table table, String tableName) throws IOException {
return finalResult;
} catch (Exception e) {
LOGGER.error("Fail to run stream sql job", e);
throw new IOException("Fail to run stream sql job", e);
throw new IOException("Job was cancelled", e);
Copilot AI, Apr 4, 2026:
The catch-all exception handler always rethrows new IOException("Job was cancelled", e) even when the failure is unrelated to cancellation (e.g., job submission failure, runtime exception in result retrieval). This will mask real errors and make debugging harder. Consider only using the "Job was cancelled" message when cancelled is true, and otherwise propagate a more accurate message (or rethrow the original IOException).

Suggested change
throw new IOException("Job was cancelled", e);
if (cancelled) {
throw new IOException("Job was cancelled", e);
} else if (e instanceof IOException) {
throw (IOException) e;
} else {
throw new IOException("Fail to run stream sql job", e);
}

} finally {
refreshScheduler.shutdownNow();
}
@@ -238,6 +273,12 @@ public void cancel() {
}
}

public void cancel(boolean withSavepoint) {
LOGGER.info("Canceling stream sql job, withSavepoint={}", withSavepoint);
this.cancelledWithSavepoint = withSavepoint;
this.cancelled = true;
}

protected abstract void refresh(InterpreterContext context) throws Exception;

private class RefreshTask implements Runnable {