docs/source/working-notes/troyraen/v0.7.1/process_streams.md

Create and test process_pubsub_counters.py

Make the BigQuery table

cd /Users/troyraen/Documents/broker/repo3/broker/setup_broker
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
dataset=ztf_alerts_v071
table=metadata
survey=ztf
bq mk --table ${PROJECT_ID}:${dataset}.${table} templates/bq_${survey}_${table}_schema.json
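
To confirm the table was created with the expected schema, here is a quick check from Python (a minimal sketch using the google-cloud-bigquery client; assumes the same dataset and table names as the bq mk command above):

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table('ztf_alerts_v071.metadata')  # raises NotFound if the bq mk failed
for field in table.schema:
    print(field.name, field.field_type)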

Timestamps exploration

import time
from datetime import datetime
from broker_utils import gcp_utils

# test with current time: split a float timestamp into whole seconds and nanoseconds
now = time.time()
secnow = int(now)
nanonow = int((now - secnow) * 10**9)
tmp = f'{nanonow*1e-9}'.split('.')[1]  # fractional-second digits as a string

# convert pubsub publish_time to a datetime
psmsg = gcp_utils.pull_pubsub('ztf-alerts-reservoir', msg_only=False)[0]
seconds = psmsg.message.publish_time.seconds
nanoseconds = psmsg.message.publish_time.nanos
tmpnano = f'{nanoseconds*1e-9}'.split('.')[1]  # fractional-second digits as a string
fseconds = float(f'{seconds}.{tmpnano}')  # recombine into a single float timestamp
dt = datetime.utcfromtimestamp(fseconds)
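
The string splitting above is one way to recombine seconds and nanos; doing the arithmetic directly is simpler, and it avoids the edge case where f'{nanoseconds*1e-9}' renders in scientific notation and the split fails. A stdlib-only sketch (datetime keeps microsecond precision, which is enough here):

# equivalent conversion without the string manipulation
dt_direct = datetime.utcfromtimestamp(seconds + nanoseconds * 1e-9)
dt_direct, dt  # should agree to within float precision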

Test process_pubsub_counters.py on the SuperNNova cloud function

cd /Users/troyraen/Documents/broker/repo3/broker/night_conductor/end_night/
from google.cloud import pubsub_v1
from broker_utils import data_utils, gcp_utils, schema_maps
import process_pubsub_counters as ppc

# pull some messages and publish them to the ztf-exgalac_trans topic
# so the SuperNNova (SNN) cloud function processes them
max_msgs = 2
msgs = gcp_utils.pull_pubsub('ztf-alerts-reservoir', max_messages=max_msgs)
publisher = pubsub_v1.PublisherClient()
for msg in msgs:
    alert_dict_tmp = data_utils.decode_alert(msg)
    dropcols = ['cutoutScience', 'cutoutTemplate', 'cutoutDifference']
    alert_dict = {k: v for k, v in alert_dict_tmp.items() if k not in dropcols}
    gcp_utils.publish_pubsub('ztf-exgalac_trans', alert_dict, publisher=publisher)
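
Before running the collector, it can help to spot-check that the counter subscription attached to the SNN output actually received messages. A sketch using the Pub/Sub client directly (assumes the ztf-SuperNNova-counter subscription used below and GOOGLE_CLOUD_PROJECT in the environment); the messages are deliberately not acked so the collector can still process them:

import os
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(os.getenv('GOOGLE_CLOUD_PROJECT'), 'ztf-SuperNNova-counter')
response = subscriber.pull(request={'subscription': sub_path, 'max_messages': max_msgs}, timeout=10)
len(response.received_messages)  # expect up to max_msgs once the cloud function has run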

# process metadata from the SNN subscription

# test the SubscriptionMetadataCollector
sub_name = 'ztf-SuperNNova-counter'
schema_map = schema_maps.load_schema_map('ztf', 'False')
sub_collector = ppc.SubscriptionMetadataCollector(sub_name, schema_map)
# sub_collector.pull_and_process_messages()
sub_collector._pull_messages_and_extract_metadata()  # sub_collector.metadata_dicts_list
sub_collector._package_metadata_into_df()  # sub_collector.metadata_df
sub_collector.metadata_df
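
A quick look at what was extracted (assuming metadata_df is a pandas DataFrame with one row per pulled message):

sub_collector.metadata_df.shape  # expect one row per message published above
list(sub_collector.metadata_df.columns)  # the metadata fields the collector extracted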

# test the MetadataCollector
# publish more messages to process using the code above
collector = ppc.MetadataCollector('ztf', False, 500)
collector._collect_all_metadata()  # collector.metadata_dfs_list
collector._group_metadata_by_candidate()
gcp_utils.insert_rows_bigquery(collector.bq_table, collector.metadata_dicts_list)
# collector.collect_and_store_all_metadata()
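
To check that the rows actually landed, query the metadata table directly (a sketch using the BigQuery client; assumes the rows go to the ztf_alerts_v071.metadata table created above, which is what collector.bq_table should point at):

from google.cloud import bigquery
bq_client = bigquery.Client()
query = 'SELECT COUNT(*) AS nrows FROM `ztf_alerts_v071.metadata`'
nrows = list(bq_client.query(query).result())[0]['nrows']
nrows  # expect one row per candidate processed so far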

Test process_pubsub_counters.py on the full pipeline

cd /Users/troyraen/Documents/broker/repo3/broker/night_conductor/end_night/
# from google.cloud import pubsub_v1
from broker_utils import consumer_sim, schema_maps
import process_pubsub_counters as ppc

testid = 'v071'
survey = 'ztf'
instance = (survey, testid)
alert_rate = (25, 'once')
# alert_rate = 'ztf-active-avg'
# runtime = (30, 'min')  # options: 'sec', 'min', 'hr', 'night'(=10 hrs)
consumer_sim.publish_stream(alert_rate, instance)

# process metadata

# test the SubscriptionMetadataCollector on each subscription
sub_names = ppc._resource_names(survey, testid)['subscriptions']
schema_map = schema_maps.load_schema_map(survey, testid)
batch_size = 500

i = 1
sub_collector = ppc.SubscriptionMetadataCollector(sub_names[i], schema_map, batch_size)
sub_collector.pull_and_process_messages()
# sub_collector._pull_messages_and_extract_metadata()  # sub_collector.metadata_dicts_list
# sub_collector._package_metadata_into_df()  # sub_collector.metadata_df
sub_collector.metadata_df

# test the MetadataCollector
# publish more messages to process using the code above
collector = ppc.MetadataCollector(survey, testid, batch_size)
# collector.collect_and_store_all_metadata()
collector._collect_all_metadata()  # collector.metadata_dfs_dict
collector._join_metadata()  # collector.metadata_df
collector._load_metadata_to_bigquery()
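
Sanity check on the intermediate products (assuming metadata_dfs_dict maps subscription name to a pandas DataFrame and metadata_df is the joined result):

{name: df.shape for name, df in collector.metadata_dfs_dict.items()}  # rows collected per subscription
collector.metadata_df.shape  # after the join: one row per candidate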

Process last night’s ZTF stream

In Python:

import process_pubsub_counters as ppc
survey = 'ztf'
# testid = 'v070'
testid = False
batch_size = 2
single_batch = True
timeout = 5

# collector = ppc.MetadataCollector(survey, testid, batch_size, single_batch)
collector = ppc.MetadataCollector(survey, testid, timeout=timeout)
collector.collect_and_store_all_metadata()

Command-line:

survey='ztf'
b=2
sb=True
t=5
python3 process_pubsub_counters.py --survey=$survey --production --timeout=$t
# python3 process_pubsub_counters.py --survey=$survey --production --batch_size=$b --single_batch=$sb
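
The actual script defines its own command-line interface; roughly, the flags map onto the MetadataCollector call the same way as the Python usage above. A hypothetical sketch of that wiring (not the real argument parser):

import argparse
import process_pubsub_counters as ppc

parser = argparse.ArgumentParser()
parser.add_argument('--survey', default='ztf')
parser.add_argument('--production', action='store_true')  # use the live (testid=False) instance
parser.add_argument('--timeout', type=int, default=5)
# --batch_size, --single_batch, and --testrun would be wired the same way;
# shell booleans like --single_batch=True arrive as strings and need explicit conversion
args = parser.parse_args()

testid = False if args.production else 'v071'  # e.g., the test instance used above
collector = ppc.MetadataCollector(args.survey, testid, timeout=args.timeout)
collector.collect_and_store_all_metadata()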

Streaming

import process_pubsub_counters as ppc
survey = 'ztf'
testid = False
timeout = 2
testrun = True

collector = ppc.MetadataCollector(survey, testid, timeout=timeout, testrun=testrun)
collector.collect_and_store_all_metadata()

sub_name = 'ztf-alert_avros-counter'
topic_stub = 'alert_avros'
sub_collector = ppc.SubscriptionMetadataCollector((topic_stub, sub_name), timeout=timeout)
# sub_collector.pull_and_process_messages()
sub_collector._pull_messages_and_extract_metadata()  # sub_collector.metadata_dicts_list
sub_collector._package_metadata_into_df()  # sub_collector.metadata_df
sub_collector.metadata_df

Command-line:

survey='ztf'
testid='False'
to=2
tr=True
python3 process_pubsub_counters.py --survey=$survey --production --timeout=$to --testrun=$tr