docs/source/working-notes/troyraen/branch-tests/update_broker_utils.md

update_broker_utils

Setup

# bash
survey=ztf
testid=brokerutils

# python
import os
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
survey, testid = "ztf", "brokerutils"

Deploy instance

cd broker/setup_broker
teardown=False
./setup_broker.sh "$testid" "$teardown" "$survey"
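
A quick, hedged sanity check that the deployment created resources for this testid (the filters just look for the testid string in resource names):

# optional: confirm the test instance's resources exist
gcloud pubsub topics list --filter="name~${testid}"
gcloud compute instances list --filter="name~${testid}"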

Publish test alerts

Following the instructions in Alerts for Testing.
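
The test alerts need to be on the local machine. If they are not already in alert_dir (defined below), one way to fetch them is with gsutil. This is only a sketch: the bucket name is copied from the snippet below but may need a project prefix, and the local path is a placeholder.

# copy the test alerts down from the testing bucket (bucket name and local path are assumptions)
gsutil -m cp "gs://ztf-test_alerts/*.avro" /path/to/alerts-for-testing/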

from collections import namedtuple
from datetime import datetime
import os
import re
from pathlib import Path

import pandas as pd
import yaml
from google.cloud import pubsub_v1

from broker_utils import data_utils, gcp_utils

# GCP project that the gcp_utils module connects to
gcp_utils.pgb_project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
# Cloud Storage bucket that holds test alerts
testing_bucket_id = "ztf-test_alerts"

# Set alert_dir to a local path for alert storage
alert_dir = Path("/Users/troyraen/Documents/broker/Pitt-Google/testing/docs/source/working-notes/troyraen/alerts-for-testing")

def _get_avro_paths(directory):
    """Return a generator of paths to Avro files in `directory`."""
    return directory.glob("*.avro")
paths = _get_avro_paths(alert_dir)
path = next(paths)

survey, testid = "ztf", "brokerutils"
topic_name = f"{survey}-alerts-{testid}"
publisher = pubsub_v1.PublisherClient()

# define two helper functions
FName = namedtuple("FName", ["objectId", "candid", "kafka_topic", "avro"])
def parse_fname(fin: str):
    return FName._make(fin.split('.'))

def match_name_stub(resource_name: str, name_stub: str):
    """Regex match `resource_name`, `name_stub` accounting for broker name syntax."""
    rexp = fr"(\w*[-_])({name_stub})(.*)"
    if isinstance(resource_name, pd.Series):
        # call regex on each element in Series
        return resource_name.str.match(rexp)
    return re.match(rexp, resource_name)

def publish_alert(path, topic_name):
    # determine if the topic name stub is "alerts" and set variables accordingly
    # if so, we'll publish Avro-serialized bytes
    # else we'll publish JSON bytes
    is_alerts = bool(match_name_stub(topic_name, "alerts"))
    if is_alerts:
        return_as = "bytes"  # Pub/Sub publishes as bytes-encoded, Avro-serialized
        kwargs = {}
    else:
        return_as = "dict"  # Pub/Sub publishes as bytes-encoded, JSON
        kwargs = {
            "drop_cutouts": True,
            "schema_map": alert_dir / "ztf-schema-map.yaml"
        }

    # create dummy attributes to attach
    attrs = {
        "kafka_topic": parse_fname(path.name).kafka_topic,
        "kafka_timestamp": str(datetime.utcnow())
    }

    # load the alert and publish
    alert = data_utils.load_alert(path, return_as, **kwargs)
    result = gcp_utils.publish_pubsub(
        topic_name, alert, attrs=attrs, publisher=publisher
    )
    return result

# publish a single alert to Pub/Sub
publish_alert(path, topic_name)

# publish every alert in the directory to Pub/Sub
for path in _get_avro_paths(alert_dir):
    publish_alert(path, topic_name)
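
The helper also exercises the JSON branch whenever the topic-name stub is not "alerts". For illustration only; the topic name below is hypothetical and would need to exist in the test instance:

# publish one alert as JSON bytes to a non-"alerts" topic (hypothetical topic name)
publish_alert(path, f"{survey}-lite-{testid}")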

Wait a bit for things to process, then cue night conductor to collect metadata

NIGHT="END"
vm_name="${survey}-night-conductor-${testid}"
gcloud compute instances add-metadata "$vm_name" \
          --metadata "NIGHT=${NIGHT}"
gcloud compute instances start "$vm_name"
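
To check on night conductor, look at the instance status; it should eventually return to TERMINATED if the end-of-night script completed and stopped the VM:

gcloud compute instances describe "$vm_name" --format="value(status)"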

Query test results

According to the README included with the test alerts, we expect:

  • 37 alerts to pass all filters and show up in the SuperNNova table

  • 13 alerts to fail at least one filter and not show up in the SuperNNova table
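
Before the full table-by-table comparison below, a quick spot check on the SuperNNova row count. This mirrors the query_bigquery(...).to_dataframe() usage in the snippet below; the exact table name casing is an assumption and may need adjusting:

from broker_utils import gcp_utils

# expect 37 rows in the SuperNNova table (dataset name follows the pattern used below)
query = (
    f"SELECT COUNT(*) AS n "
    f"FROM `{project_id}.{survey}_alerts_{testid}.SuperNNova`"
)
n_snn = gcp_utils.query_bigquery(query).to_dataframe()["n"][0]
print(n_snn, n_snn == 37)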

import os
import pandas as pd

import setup_gcp as sup
from broker_utils import gcp_utils  # match_name_stub() from the publishing snippet above is reused here too

dataset = f"{survey}_alerts_{testid}"
fin = "/Users/troyraen/Documents/broker/Pitt-Google/onboarding/docs/source/broker/onboarding/alerts-for-testing/alerts-for-testing.csv"
df_expect = pd.read_csv(fin)
datasets = sup._resources("BQ", survey, testid)  # dict(str(dataset): [str(table),])
# buckets = sup._resources("GCS", survey, testid)

as_expected = {}  # dict(str(dataset.table): bool(result is as expected))
for dataset, tables in datasets.items():
    for table in tables:
        compare = ["objectId", "candid"]    # fields to compare in dataframes
        select = compare                    # columns to query
        # we'll use more info from the metadata table
        if table == "metadata":
            compare = select + ["kafka_topic__alerts", "filename__alert_avros"]
            select = compare + ["bucketId__alert_avros"]

        df = gcp_utils.query_bigquery((
            f"SELECT {', '.join(select)} "
            f"FROM `{project_id}.{dataset}.{table}`"
        )).to_dataframe()

        # validate results
        as_expect = True
        if table == "metadata":
            as_expect = bool(
                match_name_stub(df["bucketId__alert_avros"], "alert_avros").all()
            )
        as_expected[f"{dataset}.{table}"] = (
            as_expect and df[compare].equals(df_expect[compare])
        )
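
Finally, print a pass/fail summary of the dict built above:

for name, ok in as_expected.items():
    print(f"{name}: {'as expected' if ok else 'MISMATCH'}")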


broker-utils not installing on night-conductor

Solution: use a Conda environment with Python 3.7

# start over with a fresh VM
survey=ztf
testid=brokerutils
vm_name="${survey}-night-conductor-${testid}"
gcloud compute instances stop "$vm_name"
gcloud compute instances delete "$vm_name"
machinetype=e2-standard-2
gcloud compute instances create "$vm_name" \
    --machine-type="$machinetype" \
    --scopes=cloud-platform \
    --metadata=google-logging-enabled=true
# log in
gcloud compute instances start "$vm_name"
gcloud compute ssh "$vm_name"
# install pip3 and screen
sudo apt-get update
sudo apt-get install -y python3-pip screen
# then install pgb-broker-utils==0.2.28
pip3 install pgb-broker-utils==0.2.28 &> install_broker_utils_0.2.28.out
# this does not succeed:
# astropy pulls in jinja2, which imports soft_unicode from MarkupSafe,
# but soft_unicode was removed in MarkupSafe 2.1.0
# https://markupsafe.palletsprojects.com/en/2.1.x/changes/#version-2-1-0
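# a possible workaround (not tried here) would be to pin MarkupSafe below 2.1
# before installing, so the older jinja2 can still import soft_unicode:
#     pip3 install 'markupsafe<2.1'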
# go back to the local machine and download the log file to this directory
exit
gcloud compute scp "troyraen@${vm_name}:/home/troyraen/install_broker_utils_0.2.28.out" .

# install Anaconda on the VM
wget https://repo.anaconda.com/archive/Anaconda3-2021.05-Linux-x86_64.sh
bash Anaconda3-2021.05-Linux-x86_64.sh

# create and activate a Python 3.7 environment
conda create -n pgb python=3.7
conda activate pgb

# broker-utils installs cleanly in this environment
pip install pgb-broker-utils==0.2.28
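# quick hedged check that the install works (submodule names taken from the
# python snippets above)
python -c "from broker_utils import data_utils, gcp_utils; print('broker_utils imports OK')"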

Night conductor’s system environment is Python 3.7.3. More info about the VM environment is in the file install_broker_utils_0.2.28.out in this directory.

I tried installing pinned versions of all requirements (obtained by installing broker-utils successfully in a clean environment on a Mac running Big Sur with Python 3.7.10), installing astropy last. Everything installed except astropy, which failed with the same error.
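
For the record, that attempt amounted to something like the following sketch (the requirements file name is illustrative; the pins came from pip freeze in the clean Mac environment):

# on the Mac, in the clean environment where broker-utils installed successfully
pip3 freeze > pinned_requirements.txt
# on the VM: everything except astropy first, then the pinned astropy last
grep -iv '^astropy' pinned_requirements.txt | xargs pip3 install
grep -i '^astropy' pinned_requirements.txt | xargs pip3 install   # fails with the same error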

WILL HAVE TO USE A CONDA ENVIRONMENT. IT WORKS.