Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions import-automation/workflow/aggregation-helper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ def aggregation_helper(request):
HTTP Cloud Function that takes importName and runs a BQ query.
"""
request_json = request.get_json(silent=True)
import_list = request_json.get('importList')
if not import_list:
return ("'importList' parameter is missing", 400)
import_list = request_json.get('importList', [])
logging.info(f"Received request for importList: {import_list}")
results = []
try:
Expand Down
5 changes: 0 additions & 5 deletions import-automation/workflow/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,19 @@ steps:
- id: 'import-automation-workflow'
name: 'gcr.io/cloud-builders/gcloud'
args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
dir: 'import-automation/workflow'

- id: 'spanner-ingestion-workflow'
name: 'gcr.io/cloud-builders/gcloud'
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
dir: 'import-automation/workflow'

- id: 'spanner-ingestion-helper'
name: 'gcr.io/cloud-builders/gcloud'
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
dir: 'import-automation/workflow'

- id: 'import-aggregation-helper'
name: 'gcr.io/cloud-builders/gcloud'
args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION},BQ_DATASET_ID=${_BQ_DATASET_ID}']
dir: 'import-automation/workflow'

- id: 'import-automation-helper'
name: 'gcr.io/cloud-builders/gcloud'
args: ['functions', 'deploy', 'import-automation-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'import-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'handle_feed_event', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID}']
dir: 'import-automation/workflow'
2 changes: 1 addition & 1 deletion import-automation/workflow/cloudbuild_main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ steps:
- '.'
- '--config=cloudbuild.yaml'
- '--project=${_PROJECT_ID}'
- '--substitutions=_ENV=staging,_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}'
- '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}'
dir: 'import-automation/workflow'

# 2. Run E2E Tests on Staging
Expand Down
33 changes: 19 additions & 14 deletions import-automation/workflow/import-automation-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ main:
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- region: ${sys.get_env("LOCATION")}
- imageUri: ${default(map.get(args, "imageUri"), "gcr.io/datcom-ci/dc-import-executor:stable")}
- jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))}
- jobId: ${text.replace_all(text.to_lower(text.substring(text.split(args.importName, ":")[1], 0, 50) + "-" + string(int(sys.now()))), "_", "-")}
- importName: ${args.importName}
- importConfig: ${args.importConfig}
- importConfig: ${default(map.get(args, "importConfig"), "{}")}
- gcsMountBucket: ${sys.get_env("GCS_MOUNT_BUCKET")}
- gcsImportBucket: ${sys.get_env("GCS_BUCKET_ID")}
- gcsMountPath: "/tmp/gcs"
- ingestionHelper: "spanner-ingestion-helper"
- functionUrl: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/" + ingestionHelper}
- startTime: ${sys.now()}
- defaultResources:
machine: "n2-standard-8"
cpu: 8000
memory: 32768
disk: 100
- resources: ${default(map.get(args, "resources"), defaultResources)}
- runImportJob:
try:
call: googleapis.batch.v1.projects.locations.jobs.create
Expand All @@ -25,11 +31,11 @@ main:
allocationPolicy:
instances:
- policy:
machineType: ${args.resources.machine}
machineType: ${resources.machine}
provisioningModel: "STANDARD"
bootDisk:
image: "projects/debian-cloud/global/images/family/debian-12"
size_gb: ${args.resources.disk}
size_gb: ${resources.disk}
installOpsAgent: true
taskGroups:
taskSpec:
Expand All @@ -38,14 +44,14 @@ main:
remotePath: ${gcsMountBucket}
mountPath: ${gcsMountPath}
computeResource:
cpuMilli: ${args.resources.cpu}
memoryMib: ${args.resources.memory}
cpuMilli: ${resources.cpu}
memoryMib: ${resources.memory}
runnables:
- container:
imageUri: ${imageUri}
commands:
- ${"--import_name=" + args.importName}
- ${"--import_config=" + args.importConfig}
- ${"--import_name=" + importName}
- ${"--import_config=" + importConfig}
environment:
variables:
IMPORT_NAME: ${importName}
Expand Down Expand Up @@ -73,11 +79,10 @@ main:
body:
actionType: 'update_import_status'
jobId: ${jobId}
importName: ${args.importName}
status: 'FAILED'
importName: ${importName}
status: 'FAILURE'
executionTime: ${int(sys.now() - startTime)}
latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
schedule: ${default(map.get(args, "schedule"), "")}
latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(importName, ":", "/")}
result: functionResponse
- failWorkflow:
raise: ${e}
Expand All @@ -89,8 +94,8 @@ main:
type: OIDC
body:
actionType: 'update_import_version'
importName: ${args.importName}
version: 'staging'
importName: ${importName}
version: 'STAGING'
override: false
comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
result: functionResponse
Expand Down
148 changes: 148 additions & 0 deletions import-automation/workflow/import-helper/import_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json
import logging
import os
import croniter
from datetime import datetime, timezone
from google.auth.transport.requests import Request
from google.oauth2 import id_token
from google.cloud import storage
from google.cloud.workflows import executions_v1
import requests

# Emit INFO-level logs from this Cloud Function (root logger default is WARNING).
logging.getLogger().setLevel(logging.INFO)

# Deployment configuration, injected via --set-env-vars at deploy time.
PROJECT_ID = os.environ.get('PROJECT_ID')
LOCATION = os.environ.get('LOCATION')
GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
# URL of the sibling Cloud Function that records import status in Spanner.
# NOTE(review): if LOCATION/PROJECT_ID are unset this silently becomes
# "https://None-None.cloudfunctions.net/..." — confirm env vars are required.
INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
# Workflow triggered after a successful import to ingest data into the graph.
WORKFLOW_ID = 'spanner-ingestion-workflow'

def invoke_ingestion_workflow(import_name: str):
    """Triggers the graph ingestion workflows.

    Starts an execution of the spanner-ingestion-workflow with the bare
    import name (the part after the final ':') as its import list.

    Args:
        import_name: The name of the import.
    """
    bare_name = import_name.split(':')[-1]
    argument = json.dumps({"importList": [bare_name]})

    logging.info(f"Invoking {WORKFLOW_ID} for {import_name}")
    workflow_path = (
        f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}")
    client = executions_v1.ExecutionsClient()
    execution = client.create_execution(
        parent=workflow_path,
        execution=executions_v1.Execution(argument=argument))
    logging.info(
        f"Triggered workflow {WORKFLOW_ID} for {import_name}. Execution ID: {execution.name}"
    )


def update_import_status(import_name,
                         import_status,
                         import_version,
                         graph_path,
                         job_id,
                         cron_schedule=None):
    """Updates the status for the specified import job.

    Posts an 'update_import_status' action to the spanner-ingestion-helper
    Cloud Function, authenticating with an OIDC identity token.

    Args:
        import_name: The name of the import (in 'provenance:name' form).
        import_status: The new status of the import.
        import_version: The version of the import.
        graph_path: The graph path for the import.
        job_id: The job ID associated with the import.
        cron_schedule: The cron schedule for the import (optional).

    Raises:
        requests.HTTPError: If the helper function returns an error response.
    """
    logging.info(f"Updating {import_name} status: {import_status}")
    latest_version = 'gs://' + GCS_BUCKET_ID + '/' + import_name.replace(
        ':', '/') + '/' + import_version
    request = {
        'actionType': 'update_import_status',
        'importName': import_name,
        'status': import_status,
        # camelCase 'jobId' matches the other keys in this payload and the
        # key sent by the import-automation workflow for the same action
        # (was inconsistently 'job_id').
        'jobId': job_id,
        'latestVersion': latest_version,
        'graphPath': graph_path
    }
    if cron_schedule:
        try:
            next_refresh = croniter.croniter(
                cron_schedule,
                datetime.now(timezone.utc)).get_next(datetime).isoformat()
            request['nextRefresh'] = next_refresh
        except croniter.CroniterError as e:
            # Best-effort: a malformed schedule must not block the status
            # update itself.
            logging.error(
                f"Error calculating next refresh from schedule '{cron_schedule}': {e}"
            )
    logging.info(f"Update request: {request}")
    # Mint an OIDC identity token for the (authenticated-only) helper function.
    auth_req = Request()
    token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL)
    headers = {'Authorization': f'Bearer {token}'}
    response = requests.post(INGESTION_HELPER_URL,
                             json=request,
                             headers=headers)
    response.raise_for_status()
    logging.info(f"Updated status for {import_name}")


def parse_message(request) -> dict:
    """Processes the incoming Pub/Sub push request.

    Validates the push envelope and verifies the message payload is
    decodable base64 / UTF-8 before returning the message to the caller.

    Args:
        request: The flask request object.

    Returns:
        A dictionary containing the message data, or None if invalid.
    """
    request_json = request.get_json(silent=True)
    if not request_json or 'message' not in request_json:
        logging.error('Invalid Pub/Sub message format')
        return None

    pubsub_message = request_json['message']
    logging.info(f"Received Pub/Sub message: {pubsub_message}")
    try:
        data_bytes = base64.b64decode(pubsub_message["data"])
        notification_json = data_bytes.decode("utf-8")
        logging.info(f"Notification content: {notification_json}")
    except Exception as e:
        # A message whose payload cannot be decoded is invalid: reject it
        # (previously the error was logged but the message returned anyway,
        # contradicting the documented "None if invalid" contract).
        logging.error(f"Error decoding message data: {e}")
        return None

    return pubsub_message


def check_duplicate(message_id: str):
    """Checks for duplicate messages using a GCS object as a claim marker.

    Atomically creates gs://<bucket>/google3/transfers/<message_id> with
    if_generation_match=0, which only succeeds when the object does not
    already exist — so a failed create means the message was already seen.

    Args:
        message_id: The ID of the message to check.

    Returns:
        True if the message is a duplicate, False otherwise.
    """
    if not message_id:
        # No ID to deduplicate on; treat as first delivery.
        return False
    logging.info(f"Checking for existing message: {message_id}")
    storage_client = storage.Client()
    bucket = storage_client.bucket(GCS_BUCKET_ID)
    blob = bucket.blob(f"google3/transfers/{message_id}")
    try:
        # if_generation_match=0 asserts "object must not exist" server-side.
        blob.upload_from_string("", if_generation_match=0)
    except Exception as e:
        # NOTE(review): ANY failure here (including transient network or
        # permission errors, not only the 412 precondition failure) is
        # treated as a duplicate, which drops the message. Log the cause so
        # such drops are diagnosable; consider narrowing to
        # google.api_core.exceptions.PreconditionFailed.
        logging.warning(
            f"Marker create failed for {message_id}; treating as duplicate: {e}"
        )
        return True
    return False
Loading
Loading