From 0caea370640de320e2facad1ab137ea88f9c20eb Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 13:59:08 +0300 Subject: [PATCH 1/5] Refactor SLO workload to use environment variables - Migrate from CLI subcommands (table-run, topic-run) to environment-based configuration (WORKLOAD_NAME, YDB_ENDPOINT, etc.) - Simplify argument parsing: remove subcommand structure, add env var injection - Update metrics collection to always be enabled (remove otlp_endpoint checks) - Replace quantile-estimator with hdrhistogram - Update workflow matrix from include to sdk naming - Upgrade ydb-slo-action from commit hash to v2 tag - Add pyrightconfig.json for type checking - Update Dockerfile comments to reflect new env-var-based interface --- .github/workflows/slo-report.yml | 37 +---- .github/workflows/slo.yml | 153 ++++------------- pyproject.toml | 3 + tests/slo/Dockerfile | 18 +- tests/slo/requirements.txt | 2 +- tests/slo/src/__main__.py | 4 +- tests/slo/src/core/metrics.py | 220 +++++++++++++------------ tests/slo/src/jobs/async_topic_jobs.py | 4 - tests/slo/src/jobs/base.py | 3 - tests/slo/src/jobs/table_jobs.py | 4 +- tests/slo/src/options.py | 214 +++++++----------------- tests/slo/src/pyrightconfig.json | 4 + tests/slo/src/root_runner.py | 116 +++++-------- tests/slo/src/runners/table_runner.py | 2 +- tests/slo/src/runners/topic_runner.py | 4 +- 15 files changed, 268 insertions(+), 520 deletions(-) create mode 100644 tests/slo/src/pyrightconfig.json diff --git a/.github/workflows/slo-report.yml b/.github/workflows/slo-report.yml index 07231d3b..049f919e 100644 --- a/.github/workflows/slo-report.yml +++ b/.github/workflows/slo-report.yml @@ -16,42 +16,7 @@ jobs: pull-requests: write steps: - name: Publish YDB SLO Report - uses: ydb-platform/ydb-slo-action/report@13c687b7d4b2879da79dd12932dee0ed2b65dd1c + uses: ydb-platform/ydb-slo-action/report@v2 with: github_token: ${{ secrets.GITHUB_TOKEN }} github_run_id: ${{ github.event.workflow_run.id }} - - remove-slo-label: - if: always() && github.event.workflow_run.event == 'pull_request' - name: Remove SLO Label - needs: ydb-slo-action-report - runs-on: ubuntu-latest - permissions: - pull-requests: write - steps: - - name: Remove SLO label from PR - uses: actions/github-script@v7 - with: - script: | - const pullRequests = context.payload.workflow_run.pull_requests; - if (pullRequests && pullRequests.length > 0) { - for (const pr of pullRequests) { - try { - await github.rest.issues.removeLabel({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: pr.number, - name: 'SLO' - }); - console.log(`Removed SLO label from PR #${pr.number}`); - } catch (error) { - if (error.status === 404) { - console.log(`SLO label not found on PR #${pr.number}, skipping`); - } else { - throw error; - } - } - } - } else { - console.log('No pull requests associated with this workflow run'); - } diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index fbbf66cc..465e173a 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -42,13 +42,11 @@ jobs: strategy: fail-fast: false matrix: - include: - - id: sync-table - prefix: table - workload: sync-table - - id: sync-query - prefix: table - workload: sync-query + sdk: + - name: sync-table + command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" + - name: sync-query + command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" concurrency: group: slo-${{ github.ref }}-${{ matrix.workload }} @@ -141,125 +139,32 @@ jobs: -t "ydb-app-baseline" \ "$GITHUB_WORKSPACE/baseline" - - name: Initialize YDB SLO - id: ydb_slo - uses: ydb-platform/ydb-slo-action/init@13c687b7d4b2879da79dd12932dee0ed2b65dd1c + - name: Run SLO Tests + uses: ydb-platform/ydb-slo-action/init@v2 + timeout-minutes: 30 with: - github_issue: ${{ github.event.pull_request.number || inputs.github_issue }} + github_issue: ${{ github.event.inputs.github_issue }} github_token: ${{ secrets.GITHUB_TOKEN }} - workload_name: ydb-python-${{ matrix.workload }} + workload_name: ${{ matrix.sdk.name }} + workload_duration: ${{ inputs.slo_workload_duration_seconds || '600' }} workload_current_ref: ${{ github.head_ref || github.ref_name }} + workload_current_image: ydb-app-current + workload_current_command: ${{ matrix.sdk.command }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - - - name: Prepare SLO Database - run: | - docker run --rm \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "WORKLOAD=${{ matrix.workload }}" \ - -e "REF=${{ github.head_ref || github.ref_name }}" \ - ydb-app-current \ - ${{ matrix.prefix }}-create grpc://ydb:2136 /Root/testdb - - - name: Run SLO Tests (current + baseline in parallel) - timeout-minutes: 15 - env: - WORKLOAD: ${{ matrix.workload }} - DURATION: ${{ inputs.slo_workload_duration_seconds || 600 }} - READ_RPS: ${{ inputs.slo_workload_read_max_rps || 1000 }} - WRITE_RPS: ${{ inputs.slo_workload_write_max_rps || 100 }} - CURRENT_REF: ${{ github.head_ref || github.ref_name }} - BASELINE_REF: ${{ steps.baseline.outputs.ref }} - run: | - ARGS="${{ matrix.prefix }}-run grpc://ydb:2136 /Root/testdb \ - --otlp-endpoint http://prometheus:9090/api/v1/otlp/v1/metrics \ - --report-period 250 \ - --time ${DURATION} \ - --read-rps ${READ_RPS} \ - --write-rps ${WRITE_RPS} \ - --read-timeout 1000 \ - --write-timeout 1000" - - echo "Starting current workload (ref=${CURRENT_REF}, workload=${WORKLOAD})..." - docker run -d \ - --name ydb-app-current \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "REF=${CURRENT_REF}" \ - -e "WORKLOAD=${WORKLOAD}" \ - ydb-app-current \ - $ARGS - - echo "Starting baseline workload (ref=${BASELINE_REF}, workload=${WORKLOAD})..." - docker run -d \ - --name ydb-app-baseline \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "REF=${BASELINE_REF}" \ - -e "WORKLOAD=${WORKLOAD}" \ - ydb-app-baseline \ - $ARGS - - echo "" - echo "==================== INITIAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== INITIAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - echo "Waiting for workloads to complete (${DURATION}s)..." - sleep ${DURATION} - - echo "Stopping containers after ${DURATION}s..." - docker stop --timeout=30 ydb-app-current ydb-app-baseline 2>&1 || true - - # Force kill if still running - docker kill ydb-app-current ydb-app-baseline 2>&1 || true - - # Check exit codes - CURRENT_EXIT=$(docker inspect ydb-app-current --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - BASELINE_EXIT=$(docker inspect ydb-app-baseline --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - - echo "Current exit code: ${CURRENT_EXIT}" - echo "Baseline exit code: ${BASELINE_EXIT}" - - echo "" - echo "==================== FINAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== FINAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - if [[ "${CURRENT_EXIT}" != "0" || "${BASELINE_EXIT}" != "0" ]]; then - echo "One or both workloads failed." - exit 0 - fi - - echo "SUCCESS: Workloads completed successfully" - - - if: always() - name: Store logs - run: | - docker logs ydb-app-current > current.log 2>&1 || echo "No current container" > current.log - docker logs ydb-app-baseline > baseline.log 2>&1 || echo "No baseline container" > baseline.log - - - if: always() - name: Upload logs - uses: actions/upload-artifact@v4 + workload_baseline_image: ydb-app-baseline + workload_baseline_command: ${{ matrix.sdk.command }} + + ydb-slo-action-report: + runs-on: ubuntu-latest + name: Publish YDB SLO Report + needs: ydb-slo-action + permissions: + checks: write + contents: read + pull-requests: write + steps: + - name: Publish YDB SLO Report + uses: ydb-platform/ydb-slo-action/report@v2 with: - name: ydb-python-${{ matrix.workload }}-logs - path: | - ./current.log - ./baseline.log - retention-days: 1 + github_token: ${{ secrets.GITHUB_TOKEN }} + github_run_id: ${{ github.run_id }} diff --git a/pyproject.toml b/pyproject.toml index 41e7ef6f..13ec71e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ +[tool.ty.environment] +extra-paths = ["tests/slo/src"] + [tool.black] line-length = 120 diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index 4f18a2b3..ecf1bca2 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -1,13 +1,21 @@ # syntax=docker/dockerfile:1 # This image packages the Python SLO workload runner. -# It expects to be run with arguments like: -# docker run --rm table-run --otlp-endpoint http://prometheus:9090/api/v1/otlp/v1/metrics ... +# +# Connection and workload identity are configured via environment variables: +# YDB_ENDPOINT grpc://ydb:2136 +# YDB_DATABASE /Root/testdb +# WORKLOAD_DURATION 600 +# WORKLOAD_NAME sync-query | sync-table | topic +# WORKLOAD_REF +# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT http://ydb-prometheus:9090/api/v1/otlp/v1/metrics +# +# Additional tuning flags (read/write RPS, timeouts, thread counts) are passed via +# the Docker CMD, e.g.: +# docker run --rm --env-file ... --read-rps 1000 --write-rps 100 # # Notes: # - OpenTelemetry 1.39.x requires Python >= 3.9. -# - The entrypoint is `python ./tests/slo/src`, i.e. it runs the `__main__.py` -# from that directory (same as `python tests/slo/src ...` in CI). FROM python:3.11-slim AS build @@ -36,3 +44,5 @@ COPY --from=build /opt/venv /opt/venv COPY --from=build /src/tests/slo/src /app/tests/slo/src ENTRYPOINT ["python", "./tests/slo/src"] + +CMD ["--read-rps", "1000", "--write-rps", "100"] diff --git a/tests/slo/requirements.txt b/tests/slo/requirements.txt index cd5cdfe1..01f894a1 100644 --- a/tests/slo/requirements.txt +++ b/tests/slo/requirements.txt @@ -1,6 +1,6 @@ requests==2.33.0 aiolimiter==1.1.0 -quantile-estimator==0.1.2 +hdrhistogram # OpenTelemetry (OTLP/HTTP exporter) # NOTE: OpenTelemetry 1.39.1 requires Python >= 3.9. diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index dd1ae0b7..96eb186c 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -2,7 +2,7 @@ import logging from options import parse_options -from root_runner import run_from_args +from root_runner import run_all if __name__ == "__main__": @@ -12,4 +12,4 @@ log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)-8s %(message)s") - run_from_args(args) + run_all(args) diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index bff90eda..cacd73ea 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading import time from abc import ABC, abstractmethod from collections.abc import Iterable @@ -12,8 +13,8 @@ OP_TYPE_READ, OP_TYPE_WRITE = "read", "write" OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err" -REF = environ.get("REF", "main") -WORKLOAD = environ.get("WORKLOAD", "sync-query") +WORKLOAD_REF = environ.get("WORKLOAD_REF", environ.get("REF", "main")) +WORKLOAD_NAME = environ.get("WORKLOAD_NAME", environ.get("WORKLOAD", "sync-query")) logger = logging.getLogger(__name__) @@ -101,116 +102,119 @@ def push(self) -> None: class OtlpMetrics(BaseMetrics): """ - Canonical OpenTelemetry metrics implementation. - - This exports metrics via OTLP/HTTP to a Prometheus server with OTLP receiver enabled: - POST http(s)://:/api/v1/otlp/v1/metrics - - Naming notes: - - Metric names follow OpenTelemetry conventions (dot-separated namespaces, e.g. `sdk.operations.total`). - - Prometheus OTLP translation typically converts dots to underscores and may add suffixes like - `_total` for counters and `_bucket/_sum/_count` for histograms. + OpenTelemetry metrics implementation. + + Exports via OTLP/HTTP; the endpoint is configured through standard OTel env vars: + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) + OTEL_EXPORTER_OTLP_ENDPOINT (fallback base URL) + OTEL_EXPORTER_OTLP_PROTOCOL (default: http/protobuf) + + Latency is tracked with an HDR histogram per (operation_type, operation_status) label + combination and published as three Gauge instruments: + sdk_operation_latency_p50_seconds + sdk_operation_latency_p95_seconds + sdk_operation_latency_p99_seconds """ - def __init__(self, otlp_metrics_endpoint: str): + # HDR histogram range: 1 µs … 60 s (in microseconds), 3 significant figures. + _HDR_MIN_US = 1 + _HDR_MAX_US = 60_000_000 + _HDR_SIG_FIGS = 3 + + def __init__(self): from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter, ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - from opentelemetry.sdk.metrics.view import ( - ExplicitBucketHistogramAggregation, - View, - ) from opentelemetry.sdk.resources import Resource - # Resource attributes: Prometheus maps service.name -> job, service.instance.id -> instance. resource = Resource.create( { - "service.name": f"workload-{WORKLOAD}", - "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{REF}-{WORKLOAD}"), - "ref": REF, + "service.name": f"workload-{WORKLOAD_NAME}", + "service.instance.id": environ.get( + "SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}" + ), + "ref": WORKLOAD_REF, "sdk": "ydb-python-sdk", "sdk_version": version("ydb"), - "workload": WORKLOAD, + "workload": WORKLOAD_NAME, "workload_version": "0.0.0", } ) - exporter = OTLPMetricExporter(endpoint=otlp_metrics_endpoint) - reader = PeriodicExportingMetricReader(exporter) # we force_flush() explicitly in push() - - latency_view = View( - instrument_name="sdk.operation.latency", - aggregation=ExplicitBucketHistogramAggregation( - boundaries=( - 0.001, - 0.002, - 0.003, - 0.004, - 0.005, - 0.0075, - 0.010, - 0.020, - 0.050, - 0.100, - 0.200, - 0.500, - 1.000, - ) - ), - ) + # Endpoint is read automatically from OTEL_EXPORTER_OTLP_METRICS_ENDPOINT / + # OTEL_EXPORTER_OTLP_ENDPOINT by the exporter; no need to pass it explicitly. + exporter = OTLPMetricExporter() + reader = PeriodicExportingMetricReader(exporter) - self._provider = MeterProvider( - resource=resource, - metric_readers=[reader], - views=[latency_view], - ) + self._provider = MeterProvider(resource=resource, metric_readers=[reader]) self._meter = self._provider.get_meter("ydb-slo") - # Instruments (sync) + # Counters self._errors = self._meter.create_counter( name="sdk.errors.total", description="Total number of errors encountered, categorized by error type.", ) self._operations_total = self._meter.create_counter( name="sdk.operations.total", - description="Total number of operations, categorized by type attempted by the SDK.", + description="Total number of operations attempted by the SDK.", ) self._operations_success_total = self._meter.create_counter( name="sdk.operations.success.total", - description="Total number of successful operations, categorized by type.", + description="Total number of successful operations.", ) self._operations_failure_total = self._meter.create_counter( name="sdk.operations.failure.total", - description="Total number of failed operations, categorized by type.", + description="Total number of failed operations.", ) - self._latency = self._meter.create_histogram( - name="sdk.operation.latency", - unit="s", - description="Latency of operations performed by the SDK in seconds, categorized by type and status.", + self._retry_attempts_total = self._meter.create_counter( + name="sdk.retry.attempts.total", + description="Total number of retry attempts.", ) - self._pending = self._meter.create_up_down_counter( name="sdk.pending.operations", - description="Current number of pending operations, categorized by type.", + description="Current number of pending operations.", ) - self._retry_attempts_total = self._meter.create_counter( - name="sdk.retry.attempts.total", - description="Total number of retry attempts, categorized by ref and operation type.", + # Latency gauges (fed from HDR histograms via push()) + self._latency_p50 = self._meter.create_gauge( + name="sdk_operation_latency_p50_seconds", + unit="s", + description="P50 operation latency in seconds.", + ) + self._latency_p95 = self._meter.create_gauge( + name="sdk_operation_latency_p95_seconds", + unit="s", + description="P95 operation latency in seconds.", + ) + self._latency_p99 = self._meter.create_gauge( + name="sdk_operation_latency_p99_seconds", + unit="s", + description="P99 operation latency in seconds.", ) + # HDR histograms: key → (operation_type, operation_status) + self._hdr_lock = threading.Lock() + self._hdr: dict = {} + self.reset() + def _hdr_for(self, key: tuple): + """Return (creating if necessary) an HDR histogram for the given label key.""" + from hdrh.histogram import HdrHistogram + + hist = self._hdr.get(key) + if hist is None: + hist = HdrHistogram(self._HDR_MIN_US, self._HDR_MAX_US, self._HDR_SIG_FIGS) + self._hdr[key] = hist + return hist + def start(self, labels) -> float: labels_t = _normalize_labels(labels) self._pending.add( 1, - attributes={ - "ref": REF, - "operation_type": labels_t[0], - }, + attributes={"ref": WORKLOAD_REF, "operation_type": labels_t[0]}, ) return time.time() @@ -223,76 +227,74 @@ def stop( ) -> None: labels_t = _normalize_labels(labels) duration = time.time() - start_time + duration_us = max(self._HDR_MIN_US, int(duration * 1_000_000)) op_type = labels_t[0] - base_attrs = { - "ref": REF, - "operation_type": op_type, - } + op_status = OP_STATUS_SUCCESS if error is None else OP_STATUS_FAILURE + base_attrs = {"ref": WORKLOAD_REF, "operation_type": op_type} - # Update instruments self._retry_attempts_total.add(int(attempts), attributes=base_attrs) self._pending.add(-1, attributes=base_attrs) - - # Counters + latency self._operations_total.add(1, attributes=base_attrs) if error is not None: self._errors.add( 1, - attributes={ - **base_attrs, - "error_type": type(error).__name__, - }, + attributes={**base_attrs, "error_type": type(error).__name__}, ) self._operations_failure_total.add(1, attributes=base_attrs) - self._latency.record( - duration, - attributes={ - **base_attrs, - "operation_status": OP_STATUS_FAILURE, - }, - ) - return - - self._operations_success_total.add(1, attributes=base_attrs) - self._latency.record( - duration, - attributes={ - **base_attrs, - "operation_status": OP_STATUS_SUCCESS, - }, - ) + else: + self._operations_success_total.add(1, attributes=base_attrs) + + with self._hdr_lock: + self._hdr_for((op_type, op_status)).record_value(duration_us) def push(self) -> None: - # Metrics job calls push() with the cadence of --report-period. - # force_flush() makes the exporter send immediately. + with self._hdr_lock: + snapshot = {k: v for k, v in self._hdr.items()} + + for (op_type, op_status), hist in snapshot.items(): + attrs = { + "ref": WORKLOAD_REF, + "operation_type": op_type, + "operation_status": op_status, + } + p50 = hist.get_value_at_percentile(50.0) / 1_000_000 + p95 = hist.get_value_at_percentile(95.0) / 1_000_000 + p99 = hist.get_value_at_percentile(99.0) / 1_000_000 + self._latency_p50.set(p50, attributes=attrs) + self._latency_p95.set(p95, attributes=attrs) + self._latency_p99.set(p99, attributes=attrs) + self._provider.force_flush() def reset(self) -> None: - # OpenTelemetry counters/histograms are cumulative and cannot be reset. - # Reset is implemented as an immediate push/flush. self.push() -def create_metrics(otlp_endpoint: Optional[str]) -> BaseMetrics: +def create_metrics() -> BaseMetrics: """ Factory used by SLO runners. - Metrics are enabled if either: - - OTLP_ENDPOINT env var is set, or - - `--otlp-endpoint` is provided (and non-empty) - - If endpoint is empty, metrics are disabled (DummyMetrics). + Metrics are enabled when OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (or the generic + OTEL_EXPORTER_OTLP_ENDPOINT) is set in the environment; otherwise a no-op + DummyMetrics is returned. """ - endpoint = (environ.get("OTLP_ENDPOINT") or (otlp_endpoint or "")).strip() + endpoint = ( + environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") + or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + or "" + ).strip() + if not endpoint: - logger.info("Creating dummy metrics (metrics disabled)") + logger.info("OTLP endpoint not configured — metrics disabled") return DummyMetrics() - logger.info("Creating OTLP metrics exporter to Prometheus: %s", endpoint) + logger.info("Creating OTLP metrics exporter (endpoint from env)") try: - return OtlpMetrics(endpoint) + return OtlpMetrics() except Exception: - logger.exception("Failed to initialize OTLP metrics exporter; falling back to DummyMetrics") + logger.exception( + "Failed to initialize OTLP metrics — falling back to DummyMetrics" + ) return DummyMetrics() diff --git a/tests/slo/src/jobs/async_topic_jobs.py b/tests/slo/src/jobs/async_topic_jobs.py index 92ae9fa4..03f10f30 100644 --- a/tests/slo/src/jobs/async_topic_jobs.py +++ b/tests/slo/src/jobs/async_topic_jobs.py @@ -120,10 +120,6 @@ async def _run_topic_reads(self, limiter: AsyncLimiter): logger.info("Stop async topic read workload") def _run_metric_job(self): - # Metrics are enabled only if an OTLP endpoint is provided (CLI: --otlp-endpoint). - if not getattr(self.args, "otlp_endpoint", None): - return [] - task = asyncio.create_task( self._async_metric_sender(self.args.time), name="slo_metrics_sender", diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py index 32246fa7..4bf1a529 100644 --- a/tests/slo/src/jobs/base.py +++ b/tests/slo/src/jobs/base.py @@ -54,9 +54,6 @@ def run_tests(self): pass def _run_metric_job(self): - if not getattr(self.args, "otlp_endpoint", None): - return [] - report_period_ms = max(1, int(self.args.report_period)) limiter = SyncRateLimiter(min_interval_s=report_period_ms / 1000.0) diff --git a/tests/slo/src/jobs/table_jobs.py b/tests/slo/src/jobs/table_jobs.py index 2452c7b7..8e4cf6bd 100644 --- a/tests/slo/src/jobs/table_jobs.py +++ b/tests/slo/src/jobs/table_jobs.py @@ -92,9 +92,9 @@ def __init__(self, driver, args, metrics, table_name, max_id): self.table_name = table_name self.max_id = max_id - from core.metrics import WORKLOAD + from core.metrics import WORKLOAD_NAME - self.workload_type = WORKLOAD + self.workload_type = WORKLOAD_NAME def run_tests(self): if self.workload_type == "sync-table": diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 7082d086..11094eb7 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -1,182 +1,86 @@ import argparse +import os -def add_common_options(parser): - parser.add_argument("endpoint", help="YDB endpoint") - parser.add_argument("db", help="YDB database name") - parser.add_argument("-t", "--table-name", default="key_value", help="Table name") - parser.add_argument("--debug", action="store_true", help="Enable debug logging") - parser.add_argument("--async", action="store_true", help="Use async mode for operations") - +def parse_options(): + """ + Parse CLI arguments (passed via Docker CMD section). + Connection, duration, and workload identity are configured via environment variables: + YDB_ENDPOINT — YDB endpoint (e.g. grpc://ydb:2136) + YDB_DATABASE — YDB database path (e.g. /Root/testdb) + WORKLOAD_DURATION — total run duration in seconds (default: 600) + """ + parser = argparse.ArgumentParser( + description="YDB Python SLO workload", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) -def make_table_create_parser(subparsers): - table_create_parser = subparsers.add_parser("table-create", help="Create tables and fill with initial content") - add_common_options(table_create_parser) + parser.add_argument("--debug", action="store_true", help="Enable debug logging") - table_create_parser.add_argument( - "-p-min", - "--min-partitions-count", - default=6, - type=int, - help="Minimum amount of partitions in table", - ) - table_create_parser.add_argument( - "-p-max", - "--max-partitions-count", - default=1000, - type=int, - help="Maximum amount of partitions in table", + # Table params + parser.add_argument("--table-name", default="key_value", help="Table name") + parser.add_argument("--min-partitions-count", default=6, type=int) + parser.add_argument("--max-partitions-count", default=1000, type=int) + parser.add_argument( + "--partition-size", default=100, type=int, help="Partition size [mb]" ) - table_create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") - table_create_parser.add_argument( - "-c", + parser.add_argument( "--initial-data-count", default=1000, type=int, - help="Total number of records to generate", + help="Number of rows to pre-fill", ) - - table_create_parser.add_argument( - "--write-timeout", - default=20000, - type=int, - help="Write requests execution timeout [ms]", + parser.add_argument( + "--batch-size", default=100, type=int, help="Rows per insert batch" ) - - table_create_parser.add_argument( - "--batch-size", - default=100, - type=int, - help="Number of new records in each create request", + parser.add_argument( + "--threads", default=10, type=int, help="Threads for initial data fill" ) - table_create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") - -def make_table_run_parser(subparsers): - table_run_parser = subparsers.add_parser("table-run", help="Run table SLO workload") - add_common_options(table_run_parser) - - table_run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") - table_run_parser.add_argument( - "--read-timeout", - default=10000, - type=int, - help="Read requests execution timeout [ms]", + # Run params + parser.add_argument("--read-rps", default=100, type=int, help="Read RPS limit") + parser.add_argument( + "--read-timeout", default=10000, type=int, help="Read timeout [ms]" ) - - table_run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") - table_run_parser.add_argument( - "--write-timeout", - default=20000, - type=int, - help="Write requests execution timeout [ms]", + parser.add_argument("--write-rps", default=10, type=int, help="Write RPS limit") + parser.add_argument( + "--write-timeout", default=20000, type=int, help="Write timeout [ms]" ) - - table_run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - table_run_parser.add_argument( - "--shutdown-time", - default=10, - type=int, - help="Graceful shutdown time in seconds", + parser.add_argument( + "--read-threads", default=8, type=int, help="Read worker threads" ) - - table_run_parser.add_argument( - "--otlp-endpoint", - default="http://localhost:9090/api/v1/otlp/v1/metrics", - type=str, - help="Full Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics). Empty to disable.", + parser.add_argument( + "--write-threads", default=4, type=int, help="Write worker threads" ) - table_run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - - table_run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") - table_run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") - - -def make_table_cleanup_parser(subparsers): - table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") - add_common_options(table_cleanup_parser) - - -def make_topic_create_parser(subparsers): - topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer") - add_common_options(topic_create_parser) - - topic_create_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - topic_create_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") - topic_create_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") - - -def make_topic_run_parser(subparsers): - topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload") - add_common_options(topic_parser) - - topic_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - topic_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") - topic_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") - topic_parser.add_argument("--read-rps", default=100, type=int, help="Topic read request rps") - topic_parser.add_argument("--read-timeout", default=5000, type=int, help="Topic read timeout [ms]") - topic_parser.add_argument("--write-rps", default=100, type=int, help="Topic write request rps") - topic_parser.add_argument("--write-timeout", default=5000, type=int, help="Topic write timeout [ms]") - topic_parser.add_argument( - "--read-threads", - default=1, - type=int, - help="Number of threads for topic reading", + parser.add_argument( + "--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]" ) - topic_parser.add_argument( - "--write-threads", - default=1, - type=int, - help="Number of threads for topic writing", + parser.add_argument( + "--report-period", default=1000, type=int, help="Metrics push period [ms]" ) - topic_parser.add_argument("--message-size", default=100, type=int, help="Topic message size in bytes") - topic_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - topic_parser.add_argument( - "--shutdown-time", - default=10, - type=int, - help="Graceful shutdown time in seconds", + # Topic params (used when WORKLOAD_NAME contains 'topic') + parser.add_argument("--topic-path", default="/local/slo_topic", help="Topic path") + parser.add_argument( + "--topic-consumer", default="slo_consumer", help="Topic consumer name" ) - topic_parser.add_argument( - "--otlp-endpoint", - default="http://localhost:9090/api/v1/otlp/v1/metrics", - type=str, - help="Full Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics). Empty to disable.", + parser.add_argument( + "--topic-partitions", default=1, type=int, help="Topic partition count" ) - topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - - -def make_topic_cleanup_parser(subparsers): - topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic") - add_common_options(topic_cleanup_parser) - - topic_cleanup_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - - -def get_root_parser(): - parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, - description="YDB Python SLO application", - ) - - subparsers = parser.add_subparsers( - title="subcommands", - dest="subcommand", - help="List of subcommands", + parser.add_argument( + "--message-size", default=100, type=int, help="Topic message size [bytes]" ) - make_table_create_parser(subparsers) - make_table_run_parser(subparsers) - make_table_cleanup_parser(subparsers) + args = parser.parse_args() - make_topic_create_parser(subparsers) - make_topic_run_parser(subparsers) - make_topic_cleanup_parser(subparsers) + # Inject env-var-driven config as attributes so the rest of the code can use args.* uniformly + args.endpoint = os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136") + args.db = os.environ.get("YDB_DATABASE", "/local") + args.time = int(os.environ.get("WORKLOAD_DURATION", "600")) - return parser + # Aliases used by topic runner + args.path = args.topic_path + args.consumer = args.topic_consumer + args.partitions_count = args.topic_partitions - -def parse_options(): - parser = get_root_parser() - return parser.parse_args() + return args diff --git a/tests/slo/src/pyrightconfig.json b/tests/slo/src/pyrightconfig.json new file mode 100644 index 00000000..e2cad99c --- /dev/null +++ b/tests/slo/src/pyrightconfig.json @@ -0,0 +1,4 @@ +{ + "pythonVersion": "3.9", + "extraPaths": ["."] +} diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 20589c14..73d92bb3 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,94 +1,56 @@ import asyncio -import ydb -import ydb.aio import logging -from typing import Dict -from runners.topic_runner import TopicRunner +from core.metrics import WORKLOAD_NAME from runners.table_runner import TableRunner -from runners.base import BaseRunner - -logger = logging.getLogger(__name__) - - -class SLORunner: - def __init__(self): - self.runners: Dict[str, type(BaseRunner)] = {} - - def register_runner(self, prefix: str, runner_cls: type(BaseRunner)): - self.runners[prefix] = runner_cls +from runners.topic_runner import TopicRunner - def run_command(self, args): - subcommand_parts = args.subcommand.split("-", 1) - if len(subcommand_parts) < 2: - raise ValueError(f"Invalid subcommand format: {args.subcommand}. Expected 'prefix-command'") +import ydb +import ydb.aio - prefix, command = subcommand_parts - if prefix not in self.runners: - raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}") +logger = logging.getLogger(__name__) - runner_instance = self.runners[prefix]() +_RUNNERS = { + "sync-table": TableRunner, + "sync-query": TableRunner, + "topic": TopicRunner, +} - # Check if async mode is requested and command is 'run' - if getattr(args, "async", False) and command == "run": - asyncio.run(self._run_async_command(args, runner_instance, command)) - else: - self._run_sync_command(args, runner_instance, command) - def _run_sync_command(self, args, runner_instance, command): - """Run command in synchronous mode""" - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, +def _get_runner(): + runner_cls = _RUNNERS.get(WORKLOAD_NAME) + if runner_cls is None: + raise ValueError( + f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. Known: {list(_RUNNERS)}" ) + return runner_cls() - with ydb.Driver(driver_config) as driver: - driver.wait(timeout=300) - try: - runner_instance.set_driver(driver) - if command == "create": - runner_instance.create(args) - elif command == "run": - runner_instance.run(args) - elif command == "cleanup": - runner_instance.cleanup(args) - else: - raise RuntimeError(f"Unknown command {command} for prefix {runner_instance.prefix}") - except BaseException: - logger.exception("Something went wrong") - raise - finally: - driver.stop(timeout=getattr(args, "shutdown_time", 10)) - async def _run_async_command(self, args, runner_instance, command): - """Run command in asynchronous mode""" - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, - ) +def run_all(args): + """Create infrastructure, run the workload, then clean up — all in one go.""" + runner = _get_runner() - async with ydb.aio.Driver(driver_config) as driver: - await driver.wait(timeout=300) - try: - runner_instance.set_driver(driver) - if command == "run": - await runner_instance.run_async(args) - else: - raise RuntimeError(f"Async mode only supports 'run' command, got '{command}'") - except BaseException: - logger.exception("Something went wrong in async mode") - raise + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + grpc_keep_alive_timeout=5000, + ) + with ydb.Driver(driver_config) as driver: + driver.wait(timeout=300) + runner.set_driver(driver) -def create_runner() -> SLORunner: - runner = SLORunner() - runner.register_runner("table", TableRunner) - runner.register_runner("topic", TopicRunner) - return runner + try: + logger.info("[%s] Creating resources", WORKLOAD_NAME) + runner.create(args) + logger.info("[%s] Running workload for %d s", WORKLOAD_NAME, args.time) + runner.run(args) + finally: + logger.info("[%s] Cleaning up resources", WORKLOAD_NAME) + try: + runner.cleanup(args) + except Exception: + logger.exception("Cleanup failed — ignoring") -def run_from_args(args): - runner = create_runner() - runner.run_command(args) + driver.stop(timeout=args.shutdown_time) diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py index fb6f00dc..f5643912 100644 --- a/tests/slo/src/runners/table_runner.py +++ b/tests/slo/src/runners/table_runner.py @@ -88,7 +88,7 @@ def transaction(session: ydb.table.Session): self.logger.info("Table creation completed") def run(self, args): - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics() self.logger.info("Starting table SLO tests") diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py index c9a8bdaa..e8de9a93 100644 --- a/tests/slo/src/runners/topic_runner.py +++ b/tests/slo/src/runners/topic_runner.py @@ -70,7 +70,7 @@ def create(self, args): def run(self, args): assert self.driver is not None, "Driver is not initialized. Call set_driver() before run()." - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics() self.logger.info("Starting topic SLO tests") @@ -85,7 +85,7 @@ def run(self, args): async def run_async(self, args): """Async version of topic SLO tests using ydb.aio.Driver""" assert self.driver is not None, "Driver is not initialized. Call set_driver() before run_async()." - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics() self.logger.info("Starting async topic SLO tests") From 520747975f74ee749b61d0a7378c8896196532ed Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:12:55 +0300 Subject: [PATCH 2/5] Configure Ruff and reformat code to 120 chars Add Ruff configuration to match Black's line length, then reformat code accordingly. Remove unnecessary blank line in __main__.py. --- pyproject.toml | 3 +++ tests/slo/src/__main__.py | 1 - tests/slo/src/core/metrics.py | 12 +++------ tests/slo/src/options.py | 48 +++++++++-------------------------- tests/slo/src/root_runner.py | 4 +-- 5 files changed, 19 insertions(+), 49 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 13ec71e7..0bda93e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,9 @@ [tool.ty.environment] extra-paths = ["tests/slo/src"] +[tool.ruff] +line-length = 120 + [tool.black] line-length = 120 diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index 96eb186c..eb5504e0 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -4,7 +4,6 @@ from options import parse_options from root_runner import run_all - if __name__ == "__main__": args = parse_options() gc.disable() diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index cacd73ea..3a10b342 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -132,9 +132,7 @@ def __init__(self): resource = Resource.create( { "service.name": f"workload-{WORKLOAD_NAME}", - "service.instance.id": environ.get( - "SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}" - ), + "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}"), "ref": WORKLOAD_REF, "sdk": "ydb-python-sdk", "sdk_version": version("ydb"), @@ -281,9 +279,7 @@ def create_metrics() -> BaseMetrics: DummyMetrics is returned. """ endpoint = ( - environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") - or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") - or "" + environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") or "" ).strip() if not endpoint: @@ -294,7 +290,5 @@ def create_metrics() -> BaseMetrics: try: return OtlpMetrics() except Exception: - logger.exception( - "Failed to initialize OTLP metrics — falling back to DummyMetrics" - ) + logger.exception("Failed to initialize OTLP metrics — falling back to DummyMetrics") return DummyMetrics() diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 11094eb7..61a9e6ab 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -21,55 +21,31 @@ def parse_options(): parser.add_argument("--table-name", default="key_value", help="Table name") parser.add_argument("--min-partitions-count", default=6, type=int) parser.add_argument("--max-partitions-count", default=1000, type=int) - parser.add_argument( - "--partition-size", default=100, type=int, help="Partition size [mb]" - ) + parser.add_argument("--partition-size", default=100, type=int, help="Partition size [mb]") parser.add_argument( "--initial-data-count", default=1000, type=int, help="Number of rows to pre-fill", ) - parser.add_argument( - "--batch-size", default=100, type=int, help="Rows per insert batch" - ) - parser.add_argument( - "--threads", default=10, type=int, help="Threads for initial data fill" - ) + parser.add_argument("--batch-size", default=100, type=int, help="Rows per insert batch") + parser.add_argument("--threads", default=10, type=int, help="Threads for initial data fill") # Run params parser.add_argument("--read-rps", default=100, type=int, help="Read RPS limit") - parser.add_argument( - "--read-timeout", default=10000, type=int, help="Read timeout [ms]" - ) + parser.add_argument("--read-timeout", default=10000, type=int, help="Read timeout [ms]") parser.add_argument("--write-rps", default=10, type=int, help="Write RPS limit") - parser.add_argument( - "--write-timeout", default=20000, type=int, help="Write timeout [ms]" - ) - parser.add_argument( - "--read-threads", default=8, type=int, help="Read worker threads" - ) - parser.add_argument( - "--write-threads", default=4, type=int, help="Write worker threads" - ) - parser.add_argument( - "--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]" - ) - parser.add_argument( - "--report-period", default=1000, type=int, help="Metrics push period [ms]" - ) + parser.add_argument("--write-timeout", default=20000, type=int, help="Write timeout [ms]") + parser.add_argument("--read-threads", default=8, type=int, help="Read worker threads") + parser.add_argument("--write-threads", default=4, type=int, help="Write worker threads") + parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]") + parser.add_argument("--report-period", default=1000, type=int, help="Metrics push period [ms]") # Topic params (used when WORKLOAD_NAME contains 'topic') parser.add_argument("--topic-path", default="/local/slo_topic", help="Topic path") - parser.add_argument( - "--topic-consumer", default="slo_consumer", help="Topic consumer name" - ) - parser.add_argument( - "--topic-partitions", default=1, type=int, help="Topic partition count" - ) - parser.add_argument( - "--message-size", default=100, type=int, help="Topic message size [bytes]" - ) + parser.add_argument("--topic-consumer", default="slo_consumer", help="Topic consumer name") + parser.add_argument("--topic-partitions", default=1, type=int, help="Topic partition count") + parser.add_argument("--message-size", default=100, type=int, help="Topic message size [bytes]") args = parser.parse_args() diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 73d92bb3..2270383d 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -20,9 +20,7 @@ def _get_runner(): runner_cls = _RUNNERS.get(WORKLOAD_NAME) if runner_cls is None: - raise ValueError( - f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. Known: {list(_RUNNERS)}" - ) + raise ValueError(f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. Known: {list(_RUNNERS)}") return runner_cls() From baacda34c517b0e2d2154c3f0c36feeebef26387 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:14:29 +0300 Subject: [PATCH 3/5] Use SDK name in SLO workflow concurrency group Replace matrix.workload with matrix.sdk.name to properly identify concurrent jobs by SDK variant. --- .github/workflows/slo.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 465e173a..fb8e8455 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -49,7 +49,7 @@ jobs: command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" concurrency: - group: slo-${{ github.ref }}-${{ matrix.workload }} + group: slo-${{ github.ref }}-${{ matrix.sdk.name }} cancel-in-progress: true steps: From a3013cd75941bd3bcc525081e4c12b8bec35ddda Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:23:46 +0300 Subject: [PATCH 4/5] Pin hdrhistogram and improve argument parsing Add type hint to _run_metric_job return type. Move environment variable fallback logic from post-parse injection into argument parser definition using nargs="?" and default values. Remove unused asyncio import. Pin hdrhistogram and improve argument parsing - Pin hdrhistogram to v0.10.3 for reproducibility - Move YDB and workload config to proper argparse arguments with environment variable fallbacks instead of post-parsing assignment - Add type hint to _run_metric_job return type - Remove unused asyncio import --- tests/slo/requirements.txt | 2 +- tests/slo/src/jobs/base.py | 3 ++- tests/slo/src/options.py | 26 +++++++++++++++++++++----- tests/slo/src/root_runner.py | 1 - 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/tests/slo/requirements.txt b/tests/slo/requirements.txt index 01f894a1..d9021cc5 100644 --- a/tests/slo/requirements.txt +++ b/tests/slo/requirements.txt @@ -1,6 +1,6 @@ requests==2.33.0 aiolimiter==1.1.0 -hdrhistogram +hdrhistogram==0.10.3 # OpenTelemetry (OTLP/HTTP exporter) # NOTE: OpenTelemetry 1.39.1 requires Python >= 3.9. diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py index 4bf1a529..9cc06598 100644 --- a/tests/slo/src/jobs/base.py +++ b/tests/slo/src/jobs/base.py @@ -2,6 +2,7 @@ import threading import time from abc import ABC, abstractmethod +from typing import Any import ydb @@ -53,7 +54,7 @@ def __init__(self, driver, args, metrics): def run_tests(self): pass - def _run_metric_job(self): + def _run_metric_job(self) -> list[Any]: report_period_ms = max(1, int(self.args.report_period)) limiter = SyncRateLimiter(min_interval_s=report_period_ms / 1000.0) diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 61a9e6ab..883cfbf6 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -15,6 +15,27 @@ def parse_options(): formatter_class=argparse.RawDescriptionHelpFormatter, ) + # Positional args with env var fallback — pass explicitly for local runs, + # or rely on env vars (YDB_ENDPOINT / YDB_DATABASE / WORKLOAD_DURATION) in Docker/CI. + parser.add_argument( + "endpoint", + nargs="?", + default=os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136"), + help="YDB endpoint (default: $YDB_ENDPOINT)", + ) + parser.add_argument( + "db", + nargs="?", + default=os.environ.get("YDB_DATABASE", "/local"), + help="YDB database (default: $YDB_DATABASE)", + ) + parser.add_argument( + "--time", + default=int(os.environ.get("WORKLOAD_DURATION", "600")), + type=int, + help="Workload duration in seconds (default: $WORKLOAD_DURATION)", + ) + parser.add_argument("--debug", action="store_true", help="Enable debug logging") # Table params @@ -49,11 +70,6 @@ def parse_options(): args = parser.parse_args() - # Inject env-var-driven config as attributes so the rest of the code can use args.* uniformly - args.endpoint = os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136") - args.db = os.environ.get("YDB_DATABASE", "/local") - args.time = int(os.environ.get("WORKLOAD_DURATION", "600")) - # Aliases used by topic runner args.path = args.topic_path args.consumer = args.topic_consumer diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 2270383d..c3d75fb2 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,4 +1,3 @@ -import asyncio import logging from core.metrics import WORKLOAD_NAME From 27bdeab0016a2c786ad890c5523244f085861ad8 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:50:23 +0300 Subject: [PATCH 5/5] Update SLO workflow baseline image to use current version. --- .github/workflows/slo.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index fb8e8455..e8b68a79 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -151,7 +151,7 @@ jobs: workload_current_image: ydb-app-current workload_current_command: ${{ matrix.sdk.command }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - workload_baseline_image: ydb-app-baseline + workload_baseline_image: ydb-app-current workload_baseline_command: ${{ matrix.sdk.command }} ydb-slo-action-report: