From 2c7efed929d6abe6769bcf61374389feb65bee88 Mon Sep 17 00:00:00 2001 From: Akanksha Gupta Date: Wed, 10 Jun 2026 18:48:24 -0700 Subject: [PATCH] Add a script to deploy VS Code on GKE CPU node pool PiperOrigin-RevId: 930204177 --- .../shared_pathways_service/gke_utils.py | 175 ++++++++++++++---- .../shared_pathways_service/isc_pathways.py | 4 +- .../start_vscode_on_cpu_np.py | 162 ++++++++++++++++ .../yamls/code-server.yaml | 58 ++++++ 4 files changed, 358 insertions(+), 41 deletions(-) create mode 100644 pathwaysutils/experimental/shared_pathways_service/start_vscode_on_cpu_np.py create mode 100644 pathwaysutils/experimental/shared_pathways_service/yamls/code-server.yaml diff --git a/pathwaysutils/experimental/shared_pathways_service/gke_utils.py b/pathwaysutils/experimental/shared_pathways_service/gke_utils.py index 596127f..2e08a49 100644 --- a/pathwaysutils/experimental/shared_pathways_service/gke_utils.py +++ b/pathwaysutils/experimental/shared_pathways_service/gke_utils.py @@ -63,20 +63,26 @@ def fetch_cluster_credentials( raise -def deploy_gke_yaml(yaml: str) -> None: +def deploy_gke_yaml(yaml: str, action: str = "apply") -> None: """Deploys the given YAML to the GKE cluster. Args: yaml: The GKE YAML to deploy. + action: The kubectl action to perform ("apply" or "create"). "create" + fails if the resource already exists, whereas "apply" updates it in + place. Raises: subprocess.CalledProcessError: If the kubectl command fails. + ValueError: If action is not "apply" or "create".
""" - _logger.info("Deploying GKE YAML: %s", yaml) - kubectl_apply_command = ["kubectl", "apply", "-f", "-"] + if action not in ("apply", "create"): + raise ValueError(f"Invalid kubectl action: {action}") + _logger.info("Deploying GKE YAML with action %s: %s", action, yaml) + kubectl_command = ["kubectl", action, "-f", "-"] try: proxy_result = subprocess.run( - kubectl_apply_command, + kubectl_command, input=yaml, check=True, capture_output=True, @@ -93,6 +99,49 @@ def deploy_gke_yaml(yaml: str) -> None: ) +def delete_gke_resource( + resource_type: str, name: str, namespace: str = "default" +) -> None: + """Deletes the given resource from the GKE cluster. + + Args: + resource_type: The type of resource to delete (e.g. "deployment", + "service", "job"). + name: The name of the resource. + namespace: The namespace of the resource. + """ + _validate_k8s_name(resource_type) + _validate_k8s_name(name) + _validate_k8s_name(namespace) + _logger.info( + "Deleting %s: %s in namespace: %s", resource_type, name, namespace + ) + command = [ + "kubectl", + "delete", + resource_type, + "-n", + namespace, + "--ignore-not-found", + "--", + name, + ] + try: + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + ) + _logger.info("Successfully deleted %s. %s", resource_type, result.stdout) + except subprocess.CalledProcessError as e: + _logger.exception( + "Failed to delete %s. kubectl output:\n%r", resource_type, e.stderr + ) + raise + + + def get_pod_from_job(job_name: str) -> str: """Returns the pod name for the given job. @@ -224,7 +273,7 @@ def wait_for_pod(job_name: str) -> str: return check_pod_ready(pod_name) -def __test_pod_connection(port: int) -> None: +def _test_remote_connection(port: int) -> None: """Tests the connection to the pod. 
Args: @@ -233,20 +282,22 @@ def __test_pod_connection(port: int) -> None: _logger.info("Connecting to localhost:%d", port) try: with socket.create_connection(("localhost", port), timeout=30): - _logger.info("Pod is ready.") + _logger.info("Connection to localhost:%d is ready.", port) except (socket.timeout, ConnectionRefusedError) as exc: raise RuntimeError("Could not connect to the pod.") from exc def enable_port_forwarding( - pod_name: str, + remote_server: str, server_port: int, + namespace: str = "default", ) -> tuple[int, subprocess.Popen[str]]: """Enables port forwarding for the given pod. Args: - pod_name: The name of the pod. + remote_server: The kubectl target, e.g. "pod/<name>" or "svc/<name>". server_port: The port of the server to forward to. + namespace: The namespace of the pod or service. Returns: A tuple containing the pod port and the port forwarding process. @@ -255,28 +306,36 @@ def enable_port_forwarding( cannot be established. """ try: - port_available = portpicker.pick_unused_port() + local_port = portpicker.pick_unused_port() except Exception as e: _logger.exception("Error finding free local port: %r", e) raise - _logger.info("Found free local port: %d", port_available) + _logger.info("Found free local port: %d", local_port) _logger.info( "Starting port forwarding from local port %d to %s:%d", - port_available, - pod_name, + local_port, + remote_server, server_port, ) - _validate_k8s_name(pod_name) + if "/" in remote_server: + parts = remote_server.split("/", 1) + _validate_k8s_name(parts[0]) + _validate_k8s_name(parts[1]) + else: + _validate_k8s_name(remote_server) + _validate_k8s_name(namespace) port_forward_command = [ "kubectl", "port-forward", + "-n", + namespace, "--address", "localhost", "--", - f"pod/{pod_name}", - f"{port_available}:{server_port}", + f"{remote_server}", + f"{local_port}:{server_port}", ] try: # Start port forwarding in the background.
@@ -316,12 +375,12 @@ def enable_port_forwarding( ) try: - __test_pod_connection(port_available) + _test_remote_connection(local_port) except Exception: port_forward_process.terminate() raise - return (port_available, port_forward_process) + return (local_port, port_forward_process) def stream_pod_logs(pod_name: str) -> subprocess.Popen[str]: @@ -351,30 +410,68 @@ def stream_pod_logs(pod_name: str) -> subprocess.Popen[str]: raise -def delete_gke_job(job_name: str) -> None: - """Deletes the given job from the GKE cluster. - Args: - job_name: The name of the job. - """ - _validate_k8s_name(job_name) - _logger.info("Deleting job: %s", job_name) - delete_job_command = [ +def wait_for_deployment( + name: str, namespace: str = "default", timeout: int = 300 +) -> None: + """Waits for deployment to be ready.""" + _validate_k8s_name(name) + _validate_k8s_name(namespace) + _logger.info("Waiting for deployment %s to be ready...", name) + command = [ "kubectl", - "delete", - "job", - "--ignore-not-found", - "--", - job_name, + "rollout", + "status", + f"deployment/{name}", + "-n", + namespace, + f"--timeout={timeout}s", ] try: - result = subprocess.run( - delete_job_command, - check=True, - capture_output=True, - text=True, - ) + subprocess.run(command, check=True, capture_output=True, text=True) except subprocess.CalledProcessError as e: - _logger.exception("Failed to delete job. kubectl output:\\n%r", e.stderr) - raise - _logger.info("Successfully deleted job. 
%s", result.stdout) + _logger.exception("Deployment failed to become ready: %r", e) + raise RuntimeError(f"Deployment did not become ready: {e.stderr}") from e + _logger.info("Deployment %s is ready.", name) + + +def wait_for_service_ip( + name: str, namespace: str = "default", timeout: int = 300 +) -> str: + """Waits for service to get an external IP and returns it.""" + _validate_k8s_name(name) + start_time = time.time() + while time.time() - start_time < timeout: + command = [ + "kubectl", + "get", + "svc", + name, + "-n", + namespace, + "-o", + "jsonpath={.status.loadBalancer.ingress[0].ip}", + ] + try: + result = subprocess.run( + command, check=True, capture_output=True, text=True + ) + ip = result.stdout.strip() + if ip: + _logger.info("Service IP assigned: %s", ip) + return ip + except subprocess.CalledProcessError as e: + _logger.warning("Failed to get service IP: %r", e) + time.sleep(2) + raise RuntimeError(f"Timeout waiting for service IP for {name}") + + +def pick_unused_local_port() -> int: + """Picks an unused local port.""" + return portpicker.pick_unused_port() + + +def is_local_port_free(port: int) -> bool: + """Checks if a local port is free.""" + return portpicker.is_port_free(port) + diff --git a/pathwaysutils/experimental/shared_pathways_service/isc_pathways.py b/pathwaysutils/experimental/shared_pathways_service/isc_pathways.py index 9af27e4..80fc04c 100644 --- a/pathwaysutils/experimental/shared_pathways_service/isc_pathways.py +++ b/pathwaysutils/experimental/shared_pathways_service/isc_pathways.py @@ -339,7 +339,7 @@ def __enter__(self): self.proxy_pod_name = gke_utils.wait_for_pod(self._proxy_job_name) self._proxy_port, self._port_forward_process = ( gke_utils.enable_port_forwarding( - self.proxy_pod_name, PROXY_SERVER_PORT + f"pod/{self.proxy_pod_name}", PROXY_SERVER_PORT ) ) @@ -394,7 +394,7 @@ def _cleanup(self) -> None: # Delete the proxy GKE job. 
_logger.info("Deleting Pathways proxy...") - gke_utils.delete_gke_job(self._proxy_job_name) + gke_utils.delete_gke_resource("job", self._proxy_job_name) _logger.info("Pathways proxy GKE job deletion complete.") # Restore JAX variables. diff --git a/pathwaysutils/experimental/shared_pathways_service/start_vscode_on_cpu_np.py b/pathwaysutils/experimental/shared_pathways_service/start_vscode_on_cpu_np.py new file mode 100644 index 0000000..872261b --- /dev/null +++ b/pathwaysutils/experimental/shared_pathways_service/start_vscode_on_cpu_np.py @@ -0,0 +1,162 @@ +"""Deploys VSCode on a GKE CPU node pool and sets up port forwarding.""" + +import os +import random +import string +import time + +from absl import app +from absl import flags +from absl import logging +from pathwaysutils.experimental.shared_pathways_service import gke_utils + +FLAGS = flags.FLAGS + +_CLUSTER = flags.DEFINE_string( + "cluster", None, "The name of the GKE cluster.", required=True +) +_PROJECT = flags.DEFINE_string( + "project", None, "The GCP project ID.", required=True +) +_REGION = flags.DEFINE_string("region", None, "The GCP region.", required=True) +_NAMESPACE = flags.DEFINE_string( + "namespace", "default", "Kubernetes namespace." +) +_NAME = flags.DEFINE_string( + "name", "code-server", "Name of the deployment and service prefix." +) +_IMAGE = flags.DEFINE_string( + "image", "codercom/code-server:latest", "VS Code image." +) +_PASSWORD = flags.DEFINE_string("password", "mypwd", "Password for VS Code.") +_INSTANCE_TYPE = flags.DEFINE_string( + "instance_type", "c4-standard-192", "Node instance type selector." 
+) +_DRY_RUN = flags.DEFINE_boolean( + "dry_run", + False, + "If true, only print the generated YAML without deploying.", +) + +_TEMPLATE_FILE = os.path.join( + os.path.dirname(__file__), "yamls/code-server.yaml" +) + + +def _prepare_deployment_yaml(service_name: str, remote_port: int) -> str: + """Prepares the deployment YAML for VS Code.""" + context = { + "NAME": service_name, + "NAMESPACE": _NAMESPACE.value, + "IMAGE": _IMAGE.value, + "PASSWORD": _PASSWORD.value, + "INSTANCE_TYPE": _INSTANCE_TYPE.value, + "SERVICE_NAME": service_name, + "PORT": str(remote_port), + } + + logging.info("Loading and substituting template...") + try: + with open(_TEMPLATE_FILE, "r") as f: + template_str = f.read() + except OSError as err: + raise ValueError("Could not read template file: " + _TEMPLATE_FILE) from err + + template = string.Template(template_str) + return template.substitute(context) + + +def _deploy_vscode( + service_name: str, + deployment_yaml: str, +) -> None: + """Deploys VS Code and waits for the deployment and its service IP.""" + gke_utils.deploy_gke_yaml(deployment_yaml, action="create") + + logging.info("Waiting for deployment to be ready...") + gke_utils.wait_for_deployment(service_name, _NAMESPACE.value) + + logging.info("Waiting for service to get external IP...") + try: + ip = gke_utils.wait_for_service_ip(service_name, _NAMESPACE.value) + logging.info("Service External IP (Internal Load Balancer): %s", ip) + except RuntimeError as e: + logging.warning("Could not get service IP: %s.
Continuing anyway.", e) + + +def _start_port_forwarding( + service_name: str, + remote_port: int, +) -> None: + """Starts port forwarding for the given service.""" + logging.info("Starting port forwarding...") + pf_process = None + try: + local_port, pf_process = gke_utils.enable_port_forwarding( + remote_server=f"svc/{service_name}", + server_port=remote_port, + namespace=_NAMESPACE.value, + ) + logging.info("VS Code is accessible at http://localhost:%d", local_port) + logging.info("Press Ctrl+C to stop port forwarding.") + while True: + time.sleep(1) + except KeyboardInterrupt: + logging.info("Stopping port forwarding...") + except Exception as e: # pylint: disable=broad-exception-caught + logging.exception("Failed to start port forwarding: %s", e) + finally: + if pf_process: + pf_process.terminate() + pf_process.wait() + logging.info("Port forwarding stopped.") + + +def _cleanup_gke_resources(service_name: str, namespace: str) -> None: + logging.info("Deleting VS Code deployment and service...") + try: + gke_utils.delete_gke_resource("deployment", service_name, namespace) + except Exception as e: # pylint: disable=broad-exception-caught + logging.exception("Failed to delete VS Code deployment: %s", e) + try: + gke_utils.delete_gke_resource("service", service_name, namespace) + except Exception as e: # pylint: disable=broad-exception-caught + logging.exception("Failed to delete VS Code service: %s", e) + + +def main(argv): + if len(argv) > 1: + raise app.UsageError("Too many command-line arguments.") + + service_name = "{}".format( + _NAME.value + + f"-{os.environ.get('USER', 'user')}-" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=4)) + ) + logging.info("Service name: %s", service_name) + + remote_port = 8080 + + deployment_yaml = _prepare_deployment_yaml(service_name, remote_port) + + if _DRY_RUN.value: + logging.info( + "Dry run: Would deploy the following YAML:\n%s", deployment_yaml + ) + return + + logging.info("Fetching cluster 
credentials...") + gke_utils.fetch_cluster_credentials( + cluster_name=_CLUSTER.value, + project_id=_PROJECT.value, + location=_REGION.value, + ) + try: + _deploy_vscode(service_name, deployment_yaml) + _start_port_forwarding(service_name, remote_port) + finally: + _cleanup_gke_resources(service_name, _NAMESPACE.value) + + +if __name__ == "__main__": + app.run(main) diff --git a/pathwaysutils/experimental/shared_pathways_service/yamls/code-server.yaml b/pathwaysutils/experimental/shared_pathways_service/yamls/code-server.yaml new file mode 100644 index 0000000..483f803 --- /dev/null +++ b/pathwaysutils/experimental/shared_pathways_service/yamls/code-server.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${NAME} + namespace: ${NAMESPACE} +spec: + replicas: 1 + selector: + matchLabels: + app: ${NAME} + template: + metadata: + labels: + app: ${NAME} + spec: + automountServiceAccountToken: false + containers: + - name: code-server + image: ${IMAGE} + securityContext: + capabilities: + drop: + - ALL + ports: + - containerPort: ${PORT} + env: + - name: PASSWORD + value: "${PASSWORD}" + - name: PORT + value: "${PORT}" + securityContext: + runAsUser: 1000 # go/gke-shipshape#rootless + runAsGroup: 1000 # go/gke-shipshape#rootless + readOnlyRootFilesystem: true + capabilities: # go/gke-shipshape#capabilities + drop: + - ALL + seccompProfile: # go/gke-shipshape#seccomp + type: RuntimeDefault + allowPrivilegeEscalation: false # go/gke-shipshape#allowprivilegeescalation + nodeSelector: + node.kubernetes.io/instance-type: "${INSTANCE_TYPE}" +--- +apiVersion: v1 +kind: Service +metadata: + name: ${SERVICE_NAME} + namespace: ${NAMESPACE} + annotations: + # Forces GKE to provision a private internal load balancer instead of a public one + networking.gke.io/load-balancer-type: "Internal" +spec: + type: LoadBalancer + ports: + - port: ${PORT} + targetPort: ${PORT} + selector: + app: ${NAME}