Source code for broker.broker_utils.broker_utils.schema_maps

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""The ``schema_maps`` module loads and returns schema maps stored in
Cloud Storage.
"""

import os
from pathlib import Path
from typing import Optional, Union

import yaml
from google.cloud import storage
from google.cloud.storage.blob import Blob

# get project id from environment variable, else default to production project
# cloud functions use GCP_PROJECT
if "GCP_PROJECT" in os.environ:
    PROJECT_ID = os.getenv("GCP_PROJECT")
else:
    PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "ardent-cycling-243415")


[docs]def load_schema_map( survey: str, testid: str, schema: Optional[Union[Path, str]] = None, ) -> dict: """Return a dict mapping the survey's field names to an internal broker standard. Args: survey: Name of the survey associated with the broker instance. If `schema` is a `pathlib.Path`, `survey` is ignored (left as an arg for backward compatibility). Otherwise, it determines (in conjunction with `testid`) which GCS bucket the schema map will be loaded from. If `schema` is `None`, this will also determine which survey's schema map is returned. testid: If `schema` is a `pathlib.Path`, `testid` is ignored (left as an arg for backward compatibility). Otherwise, it determines (in conjunction with `survey`) which GCS bucket the schema map will be loaded from. If this is a bool, it must be False (indicating a production instance). schema: If this is a `pathlib.Path`: path to a local yaml file to be loaded. In this case, `survey` and `testid` are ignored. If this is a `str`: name of the survey who's schema map is to be loaded. It will be retrieved from a Cloud Storage bucket. If this is `None`, the schema map corresponding to `survey` will be will be retrieved from a Cloud Storage bucket. Returns: Dictionary mapping the survey's field names to an internal broker standard. """ if not schema: # set schema to the survey name, map will be retrieved from a bucket schema = survey # str if isinstance(schema, str): # download from storage bucket return _download_schema_map(survey, testid, schema) # else load from local file return load_yaml(schema)
def _download_schema_map(survey: str, testid: str, schema: str) -> dict: """Download the schema map from Cloud Storage and return as a dict. Args: survey: Name of the survey associated with the broker instance. Determines (in conjunction with `testid`) which GCS bucket the schema map will be loaded from. If `schema` is `None`, this will also determine which survey's schema map is returned. testid: Determines (in conjunction with `survey`) which GCS bucket the schema map will be loaded from. If this is a bool, it must be False (indicating a production instance). schema: Name of the survey who's schema map is to be loaded. It will be retrieved from a Cloud Storage bucket. Returns: Dictionary mapping the survey's field names to an internal broker standard. """ # some setup bucket = _broker_bucket_name(survey, testid) schema = _schema_object_name(schema) client = storage.Client() # try to get the object from storage. blob = `None` if unsuccessful blob = client.bucket(bucket).get_blob(schema) # 403 error (Forbidden) if not auth'd if not blob: # raise an error _ = client.get_bucket(bucket) # 404 error (NotFound) if bucket doesn't exist raise ValueError(f"Bucket {bucket} exists but the object {schema} does not.") return load_yaml(blob) def _broker_bucket_name(survey, testid): if testid in ["False", False]: return f'{PROJECT_ID}-{survey}-broker_files' else: return f'{PROJECT_ID}-{survey}-broker_files-{testid}' def _schema_object_name(survey): return f'schema_maps/{survey}.yaml'
[docs]def load_yaml(fin: Union[Path, str, Blob]) -> dict: """Load a yaml file and return as a dict. Args: fin: Path-like or `google.cloud.storage.blob.Blob` object. Assumes yaml format. Returns: Dictionary mapping the survey's field names to an internal broker standard. """ if isinstance(fin, Blob): with fin.open("rt") as f: return yaml.safe_load(f) # dict with open(fin, "rb") as f: return yaml.safe_load(f) # dict
[docs]def get_value(key: str, alert_dict: dict, schema_map: dict) -> Union[int, str]: """Return the alert_dict value corresponding to schema_map and key.""" fullkey = schema_map.get(key) if isinstance(fullkey, str): return alert_dict.get(fullkey) if isinstance(fullkey, list): if len(fullkey) == 1: return alert_dict.get(fullkey[0]) if len(fullkey) == 2: return alert_dict.get(fullkey[0]).get(fullkey[1]) if len(fullkey) == 3: return alert_dict.get(fullkey[0]).get(fullkey[1]).get(fullkey[2]) return None
[docs]def get_key(key: str, schema_map: dict) -> Union[int, str]: """Return the key or final element in list corresponding to schema_map and key.""" fullkey = schema_map.get(key) if isinstance(fullkey, str): return fullkey if isinstance(fullkey, list): return fullkey[-1] return None