import numpy as np
import os
import sys
from pathlib import Path
# need some functions from the ingest-ztf-archive directory
sys.path.append('../ingest-ztf-archive')
import ingest_tarballs
import schemas
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
# the tarball alerts have schema v4.01, but the production alerts will be v4.02
VERSION = "4.01" # use this to create the table using a downloaded alert
# VERSION = "4.02" # use this later to fill in the production table
VERSIONTAG = f"v{VERSION.replace('.', '_')}"
DATASET = "ztf_raenztf402" # manually created this dataset
TABLE = f"{PROJECT_ID}.{DATASET}.alerts_{VERSIONTAG}"
# easiest way to create the bigquery schema file: create a table from a downloaded alert,
# then download the table's schema as json
# manually downloaded the tarball of alerts from the link below and extracted it
# https://caltech.box.com/s/jxm00q0jufcnshnluktle8wk05lam73c
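# extracting the downloaded tarball looks roughly like this (a sketch: the local filename
# is assumed, as is the tarball unpacking into an apfp_20231012/ directory)
import tarfile
with tarfile.open("apfp_20231012.tar.gz") as tarball:
    tarball.extractall(path="../ingest-ztf-archive/alerts")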
alert_dir = "../ingest-ztf-archive/alerts/apfp_20231012"
alert_path = f"{alert_dir}/2475433850015010009.avro"
# fix the schema
ingest_tarballs.touch_schema(VERSION, alert_path)
schema = schemas.loadavsc(VERSION, nocutouts=True)
ingest_tarballs.fix_alert_on_disk((Path(alert_path), schema, VERSION))
alert_path = f"{alert_dir}/apfp_20231012/ZTF17aadcvhq/2475433850015010009.avro"
# create the table using the downloaded alert
ingest_tarballs.touch_table(TABLE, alert_path, schema, VERSION)
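# the "download the bigquery schema to a json file" step boils down to roughly this
# (a sketch using the standard bigquery client; the output filename is arbitrary):
bq_table = ingest_tarballs.BQ_CLIENT.get_table(TABLE)
ingest_tarballs.BQ_CLIENT.schema_to_json(bq_table.schema, f"alerts_{VERSIONTAG}_schema.json")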
# --------- skip down to "download the bigquery schema to a json file" ---------
# there were already alerts in the kafka topic when i turned the consumer on, and it ran
# into a bigquery rate limit. there were no errors with alerts getting into the bucket,
# so use the bucket to fill in the table.
VERSION = "4.02"
VERSIONTAG = f"v{VERSION.replace('.', '_')}"
DATASET = "ztf"
TABLE = f"{PROJECT_ID}.{DATASET}.alerts_{VERSIONTAG}"
bucket = f"{PROJECT_ID}-{DATASET}_alerts_{VERSIONTAG}"
# get a handle on the bucket; user_project sets the project billed for requester-pays requests
BUCKET = ingest_tarballs.STORAGE_CLIENT.get_bucket(
    ingest_tarballs.STORAGE_CLIENT.bucket(bucket, user_project=PROJECT_ID)
)
# get list of candid in bucket
blobs = list(BUCKET.list_blobs())
bucket_candids = sorted(int(Path(blob.name).stem) for blob in blobs)
# get series of candid in table
sql = (f"SELECT candid FROM `{TABLE}`")
query_job = ingest_tarballs.BQ_CLIENT.query(sql)
table_candids = query_job.result().to_dataframe().squeeze().sort_values()
# upload what's missing to the table
missing_candids = set(bucket_candids) - set(table_candids)
source_uris = [
    f"gs://{blob.bucket.name}/{blob.name}" for blob in blobs
    if int(Path(blob.name).stem) in missing_candids
]
jobs = []  # bigquery load jobs accept at most 10_000 source uris each, so send them in chunks
for uri_chunk in np.array_split(source_uris, (len(source_uris) // 10_000) + 1):
    jobs.append(
        ingest_tarballs.load_table_from_bucket(TABLE, list(uri_chunk), reportid=f"len{len(uri_chunk)}")
    )
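# sanity check (a sketch): once the load jobs have completed, re-run the candid query above
# and confirm nothing from the bucket is still missing from the table
table_candids = ingest_tarballs.BQ_CLIENT.query(sql).result().to_dataframe().squeeze()
missing_candids = set(bucket_candids) - set(table_candids)
assert not missing_candids, f"{len(missing_candids)} alerts still missing from {TABLE}"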