Source code for broker.broker_utils.broker_utils.consumer_sim

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
""" Simulate the consumer by publishing alerts to a Pub/Sub topic.
"""

import os
import sys
import time
from typing import Optional, Tuple, Union

from google.cloud import pubsub_v1

# Resolve the project id from the environment. Cloud functions use GCP_PROJECT,
# so it is consulted first; otherwise GOOGLE_CLOUD_PROJECT is used, and if
# neither variable is set the production project is the default.
PROJECT_ID = os.environ.get(
    "GCP_PROJECT",
    os.environ.get("GOOGLE_CLOUD_PROJECT", "ardent-cycling-243415"),
)


def publish_stream(
    alert_rate: Union[Tuple[int, str], str],
    instance: Optional[Tuple[str, str]] = None,
    runtime: Optional[Tuple[int, str]] = None,
    publish_batch_every: Tuple[int, str] = (5, 'sec'),
    sub_id: Optional[str] = None,
    topic_id: Optional[str] = None,
    nack: bool = False,
    auto_confirm: bool = False
):
    """Pull alerts from a Pub/Sub subscription and republish them to a topic.

    The subscription is determined by either `instance` or `sub_id`, and the
    topic by either `instance` or `topic_id`. The requested `alert_rate` and
    `runtime` are converted into a number of batches and a number of alerts per
    batch, a summary is printed, and the stream is then published.

    Args:
        alert_rate: Desired rate at which alerts will be published, either as a
            (number, unit) tuple or as one of the preconfigured strings
            'ztf-active-avg' or 'ztf-live-max'. With the unit 'once', a single
            batch containing `number` alerts is published.
        instance: (survey, testid). Keywords of the broker instance, used to
            determine the subscription and topic. If `None`, `sub_id` and
            `topic_id` must be valid names. If both `instance` and
            `sub_id`/`topic_id` are passed, `sub_id`/`topic_id` will prevail.
        runtime: (number, unit). Desired length of time the simulator runs for.
            Must be a tuple unless the `alert_rate` unit is 'once'.
        publish_batch_every: (number, unit). The simulator sleeps for this
            amount of time between batches.
        sub_id: Name of the Pub/Sub subscription from which to pull alerts.
            If `None`, `instance` must contain valid keywords, and then the
            production instance reservoir '{survey}-alerts-reservoir' is used.
        topic_id: Name of the Pub/Sub topic to which alerts will be published.
            If `None`, `instance` must contain valid keywords, and then the
            topic '{survey}-alerts-{testid}' is used.
        nack: Whether to "nack" (not acknowledge) the messages. If `True`,
            messages are published to the topic, but they are not dropped from
            the subscription and so will be delivered again at an arbitrary
            time in the future.
        auto_confirm: Whether to automatically answer "Y" to the confirmation
            prompt.
    """
    interval, interval_unit = publish_batch_every

    # size of each batch, plus the alert rate normalized to a (number, unit) tuple
    batch_size, rate = _get_number_alerts_per_batch(alert_rate, publish_batch_every)

    # how many batches fit into the requested runtime
    batch_count = _convert_runtime_to_Nbatches(runtime, publish_batch_every, rate[1])

    # report the plan to the user
    print(f"\nReceived desired alert_rate={rate}, runtime={runtime}.")
    summary = (
        "\nPublishing:"
        f"\n\t{batch_count} batches"
        f"\n\teach with {batch_size} alerts"
        f"\n\tat a rate of 1 batch per {interval} {interval_unit} (plus processing time)"
        f"\n\tfor a total of {batch_count * batch_size} alerts"
    )
    print(summary)

    # run the simulator
    _do_publish_stream(
        instance,
        batch_size,
        batch_count,
        publish_batch_every,
        sub_id,
        topic_id,
        nack,
        auto_confirm,
    )
