Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 35 additions & 40 deletions amber/src/main/python/texera_run_python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import json
import sys
from loguru import logger

Expand Down Expand Up @@ -49,54 +50,48 @@ def init_loguru_logger(stream_log_level) -> None:
logger.add(sys.stderr, level=stream_log_level)


if __name__ == "__main__":
(
_,
worker_id,
output_port,
logger_level,
r_path,
iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
s3_region,
s3_auth_username,
s3_auth_password,
s3_large_binaries_base_uri,
) = sys.argv
init_loguru_logger(logger_level)
def main(raw_config: str) -> None:
    """Start a Python worker from its JSON startup configuration.

    Startup configuration is passed by name as a single JSON object (see
    PythonWorkflowWorker on the JVM side). Reading by key means a missing or
    renamed field raises a clear KeyError instead of silently misaligning, as
    could happen with the previous positional sys.argv unpacking.

    Every field is read before anything is initialized, so an incomplete
    configuration aborts startup without having configured the logger,
    storage, or the R_HOME environment variable.

    Args:
        raw_config: JSON text of an object holding every startup field by name.

    Raises:
        json.JSONDecodeError: If raw_config is not valid JSON.
        KeyError: If any startup field is missing from the configuration.
        ValueError: If "outputPort" cannot be converted to an integer.
    """
    config = json.loads(raw_config)

    # Keys listed in the order their values are passed to
    # StorageConfig.initialize, which takes them positionally.
    storage_keys = (
        "icebergCatalogType",
        "icebergPostgresCatalogUriWithoutScheme",
        "icebergPostgresCatalogUsername",
        "icebergPostgresCatalogPassword",
        "icebergRestCatalogUri",
        "icebergRestCatalogWarehouseName",
        "icebergTableNamespace",
        "icebergTableStateNamespace",
        "icebergFileStorageDirectoryPath",
        "icebergTableCommitBatchSize",
        "s3Endpoint",
        "s3Region",
        "s3AuthUsername",
        "s3AuthPassword",
        "s3LargeBinariesBaseUri",
    )

    # Resolve all fields up front: a missing key fails here, before any side
    # effect below has happened.
    logger_level = config["loggerLevel"]
    storage_args = [config[key] for key in storage_keys]
    r_path = config["rPath"]
    worker_id = config["workerId"]
    output_port = int(config["outputPort"])

    init_loguru_logger(logger_level)
    StorageConfig.initialize(*storage_args)

    # Setting R_HOME environment variable for R-UDF usage
    if r_path:
        import os

        os.environ["R_HOME"] = r_path

    PythonWorker(
        worker_id=worker_id,
        host="localhost",
        output_port=output_port,
    ).run()


