from google.cloud import pubsub_v1
from broker_utils import data_utils, gcp_utils, schema_maps
import process_pubsub_counters as ppc
# pull some messages and publish them to the ztf-exgalac_trans topic
# so the SNN cloud fnc processes them
max_msgs = 2
msgs = gcp_utils.pull_pubsub('ztf-alerts-reservoir', max_messages=max_msgs)
publisher = pubsub_v1.PublisherClient()
# The drop list is loop-invariant, so build it once instead of once per
# message (it was previously recreated inside the loop body).
# NOTE(review): presumably the cutout stamps are dropped because the
# downstream consumer doesn't need the image blobs — confirm.
dropcols = ('cutoutScience', 'cutoutTemplate', 'cutoutDifference')
for msg in msgs:
    alert_dict_tmp = data_utils.decode_alert(msg)
    alert_dict = {k: v for k, v in alert_dict_tmp.items() if k not in dropcols}
    gcp_utils.publish_pubsub('ztf-exgalac_trans', alert_dict, publisher=publisher)
# process metadata from the SNN subscription
# test the SubscriptionMetadataCollector
sub_name = 'ztf-SuperNNova-counter'
# NOTE(review): testid is passed as the string 'False' here, but as the
# bool False to MetadataCollector below — confirm load_schema_map really
# expects a string.
schema_map = schema_maps.load_schema_map('ztf', 'False')
sub_collector = ppc.SubscriptionMetadataCollector(sub_name, schema_map)
# sub_collector.pull_and_process_messages()
# Drive the collector's private steps individually (instead of the wrapper
# commented out above) so intermediate state can be inspected; the order
# matters: messages must be pulled before packaging into a DataFrame.
sub_collector._pull_messages_and_extract_metadata() # sub_collector.metadata_dicts_list
sub_collector._package_metadata_into_df() # sub_collector.metadata_df
# Bare expression: displays the DataFrame in a REPL/notebook session;
# it has no effect when this file is run as a script.
sub_collector.metadata_df
# test the MetadataCollector
# publish more messages to process using code above
# NOTE(review): args appear to be (survey, testid, some batch/limit size)
# — testid is the bool False here vs the string 'False' above; confirm.
collector = ppc.MetadataCollector('ztf', False, 500)
# Drive the private steps individually so intermediate state can be
# inspected; collection must happen before grouping.
collector._collect_all_metadata() # collector.metadata_dfs_list
collector._group_metadata_by_candidate()
# Insert the grouped metadata rows into the collector's BigQuery table.
gcp_utils.insert_rows_bigquery(collector.bq_table, collector.metadata_dicts_list)
# collector.collect_and_store_all_metadata()