docs/source/working-notes/troyraen/v0.7.0/track_performance.md

Make the BigQuery table

cd /Users/troyraen/Documents/broker/repo3/broker/setup_broker
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
dataset=ztf_alerts
table=metadata
survey=ztf
bq mk --table ${PROJECT_ID}:${dataset}.${table} templates/bq_${survey}_${table}_schema.json

Timestamps exploration

import time
from datetime import datetime
from broker_utils import gcp_utils

# test with current time
now = time.time()
secnow = int(now)
nanonow = int((now - secnow) * 10**9)
tmp = f'{nanonow*1e-9}'.split('.')[1]

# convert pubsub publish_time to a datetime
psmsg = gcp_utils.pull_pubsub('ztf-alerts-reservoir', msg_only=False)[0]
seconds = psmsg.message.publish_time.seconds
nanoseconds = psmsg.message.publish_time.nanos
tmpnano = f'{nanoseconds*1e-9}'.split('.')[1]
fseconds = float(f'{seconds}.{tmpnano}')
dt = datetime.utcfromtimestamp(fseconds)

Test process_pubsub_counters.py

cd /Users/troyraen/Documents/broker/repo3/broker/night_conductor/end_night/
from google.cloud import pubsub_v1
from broker_utils import data_utils, gcp_utils, schema_maps
import process_pubsub_counters as ppc

# pull some messages and publish them to the ztf-exgalac_trans topic
# so the SNN cloud fnc processes them
max_msgs = 2
msgs = gcp_utils.pull_pubsub('ztf-alerts-reservoir', max_messages=max_msgs)
publisher = pubsub_v1.PublisherClient()
for msg in msgs:
    alert_dict_tmp = data_utils.decode_alert(msg)
    dropcols = ['cutoutScience', 'cutoutTemplate', 'cutoutDifference']
    alert_dict = {k: v for k, v in alert_dict_tmp.items() if k not in dropcols}
    gcp_utils.publish_pubsub('ztf-exgalac_trans', alert_dict, publisher=publisher)

# process metadata from the SNN subscription

# test the SubscriptionMetadataCollector
sub_name = 'ztf-SuperNNova-counter'
schema_map = schema_maps.load_schema_map('ztf', 'False')
sub_collector = ppc.SubscriptionMetadataCollector(sub_name, schema_map)
# sub_collector.pull_and_process_messages()
sub_collector._pull_messages_and_extract_metadata()  # sub_collector.metadata_dicts_list
sub_collector._package_metadata_into_df()  # sub_collector.metadata_df
sub_collector.metadata_df

# test the MetadataCollector
# publish more messages to process using code above
collector = ppc.MetadataCollector('ztf', False, 500)
collector._collect_all_metadata()  # collector.metadata_dfs_list
collector._group_metadata_by_candidate()
gcp_utils.insert_rows_bigquery(collector.bq_table, collector.metadata_dicts_list)
# collector.collect_and_store_all_metadata()

Process last night’s ZTF stream

import process_pubsub_counters as ppc
survey = 'ztf'
# testid = 'v070'
testid = False
batch_size = 500

collector = ppc.MetadataCollector(survey, testid, batch_size)
collector.collect_and_store_all_metadata()