diff --git a/.github/workflows/benchmarks-pr-comment.yml b/.github/workflows/benchmarks-pr-comment.yml new file mode 100644 index 00000000000..f8bbae1a65b --- /dev/null +++ b/.github/workflows/benchmarks-pr-comment.yml @@ -0,0 +1,261 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Posts (or upserts) a PR comment with bench results AFTER the Benchmarks +# workflow completes. +# +# Why a separate workflow_run-triggered file: +# - The Benchmarks workflow runs on `pull_request`, which for fork PRs +# gets a read-only GITHUB_TOKEN and zero secret access — GitHub's +# hard-coded security model. We can't comment from there. +# - `workflow_run` runs in the BASE repo's context (apache/texera) +# with normal token + secret access, so it CAN comment on fork PRs. +# - This is the ASF-approved pattern; `pull_request_target` is policy- +# forbidden for any action that handles tokens. +# +# Why workflow_run is safe here vs pull_request_target: +# - We only READ a small, opaque artifact (pr-number.txt + the bench +# JSON / CSV) produced by the upstream run; we don't execute any +# PR-author code in this workflow. +# - The PR number is validated against ^[0-9]+$ before being used in +# any API call, blocking ref injection. + +name: Benchmarks PR Comment + +on: + workflow_run: + workflows: ["Benchmarks"] + types: [completed] + +permissions: + # Need pull-requests: write to post / update the comment. + # contents: read is the default and enough to checkout for github-script + # which we don't actually do here (we only call REST APIs). + pull-requests: write + actions: read + +jobs: + comment: + # Only act when the upstream Benchmarks run was triggered by a PR. + # push-to-main / schedule / dispatch produce no PR to comment on. + if: ${{ github.event.workflow_run.event == 'pull_request' }} + runs-on: ubuntu-22.04 + steps: + - name: Download bench-results artifact + uses: actions/github-script@v8 + with: + script: | + const fs = require("fs"); + const path = require("path"); + const runId = context.payload.workflow_run.id; + const { data } = await github.rest.actions.listWorkflowRunArtifacts({ + owner: context.repo.owner, + repo: context.repo.repo, + run_id: runId, + }); + const match = data.artifacts.find((a) => a.name.startsWith("bench-results-")); + if (!match) { + core.warning(`no bench-results-* artifact on run ${runId}; nothing to comment.`); + return; + } + const zip = await github.rest.actions.downloadArtifact({ + owner: context.repo.owner, + repo: context.repo.repo, + artifact_id: match.id, + archive_format: "zip", + }); + fs.mkdirSync("bench-results-zip", { recursive: true }); + fs.writeFileSync(path.join("bench-results-zip", "artifact.zip"), Buffer.from(zip.data)); + core.info(`downloaded artifact ${match.name} (${match.size_in_bytes} bytes)`); + + - name: Unzip artifact + run: | + mkdir -p bench-results + unzip -o bench-results-zip/artifact.zip -d bench-results + ls -la bench-results/ + + - name: Read PR number from artifact + id: pr + # Read + strictly validate (digits only) before using in API calls. + # The artifact comes from a fork-triggered workflow whose contents + # are not entirely trusted; numeric-only PR numbers block any + # injection vector through this value. + run: | + if [ ! -f bench-results/pr-number.txt ]; then + echo "no pr-number.txt in artifact; bailing out" + exit 0 + fi + raw=$(tr -d '[:space:]' < bench-results/pr-number.txt) + if ! [[ "$raw" =~ ^[0-9]+$ ]]; then + echo "invalid pr-number.txt contents: '$raw'" + exit 1 + fi + echo "number=$raw" >> "$GITHUB_OUTPUT" + + - name: Upsert PR comment with bench summary + if: steps.pr.outputs.number != '' + uses: actions/github-script@v8 + env: + PR_NUMBER: ${{ steps.pr.outputs.number }} + with: + script: | + const fs = require("fs"); + const pr = Number(process.env.PR_NUMBER); + const marker = ""; + + // CSV comes from a fork-PR-controlled artifact — sanitize before + // embedding in markdown: + // 1. Cap total size so a giant junk file can't bloat a comment. + // 2. Strip any triple-backtick sequence so the content cannot + // escape the surrounding code fence and inject arbitrary + // markdown (phishing links, image-rendering tricks, etc). + // Replacement with a zero-width char preserves byte alignment + // visually while neutralizing fence termination. + const MAX_CSV_BYTES = 32 * 1024; + const csvPath = "bench-results/arrow-flight-e2e.csv"; + let csv = null; + if (fs.existsSync(csvPath)) { + let raw = fs.readFileSync(csvPath, "utf8"); + if (raw.length > MAX_CSV_BYTES) { + raw = raw.slice(0, MAX_CSV_BYTES) + "\n[truncated]"; + } + csv = raw.replace(/```+/g, "`​``").trim(); + } + + // Per-cell sanitizer for the markdown table: strip newlines, escape + // pipes (would break columns), and cap length. + const escapeCell = (s) => + s == null + ? "" + : String(s).replace(/[\r\n]+/g, " ").replace(/\|/g, "\\|").slice(0, 64); + + // Render selected columns as a markdown table. Drops noise columns + // (config_idx, total_tuples, total_bytes, lat_p95_us) and converts + // microseconds to milliseconds for latency readability. Returns + // null on any parsing failure → fallback renders raw CSV instead. + const csvToTable = (text) => { + try { + const rows = text + .trim() + .split(/\r?\n/) + .map((line) => line.split(",")); + if (rows.length < 2) return null; + const header = rows[0].map((h) => h.trim()); + const idx = (col) => header.indexOf(col); + const cols = [ + { col: "batch_size", label: "batch", fmt: (v) => v }, + { col: "schema_width", label: "schema_w", fmt: (v) => v }, + { col: "string_len", label: "str_len", fmt: (v) => v }, + { col: "num_batches", label: "n_batches", fmt: (v) => v }, + { col: "tuples_per_sec", label: "tuples/s", fmt: (v) => v }, + { col: "mb_per_sec", label: "MB/s", fmt: (v) => v }, + { + col: "lat_p50_us", + label: "p50 ms", + fmt: (v) => (parseFloat(v) / 1000).toFixed(2), + }, + { + col: "lat_p99_us", + label: "p99 ms", + fmt: (v) => (parseFloat(v) / 1000).toFixed(2), + }, + { col: "total_ms", label: "total ms", fmt: (v) => v }, + ].filter((c) => idx(c.col) >= 0); + if (cols.length === 0) return null; + const lines = []; + lines.push("| " + cols.map((c) => escapeCell(c.label)).join(" | ") + " |"); + lines.push("|" + cols.map(() => "---:").join("|") + "|"); + for (const row of rows.slice(1)) { + const cells = cols.map((c) => { + const raw = row[idx(c.col)]; + try { + return escapeCell(c.fmt(raw)); + } catch (e) { + return escapeCell(raw); + } + }); + lines.push("| " + cells.join(" | ") + " |"); + } + return lines.join("\n"); + } catch (e) { + core.warning(`csvToTable failed: ${e.message}`); + return null; + } + }; + + // workflow_run.html_url is GitHub-emitted (URL to apache/texera + // run page); not attacker-influenceable. + const upstreamUrl = context.payload.workflow_run.html_url; + + // Primary view: rendered markdown table for skim-readability. + // Fallback view (collapsed
): raw sanitized CSV for full + // verifiability — readers click to expand if they need every column. + const tableMd = csv ? csvToTable(csv) : null; + const bodyParts = [marker, "## Arrow Flight E2E bench", ""]; + if (tableMd) { + bodyParts.push(tableMd, ""); + } else if (!csv) { + bodyParts.push("_(no arrow-flight-e2e.csv in artifact)_", ""); + } else { + bodyParts.push("_(unable to parse CSV; raw below)_", ""); + } + if (csv) { + bodyParts.push( + "
Raw CSV", + "", + "```csv", + csv, + "```", + "", + "
", + "" + ); + } + bodyParts.push(`[Full workflow run](${upstreamUrl})`); + const body = bodyParts.join("\n"); + + // Find existing marker comment so subsequent runs upsert in place. + // Paginate via `paginate` so a long-running PR with >100 comments + // still locates the marker — otherwise we'd silently create a + // duplicate every push past the 100-comment ceiling. + const allComments = await github.paginate( + github.rest.issues.listComments, + { + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr, + per_page: 100, + } + ); + const existing = allComments.find((c) => c.body && c.body.includes(marker)); + if (existing) { + await github.rest.issues.updateComment({ + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: existing.id, + body, + }); + core.info(`updated comment ${existing.id} on PR #${pr}`); + } else { + await github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr, + body, + }); + core.info(`created new comment on PR #${pr}`); + } diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml new file mode 100644 index 00000000000..83da7428576 --- /dev/null +++ b/.github/workflows/benchmarks.yml @@ -0,0 +1,327 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Texera benchmarks — bench-agnostic umbrella workflow. +# +# This file is the single CI entry for ALL Texera performance benchmarks +# (currently Arrow Flight E2E; JMH and others land here as well). The +# workflow knows nothing about specific benches — bin/run-benchmarks.sh +# is the opaque entry point that owns which benches run and where their +# outputs land under bench-results/. Adding a new bench is: +# 1. Append the run command to bin/run-benchmarks.sh. +# 2. Add a `Publish ` step block below pointing at the +# bench's JSON output file with the right `tool:` setting. +# This workflow file otherwise stays unchanged. +# +# Triggering — mirrors amber-integration's label gate (NOT file paths): +# - PR: runs only when one of the labels mapped to the amber-integration +# stack in required-checks.yml's LABEL_STACKS is present on the PR. +# Labels are applied by the .github/labeler.yml workflow on opened / +# synchronize, so we wait for that workflow to complete before +# deciding (same pattern required-checks.yml uses). +# - push to main: always runs (same trimmed grid as PR for quick post- +# merge signal) and publishes to gh-pages. +# - schedule (weekly): runs the full 36-config sweep and publishes to +# gh-pages — this is the authoritative long-term baseline. +# - workflow_dispatch: manual full-grid run (no publish; bring-your-own +# trigger for ad-hoc exploration). +# +# Two modes via BENCH_MODE env (read by the bench Scala main): +# pr — 3 configs × 20 batches, ~5 min (PR + push-to-main) +# full — 36 configs × 200 batches, ~50-60 min (schedule + dispatch) +# +# Non-blocking: this workflow is NOT included in required-checks.yml's +# `required-checks` aggregator, so its result doesn't gate merges even +# when it fails. Adding it to branch protection later is a deliberate +# .asf.yaml change. +# +# Permissions: +# contents: write — needed by benchmark-action's auto-push to gh-pages. +# PR runs (which GitHub auto-downgrades to read-only on forks) gate +# auto-push off via the event check, so the missing write is never +# exercised. + +name: Benchmarks + +on: + push: + branches: [main] + pull_request: + types: [opened, reopened, synchronize, labeled, unlabeled] + schedule: + # Weekly full-grid baseline refresh, Sundays 08:00 UTC. PR and post- + # merge runs use a trimmed 3-config grid to stay around 5 min; the + # scheduled run covers the full 36-config sweep that the gh-pages + # dashboard tracks long-term. + - cron: "0 8 * * 0" + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: benchmarks-${{ github.ref }} + # On main: never cancel an in-flight baseline run; on PRs: supersede. + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} + +jobs: + precheck: + # Decide whether to run based on PR labels (push / dispatch always + # run). Lifted from required-checks.yml's precheck so the trigger + # surface matches amber-integration exactly. + name: Precheck + runs-on: ubuntu-latest + outputs: + run_bench: ${{ steps.decide.outputs.run_bench }} + steps: + - name: Wait for Pull Request Labeler + if: github.event_name == 'pull_request' + uses: actions/github-script@v8 + with: + script: | + const ref = context.payload.pull_request.head.sha; + const maxAttempts = 30; + for (let i = 0; i < maxAttempts; i++) { + const { data } = await github.rest.checks.listForRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref, + check_name: "labeler", + }); + const check = data.check_runs[0]; + if (check && check.status === "completed") { + core.info(`labeler ${check.conclusion}`); + return; + } + core.info(`labeler not ready (attempt ${i + 1}/${maxAttempts})`); + await new Promise((r) => setTimeout(r, 10000)); + } + core.warning("labeler did not complete within 5 minutes; proceeding with current labels."); + + - name: Decide whether to run bench + id: decide + uses: actions/github-script@v8 + with: + script: | + const eventName = context.eventName; + if (eventName !== "pull_request") { + // push to main / workflow_dispatch always run. + core.info(`event=${eventName} — running unconditionally`); + core.setOutput("run_bench", "true"); + return; + } + // Re-fetch labels: the labeler may have just added some. + const { data: pr } = await github.rest.pulls.get({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.payload.pull_request.number, + }); + const labels = pr.labels.map((l) => l.name); + core.info(`PR labels: ${labels.join(", ") || "(none)"}`); + // Mirrors LABEL_STACKS in required-checks.yml: every label + // whose stack list contains "amber-integration" triggers this + // bench. Keep in sync if LABEL_STACKS there changes. + const TRIGGER_LABELS = new Set([ + "pyamber", + "engine", + "amber-integration", + "common", + "ddl-change", + "ci", + ]); + const matched = labels.filter((l) => TRIGGER_LABELS.has(l)); + const shouldRun = matched.length > 0; + core.info( + shouldRun + ? `Triggering on labels: ${matched.join(", ")}` + : "No trigger label present; skipping bench." + ); + core.setOutput("run_bench", shouldRun ? "true" : "false"); + + bench: + name: Bench + needs: precheck + if: ${{ needs.precheck.outputs.run_bench == 'true' }} + runs-on: ubuntu-22.04 + env: + JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 + JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 + # `pr` mode = 3-config trimmed sweep (~5 min) for PR + post-merge. + # `full` mode = 36-config sweep (~50-60 min) for schedule + manual. + # Read by the bench Scala main (see GridSpec switch); workflow only + # decides which mode to pass. + BENCH_MODE: ${{ (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && 'full' || 'pr' }} + services: + # The bench itself doesn't touch the DB, but sbt's transitive compile + # chain reaches `common/auth` which imports JOOQ-generated classes + # from `org.apache.texera.dao.jooq.generated.*`. JOOQ codegen at + # sbt compile time requires a live Postgres to introspect against; + # without it the auth module's `User` / `UserRoleEnum` symbols fail + # to resolve and the whole bench compile aborts. Mirrors the same + # service block from amber-integration in build.yml. + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=10s + --health-timeout=5s + --health-retries=5 + steps: + - name: Checkout + uses: actions/checkout@v5 + with: + fetch-depth: 0 + - name: Setup JDK + uses: actions/setup-java@v5 + with: + distribution: "temurin" + java-version: 17 + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.12" + - name: Install Python dependencies + # Mirrors amber-integration's installer in build.yml so the bench + # subprocess imports resolve identically (pytorch CPU index + + # betterproto plugin via dev-requirements). + run: | + python -m pip install uv + if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi + if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi + if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi + - name: Install protoc + run: | + PROTOC_VERSION=$(cat bin/protoc-version.txt) + curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" + sudo unzip -o /tmp/protoc.zip -d /usr/local + sudo chmod +x /usr/local/bin/protoc + sudo chmod -R a+rX /usr/local/include/google + - name: Create Database for JOOQ codegen + # Minimal subset of amber-integration's "Create Databases" step — + # JOOQ only introspects against texera_db, not iceberg/lakefs/ + # lakekeeper schemas which the bench never touches. + run: psql -h localhost -U postgres -f sql/texera_ddl.sql + env: + PGPASSWORD: postgres + - name: Generate Python proto bindings + run: bash bin/python-proto-gen.sh + - name: Setup sbt launcher + uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22 + - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0 + with: + extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]' + + - name: Run benchmarks + # Single opaque entry point — this workflow doesn't know which + # benches exist. Adding a JMH suite later = appending one line + # inside bin/run-benchmarks.sh and adding a publish step below. + run: bash bin/run-benchmarks.sh + + - name: Stash PR number for downstream comment workflow + # PR fork workflows can't comment (GitHub forces read-only token); + # benchmarks-pr-comment.yml runs separately via workflow_run with + # proper write access, and needs the PR number to find the target. + # github.event.workflow_run.pull_requests is empty for fork PRs, + # so we ferry the number via artifact. + if: ${{ github.event_name == 'pull_request' && !cancelled() }} + env: + PR_NUMBER: ${{ github.event.pull_request.number }} + run: echo "$PR_NUMBER" > bench-results/pr-number.txt + + - name: Render bench summary + # Render the bench CSV into a markdown table on the workflow run + # page. Visible without further clicks — and doesn't need any + # extra permissions (writes to $GITHUB_STEP_SUMMARY only). + if: ${{ !cancelled() }} + run: | + { + echo "## Bench results (\`$BENCH_MODE\` mode)" + echo + if [ -f bench-results/arrow-flight-e2e.csv ]; then + echo '```csv' + cat bench-results/arrow-flight-e2e.csv + echo '```' + else + echo "_(no bench-results/arrow-flight-e2e.csv produced)_" + fi + } >> "$GITHUB_STEP_SUMMARY" + + - name: Upload bench artifacts + if: ${{ !cancelled() }} + uses: actions/upload-artifact@v4 + with: + name: bench-results-${{ github.run_id }} + path: bench-results/ + retention-days: 14 + + # Publish to the gh-pages dashboard. auto-push + save-data-file are + # both gated on push-to-main / schedule: PR runs only emit the job + # summary and the uploaded artifact, never touching the tracked + # baseline. Adding a new benchmark = adding one publish block below + # matching the JSON filename convention in bin/run-benchmarks.sh. + # + # `skip-fetch-gh-pages: true` because gh-pages does not exist on + # apache/texera yet — without this the action fatals with + # `couldn't find remote ref gh-pages` even before the auto-push + # decision. auto-push: true on push/schedule still creates the + # branch on first write. Once the dashboard is seeded, flip this + # to false to re-enable baseline comparison + alert-threshold. + # + # `continue-on-error: true` keeps any other gh-pages-side surprise + # (permission glitches, transient git failures) from failing the + # bench job overall — the bench data itself is already in the + # uploaded artifact above. + - name: Publish throughput + if: ${{ !cancelled() }} + continue-on-error: true + uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1 + with: + name: Arrow Flight E2E Throughput + tool: customBiggerIsBetter + output-file-path: bench-results/arrow-flight-e2e-throughput.json + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + skip-fetch-gh-pages: true + gh-pages-branch: gh-pages + benchmark-data-dir-path: dev/bench + alert-threshold: "150%" + # comment-on-alert needs pull-requests:write; skip and let + # results show up via summary-always instead. + comment-on-alert: false + summary-always: true + - name: Publish latency + if: ${{ !cancelled() }} + continue-on-error: true + uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1 + with: + name: Arrow Flight E2E Latency + tool: customSmallerIsBetter + output-file-path: bench-results/arrow-flight-e2e-latency.json + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }} + skip-fetch-gh-pages: true + gh-pages-branch: gh-pages + benchmark-data-dir-path: dev/bench + alert-threshold: "150%" + comment-on-alert: false + summary-always: true diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala new file mode 100644 index 00000000000..b020dfd34ba --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.pythonworker.bench + +import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} +import org.apache.pekko.testkit.TestProbe +import org.apache.texera.amber.clustering.SingleNodeListener +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity, + WorkflowIdentity +} +import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} +import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{NetworkAck, NetworkMessage} +import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker +import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation +import org.apache.texera.amber.engine.architecture.scheduling.config.WorkerConfig +import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize +import org.apache.texera.amber.util.VirtualIdentityUtils + +import java.io.PrintWriter +import java.nio.file.{Files, Paths} +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * End-to-end micro-benchmark of the real Arrow Flight data path through a + * live PythonWorkflowWorker actor. + * + * Each measured config spawns a fresh PythonWorkflowWorker (real Pekko actor, + * real Python subprocess via texera_run_python_worker.py, real Arrow Flight + * transport), wires up an identity Python UDF, and times the round-trip of + * `numBatches` DataFrames send→echo through the actor mailbox. + * + * Output (rewritten incrementally after every config so a killed sweep + * still preserves usable data): + * - stdout summary per config + * - bench-results/arrow-flight-e2e.csv (one row per config) + * - bench-results/arrow-flight-e2e-throughput.json (github-action-benchmark customBiggerIsBetter) + * - bench-results/arrow-flight-e2e-latency.json (github-action-benchmark customSmallerIsBetter) + * + * Run with: + * sbt "WorkflowExecutionService/Test/runMain \ + * org.apache.texera.amber.engine.architecture.pythonworker.bench.ArrowFlightActorBench" + * + * Caveat: `Utils.amberHomePath` does a `Files.walk(cwd, 2).findAny` for any + * dir ending in `amber`. If `.claude/amber/` exists locally, the Python + * subprocess may end up trying to launch from that path; move it aside for + * the run, or fix `amberHomePath` upstream. + */ +object ArrowFlightActorBench { + + // --------------------------------------------------------------------------- + // Identity Python UDF — passes input tuples straight through to output. + // --------------------------------------------------------------------------- + private val IdentityPythonCode: String = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + | yield tuple_ + |""".stripMargin + + private val WorkflowId = WorkflowIdentity(1L) + private val InputPortId = PortIdentity(id = 0, internal = false) + private val OutputPortId = PortIdentity(id = 0, internal = false) + + // Sweep grid + iteration counts switch on BENCH_MODE so PR / post-merge + // checks stay around 5 min while scheduled / manual runs do the full + // 36-config grid that the gh-pages dashboard tracks long-term. + // pr — 3 configs × 20 batches, warmup 5 (~4-5 min in CI) + // full — 36 configs × 200 batches, warmup 20 (~50-60 min in CI) + // BENCH_NUM_BATCHES, if set, overrides numBatches for the current mode + // (useful for local smoke). + private val BenchMode: String = sys.env.getOrElse("BENCH_MODE", "full").toLowerCase + + private case class GridSpec( + batchSizes: Seq[Int], + schemaWidths: Seq[Int], + stringLens: Seq[Int], + numBatches: Int, + warmupBatches: Int + ) + + private val grid: GridSpec = BenchMode match { + case "pr" => + GridSpec( + batchSizes = Seq(10, 100, 1000), + schemaWidths = Seq(10), + stringLens = Seq(64), + numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(20), + warmupBatches = 5 + ) + case _ => + GridSpec( + batchSizes = Seq(10, 100, 1000, 10000), + schemaWidths = Seq(1, 10, 50), + stringLens = Seq(8, 64, 512), + numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(200), + warmupBatches = 20 + ) + } + + private val DefaultBatchSizes: Seq[Int] = grid.batchSizes + private val DefaultSchemaWidths: Seq[Int] = grid.schemaWidths + private val DefaultStringLens: Seq[Int] = grid.stringLens + private val DefaultNumBatches: Int = grid.numBatches + private val WarmupBatches: Int = grid.warmupBatches + + // All artifacts land under bench-results/ so CI can artifact-upload the + // whole directory uniformly without knowing individual filenames beyond + // what its publish matrix declares. + // Conventions for new benches: + // bench-results/-throughput.json → customBiggerIsBetter + // bench-results/-latency.json → customSmallerIsBetter + // bench-results/-jmh.json → tool=jmh + private val OutDir = Paths.get("bench-results") + private val CsvOutPath = OutDir.resolve("arrow-flight-e2e.csv") + // Two JSON files — github-action-benchmark needs distinct + // customBiggerIsBetter / customSmallerIsBetter inputs since each upload + // direction is per-`tool` parameter. + private val ThroughputJsonPath = OutDir.resolve("arrow-flight-e2e-throughput.json") + private val LatencyJsonPath = OutDir.resolve("arrow-flight-e2e-latency.json") + + // --------------------------------------------------------------------------- + // Sink actor: stands in for the downstream worker. Auto-acks every + // NetworkMessage from the worker (otherwise PekkoMessageTransferService + // throttles after the first unacked reply and the bench stalls), and + // forwards every received message to the bench probe for inspection. + // --------------------------------------------------------------------------- + private class AutoAckSink(forwardTo: ActorRef) extends Actor { + override def receive: Receive = { + case msg @ NetworkMessage(id, internal) => + sender() ! NetworkAck(id, getInMemSize(internal), 0L) + forwardTo ! msg + case other => + forwardTo ! other + } + } + + private case class BenchConfig( + configIdx: Int, + batchSize: Int, + schemaWidth: Int, + stringLen: Int, + numBatches: Int + ) + + private case class BenchResult( + cfg: BenchConfig, + totalWallNs: Long, + totalTuples: Long, + totalBytesApprox: Long, + latencyP50Ns: Long, + latencyP95Ns: Long, + latencyP99Ns: Long + ) { + def tuplesPerSec: Double = totalTuples * 1e9 / totalWallNs + def mbPerSec: Double = totalBytesApprox * 1e9 / totalWallNs / (1024.0 * 1024.0) + } + + def main(args: Array[String]): Unit = { + val system = ActorSystem("arrow-flight-bench", AmberRuntime.pekkoConfig) + system.actorOf(Props[SingleNodeListener](), "cluster-info") + + val configs: Seq[BenchConfig] = (for { + sw <- DefaultSchemaWidths + sl <- DefaultStringLens + bs <- DefaultBatchSizes + } yield (sw, sl, bs)).zipWithIndex.map { + case ((sw, sl, bs), idx) => + BenchConfig( + idx, + batchSize = bs, + schemaWidth = sw, + stringLen = sl, + numBatches = DefaultNumBatches + ) + } + + println(s"[bench] sweeping ${configs.size} configurations × ${DefaultNumBatches} batches each") + // Pre-create output dir + rewrite the result files after every completed + // config so a killed / timed-out sweep still leaves a usable artifact. + Files.createDirectories(OutDir) + val resultsBuf = scala.collection.mutable.ArrayBuffer.empty[BenchResult] + configs.foreach { cfg => + try { + val r = runConfig(system, cfg) + resultsBuf += r + writeCsv(resultsBuf.toSeq) + writeJsonForGitHubActionBenchmark(resultsBuf.toSeq) + } catch { + case t: Throwable => + println(s"[bench] FAILED config #${cfg.configIdx} ($cfg): $t") + } + } + printSummary(resultsBuf.toSeq) + Await.result(system.terminate(), 30.seconds) + } + + // --------------------------------------------------------------------------- + // One configuration: spawn fresh worker, run warmup + timed loop, tear down. + // --------------------------------------------------------------------------- + private def runConfig(system: ActorSystem, cfg: BenchConfig): BenchResult = { + val workerId = + VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "bench", "main", cfg.configIdx) + val downstreamId = + VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "benchsink", "main", cfg.configIdx) + + val ctlChannelIn = ChannelIdentity(downstreamId, workerId, isControl = true) + val dataChannelIn = ChannelIdentity(downstreamId, workerId, isControl = false) + val dataChannelOut = ChannelIdentity(workerId, downstreamId, isControl = false) + + val probe = TestProbe()(system) + val sink = system.actorOf( + Props(new AutoAckSink(probe.ref)), + name = s"bench-sink-${cfg.configIdx}" + ) + val worker = system.actorOf( + PythonWorkflowWorker.props(WorkerConfig(workerId)), + name = s"bench-worker-${cfg.configIdx}" + ) + + println(s"\n[bench] config #${cfg.configIdx}: $cfg") + + try { + val schema = makeSchema(cfg.schemaWidth) + val schemaMap = schema.getAttributes.map(a => a.getName -> a.getType.name()).toMap + + var ctlSeq: Long = 0L + var dataSeq: Long = 0L + var msgId: Long = 0L + + def sendCtl(payload: ControlInvocation): Unit = { + val fifo = WorkflowFIFOMessage(ctlChannelIn, ctlSeq, payload) + ctlSeq += 1 + worker.tell(NetworkMessage(msgId, fifo), sink) + msgId += 1 + } + def sendOnDataChannel( + payload: org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload + ): Unit = { + val fifo = WorkflowFIFOMessage(dataChannelIn, dataSeq, payload) + dataSeq += 1 + worker.tell(NetworkMessage(msgId, fifo), sink) + msgId += 1 + } + + // ----------------------------------------------------------------------- + // Setup control sequence + StartChannel ECM (see Pass 1 for details). + // ----------------------------------------------------------------------- + val ctx = AsyncRPCContext(sender = downstreamId, receiver = workerId) + sendCtl( + ControlInvocation( + "InitializeExecutor", + InitializeExecutorRequest( + 1, + OpExecWithCode(IdentityPythonCode, "python"), + isSource = false + ), + ctx, + 0L + ) + ) + sendCtl( + ControlInvocation( + "AssignPort", + AssignPortRequest(InputPortId, input = true, schemaMap, Seq.empty, Seq.empty), + ctx, + 1L + ) + ) + sendCtl( + ControlInvocation( + "AssignPort", + AssignPortRequest(OutputPortId, input = false, schemaMap, Seq.empty, Seq.empty), + ctx, + 2L + ) + ) + sendCtl( + ControlInvocation( + "AddInputChannel", + AddInputChannelRequest(dataChannelIn, InputPortId), + ctx, + 3L + ) + ) + val outLink = PhysicalLink( + fromOpId = VirtualIdentityUtils.getPhysicalOpId(workerId), + fromPortId = OutputPortId, + toOpId = VirtualIdentityUtils.getPhysicalOpId(downstreamId), + toPortId = InputPortId + ) + sendCtl( + ControlInvocation( + "AddPartitioning", + AddPartitioningRequest( + outLink, + // batch_size = cfg.batchSize keeps the Python-side partitioning + // buffer aligned with our send size — one Java DataFrame in maps + // to exactly one Python DataFrame out. + OneToOnePartitioning(batchSize = cfg.batchSize, channels = Seq(dataChannelOut)) + ), + ctx, + 4L + ) + ) + sendCtl(ControlInvocation("OpenExecutor", EmptyRequest(), ctx, 5L)) + sendCtl(ControlInvocation("StartWorker", EmptyRequest(), ctx, 6L)) + + waitForReturns(probe, 7, 60.seconds) + + // StartChannel ECM enables data flow on the input channel. + val startChannelEcm = EmbeddedControlMessage( + id = EmbeddedControlMessageIdentity("StartChannel"), + ecmType = EmbeddedControlMessageType.NO_ALIGNMENT, + scope = Seq.empty, + commandMapping = Map( + workerId.name -> ControlInvocation( + "StartChannel", + EmptyRequest(), + AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), + -1L + ) + ) + ) + sendOnDataChannel(startChannelEcm) + // Drain the StartChannel-echo the worker forwards downstream so it + // doesn't show up in the data-loop's measurement window. + drainNonDataFor(probe, 2.seconds) + + // ----------------------------------------------------------------------- + // Build sample tuples once; reuse across all batches in this config. + // ----------------------------------------------------------------------- + val sampleBatch: Array[Tuple] = buildBatch(schema, cfg.batchSize, cfg.stringLen) + val approxBytesPerBatch: Long = + cfg.batchSize.toLong * cfg.schemaWidth.toLong * cfg.stringLen.toLong + + // Warmup — let JIT settle, Python import any lazy modules. + var warmedBatches = 0 + while (warmedBatches < WarmupBatches) { + sendOnDataChannel(DataFrame(sampleBatch)) + if (awaitOneDataFrameEcho(probe, 30.seconds)) warmedBatches += 1 + } + + // ----------------------------------------------------------------------- + // Timed loop — per-batch latency from send to corresponding echo. + // Because the Python pipeline is FIFO, sending batch i then awaiting + // exactly one DataFrame echo gives latency_i = receive_i - send_i. + // ----------------------------------------------------------------------- + val latencies = new Array[Long](cfg.numBatches) + val totalStart = System.nanoTime() + var i = 0 + while (i < cfg.numBatches) { + val t0 = System.nanoTime() + sendOnDataChannel(DataFrame(sampleBatch)) + if (!awaitOneDataFrameEcho(probe, 60.seconds)) { + throw new RuntimeException(s"timed out waiting for echo of batch $i") + } + latencies(i) = System.nanoTime() - t0 + i += 1 + } + val totalNs = System.nanoTime() - totalStart + + val totalTuples = cfg.numBatches.toLong * cfg.batchSize.toLong + val totalBytes = cfg.numBatches.toLong * approxBytesPerBatch + val result = BenchResult( + cfg, + totalWallNs = totalNs, + totalTuples = totalTuples, + totalBytesApprox = totalBytes, + latencyP50Ns = percentile(latencies, 0.50), + latencyP95Ns = percentile(latencies, 0.95), + latencyP99Ns = percentile(latencies, 0.99) + ) + + printOne(result) + result + } finally { + worker ! PoisonPill + sink ! PoisonPill + // Give the worker a moment to tear down its Python subprocess + flight + // server cleanly before we move to the next config. + Thread.sleep(500) + } + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + private def makeSchema(width: Int): Schema = { + val attrs = (0 until width).map(i => new Attribute(s"col$i", AttributeType.STRING)) + Schema(attrs.toList) + } + + private def buildBatch(schema: Schema, batchSize: Int, stringLen: Int): Array[Tuple] = { + val arr = new Array[Tuple](batchSize) + val sampleVal = "x" * stringLen + var i = 0 + val attrs = schema.getAttributes + while (i < batchSize) { + val b = Tuple.builder(schema) + var j = 0 + while (j < attrs.size) { + b.add(attrs(j), sampleVal) + j += 1 + } + arr(i) = b.build() + i += 1 + } + arr + } + + private def waitForReturns(probe: TestProbe, n: Int, timeout: FiniteDuration): Unit = { + val deadline = System.currentTimeMillis() + timeout.toMillis + var seen = 0 + while (seen < n && System.currentTimeMillis() < deadline) { + probe.receiveOne(2.seconds) match { + case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: ReturnInvocation)) => + seen += 1 + case _ => // ignore acks + other + } + } + if (seen < n) { + throw new RuntimeException(s"only $seen/$n control returns within $timeout") + } + } + + private def awaitOneDataFrameEcho(probe: TestProbe, timeout: FiniteDuration): Boolean = { + // Each iteration uses the *remaining* time, not the full timeout — so a + // flood of ACK / ECM messages can't extend the overall wait beyond the + // caller's deadline by `timeout` × N. + val deadline = System.nanoTime() + timeout.toNanos + while (true) { + val remainingNs = deadline - System.nanoTime() + if (remainingNs <= 0) return false + probe.receiveOne(remainingNs.nanos) match { + case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: DataFrame)) => return true + case null => return false + case _ => // ignore acks, ECM forwards; loop + } + } + false + } + + private def drainNonDataFor(probe: TestProbe, dur: FiniteDuration): Unit = { + val end = System.currentTimeMillis() + dur.toMillis + while (System.currentTimeMillis() < end) { + probe.receiveOne(200.millis) match { + case null => return + case _ => // discard + } + } + } + + private def percentile(values: Array[Long], q: Double): Long = { + if (values.isEmpty) 0L + else { + val sorted = values.sorted + val idx = math.min(sorted.length - 1, math.max(0, (sorted.length * q).toInt)) + sorted(idx) + } + } + + private def printOne(r: BenchResult): Unit = { + val ms = r.totalWallNs / 1e6 + println( + f" -> total=${ms}%.0fms tuples/s=${r.tuplesPerSec}%,.0f MB/s=${r.mbPerSec}%.2f " + + f"p50=${r.latencyP50Ns / 1000.0}%.1fus p95=${r.latencyP95Ns / 1000.0}%.1fus " + + f"p99=${r.latencyP99Ns / 1000.0}%.1fus" + ) + } + + private def writeCsv(results: Seq[BenchResult]): Unit = { + val pw = new PrintWriter(Files.newBufferedWriter(CsvOutPath)) + try { + pw.println( + "config_idx,batch_size,schema_width,string_len,num_batches," + + "total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec," + + "lat_p50_us,lat_p95_us,lat_p99_us" + ) + results.foreach { r => + val c = r.cfg + pw.println( + List( + c.configIdx, + c.batchSize, + c.schemaWidth, + c.stringLen, + c.numBatches, + f"${r.totalWallNs / 1e6}%.2f", + r.totalTuples, + r.totalBytesApprox, + f"${r.tuplesPerSec}%.0f", + f"${r.mbPerSec}%.3f", + f"${r.latencyP50Ns / 1000.0}%.2f", + f"${r.latencyP95Ns / 1000.0}%.2f", + f"${r.latencyP99Ns / 1000.0}%.2f" + ).mkString(",") + ) + } + } finally pw.close() + println(s"\n[bench] wrote ${results.size} rows to ${CsvOutPath.toAbsolutePath}") + } + + /** + * Emit two JSON arrays per github-action-benchmark's customBiggerIsBetter + * (throughput) and customSmallerIsBetter (latency) input formats. Each + * config produces one throughput entry and three latency entries (p50/p95/ + * p99). Spec: https://github.com/benchmark-action/github-action-benchmark + */ + private def writeJsonForGitHubActionBenchmark(results: Seq[BenchResult]): Unit = { + def cfgLabel(c: BenchConfig): String = + s"bs=${c.batchSize} sw=${c.schemaWidth} sl=${c.stringLen}" + + def jsonEntry(name: String, unit: String, value: Double): String = + s""" { "name": ${quoteJson(name)}, "unit": ${quoteJson(unit)}, "value": $value }""" + + val throughputEntries = results.map { r => + jsonEntry(s"throughput / ${cfgLabel(r.cfg)}", "tuples/sec", r.tuplesPerSec) + } + val latencyEntries = results.flatMap { r => + Seq( + jsonEntry(s"latency p50 / ${cfgLabel(r.cfg)}", "us", r.latencyP50Ns / 1000.0), + jsonEntry(s"latency p95 / ${cfgLabel(r.cfg)}", "us", r.latencyP95Ns / 1000.0), + jsonEntry(s"latency p99 / ${cfgLabel(r.cfg)}", "us", r.latencyP99Ns / 1000.0) + ) + } + + writeJsonArray(ThroughputJsonPath, throughputEntries) + writeJsonArray(LatencyJsonPath, latencyEntries) + println( + s"[bench] wrote ${results.size} throughput entries to ${ThroughputJsonPath.toAbsolutePath}" + ) + println( + s"[bench] wrote ${latencyEntries.size} latency entries to ${LatencyJsonPath.toAbsolutePath}" + ) + } + + private def writeJsonArray(path: java.nio.file.Path, entries: Seq[String]): Unit = { + val pw = new PrintWriter(Files.newBufferedWriter(path)) + try { + pw.println("[") + pw.println(entries.mkString(",\n")) + pw.println("]") + } finally pw.close() + } + + private def quoteJson(s: String): String = + "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" + + private def printSummary(results: Seq[BenchResult]): Unit = { + println("\n[bench] === summary ===") + println( + f"${"#"}%3s ${"bs"}%5s ${"sw"}%3s ${"sl"}%4s ${"tuples/s"}%10s ${"MB/s"}%7s " + + f"${"p50us"}%8s ${"p99us"}%8s" + ) + results.foreach { r => + println( + f"${r.cfg.configIdx}%3d ${r.cfg.batchSize}%5d ${r.cfg.schemaWidth}%3d ${r.cfg.stringLen}%4d " + + f"${r.tuplesPerSec}%,10.0f ${r.mbPerSec}%7.2f " + + f"${r.latencyP50Ns / 1000.0}%8.1f ${r.latencyP99Ns / 1000.0}%8.1f" + ) + } + } +} diff --git a/bin/run-benchmarks.sh b/bin/run-benchmarks.sh new file mode 100755 index 00000000000..e93f8f5a1bf --- /dev/null +++ b/bin/run-benchmarks.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Single entry-point for all Texera benchmarks. CI calls this script +# verbatim — it does NOT reference individual benchmark main classes. +# Adding a new benchmark (e.g., a JMH suite) means appending one block +# to this script; no CI workflow change. +# +# Output convention: every benchmark writes to bench-results/ with a +# self-describing filename suffix that matches the github-action-benchmark +# `tool` parameter expected by the publish step in build.yml: +# bench-results/-throughput.json → tool: customBiggerIsBetter +# bench-results/-latency.json → tool: customSmallerIsBetter +# bench-results/-jmh.json → tool: jmh +# CSV / log / debug files may live alongside; the publish matrix only +# cares about the *.json files declared in build.yml. +# +# Env vars honored: +# BENCH_NUM_BATCHES — passes through to the e2e bench (default 100). +# Lower for fast PR runs; higher for stable nightlies. +# UDF_PYTHON_PATH — Python executable for the spawned worker subprocess. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +cd "$REPO_ROOT" + +mkdir -p bench-results + +echo "=== run-benchmarks: arrow-flight-e2e ===" +sbt --error \ + "WorkflowExecutionService/Test/runMain org.apache.texera.amber.engine.architecture.pythonworker.bench.ArrowFlightActorBench" + +# Future benchmarks: add new blocks below. Each block should self-contain +# the run command and ensure its outputs land in bench-results/. Example +# for a future JMH suite: +# echo "=== run-benchmarks: arrow-utils-jmh ===" +# sbt "WorkflowExecutionService/Jmh/run -rf json -rff $REPO_ROOT/bench-results/arrow-utils-jmh.json" + +echo +echo "=== bench artifacts ===" +ls -la bench-results/