docs/source/working-notes/troyraen/branch-tests/update_broker_utils.md
# update_broker_utils
## Setup
```bash
survey=ztf
testid=brokerutils
```

```python
import os

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
survey, testid = "ztf", "brokerutils"
```
## Deploy instance
```bash
cd broker/setup_broker
teardown=False
./setup_broker.sh "$testid" "$teardown" "$survey"
```
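A quick way to confirm that the testing resources came up is to list them by testid (a sketch; assumes the broker's usual `{survey}-...-{testid}` naming):

```bash
gcloud pubsub topics list --filter="name~${testid}"
gcloud compute instances list --filter="name~${testid}"
```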
## Publish test alerts
Following the "Alerts for Testing" notes.
```python
from collections import namedtuple
from datetime import datetime
import os
import re
from pathlib import Path

import pandas as pd  # used by match_name_stub below
import yaml
from google.cloud import pubsub_v1

from broker_utils import data_utils, gcp_utils

# GCP project that the gcp_utils module connects to
gcp_utils.pgb_project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
# Cloud Storage bucket that holds test alerts
testing_bucket_id = "ztf-test_alerts"
# Set alert_dir to a local path for alert storage
alert_dir = Path("/Users/troyraen/Documents/broker/Pitt-Google/testing/docs/source/working-notes/troyraen/alerts-for-testing")
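
# Optionally pull the test alerts down from the Cloud Storage bucket named above.
# Sketch only: assumes `testing_bucket_id` is the full bucket name, the Avro files
# sit at the top level of the bucket, and this account can read it.
from google.cloud import storage

download_from_bucket = False  # flip on if the files are not already in alert_dir
if download_from_bucket:
    storage_client = storage.Client(project=gcp_utils.pgb_project_id)
    for blob in storage_client.list_blobs(testing_bucket_id):
        if blob.name.endswith(".avro"):
            blob.download_to_filename(str(alert_dir / blob.name))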
def _get_avro_paths(dir):
    """Return a generator of paths to Avro files in the directory `dir`."""
    return (p for p in dir.glob("*.avro"))

paths = _get_avro_paths(alert_dir)
path = next(paths)
survey, testid = "ztf", "brokerutils"
topic_name = f"{survey}-alerts-{testid}"
publisher = pubsub_v1.PublisherClient()
# define two helper functions
FName = namedtuple("FName", ["objectId", "candid", "kafka_topic", "avro"])

def parse_fname(fin: str):
    """Split a test-alert file name into its named components."""
    return FName._make(fin.split("."))

def match_name_stub(resource_name: str, name_stub: str):
    """Regex match `resource_name` against `name_stub`, accounting for broker resource-name syntax."""
    rexp = fr"(\w*[-_])({name_stub})(.*)"
    if isinstance(resource_name, pd.Series):
        # call the regex on each element of the Series
        return resource_name.str.match(rexp)
    return re.match(rexp, resource_name)
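
# Quick sanity checks for match_name_stub. The resource names are just examples
# following the {survey}-{stub}-{testid} convention used in this test.
assert match_name_stub("ztf-alerts-brokerutils", "alerts") is not None
assert match_name_stub("ztf-SuperNNova-brokerutils", "alerts") is None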
def publish_alert(path, topic_name):
    # Determine whether the topic name stub is "alerts" and set variables accordingly.
    # If so, publish Avro-serialized bytes; else publish JSON bytes.
    is_alerts = bool(match_name_stub(topic_name, "alerts"))
    if is_alerts:
        return_as = "bytes"  # Pub/Sub publishes as bytes-encoded, Avro-serialized
        kwargs = {}
    else:
        return_as = "dict"  # Pub/Sub publishes as bytes-encoded JSON
        kwargs = {
            "drop_cutouts": True,
            "schema_map": alert_dir / "ztf-schema-map.yaml",
        }

    # create dummy attributes to attach
    attrs = {
        "kafka_topic": parse_fname(path.name).kafka_topic,
        "kafka_timestamp": str(datetime.utcnow()),
    }

    # load the alert and publish it
    alert = data_utils.load_alert(path, return_as, **kwargs)
    result = gcp_utils.publish_pubsub(
        topic_name, alert, attrs=attrs, publisher=publisher
    )
    return result
# publish a single alert to Pub/Sub
publish_alert(path, topic_name)

# publish every alert in the directory to Pub/Sub
for path in _get_avro_paths(alert_dir):
    publish_alert(path, topic_name)
```
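To check that the alerts actually landed in the stream, a few messages can be pulled back off the topic. A sketch, continuing the Python session above; it assumes a pull subscription with the same name as the topic is already attached.

```python
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(gcp_utils.pgb_project_id, topic_name)
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5})
for received in response.received_messages:
    print(received.message.attributes)
# nothing is acknowledged here, so these messages will be redelivered on this subscription
```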
Wait a bit for things to process, then cue the night conductor to end the night and collect metadata:
```bash
NIGHT="END"
vm_name="${survey}-night-conductor-${testid}"
gcloud compute instances add-metadata "$vm_name" \
    --metadata "NIGHT=${NIGHT}"
gcloud compute instances start "$vm_name"
```
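To double-check that the attribute is set before the startup script reads it, the instance metadata can be inspected (optional):

```bash
gcloud compute instances describe "$vm_name" --format="yaml(metadata.items)"
```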
## Query test results
According to the README included with the test alerts, we expect:

- 37 alerts to pass all filters and show up in the SuperNNova table
- 13 alerts to fail at least one filter and not show up in the SuperNNova table
```python
import os

import pandas as pd

import setup_gcp as sup
from broker_utils import gcp_utils

# project_id and match_name_stub are defined in the sections above
dataset = f"{survey}_alerts_{testid}"
fin = "/Users/troyraen/Documents/broker/Pitt-Google/onboarding/docs/source/broker/onboarding/alerts-for-testing/alerts-for-testing.csv"
df_expect = pd.read_csv(fin)

datasets = sup._resources("BQ", survey, testid)  # dict(str(dataset): [str(table),])
# buckets = sup._resources("GCS", survey, testid)

as_expected = {}  # dict(str(dataset.table): bool(result is as expected))
for dataset, tables in datasets.items():
    for table in tables:
        compare = ["objectId", "candid"]  # fields to compare in the dataframes
        select = compare  # columns to query
        # we'll use more info from the metadata table
        if table == "metadata":
            compare = select + ["kafka_topic__alerts", "filename__alert_avros"]
            select = compare + ["bucketId__alert_avros"]

        df = gcp_utils.query_bigquery(
            f"SELECT {', '.join(select)} "
            f"FROM `{project_id}.{dataset}.{table}`"
        ).to_dataframe()

        # validate the results
        as_expect = True
        if table == "metadata":
            # every row's bucket name should contain the "alert_avros" stub
            as_expect = bool(
                match_name_stub(df["bucketId__alert_avros"], "alert_avros").all()
            )
        as_expected[f"{dataset}.{table}"] = (
            as_expect & df[compare].equals(df_expect[compare])
        )
```
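As a separate spot check on the 37/13 expectation, the SuperNNova table can be counted directly. A sketch using the BigQuery client; the dataset and table names are assumed from the conventions noted above.

```python
from google.cloud import bigquery

bq_client = bigquery.Client(project=project_id)
rows = bq_client.query(
    f"SELECT DISTINCT candid FROM `{project_id}.{survey}_alerts_{testid}.SuperNNova`"
).result()
print(f"{rows.total_rows} alerts in the SuperNNova table; expecting 37")
```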
## broker-utils not installing on night-conductor
Solution: use a Conda environment with Python 3.7
```bash
# start over with a fresh vm
survey=ztf
testid=brokerutils
vm_name="${survey}-night-conductor-${testid}"
gcloud compute instances stop "$vm_name"
gcloud compute instances delete "$vm_name"

machinetype=e2-standard-2
gcloud compute instances create "$vm_name" \
    --machine-type="$machinetype" \
    --scopes=cloud-platform \
    --metadata=google-logging-enabled=true
# log in
gcloud compute instances start "$vm_name"
gcloud compute ssh "$vm_name"
# install pip3 and screen
apt-get update
apt-get install -y python3-pip screen
# then install pgb-broker-utils==0.2.28
pip3 install pgb-broker-utils==0.2.28 &> install_broker_utils_0.2.28.out
# this does not succeed:
# astropy requires jinja2, which requires MarkupSafe.soft_unicode,
# but soft_unicode was removed in MarkupSafe v2.1.0
# https://markupsafe.palletsprojects.com/en/2.1.x/changes/#version-2-1-0
# go back to the local machine and download the log file to this directory
exit
gcloud compute scp "troyraen@${vm_name}:/home/troyraen/install_broker_utils_0.2.28.out" .
```
```bash
# back on the VM: install Anaconda, then create and activate a python 3.7 environment
wget https://repo.anaconda.com/archive/Anaconda3-2021.05-Linux-x86_64.sh
bash Anaconda3-2021.05-Linux-x86_64.sh
conda create -n pgb python=3.7
conda activate pgb
pip install pgb-broker-utils==0.2.28
```
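Quick check that the install actually works in the new environment:

```bash
python -c "from broker_utils import data_utils, gcp_utils; print('broker_utils imports OK')"
```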
Night conductor's environment is Python 3.7.3. More info about the VM environment is in the file `install_broker_utils_0.2.28.out` in this directory.

I tried installing pinned versions of all requirements (obtained by successfully installing broker-utils in a clean environment on a Mac running Big Sur with Python 3.7.10), with astropy last. Everything installed except astropy, which failed with the same error.
WILL HAVE TO USE A CONDA ENVIRONMENT. IT WORKS.