# Streaming Stream Looper

## VM Setup

  1. Define configs

    vmname="stream-looper"
    vmtype="f1-micro"
    
    brokerbucket="avid-heading-329016-generic-broker_files"
    bucketdir="/stream-looper/"
    workingdir="/home${bucketdir}"
    looperscript="streaming_stream_looper.py"
    # looperscript="$(cat ${looperscriptpy})"
    
    installscript="""#! /bin/bash
    apt-get update
    apt-get install -y python3-pip screen
    pip3 install google-cloud-pubsub google-cloud-logging ipython
    mkdir -p "${workingdir}"
    gsutil cp "gs://${brokerbucket}${bucketdir}${looperscript}" "${workingdir}${looperscript}"
    chmod 755 "${workingdir}${looperscript}"
    echo 'Install complete! Shutting down...'
    shutdown -h now
    """
    
    # echo "$(cat ${looperscript})" > "${workingdir}${looperscript}"
    
    # install anaconda and create a python3.7 env
    # cd /tmp
    # dist='Anaconda3-2022.05-Linux-x86_64.sh'
    # curl -O "https://repo.anaconda.com/archive/${dist}"
    # bash "${dist}" -b
    # conda create -n broker python=3.7
    # conda activate broker
    # pip3 install ipython pgb-broker-utils
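
    Optionally, print the assembled install script to sanity-check the variable expansion before creating the VM (a convenience check, not one of the original steps):

    echo "${installscript}"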
    
  2. Upload the file streaming_stream_looper.py to the generic broker bucket so that the VM can download it. Run this command from the directory containing this README:

    gsutil cp "${looperscript}" "gs://${brokerbucket}${bucketdir}"
    # gcloud compute scp "${looperscript}" "${vmname}:~/${looperscript}"
    # gcloud compute ssh "${vmname}" --command="sudo chmod 744 ${workingdir}${looperscript}"
    

    (Note: Once pittgoogle-client #6 is complete, much of the logic in this file can be replaced with the use of a pubsub.Consumer.)
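
    To confirm the upload landed (an optional check, not in the original notes):

    gsutil ls -l "gs://${brokerbucket}${bucketdir}"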

  3. Create the machine; its startup script runs the installs:

    gcloud compute instances create "${vmname}" \
        --scopes=cloud-platform \
        --metadata=google-logging-enabled=true,startup-script="${installscript}"
    

    The machine will shut itself down when the installs are done.

    After it shuts down, complete the remaining steps in this section.
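
    One way to wait for the shutdown (a convenience sketch, not part of the original steps) is to poll the instance status:

    # loop until the instance reports TERMINATED, i.e., the install script has shut it down
    while [ "$(gcloud compute instances describe "${vmname}" --format='value(status)')" != "TERMINATED" ]; do
        sleep 30
    done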

  4. Unset the startup script so the installs don’t run again:

    gcloud compute instances add-metadata "${vmname}" \
        --metadata=startup-script=""
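
    To double-check that the startup script was cleared (optional):

    gcloud compute instances describe "${vmname}" --format="yaml(metadata)"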
    
  5. The looper script can run on a much smaller machine than the installs require. Change the machine type now:

    gcloud compute instances set-machine-type "${vmname}" \
        --machine-type "${vmtype}"
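
    (Changing the machine type requires the instance to be stopped; it already is, since the install script shut it down. To confirm the new type:)

    gcloud compute instances describe "${vmname}" --format='value(machineType)'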
    

## ZTF Stream

Set a startup script that executes the python file as a background process:

(Note: this currently fails to load google.cloud.logging, but running the looper manually using the debug instructions below works fine.)

startupscript="""#! /bin/bash
# these must match workingdir and looperscript from the VM Setup section above
dir=/home/stream-looper/
fname=streaming_stream_looper
nohup python3 \"\${dir}\${fname}.py\" >> \"\${dir}\${fname}.out\" 2>&1 &
"""

gcloud compute instances add-metadata "${vmname}" --metadata=startup-script="${startupscript}"

gcloud compute instances start "${vmname}"
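
To spot-check that the looper came up after the VM starts (optional; the log path assumes the working directory set up above):

gcloud compute ssh "${vmname}" --command="tail -n 20 /home/stream-looper/streaming_stream_looper.out"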

To debug instead, `cd` into the working directory on the VM and run this in `ipython` inside a `screen` session:

from streaming_stream_looper import StreamLooper

TOPIC_NAME = "ztf-loop"
SUBSCRIPTION_NAME = "ztf-alerts-reservoir"
looper = StreamLooper(TOPIC_NAME, SUBSCRIPTION_NAME)
looper.run_looper()

## ELASTICC Stream

Log in to the stream-looper VM and run the looper in a screen:

# Note:
# to detach from the screen, use: "Ctrl+a" then "d"
# to reattach to the screen, use: `screen -r elasticc`

# Note:
# reset the following variables using values at the top of this README:
# bucketdir, workingdir

### Incoming alerts

gcloud compute ssh "${vmname}"

screen -S elasticc

cd "${workingdir}"

ipython
from streaming_stream_looper import StreamLooper


subscrip = "elasticc-loop"
topic = "elasticc-loop"
project_id = "avid-heading-329016"

StreamLooper(topic, subscrip, project_id).run_looper()
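
To spot-check that alerts are cycling through the loop topic (optional; uses a throwaway subscription, named here only for illustration, so the reservoir subscription is left untouched):

gcloud pubsub subscriptions create tmp-elasticc-loop-check --topic=elasticc-loop
gcloud pubsub subscriptions pull tmp-elasticc-loop-check --limit=3 --auto-ack
gcloud pubsub subscriptions delete tmp-elasticc-loop-check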

### Outgoing classifications

gcloud compute ssh "${vmname}"

screen -S classifications

cd "${workingdir}"

fname=brokerClassification.avsc
curl -L -o "${fname}" \
   "https://raw.githubusercontent.com/LSSTDESC/elasticc/main/alert_schema/elasticc.v0_9_1.${fname}"

pip3 install fastavro
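
A quick check that the downloaded schema parses (optional):

python3 -c "import fastavro; fastavro.schema.load_schema('brokerClassification.avsc'); print('schema ok')"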

ipython
import io
import fastavro
from datetime import datetime, timedelta, timezone
from google.cloud import pubsub_v1
from random import random
from time import sleep


client = pubsub_v1.PublisherClient()
topic_name = "classifications-loop"
project_id = "avid-heading-329016"
topic_path = f"projects/{project_id}/topics/{topic_name}"
schema_out = fastavro.schema.load_schema("brokerClassification.avsc")


while True:
    # mocking some attributes as done in broker_utils.testing.Mock
    # (don't want to install broker-utils just for this)
    now = datetime.now(timezone.utc)

    msg = {
        "alertId": int(random() * 1e10),
        "diaSourceId": int(random() * 1e6),
        # create int POSIX timestamps in milliseconds
        "elasticcPublishTimestamp": int(now.timestamp() * 1e3),
        "brokerIngestTimestamp": int((now + timedelta(seconds=0.345678)).timestamp() * 1e3),
        "brokerName": "Pitt-Google Broker",
        "brokerVersion": "v0.6",
        "classifierName": "SuperNNova_v1.3",
        "classifierParams": "",
        "classifications": [{"classId": 111, "probability": random(),}],
    }

    # avro serialize the dictionary
    fout = io.BytesIO()
    fastavro.schemaless_writer(fout, schema_out, msg)
    avro = fout.getvalue()  # getvalue() returns the whole buffer, so no seek is needed

    # read in to make sure we got it right
    # from io import BytesIO
    # with BytesIO(avro) as fin:
    #    class_dict = fastavro.schemaless_reader(fin, schema_out)
    # print(class_dict)

    # publish
    future = client.publish(topic_path, avro)
    future.result()  # blocks until published. raises an exception if publish fails

    sleep(1)
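
If you'd rather not keep the ipython session open, the same loop can be saved to a file and run in the background, mirroring the nohup pattern from the ZTF section (the filename is just a suggestion):

# hypothetical filename holding the publisher loop above
nohup python3 publish_classifications.py >> publish_classifications.out 2>&1 &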