def _do_publish_stream(
    instance,
    alerts_per_batch,
    Nbatches,
    publish_batch_every,
    sub_id=None,
    topic_id=None,
    nack=False,
    auto_confirm=False,
):
    """Pull `Nbatches` batches from the subscription and publish them to the topic.

    Args:
        instance: (survey, testid) keywords, or `None` if both `sub_id` and
            `topic_id` are given.
        alerts_per_batch (int): maximum number of messages requested per pull.
        Nbatches (int): number of pull/publish cycles to run.
        publish_batch_every (int, str): (number, unit). The loop sleeps `number`
            seconds after every batch; the unit must be 'sec'.
        sub_id (str): subscription name; overrides the name derived from `instance`.
        topic_id (str): topic name; overrides the name derived from `instance`.
        nack (bool): if True, messages are nacked instead of acknowledged.
        auto_confirm (bool): if True, skip the interactive confirmation prompt.

    Raises:
        ValueError: if the unit of `publish_batch_every` is not 'sec', or if
            the subscription or topic cannot be determined.
    """
    # check units
    if publish_batch_every[1] != 'sec':
        raise ValueError("Units of publish_batch_every must = 'sec'.")

    # setup for subscription pulls and publishing
    subscriber, sub_path, request = _setup_subscribe(alerts_per_batch, instance, sub_id)
    publisher, topic_path = _setup_publish(alerts_per_batch, instance, topic_id)

    print(f"\nThis will\n\tPull from subscription: {sub_path}")
    print(f"and\n\tPublish to topic: {topic_path}\n")

    # make the user confirm
    _user_confirm(auto_confirm)

    print("\nPublishing...")
    for _ in range(Nbatches):
        # get alerts from reservoir
        response = subscriber.pull(request)

        # hand the alerts to the publisher; publishing is non-blocking and
        # failures surface through the futures' done-callbacks
        _publish_received_messages(publisher, topic_path, response)

        # ack or nack subscription messages
        ack_ids = [msg.ack_id for msg in response.received_messages]
        _handle_acks(subscriber, sub_path, ack_ids, nack)

        # sleep between batches. This also runs after the last batch, which
        # leaves time for the non-blocking publish calls to complete.
        time.sleep(publish_batch_every[0])


def _user_confirm(auto_confirm=False):
    """Ask the user to confirm, and exit the program on any answer other than Y/y.

    An empty answer counts as 'Y'. Nothing is asked if `auto_confirm` is True.
    """
    if auto_confirm:
        return

    cont = input('Continue? [Y/n]: ') or 'Y'
    if cont not in ('Y', 'y'):
        sys.exit('Exiting consumer simulator.')


def _setup_subscribe(alerts_per_batch, instance=None, sub_id=None):
    """Create the subscriber client and the pull request.

    Args:
        alerts_per_batch (int): used as `max_messages` of the pull request.
        instance: (survey, testid) keywords. Only the survey is used here.
        sub_id (str): subscription name. If `None`, the reservoir
            '{survey}-alerts-reservoir' is used.

    Returns:
        (subscriber, sub_path, request)

    Raises:
        ValueError: if both `instance` and `sub_id` are `None`.
    """
    if (instance is None) and (sub_id is None):
        raise ValueError('Must provide either `instance` or `sub_id`.')

    if sub_id is None:
        # use the reservoir of the production instance for this survey
        survey = instance[0]
        sub_id = f'{survey}-alerts-reservoir'

    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path(PROJECT_ID, sub_id)
    request = {
        "subscription": sub_path,
        "max_messages": alerts_per_batch,
    }

    return (subscriber, sub_path, request)


def _setup_publish(alerts_per_batch, instance=None, topic_id=None):
    """Create the publisher client and the topic path.

    Args:
        alerts_per_batch (int): used as `max_messages` of the publisher's
            batch settings.
        instance: (survey, testid) keywords.
        topic_id (str): topic name. If `None`, '{survey}-alerts-{testid}' is used.

    Returns:
        (publisher, topic_path)

    Raises:
        ValueError: if both `instance` and `topic_id` are `None`.
    """
    if (instance is None) and (topic_id is None):
        raise ValueError('Must provide either `instance` or `topic_id`.')

    if topic_id is None:
        survey, testid = instance
        topic_id = f'{survey}-alerts-{testid}'

    # calls to publish are batched automatically
    # let's try to get all alerts into 1 publisher batch
    # some default batch settings to be aware of:
    #     max_messages = 100
    #     max_bytes = 1 MB
    #     max_latency = 10 ms
    batch_settings = pubsub_v1.types.BatchSettings(max_messages=alerts_per_batch)

    publisher = pubsub_v1.PublisherClient(batch_settings)
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)

    return (publisher, topic_path)


