docs/source/working-notes/troyraen/v0.7.0/supernnova.md
Implementing SuperNNova Classifier
SuperNNova Links:
The following is mostly code to develop and test and Cloud Function. This uses a pre-trained model provided by the SuperNNova team.
Preliminary exploration
Following run_onthefly.py
pip install supernnova
pip install torch
pip install h5py
pip install natsort
pip install scikit-learn
pip install seaborn
Get some alerts and classify them:
from astropy.time import Time
import numpy as np
import pandas as pd
from pgb_utils import pubsub as pgbps
from supernnova.validation.validate_onthefly import classify_lcs
COLUMN_NAMES = [
"SNID",
"MJD",
"FLUXCAL",
"FLUXCALERR",
"FLT"
]
cols = ['objectId', 'jd', 'magpsf', 'sigmapsf', 'magzpsci', 'fid']
ztf_fid_names = {1:'g', 2:'r', 3:'i'}
device='cpu'
model_file='/Users/troyraen/Documents/broker/SNN/ZTF_DMAM_V19_NoC_SNIa_vs_CC_forFink/vanilla_S_0_CLF_2_R_none_photometry_DF_1.0_N_global_lstm_32x2_0.05_128_True_mean.pt'
# rnn_state = torch.load(model_file, map_location=lambda storage, loc: storage)
subscription = 'ztf-loop'
msgs = pgbps.pull(subscription, max_messages=10)
# dflist = [pgbps.decode_ztf_alert(m, return_format='df') for m in msgs]
dflist = []
for m in msgs:
df = pgbps.decode_ztf_alert(m, return_format='df')
df['objectId'] = df.objectId
df = df[cols]
df['SNID'] = df['objectId']
df['MJD'] = Time(df['jd'], format='jd').mjd
df['FLUXCAL'] = 10 ** ((df['magzpsci'] - df['magpsf']) / 2.5)
df['FLUXCALERR'] = df['FLUXCAL'] * df['sigmapsf'] * np.log(10 / 2.5)
df['FLT'] = df['fid'].map(ztf_fid_names)
dflist.append(df)
dfs = pd.concat(dflist)
ids_preds, pred_probs = classify_lcs(dfs, model_file, device)
preds_df = reformat_to_df(pred_probs, ids=ids_preds)
def reformat_to_df(pred_probs, ids=None):
""" Reformat SNN predictions to a DataFrame
# TO DO: suppport nb_inference != 1
"""
num_inference_samples = 1
d_series = {}
for i in range(pred_probs[0].shape[1]):
d_series["SNID"] = []
d_series[f"prob_class{i}"] = []
for idx, value in enumerate(pred_probs):
d_series["SNID"] += [ids[idx]] if len(ids) > 0 else idx
value = value.reshape((num_inference_samples, -1))
value_dim = value.shape[1]
for i in range(value_dim):
d_series[f"prob_class{i}"].append(value[:, i][0])
preds_df = pd.DataFrame.from_dict(d_series)
# get predicted class
preds_df["pred_class"] = np.argmax(pred_probs, axis=-1).reshape(-1)
return preds_df
Test the Cloud Function pieces locally
export GCP_PROJECT=$GOOGLE_CLOUD_PROJECT
export TESTID=False
export SURVEY=ztf
cd /Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn
import main
from broker_utils import gcp_utils
msg = gcp_utils.pull_pubsub('ztf-alerts-reservoir', msg_only=False)[0]
alert_dict = data_utils.decode_alert(msg.data)
dropcols = ['cutoutScience', 'cutoutTemplate', 'cutoutDifference']
alert_dict = {k: v for k, v in alert_dict.items() if k not in dropcols}
snn_dict = main.classify_with_snn(alert_dict)
gcp_utils.publish_pubsub('test', {'alert': alert_dict, 'SuperNNova': snn_dict})
snn_msg = gcp_utils.pull_pubsub('test')[0]
gcp_utils.insert_rows_bigquery(bq_table, [snn_dict])
Classify known Ia
The docs indicate that “usually” class0 indicates a Ia and class1 indicates non-Ia, but this will depend on how the model was trained.
I (Troy) checked by classifying a recent observation of the known Ia that I recently pulled from file storage to send to Ella (SN_2021rhu aka ZTF21abiuvdk, ZTF21abiuvdk_lightcurve.png and compare with Alerce).
Results indicate the opposite of expected… SN_2021rhu is assigned to class1 with high confidence.
Adding this to my list of questions for Anais about the trained model.
import main
from broker_utils import data_utils, gcp_utils
snIa = 'ZTF21abiuvdk'
fname = '/Users/troyraen/Documents/broker/ella/avros/ZTF21abiuvdk.1664460940815015004.ztf_20210723_programid1.avro'
alert_dict = data_utils.decode_alert(fname)
snn_dict = main.classify_with_snn(alert_dict)
snn_dict
# output is:
{'objectId': 'ZTF21abiuvdk',
'candid': 1664460940815015004,
'prob_class0': 0.04458457976579666,
'prob_class1': 0.9554154872894287,
'pred_class': 1}
Local, full test
conda create -n snn python=3.7
conda activate snn
export 'GOOGLE_APPLICATION_CREDENTIALS=/Users/troyraen/Documents/broker/repo/GCP_auth_key.json'
export GCP_PROJECT='ardent-cycling-243415'
export TESTID='False'
export SURVEY='ztf'
cd '/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn'
pip install -r requirements.txt
pip install ipython
# cd '/Users/troyraen/Documents/broker/repo3/broker/broker_utils'
# python -m pip install -e .
ipython
from broker_utils import data_utils, gcp_utils
import main
# create alert as it would come from the ex-galac stream
msg = gcp_utils.pull_pubsub('ztf-alerts-reservoir')[0]
alert_dict_tmp = data_utils.decode_alert(msg)
dropcols = ['cutoutScience', 'cutoutTemplate', 'cutoutDifference']
alert_dict = {k: v for k, v in alert_dict_tmp.items() if k not in dropcols}
# publish it and pull it back down
gcp_utils.publish_pubsub('test', alert_dict)
msgs_exgalac = gcp_utils.pull_pubsub('test', msg_only=False)
msg_exgalac = msgs_exgalac[0].message
# run cloud function: classify, publish pubsub, load bigquery
main.run(msg_exgalac, {})
# check results
# pull pubsub
msg_snn = gcp_utils.pull_pubsub('ztf-SuperNNova-sub')[0]
snn_dicts = data_utils.decode_alert(msg_snn)
# query bigquery
candid = snn_dicts['alert']['candid']
query = (
f'SELECT * '
f'FROM `ardent-cycling-243415.ztf_alerts.SuperNNova` '
f'WHERE candid={candid} '
)
snn_queryjob = gcp_utils.query_bigquery(query)
for r, row in enumerate(snn_queryjob):
print(row)
This works.
Deploy Cloud Function
survey="ztf"
testid="False"
classify_snn_CF_name="${survey}-classify_with_SuperNNova"
classify_snn_trigger_topic="${survey}-exgalac_trans"
classify_snn_entry_point="run"
cd '/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn'
gcloud functions deploy "$classify_snn_CF_name" \
--entry-point "$classify_snn_entry_point" \
--runtime python37 \
--trigger-topic "$classify_snn_trigger_topic" \
--set-env-vars TESTID="$testid",SURVEY="$survey"
This errors out with:
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Build failed: Build error details not available.Please check the logs at https://console.cloud.google.com/cloud-build/builds;region=us-central1/d782dfbb-285d-44aa-a164-040e48660089?project=591409139500. Please visit https://cloud.google.com/functions/docs/troubleshooting#build for in-depth troubleshooting documentation for build related errors.
I cannot find any more specific info in the logs or at the provided links. The step that it fails on is called “uploading_python_pkg_layer”.
Verified I can successfully deploy a different cloud function.
survey="ztf"
testid="False"
classify_snn_CF_name="troy-test-check_cue_response"
classify_snn_trigger_topic="${survey}-exgalac_trans"
classify_snn_entry_point="run"
cd '/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/check_cue_response'
gcloud functions deploy "$classify_snn_CF_name" \
--entry-point "$classify_snn_entry_point" \
--runtime python37 \
--trigger-topic "$classify_snn_trigger_topic" \
--set-env-vars TESTID="$testid",SURVEY="$survey"
Try a different deployment method… use Cloud Build directly.
gcloud builds submit --config \
/Users/troyraen/Documents/broker/repo3/version_tracking/v0.7.0/cloud_build.yaml \
/Users/troyraen/Documents/broker/repo3/broker/cloud_functions/classify_snn
Same result.
Comment everything out of the Cloud Function and put things back one by one until it fails.
Ok, the problem is torch
and the fact that Cloud Functions don’t have support for GPU libraries.
See here.
Fixed by providing a direct URL in requirements.txt.