if __name__ == "__main__":
    # The launcher passes the whole startup configuration as one JSON
    # object in the first command-line argument.
    startup_config_json = sys.argv[1]
    main(startup_config_json)
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.actormessage.{Backpressure, CreditU
import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize
import org.apache.texera.amber.engine.common.ambermessage._
import org.apache.texera.amber.engine.common.{CheckpointState, Utils}
import org.apache.texera.amber.util.JSONUtils.objectMapper

import java.nio.file.Path
import org.apache.texera.web.resource.pythonvirtualenvironment.PveManager
Expand Down Expand Up @@ -187,30 +188,43 @@ class PythonWorkflowWorker(
// Set the Iceberg related arguments based on the catalog type.
val isPostgres = StorageConfig.icebergCatalogType == "postgres"
val isRest = StorageConfig.icebergCatalogType == "rest"

// Pass startup configuration to the Python worker by name, as a single JSON
// object, rather than by argv position. This way the two sides agree by key,
// so adding/removing/reordering a field can no longer silently misassign
// values; a missing or renamed key fails loudly on the Python side instead.
val startupConfig: Map[String, String] = Map(
"workerId" -> workerConfig.workerId.name,
"outputPort" -> Integer.toString(pythonProxyServer.getPortNumber.get()),
"loggerLevel" -> UdfConfig.pythonLogStreamHandlerLevel,
"rPath" -> RENVPath,
"icebergCatalogType" -> StorageConfig.icebergCatalogType,
"icebergPostgresCatalogUriWithoutScheme" ->
(if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme else ""),
"icebergPostgresCatalogUsername" ->
(if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else ""),
"icebergPostgresCatalogPassword" ->
(if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else ""),
"icebergRestCatalogUri" -> (if (isRest) StorageConfig.icebergRESTCatalogUri else ""),
"icebergRestCatalogWarehouseName" ->
(if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else ""),
"icebergTableNamespace" -> StorageConfig.icebergTableResultNamespace,
"icebergTableStateNamespace" -> StorageConfig.icebergTableStateNamespace,
"icebergFileStorageDirectoryPath" -> StorageConfig.fileStorageDirectoryPath.toString,
"icebergTableCommitBatchSize" -> StorageConfig.icebergTableCommitBatchSize.toString,
"s3Endpoint" -> StorageConfig.s3Endpoint,
"s3Region" -> StorageConfig.s3Region,
"s3AuthUsername" -> StorageConfig.s3Username,
"s3AuthPassword" -> StorageConfig.s3Password,
"s3LargeBinariesBaseUri" -> workerConfig.largeBinaryBaseUri
)

pythonServerProcess = Process(
Seq(
pythonBin,
"-u",
udfEntryScriptPath,
workerConfig.workerId.name,
Integer.toString(pythonProxyServer.getPortNumber.get()),
UdfConfig.pythonLogStreamHandlerLevel,
RENVPath,
StorageConfig.icebergCatalogType,
if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme else "",
if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else "",
if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else "",
if (isRest) StorageConfig.icebergRESTCatalogUri else "",
if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
StorageConfig.icebergTableResultNamespace,
StorageConfig.icebergTableStateNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString,
StorageConfig.s3Endpoint,
StorageConfig.s3Region,
StorageConfig.s3Username,
StorageConfig.s3Password,
workerConfig.largeBinaryBaseUri
objectMapper.writeValueAsString(startupConfig)
)
).run(BasicIO.standard(false))
}
Expand Down
112 changes: 112 additions & 0 deletions amber/src/test/python/test_run_python_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
from unittest import mock

import pytest

import texera_run_python_worker as entry


def _full_config() -> dict:
"""A complete startup config matching the keys PythonWorkflowWorker emits."""
return {
"workerId": "worker-1",
"outputPort": "5005",
"loggerLevel": "INFO",
"rPath": "",
"icebergCatalogType": "postgres",
"icebergPostgresCatalogUriWithoutScheme": "host:5432/db",
"icebergPostgresCatalogUsername": "pg-user",
"icebergPostgresCatalogPassword": "pg-pass",
"icebergRestCatalogUri": "",
"icebergRestCatalogWarehouseName": "",
"icebergTableNamespace": "result_ns",
"icebergTableStateNamespace": "state_ns",
"icebergFileStorageDirectoryPath": "/tmp/files",
"icebergTableCommitBatchSize": "100",
"s3Endpoint": "http://s3:9000",
"s3Region": "us-west-2",
"s3AuthUsername": "s3-user",
"s3AuthPassword": "s3-pass",
"s3LargeBinariesBaseUri": "s3://bucket/base",
}


def test_main_maps_named_config_to_storage_and_worker():
    """Check that each named field lands in the right place.

    Every key must reach the matching StorageConfig.initialize argument and
    PythonWorker parameter, which guards against the silent misalignment
    that positional argv passing allowed.
    """
    raw_config = json.dumps(_full_config())
    # Values in the positional order StorageConfig.initialize is called with.
    expected_storage_args = (
        "postgres",
        "host:5432/db",
        "pg-user",
        "pg-pass",
        "",
        "",
        "result_ns",
        "state_ns",
        "/tmp/files",
        "100",
        "http://s3:9000",
        "us-west-2",
        "s3-user",
        "s3-pass",
        "s3://bucket/base",
    )

    storage_patch = mock.patch.object(entry, "StorageConfig")
    worker_patch = mock.patch.object(entry, "PythonWorker")
    logger_patch = mock.patch.object(entry, "init_loguru_logger")
    with storage_patch as storage_config, worker_patch as python_worker, logger_patch:
        entry.main(raw_config)

    storage_config.initialize.assert_called_once_with(*expected_storage_args)
    python_worker.assert_called_once_with(
        worker_id="worker-1", host="localhost", output_port=5005
    )
    python_worker.return_value.run.assert_called_once()


def test_main_sets_r_home_only_when_r_path_is_present(monkeypatch):
    """R_HOME is exported for a non-empty rPath and left unset for an empty one.

    Both halves of "only when" are checked: an empty rPath must not create
    R_HOME, and a non-empty rPath must set it to exactly that value.
    """
    import os

    # Start from a known state; if R_HOME was set, monkeypatch restores it
    # at teardown.
    monkeypatch.delenv("R_HOME", raising=False)
    with (
        # entry.main writes os.environ directly, outside monkeypatch's
        # bookkeeping. Snapshot the environment so that write is rolled back
        # on exit and cannot leak into other tests.
        mock.patch.dict(os.environ),
        mock.patch.object(entry, "StorageConfig"),
        mock.patch.object(entry, "PythonWorker"),
        mock.patch.object(entry, "init_loguru_logger"),
    ):
        config = _full_config()

        config["rPath"] = ""
        entry.main(json.dumps(config))
        assert "R_HOME" not in os.environ

        config["rPath"] = "/opt/R"
        entry.main(json.dumps(config))
        assert os.environ["R_HOME"] == "/opt/R"


@pytest.mark.parametrize("missing_key", sorted(_full_config().keys()))
def test_main_raises_keyerror_when_a_field_is_missing(missing_key):
    """Dropping any single field must raise KeyError, not misassign values."""
    incomplete = {
        key: value for key, value in _full_config().items() if key != missing_key
    }
    raw_config = json.dumps(incomplete)
    with (
        mock.patch.object(entry, "StorageConfig"),
        mock.patch.object(entry, "PythonWorker"),
        mock.patch.object(entry, "init_loguru_logger"),
        pytest.raises(KeyError),
    ):
        entry.main(raw_config)
Loading