def _publish_received_messages(publisher, topic_path, sub_response):
    """Publish every received message, along with its attributes, to the topic.

    Publishing is non-blocking; `_callback` is attached to each returned future
    to check for errors.
    """
    for msg in sub_response.received_messages:
        attrs = msg.message.attributes  # pass msg attributes through
        future = publisher.publish(topic_path, msg.message.data, **attrs)  # non blocking
        future.add_done_callback(_callback)  # check for errors in a separate thread


def _handle_acks(subscriber, sub_path, ack_ids=None, nack=False):
    """Acknowledge the messages, or nack them so they stay in the reservoir.

    Args:
        subscriber: client used to send the request.
        sub_path (str): path of the subscription the messages were pulled from.
        ack_ids (list): ack ids of the pulled messages. If `None` or empty,
            nothing is sent.
        nack (bool): if False, the messages are acknowledged. If True, their
            ack deadline is set to 0 instead.
    """
    # Nothing was pulled, so there is nothing to (n)ack.
    # NOTE(review): the Pub/Sub API documents ack_ids as required, so an empty
    # request is expected to be rejected -- confirm against the service docs.
    if not ack_ids:
        return

    request = {
        "subscription": sub_path,
        "ack_ids": ack_ids,
    }
    if nack:
        request["ack_deadline_seconds"] = 0
        subscriber.modify_ack_deadline(request)
    else:
        subscriber.acknowledge(request)


def _callback(future):
    """Done-callback for a publish future; raises if the publish failed."""
    # Publishing failures are automatically retried, except for errors that do
    # not warrant retries. result() raises an exception if publish ultimately failed.
    future.result()


def _convert_runtime_to_Nbatches(runtime, publish_batch_every, aRate_unit):
    """Find the number of batches needed to cover `runtime`.

    Args:
        runtime (int, str): (number, unit). Ignored if `aRate_unit` is 'once'.
        publish_batch_every (int, str): (number, unit) of the publish interval.
        aRate_unit (str): unit of the alert rate. 'once' means a single batch.

    Returns:
        Nbatches (int)

    Raises:
        ValueError: if `runtime` is not a tuple and `aRate_unit` is not 'once'.
    """
    if aRate_unit == 'once':
        return 1

    if not isinstance(runtime, tuple):
        msg = "runtime must be given as a tuple, unless the alert_rate units are set to 'once'"
        raise ValueError(msg)

    pbeN, pbeU = publish_batch_every  # shorthands
    publish_runtime = _convert_time_to_publish_unit(runtime, pbeU)
    return _convert_publish_runtime_to_Nbatches(publish_runtime, pbeN)


def _convert_publish_runtime_to_Nbatches(publish_runtime, publish_batch_every_N):
    """Find the number of batches that fit into the runtime.

    Args:
        publish_runtime (int or float): runtime requested by user, in units
            used by the publisher
        publish_batch_every_N (int): publisher will use a rate of 1 batch per
            publish_batch_every_N. Should be in the same units as
            publish_runtime.

    Returns:
        Nbatches (int): number of batches the publisher should complete,
            truncated to an int
    """
    # Divide directly. Multiplying by the reciprocal loses precision, e.g.
    # 49 * (1 / 49) == 0.9999999999999999, which would truncate to 0 batches.
    return int(publish_runtime / publish_batch_every_N)


def _convert_time_to_publish_unit(runtime, publish_unit):
    """Convert a runtime to the time unit used by the publisher.

    Args:
        runtime (int, str): (number, unit) to convert to publish_unit.
            Supported units are 'sec', 'min', 'hr' and 'night' (10 hours).
        publish_unit (str): time units consumer will use for publish rate.
            Only 'sec' is configured.

    Returns:
        publish_runtime (int): runtime converted to publish_unit

    Raises:
        ValueError: if `publish_unit` is not 'sec' or the runtime unit is unknown.
    """
    if publish_unit != 'sec':
        msg = f"received publish_units='{publish_unit}', but only configured for publish_unit='sec'"
        raise ValueError(msg)

    rtN, rtU = runtime

    # convert the time to (N, 'sec')
    if rtU == 'sec':
        publish_runtime = rtN
    elif rtU == 'min':
        publish_runtime = rtN * 60
    elif rtU == 'hr':
        publish_runtime = rtN * 3600
    elif rtU == 'night':
        publish_runtime = rtN * 10 * 3600
    else:
        # previously fell through and failed with an UnboundLocalError
        msg = f"received runtime units='{rtU}', but only configured for 'sec', 'min', 'hr', 'night'"
        raise ValueError(msg)

    return publish_runtime


