# Streaming Stream Looper
## VM Setup
Define configs:
```bash
vmname="stream-looper"
vmtype="f1-micro"
brokerbucket="avid-heading-329016-generic-broker_files"
bucketdir="/stream-looper/"
workingdir="/home${bucketdir}"
looperscript="streaming_stream_looper.py"
# looperscript="$(cat ${looperscriptpy})"

installscript="""#! /bin/bash
apt-get update
apt-get install -y python3-pip screen
pip3 install google-cloud-pubsub google-cloud-logging ipython
gsutil cp "gs://${brokerbucket}${bucketdir}${looperscript}" "${workingdir}${looperscript}"
chmod 755 "${workingdir}${looperscript}"
echo 'Install complete! Shutting down...'
shutdown -h now
"""
# echo "$(cat ${looperscript})" > "${workingdir}${looperscript}"

# install anaconda and create a python3.7 env
# cd /tmp
# dist='Anaconda3-2022.05-Linux-x86_64.sh'
# curl -O "https://repo.anaconda.com/archive/${dist}"
# bash "${dist}" -b
# conda create -n broker python=3.7
# conda activate broker
# pip3 install ipython pgb-broker-utils
```
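As a quick sanity check (plain Python, with values copied from the configs above): `bucketdir` carries both the leading and trailing slash, so the plain string concatenations used throughout this README resolve to valid paths.

```python
# values copied from the shell configs above
brokerbucket = "avid-heading-329016-generic-broker_files"
bucketdir = "/stream-looper/"
looperscript = "streaming_stream_looper.py"
workingdir = f"/home{bucketdir}"

# bucketdir supplies both separators, so plain concatenation yields valid paths
gcs_uri = f"gs://{brokerbucket}{bucketdir}{looperscript}"
local_path = f"{workingdir}{looperscript}"
print(gcs_uri)     # gs://avid-heading-329016-generic-broker_files/stream-looper/streaming_stream_looper.py
print(local_path)  # /home/stream-looper/streaming_stream_looper.py
```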
Upload the file `streaming_stream_looper.py` to the generic broker bucket so that the VM can download it. Run this command from inside the same directory this README is in:
```bash
gsutil cp "${looperscript}" "gs://${brokerbucket}${bucketdir}"
# gcloud compute scp "${vmname}:~/${looperscript}"
# gcloud compute ssh "${vmname}" "sudo chmod 744 ${workingdir}${looperscript}"
```
(Note: once pittgoogle-client #6 is complete, much of the logic in this file can be replaced with a `pubsub.Consumer`.)

Create the machine and install stuff:
```bash
gcloud compute instances create "${vmname}" \
    --scopes=cloud-platform \
    --metadata=google-logging-enabled=true,startup-script="${installscript}"
```
The machine will shut itself down when the installs are done.
After it shuts down, complete the remaining steps in this section.
Unset the startup script so the installs don’t run again:
```bash
gcloud compute instances add-metadata "${vmname}" \
    --metadata=startup-script=""
```
The looper script can run on a much smaller machine than the install script requires. Change the machine type now:
```bash
gcloud compute instances set-machine-type "${vmname}" \
    --machine-type "${vmtype}"
```
## ZTF Stream
Set a startup script to execute the python file in a background thread. (This actually fails to load `google.cloud.logging`, but using the debug instructions below works fine.)
```bash
startupscript="""#! /bin/bash
dir=/home/consumer_sim/
fname=streaming_stream_looper
nohup python3 "${dir}${fname}.py" >> "${dir}${fname}.out" 2>&1 &
"""
```
```bash
gcloud compute instances add-metadata stream-looper --metadata=startup-script="$startupscript"
gcloud compute instances start stream-looper
```
Run the following in a `screen` session to debug:
```python
from streaming_stream_looper import StreamLooper

TOPIC_NAME = "ztf-loop"
SUBSCRIPTION_NAME = "ztf-alerts-reservoir"
looper = StreamLooper(TOPIC_NAME, SUBSCRIPTION_NAME)
looper.run_looper()
```
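As background for the `pubsub.Consumer` note above: the core of a stream looper is just pull-from-reservoir, republish-to-topic. A minimal sketch with the Pub/Sub transport injected so it can run without GCP credentials (all names here are illustrative, not the actual `StreamLooper` internals):

```python
from typing import Callable, Iterable, List

def loop_messages(
    pull: Callable[[int], Iterable[bytes]],
    publish: Callable[[bytes], None],
    max_messages: int = 10,
) -> int:
    """Pull up to max_messages payloads and republish each one.

    Returns the number of messages republished. In the real looper, `pull`
    would wrap a Pub/Sub subscriber and `publish` a PublisherClient.
    """
    count = 0
    for payload in pull(max_messages):
        publish(payload)
        count += 1
    return count

# exercise it with in-memory fakes standing in for Pub/Sub
reservoir: List[bytes] = [b"alert-1", b"alert-2", b"alert-3"]
republished: List[bytes] = []

def fake_pull(n: int) -> Iterable[bytes]:
    return reservoir[:n]

n_sent = loop_messages(fake_pull, republished.append)
print(n_sent)  # 3
```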
## ELASTICC Stream
Log in to the stream-looper VM and run the looper in a screen:
Notes:

- To detach from the screen, use `Ctrl+a` then `d`; to reattach, use `screen -r elasticc`.
- Reset the following variables using the values at the top of this README: `bucketdir`, `workingdir`.
### Incoming alerts
```bash
gcloud compute ssh "${vmname}"
screen -S elasticc
cd "${workingdir}"
ipython
```
```python
from streaming_stream_looper import StreamLooper

subscrip = "elasticc-loop"
topic = "elasticc-loop"
project_id = "avid-heading-329016"
StreamLooper(topic, subscrip, project_id).run_looper()
```
### Outgoing classifications
```bash
gcloud compute ssh "${vmname}"
screen -S classifications
cd "${workingdir}"
fname=brokerClassification.avsc
curl -L -o "$fname" \
    "https://raw.githubusercontent.com/LSSTDESC/elasticc/main/alert_schema/elasticc.v0_9_1.${fname}"
pip3 install fastavro
ipython
```
```python
import io
from datetime import datetime, timedelta, timezone
from random import random
from time import sleep

import fastavro
from google.cloud import pubsub_v1

client = pubsub_v1.PublisherClient()
topic_name = "classifications-loop"
project_id = "avid-heading-329016"
topic_path = f"projects/{project_id}/topics/{topic_name}"
schema_out = fastavro.schema.load_schema("brokerClassification.avsc")

while True:
    # mocking some attributes as done in broker_utils.testing.Mock
    # (don't want to install broker-utils just for this)
    now = datetime.now(timezone.utc)
    msg = {
        "alertId": int(random() * 1e10),
        "diaSourceId": int(random() * 1e6),
        # create int POSIX timestamps in milliseconds
        "elasticcPublishTimestamp": int(now.timestamp() * 1e3),
        "brokerIngestTimestamp": int((now + timedelta(seconds=0.345678)).timestamp() * 1e3),
        "brokerName": "Pitt-Google Broker",
        "brokerVersion": "v0.6",
        "classifierName": "SuperNNova_v1.3",
        "classifierParams": "",
        "classifications": [{"classId": 111, "probability": random()}],
    }

    # avro-serialize the dictionary
    fout = io.BytesIO()
    fastavro.schemaless_writer(fout, schema_out, msg)
    avro = fout.getvalue()

    # read it back in to make sure we got it right
    # with io.BytesIO(avro) as fin:
    #     class_dict = fastavro.schemaless_reader(fin, schema_out)
    #     print(class_dict)

    # publish
    future = client.publish(topic_path, avro)
    future.result()  # blocks until published; raises an exception if publish fails
    sleep(1)
```
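The mocked timestamps above are integer POSIX times in milliseconds. A quick stdlib check of that conversion (the example date is arbitrary):

```python
from datetime import datetime, timezone

# an arbitrary UTC instant, converted the same way as the mock timestamps above
dt = datetime(2022, 6, 1, tzinfo=timezone.utc)
ms = int(dt.timestamp() * 1e3)
print(ms)  # 1654041600000
```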