
Update to ZTF Avro Schema v4.02

Download the schema files

baseurl="https://raw.githubusercontent.com/ZwickyTransientFacility/ztf-avro-alert/master/schema/"
format="avsc"
schemas=(alert candidate cutout fp_hist prv_candidate)

mkdir -p schema  # destination directory for the downloaded schema files
for schema in "${schemas[@]}"; do
    fname="${schema}.${format}"
    curl -L -o "schema/${fname}" "${baseurl}${fname}"
done

# alert.avsc expects the other files to have "ztf.alert." prepended to their names
for schema in "${schemas[@]}"; do
    fname="${schema}.${format}"
    mv "schema/${fname}" "schema/ztf.alert.${fname}"
done
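
The rename matters because alert.avsc refers to the sub-schemas by their fully qualified names (ztf.alert.candidate, ztf.alert.prv_candidate, ...), and fastavro's load_schema resolves an unresolved named type by reading <fullname>.avsc from the same directory. A quick way to see which named types alert.avsc pulls in (paths as above):

import json

with open("schema/ztf.alert.alert.avsc") as f:
    alert_schema = json.load(f)

# a type that appears only as a name (possibly nested in a union or array), e.g.
# "ztf.alert.candidate", is a named reference that load_schema resolves by reading
# schema/<fullname>.avsc -- hence the rename above
for field in alert_schema["fields"]:
    print(field["name"], field["type"])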

Load schema

import fastavro

schema = fastavro.schema.load_schema("schema/ztf.alert.alert.avsc")
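
Optionally, sanity-check the loaded schema and validate an already-downloaded alert against it (this will only pass for an alert that actually matches v4.02; the alert path below is just a placeholder):

import fastavro
import fastavro.validation
from fastavro.schema import fullname

print(fullname(schema))  # expect "ztf.alert"

# the reader uses the writer schema embedded in the file; validate() checks each
# record against the freshly loaded v4.02 schema
with open("alerts/2475433850015010009.avro", "rb") as fin:  # placeholder path
    for record in fastavro.reader(fin):
        fastavro.validation.validate(record, schema, raise_errors=True)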

Create the BigQuery schema file and then create the table

import numpy as np
import os
import sys
from pathlib import Path

# need some helper functions from the ingest-ztf-archive directory
sys.path.append('../ingest-ztf-archive')
import ingest_tarballs
import schemas


PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
# tarball has v4.01 but real version will be v4.02
VERSION = "4.01"  # use this to create the table using a downloaded alert
# VERSION = "4.02"  # use this later to fill in the production table
VERSIONTAG = f"v{VERSION.replace('.', '_')}"

DATASET = "ztf_raenztf402"  # manually created this dataset
TABLE = f"{PROJECT_ID}.{DATASET}.alerts_{VERSIONTAG}"

# easiest way to create the BigQuery schema file: create a table from a real alert, then download the table schema as JSON
# manually downloaded and extracted this tarball of alerts:
# https://caltech.box.com/s/jxm00q0jufcnshnluktle8wk05lam73c
alert_dir = "../ingest-ztf-archive/alerts/apfp_20231012"
alert_path = f"{alert_dir}/2475433850015010009.avro"

# fix the schema
ingest_tarballs.touch_schema(VERSION, alert_path)
schema = schemas.loadavsc(VERSION, nocutouts=True)
ingest_tarballs.fix_alert_on_disk((Path(alert_path), schema, VERSION))
alert_path = f"{alert_dir}/apfp_20231012/ZTF17aadcvhq/2475433850015010009.avro"

# create the table using the downloaded alert
ingest_tarballs.touch_table(TABLE, alert_path, schema, VERSION)
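
# for reference: touch_table is a repo-specific helper. a minimal sketch of an
# equivalent table creation with the public BigQuery client (an assumption, not
# necessarily what touch_table does internally) -- load the single fixed alert and
# let BigQuery derive the table schema from the Avro:
def create_table_from_avro(avro_path, table):
    from google.cloud import bigquery

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.AVRO,
        write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,  # only write into a brand-new/empty table
    )
    with open(avro_path, "rb") as avro_file:
        job = ingest_tarballs.BQ_CLIENT.load_table_from_file(avro_file, table, job_config=job_config)
    return job.result()  # blocks until the load finishes; the table then has an Avro-derived schema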

# --------- skip down to "download the bigquery schema to a json file" ---------

# there were already alerts in the Kafka topic when I turned the consumer on and ran into a BigQuery rate limit.
# the alerts made it into the bucket without errors, so use the bucket to fill in the table.
VERSION = "4.02"
VERSIONTAG = f"v{VERSION.replace('.', '_')}"
DATASET = "ztf"
TABLE = f"{PROJECT_ID}.{DATASET}.alerts_{VERSIONTAG}"
bucket = f"{PROJECT_ID}-{DATASET}_alerts_{VERSIONTAG}"
# the bucket has requester pays enabled, so attach user_project for billing
BUCKET = ingest_tarballs.STORAGE_CLIENT.get_bucket(
    ingest_tarballs.STORAGE_CLIENT.bucket(bucket, user_project=PROJECT_ID)
)

# get list of candid in bucket
blobs = list(BUCKET.list_blobs())
bucket_candids = sorted(int(Path(blob.name).stem) for blob in blobs)

# get series of candid in table
sql = (f"SELECT candid FROM `{TABLE}`")
query_job = ingest_tarballs.BQ_CLIENT.query(sql)
table_candids = query_job.result().to_dataframe().squeeze().sort_values()

# upload what's missing to the table
missing_candids = set(bucket_candids) - set(table_candids)
source_uris = [
    "gs://" + blob.bucket.name + "/" + blob.name for blob in blobs
    if int(Path(blob.name).stem) in missing_candids
]
jobs = []  # a single load job accepts at most 10,000 source URIs
for uri_chunk in np.array_split(source_uris, (len(source_uris) // 10_000) + 1):
    jobs.append(ingest_tarballs.load_table_from_bucket(TABLE, list(uri_chunk), reportid=f"len{len(uri_chunk)}"))
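
# for reference: load_table_from_bucket is also a repo helper. a hypothetical,
# minimal version of what each chunked load involves with the public client
# (an assumption -- the helper may do more, e.g. error reporting):
def load_avro_uris(uris, table):
    # append the Avro files at `uris` (at most 10,000 of them) to `table` and wait
    from google.cloud import bigquery

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.AVRO,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    job = ingest_tarballs.BQ_CLIENT.load_table_from_uri(uris, table, job_config=job_config)
    return job.result()  # blocks until the load job finishes; raises on failure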

# download the bigquery schema to a json file
schema_file="../../../../../broker/setup_broker/templates/bq_ztf_alerts_v4_02_schema.json"
bq show --schema --format=prettyjson \
    ardent-cycling-243415:ztf_raenztf402.alerts_v4_02 \
    > ${schema_file}
# we created this with a v4.01 alert but nothing needs to be changed in the json for v4.02

# make the production table
bq mk --table ardent-cycling-243415:ztf.alerts_v4_02 ${schema_file}
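
As an optional sanity check, confirm the production table picked up the schema in the JSON file (a sketch, assuming google-cloud-bigquery is installed and the names above):

import json
from google.cloud import bigquery

bq_client = bigquery.Client(project="ardent-cycling-243415")
table = bq_client.get_table("ardent-cycling-243415.ztf.alerts_v4_02")

with open("../../../../../broker/setup_broker/templates/bq_ztf_alerts_v4_02_schema.json") as f:
    expected_fields = json.load(f)  # list of {"name", "type", "mode", ...} dicts from `bq show --schema`

# top-level field names should line up with the JSON schema file
assert [field.name for field in table.schema] == [field["name"] for field in expected_fields]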

Create the bucket

# copied from broker/cloud_functions/ps_to_gcs/deploy.sh
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
region=us-central1
survey=ztf
versiontag=v4_02

avro_bucket="${PROJECT_ID}-${survey}_alerts_${versiontag}"
gsutil mb -l "${region}" "gs://${avro_bucket}"
gsutil uniformbucketlevelaccess set on "gs://${avro_bucket}"
gsutil requesterpays set on "gs://${avro_bucket}"
gcloud storage buckets add-iam-policy-binding "gs://${avro_bucket}" \
    --member="allUsers" \
    --role="roles/storage.objectViewer"
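
To confirm the bucket settings took effect, a small check with the Storage Python client (a sketch; bucket and project names as above, and user_project is required once requester pays is on):

from google.cloud import storage

project_id = "ardent-cycling-243415"
client = storage.Client(project=project_id)

bucket = client.bucket(f"{project_id}-ztf_alerts_v4_02", user_project=project_id)
bucket.reload()  # fetch the bucket metadata

print(bucket.iam_configuration.uniform_bucket_level_access_enabled)  # expect True
print(bucket.requester_pays)  # expect True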

Update VERSIONTAG for all cloud functions

survey=ztf
region=us-central1
versiontag=v4_02
cloudfncs=(
    "${survey}-check_cue_response"
    "${survey}-classify_with_SuperNNova"
    "${survey}-lite"
    "${survey}-store_in_BigQuery"
    "${survey}-tag"
    "${survey}-upload_bytes_to_bucket"
)

for cloudfnc in "${cloudfncs[@]}"; do
    # this gives the error:
    # ERROR: (gcloud.functions.deploy) Uncompressed deployment is 2237421867B, bigger than maximum allowed size of 536870912B.
    # so I just updated the environment variable manually from the console
    gcloud functions deploy "${cloudfnc}" --region="${region}" --update-env-vars=VERSIONTAG="${versiontag}"
done