diff --git a/flow/util/uploadMetadata.py b/flow/util/uploadMetadata.py index b940941e7a..9f8dc63fc3 100755 --- a/flow/util/uploadMetadata.py +++ b/flow/util/uploadMetadata.py @@ -1,13 +1,23 @@ #!/usr/bin/env python3 -import firebase_admin -from firebase_admin import credentials -from firebase_admin import firestore -from datetime import datetime, timezone import json import argparse import re import os +from datetime import datetime, timezone + +# --- FIRESTORE (remove when deprecating) --- +import firebase_admin +from firebase_admin import credentials +from firebase_admin import firestore + +# --- END FIRESTORE --- + +# --- PUBSUB --- +from google.cloud import pubsub_v1 +from google.oauth2 import service_account + +# --- END PUBSUB --- # make sure the working dir is flow/ os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) @@ -27,10 +37,24 @@ parser.add_argument("--cred", type=str, help="Service account credentials file") parser.add_argument("--variant", type=str, default="base") +# --- PUBSUB args --- +parser.add_argument("--pubsubProjectID", type=str, help="GCP project ID for Pub/Sub") +parser.add_argument( + "--pubsubTopicID", + type=str, + default="ci-metrics-reports-topics", + help="Pub/Sub topic ID", +) +parser.add_argument( + "--pubsubCred", type=str, help="Service account credentials file for Pub/Sub" +) +# --- END PUBSUB args --- + # Parse the arguments args = parser.parse_args() +# --- FIRESTORE (remove when deprecating) --- def upload_data(db, dataFile, platform, design, variant, args, rules): # Set the document data key = args.commitSHA + "-" + platform + "-" + design + "-" + variant @@ -166,6 +190,43 @@ def upload_data(db, dataFile, platform, design, variant, args, rules): raise Exception(f"Failed to upload data for {platform} {design} {variant}.") +# --- END FIRESTORE --- + + +# --- PUBSUB --- +def publish_to_pubsub( + publisher, topic_path, dataFile, platform, design, variant, args, rules +): + """Publish a single design's metrics to Pub/Sub as a JSON message.""" + with open(dataFile) as f: + data = json.load(f) + + # Build the payload: CLI args + metrics with ':' replaced by '__' + payload = { + "build_id": args.buildID, + "branch_name": args.branchName, + "pipeline_id": args.pipelineID, + "change_branch": args.changeBranch, + "commit_sha": args.commitSHA, + "jenkins_url": args.jenkinsURL, + "rules": rules, + } + + for k, v in data.items(): + new_key = re.sub(":", "__", k) + payload[new_key] = v + + message_data = json.dumps(payload).encode("utf-8") + future = publisher.publish(topic_path, data=message_data) + message_id = future.result() + print( + f"[INFO] Published to Pub/Sub (message ID: {message_id}) for {platform} {design} {variant}." + ) + + +# --- END PUBSUB --- + + def get_rules(dataFile): data = {} if os.path.exists(dataFile): @@ -175,10 +236,31 @@ def get_rules(dataFile): return data -# Initialize Firebase Admin SDK with service account credentials -firebase_admin.initialize_app(credentials.Certificate(args.cred)) -# Initialize Firestore client -db = firestore.client() +# --- FIRESTORE init (remove when deprecating) --- +db = None +if args.cred: + firebase_admin.initialize_app(credentials.Certificate(args.cred)) + db = firestore.client() +# --- END FIRESTORE init --- + +# --- PUBSUB init --- +publisher = None +topic_path = None +if args.pubsubCred and args.pubsubProjectID: + pubsub_credentials = service_account.Credentials.from_service_account_file( + args.pubsubCred + ) + publisher = pubsub_v1.PublisherClient(credentials=pubsub_credentials) + topic_path = publisher.topic_path(args.pubsubProjectID, args.pubsubTopicID) + print(f"[INFO] Pub/Sub publisher initialized for topic: {topic_path}") +elif args.pubsubProjectID: + # No credentials file — use default credentials (e.g., emulator or ADC) + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(args.pubsubProjectID, args.pubsubTopicID) + print( + f"[INFO] Pub/Sub publisher initialized (default creds) for topic: {topic_path}" + ) +# --- END PUBSUB init --- RUN_FILENAME = "metadata.json" @@ -199,6 +281,24 @@ def get_rules(dataFile): print(f"[WARN] Skiping upload {platform} {design} {variant}.") continue print(f"[INFO] Get rules for {platform} {design} {variant}.") - rules = get_rules(os.path.join("designs", platform, design, RUN_FILENAME)) - print(f"[INFO] Upload data for {platform} {design} {variant}.") - upload_data(db, dataFile, platform, design, variant, args, rules) + rules = get_rules( + os.path.join("designs", platform, design, f"rules-{variant}.json") + ) + + # --- FIRESTORE (remove when deprecating) --- + if db: + print(f"[INFO] Upload data for {platform} {design} {variant}.") + upload_data(db, dataFile, platform, design, variant, args, rules) + # --- END FIRESTORE --- + + # --- PUBSUB --- + if publisher: + try: + publish_to_pubsub( + publisher, topic_path, dataFile, platform, design, variant, args, rules + ) + except Exception as e: + print( + f"[WARN] Pub/Sub publish failed for {platform} {design} {variant}: {e}" + ) + # --- END PUBSUB ---