diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py index 5c3e25e0969..1cbec51a041 100644 --- a/amber/src/main/python/texera_run_python_worker.py +++ b/amber/src/main/python/texera_run_python_worker.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import json import sys from loguru import logger @@ -49,54 +50,103 @@ def init_loguru_logger(stream_log_level) -> None: logger.add(sys.stderr, level=stream_log_level) -if __name__ == "__main__": - ( - _, - worker_id, - output_port, - logger_level, - r_path, - iceberg_catalog_type, - iceberg_postgres_catalog_uri_without_scheme, - iceberg_postgres_catalog_username, - iceberg_postgres_catalog_password, - iceberg_rest_catalog_uri, - iceberg_rest_catalog_warehouse_name, - iceberg_table_namespace, - iceberg_table_state_namespace, - iceberg_file_storage_directory_path, - iceberg_table_commit_batch_size, - s3_endpoint, - s3_region, - s3_auth_username, - s3_auth_password, - s3_large_binaries_base_uri, - ) = sys.argv - init_loguru_logger(logger_level) +# Keys the JVM side (PythonWorkflowWorker) sends in the startup-config JSON. +# Declared here so any drift between the two sides fails loudly instead of being +# silently misassigned, as could happen with the previous positional unpacking. +EXPECTED_CONFIG_KEYS = frozenset( + { + "workerId", + "outputPort", + "loggerLevel", + "rPath", + "icebergCatalogType", + "icebergPostgresCatalogUriWithoutScheme", + "icebergPostgresCatalogUsername", + "icebergPostgresCatalogPassword", + "icebergRestCatalogUri", + "icebergRestCatalogWarehouseName", + "icebergTableNamespace", + "icebergTableStateNamespace", + "icebergFileStorageDirectoryPath", + "icebergTableCommitBatchSize", + "s3Endpoint", + "s3Region", + "s3AuthUsername", + "s3AuthPassword", + "s3LargeBinariesBaseUri", + } +) + + +def parse_startup_config(raw_config: str) -> dict: + """Parse and validate the JSON startup configuration. + + The configuration is passed by name (see PythonWorkflowWorker on the JVM + side), so the two sides must agree on an exact key set. Key order is + irrelevant since it is a JSON object. Any drift fails loudly: + - a missing or unexpected key raises ValueError; + - a non-string value raises TypeError. + """ + config = json.loads(raw_config) + if not isinstance(config, dict): + raise TypeError( + f"startup config must be a JSON object, got {type(config).__name__}" + ) + + actual_keys = set(config) + missing = EXPECTED_CONFIG_KEYS - actual_keys + unexpected = actual_keys - EXPECTED_CONFIG_KEYS + if missing or unexpected: + raise ValueError( + f"startup config key mismatch: missing={sorted(missing)}, " + f"unexpected={sorted(unexpected)}" + ) + + non_string_keys = sorted(k for k, v in config.items() if not isinstance(v, str)) + if non_string_keys: + raise TypeError( + f"startup config values must be strings; non-string keys: {non_string_keys}" + ) + + return config + + +def main(raw_config: str) -> None: + """Start a Python worker from its validated JSON startup configuration.""" + config = parse_startup_config(raw_config) + + init_loguru_logger(config["loggerLevel"]) StorageConfig.initialize( - iceberg_catalog_type, - iceberg_postgres_catalog_uri_without_scheme, - iceberg_postgres_catalog_username, - iceberg_postgres_catalog_password, - iceberg_rest_catalog_uri, - iceberg_rest_catalog_warehouse_name, - iceberg_table_namespace, - iceberg_table_state_namespace, - iceberg_file_storage_directory_path, - iceberg_table_commit_batch_size, - s3_endpoint, - s3_region, - s3_auth_username, - s3_auth_password, - s3_large_binaries_base_uri, + config["icebergCatalogType"], + config["icebergPostgresCatalogUriWithoutScheme"], + config["icebergPostgresCatalogUsername"], + config["icebergPostgresCatalogPassword"], + config["icebergRestCatalogUri"], + config["icebergRestCatalogWarehouseName"], + config["icebergTableNamespace"], + config["icebergTableStateNamespace"], + config["icebergFileStorageDirectoryPath"], + config["icebergTableCommitBatchSize"], + config["s3Endpoint"], + config["s3Region"], + config["s3AuthUsername"], + config["s3AuthPassword"], + config["s3LargeBinariesBaseUri"], ) # Setting R_HOME environment variable for R-UDF usage + r_path = config["rPath"] if r_path: import os os.environ["R_HOME"] = r_path PythonWorker( - worker_id=worker_id, host="localhost", output_port=int(output_port) + worker_id=config["workerId"], + host="localhost", + output_port=int(config["outputPort"]), ).run() + + +if __name__ == "__main__": + main(sys.argv[1]) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index c7ed3b63242..0f9ffb608e2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.actormessage.{Backpressure, CreditU import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize import org.apache.texera.amber.engine.common.ambermessage._ import org.apache.texera.amber.engine.common.{CheckpointState, Utils} +import org.apache.texera.amber.util.JSONUtils.objectMapper import java.nio.file.Path import org.apache.texera.web.resource.pythonvirtualenvironment.PveManager @@ -47,6 +48,20 @@ import scala.sys.process.{BasicIO, Process} object PythonWorkflowWorker { def props(workerConfig: WorkerConfig): Props = Props(new PythonWorkflowWorker(workerConfig)) + + /** + * Serialize the Python worker startup configuration to a JSON object, keyed by + * name. Built from a sequence of (key, value) pairs so a duplicate key fails + * loudly here instead of being silently dropped by Map construction. + */ + def encodeStartupConfig(entries: Seq[(String, String)]): String = { + val duplicateKeys = entries.groupBy(_._1).collect { case (key, group) if group.size > 1 => key } + require( + duplicateKeys.isEmpty, + s"duplicate Python worker startup config keys: ${duplicateKeys.mkString(", ")}" + ) + objectMapper.writeValueAsString(entries.toMap) + } } class PythonWorkflowWorker( @@ -187,30 +202,45 @@ class PythonWorkflowWorker( // Set the Iceberg related arguments based on the catalog type. val isPostgres = StorageConfig.icebergCatalogType == "postgres" val isRest = StorageConfig.icebergCatalogType == "rest" + + // Pass startup configuration to the Python worker by name, as a single JSON + // object, rather than by argv position. This way the two sides agree by key, + // so adding/removing/reordering a field can no longer silently misassign + // values; a missing or renamed key fails loudly on the Python side instead. + // Built as a sequence so a duplicate key fails loudly (see encodeStartupConfig) + // rather than being silently dropped. + val startupConfig: Seq[(String, String)] = Seq( + "workerId" -> workerConfig.workerId.name, + "outputPort" -> Integer.toString(pythonProxyServer.getPortNumber.get()), + "loggerLevel" -> UdfConfig.pythonLogStreamHandlerLevel, + "rPath" -> RENVPath, + "icebergCatalogType" -> StorageConfig.icebergCatalogType, + "icebergPostgresCatalogUriWithoutScheme" -> + (if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme else ""), + "icebergPostgresCatalogUsername" -> + (if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else ""), + "icebergPostgresCatalogPassword" -> + (if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else ""), + "icebergRestCatalogUri" -> (if (isRest) StorageConfig.icebergRESTCatalogUri else ""), + "icebergRestCatalogWarehouseName" -> + (if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else ""), + "icebergTableNamespace" -> StorageConfig.icebergTableResultNamespace, + "icebergTableStateNamespace" -> StorageConfig.icebergTableStateNamespace, + "icebergFileStorageDirectoryPath" -> StorageConfig.fileStorageDirectoryPath.toString, + "icebergTableCommitBatchSize" -> StorageConfig.icebergTableCommitBatchSize.toString, + "s3Endpoint" -> StorageConfig.s3Endpoint, + "s3Region" -> StorageConfig.s3Region, + "s3AuthUsername" -> StorageConfig.s3Username, + "s3AuthPassword" -> StorageConfig.s3Password, + "s3LargeBinariesBaseUri" -> workerConfig.largeBinaryBaseUri + ) + pythonServerProcess = Process( Seq( pythonBin, "-u", udfEntryScriptPath, - workerConfig.workerId.name, - Integer.toString(pythonProxyServer.getPortNumber.get()), - UdfConfig.pythonLogStreamHandlerLevel, - RENVPath, - StorageConfig.icebergCatalogType, - if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme else "", - if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else "", - if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else "", - if (isRest) StorageConfig.icebergRESTCatalogUri else "", - if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "", - StorageConfig.icebergTableResultNamespace, - StorageConfig.icebergTableStateNamespace, - StorageConfig.fileStorageDirectoryPath.toString, - StorageConfig.icebergTableCommitBatchSize.toString, - StorageConfig.s3Endpoint, - StorageConfig.s3Region, - StorageConfig.s3Username, - StorageConfig.s3Password, - workerConfig.largeBinaryBaseUri + PythonWorkflowWorker.encodeStartupConfig(startupConfig) ) ).run(BasicIO.standard(false)) } diff --git a/amber/src/test/python/test_run_python_worker.py b/amber/src/test/python/test_run_python_worker.py new file mode 100644 index 00000000000..3cc41bc9e2b --- /dev/null +++ b/amber/src/test/python/test_run_python_worker.py @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from unittest import mock + +import pytest + +import texera_run_python_worker as entry + + +def _full_config() -> dict: + """A complete startup config matching the keys PythonWorkflowWorker emits.""" + return { + "workerId": "worker-1", + "outputPort": "5005", + "loggerLevel": "INFO", + "rPath": "", + "icebergCatalogType": "postgres", + "icebergPostgresCatalogUriWithoutScheme": "host:5432/db", + "icebergPostgresCatalogUsername": "pg-user", + "icebergPostgresCatalogPassword": "pg-pass", + "icebergRestCatalogUri": "", + "icebergRestCatalogWarehouseName": "", + "icebergTableNamespace": "result_ns", + "icebergTableStateNamespace": "state_ns", + "icebergFileStorageDirectoryPath": "/tmp/files", + "icebergTableCommitBatchSize": "100", + "s3Endpoint": "http://s3:9000", + "s3Region": "us-west-2", + "s3AuthUsername": "s3-user", + "s3AuthPassword": "s3-pass", + "s3LargeBinariesBaseUri": "s3://bucket/base", + } + + +def _patched_collaborators(): + """Patch the heavy collaborators so main() exercises only the config wiring.""" + return ( + mock.patch.object(entry, "StorageConfig"), + mock.patch.object(entry, "PythonWorker"), + mock.patch.object(entry, "init_loguru_logger"), + ) + + +def test_full_config_keys_match_the_expected_set(): + # Guards against the sample config in this test drifting from the contract. + assert set(_full_config()) == set(entry.EXPECTED_CONFIG_KEYS) + + +def test_main_maps_named_config_to_storage_and_worker(): + """Each named field reaches the correct StorageConfig.initialize argument and + worker parameter — guarding against the silent misalignment that positional + argv passing allowed.""" + config = _full_config() + storage_patch, worker_patch, _logger_patch = _patched_collaborators() + with storage_patch as storage_config, worker_patch as python_worker, _logger_patch: + entry.main(json.dumps(config)) + + storage_config.initialize.assert_called_once_with( + "postgres", + "host:5432/db", + "pg-user", + "pg-pass", + "", + "", + "result_ns", + "state_ns", + "/tmp/files", + "100", + "http://s3:9000", + "us-west-2", + "s3-user", + "s3-pass", + "s3://bucket/base", + ) + python_worker.assert_called_once_with( + worker_id="worker-1", host="localhost", output_port=5005 + ) + python_worker.return_value.run.assert_called_once() + + +def test_main_mapping_is_independent_of_key_order(): + """Reordering the JSON keys must not change where values land (it is a dict).""" + reordered = dict(reversed(list(_full_config().items()))) + storage_patch, worker_patch, _logger_patch = _patched_collaborators() + with storage_patch as storage_config, worker_patch as python_worker, _logger_patch: + entry.main(json.dumps(reordered)) + + storage_config.initialize.assert_called_once_with( + "postgres", + "host:5432/db", + "pg-user", + "pg-pass", + "", + "", + "result_ns", + "state_ns", + "/tmp/files", + "100", + "http://s3:9000", + "us-west-2", + "s3-user", + "s3-pass", + "s3://bucket/base", + ) + python_worker.assert_called_once_with( + worker_id="worker-1", host="localhost", output_port=5005 + ) + + +def test_main_sets_r_home_when_r_path_present(monkeypatch): + monkeypatch.delenv("R_HOME", raising=False) + config = _full_config() + config["rPath"] = "/opt/R" + storage_patch, worker_patch, _logger_patch = _patched_collaborators() + with storage_patch, worker_patch, _logger_patch: + import os + + entry.main(json.dumps(config)) + assert os.environ["R_HOME"] == "/opt/R" + + +@pytest.mark.parametrize("missing_key", sorted(_full_config().keys())) +def test_parse_rejects_a_missing_key(missing_key): + """A missing key fails loudly rather than being silently misassigned.""" + config = _full_config() + del config[missing_key] + with pytest.raises(ValueError, match="key mismatch"): + entry.parse_startup_config(json.dumps(config)) + + +def test_parse_rejects_an_unexpected_key(): + """An extra key (e.g. the JVM side added a field) fails instead of being ignored.""" + config = _full_config() + config["someNewField"] = "value" + with pytest.raises(ValueError, match="key mismatch"): + entry.parse_startup_config(json.dumps(config)) + + +def test_parse_rejects_a_non_string_value(): + """A wrongly-typed value (e.g. a number instead of a string) fails.""" + config = _full_config() + config["outputPort"] = 5005 # number instead of the expected string + with pytest.raises(TypeError, match="must be strings"): + entry.parse_startup_config(json.dumps(config)) + + +def test_parse_rejects_a_non_object_payload(): + with pytest.raises(TypeError, match="must be a JSON object"): + entry.parse_startup_config(json.dumps(["not", "an", "object"])) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala new file mode 100644 index 00000000000..b3255dd110e --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.pythonworker + +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec + +class PythonWorkflowWorkerStartupConfigSpec extends AnyFlatSpec { + + "encodeStartupConfig" should "serialize entries to a JSON object keyed by name" in { + val json = PythonWorkflowWorker.encodeStartupConfig( + Seq("workerId" -> "w-1", "outputPort" -> "5005", "s3Region" -> "us-west-2") + ) + val parsed = objectMapper.readValue(json, classOf[java.util.Map[String, String]]) + assert(parsed.get("workerId") == "w-1") + assert(parsed.get("outputPort") == "5005") + assert(parsed.get("s3Region") == "us-west-2") + assert(parsed.size() == 3) + } + + it should "fail loudly when the same key appears more than once" in { + val exception = intercept[IllegalArgumentException] { + PythonWorkflowWorker.encodeStartupConfig( + Seq("s3Region" -> "us-west-2", "s3Region" -> "us-east-1") + ) + } + assert(exception.getMessage.contains("duplicate")) + } +}