From dfdbe8e3450adf99b270652c84e6ffbb4faf04a4 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 20:08:08 -0700 Subject: [PATCH 1/7] feat(bench): add Arrow Flight E2E benchmark + Benchmarks CI workflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an end-to-end micro-benchmark of the Arrow Flight data path that spawns a real PythonWorkflowWorker actor (real Pekko + real Python subprocess via texera_run_python_worker.py + real Arrow Flight gRPC), wires an identity Python UDF, and times per-batch send→echo round-trip across a 36-config sweep (batch_size × schema_width × string_len). CI integration is bench-agnostic: a single `Benchmarks` workflow calls `bin/run-benchmarks.sh` as an opaque entry point. Future benches (e.g. JMH for ArrowUtils micros) plug in by appending one line to that script and adding a Publish step block. Trigger gate mirrors amber-integration via PR labels (no file-path filters); failure does not block merges (workflow stays out of required-checks.yml's aggregator). Results upload to gh-pages dashboard via SHA-pinned benchmark-action (v1.22.1, on ASF allow-list); auto-push gated on push-to-main so PRs do not pollute the baseline. --- .github/workflows/benchmarks.yml | 239 ++++++++ .../bench/ArrowFlightActorBench.scala | 554 ++++++++++++++++++ bin/run-benchmarks.sh | 59 ++ 3 files changed, 852 insertions(+) create mode 100644 .github/workflows/benchmarks.yml create mode 100644 amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala create mode 100755 bin/run-benchmarks.sh diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml new file mode 100644 index 00000000000..fe4ba425e2f --- /dev/null +++ b/.github/workflows/benchmarks.yml @@ -0,0 +1,239 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Texera benchmarks — bench-agnostic umbrella workflow. +# +# This file is the single CI entry for ALL Texera performance benchmarks +# (currently Arrow Flight E2E; JMH and others land here as well). The +# workflow knows nothing about specific benches — bin/run-benchmarks.sh +# is the opaque entry point that owns which benches run and where their +# outputs land under bench-results/. Adding a new bench is: +# 1. Append the run command to bin/run-benchmarks.sh. +# 2. Add a `Publish <bench>` step block below pointing at the +# bench's JSON output file with the right `tool:` setting. +# This workflow file otherwise stays unchanged. +# +# Triggering — mirrors amber-integration's label gate (NOT file paths): +# - PR: runs only when one of the labels mapped to the amber-integration +# stack in required-checks.yml's LABEL_STACKS is present on the PR. +# Labels are applied by the .github/labeler.yml workflow on opened / +# synchronize, so we wait for that workflow to complete before +# deciding (same pattern required-checks.yml uses). +# - push to main: always runs and publishes the baseline to gh-pages. +# - workflow_dispatch: manual smoke trigger.
+# +# Non-blocking: this workflow is NOT included in required-checks.yml's +# `required-checks` aggregator, so its result doesn't gate merges even +# when it fails. Adding it to branch protection later is a deliberate +# .asf.yaml change. +# +# Permissions: +# contents: write — needed by benchmark-action's auto-push to gh-pages. +# For PR runs (which GitHub auto-downgrades to read-only on forks) +# auto-push is gated off via the `github.ref == 'refs/heads/main'` +# check, so the missing write is never exercised. + +name: Benchmarks + +on: + push: + branches: [main] + pull_request: + types: [opened, reopened, synchronize, labeled, unlabeled] + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: benchmarks-${{ github.ref }} + # On main: never cancel an in-flight baseline run; on PRs: supersede. + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} + +jobs: + precheck: + # Decide whether to run based on PR labels (push / dispatch always + # run). Lifted from required-checks.yml's precheck so the trigger + # surface matches amber-integration exactly. 
+ name: Precheck + runs-on: ubuntu-latest + outputs: + run_bench: ${{ steps.decide.outputs.run_bench }} + steps: + - name: Wait for Pull Request Labeler + if: github.event_name == 'pull_request' + uses: actions/github-script@v8 + with: + script: | + const ref = context.payload.pull_request.head.sha; + const maxAttempts = 30; + for (let i = 0; i < maxAttempts; i++) { + const { data } = await github.rest.checks.listForRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref, + check_name: "labeler", + }); + const check = data.check_runs[0]; + if (check && check.status === "completed") { + core.info(`labeler ${check.conclusion}`); + return; + } + core.info(`labeler not ready (attempt ${i + 1}/${maxAttempts})`); + await new Promise((r) => setTimeout(r, 10000)); + } + core.warning("labeler did not complete within 5 minutes; proceeding with current labels."); + + - name: Decide whether to run bench + id: decide + uses: actions/github-script@v8 + with: + script: | + const eventName = context.eventName; + if (eventName !== "pull_request") { + // push to main / workflow_dispatch always run. + core.info(`event=${eventName} — running unconditionally`); + core.setOutput("run_bench", "true"); + return; + } + // Re-fetch labels: the labeler may have just added some. + const { data: pr } = await github.rest.pulls.get({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.payload.pull_request.number, + }); + const labels = pr.labels.map((l) => l.name); + core.info(`PR labels: ${labels.join(", ") || "(none)"}`); + // Mirrors LABEL_STACKS in required-checks.yml: every label + // whose stack list contains "amber-integration" triggers this + // bench. Keep in sync if LABEL_STACKS there changes. + const TRIGGER_LABELS = new Set([ + "python", + "engine", + "amber-integration", + "common", + "ddl-change", + "ci", + ]); + const matched = labels.filter((l) => TRIGGER_LABELS.has(l)); + const shouldRun = matched.length > 0; + core.info( + shouldRun + ? 
`Triggering on labels: ${matched.join(", ")}` + : "No trigger label present; skipping bench." + ); + core.setOutput("run_bench", shouldRun ? "true" : "false"); + + bench: + needs: precheck + if: ${{ needs.precheck.outputs.run_bench == 'true' }} + runs-on: ubuntu-22.04 + env: + JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 + JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 + # PR runs are smaller (faster signal); main builds the baseline at + # the full sweep so the gh-pages chart is stable. + BENCH_NUM_BATCHES: ${{ github.event_name == 'push' && '200' || '50' }} + steps: + - name: Checkout + uses: actions/checkout@v5 + with: + fetch-depth: 0 + - name: Setup JDK + uses: actions/setup-java@v5 + with: + distribution: "temurin" + java-version: 17 + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.11" + - name: Install Python dependencies + # Mirrors amber-integration's installer in build.yml so the bench + # subprocess imports resolve identically (pytorch CPU index + + # betterproto plugin via dev-requirements). 
+ run: | + python -m pip install uv + if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi + if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi + if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi + - name: Install protoc + run: | + PROTOC_VERSION=$(cat bin/protoc-version.txt) + curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" + sudo unzip -o /tmp/protoc.zip -d /usr/local + sudo chmod +x /usr/local/bin/protoc + sudo chmod -R a+rX /usr/local/include/google + - name: Generate Python proto bindings + run: bash bin/python-proto-gen.sh + - name: Setup sbt launcher + uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22 + - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0 + with: + extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]' + + - name: Run benchmarks + # Single opaque entry point — this workflow doesn't know which + # benches exist. Adding a JMH suite later = appending one line + # inside bin/run-benchmarks.sh and adding a publish step below. + run: bash bin/run-benchmarks.sh + + - name: Upload bench artifacts + if: ${{ !cancelled() }} + uses: actions/upload-artifact@v4 + with: + name: bench-results-${{ github.run_id }} + path: bench-results/ + retention-days: 14 + + # Publish to the gh-pages dashboard. auto-push + save-data-file are + # both gated on push-to-main: PR runs only emit the job summary and + # the uploaded artifact, never touching the tracked baseline. Adding + # a new benchmark = adding one publish block below matching the + # JSON filename convention in bin/run-benchmarks.sh. 
+ - name: Publish throughput + if: ${{ !cancelled() }} + uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1 + with: + name: Arrow Flight E2E Throughput + tool: customBiggerIsBetter + output-file-path: bench-results/arrow-flight-e2e-throughput.json + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + save-data-file: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + gh-pages-branch: gh-pages + benchmark-data-dir-path: dev/bench + alert-threshold: "150%" + # comment-on-alert needs pull-requests:write; skip and let + # results show up via summary-always instead. + comment-on-alert: false + summary-always: true + - name: Publish latency + if: ${{ !cancelled() }} + uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1 + with: + name: Arrow Flight E2E Latency + tool: customSmallerIsBetter + output-file-path: bench-results/arrow-flight-e2e-latency.json + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + save-data-file: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + gh-pages-branch: gh-pages + benchmark-data-dir-path: dev/bench + alert-threshold: "150%" + comment-on-alert: false + summary-always: true diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala new file mode 100644 index 00000000000..3a189f299ed --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.pythonworker.bench + +import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} +import org.apache.pekko.testkit.TestProbe +import org.apache.texera.amber.clustering.SingleNodeListener +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity, + WorkflowIdentity +} +import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} +import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{NetworkAck, NetworkMessage} +import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker +import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation +import org.apache.texera.amber.engine.architecture.scheduling.config.WorkerConfig +import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning +import org.apache.texera.amber.engine.common.AmberRuntime +import 
org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize +import org.apache.texera.amber.util.VirtualIdentityUtils + +import java.io.PrintWriter +import java.nio.file.{Files, Paths} +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * End-to-end micro-benchmark of the real Arrow Flight data path through a + * live PythonWorkflowWorker actor. + * + * Each measured config spawns a fresh PythonWorkflowWorker (real Pekko actor, + * real Python subprocess via texera_run_python_worker.py, real Arrow Flight + * transport), wires up an identity Python UDF, and times the round-trip of + * `numBatches` DataFrames send→echo through the actor mailbox. + * + * Output: + * - stdout summary per config + * - bench-results/arrow-flight-e2e.csv (one row per config) — overwritten each run + * + * Run with: + * sbt "WorkflowExecutionService/Test/runMain \ + * org.apache.texera.amber.engine.architecture.pythonworker.bench.ArrowFlightActorBench" + * + * Caveat: `Utils.amberHomePath` does a `Files.walk(cwd, 2).findAny` for any + * dir ending in `amber`. If `.claude/amber/` exists locally, the Python + * subprocess may end up trying to launch from that path; move it aside for + * the run, or fix `amberHomePath` upstream. + */ +object ArrowFlightActorBench { + + // --------------------------------------------------------------------------- + // Identity Python UDF — passes input tuples straight through to output.
+ // --------------------------------------------------------------------------- + private val IdentityPythonCode: String = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + | yield tuple_ + |""".stripMargin + + private val WorkflowId = WorkflowIdentity(1L) + private val InputPortId = PortIdentity(id = 0, internal = false) + private val OutputPortId = PortIdentity(id = 0, internal = false) + + private val WarmupBatches = 20 + + // Full sweep grid (4 × 3 × 3 = 36 configs). Per-config `numBatches` is + // tunable via the BENCH_NUM_BATCHES env var so local smoke can use a + // small value (10-20) without recompile while CI uses 50-200 for stable + // percentiles. + private val DefaultBatchSizes: Seq[Int] = Seq(10, 100, 1000, 10000) + private val DefaultSchemaWidths: Seq[Int] = Seq(1, 10, 50) + private val DefaultStringLens: Seq[Int] = Seq(8, 64, 512) + private val DefaultNumBatches: Int = + sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(100) + + // All artifacts land under bench-results/ so CI can artifact-upload the + // whole directory uniformly without knowing individual filenames beyond + // what its publish matrix declares. + // Conventions for new benches: + // bench-results/<bench>-throughput.json → customBiggerIsBetter + // bench-results/<bench>-latency.json → customSmallerIsBetter + // bench-results/<bench>-jmh.json → tool=jmh + private val OutDir = Paths.get("bench-results") + private val CsvOutPath = OutDir.resolve("arrow-flight-e2e.csv") + // Two JSON files — github-action-benchmark needs distinct + // customBiggerIsBetter / customSmallerIsBetter inputs since each upload + // direction is per-`tool` parameter.
+ private val ThroughputJsonPath = OutDir.resolve("arrow-flight-e2e-throughput.json") + private val LatencyJsonPath = OutDir.resolve("arrow-flight-e2e-latency.json") + + // --------------------------------------------------------------------------- + // Sink actor: stands in for the downstream worker. Auto-acks every + // NetworkMessage from the worker (otherwise PekkoMessageTransferService + // throttles after the first unacked reply and the bench stalls), and + // forwards every received message to the bench probe for inspection. + // --------------------------------------------------------------------------- + private class AutoAckSink(forwardTo: ActorRef) extends Actor { + override def receive: Receive = { + case msg @ NetworkMessage(id, internal) => + sender() ! NetworkAck(id, getInMemSize(internal), 0L) + forwardTo ! msg + case other => + forwardTo ! other + } + } + + private case class BenchConfig( + configIdx: Int, + batchSize: Int, + schemaWidth: Int, + stringLen: Int, + numBatches: Int + ) + + private case class BenchResult( + cfg: BenchConfig, + totalWallNs: Long, + totalTuples: Long, + totalBytesApprox: Long, + latencyP50Ns: Long, + latencyP95Ns: Long, + latencyP99Ns: Long + ) { + def tuplesPerSec: Double = totalTuples * 1e9 / totalWallNs + def mbPerSec: Double = totalBytesApprox * 1e9 / totalWallNs / (1024.0 * 1024.0) + } + + def main(args: Array[String]): Unit = { + val system = ActorSystem("arrow-flight-bench", AmberRuntime.pekkoConfig) + system.actorOf(Props[SingleNodeListener](), "cluster-info") + + val configs: Seq[BenchConfig] = (for { + sw <- DefaultSchemaWidths + sl <- DefaultStringLens + bs <- DefaultBatchSizes + } yield (sw, sl, bs)).zipWithIndex.map { + case ((sw, sl, bs), idx) => + BenchConfig( + idx, + batchSize = bs, + schemaWidth = sw, + stringLen = sl, + numBatches = DefaultNumBatches + ) + } + + println(s"[bench] sweeping ${configs.size} configurations × ${DefaultNumBatches} batches each") + // Pre-create output dir + rewrite the 
result files after every completed + // config so a killed / timed-out sweep still leaves a usable artifact. + Files.createDirectories(OutDir) + val resultsBuf = scala.collection.mutable.ArrayBuffer.empty[BenchResult] + configs.foreach { cfg => + try { + val r = runConfig(system, cfg) + resultsBuf += r + writeCsv(resultsBuf.toSeq) + writeJsonForGitHubActionBenchmark(resultsBuf.toSeq) + } catch { + case t: Throwable => + println(s"[bench] FAILED config #${cfg.configIdx} ($cfg): $t") + } + } + printSummary(resultsBuf.toSeq) + Await.result(system.terminate(), 30.seconds) + } + + // --------------------------------------------------------------------------- + // One configuration: spawn fresh worker, run warmup + timed loop, tear down. + // --------------------------------------------------------------------------- + private def runConfig(system: ActorSystem, cfg: BenchConfig): BenchResult = { + val workerId = + VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "bench", "main", cfg.configIdx) + val downstreamId = + VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "benchsink", "main", cfg.configIdx) + + val ctlChannelIn = ChannelIdentity(downstreamId, workerId, isControl = true) + val dataChannelIn = ChannelIdentity(downstreamId, workerId, isControl = false) + val dataChannelOut = ChannelIdentity(workerId, downstreamId, isControl = false) + + val probe = TestProbe()(system) + val sink = system.actorOf( + Props(new AutoAckSink(probe.ref)), + name = s"bench-sink-${cfg.configIdx}" + ) + val worker = system.actorOf( + PythonWorkflowWorker.props(WorkerConfig(workerId)), + name = s"bench-worker-${cfg.configIdx}" + ) + + println(s"\n[bench] config #${cfg.configIdx}: $cfg") + + try { + val schema = makeSchema(cfg.schemaWidth) + val schemaMap = schema.getAttributes.map(a => a.getName -> a.getType.name()).toMap + + var ctlSeq: Long = 0L + var dataSeq: Long = 0L + var msgId: Long = 0L + + def sendCtl(payload: ControlInvocation): Unit = { + val fifo = 
WorkflowFIFOMessage(ctlChannelIn, ctlSeq, payload) + ctlSeq += 1 + worker.tell(NetworkMessage(msgId, fifo), sink) + msgId += 1 + } + def sendOnDataChannel( + payload: org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload + ): Unit = { + val fifo = WorkflowFIFOMessage(dataChannelIn, dataSeq, payload) + dataSeq += 1 + worker.tell(NetworkMessage(msgId, fifo), sink) + msgId += 1 + } + + // ----------------------------------------------------------------------- + // Setup control sequence + StartChannel ECM (see Pass 1 for details). + // ----------------------------------------------------------------------- + val ctx = AsyncRPCContext(sender = downstreamId, receiver = workerId) + sendCtl( + ControlInvocation( + "InitializeExecutor", + InitializeExecutorRequest( + 1, + OpExecWithCode(IdentityPythonCode, "python"), + isSource = false + ), + ctx, + 0L + ) + ) + sendCtl( + ControlInvocation( + "AssignPort", + AssignPortRequest(InputPortId, input = true, schemaMap, Seq.empty, Seq.empty), + ctx, + 1L + ) + ) + sendCtl( + ControlInvocation( + "AssignPort", + AssignPortRequest(OutputPortId, input = false, schemaMap, Seq.empty, Seq.empty), + ctx, + 2L + ) + ) + sendCtl( + ControlInvocation( + "AddInputChannel", + AddInputChannelRequest(dataChannelIn, InputPortId), + ctx, + 3L + ) + ) + val outLink = PhysicalLink( + fromOpId = VirtualIdentityUtils.getPhysicalOpId(workerId), + fromPortId = OutputPortId, + toOpId = VirtualIdentityUtils.getPhysicalOpId(downstreamId), + toPortId = InputPortId + ) + sendCtl( + ControlInvocation( + "AddPartitioning", + AddPartitioningRequest( + outLink, + // batch_size = cfg.batchSize keeps the Python-side partitioning + // buffer aligned with our send size — one Java DataFrame in maps + // to exactly one Python DataFrame out. 
+ OneToOnePartitioning(batchSize = cfg.batchSize, channels = Seq(dataChannelOut)) + ), + ctx, + 4L + ) + ) + sendCtl(ControlInvocation("OpenExecutor", EmptyRequest(), ctx, 5L)) + sendCtl(ControlInvocation("StartWorker", EmptyRequest(), ctx, 6L)) + + waitForReturns(probe, 7, 60.seconds) + + // StartChannel ECM enables data flow on the input channel. + val startChannelEcm = EmbeddedControlMessage( + id = EmbeddedControlMessageIdentity("StartChannel"), + ecmType = EmbeddedControlMessageType.NO_ALIGNMENT, + scope = Seq.empty, + commandMapping = Map( + workerId.name -> ControlInvocation( + "StartChannel", + EmptyRequest(), + AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), + -1L + ) + ) + ) + sendOnDataChannel(startChannelEcm) + // Drain the StartChannel-echo the worker forwards downstream so it + // doesn't show up in the data-loop's measurement window. + drainNonDataFor(probe, 2.seconds) + + // ----------------------------------------------------------------------- + // Build sample tuples once; reuse across all batches in this config. + // ----------------------------------------------------------------------- + val sampleBatch: Array[Tuple] = buildBatch(schema, cfg.batchSize, cfg.stringLen) + val approxBytesPerBatch: Long = + cfg.batchSize.toLong * cfg.schemaWidth.toLong * cfg.stringLen.toLong + + // Warmup — let JIT settle, Python import any lazy modules. + var warmedBatches = 0 + while (warmedBatches < WarmupBatches) { + sendOnDataChannel(DataFrame(sampleBatch)) + if (awaitOneDataFrameEcho(probe, 30.seconds)) warmedBatches += 1 + } + + // ----------------------------------------------------------------------- + // Timed loop — per-batch latency from send to corresponding echo. + // Because the Python pipeline is FIFO, sending batch i then awaiting + // exactly one DataFrame echo gives latency_i = receive_i - send_i. 
+ // ----------------------------------------------------------------------- + val latencies = new Array[Long](cfg.numBatches) + val totalStart = System.nanoTime() + var i = 0 + while (i < cfg.numBatches) { + val t0 = System.nanoTime() + sendOnDataChannel(DataFrame(sampleBatch)) + if (!awaitOneDataFrameEcho(probe, 60.seconds)) { + throw new RuntimeException(s"timed out waiting for echo of batch $i") + } + latencies(i) = System.nanoTime() - t0 + i += 1 + } + val totalNs = System.nanoTime() - totalStart + + val totalTuples = cfg.numBatches.toLong * cfg.batchSize.toLong + val totalBytes = cfg.numBatches.toLong * approxBytesPerBatch + val result = BenchResult( + cfg, + totalWallNs = totalNs, + totalTuples = totalTuples, + totalBytesApprox = totalBytes, + latencyP50Ns = percentile(latencies, 0.50), + latencyP95Ns = percentile(latencies, 0.95), + latencyP99Ns = percentile(latencies, 0.99) + ) + + printOne(result) + result + } finally { + worker ! PoisonPill + sink ! PoisonPill + // Give the worker a moment to tear down its Python subprocess + flight + // server cleanly before we move to the next config. 
+ Thread.sleep(500) + } + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + private def makeSchema(width: Int): Schema = { + val attrs = (0 until width).map(i => new Attribute(s"col$i", AttributeType.STRING)) + Schema(attrs.toList) + } + + private def buildBatch(schema: Schema, batchSize: Int, stringLen: Int): Array[Tuple] = { + val arr = new Array[Tuple](batchSize) + val sampleVal = "x" * stringLen + var i = 0 + val attrs = schema.getAttributes + while (i < batchSize) { + val b = Tuple.builder(schema) + var j = 0 + while (j < attrs.size) { + b.add(attrs(j), sampleVal) + j += 1 + } + arr(i) = b.build() + i += 1 + } + arr + } + + private def waitForReturns(probe: TestProbe, n: Int, timeout: FiniteDuration): Unit = { + val deadline = System.currentTimeMillis() + timeout.toMillis + var seen = 0 + while (seen < n && System.currentTimeMillis() < deadline) { + probe.receiveOne(2.seconds) match { + case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: ReturnInvocation)) => + seen += 1 + case _ => // ignore acks + other + } + } + if (seen < n) { + throw new RuntimeException(s"only $seen/$n control returns within $timeout") + } + } + + private def awaitOneDataFrameEcho(probe: TestProbe, timeout: FiniteDuration): Boolean = { + val deadline = System.currentTimeMillis() + timeout.toMillis + while (System.currentTimeMillis() < deadline) { + probe.receiveOne(timeout) match { + case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: DataFrame)) => return true + case null => return false + case _ => // ignore acks, ECM forwards + } + } + false + } + + private def drainNonDataFor(probe: TestProbe, dur: FiniteDuration): Unit = { + val end = System.currentTimeMillis() + dur.toMillis + while (System.currentTimeMillis() < end) { + probe.receiveOne(200.millis) match { + case null => return + case _ => // discard + } + } + } + + private def percentile(values: 
Array[Long], q: Double): Long = { + if (values.isEmpty) 0L + else { + val sorted = values.sorted + val idx = math.min(sorted.length - 1, math.max(0, (sorted.length * q).toInt)) + sorted(idx) + } + } + + private def printOne(r: BenchResult): Unit = { + val ms = r.totalWallNs / 1e6 + println( + f" -> total=${ms}%.0fms tuples/s=${r.tuplesPerSec}%,.0f MB/s=${r.mbPerSec}%.2f " + + f"p50=${r.latencyP50Ns / 1000.0}%.1fus p95=${r.latencyP95Ns / 1000.0}%.1fus " + + f"p99=${r.latencyP99Ns / 1000.0}%.1fus" + ) + } + + private def writeCsv(results: Seq[BenchResult]): Unit = { + val pw = new PrintWriter(Files.newBufferedWriter(CsvOutPath)) + try { + pw.println( + "config_idx,batch_size,schema_width,string_len,num_batches," + + "total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec," + + "lat_p50_us,lat_p95_us,lat_p99_us" + ) + results.foreach { r => + val c = r.cfg + pw.println( + List( + c.configIdx, + c.batchSize, + c.schemaWidth, + c.stringLen, + c.numBatches, + f"${r.totalWallNs / 1e6}%.2f", + r.totalTuples, + r.totalBytesApprox, + f"${r.tuplesPerSec}%.0f", + f"${r.mbPerSec}%.3f", + f"${r.latencyP50Ns / 1000.0}%.2f", + f"${r.latencyP95Ns / 1000.0}%.2f", + f"${r.latencyP99Ns / 1000.0}%.2f" + ).mkString(",") + ) + } + } finally pw.close() + println(s"\n[bench] wrote ${results.size} rows to ${CsvOutPath.toAbsolutePath}") + } + + /** + * Emit two JSON arrays per github-action-benchmark's customBiggerIsBetter + * (throughput) and customSmallerIsBetter (latency) input formats. Each + * config produces one throughput entry and three latency entries (p50/p95/ + * p99). 
Spec: https://github.com/benchmark-action/github-action-benchmark + */ + private def writeJsonForGitHubActionBenchmark(results: Seq[BenchResult]): Unit = { + def cfgLabel(c: BenchConfig): String = + s"bs=${c.batchSize} sw=${c.schemaWidth} sl=${c.stringLen}" + + def jsonEntry(name: String, unit: String, value: Double): String = + s""" { "name": ${quoteJson(name)}, "unit": ${quoteJson(unit)}, "value": $value }""" + + val throughputEntries = results.map { r => + jsonEntry(s"throughput / ${cfgLabel(r.cfg)}", "tuples/sec", r.tuplesPerSec) + } + val latencyEntries = results.flatMap { r => + Seq( + jsonEntry(s"latency p50 / ${cfgLabel(r.cfg)}", "us", r.latencyP50Ns / 1000.0), + jsonEntry(s"latency p95 / ${cfgLabel(r.cfg)}", "us", r.latencyP95Ns / 1000.0), + jsonEntry(s"latency p99 / ${cfgLabel(r.cfg)}", "us", r.latencyP99Ns / 1000.0) + ) + } + + writeJsonArray(ThroughputJsonPath, throughputEntries) + writeJsonArray(LatencyJsonPath, latencyEntries) + println( + s"[bench] wrote ${results.size} throughput entries to ${ThroughputJsonPath.toAbsolutePath}" + ) + println( + s"[bench] wrote ${latencyEntries.size} latency entries to ${LatencyJsonPath.toAbsolutePath}" + ) + } + + private def writeJsonArray(path: java.nio.file.Path, entries: Seq[String]): Unit = { + val pw = new PrintWriter(Files.newBufferedWriter(path)) + try { + pw.println("[") + pw.println(entries.mkString(",\n")) + pw.println("]") + } finally pw.close() + } + + private def quoteJson(s: String): String = + "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" + + private def printSummary(results: Seq[BenchResult]): Unit = { + println("\n[bench] === summary ===") + println( + f"${"#"}%3s ${"bs"}%5s ${"sw"}%3s ${"sl"}%4s ${"tuples/s"}%10s ${"MB/s"}%7s " + + f"${"p50us"}%8s ${"p99us"}%8s" + ) + results.foreach { r => + println( + f"${r.cfg.configIdx}%3d ${r.cfg.batchSize}%5d ${r.cfg.schemaWidth}%3d ${r.cfg.stringLen}%4d " + + f"${r.tuplesPerSec}%,10.0f ${r.mbPerSec}%7.2f " + + f"${r.latencyP50Ns / 1000.0}%8.1f 
${r.latencyP99Ns / 1000.0}%8.1f" + ) + } + } +} diff --git a/bin/run-benchmarks.sh b/bin/run-benchmarks.sh new file mode 100755 index 00000000000..e93f8f5a1bf --- /dev/null +++ b/bin/run-benchmarks.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Single entry-point for all Texera benchmarks. CI calls this script +# verbatim — it does NOT reference individual benchmark main classes. +# Adding a new benchmark (e.g., a JMH suite) means appending one block +# to this script, plus a publish step in benchmarks.yml. +# +# Output convention: every benchmark writes to bench-results/ with a +# self-describing filename suffix that matches the github-action-benchmark +# `tool` parameter expected by the publish step in benchmarks.yml: +# bench-results/<bench>-throughput.json → tool: customBiggerIsBetter +# bench-results/<bench>-latency.json → tool: customSmallerIsBetter +# bench-results/<bench>-jmh.json → tool: jmh +# CSV / log / debug files may live alongside; the publish matrix only +# cares about the *.json files declared in benchmarks.yml. +# +# Env vars honored: +# BENCH_NUM_BATCHES — passes through to the e2e bench (default 100). +# Lower for fast PR runs; higher for stable nightlies.
+# UDF_PYTHON_PATH — Python executable for the spawned worker subprocess. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +cd "$REPO_ROOT" + +mkdir -p bench-results + +echo "=== run-benchmarks: arrow-flight-e2e ===" +sbt --error \ + "WorkflowExecutionService/Test/runMain org.apache.texera.amber.engine.architecture.pythonworker.bench.ArrowFlightActorBench" + +# Future benchmarks: add new blocks below. Each block should self-contain +# the run command and ensure its outputs land in bench-results/. Example +# for a future JMH suite: +# echo "=== run-benchmarks: arrow-utils-jmh ===" +# sbt "WorkflowExecutionService/Jmh/run -rf json -rff $REPO_ROOT/bench-results/arrow-utils-jmh.json" + +echo +echo "=== bench artifacts ===" +ls -la bench-results/ From c923ea9bdbded317d67eeac63a816eb9d5ac3060 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 20:13:12 -0700 Subject: [PATCH 2/7] fix(bench): add Postgres service for JOOQ codegen in CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bench job's sbt compile transitively reaches `common/auth` which imports JOOQ-generated classes from `org.apache.texera.dao.jooq.generated.*`. JOOQ codegen at sbt compile time requires a live Postgres connection to introspect schema; without it the auth module's `User` / `UserRoleEnum` symbols fail to resolve and the whole compile aborts. The bench itself doesn't touch the DB at runtime — this is purely a build dependency. Mirrors the same `services.postgres` block and `Create Databases` step that amber-integration in build.yml uses (minus the iceberg / lakefs / lakekeeper SQL since the bench never reads from those schemas). Local builds didn't surface this because they had cached JOOQ classes from prior runs against a developer Postgres. 
--- .github/workflows/benchmarks.yml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index fe4ba425e2f..4a85efeeb11 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -148,6 +148,25 @@ jobs: # PR runs are smaller (faster signal); main builds the baseline at # the full sweep so the gh-pages chart is stable. BENCH_NUM_BATCHES: ${{ github.event_name == 'push' && '200' || '50' }} + services: + # The bench itself doesn't touch the DB, but sbt's transitive compile + # chain reaches `common/auth` which imports JOOQ-generated classes + # from `org.apache.texera.dao.jooq.generated.*`. JOOQ codegen at + # sbt compile time requires a live Postgres to introspect against; + # without it the auth module's `User` / `UserRoleEnum` symbols fail + # to resolve and the whole bench compile aborts. Mirrors the same + # service block from amber-integration in build.yml. + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 steps: - name: Checkout uses: actions/checkout@v5 @@ -178,6 +197,13 @@ jobs: sudo unzip -o /tmp/protoc.zip -d /usr/local sudo chmod +x /usr/local/bin/protoc sudo chmod -R a+rX /usr/local/include/google + - name: Create Database for JOOQ codegen + # Minimal subset of amber-integration's "Create Databases" step — + # JOOQ only introspects against texera_db, not iceberg/lakefs/ + # lakekeeper schemas which the bench never touches. 
+ run: psql -h localhost -U postgres -f sql/texera_ddl.sql + env: + PGPASSWORD: postgres - name: Generate Python proto bindings run: bash bin/python-proto-gen.sh - name: Setup sbt launcher From eb5e99bee128fdad000e657c5b5243cfcd8e2384 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 21:06:44 -0700 Subject: [PATCH 3/7] =?UTF-8?q?feat(bench):=20two-tier=20sweep=20=E2=80=94?= =?UTF-8?q?=20fast=20PR/post-merge=20+=20weekly=20full=20grid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR and post-merge runs now use a trimmed 3-config grid (`pr` mode: 3 batch sizes × single schema × single string len × 20 batches) targeting ~5 min in CI. The full 36-config grid (`full` mode: 4 × 3 × 3 × 200 batches) runs weekly on a scheduled trigger and on workflow_dispatch. Trigger → mode → publish mapping: pull_request → pr → no gh-pages push push to main → pr → publishes baseline schedule (Sundays 08:00 UTC) → full → publishes baseline workflow_dispatch → full → no publish Bench-side: `BENCH_MODE` env var selects between two GridSpec cases. `BENCH_NUM_BATCHES` still overrides numBatches if set (for local smoke). --- .github/workflows/benchmarks.yml | 40 +++++++++----- .../bench/ArrowFlightActorBench.scala | 52 +++++++++++++++---- 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 4a85efeeb11..095169ea880 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -33,8 +33,16 @@ # Labels are applied by the .github/labeler.yml workflow on opened / # synchronize, so we wait for that workflow to complete before # deciding (same pattern required-checks.yml uses). -# - push to main: always runs and publishes the baseline to gh-pages. -# - workflow_dispatch: manual smoke trigger. 
+# - push to main: always runs (same trimmed grid as PR for quick post- +# merge signal) and publishes to gh-pages. +# - schedule (weekly): runs the full 36-config sweep and publishes to +# gh-pages — this is the authoritative long-term baseline. +# - workflow_dispatch: manual full-grid run (no publish; bring-your-own +# trigger for ad-hoc exploration). +# +# Two modes via BENCH_MODE env (read by the bench Scala main): +# pr — 3 configs × 20 batches, ~5 min (PR + push-to-main) +# full — 36 configs × 200 batches, ~50-60 min (schedule + dispatch) # # Non-blocking: this workflow is NOT included in required-checks.yml's # `required-checks` aggregator, so its result doesn't gate merges even @@ -43,9 +51,9 @@ # # Permissions: # contents: write — needed by benchmark-action's auto-push to gh-pages. -# For PR runs (which GitHub auto-downgrades to read-only on forks) -# auto-push is gated off via the `github.ref == 'refs/heads/main'` -# check, so the missing write is never exercised. +# PR runs (which GitHub auto-downgrades to read-only on forks) gate +# auto-push off via the event check, so the missing write is never +# exercised. name: Benchmarks @@ -54,6 +62,12 @@ on: branches: [main] pull_request: types: [opened, reopened, synchronize, labeled, unlabeled] + schedule: + # Weekly full-grid baseline refresh, Sundays 08:00 UTC. PR and post- + # merge runs use a trimmed 3-config grid to stay around 5 min; the + # scheduled run covers the full 36-config sweep that the gh-pages + # dashboard tracks long-term. + - cron: "0 8 * * 0" workflow_dispatch: permissions: @@ -145,9 +159,11 @@ jobs: env: JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 - # PR runs are smaller (faster signal); main builds the baseline at - # the full sweep so the gh-pages chart is stable. 
- BENCH_NUM_BATCHES: ${{ github.event_name == 'push' && '200' || '50' }} + # `pr` mode = 3-config trimmed sweep (~5 min) for PR + post-merge. + # `full` mode = 36-config sweep (~50-60 min) for schedule + manual. + # Read by the bench Scala main (see GridSpec switch); workflow only + # decides which mode to pass. + BENCH_MODE: ${{ (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && 'full' || 'pr' }} services: # The bench itself doesn't touch the DB, but sbt's transitive compile # chain reaches `common/auth` which imports JOOQ-generated classes @@ -239,8 +255,8 @@ jobs: tool: customBiggerIsBetter output-file-path: bench-results/arrow-flight-e2e-throughput.json github-token: ${{ secrets.GITHUB_TOKEN }} - auto-push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} - save-data-file: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} gh-pages-branch: gh-pages benchmark-data-dir-path: dev/bench alert-threshold: "150%" @@ -256,8 +272,8 @@ jobs: tool: customSmallerIsBetter output-file-path: bench-results/arrow-flight-e2e-latency.json github-token: ${{ secrets.GITHUB_TOKEN }} - auto-push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} - save-data-file: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} gh-pages-branch: gh-pages benchmark-data-dir-path: dev/bench alert-threshold: "150%" diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala index 3a189f299ed..068f9ef384b 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala @@ -87,17 +87,47 @@ object ArrowFlightActorBench { private val InputPortId = PortIdentity(id = 0, internal = false) private val OutputPortId = PortIdentity(id = 0, internal = false) - private val WarmupBatches = 20 - - // Full sweep grid (4 × 3 × 3 = 36 configs). Per-config `numBatches` is - // tunable via the BENCH_NUM_BATCHES env var so local smoke can use a - // small value (10-20) without recompile while CI uses 50-200 for stable - // percentiles. - private val DefaultBatchSizes: Seq[Int] = Seq(10, 100, 1000, 10000) - private val DefaultSchemaWidths: Seq[Int] = Seq(1, 10, 50) - private val DefaultStringLens: Seq[Int] = Seq(8, 64, 512) - private val DefaultNumBatches: Int = - sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(100) + // Sweep grid + iteration counts switch on BENCH_MODE so PR / post-merge + // checks stay around 5 min while scheduled / manual runs do the full + // 36-config grid that the gh-pages dashboard tracks long-term. + // pr — 3 configs × 20 batches, warmup 5 (~4-5 min in CI) + // full — 36 configs × 200 batches, warmup 20 (~50-60 min in CI) + // BENCH_NUM_BATCHES, if set, overrides numBatches for the current mode + // (useful for local smoke). 
+ private val BenchMode: String = sys.env.getOrElse("BENCH_MODE", "full").toLowerCase + + private case class GridSpec( + batchSizes: Seq[Int], + schemaWidths: Seq[Int], + stringLens: Seq[Int], + numBatches: Int, + warmupBatches: Int + ) + + private val grid: GridSpec = BenchMode match { + case "pr" => + GridSpec( + batchSizes = Seq(10, 100, 1000), + schemaWidths = Seq(10), + stringLens = Seq(64), + numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(20), + warmupBatches = 5 + ) + case _ => + GridSpec( + batchSizes = Seq(10, 100, 1000, 10000), + schemaWidths = Seq(1, 10, 50), + stringLens = Seq(8, 64, 512), + numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(200), + warmupBatches = 20 + ) + } + + private val DefaultBatchSizes: Seq[Int] = grid.batchSizes + private val DefaultSchemaWidths: Seq[Int] = grid.schemaWidths + private val DefaultStringLens: Seq[Int] = grid.stringLens + private val DefaultNumBatches: Int = grid.numBatches + private val WarmupBatches: Int = grid.warmupBatches // All artifacts land under bench-results/ so CI can artifact-upload the // whole directory uniformly without knowing individual filenames beyond From 1d2216cf381b9572873e8781e1d90cf43f2e8c4f Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 21:14:53 -0700 Subject: [PATCH 4/7] fix(bench): skip gh-pages fetch + continue-on-error on publish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bench step itself ran fine on the prior CI; the Publish throughput / latency steps failed with `fatal: couldn't find remote ref gh-pages` because apache/texera has no gh-pages branch yet. The action attempts to fetch gh-pages even when auto-push is false (it normally wants the branch to compare against baseline). Add `skip-fetch-gh-pages: true` to bypass the fetch — auto-push on push-to-main / schedule still creates the branch on first write. 
Once the dashboard is seeded, flip this to false to re-enable baseline comparison + alert-threshold logic. Add `continue-on-error: true` on each publish step as a safety net: the bench data is already preserved in the uploaded artifact, so any gh-pages-side surprise (permissions glitch, transient git failure) shouldn't fail the bench job overall. --- .github/workflows/benchmarks.yml | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 095169ea880..8c0b495d4fe 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -243,12 +243,25 @@ jobs: retention-days: 14 # Publish to the gh-pages dashboard. auto-push + save-data-file are - # both gated on push-to-main: PR runs only emit the job summary and - # the uploaded artifact, never touching the tracked baseline. Adding - # a new benchmark = adding one publish block below matching the - # JSON filename convention in bin/run-benchmarks.sh. + # both gated on push-to-main / schedule: PR runs only emit the job + # summary and the uploaded artifact, never touching the tracked + # baseline. Adding a new benchmark = adding one publish block below + # matching the JSON filename convention in bin/run-benchmarks.sh. + # + # `skip-fetch-gh-pages: true` because gh-pages does not exist on + # apache/texera yet — without this the action fatals with + # `couldn't find remote ref gh-pages` even before the auto-push + # decision. auto-push: true on push/schedule still creates the + # branch on first write. Once the dashboard is seeded, flip this + # to false to re-enable baseline comparison + alert-threshold. + # + # `continue-on-error: true` keeps any other gh-pages-side surprise + # (permission glitches, transient git failures) from failing the + # bench job overall — the bench data itself is already in the + # uploaded artifact above. 
- name: Publish throughput if: ${{ !cancelled() }} + continue-on-error: true uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1 with: name: Arrow Flight E2E Throughput @@ -257,6 +270,7 @@ jobs: github-token: ${{ secrets.GITHUB_TOKEN }} auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + skip-fetch-gh-pages: true gh-pages-branch: gh-pages benchmark-data-dir-path: dev/bench alert-threshold: "150%" @@ -266,6 +280,7 @@ jobs: summary-always: true - name: Publish latency if: ${{ !cancelled() }} + continue-on-error: true uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1 with: name: Arrow Flight E2E Latency @@ -274,6 +289,7 @@ jobs: github-token: ${{ secrets.GITHUB_TOKEN }} auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + skip-fetch-gh-pages: true gh-pages-branch: gh-pages benchmark-data-dir-path: dev/bench alert-threshold: "150%" From 5976a30dd511086aec0722e9f314892149ba66f3 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 21:33:28 -0700 Subject: [PATCH 5/7] feat(bench): post bench results as PR comment + tidy CI naming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three things: 1. Rename bench job's display name to `Bench` (was lowercase `bench`). 2. Bench job → Python 3.12 (was 3.11). Matches the local dev venv and the runtime Python texera_run_python_worker.py spawns; consistency removes a class of "works locally, drifts in CI" surprises. 3. PR-side comment with bench results. 
The Benchmarks workflow runs on `pull_request` events from forks, where GitHub forces GITHUB_TOKEN to read-only and refuses to inject any secret (AUTO_MERGE_TOKEN included — that restriction applies to ALL secrets, not just GITHUB_TOKEN). The fix is the ASF-approved `workflow_run` pattern: a separate workflow file that triggers when Benchmarks completes, runs in the base repo's trusted context, and has `pull-requests: write`. Bench-side: write the PR number to bench-results/pr-number.txt (workflow_run.pull_requests is empty for fork PRs, so we ferry the number via artifact); render a markdown summary table to the $GITHUB_STEP_SUMMARY for one-click visibility on the workflow page. Comment-side (benchmarks-pr-comment.yml): download the artifact, read + strict-validate (`^[0-9]+$`) the PR number, sanitize the CSV (cap at 32 KB, neutralize any triple-backtick sequence so a malicious fork can't escape the code fence and inject arbitrary markdown), then upsert a marker-tagged comment so subsequent runs update in place rather than spam. --- .github/workflows/benchmarks-pr-comment.yml | 177 ++++++++++++++++++++ .github/workflows/benchmarks.yml | 32 +++- 2 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/benchmarks-pr-comment.yml diff --git a/.github/workflows/benchmarks-pr-comment.yml b/.github/workflows/benchmarks-pr-comment.yml new file mode 100644 index 00000000000..c23ccc788a5 --- /dev/null +++ b/.github/workflows/benchmarks-pr-comment.yml @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Posts (or upserts) a PR comment with bench results AFTER the Benchmarks +# workflow completes. +# +# Why a separate workflow_run-triggered file: +# - The Benchmarks workflow runs on `pull_request`, which for fork PRs +# gets a read-only GITHUB_TOKEN and zero secret access — GitHub's +# hard-coded security model. We can't comment from there. +# - `workflow_run` runs in the BASE repo's context (apache/texera) +# with normal token + secret access, so it CAN comment on fork PRs. +# - This is the ASF-approved pattern; `pull_request_target` is policy- +# forbidden for any action that handles tokens. +# +# Why workflow_run is safe here vs pull_request_target: +# - We only READ a small, opaque artifact (pr-number.txt + the bench +# JSON / CSV) produced by the upstream run; we don't execute any +# PR-author code in this workflow. +# - The PR number is validated against ^[0-9]+$ before being used in +# any API call, blocking ref injection. + +name: Benchmarks PR Comment + +on: + workflow_run: + workflows: ["Benchmarks"] + types: [completed] + +permissions: + # Need pull-requests: write to post / update the comment. + # contents: read is the default and enough to checkout for github-script + # which we don't actually do here (we only call REST APIs). + pull-requests: write + actions: read + +jobs: + comment: + # Only act when the upstream Benchmarks run was triggered by a PR. + # push-to-main / schedule / dispatch produce no PR to comment on. 
+ if: ${{ github.event.workflow_run.event == 'pull_request' }} + runs-on: ubuntu-22.04 + steps: + - name: Download bench-results artifact + uses: actions/github-script@v8 + with: + script: | + const fs = require("fs"); + const path = require("path"); + const runId = context.payload.workflow_run.id; + const { data } = await github.rest.actions.listWorkflowRunArtifacts({ + owner: context.repo.owner, + repo: context.repo.repo, + run_id: runId, + }); + const match = data.artifacts.find((a) => a.name.startsWith("bench-results-")); + if (!match) { + core.warning(`no bench-results-* artifact on run ${runId}; nothing to comment.`); + return; + } + const zip = await github.rest.actions.downloadArtifact({ + owner: context.repo.owner, + repo: context.repo.repo, + artifact_id: match.id, + archive_format: "zip", + }); + fs.mkdirSync("bench-results-zip", { recursive: true }); + fs.writeFileSync(path.join("bench-results-zip", "artifact.zip"), Buffer.from(zip.data)); + core.info(`downloaded artifact ${match.name} (${match.size_in_bytes} bytes)`); + + - name: Unzip artifact + run: | + mkdir -p bench-results + unzip -o bench-results-zip/artifact.zip -d bench-results + ls -la bench-results/ + + - name: Read PR number from artifact + id: pr + # Read + strictly validate (digits only) before using in API calls. + # The artifact comes from a fork-triggered workflow whose contents + # are not entirely trusted; numeric-only PR numbers block any + # injection vector through this value. + run: | + if [ ! -f bench-results/pr-number.txt ]; then + echo "no pr-number.txt in artifact; bailing out" + exit 0 + fi + raw=$(tr -d '[:space:]' < bench-results/pr-number.txt) + if ! 
[[ "$raw" =~ ^[0-9]+$ ]]; then + echo "invalid pr-number.txt contents: '$raw'" + exit 1 + fi + echo "number=$raw" >> "$GITHUB_OUTPUT" + + - name: Upsert PR comment with bench summary + if: steps.pr.outputs.number != '' + uses: actions/github-script@v8 + env: + PR_NUMBER: ${{ steps.pr.outputs.number }} + with: + script: | + const fs = require("fs"); + const pr = Number(process.env.PR_NUMBER); + const marker = ""; + + // CSV comes from a fork-PR-controlled artifact — sanitize before + // embedding in markdown: + // 1. Cap total size so a giant junk file can't bloat a comment. + // 2. Strip any triple-backtick sequence so the content cannot + // escape the surrounding code fence and inject arbitrary + // markdown (phishing links, image-rendering tricks, etc). + // Replacement with a zero-width char preserves byte alignment + // visually while neutralizing fence termination. + const MAX_CSV_BYTES = 32 * 1024; + const csvPath = "bench-results/arrow-flight-e2e.csv"; + let csv = "_(no arrow-flight-e2e.csv in artifact)_"; + if (fs.existsSync(csvPath)) { + let raw = fs.readFileSync(csvPath, "utf8"); + if (raw.length > MAX_CSV_BYTES) { + raw = raw.slice(0, MAX_CSV_BYTES) + "\n[truncated]"; + } + csv = raw.replace(/```+/g, "`​``").trim(); + } + + // workflow_run.html_url is GitHub-emitted (URL to apache/texera + // run page); not attacker-influenceable. + const upstreamUrl = context.payload.workflow_run.html_url; + const body = [ + marker, + "## Arrow Flight E2E bench", + "", + "```csv", + csv, + "```", + "", + `[Full workflow run](${upstreamUrl})`, + ].join("\n"); + + // Find existing marker comment so subsequent runs upsert in place. 
+ const { data: comments } = await github.rest.issues.listComments({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr, + per_page: 100, + }); + const existing = comments.find((c) => c.body && c.body.includes(marker)); + if (existing) { + await github.rest.issues.updateComment({ + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: existing.id, + body, + }); + core.info(`updated comment ${existing.id} on PR #${pr}`); + } else { + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr, + body, + }); + core.info(`created new comment on PR #${pr}`); + } diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 8c0b495d4fe..1277ec76493 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -153,6 +153,7 @@ jobs: core.setOutput("run_bench", shouldRun ? "true" : "false"); bench: + name: Bench needs: precheck if: ${{ needs.precheck.outputs.run_bench == 'true' }} runs-on: ubuntu-22.04 @@ -196,7 +197,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v6 with: - python-version: "3.11" + python-version: "3.12" - name: Install Python dependencies # Mirrors amber-integration's installer in build.yml so the bench # subprocess imports resolve identically (pytorch CPU index + @@ -234,6 +235,35 @@ jobs: # inside bin/run-benchmarks.sh and adding a publish step below. run: bash bin/run-benchmarks.sh + - name: Stash PR number for downstream comment workflow + # PR fork workflows can't comment (GitHub forces read-only token); + # benchmarks-pr-comment.yml runs separately via workflow_run with + # proper write access, and needs the PR number to find the target. + # github.event.workflow_run.pull_requests is empty for fork PRs, + # so we ferry the number via artifact. 
+ if: ${{ github.event_name == 'pull_request' && !cancelled() }} + env: + PR_NUMBER: ${{ github.event.pull_request.number }} + run: echo "$PR_NUMBER" > bench-results/pr-number.txt + + - name: Render bench summary + # Render the bench CSV into a markdown table on the workflow run + # page. Visible without further clicks — and doesn't need any + # extra permissions (writes to $GITHUB_STEP_SUMMARY only). + if: ${{ !cancelled() }} + run: | + { + echo "## Bench results (\`$BENCH_MODE\` mode)" + echo + if [ -f bench-results/arrow-flight-e2e.csv ]; then + echo '```csv' + cat bench-results/arrow-flight-e2e.csv + echo '```' + else + echo "_(no bench-results/arrow-flight-e2e.csv produced)_" + fi + } >> "$GITHUB_STEP_SUMMARY" + - name: Upload bench artifacts if: ${{ !cancelled() }} uses: actions/upload-artifact@v4 From 266e5e8226a37d986440d13fb190f83b782fdbcc Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 21:59:47 -0700 Subject: [PATCH 6/7] refactor(bench): render bench results as markdown table, hide raw CSV The previous comment dumped raw CSV inside a `csv` code block, which forced reviewers to mentally column-align 13 fields per row. Render the actionable subset as a right-aligned markdown table (batch / schema_w / str_len / n_batches / tuples-s / MB-s / p50 ms / p99 ms / total ms), convert lat_*_us to milliseconds, drop redundant fields (config_idx, total_tuples, total_bytes, lat_p95_us), and tuck the raw sanitized CSV into a collapsed `
` for verifiability. Per-cell sanitizer escapes pipes and strips newlines to defeat table injection from the still-untrusted fork-PR-controlled CSV. Falls back to the raw-CSV view if header parsing fails. --- .github/workflows/benchmarks-pr-comment.yml | 100 +++++++++++++++++--- 1 file changed, 89 insertions(+), 11 deletions(-) diff --git a/.github/workflows/benchmarks-pr-comment.yml b/.github/workflows/benchmarks-pr-comment.yml index c23ccc788a5..bf280bbb4ea 100644 --- a/.github/workflows/benchmarks-pr-comment.yml +++ b/.github/workflows/benchmarks-pr-comment.yml @@ -127,7 +127,7 @@ jobs: // visually while neutralizing fence termination. const MAX_CSV_BYTES = 32 * 1024; const csvPath = "bench-results/arrow-flight-e2e.csv"; - let csv = "_(no arrow-flight-e2e.csv in artifact)_"; + let csv = null; if (fs.existsSync(csvPath)) { let raw = fs.readFileSync(csvPath, "utf8"); if (raw.length > MAX_CSV_BYTES) { @@ -136,19 +136,97 @@ jobs: csv = raw.replace(/```+/g, "`​``").trim(); } + // Per-cell sanitizer for the markdown table: strip newlines, escape + // pipes (would break columns), and cap length. + const escapeCell = (s) => + s == null + ? "" + : String(s).replace(/[\r\n]+/g, " ").replace(/\|/g, "\\|").slice(0, 64); + + // Render selected columns as a markdown table. Drops noise columns + // (config_idx, total_tuples, total_bytes, lat_p95_us) and converts + // microseconds to milliseconds for latency readability. Returns + // null on any parsing failure → fallback renders raw CSV instead. 
+ const csvToTable = (text) => { + try { + const rows = text + .trim() + .split(/\r?\n/) + .map((line) => line.split(",")); + if (rows.length < 2) return null; + const header = rows[0].map((h) => h.trim()); + const idx = (col) => header.indexOf(col); + const cols = [ + { col: "batch_size", label: "batch", fmt: (v) => v }, + { col: "schema_width", label: "schema_w", fmt: (v) => v }, + { col: "string_len", label: "str_len", fmt: (v) => v }, + { col: "num_batches", label: "n_batches", fmt: (v) => v }, + { col: "tuples_per_sec", label: "tuples/s", fmt: (v) => v }, + { col: "mb_per_sec", label: "MB/s", fmt: (v) => v }, + { + col: "lat_p50_us", + label: "p50 ms", + fmt: (v) => (parseFloat(v) / 1000).toFixed(2), + }, + { + col: "lat_p99_us", + label: "p99 ms", + fmt: (v) => (parseFloat(v) / 1000).toFixed(2), + }, + { col: "total_ms", label: "total ms", fmt: (v) => v }, + ].filter((c) => idx(c.col) >= 0); + if (cols.length === 0) return null; + const lines = []; + lines.push("| " + cols.map((c) => escapeCell(c.label)).join(" | ") + " |"); + lines.push("|" + cols.map(() => "---:").join("|") + "|"); + for (const row of rows.slice(1)) { + const cells = cols.map((c) => { + const raw = row[idx(c.col)]; + try { + return escapeCell(c.fmt(raw)); + } catch (e) { + return escapeCell(raw); + } + }); + lines.push("| " + cells.join(" | ") + " |"); + } + return lines.join("\n"); + } catch (e) { + core.warning(`csvToTable failed: ${e.message}`); + return null; + } + }; + // workflow_run.html_url is GitHub-emitted (URL to apache/texera // run page); not attacker-influenceable. const upstreamUrl = context.payload.workflow_run.html_url; - const body = [ - marker, - "## Arrow Flight E2E bench", - "", - "```csv", - csv, - "```", - "", - `[Full workflow run](${upstreamUrl})`, - ].join("\n"); + + // Primary view: rendered markdown table for skim-readability. + // Fallback view (collapsed
<details>): raw sanitized CSV for full + // verifiability — readers click to expand if they need every column. + const tableMd = csv ? csvToTable(csv) : null; + const bodyParts = [marker, "## Arrow Flight E2E bench", ""]; + if (tableMd) { + bodyParts.push(tableMd, ""); + } else if (!csv) { + bodyParts.push("_(no arrow-flight-e2e.csv in artifact)_", ""); + } else { + bodyParts.push("_(unable to parse CSV; raw below)_", ""); + } + if (csv) { + bodyParts.push( + "
<details><summary>Raw CSV</summary>", + "", + "```csv", + csv, + "```", + "", + "</details>
", + "" + ); + } + bodyParts.push(`[Full workflow run](${upstreamUrl})`); + const body = bodyParts.join("\n"); // Find existing marker comment so subsequent runs upsert in place. const { data: comments } = await github.rest.issues.listComments({ From f6c4da02fae0ddd4f8bd4dd79fc29092de3d3068 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 7 Jun 2026 22:35:49 -0700 Subject: [PATCH 7/7] fix(bench): address Copilot review - benchmarks.yml: TRIGGER_LABELS used `python`; required-checks.yml's LABEL_STACKS keys this stack as `pyamber` per labeler.yml, so PRs labeled `pyamber` were silently skipping the bench. Swap to the correct key. - benchmarks-pr-comment.yml: switch listComments to paginate so the marker comment can still be located on PRs with >100 comments; prevents duplicate-comment-spam on long-running PRs. - ArrowFlightActorBench.awaitOneDataFrameEcho: each receiveOne now uses the remaining time to the absolute deadline rather than the full timeout per loop iteration. A flood of ACK/ECM messages can no longer extend the overall wait beyond the caller's deadline. - ArrowFlightActorBench scaladoc: replace stale `benchmark-results.csv` with the actual `bench-results/arrow-flight-e2e.{csv,*.json}` paths. --- .github/workflows/benchmarks-pr-comment.yml | 20 ++++++++++++------- .github/workflows/benchmarks.yml | 2 +- .../bench/ArrowFlightActorBench.scala | 20 +++++++++++++------ 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/.github/workflows/benchmarks-pr-comment.yml b/.github/workflows/benchmarks-pr-comment.yml index bf280bbb4ea..f8bbae1a65b 100644 --- a/.github/workflows/benchmarks-pr-comment.yml +++ b/.github/workflows/benchmarks-pr-comment.yml @@ -229,13 +229,19 @@ jobs: const body = bodyParts.join("\n"); // Find existing marker comment so subsequent runs upsert in place. 
- const { data: comments } = await github.rest.issues.listComments({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: pr, - per_page: 100, - }); - const existing = comments.find((c) => c.body && c.body.includes(marker)); + // Paginate via `paginate` so a long-running PR with >100 comments + // still locates the marker — otherwise we'd silently create a + // duplicate every push past the 100-comment ceiling. + const allComments = await github.paginate( + github.rest.issues.listComments, + { + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr, + per_page: 100, + } + ); + const existing = allComments.find((c) => c.body && c.body.includes(marker)); if (existing) { await github.rest.issues.updateComment({ owner: context.repo.owner, diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 1277ec76493..83da7428576 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -136,7 +136,7 @@ jobs: // whose stack list contains "amber-integration" triggers this // bench. Keep in sync if LABEL_STACKS there changes. const TRIGGER_LABELS = new Set([ - "python", + "pyamber", "engine", "amber-integration", "common", diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala index 068f9ef384b..b020dfd34ba 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala @@ -56,9 +56,12 @@ import scala.concurrent.duration._ * transport), wires up an identity Python UDF, and times the round-trip of * `numBatches` DataFrames send→echo through the actor mailbox. 
* - * Output: + * Output (rewritten incrementally after every config so a killed sweep + * still preserves usable data): * - stdout summary per config - * - benchmark-results.csv (one row per config) — overwritten each run + * - bench-results/arrow-flight-e2e.csv (one row per config) + * - bench-results/arrow-flight-e2e-throughput.json (github-action-benchmark customBiggerIsBetter) + * - bench-results/arrow-flight-e2e-latency.json (github-action-benchmark customSmallerIsBetter) * * Run with: * sbt "WorkflowExecutionService/Test/runMain \ @@ -450,12 +453,17 @@ object ArrowFlightActorBench { } private def awaitOneDataFrameEcho(probe: TestProbe, timeout: FiniteDuration): Boolean = { - val deadline = System.currentTimeMillis() + timeout.toMillis - while (System.currentTimeMillis() < deadline) { - probe.receiveOne(timeout) match { + // Each iteration uses the *remaining* time, not the full timeout — so a + // flood of ACK / ECM messages can't extend the overall wait beyond the + // caller's deadline by `timeout` × N. + val deadline = System.nanoTime() + timeout.toNanos + while (true) { + val remainingNs = deadline - System.nanoTime() + if (remainingNs <= 0) return false + probe.receiveOne(remainingNs.nanos) match { case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: DataFrame)) => return true case null => return false - case _ => // ignore acks, ECM forwards + case _ => // ignore acks, ECM forwards; loop } } false