<a href="https://colab.research.google.com/github/mwvgroup/Pitt-Google-Broker/blob/v%2Ftjr%2F0.3.1/v0.3.1/setup_test_v031.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dashboard

[Dashboard](https://console.cloud.google.com/monitoring/dashboards/builder/broker-instance-v031?project=ardent-cycling-243415&dashboardBuilderState=%257B%2522editModeEnabled%2522:false%257D&startTime=20210412T052643-04:00&endTime=20210412T073043-04:00) showing a test run of the `v031` broker instance.

# Setup

In [None]:
# Create a function to run and print a shell command.
def run(cmd: str):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

In [None]:
from google.colab import auth

In [None]:
auth.authenticate_user()

In [None]:
# install pgb
run('python3 -m pip install --index-url https://test.pypi.org/simple/ --no-deps pgb_utils')

# install dependencies
packages = ['apache-beam','google-apitools','APLpy',
            'beautifulsoup4==4.8',  # plot_cutouts grayscale stretch='arcsinh'
            'astropy-healpix==0.6',  # plot_cutouts grayscale stretch='arcsinh'
            'astropy==3.2.1',  # plot_cutouts grayscale stretch='arcsinh'
            ]
for package in packages:
    run(f'pip install --quiet {package}')

In [None]:
from google.cloud import bigquery
import pgb_utils as pgb

project = 'ardent-cycling-243415'
dataset = 'ztf_alerts'

In [None]:
# Connect your Google Drive file system
from google.colab import drive

drive.mount('/content/drive')  # follow the instructions to authorize access
colabpath = r'/content/drive/MyDrive/Colab\ Notebooks/PGB_dev'  # space escaped for the shell
colabpath_noesc = '/content/drive/MyDrive/Colab Notebooks/PGB_dev'
run(f'mkdir -p {colabpath}')

# Create broker instance Dashboards

- [Use the Dashboard API to build your own monitoring dashboard](https://cloud.google.com/blog/products/management-tools/cloud-monitoring-dashboards-using-an-api)
- [Managing dashboards by API](https://cloud.google.com/monitoring/dashboards/api-dashboard)

---

# DB Tables, Storage Buckets

## In `setup_broker.sh`, create BQ tables from JSON schemas

The current way we create tables for a testing instance does not work: the `alerts` table structure is wrong. Instead, let's get schema JSON files and use them to create the tables.

- [Copying a single source table](https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table)

- download schemas using `bq show --schema --format=prettyjson ardent-cycling-243415:ztf_alerts.alerts > alerts_schema.json`, etc.
- create tables using `bq mk --table ardent-cycling-243415:ztf_alerts_<testid>.alerts alerts_schema.json`, etc.
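
The two `bq` commands above can be generated for several tables at once. The following is a minimal sketch, assuming a hypothetical helper `schema_copy_commands` (not part of the broker code). It only builds the command strings, which could then be passed to the `run` helper defined in Setup.

```python
def schema_copy_commands(tables, testid, project='ardent-cycling-243415',
                         dataset='ztf_alerts'):
    """Return (download, create) bq command pairs, one pair per table.

    Hypothetical helper: it only builds the command strings; nothing is executed.
    """
    commands = []
    for table in tables:
        schema_file = f'{table}_schema.json'
        # Dump the production table's schema to a local JSON file.
        download = (f'bq show --schema --format=prettyjson '
                    f'{project}:{dataset}.{table} > {schema_file}')
        # Create the same table in the testing dataset from that schema.
        create = (f'bq mk --table '
                  f'{project}:{dataset}_{testid}.{table} {schema_file}')
        commands.append((download, create))
    return commands


for download, create in schema_copy_commands(['alerts'], testid='v031'):
    print(download)
    print(create)
```

The table list is an input on purpose: these notes only name `alerts` explicitly, so any other table names would need to be filled in from the actual dataset.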

## Create `ztf_alerts.DIASource` table

We need to provide the schema when creating the table. The easiest way is to:

~[create it from a query result](https://cloud.google.com/bigquery/docs/tables#creating_a_table_from_a_query_result) on the alerts table.~

~Taking the [candidate schema file from ZTF's repo](https://github.com/ZwickyTransientFacility/ztf-avro-alert/blob/master/schema/candidate.avsc) and altering it to get what we need~

Download the alerts table schema, then alter it.

Links:
- [Specifying a schema](https://cloud.google.com/bigquery/docs/schemas)

In [None]:
run('bq show \
--schema \
--format=prettyjson \
ardent-cycling-243415:ztf_alerts.alerts > alerts_schema.json')

- Remove everything after the `candidate` record.
- Un-nest the `candidate` record and remove the enclosing `RECORD` entry.
- Remove the duplicate `candid` field (save its description).
- Add a `prv_candidates_candids` field of type string (~repeated field~; let's not complicate things, just use a single string).
- Save as `DIASource_schema.json`.
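
The manual edits listed above could also be scripted. The following is a minimal sketch, assuming the schema is the list of field dictionaries that `bq show --schema` writes. The function name `build_diasource_schema` is hypothetical. Copying the description onto the surviving `candid` field is one reading of "save description", and the tiny example schema is illustrative only, not the real alerts schema.

```python
import json


def build_diasource_schema(alerts_schema):
    """Flatten the `candidate` record of an alerts schema (a list of field dicts)."""
    names = [field['name'] for field in alerts_schema]
    idx = names.index('candidate')

    # Keep the top-level fields that come before `candidate`;
    # everything after the candidate record is dropped.
    fields = [dict(field) for field in alerts_schema[:idx]]
    top_level = {field['name']: field for field in fields}

    # Un-nest: promote the candidate sub-fields to the top level.
    for sub in alerts_schema[idx].get('fields', []):
        if sub['name'] in top_level:
            # Duplicate (e.g. `candid`): keep a single field, but carry the
            # description over if the surviving field has none (assumption).
            kept = top_level[sub['name']]
            if 'description' in sub and not kept.get('description'):
                kept['description'] = sub['description']
            continue
        fields.append(dict(sub))

    # A single string field rather than a repeated one.
    fields.append({'name': 'prv_candidates_candids', 'type': 'STRING',
                   'mode': 'NULLABLE'})
    return fields


# Tiny illustrative schema, NOT the real alerts schema.
example = [
    {'name': 'objectId', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'candid', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    {'name': 'candidate', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
        {'name': 'jd', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        {'name': 'candid', 'type': 'INTEGER', 'mode': 'NULLABLE',
         'description': 'illustrative description'},
    ]},
    {'name': 'prv_candidates', 'type': 'RECORD', 'mode': 'REPEATED', 'fields': []},
]
diasource_schema = build_diasource_schema(example)
print(json.dumps(diasource_schema, indent=2))
```

For the real table, the input would come from `json.load` on `alerts_schema.json` and the result would be written to `DIASource_schema.json` with `json.dump`.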

In [None]:
run('bq mk --table ardent-cycling-243415:ztf_alerts.DIASource DIASource_schema.json')

---

## Make BigQuery dataset `ztf_alerts` publicly queryable

- go to the dataset in the Console
- click "Share Dataset"
- "allUsers" (should I instead use "allAuthenticatedUsers"? I don't understand why "allUsers" is an option, since you have to authenticate your credentials to create a (Python) client)
    - "BigQuery Data Viewer"
    - "BigQuery Metadata Viewer"

Links to more info:
- [Sharing a dataset with the public](https://cloud.google.com/bigquery/public-data#sharing_a_dataset_with_the_public)

## Make Cloud Storage bucket `ztf_alert_avros` public

- go to the bucket in the Console
- select "Uniform" access control
- "allUsers"
    - "Cloud Storage" -> "Storage Object Viewer"
    - "Cloud Storage Legacy" -> "Storage Legacy Bucket Reader"

Links to more info:
- [Making data public](https://cloud.google.com/storage/docs/access-control/making-data-public)
- [Understanding roles](https://cloud.google.com/iam/docs/understanding-roles)

## Make bucket `workshop_beam_test`

On the Console, create a bucket called `ardent-cycling-243415-workshop_beam_test`

Under permissions:
- "allUsers"
    - "Cloud Storage" -> "Storage Object Creator"
    - "Cloud Storage" -> "Storage Object Viewer"
    - "Cloud Storage Legacy" -> "Storage Legacy Bucket Reader"
    - "Cloud Storage Legacy" -> "Storage Legacy Bucket Writer"

Under Lifecycle:
- set a rule to delete objects when they reach an age of 1 day.
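
As an alternative to clicking through the Console, the 1-day deletion rule can be written as a lifecycle configuration file. The following is a minimal sketch, assuming the standard Cloud Storage lifecycle JSON layout. The file name `lifecycle.json` is hypothetical, and this is not how the bucket described here was actually configured.

```python
import json

# Delete objects once they reach an age of 1 day.
lifecycle_config = {
    'rule': [
        {'action': {'type': 'Delete'}, 'condition': {'age': 1}},
    ]
}

# `lifecycle.json` is a hypothetical file name.
with open('lifecycle.json', 'w') as outfile:
    json.dump(lifecycle_config, outfile, indent=2)

# The file could then be applied from a shell with, e.g.:
#   gsutil lifecycle set lifecycle.json gs://ardent-cycling-243415-workshop_beam_test
```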

In [None]:
from google.cloud import storage
from google.colab import auth

In [None]:
auth.authenticate_user()

In [None]:
project_id = 'ardent-cycling-243415'
bucket_name = f'{project_id}-workshop_beam_test'
# bucket = storage.Bucket(bucket_name)
storage_client = storage.Client(project_id)
storage_client.create_bucket(bucket_name)


In [None]:
print(bucket_name)  # ardent-cycling-243415-workshop_beam_test

# Test v0.3.1

- [Broker testing instance instructions](https://github.com/mwvgroup/Pitt-Google-Broker/blob/master/broker/README.md)

## Setup testing instance of broker

Run from the command line on my local machine.

Create a testing instance of the broker

```bash
git clone https://github.com/mwvgroup/Pitt-Google-Broker
cd Pitt-Google-Broker
git checkout v/tjr/0.3.1
cd broker/setup_broker

testid="v031"
./setup_broker.sh $testid
```

Stop the VMs so we can start the night

```bash
consumerVM="ztf-consumer-${testid}"
nconductVM="night-conductor-${testid}"
zone=us-central1-a
gcloud compute instances stop "$consumerVM" "$nconductVM" --zone="$zone"
```

Start the broker without the consumer

```bash
instancename="night-conductor-${testid}"
zone=us-central1-a
NIGHT="START"
KAFKA_TOPIC="NONE" # tell night-conductor to skip booting up consumer VM
gcloud compute instances add-metadata "$instancename" --zone="$zone" \
        --metadata NIGHT="$NIGHT",KAFKA_TOPIC="$KAFKA_TOPIC"
# night-conductor will get the testid by parsing its own instance name

gcloud compute instances start "$instancename" --zone "$zone"
```
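
The comment in the block above says `night-conductor` gets the testid from its own instance name. How it does so is not shown in these notes. The following is a minimal sketch of one way to do it, assuming the `<prefix><testid>` naming used above and a hypothetical function `parse_testid`.

```python
def parse_testid(instance_name, prefix='night-conductor-'):
    """Return the testid suffix of a VM name such as 'night-conductor-v031'."""
    if not instance_name.startswith(prefix):
        raise ValueError(f'{instance_name!r} does not start with {prefix!r}')
    # Everything after the prefix is taken to be the testid.
    return instance_name[len(prefix):]


print(parse_testid('night-conductor-v031'))  # v031
print(parse_testid('ztf-consumer-v031', prefix='ztf-consumer-'))  # v031
```

Stripping a known prefix, rather than splitting on the last hyphen, keeps the sketch correct even if a testid were to contain a hyphen.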

## Test the broker using the Consumer Simulator

(Python)

In [None]:
import sys
path_to_dev_utils = '/Users/troyraen/Documents/PGB/repo2/dev_utils'
sys.path.append(path_to_dev_utils)

from consumer_sims import ztf_consumer_sim as zcs

In [None]:
testid = 'v031'
alertRate = (60, 'perMin')
runTime = (1, 'hr')

In [None]:
zcs.publish_stream(testid, alertRate, runTime=runTime, sub_id='ztf_alert_data-reservoir')
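
For a sanity check against the dashboard, it can help to know roughly how many alerts a run should publish. The following is a minimal sketch, assuming the simulator publishes at a constant rate for the whole run time, which these notes do not confirm. The function `expected_alert_count` is hypothetical, and unit labels other than `'perMin'` and `'hr'` are assumptions.

```python
# Unit labels other than 'perMin' and 'hr' are assumptions for illustration.
SECONDS_PER_RATE_UNIT = {'perSec': 1, 'perMin': 60, 'perHr': 3600}
SECONDS_PER_TIME_UNIT = {'sec': 1, 'min': 60, 'hr': 3600}


def expected_alert_count(alert_rate, run_time):
    """Estimate the alerts published for a (value, unit) rate and run time."""
    rate, rate_unit = alert_rate
    duration, time_unit = run_time
    alerts_per_second = rate / SECONDS_PER_RATE_UNIT[rate_unit]
    total_seconds = duration * SECONDS_PER_TIME_UNIT[time_unit]
    return round(alerts_per_second * total_seconds)


print(expected_alert_count((60, 'perMin'), (1, 'hr')))  # 3600
```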

## Stop the testing instance of the broker

- trigger `night-conductor` to end the night

```bash
testid=v031
instancename="night-conductor-${testid}"
zone=us-central1-a
NIGHT=END
gcloud compute instances add-metadata "$instancename" --zone="$zone" \
      --metadata NIGHT="$NIGHT"
gcloud compute instances start "$instancename" --zone "$zone"
```

## Delete the testing instance of the broker

```bash
testid="v031"
teardown="True"
cd Pitt-Google-Broker/broker/setup_broker
./setup_broker.sh $testid $teardown
```

---

# Consumer simulator

---

# Dump here before trashing

In [None]:
columns = pgb.bigquery.get_history_column_names()
objectIds = ['ZTF18aczuwfe']
sql = pgb.bigquery.object_history_sql_statement(columns=columns, objectIds=objectIds)
# Use this as a template: remove the GROUP BY and ARRAY_AGG.
sql

In [None]:
sql = (
    'SELECT objectId, ARRAY_AGG(publisher ORDER BY candidate.jd) AS publisher, ARRAY_AGG(candid ORDER BY candidate.jd) AS candid, ARRAY_AGG(schemavsn ORDER BY candidate.jd) AS schemavsn, ARRAY_AGG(candidate.programpi ORDER BY candidate.jd) AS programpi, ARRAY_AGG(candidate.exptime ORDER BY candidate.jd) AS exptime, ARRAY_AGG(candidate.bimage ORDER BY candidate.jd) AS bimage, ARRAY_AGG(candidate.aimage ORDER BY candidate.jd) AS aimage, ARRAY_AGG(candidate.pid ORDER BY candidate.jd) AS pid, ARRAY_AGG(candidate.sigmapsf ORDER BY candidate.jd) AS sigmapsf, ARRAY_AGG(candidate.scorr ORDER BY candidate.jd) AS scorr, ARRAY_AGG(candidate.nmtchps ORDER BY candidate.jd) AS nmtchps, ARRAY_AGG(candidate.rbversion ORDER BY candidate.jd) AS rbversion, ARRAY_AGG(candidate.magdiff ORDER BY candidate.jd) AS magdiff, ARRAY_AGG(candidate.sgmag3 ORDER BY candidate.jd) AS sgmag3, ARRAY_AGG(candidate.diffmaglim ORDER BY candidate.jd) AS diffmaglim, ARRAY_AGG(candidate.sumrat ORDER BY candidate.jd) AS sumrat, ARRAY_AGG(candidate.magpsf ORDER BY candidate.jd) AS magpsf, ARRAY_AGG(candidate.clrmed ORDER BY candidate.jd) AS clrmed, ARRAY_AGG(candidate.bimagerat ORDER BY candidate.jd) AS bimagerat, ARRAY_AGG(candidate.ndethist ORDER BY candidate.jd) AS ndethist, ARRAY_AGG(candidate.sgscore3 ORDER BY candidate.jd) AS sgscore3, ARRAY_AGG(candidate.nframesref ORDER BY candidate.jd) AS nframesref, ARRAY_AGG(candidate.nneg ORDER BY candidate.jd) AS nneg, ARRAY_AGG(candidate.ypos ORDER BY candidate.jd) AS ypos, ARRAY_AGG(candidate.ssmagnr ORDER BY candidate.jd) AS ssmagnr, ARRAY_AGG(candidate.nbad ORDER BY candidate.jd) AS nbad, ARRAY_AGG(candidate.ncovhist ORDER BY candidate.jd) AS ncovhist, ARRAY_AGG(candidate.classtar ORDER BY candidate.jd) AS classtar, ARRAY_AGG(candidate.szmag3 ORDER BY candidate.jd) AS szmag3, ARRAY_AGG(candidate.simag1 ORDER BY candidate.jd) AS simag1, ARRAY_AGG(candidate.magzpsciunc ORDER BY candidate.jd) AS magzpsciunc, ARRAY_AGG(candidate.sigmagap ORDER BY '
    'candidate.jd) AS sigmagap, ARRAY_AGG(candidate.nmatches ORDER BY candidate.jd) AS nmatches, ARRAY_AGG(candidate.fid ORDER BY candidate.jd) AS fid, ARRAY_AGG(candidate.field ORDER BY candidate.jd) AS field, ARRAY_AGG(candidate.jdstarthist ORDER BY candidate.jd) AS jdstarthist, ARRAY_AGG(candidate.rfid ORDER BY candidate.jd) AS rfid, ARRAY_AGG(candidate.magzpscirms ORDER BY candidate.jd) AS magzpscirms, ARRAY_AGG(candidate.neargaiabright ORDER BY candidate.jd) AS neargaiabright, ARRAY_AGG(candidate.neargaia ORDER BY candidate.jd) AS neargaia, ARRAY_AGG(candidate.jdstartref ORDER BY candidate.jd) AS jdstartref, ARRAY_AGG(candidate.sgscore2 ORDER BY candidate.jd) AS sgscore2, ARRAY_AGG(candidate.srmag2 ORDER BY candidate.jd) AS srmag2, ARRAY_AGG(candidate.zpmed ORDER BY candidate.jd) AS zpmed, ARRAY_AGG(candidate.maggaiabright ORDER BY candidate.jd) AS maggaiabright, ARRAY_AGG(candidate.srmag3 ORDER BY candidate.jd) AS srmag3, ARRAY_AGG(candidate.magzpsci ORDER BY candidate.jd) AS magzpsci, ARRAY_AGG(candidate.xpos ORDER BY candidate.jd) AS xpos, ARRAY_AGG(candidate.isdiffpos ORDER BY candidate.jd) AS isdiffpos, ARRAY_AGG(candidate.decnr ORDER BY candidate.jd) AS decnr, ARRAY_AGG(candidate.dsdiff ORDER BY candidate.jd) AS dsdiff, ARRAY_AGG(candidate.drb ORDER BY candidate.jd) AS drb, ARRAY_AGG(candidate.jd ORDER BY candidate.jd) AS jd, ARRAY_AGG(candidate.szmag2 ORDER BY candidate.jd) AS szmag2, ARRAY_AGG(candidate.sharpnr ORDER BY candidate.jd) AS sharpnr, ARRAY_AGG(candidate.szmag1 ORDER BY candidate.jd) AS szmag1, ARRAY_AGG(candidate.sky ORDER BY candidate.jd) AS sky, ARRAY_AGG(candidate.simag2 ORDER BY candidate.jd) AS simag2, ARRAY_AGG(candidate.clrrms ORDER BY candidate.jd) AS clrrms, ARRAY_AGG(candidate.objectidps2 ORDER BY candidate.jd) AS objectidps2, ARRAY_AGG(candidate.sgmag1 ORDER BY candidate.jd) AS sgmag1, ARRAY_AGG(candidate.magap ORDER BY candidate.jd) AS magap, ARRAY_AGG(candidate.objectidps1 ORDER BY candidate.jd) AS objectidps1, '
    'ARRAY_AGG(candidate.ssdistnr ORDER BY candidate.jd) AS ssdistnr, ARRAY_AGG(candidate.seeratio ORDER BY candidate.jd) AS seeratio, ARRAY_AGG(candidate.distpsnr3 ORDER BY candidate.jd) AS distpsnr3, ARRAY_AGG(candidate.ra ORDER BY candidate.jd) AS ra, ARRAY_AGG(candidate.chipsf ORDER BY candidate.jd) AS chipsf, ARRAY_AGG(candidate.dsnrms ORDER BY candidate.jd) AS dsnrms, ARRAY_AGG(candidate.simag3 ORDER BY candidate.jd) AS simag3, ARRAY_AGG(candidate.zpclrcov ORDER BY candidate.jd) AS zpclrcov, ARRAY_AGG(candidate.fwhm ORDER BY candidate.jd) AS fwhm, ARRAY_AGG(candidate.ranr ORDER BY candidate.jd) AS ranr, ARRAY_AGG(candidate.mindtoedge ORDER BY candidate.jd) AS mindtoedge, ARRAY_AGG(candidate.tblid ORDER BY candidate.jd) AS tblid, ARRAY_AGG(candidate.chinr ORDER BY candidate.jd) AS chinr, ARRAY_AGG(candidate.objectidps3 ORDER BY candidate.jd) AS objectidps3, ARRAY_AGG(candidate.maggaia ORDER BY candidate.jd) AS maggaia, ARRAY_AGG(candidate.distpsnr1 ORDER BY candidate.jd) AS distpsnr1, ARRAY_AGG(candidate.pdiffimfilename ORDER BY candidate.jd) AS pdiffimfilename, ARRAY_AGG(candidate.programid ORDER BY candidate.jd) AS programid, ARRAY_AGG(candidate.rcid ORDER BY candidate.jd) AS rcid, ARRAY_AGG(candidate.clrcounc ORDER BY candidate.jd) AS clrcounc, ARRAY_AGG(candidate.distnr ORDER BY candidate.jd) AS distnr, ARRAY_AGG(candidate.magnr ORDER BY candidate.jd) AS magnr, ARRAY_AGG(candidate.dec ORDER BY candidate.jd) AS dec, ARRAY_AGG(candidate.jdendhist ORDER BY candidate.jd) AS jdendhist, ARRAY_AGG(candidate.sigmagapbig ORDER BY candidate.jd) AS sigmagapbig, ARRAY_AGG(candidate.distpsnr2 ORDER BY candidate.jd) AS distpsnr2, ARRAY_AGG(candidate.ssnamenr ORDER BY candidate.jd) AS ssnamenr, ARRAY_AGG(candidate.magapbig ORDER BY candidate.jd) AS magapbig, ARRAY_AGG(candidate.elong ORDER BY candidate.jd) AS elong, ARRAY_AGG(candidate.jdendref ORDER BY candidate.jd) AS jdendref, ARRAY_AGG(candidate.sgscore1 ORDER BY candidate.jd) AS sgscore1, ARRAY_AGG(candidate.nid ORDER BY '
    'candidate.jd) AS nid, ARRAY_AGG(candidate.srmag1 ORDER BY candidate.jd) AS srmag1, ARRAY_AGG(candidate.drbversion ORDER BY candidate.jd) AS drbversion, ARRAY_AGG(candidate.aimagerat ORDER BY candidate.jd) AS aimagerat, ARRAY_AGG(candidate.magfromlim ORDER BY candidate.jd) AS magfromlim, ARRAY_AGG(candidate.clrcoeff ORDER BY candidate.jd) AS clrcoeff, ARRAY_AGG(candidate.ssnrms ORDER BY candidate.jd) AS ssnrms, ARRAY_AGG(candidate.sgmag2 ORDER BY candidate.jd) AS sgmag2, ARRAY_AGG(candidate.sigmagnr ORDER BY candidate.jd) AS sigmagnr, ARRAY_AGG(candidate.rb ORDER BY candidate.jd) AS rb, ARRAY_AGG(candidate.tooflag ORDER BY candidate.jd) AS tooflag FROM `ardent-cycling-243415.ztf_alerts.alerts` WHERE objectId IN ("ZTF18aczuwfe")'
)

In [None]:
client = bigquery.Client(project=project)

# Destination table for the query results.
new_table = f'{project}.{dataset}.history'
job_config = bigquery.QueryJobConfig(destination=new_table)

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

print("Query results loaded to the table {}".format(new_table))

In [None]:
import pickle
from pathlib import Path

# Load the pickled ZTF v3.3 alert schema.
inpath = Path('ztf_v3.3.pkl')
with inpath.open('rb') as infile:
    valid_schema = pickle.load(infile)

In [None]:
import json
outpath = Path('history_schema.json')
with outpath.open('w') as outfile:
    json.dump(valid_schema['fields'], outfile)