def _get_number_alerts_per_batch(alert_rate, publish_batch_every):
    """Find the number of alerts to publish per batch.

    Args:
        alert_rate (tuple or str): (number, unit), or a preconfigured rate name.
        publish_batch_every (int, str): (number, unit) of the publish interval.

    Returns:
        (alerts_per_batch, aRate_tuple): the batch size, and the alert rate as
            a (number, unit) tuple.
    """
    aRate_tuple = _convert_rate_to_tuple(alert_rate)

    if aRate_tuple[1] == 'once':
        # user requested 1 batch, so return their number
        alerts_per_batch = aRate_tuple[0]
    else:
        # convert to the batch rate
        pbeN, pbeU = publish_batch_every
        avg_publish_rate = _convert_rate_to_publish_unit(aRate_tuple, publish_unit=pbeU)
        alerts_per_batch = _get_number_alerts_per_batch_from_avg(avg_publish_rate, pbeN)

    return (alerts_per_batch, aRate_tuple)


def _get_number_alerts_per_batch_from_avg(avg_publish_rate, publish_interval):
    """Return the rate multiplied by the interval, truncated to an int."""
    return int(avg_publish_rate * publish_interval)


def _convert_rate_to_publish_unit(alert_rate, publish_unit='sec'):
    """Convert an alert rate to alerts per publish unit.

    Args:
        alert_rate (int, str): (number, unit) to convert to publish_unit.
            Supported units are 'perSec', 'perMin', 'perHr' and 'perNight'
            (a night is 10 hours).
        publish_unit (str): time units consumer will use for publish rate.
            Only 'sec' is configured.

    Returns:
        alerts_per_unit (int or float): alert_rate in units per publish_unit

    Raises:
        ValueError: if `publish_unit` is not 'sec' or the rate unit is unknown.
    """
    if publish_unit != 'sec':
        msg = f"received publish_units='{publish_unit}', but only configured for publish_unit='sec'"
        raise ValueError(msg)

    arN, arU = alert_rate

    # convert the rate to (N, 'perSec')
    if arU == 'perSec':
        alerts_per_unit = arN
    elif arU == 'perMin':
        alerts_per_unit = arN / 60
    elif arU == 'perHr':
        alerts_per_unit = arN / 3600
    elif arU == 'perNight':
        alerts_per_unit = arN / 10 / 3600
    else:
        # previously fell through and failed with an UnboundLocalError
        msg = (
            f"received alert_rate units='{arU}', but only configured for "
            "'perSec', 'perMin', 'perHr', 'perNight'"
        )
        raise ValueError(msg)

    return alerts_per_unit


def _convert_rate_to_tuple(alert_rate):
    """Return the alert rate as a (number, unit) tuple.

    Tuples are returned unchanged; strings are looked up among the
    preconfigured rates.

    Raises:
        ValueError: if `alert_rate` is neither a tuple nor a configured string.
    """
    if isinstance(alert_rate, tuple):
        return alert_rate

    if isinstance(alert_rate, str):
        return _convert_rate_string_to_tuple(alert_rate)  # tuple, or raises ValueError

    msg = 'alert_rate must be a tuple or a string'
    raise ValueError(msg)


def _convert_rate_string_to_tuple(alert_rate):
    """Translate a preconfigured rate name into a (number, unit) tuple.

    Raises:
        ValueError: if `alert_rate` is not a configured name.
    """
    if alert_rate == 'ztf-active-avg':
        return (300000, 'perNight')

    if alert_rate == 'ztf-live-max':
        return (200, 'perSec')

    msg = "'ztf-active-avg' and 'ztf-live-max' are the only strings currently configured for the alert_rate"
    raise ValueError(msg)