Pub/Sub Streams¶
Learning Objectives:
Pub/Sub is an asynchronous, publish–subscribe messaging service. The Pitt-Google broker publishes alerts and value-added data to multiple topics (see here for a list). You can subscribe to one or more of these topics, and then pull and process the data either in real-time or up to 7 days after a message was published.
This tutorial demonstrates two methods: Python and the command line. The Python sections use the pgb-utils package, which contains example functions to complete these tasks. These example functions are thin wrappers for the Google API, which offers more options than those presented here. Interested users are encouraged to look at and extend the pgb-utils source code (see docs) to harness the full power of the Google API.
For more information, see:
Python client documentation:
google.cloud.pubsub (pgb-utils.pubsub contains thin wrappers for this API)
All Google APIs for Pub/Sub (many languages are available)
Prerequisites¶
Complete the Initial Setup. Be sure to:
set your environment variables
enable the Pub/Sub API
install the pgb-utils package if you want to use Python
install the CLI if you want to use the command line
Create a subscription¶
The code below will create a subscription in your GCP project that is attached to a topic in our project. This only needs to be done once per topic.
See Data Overview: Pub/Sub for a list of available topics. The code below subscribes the user to “ztf-loop”, a special stream intended for testing. We publish recent ZTF alerts to this topic at a constant rate of 1 per second, day and night.
After your subscription is created, messages we publish to the topic are immediately available in your subscription. They will remain there until they are either pulled and acknowledged or until they expire (7 days, max). Messages published before your subscription was created are not available.
You can also view and manage the subscriptions in your GCP project at any time from the web Console Subscriptions page (you may need to select your project from the dropdown at the top).
Method A: Python¶
import pgb_utils as pgb
# choose an existing Pitt-Google topic
topic_name = 'ztf-loop'
# name your subscription whatever you'd like
subscription_name = 'ztf-loop'
# create the subscription
subscription = pgb.pubsub.create_subscription(topic_name, subscription_name)
# you can look at the subscription object, but you don't need to do anything with it
For more information, view the docstring and source code for
pgb_utils.pgb_utils.pubsub.create_subscription()
.
Method B: Command line¶
# choose an existing Pitt-Google topic
topic_name="ztf-loop"
# name your subscription whatever you'd like
subscription_name="ztf-loop"
# create the subscription
gcloud pubsub subscriptions create $subscription_name \
--topic=$topic_name \
--topic-project="ardent-cycling-243415" # Pitt-Google project ID
Pull Messages¶
The code below pulls and acknowledges messages from a subscription.
Method A: Python¶
In Python you have the option to either (1) pull a fixed number of messages and then process them, or (2) pull and process messages continuously in streaming mode.
Pull a fixed number of messages¶
With this method, a fixed number (maximum) of messages are returned in a list. You can then process them however you’d like.
import pgb_utils as pgb
# pull and acknowledge messages
subscription_name = 'ztf-loop'
max_messages = 5
msgs = pgb.pubsub.pull(subscription_name, max_messages=max_messages)
# msgs is a list containing the alert data as bytes
# you can now process them however you'd like
# here we simply convert the first alert to an astropy table
table = pgb.pubsub.decode_message(msgs[0], return_alert_as='table')
For more information, view the docstring and source code for
pgb_utils.pgb_utils.pubsub.pull()
.
Pull messages in streaming mode¶
This method pulls, processes, and acknowledges messages continuously.
To use this method, we must first create a “callback” function that accepts a single message, processes the data according to the user’s desires, and then acknowledges the message. The message object is described here.
import pgb_utils as pgb
# create the callback function
def callback(message):
# extract the message data
alert = message.data # bytes
# process the message however you'd like
# here we simply convert it to a dataframe and print the 1st row
df = pgb.pubsub.decode_message(alert, return_alert_as='df')
print(df.head(1))
# acknowledge the message so it is not delivered again
message.ack()
# start streaming messages
subscription_name = 'ztf-loop'
pgb.pubsub.streamingPull(subscription_name, callback)
# use Control+C to cancel the streaming
For more information, view the docstring and source code for
pgb_utils.pgb_utils.pubsub.streamingPull()
.
Method B: Command line¶
This method returns a fixed number (maximum) of messages. See gcloud pubsub subscriptions pull (format options are listed here).
# set these parameters as desired
subscription_name="ztf-loop"
max_messages=5
format=json
# pull messages
gcloud pubsub subscriptions pull $subscription_name \
--limit $max_messages \
--format $format \
--auto-ack
Cleanup: Delete a subscription¶
If you are done with a subscription you can delete it.
Method A: Python¶
import pgb_utils as pgb
subscription_name = 'ztf-loop'
pgb.pubsub.delete_subscription(subscription_name)
For more information, view the docstring and source code for
pgb_utils.pgb_utils.pubsub.delete_subscription()
.
Method B: Command line¶
subscription_name="ztf-loop"
gcloud pubsub subscriptions delete $subscription_name