From a27bca7e4b1c41b17b8afd2b25869a1755632dac Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Tue, 10 Feb 2026 14:53:52 +0000 Subject: [PATCH] Import workflow fixes --- .../workflow/aggregation-helper/main.py | 4 +- import-automation/workflow/cloudbuild.yaml | 5 - .../workflow/cloudbuild_main.yaml | 2 +- .../workflow/import-automation-workflow.yaml | 33 +-- .../workflow/import-helper/import_helper.py | 148 ++++++++++++ .../workflow/import-helper/main.py | 109 +++------ .../workflow/import-helper/requirements.txt | 2 + .../workflow/ingestion-helper/README.md | 12 +- .../workflow/ingestion-helper/import_utils.py | 52 +++-- .../workflow/ingestion-helper/main.py | 94 ++++---- .../ingestion-helper/requirements.txt | 1 - .../ingestion-helper/spanner_client.py | 211 +++++++++++++----- .../ingestion-helper/storage_client.py | 112 ++++++---- .../workflow/spanner-ingestion-workflow.yaml | 45 ++-- import-automation/workflow/spanner_schema.sql | 1 + 15 files changed, 554 insertions(+), 277 deletions(-) create mode 100644 import-automation/workflow/import-helper/import_helper.py diff --git a/import-automation/workflow/aggregation-helper/main.py b/import-automation/workflow/aggregation-helper/main.py index a64bab9fe8..f5c69a84dd 100644 --- a/import-automation/workflow/aggregation-helper/main.py +++ b/import-automation/workflow/aggregation-helper/main.py @@ -40,9 +40,7 @@ def aggregation_helper(request): HTTP Cloud Function that takes importName and runs a BQ query. """ request_json = request.get_json(silent=True) - import_list = request_json.get('importList') - if not import_list: - return ("'importList' parameter is missing", 400) + import_list = request_json.get('importList', []) logging.info(f"Received request for importList: {import_list}") results = [] try: diff --git a/import-automation/workflow/cloudbuild.yaml b/import-automation/workflow/cloudbuild.yaml index 13abe0ef14..125def6764 100644 --- a/import-automation/workflow/cloudbuild.yaml +++ b/import-automation/workflow/cloudbuild.yaml @@ -30,24 +30,19 @@ steps: - id: 'import-automation-workflow' name: 'gcr.io/cloud-builders/gcloud' args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}'] - dir: 'import-automation/workflow' - id: 'spanner-ingestion-workflow' name: 'gcr.io/cloud-builders/gcloud' args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}'] - dir: 'import-automation/workflow' - id: 'spanner-ingestion-helper' name: 'gcr.io/cloud-builders/gcloud' args: ['functions', 'deploy', 'spanner-ingestion-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}'] - dir: 'import-automation/workflow' - id: 'import-aggregation-helper' name: 'gcr.io/cloud-builders/gcloud' args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION},BQ_DATASET_ID=${_BQ_DATASET_ID}'] - dir: 'import-automation/workflow' - id: 'import-automation-helper' name: 'gcr.io/cloud-builders/gcloud' args: ['functions', 'deploy', 'import-automation-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'import-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'handle_feed_event', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID}'] - dir: 'import-automation/workflow' diff --git a/import-automation/workflow/cloudbuild_main.yaml b/import-automation/workflow/cloudbuild_main.yaml index 414f2f3876..beb1760671 100644 --- a/import-automation/workflow/cloudbuild_main.yaml +++ b/import-automation/workflow/cloudbuild_main.yaml @@ -38,7 +38,7 @@ steps: - '.' - '--config=cloudbuild.yaml' - '--project=${_PROJECT_ID}' - - '--substitutions=_ENV=staging,_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}' + - '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}' dir: 'import-automation/workflow' # 2. Run E2E Tests on Staging diff --git a/import-automation/workflow/import-automation-workflow.yaml b/import-automation/workflow/import-automation-workflow.yaml index 2aea5cd77f..6e552df4c4 100644 --- a/import-automation/workflow/import-automation-workflow.yaml +++ b/import-automation/workflow/import-automation-workflow.yaml @@ -6,15 +6,21 @@ main: - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - region: ${sys.get_env("LOCATION")} - imageUri: ${default(map.get(args, "imageUri"), "gcr.io/datcom-ci/dc-import-executor:stable")} - - jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))} + - jobId: ${text.replace_all(text.to_lower(text.substring(text.split(args.importName, ":")[1], 0, 50) + "-" + string(int(sys.now()))), "_", "-")} - importName: ${args.importName} - - importConfig: ${args.importConfig} + - importConfig: ${default(map.get(args, "importConfig"), "{}")} - gcsMountBucket: ${sys.get_env("GCS_MOUNT_BUCKET")} - gcsImportBucket: ${sys.get_env("GCS_BUCKET_ID")} - gcsMountPath: "/tmp/gcs" - ingestionHelper: "spanner-ingestion-helper" - functionUrl: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/" + ingestionHelper} - startTime: ${sys.now()} + - defaultResources: + machine: "n2-standard-8" + cpu: 8000 + memory: 32768 + disk: 100 + - resources: ${default(map.get(args, "resources"), defaultResources)} - runImportJob: try: call: googleapis.batch.v1.projects.locations.jobs.create @@ -25,11 +31,11 @@ main: allocationPolicy: instances: - policy: - machineType: ${args.resources.machine} + machineType: ${resources.machine} provisioningModel: "STANDARD" bootDisk: image: "projects/debian-cloud/global/images/family/debian-12" - size_gb: ${args.resources.disk} + size_gb: ${resources.disk} installOpsAgent: true taskGroups: taskSpec: @@ -38,14 +44,14 @@ main: remotePath: ${gcsMountBucket} mountPath: ${gcsMountPath} computeResource: - cpuMilli: ${args.resources.cpu} - memoryMib: ${args.resources.memory} + cpuMilli: ${resources.cpu} + memoryMib: ${resources.memory} runnables: - container: imageUri: ${imageUri} commands: - - ${"--import_name=" + args.importName} - - ${"--import_config=" + args.importConfig} + - ${"--import_name=" + importName} + - ${"--import_config=" + importConfig} environment: variables: IMPORT_NAME: ${importName} @@ -73,11 +79,10 @@ main: body: actionType: 'update_import_status' jobId: ${jobId} - importName: ${args.importName} - status: 'FAILED' + importName: ${importName} + status: 'FAILURE' executionTime: ${int(sys.now() - startTime)} - latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")} - schedule: ${default(map.get(args, "schedule"), "")} + latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(importName, ":", "/")} result: functionResponse - failWorkflow: raise: ${e} @@ -89,8 +94,8 @@ main: type: OIDC body: actionType: 'update_import_version' - importName: ${args.importName} - version: 'staging' + importName: ${importName} + version: 'STAGING' override: false comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}' result: functionResponse diff --git a/import-automation/workflow/import-helper/import_helper.py b/import-automation/workflow/import-helper/import_helper.py new file mode 100644 index 0000000000..dd1be52343 --- /dev/null +++ b/import-automation/workflow/import-helper/import_helper.py @@ -0,0 +1,148 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import json +import logging +import os +import croniter +from datetime import datetime, timezone +from google.auth.transport.requests import Request +from google.oauth2 import id_token +from google.cloud import storage +from google.cloud.workflows import executions_v1 +import requests + +logging.getLogger().setLevel(logging.INFO) + +PROJECT_ID = os.environ.get('PROJECT_ID') +LOCATION = os.environ.get('LOCATION') +GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID') +INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper" +WORKFLOW_ID = 'spanner-ingestion-workflow' + +def invoke_ingestion_workflow(import_name: str): + """Triggers the graph ingestion workflows. + + Args: + import_name: The name of the import. + """ + workflow_args = {"importList": [import_name.split(':')[-1]]} + + logging.info(f"Invoking {WORKFLOW_ID} for {import_name}") + execution_client = executions_v1.ExecutionsClient() + parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}" + execution_req = executions_v1.Execution(argument=json.dumps(workflow_args)) + response = execution_client.create_execution(parent=parent, + execution=execution_req) + logging.info( + f"Triggered workflow {WORKFLOW_ID} for {import_name}. Execution ID: {response.name}" + ) + + +def update_import_status(import_name, + import_status, + import_version, + graph_path, + job_id, + cron_schedule=None): + """Updates the status for the specified import job. + + Args: + import_name: The name of the import. + import_status: The new status of the import. + import_version: The version of the import. + graph_path: The graph path for the import. + job_id: The job ID associated with the import. + cron_schedule: The cron schedule for the import (optional). + """ + logging.info(f"Updating {import_name} status: {import_status}") + latest_version = 'gs://' + GCS_BUCKET_ID + '/' + import_name.replace( + ':', '/') + '/' + import_version + request = { + 'actionType': 'update_import_status', + 'importName': import_name, + 'status': import_status, + 'job_id': job_id, + 'latestVersion': latest_version, + 'graphPath': graph_path + } + if cron_schedule: + try: + next_refresh = croniter.croniter( + cron_schedule, + datetime.now(timezone.utc)).get_next(datetime).isoformat() + request['nextRefresh'] = next_refresh + except (croniter.CroniterError) as e: + logging.error( + f"Error calculating next refresh from schedule '{cron_schedule}': {e}" + ) + logging.info(f"Update request: {request}") + auth_req = Request() + token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL) + headers = {'Authorization': f'Bearer {token}'} + response = requests.post(INGESTION_HELPER_URL, + json=request, + headers=headers) + response.raise_for_status() + logging.info(f"Updated status for {import_name}") + + +def parse_message(request) -> dict: + """Processes the incoming Pub/Sub message. + + Args: + request: The flask request object. + + Returns: + A dictionary containing the message data, or None if invalid. + """ + request_json = request.get_json(silent=True) + if not request_json or 'message' not in request_json: + logging.error('Invalid Pub/Sub message format') + return None + + pubsub_message = request_json['message'] + logging.info(f"Received Pub/Sub message: {pubsub_message}") + try: + data_bytes = base64.b64decode(pubsub_message["data"]) + notification_json = data_bytes.decode("utf-8") + logging.info(f"Notification content: {notification_json}") + except Exception as e: + logging.error(f"Error decoding message data: {e}") + + return pubsub_message + + +def check_duplicate(message_id: str): + """Checks for duplicate messages using a GCS file. + + Args: + message_id: The ID of the message to check. + + Returns: + True if the message is a duplicate, False otherwise. + """ + duplicate = False + if not message_id: + return duplicate + logging.info(f"Checking for existing message: {message_id}") + storage_client = storage.Client() + bucket = storage_client.bucket(GCS_BUCKET_ID) + blob = bucket.blob(f"google3/transfers/{message_id}") + try: + blob.upload_from_string("", if_generation_match=0) + except Exception: + duplicate = True + return duplicate diff --git a/import-automation/workflow/import-helper/main.py b/import-automation/workflow/import-helper/main.py index 4d88abc7a6..d9cb4ec1bc 100644 --- a/import-automation/workflow/import-helper/main.py +++ b/import-automation/workflow/import-helper/main.py @@ -12,98 +12,47 @@ # See the License for the specific language governing permissions and # limitations under the License. -import base64 import functions_framework -import json import logging -import os from datetime import datetime, timezone -from google.auth.transport.requests import Request -from google.oauth2 import id_token -from google.cloud.workflows import executions_v1 -import requests +import import_helper as helper logging.getLogger().setLevel(logging.INFO) -PROJECT_ID = os.environ.get('PROJECT_ID') -LOCATION = os.environ.get('LOCATION') -GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID') -WORKFLOW_ID = 'spanner-ingestion-workflow' -INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper" - - -def invoke_ingestion_workflow(import_name): - """Invokes the spanner ingestion workflow.""" - execution_client = executions_v1.ExecutionsClient() - parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}" - workflow_args = {"importList": [import_name]} - try: - execution_req = executions_v1.Execution( - argument=json.dumps(workflow_args)) - response = execution_client.create_execution(parent=parent, - execution=execution_req) - logging.info( - f"Triggered workflow {WORKFLOW_ID} for {import_name}. Execution ID: {response.name}" - ) - except Exception as e: - logging.error(f"Error triggering workflow: {e}") - - -def update_import_status(import_name, request_json): - """Updates the status for the specified import job.""" - try: - auth_req = Request() - token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL) - headers = {'Authorization': f'Bearer {token}'} - response = requests.post(INGESTION_HELPER_URL, - json=request_json, - headers=headers) - response.raise_for_status() - logging.info(f"Updated status for {import_name}") - except Exception as e: - logging.error(f'Error updating import status for {import_name}: {e}') - # Triggered from a message on a Cloud Pub/Sub topic. @functions_framework.http def handle_feed_event(request): # Updates status in spanner and triggers ingestion workflow # for an import using CDA feed - request_json = request.get_json(silent=True) - if not request_json or 'message' not in request_json: + message = helper.parse_message(request) + if not message: return 'Invalid Pub/Sub message format', 400 - pubsub_message = request_json['message'] - logging.info(f"Received Pub/Sub message: {pubsub_message}") - try: - data_bytes = base64.b64decode(pubsub_message["data"]) - notification_json = data_bytes.decode("utf-8") - logging.info(f"Notification content: {notification_json}") - except Exception as e: - logging.error(f"Error decoding message data: {e}") - - attributes = pubsub_message.get('attributes', {}) - if attributes.get('transfer_status') == 'TRANSFER_COMPLETED': - import_name = attributes.get('import_name') - import_status = attributes.get('import_status', 'STAGING') - import_version = 'gs://' + GCS_BUCKET_ID + '/' + import_name.replace( - ':', '/') + '/' + attributes.get( - 'import_version', - datetime.now(timezone.utc).strftime("%Y-%m-%d")) - graph_path = attributes.get('graph_path', "/**/*mcf*") - request = { - 'actionType': 'update_import_status', - 'importName': import_name, - 'status': import_status, - 'latestVersion': import_version, - 'nextRefresh': datetime.now(timezone.utc).isoformat(), - 'graphPath': graph_path - } - - logging.info( - f"Updating import status for {import_name} to {import_status}") - update_import_status(import_name, request) - if import_status == 'READY': - invoke_ingestion_workflow(import_name) - + attributes = message.get('attributes', {}) + message_id = message.get('messageId', '') + if attributes.get('transfer_status') != 'TRANSFER_COMPLETED': + return 'OK', 200 + + duplicate = helper.check_duplicate(message_id) + if duplicate: + logging.info(f"Message {message_id} already processed. Skipping.") + return 'OK', 200 + + import_name = attributes.get('import_name') + import_status = 'STAGING' + latest_version = attributes.get( + 'import_version', + datetime.now(timezone.utc).strftime("%Y-%m-%d")) + graph_path = attributes.get('graph_path', "/**/*.mcf*") + job_id = attributes.get('feed_name', 'cda_feed') + cron_schedule = attributes.get('cron_schedule', '') + post_process = attributes.get('post_process', '') + # Update import status in spanner + helper.update_import_status(import_name, import_status, latest_version, + graph_path, job_id, cron_schedule) + + # Invoke ingestion workflow to trigger dataflow job + if post_process == 'spanner_ingestion_workflow': + helper.invoke_ingestion_workflow(import_name) return 'OK', 200 diff --git a/import-automation/workflow/import-helper/requirements.txt b/import-automation/workflow/import-helper/requirements.txt index d70e560976..9d321e8151 100644 --- a/import-automation/workflow/import-helper/requirements.txt +++ b/import-automation/workflow/import-helper/requirements.txt @@ -2,3 +2,5 @@ functions-framework==3.* google-cloud-workflows google-auth requests +google-cloud-storage +croniter diff --git a/import-automation/workflow/ingestion-helper/README.md b/import-automation/workflow/ingestion-helper/README.md index 7a3131cc0e..3e9b1eec5a 100644 --- a/import-automation/workflow/ingestion-helper/README.md +++ b/import-automation/workflow/ingestion-helper/README.md @@ -12,10 +12,10 @@ The function expects a JSON payload with a required `actionType` parameter, whic ### Supported Actions and Parameters -#### `get_import_list` -Gets the list of imports that are ready for ingestion. +#### `get_import_info` +Gets the details of imports that are ready for ingestion. -* `importList` (Optional): A list of import names to filter the results by. +* `importList` (Optional): list of imports to ingest. #### `acquire_ingestion_lock` Attempts to acquire the global lock for ingestion to prevent concurrent modifications. @@ -33,6 +33,7 @@ Updates the status of imports after an ingestion job completes. * `importList` (Required): A list of import names involved in the ingestion. * `workflowId` (Required): The ID of the workflow. +* `status` (Required): Import status. * `jobId` (Required): The Dataflow job ID associated with the ingestion. #### `update_import_status` @@ -45,14 +46,13 @@ Updates the status of a specific import job. * `dataVolume` (Optional): Data volume in bytes. * `latestVersion` (Optional): Latest version string. * `graphPath` (Optional): Graph path regex. -* `schedule` (Optional): A cron schedule string. * `nextRefresh` (Optional): Next refresh timestamp. #### `update_import_version` -Updates the version of an import, records an audit log, and marks the import as `READY`. +Updates the version of an import, records version history, and updates the status. * `importName` (Required): The name of the import. -* `version` (Required): The version string. If set to `'staging'`, it resolves to the current staging version. +* `version` (Required): The version string. If set to `'STAGING'`, it resolves to the current staging version. * `comment` (Required): A comment for the audit log explaining the version update. * `override` (Optional): Override version without checking import status (boolean) diff --git a/import-automation/workflow/ingestion-helper/import_utils.py b/import-automation/workflow/ingestion-helper/import_utils.py index d1142b0846..33f9d1fac6 100644 --- a/import-automation/workflow/ingestion-helper/import_utils.py +++ b/import-automation/workflow/ingestion-helper/import_utils.py @@ -14,7 +14,6 @@ """Utility functions for the ingestion helper.""" import logging -import croniter import re from datetime import datetime, timezone from googleapiclient.discovery import build @@ -24,8 +23,38 @@ from google.auth import jwt +def get_next_refresh(project_id: str, location: str, import_name: str) -> str: + """Fetches the next scheduled run time for the import job from Cloud Scheduler. + + Args: + project_id: The GCP project ID. + location: The location of the Cloud Scheduler job. + import_name: The name of the import (used as the job name). + + Returns: + The next scheduled run time as an ISO formatted string, or None if not found/error. + """ + try: + scheduler = build('cloudscheduler', 'v1', cache_discovery=False) + job_id = import_name.split(':')[-1] + job_name = f"projects/{project_id}/locations/{location}/jobs/{job_id}" + job = scheduler.projects().locations().jobs().get( + name=job_name).execute() + return job.get('scheduleTime') + except HttpError as e: + logging.warning(f"Could not fetch scheduler job {import_name}: {e}") + return None + + def get_caller_identity(request): - """Extracts the caller's email from the Authorization header (JWT).""" + """Extracts the caller's email from the Authorization header (JWT). + + Args: + request: The HTTP request object. + + Returns: + The email of the caller, or an error string/warning if extraction fails. + """ auth_header = request.headers.get('Authorization') if auth_header: parts = auth_header.split() @@ -74,22 +103,9 @@ def get_import_params(request) -> dict: data_volume = request_json.get('data_volume', 0) latest_version = request_json.get('latest_version', '') graph_path = request_json.get('graph_path', '') - schedule = request_json.get('schedule', '') - next_refresh_str = request_json.get('next_refresh', '') - next_refresh = datetime.now(timezone.utc) - if next_refresh_str: - try: - next_refresh = datetime.fromisoformat(next_refresh_str) - except ValueError: - logging.error(f"Error parsing next_refresh: {next_refresh_str}") - if schedule: - try: - next_refresh = croniter.croniter(schedule, datetime.now( - timezone.utc)).get_next(datetime) - except (croniter.CroniterError) as e: - logging.error( - f"Error calculating next refresh from schedule '{schedule}': {e}" - ) + next_refresh = request_json.get('next_refresh', + datetime.now(timezone.utc).isoformat()) + return { 'import_name': import_name, 'status': status, diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index 531401cfcc..8804d5a0d1 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -1,25 +1,11 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - import functions_framework from spanner_client import SpannerClient from storage_client import StorageClient -from flask import jsonify import logging -import import_utils import os from absl import flags +import import_utils +from flask import jsonify logging.getLogger().setLevel(logging.INFO) @@ -68,13 +54,14 @@ def ingestion_helper(request): FLAGS.spanner_database_id) storage = StorageClient(FLAGS.gcs_bucket_id) - if actionType == 'get_import_list': - # Gets the list of imports that are ready for ingestion. + if actionType == 'get_import_info': + # Gets the details of imports that are ready for ingestion. # Input: - # importList: list of import names to filter by (optional) + # importList: list of import names to ingest (optional) import_list = request_json.get('importList', []) - imports = spanner.get_import_list(import_list) - return jsonify(imports) + import_info = spanner.get_import_info(import_list) + return jsonify(import_info) + elif actionType == 'acquire_ingestion_lock': # Attempts to acquire the global lock for ingestion. # Input: @@ -89,7 +76,8 @@ def ingestion_helper(request): status = spanner.acquire_lock(workflow, timeout) if not status: return ('Failed to acquire lock', 500) - return ('Lock acquired', 200) + return ('OK', 200) + elif actionType == 'release_ingestion_lock': # Releases the global ingestion lock. # Input: @@ -101,36 +89,44 @@ def ingestion_helper(request): status = spanner.release_lock(workflow) if not status: return ('Failed to release lock', 500) - return ('Lock released', 200) + return ('OK', 200) + elif actionType == 'update_ingestion_status': # Updates the status of imports after ingestion. # Input: # importList: list of import names # workflowId: ID of the workflow + # status: import status # jobId: Dataflow job ID validation_error = _validate_params( - request_json, ['importList', 'workflowId', 'jobId']) + request_json, ['importList', 'workflowId', 'jobId', 'status']) if validation_error: return (validation_error, 400) import_list = request_json['importList'] workflow_id = request_json['workflowId'] + status = request_json['status'] job_id = request_json['jobId'] + ingested_imports = [item['importName'] for item in import_list] + + spanner.update_ingestion_status(ingested_imports, workflow_id, status) metrics = import_utils.get_ingestion_metrics(FLAGS.project_id, FLAGS.location, job_id) - spanner.update_ingestion_status(import_list, workflow_id, job_id, - metrics) - return ('Updated ingestion status', 200) + spanner.update_ingestion_history(workflow_id, job_id, ingested_imports, + metrics) + if status == 'SUCCESS': + spanner.update_import_version_history(import_list, workflow_id) + return ('OK', 200) + elif actionType == 'update_import_status': # Updates the status of a specific import job. # Input: # importName: name of the import # status: new status - # jobId: Dataflow job ID (optional) + # jobId: Batch job ID (optional) # executionTime: execution time in seconds (optional) # dataVolume: data volume in bytes (optional) # latestVersion: latest version string (optional) # graphPath: graph path regex (optional) - # schedule: cron schedule string (optional) # nextRefresh: next refresh timestamp (optional) validation_error = _validate_params(request_json, ['importName', 'status']) @@ -140,46 +136,62 @@ def ingestion_helper(request): status = request_json['status'] logging.info(f'Updating import {import_name} to status {status}') params = import_utils.get_import_params(request_json) + next_refresh = import_utils.get_next_refresh(FLAGS.project_id, + FLAGS.location, + import_name) + if next_refresh: + params['next_refresh'] = next_refresh if status == 'STAGING': - latest_version = os.path.basename(request_json['latestVersion']) - storage.update_staging_version(import_name, latest_version) + version = os.path.basename(request_json.get('latestVersion', '')) + if not version: + return (f'Empty version for import {import_name}', 500) + storage.update_staging_version(import_name, version) + storage.update_provenance_file(import_name, version) + storage.update_import_summary(params) + storage.update_version_file(import_name, version) + comment = f"import-workflow:{request_json.get('jobId','')}" + spanner.update_version_history(import_name, version, comment) spanner.update_import_status(params) - return (f"Updated import {import_name} to status {params['status']}", - 200) + return ('OK', 200) + elif actionType == 'update_import_version': - # Updates the version of an import and marks it as READY. + # Updates the version and status of an import. # Input: # importName: name of the import # version: version string # comment: audit log comment # override: override status check (optional) + # triggerIngestion: trigger ingestion workflow (optional) validation_error = _validate_params( request_json, ['importName', 'version', 'comment']) if validation_error: return (validation_error, 400) import_name = request_json['importName'] version = request_json['version'] - logging.info(f"Updating import {import_name} to version {version}") comment = request_json['comment'] + logging.info( + f"Updating import {import_name} to version {version} comment:{comment}" + ) override = request_json.get('override', False) - if version == 'staging': + if version == 'STAGING': version = storage.get_staging_version(import_name) summary = storage.get_import_summary(import_name, version) params = import_utils.get_import_params(summary) if override: - params['status'] = 'READY' + params['status'] = 'STAGING' caller = import_utils.get_caller_identity(request) comment = f'version-override:{caller} {comment}' - if params['status'] == 'READY': + if params['status'] == 'STAGING': + storage.update_provenance_file(import_name, version) storage.update_version_file(import_name, version) spanner.update_version_history(import_name, version, comment) - logging.info( - f"Import {import_name} version {version} comment: {comment}") + logging.info(f"Updated import {import_name} to version {version}") else: logging.info(f"Skipping {import_name} version update") spanner.update_import_status(params) return ( - f"Import {import_name} version {version} status: {params['status']}", + f"OK [Import: {import_name} Version: {version} Status: {params['status']}]", 200) + else: return (f'Unknown actionType: {actionType}', 400) diff --git a/import-automation/workflow/ingestion-helper/requirements.txt b/import-automation/workflow/ingestion-helper/requirements.txt index fb06d6bbc3..aedb0398ae 100644 --- a/import-automation/workflow/ingestion-helper/requirements.txt +++ b/import-automation/workflow/ingestion-helper/requirements.txt @@ -1,6 +1,5 @@ functions-framework==3.* google-cloud-spanner -croniter google-api-python-client google-cloud-storage google-auth diff --git a/import-automation/workflow/ingestion-helper/spanner_client.py b/import-automation/workflow/ingestion-helper/spanner_client.py index 55c62f2449..b97998fae6 100644 --- a/import-automation/workflow/ingestion-helper/spanner_client.py +++ b/import-automation/workflow/ingestion-helper/spanner_client.py @@ -32,14 +32,24 @@ class SpannerClient: def __init__(self, project_id: str, instance_id: str, database_id: str): """Initializes a Spanner client and connects to a specific database.""" spanner_client = spanner.Client( - project=project_id, client_options={'quota_project_id': project_id}) + project=project_id, + client_options={'quota_project_id': project_id}, + disable_builtin_metrics=True) instance = spanner_client.instance(instance_id) database = instance.database(database_id) logging.info(f"Successfully initialized database: {database.name}") self.database = database def acquire_lock(self, workflow_id: str, timeout: int) -> bool: - """Attempts to acquire the global ingestion lock.""" + """Attempts to acquire the global ingestion lock. + + Args: + workflow_id: The ID of the workflow attempting to acquire the lock. + timeout: The duration in seconds after which a lock is considered stale. + + Returns: + True if the lock was acquired, False otherwise. + """ logging.info(f"Attempting to acquire lock for {workflow_id}") def _acquire(transaction: Transaction) -> bool: @@ -90,7 +100,14 @@ def _acquire(transaction: Transaction) -> bool: raise def release_lock(self, workflow_id: str) -> bool: - """Releases the global lock.""" + """Releases the global lock. + + Args: + workflow_id: The ID of the workflow attempting to release the lock. + + Returns: + True if the lock was released, False otherwise. + """ logging.info(f"Attempting to release lock for {workflow_id}") def _release(transaction: Transaction) -> None: @@ -124,62 +141,109 @@ def _release(transaction: Transaction) -> None: logging.error(f'Error releasing lock for {workflow_id}: {e}') raise - def get_import_list(self, import_list: list) -> list: - """Get the list of imports ready to ingest.""" + def get_import_info(self, import_list: list) -> list: + """Get the details of imports to ingest. + + If import_list is empty, return info for ready imports. + If import_list is not empty, return info for the imports in the list irrespective of status. + + Args: + import_list: A list of import names to fetch details for. + + Returns: + A list of dictionaries, where each dictionary contains 'importName', 'latestVersion', and 'graphPath'. + """ pending_imports = [] - sql = "SELECT ImportName, LatestVersion, GraphPath FROM ImportStatus WHERE State = 'READY'" + logging.info(f"Fetching imports from import list {import_list}.") + + params = {} + param_types = {} + if import_list: + sql = "SELECT ImportName, LatestVersion, GraphPath FROM ImportStatus WHERE ImportName IN UNNEST(@importNames)" + params = {"importNames": import_list} + param_types = {"importNames": Array(STRING)} + else: + sql = "SELECT ImportName, LatestVersion, GraphPath FROM ImportStatus WHERE State = 'STAGING'" + # Use a read-only snapshot for this query try: with self.database.snapshot() as snapshot: - results = snapshot.execute_sql(sql) + results = snapshot.execute_sql(sql, + params=params, + param_types=param_types) for row in results: - if not import_list or row[0] in import_list: - import_json = {} - import_json['importName'] = row[0] - import_json['latestVersion'] = row[1] - import_json['graphPath'] = row[2] - pending_imports.append(import_json) - - logging.info(f"Found {len(pending_imports)} import jobs as READY.") + import_json = {} + import_json['importName'] = row[0] + import_json['latestVersion'] = os.path.basename(row[1]) + import_json[ + 'graphPath'] = f"{row[1].rstrip('/')}/{row[2].lstrip('/')}" + pending_imports.append(import_json) + + logging.info(f"Found {len(pending_imports)} import jobs.") return pending_imports except Exception as e: logging.error(f'Error getting import list: {e}') raise - def update_ingestion_status(self, import_list_json: list, workflow_id: str, - job_id: str, metrics: dict): - """Marks the ingested imports as DONE and records the ingestion event.""" - logging.info(f"Marking import status for {import_list_json} as DONE.") + def update_ingestion_status(self, import_names: list, workflow_id: str, + status: str): + """Updates the ImportStatus table. - succeeded_imports = [] - for import_json in import_list_json: - succeeded_imports.append(import_json['importName']) + Args: + import_names: List of import names. + workflow_id: The ID of the workflow. + status: The status of the ingestion. + """ + if not import_names: + return + + logging.info(f"Updated ingestion status for {import_names}") + + def _update(transaction: Transaction): + update_sql = "UPDATE ImportStatus SET State = @importStatus, WorkflowId = @workflowId, StatusUpdateTimestamp = PENDING_COMMIT_TIMESTAMP() WHERE ImportName IN UNNEST(@importNames)" + transaction.execute_update(update_sql, + params={ + "importNames": import_names, + "workflowId": workflow_id, + "importStatus": status + }, + param_types={ + "importNames": Array(STRING), + "workflowId": STRING, + "importStatus": STRING + }) - def _record(transaction: Transaction): - # 1. Update the ImportStatus table - if succeeded_imports: - update_sql = "UPDATE ImportStatus SET State = 'DONE', WorkflowId = @workflowId, StatusUpdateTimestamp = PENDING_COMMIT_TIMESTAMP() WHERE ImportName IN UNNEST(@importNames)" - updated_rows = transaction.execute_update( - update_sql, - params={ - "importNames": succeeded_imports, - "workflowId": workflow_id - }, - param_types={ - "importNames": Array(STRING), - "workflowId": STRING - }) - logging.info(f"Marked {updated_rows} import jobs as DONE.") - - # 2. Insert into the IngestionHistory table + try: + self.database.run_in_transaction(_update) + logging.info(f"Marked {len(import_names)} import jobs as {status}.") + except Exception as e: + logging.error(f'Error updating ImportStatus table: {e}') + raise + + def update_ingestion_history(self, workflow_id: str, job_id: str, + ingested_imports: list, metrics: dict): + """Updates the IngestionHistory table. + + Args: + workflow_id: The ID of the workflow. + job_id: The Dataflow job ID. + ingested_imports: List of ingested import names. + metrics: A dictionary containing metrics about the ingestion. + """ + + logging.info( + f"Updating IngestionHistory table for workflow {workflow_id}") + + def _insert(transaction: Transaction): columns = [ - "CompletionTimestamp", "WorkflowExecutionID", "DataflowJobId", - "IngestedImports", "ExecutionTime", "NodeCount", "EdgeCount", - "ObservationCount" + "CompletionTimestamp", "IngestionFailure", + "WorkflowExecutionID", "DataflowJobId", "IngestedImports", + "ExecutionTime", "NodeCount", "EdgeCount", "ObservationCount" ] values = [[ - spanner.COMMIT_TIMESTAMP, workflow_id, job_id, - succeeded_imports, metrics['execution_time'], + spanner.COMMIT_TIMESTAMP, + self.check_failed_imports(), workflow_id, job_id, + ingested_imports, metrics['execution_time'], metrics['node_count'], metrics['edge_count'], metrics['obs_count'] ]] @@ -187,15 +251,36 @@ def _record(transaction: Transaction): columns=columns, values=values) - # 3. Update ImportVersionHistory table + try: + self.database.run_in_transaction(_insert) + logging.info( + f"Updated IngestionHistory table for workflow {workflow_id}") + except Exception as e: + logging.error(f'Error updating IngestionHistory table: {e}') + raise + + def update_import_version_history(self, import_list_json: list, + workflow_id: str): + """Updates the ImportVersionHistory table. + + Args: + import_list_json: A list of dictionaries containing import details. + workflow_id: The ID of the workflow. + """ + if not import_list_json: + return + + logging.info( + f"Updating ImportVersionHistory table for workflow {workflow_id}") + + def _insert(transaction: Transaction): version_history_columns = [ "ImportName", "Version", "UpdateTimestamp", "Comment" ] version_history_values = [] for import_json in import_list_json: version_history_values.append([ - import_json['importName'], - os.path.basename(import_json['latestVersion']), + import_json['importName'], import_json['latestVersion'], spanner.COMMIT_TIMESTAMP, "ingestion-workflow:" + workflow_id ]) @@ -205,26 +290,42 @@ def _record(transaction: Transaction): columns=version_history_columns, values=version_history_values) + try: + self.database.run_in_transaction(_insert) logging.info( - f"Updated ingestion history table for workflow {workflow_id}") + f"Updated ImportVersionHistory table for workflow {workflow_id}" + ) + except Exception as e: + logging.error(f'Error updating ImportVersionHistory table: {e}') + raise + def check_failed_imports(self) -> bool: + """Checks if there are any failed imports.""" try: - self.database.run_in_transaction(_record) + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql( + "SELECT 1 FROM ImportStatus WHERE State = 'PENDING' LIMIT 1" + ) + return any(results) except Exception as e: - logging.error(f'Error updating ingestion status: {e}') - raise + logging.error(f'Error checking for pending imports: {e}') + return True def update_import_status(self, params: dict): - """Updates the status for the specified import job.""" + """Updates the status for the specified import job. + + Args: + params: A dictionary containing import parameters. + """ import_name = params['import_name'] job_id = params['job_id'] execution_time = params['execution_time'] data_volume = params['data_volume'] status = params['status'] latest_version = params['latest_version'] - next_refresh = params['next_refresh'] + next_refresh = datetime.fromisoformat(params['next_refresh']) graph_path = params['graph_path'] - logging.info(f"Updating import status for {import_name} to {status}") + logging.info(f"Updating import status in spanner {params}") def _record(transaction: Transaction): columns = [ @@ -239,7 +340,7 @@ def _record(transaction: Transaction): spanner.COMMIT_TIMESTAMP ] - if status == 'READY': + if status == 'STAGING': columns.append("DataImportTimestamp") row_values.append(spanner.COMMIT_TIMESTAMP) diff --git a/import-automation/workflow/ingestion-helper/storage_client.py b/import-automation/workflow/ingestion-helper/storage_client.py index 8339e7d007..c41494392b 100644 --- a/import-automation/workflow/ingestion-helper/storage_client.py +++ b/import-automation/workflow/ingestion-helper/storage_client.py @@ -58,6 +58,21 @@ def get_import_summary(self, import_name: str, version: str) -> dict: f'Error reading import summary file {summary_file}: {e}') return {} + def update_import_summary(self, import_summary: dict): + """Updates the import summary in GCS. + + Args: + import_summary: A dictionary containing the summary of the import. + """ + latest_version = import_summary.get('latest_version') + path = latest_version[5:].split('/', 1) + summary_file = os.path.join(path[1], _IMPORT_SUMMARY_JSON) + logging.info( + f'Updating import summary at {summary_file} {import_summary}') + blob = self.bucket.blob(summary_file) + blob.upload_from_string(json.dumps(import_summary)) + logging.info(f'Updated import summary at {summary_file}') + def get_staging_version(self, import_name: str) -> str: """Retrieves the latest version from the staging directory. @@ -84,49 +99,68 @@ def update_staging_version(self, import_name: str, version: str): import_name: The name of the import. version: The new version string. """ - try: - logging.info( - f'Updating staging version file for import {import_name} to {version}' - ) - output_dir = import_name.replace(':', '/') - version_file = self.bucket.blob( - os.path.join(output_dir, _STAGING_VERSION_FILE)) - version_file.upload_from_string(version) - logging.info( - f'Updated staging version file {version_file} to {version}') - except exceptions.NotFound as e: - logging.error(f'Error updating version file for {import_name}: {e}') - raise + logging.info( + f'Updating staging version file for import {import_name} to {version}' + ) + output_dir = import_name.replace(':', '/') + version_file = self.bucket.blob( + os.path.join(output_dir, _STAGING_VERSION_FILE)) + version_file.upload_from_string(version) + logging.info( + f'Updated staging version file {version_file.name} to {version}') - def update_version_file(self, import_name: str, version: str): - """Updates the version file in GCS. + def update_provenance_file(self, import_name: str, version: str): + """Updates the provenance file for the import. - Copies the latest version file and import metadata from staging to the - output directory. + Args: + import_name: The name of the import. + version: The version of the import. + """ + logging.info( + f'Updating provenance file for import {import_name} to add {version}' + ) + output_dir = import_name.replace(':', '/') + metadata_blob = self.bucket.blob( + os.path.join(output_dir, version, 'provenance', 'genmcf', + _IMPORT_METADATA_MCF)) + if metadata_blob.exists(): + self.bucket.copy_blob( + metadata_blob, self.bucket, + os.path.join(output_dir, 'import_metadata_mcf.mcf')) + else: + logging.warning( + f'Generating default metadata for import {import_name}') + base_name = import_name.split(':')[-1] + default_provenance = f"Node: dcid:dc/base/{base_name}\ntypeOf: dcid:Provenance\n" + new_blob = self.bucket.blob( + os.path.join(output_dir, version, 'provenance', 'genmcf', + 'import_metadata_mcf.mcf')) + new_blob.upload_from_string(default_provenance) + + provenance_file = import_name.split(':')[-1] + '.mcf' + provenance_blob = self.bucket.blob( + os.path.join('provenance', provenance_file)) + if provenance_blob.exists(): + self.bucket.copy_blob( + provenance_blob, self.bucket, + os.path.join(output_dir, version, 'provenance', 'genmcf', + provenance_file)) + logging.info( + f'Updated provenance file for import {import_name} to add {version}' + ) + + def update_version_file(self, import_name: str, version: str): + """Updates the latest version file in GCS. Args: import_name: The name of the import. version: The new version string. """ - try: - logging.info( - f'Updating version history file for import {import_name} to add {version}' - ) - output_dir = import_name.replace(':', '/') - version_file = self.bucket.blob( - os.path.join(output_dir, _LATEST_VERSION_FILE)) - metadata_blob = self.bucket.blob( - os.path.join(output_dir, version, _IMPORT_METADATA_MCF)) - if metadata_blob.exists(): - self.bucket.copy_blob( - metadata_blob, self.bucket, - os.path.join(output_dir, 'import_metadata_mcf.mcf')) - else: - logging.error( - f'Metadata file not found for import {import_name} {version}' - ) - version_file.upload_from_string(version) - logging.info(f'Updated version history file {version_file}') - except exceptions.NotFound as e: - logging.error(f'Error updating version file for {import_name}: {e}') - raise + logging.info( + f'Updating version history file for import {import_name} to add {version}' + ) + output_dir = import_name.replace(':', '/') + version_file = self.bucket.blob( + os.path.join(output_dir, _LATEST_VERSION_FILE)) + version_file.upload_from_string(version) + logging.info(f'Updated version history file {version_file.name}') diff --git a/import-automation/workflow/spanner-ingestion-workflow.yaml b/import-automation/workflow/spanner-ingestion-workflow.yaml index d990e9af70..cb46738eff 100644 --- a/import-automation/workflow/spanner-ingestion-workflow.yaml +++ b/import-automation/workflow/spanner-ingestion-workflow.yaml @@ -6,7 +6,7 @@ main: - lock_timeout: 82800 # 23 hours - wait_period: 300 # seconds - project_id: '${sys.get_env("PROJECT_ID")}' - - dataflow_job_name: 'ingestion-job' + - dataflow_job_name: ${"ingestion-job-" + string(int(sys.now()))} - dataflow_gcs_path: 'gs://datcom-templates/templates/flex/ingestion.json' - location: '${sys.get_env("LOCATION")}' - spanner_project_id: '${sys.get_env("SPANNER_PROJECT_ID")}' @@ -14,7 +14,7 @@ main: - spanner_database_id: '${sys.get_env("SPANNER_DATABASE_ID")}' - ingestion_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "spanner-ingestion-helper"} - aggregation_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "import-aggregation-helper"} - - import_list: ${default(map.get(args, "imports"), [])} + - import_list: ${default(map.get(args, "importList"), [])} - execution_error: null - acquire_ingestion_lock: try: @@ -30,22 +30,22 @@ main: result: lock_status retry: predicate: ${http.default_retry_predicate} - max_retries: 10 + max_retries: 20 backoff: initial_delay: 300 - max_delay: 300 - multiplier: 1 + max_delay: 600 + multiplier: 2 - process_ingestion: try: steps: - - get_import_list: + - get_import_info: call: http.post args: url: ${ingestion_function} auth: type: OIDC body: - actionType: get_import_list + actionType: get_import_info importList: ${import_list} result: import_info - run_ingestion_job: @@ -60,6 +60,8 @@ main: spanner_instance_id: ${spanner_instance_id} spanner_database_id: ${spanner_database_id} wait_period: ${wait_period} + ingestion_function: ${ingestion_function} + workflow_id: '${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}' result: dataflow_job_id - run_aggregation: call: http.post @@ -80,6 +82,7 @@ main: workflowId: '${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}' jobId: '${dataflow_job_id}' importList: '${import_info.body}' + status: 'SUCCESS' result: function_response except: as: e @@ -106,11 +109,13 @@ main: # This sub-workflow launches a Dataflow job and waits for it to complete. run_dataflow_job: - params: [import_list, project_id, job_name, template_gcs_path, location, spanner_project_id, spanner_instance_id, spanner_database_id, wait_period] + params: [import_list, project_id, job_name, template_gcs_path, location, spanner_project_id, spanner_instance_id, spanner_database_id, wait_period, ingestion_function, workflow_id] steps: - init: assign: - - jobName: '${job_name + "-" + string(int(sys.now()))}' + - jobName: ${job_name} + - machineType: 'n2-highmem-8' + - numWorkers: 3 - log_imports: call: sys.log args: @@ -135,8 +140,8 @@ run_dataflow_job: spannerInstanceId: '${spanner_instance_id}' spannerDatabaseId: '${spanner_database_id}' environment: - numWorkers: 3 - machineType: 'n2-highmem-8' + numWorkers: ${numWorkers} + machineType: ${machineType} result: launch_result - wait_for_job_completion: call: sys.sleep @@ -156,10 +161,22 @@ run_dataflow_job: switch: - condition: ${job_status.currentState == "JOB_STATE_DONE"} return: ${launch_result.job.id} - - condition: ${job_status.currentState == "JOB_STATE_FAILED" or - job_status.currentState == "JOB_STATE_CANCELLED"} - next: fail_workflow + - condition: ${job_status.currentState == "JOB_STATE_FAILED" or job_status.currentState == "JOB_STATE_CANCELLED"} + next: record_failed_imports next: wait_for_job_completion + - record_failed_imports: + call: http.post + args: + url: ${ingestion_function} + auth: + type: OIDC + body: + actionType: update_ingestion_status + workflowId: '${workflow_id}' + jobId: '${launch_result.job.id}' + importList: '${json.decode(import_list)}' + status: 'PENDING' + result: retry_response - fail_workflow: raise: message: '${jobName + " dataflow job failed with status: " + job_status.currentState}' diff --git a/import-automation/workflow/spanner_schema.sql b/import-automation/workflow/spanner_schema.sql index 7985066292..fcbc9365c5 100644 --- a/import-automation/workflow/spanner_schema.sql +++ b/import-automation/workflow/spanner_schema.sql @@ -31,6 +31,7 @@ CREATE TABLE ImportStatus ( CREATE TABLE IngestionHistory ( CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ), + IngestionFailure Bool NOT NULL, WorkflowExecutionID STRING(1024) NOT NULL, DataflowJobID STRING(1024), IngestedImports ARRAY,