Implementing SuperNNova Classifier¶
The following is mostly code used to develop and test the Cloud Function. It uses a pre-trained model provided by the SuperNNova team.
Preliminary exploration¶
Following run_onthefly.py
pip install supernnova torch h5py natsort scikit-learn seaborn
Get some alerts and classify them:
from astropy.time import Time
import numpy as np
import pandas as pd
from pgb_utils import pubsub as pgbps
from supernnova.validation.validate_onthefly import classify_lcs
COLUMN_NAMES = ["SNID", "MJD", "FLUXCAL", "FLUXCALERR", "FLT"]
cols = ['objectId', 'jd', 'magpsf', 'sigmapsf', 'magzpsci', 'fid']
ztf_fid_names = {1:'g', 2:'r', 3:'i'}
device='cpu'
model_file='/Users/troyraen/Documents/broker/SNN/ZTF_DMAM_V19_NoC_SNIa_vs_CC_forFink/vanilla_S_0_CLF_2_R_none_photometry_DF_1.0_N_global_lstm_32x2_0.05_128_True_mean.pt'
# rnn_state = torch.load(model_file, map_location=lambda storage, loc: storage)
subscription = 'ztf-loop'
msgs = pgbps.pull(subscription, max_messages=10)
# dflist = [pgbps.decode_ztf_alert(m, return_format='df') for m in msgs]
dflist = []
for m in msgs:
    df = pgbps.decode_ztf_alert(m, return_format='df')
    df['objectId'] = df.objectId
    df = df[cols]
    df['SNID'] = df['objectId']
    df['MJD'] = Time(df['jd'], format='jd').mjd
    df['FLUXCAL'] = 10 ** ((df['magzpsci'] - df['magpsf']) / 2.5)
    # error propagation: sigma_F = F * ln(10)/2.5 * sigma_m
    df['FLUXCALERR'] = df['FLUXCAL'] * df['sigmapsf'] * np.log(10) / 2.5
    df['FLT'] = df['fid'].map(ztf_fid_names)
    dflist.append(df)
dfs = pd.concat(dflist)
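A quick sanity check of the magnitude-to-flux conversion used in the loop above (the numbers below are made up for illustration; 26.275 is just a plausible ZTF zeropoint, not a value from a real alert):

```python
import numpy as np

# Hypothetical values for illustration; not taken from a real alert.
magpsf, sigmapsf, magzpsci = 18.5, 0.05, 26.275

# FLUXCAL = 10**((zp - m) / 2.5); propagating the magnitude error gives
# sigma_F = F * ln(10)/2.5 * sigma_m
fluxcal = 10 ** ((magzpsci - magpsf) / 2.5)
fluxcalerr = fluxcal * sigmapsf * np.log(10) / 2.5
```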
ids_preds, pred_probs = classify_lcs(dfs, model_file, device)
def reformat_to_df(pred_probs, ids=None):
    """Reformat SNN predictions to a DataFrame.

    # TO DO: support nb_inference != 1
    """
    num_inference_samples = 1

    d_series = {"SNID": []}
    for i in range(pred_probs[0].shape[1]):
        d_series[f"prob_class{i}"] = []

    for idx, value in enumerate(pred_probs):
        d_series["SNID"].append(ids[idx] if ids is not None and len(ids) > 0 else idx)
        value = value.reshape((num_inference_samples, -1))
        value_dim = value.shape[1]
        for i in range(value_dim):
            d_series[f"prob_class{i}"].append(value[:, i][0])
    preds_df = pd.DataFrame.from_dict(d_series)

    # get the predicted class
    preds_df["pred_class"] = np.argmax(pred_probs, axis=-1).reshape(-1)
    return preds_df

preds_df = reformat_to_df(pred_probs, ids=ids_preds)
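A minimal numpy-only illustration of the reduction reformat_to_df performs: classify_lcs returns one probability array per light curve, and the predicted class is the argmax over the last axis (the probabilities here are invented):

```python
import numpy as np

# Two fake light curves, each with one inference sample over two classes
# (values invented for illustration).
pred_probs = [np.array([[0.9, 0.1]]), np.array([[0.2, 0.8]])]

# Same reduction reformat_to_df applies to get the predicted class.
pred_class = np.argmax(pred_probs, axis=-1).reshape(-1)
```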
Test the Cloud Function pieces locally¶
export GCP_PROJECT=$GOOGLE_CLOUD_PROJECT
export TESTID=False
export SURVEY=ztf
cd /Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn
import main
from broker_utils import data_utils, gcp_utils
msg = gcp_utils.pull_pubsub('ztf-alerts-reservoir', msg_only=False)[0]
alert_dict = data_utils.decode_alert(msg.data)
dropcols = ['cutoutScience', 'cutoutTemplate', 'cutoutDifference']
alert_dict = {k: v for k, v in alert_dict.items() if k not in dropcols}
snn_dict = main.classify_with_snn(alert_dict)
gcp_utils.publish_pubsub('test', {'alert': alert_dict, 'SuperNNova': snn_dict})
snn_msg = gcp_utils.pull_pubsub('test')[0]
bq_table = 'ztf_alerts.SuperNNova'
gcp_utils.insert_rows_bigquery(bq_table, [snn_dict])
Classify known Ia¶
The docs indicate that “usually” class0 indicates a Ia and class1 indicates non-Ia, but this will depend on how the model was trained.
I (Troy) checked by classifying a recent observation of a known Ia that I recently pulled from file storage to send to Ella (SN_2021rhu, aka ZTF21abiuvdk; see ZTF21abiuvdk_lightcurve.png and compare with ALeRCE).
The results indicate the opposite of what was expected: SN_2021rhu is assigned to class1 with high confidence.
Adding this to my list of questions for Anais about the trained model.
import main
from broker_utils import data_utils, gcp_utils
snIa = 'ZTF21abiuvdk'
fname = '/Users/troyraen/Documents/broker/ella/avros/ZTF21abiuvdk.1664460940815015004.ztf_20210723_programid1.avro'
alert_dict = data_utils.decode_alert(fname)
snn_dict = main.classify_with_snn(alert_dict)
snn_dict
# output is:
{'objectId': 'ZTF21abiuvdk',
'candid': 1664460940815015004,
'prob_class0': 0.04458457976579666,
'prob_class1': 0.9554154872894287,
'pred_class': 1}
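For reference, a tiny hypothetical helper (not part of the broker code) that turns the output dict into a label. It assumes the docs' "usual" convention (class0 = Ia); as noted above, this particular model appears to be trained the other way, so the ia_class argument would need to be flipped once the mapping is confirmed:

```python
# Hypothetical helper, not part of the broker code. Assumes class0 = Ia
# per the SuperNNova docs' "usual" convention; flip ia_class if the
# trained model uses the opposite mapping.
def snn_label(snn_dict, ia_class=0):
    return 'Ia' if snn_dict['pred_class'] == ia_class else 'non-Ia'

snn_label({'pred_class': 1})  # -> 'non-Ia' under the default assumption
```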
Local, full test¶
conda create -n snn python=3.7
conda activate snn
export GOOGLE_APPLICATION_CREDENTIALS='/Users/troyraen/Documents/broker/repo/GCP_auth_key.json'
export GCP_PROJECT='ardent-cycling-243415'
export TESTID='False'
export SURVEY='ztf'
cd '/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn'
pip install -r requirements.txt
pip install ipython
# cd '/Users/troyraen/Documents/broker/repo3/broker/broker_utils'
# python -m pip install -e .
ipython
from broker_utils import data_utils, gcp_utils
import main
# create alert as it would come from the ex-galac stream
msg = gcp_utils.pull_pubsub('ztf-alerts-reservoir')[0]
alert_dict_tmp = data_utils.decode_alert(msg)
dropcols = ['cutoutScience', 'cutoutTemplate', 'cutoutDifference']
alert_dict = {k: v for k, v in alert_dict_tmp.items() if k not in dropcols}
# publish it and pull it back down
gcp_utils.publish_pubsub('test', alert_dict)
msgs_exgalac = gcp_utils.pull_pubsub('test', msg_only=False)
msg_exgalac = msgs_exgalac[0].message
# run cloud function: classify, publish pubsub, load bigquery
main.run(msg_exgalac, {})
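For context on what main.run receives: a Pub/Sub-triggered background Cloud Function gets the message body base64-encoded under the "data" key of the event dict. The JSON payload below is a stand-in for illustration; the real broker alerts are Avro-serialized and decoded with decode_alert:

```python
import base64
import json

# Stand-in payload; real broker messages are Avro-encoded, not JSON.
alert = {'objectId': 'ZTF21abiuvdk', 'candid': 1664460940815015004}

# Shape of the event dict Cloud Functions hands to the entry point:
# the message body arrives base64-encoded under 'data'.
event = {'data': base64.b64encode(json.dumps(alert).encode('utf-8'))}

# Inside the function, the payload is recovered like this.
decoded = json.loads(base64.b64decode(event['data']))
```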
# check results
# pull pubsub
msg_snn = gcp_utils.pull_pubsub('ztf-SuperNNova-sub')[0]
snn_dicts = data_utils.decode_alert(msg_snn)
# query bigquery
candid = snn_dicts['alert']['candid']
query = (
    'SELECT * '
    'FROM `ardent-cycling-243415.ztf_alerts.SuperNNova` '
    f'WHERE candid={candid} '
)
snn_queryjob = gcp_utils.query_bigquery(query)
for row in snn_queryjob:
    print(row)
This works.
Deploy Cloud Function¶
survey="ztf"
testid="False"
classify_snn_CF_name="${survey}-classify_with_SuperNNova"
classify_snn_trigger_topic="${survey}-exgalac_trans"
classify_snn_entry_point="run"
cd '/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn'
gcloud functions deploy "$classify_snn_CF_name" \
--entry-point "$classify_snn_entry_point" \
--runtime python37 \
--trigger-topic "$classify_snn_trigger_topic" \
--set-env-vars TESTID="$testid",SURVEY="$survey"
This errors out, and I cannot find any more specific info in the logs or at the provided links. The step it fails on is called “uploading_python_pkg_layer”.
Verified that I can successfully deploy a different cloud function:
survey="ztf"
testid="False"
classify_snn_CF_name="troy-test-check_cue_response"
classify_snn_trigger_topic="${survey}-exgalac_trans"
classify_snn_entry_point="run"
cd '/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/check_cue_response'
gcloud functions deploy "$classify_snn_CF_name" \
--entry-point "$classify_snn_entry_point" \
--runtime python37 \
--trigger-topic "$classify_snn_trigger_topic" \
--set-env-vars TESTID="$testid",SURVEY="$survey"
Try a different deployment method: use Cloud Build directly.
gcloud builds submit --config \
/Users/troyraen/Documents/broker/repo3/version_tracking/v0.7.0/cloud_build.yaml \
/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn
Same result.
Comment everything out of the Cloud Function and put things back one by one until it fails.
OK, the problem is torch, combined with the fact that Cloud Functions don’t support GPU libraries.
Fixed by providing a direct URL in requirements.txt.
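A sketch of what the requirements.txt fix looks like (the exact wheel URL below is an assumption for illustration; pick the CPU wheel matching the runtime's Python version from the PyTorch download index):

```
# CPU-only torch wheel pinned by direct URL so the GPU build is not pulled in.
# URL is illustrative; choose the cp37 CPU wheel matching the python37 runtime.
https://download.pytorch.org/whl/cpu/torch-1.9.0%2Bcpu-cp37-cp37m-linux_x86_64.whl
supernnova
```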