Update the ZTF production broker to current
The production broker should already behave the same as the current repo code, but some of its resource names (e.g., pubsub) have not yet been updated to follow the new naming syntax rules. This update renames them.
[x] pull down current broker_files bucket ~in case we need a roll-back~ (the updated broker will use a new bucket called ztf-broker_files, so the old one will not be overwritten)
[x] check out repo master branch (which is currently v0.5.0)
[x] run setup_broker.sh, except for the VM install (night-conductor will be renamed and needs to be rebuilt; ztf-consumer does not need to be changed)
[x] push enough alerts through to check everything (salt2 will require the most alerts to produce a successful fit)
[x] check the results: view the dashboard, then check each of the following:
[x] pubsub - topic and subscription names will change. pull alerts from the “counter” subscriptions
[x] file storage (avro and salt2 bucket names do not change).
[x] bigquery (dataset and table names do not change), check that alerts made it into each table.
[x] cloud function ps_to_gcs will be updated with a new name and to use broker_utils
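The bigquery item in the checklist above can be done with a row count per table, taken before and after the test run. The following is a minimal sketch, not the broker's own tooling. The helper names `count_query` and `count_rows` are hypothetical, and the project, dataset, and table names are placeholders that must be supplied by the caller, since this page does not list them.

```python
def count_query(project_id: str, dataset: str, table: str) -> str:
    """Build a SQL statement that counts the rows in one table."""
    return f"SELECT COUNT(*) AS n FROM `{project_id}.{dataset}.{table}`"


def count_rows(project_id: str, dataset: str, tables: list) -> dict:
    """Return {table: row_count} for each table.

    Requires the google-cloud-bigquery package and GCP credentials.
    """
    from google.cloud import bigquery  # imported lazily; only needed here

    client = bigquery.Client(project=project_id)
    counts = {}
    for table in tables:
        rows = client.query(count_query(project_id, dataset, table)).result()
        counts[table] = next(iter(rows))["n"]
    return counts
```

Calling `count_rows` once before pushing alerts and once after, then comparing the two dictionaries, shows whether every table received new rows.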
run setup_broker.sh
Run the setup script:
survey=ztf
testid=False
teardown=False
fout=~/Documents/broker/repo2/version_tracking/v0.6.0/update-production.out
cd ~/Documents/broker/repo3/broker/setup_broker
# comment out the vm setup, then
./setup_broker.sh "$testid" "$teardown" "$survey" 2>&1 | tee "$fout"
broker-utils had a bug in getting the broker bucket name for schema maps. Fixed it; creating cloud functions again:
fout=~/Documents/broker/repo2/version_tracking/v0.6.0/update-production-CF-retry.out
./deploy_cloud_fncs.sh "$testid" "$teardown" "$survey" 2>&1 | tee "$fout"
night-conductor’s name changed, so it needs to be recreated:
fout=~/Documents/broker/repo2/version_tracking/v0.6.0/update-production-NC-create.out
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
# comment out consumer VM setup, then
./create_vms.sh "$broker_bucket" "$testid" "$teardown" "$survey" 2>&1 | tee "$fout"
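The production resource names used on this page follow a few simple patterns, which the sketch below collects in one place. It only mirrors the names that appear in these notes (the broker bucket, the night-conductor cue topic, the alerts topic, and its counter subscription). The function `production_names` is hypothetical, and it does not attempt to reproduce how the setup scripts modify names when `testid` is not `False`.

```python
def production_names(project_id: str, survey: str) -> dict:
    """Compose production resource names from the patterns used in these notes.

    Only covers the production case (testid=False); names for test
    instances are not handled here.
    """
    return {
        "broker_bucket": f"{project_id}-{survey}-broker_files",
        "cue_topic": f"{survey}-cue_night_conductor",
        "alerts_topic": f"{survey}-alerts",
        "alerts_counter_sub": f"{survey}-alerts-counter",
    }


names = production_names("my-project", "ztf")  # "my-project" is a placeholder
print(names["broker_bucket"])
```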
push some alerts through
Start the broker without the consumer
survey="ztf"
topic="${survey}-cue_night_conductor"
cue=START
attr=KAFKA_TOPIC=NONE
gcloud pubsub topics publish "$topic" --message="$cue" --attribute="$attr"
Run a consumer simulator
from broker_utils import consumer_sim as bcs
alert_rate = (1, 'perSec')
kwargs = {
    'topic_id': 'ztf-alerts',
    'sub_id': 'ztf-alerts-reservoir',
    'runtime': (2, 'min')
}
bcs.publish_stream(alert_rate, **kwargs)
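To judge whether a run pushes enough alerts (salt2 needs the most), it helps to estimate how many alerts a given rate and runtime should produce. The sketch below does that arithmetic. It is not part of broker_utils: the helper `expected_alerts` is hypothetical, and apart from 'perSec' and 'min', which are used above, the unit strings are assumptions.

```python
# Seconds in each time unit; 'sec' and 'hr' are assumed unit names.
SECONDS_PER = {"sec": 1, "min": 60, "hr": 3600}
# Map each rate unit to its time unit; 'perMin' and 'perHr' are assumed names.
RATE_UNITS = {"perSec": "sec", "perMin": "min", "perHr": "hr"}


def expected_alerts(alert_rate: tuple, runtime: tuple) -> int:
    """Estimate the number of alerts published at alert_rate over runtime."""
    rate, rate_unit = alert_rate
    duration, time_unit = runtime
    per_second = rate / SECONDS_PER[RATE_UNITS[rate_unit]]
    return round(per_second * duration * SECONDS_PER[time_unit])


print(expected_alerts((1, "perSec"), (2, "min")))  # 120
```

With the settings above, the simulator should publish about 120 alerts.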
Shut down the broker
# end the night
cue=END
gcloud pubsub topics publish "$topic" --message="$cue"
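The START and END cues above can also be published from Python with the google-cloud-pubsub client. This is a minimal sketch under the assumption that night-conductor reads the cue from the message body and `KAFKA_TOPIC` from the message attributes, as the gcloud commands suggest. The helpers `build_cue` and `publish_cue` are hypothetical names, not part of broker_utils.

```python
def build_cue(cue: str, kafka_topic: str = None) -> tuple:
    """Return the (data, attributes) pair for a night-conductor cue message."""
    data = cue.encode("utf-8")  # Pub/Sub message bodies must be bytes
    attrs = {"KAFKA_TOPIC": kafka_topic} if kafka_topic is not None else {}
    return data, attrs


def publish_cue(project_id: str, survey: str, cue: str, kafka_topic: str = None) -> str:
    """Publish a cue to the survey's cue topic and return the message ID.

    Requires the google-cloud-pubsub package and GCP credentials.
    """
    from google.cloud import pubsub_v1  # imported lazily; only needed here

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, f"{survey}-cue_night_conductor")
    data, attrs = build_cue(cue, kafka_topic)
    future = publisher.publish(topic_path, data, **attrs)
    return future.result()  # blocks until the message is published
```

For example, `publish_cue(project_id, "ztf", "START", kafka_topic="NONE")` corresponds to the start command above, and `publish_cue(project_id, "ztf", "END")` to the shutdown command.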
pull the alerts and make sure they are as expected
import fastavro
from google.cloud import storage
import pgb_utils as pgb
from pgb_utils import pubsub as ps
from setup_gcp import _resources
survey = 'ztf'
testid = False
topics = _resources('PS', survey=survey, testid=testid)
print(topics) # {'<topic_name>': ['<subscription_name>', ]}
afmt = 'table' # tried all three options and they all work
subscription_name = "ztf-alerts-counter" ###
msg_a = ps.pull(subscription_name)[0]
alert_a = ps.decode_message(msg_a, return_alert_as=afmt)
subscription_name = "ztf-alerts_pure-counter" ###
msg_p = ps.pull(subscription_name)[0]
alert_p = ps.decode_message(msg_p, return_alert_as=afmt)
subscription_name = "ztf-exgalac_trans-counter" ###
msg_et = ps.pull(subscription_name)[0]
alert_et = ps.decode_message(msg_et, return_alert_as=afmt)
subscription_name = "ztf-salt2-counter" ###
msg_s2 = ps.pull(subscription_name)[0]
alert_s2, s2_dict = ps.decode_message(msg_s2, return_alert_as=afmt)
subscription_name = "ztf-alert_avros-counter"
msg_aa = ps.pull(subscription_name, msg_only=False)[0]
# download the file
bucket_name = msg_aa.message.attributes['bucketId']
fname = msg_aa.message.attributes['objectId']
# ZTF18acegotq.1680242110915010008.ztf_20210808_programid1.avro
fout = f'/Users/troyraen/Documents/broker/repo2/version_tracking/v0.6.0/{fname}'
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(fname)
blob.download_to_filename(fout)
# plot stuff
with open(fout, 'rb') as fin:
    alert_list = [r for r in fastavro.reader(fin)]
pgb.figures.plot_lightcurve_cutouts(alert_list[0])
# lightcurves plot fine
# cutouts throw this error
# OSError: No SIMPLE card found, this file does not appear to be a valid FITS file
# opened issue #68
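The pull-and-decode steps for the counter subscriptions above repeat the same two calls, so they can be written as a loop. The sketch below takes the pull and decode functions as arguments so that it can be used with `ps.pull` and `ps.decode_message` or tried out with stand-ins. The helper `pull_first_alerts` is hypothetical, and treating an empty pull result as "no message" is an assumption about how `ps.pull` behaves.

```python
def pull_first_alerts(subscriptions, pull, decode, return_alert_as="table"):
    """Pull one message from each subscription and decode it.

    pull(name) should return a list of messages; decode(msg, return_alert_as=...)
    should return the decoded alert. Returns {subscription_name: decoded or None}.
    """
    results = {}
    for name in subscriptions:
        messages = pull(name)
        if not messages:
            # Nothing waiting in this subscription; record that and move on.
            results[name] = None
            continue
        results[name] = decode(messages[0], return_alert_as=return_alert_as)
    return results


# Intended usage with the modules imported above (requires GCP credentials):
# subs = ["ztf-alerts-counter", "ztf-alerts_pure-counter",
#         "ztf-exgalac_trans-counter", "ztf-salt2-counter"]
# alerts = pull_first_alerts(subs, ps.pull, ps.decode_message)
```

Note that for the salt2 subscription the decoded value is a pair (the alert and the salt2 dictionary), as in the code above, so that entry needs to be unpacked separately.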