Pub/Sub Streams¶
Learning Objectives:
This tutorial covers subscribing to our Pub/Sub streams and pulling messages. It demonstrates two methods: Python, using the pgb-utils package, and the command line, using Google's gcloud CLI.
For more information, see:
All Google APIs and references (many languages are available)
Python client documentation:
google.cloud.pubsub (pgb-utils contains thin wrappers for this API)
Prerequisites¶
Complete the Initial Setup. Be sure to:
set your environment variables
enable the Pub/Sub API
install the pgb-utils package if you want to use Python
install the CLI if you want to use the command line
Create a subscription¶
The code below will create a Pub/Sub subscription in your GCP project that is attached to a topic in our project. This only needs to be done once per desired topic.
See Data Overview: Pub/Sub for a list of available topics.
After your subscription is created, messages we publish to the topic are immediately available in your subscription. They will remain there until they are either pulled and acknowledged or until they expire (7 days, max). Messages published before your subscription was created are not available.
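As a rough mental model of that pull-then-acknowledge lifecycle, the toy in-memory sketch below (hypothetical; not the real Pub/Sub service or its API) shows that a message stays pending in a subscription until it is acknowledged:

```python
# Toy model of the Pub/Sub delivery lifecycle described above.
from collections import OrderedDict

class ToySubscription:
    """A message remains pending until acknowledged."""
    def __init__(self):
        self._pending = OrderedDict()  # msg_id -> payload
        self._next_id = 0

    def publish(self, payload):
        self._pending[self._next_id] = payload
        self._next_id += 1

    def pull(self, max_messages):
        # deliver up to max_messages (id, payload) pairs;
        # they stay pending until acknowledged
        return list(self._pending.items())[:max_messages]

    def ack(self, msg_id):
        self._pending.pop(msg_id, None)

sub = ToySubscription()
sub.publish(b"alert-1")
sub.publish(b"alert-2")
batch = sub.pull(max_messages=5)        # both messages delivered
sub.ack(batch[0][0])                    # acknowledge only the first
redelivered = sub.pull(max_messages=5)  # the unacked one is still there
```

In the real service, unacknowledged messages are also redelivered and eventually expire (after at most 7 days), as noted above.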
You can also view and manage the subscriptions in your GCP project at any time from the web Console Subscriptions page (you may need to select your project from the dropdown at the top).
Method A: Python
import pgb_utils as pgb
# choose an existing Pitt-Google topic
topic_name = 'ztf-loop'
# name your subscription whatever you'd like
subscription_name = 'ztf-loop'
# create the subscription
subscription = pgb.pubsub.subscribe(topic_name, subscription_name)
Method B: Command line
# choose an existing Pitt-Google topic
TOPIC_NAME="ztf-loop"
# name your subscription whatever you'd like
SUBSCRIPTION_NAME="ztf-loop"
# create the subscription
gcloud pubsub subscriptions create $SUBSCRIPTION_NAME \
--topic=$TOPIC_NAME \
--topic-project="ardent-cycling-243415" # Pitt-Google project ID
Pull Messages¶
Method A: Python
Two options:
Pull a fixed number of messages. Useful for testing.
import pgb_utils as pgb
# pull messages
subscription_name = 'ztf-loop'
max_messages = 5
msgs = pgb.pubsub.pull(subscription_name, max_messages=max_messages)  # returns a list of bytes, one element per message
# now process the messages. for example:
# convert the bytes to a pandas dataframe
df = pgb.pubsub.decode_ztf_alert(msgs[0], return_format='df')
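The pull returns the raw bytes of each message. As a self-contained illustration of looping over such a batch, the sketch below uses JSON payloads as a local stand-in (real ZTF alerts are Avro-serialized bytes, which decode_ztf_alert handles; the function and sample data here are hypothetical):

```python
import json

def process_payloads(payloads):
    """Decode each raw payload and collect the results.

    json is only a stand-in for illustration; real ZTF alert
    payloads are Avro bytes.
    """
    return [json.loads(p.decode("utf-8")) for p in payloads]

# fake payloads standing in for pulled message bytes
fake_msgs = [json.dumps({"objectId": f"ZTF{i}"}).encode() for i in range(3)]
records = process_payloads(fake_msgs)
```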
Pull messages continuously.
import pgb_utils as pgb
# create a function that executes your processing logic
# and then acknowledge the message
def callback(message):
    # your processing logic here. for example:
    # convert the bytes to a pandas dataframe and print the 1st row
    df = pgb.pubsub.decode_ztf_alert(message.data, return_format='df')
    print(df.head(1))
    # acknowledge the message
    message.ack()
# open the connection and process the streaming messages
subscription_name = 'ztf-loop'
timeout = 5  # seconds to listen before closing the connection
pgb.pubsub.streamingPull(subscription_name, callback, timeout=timeout)
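Before opening a live connection, the callback pattern can be exercised locally with a stub message object. The sketch below is hypothetical (the stub class and sample payload are not part of pgb-utils); it only checks the two things a callback must do: read `message.data` and call `message.ack()`.

```python
class StubMessage:
    """Mimics the two members the callback uses: .data and .ack()."""
    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True

received = []

def callback(message):
    # processing logic: here we just record the raw bytes
    received.append(message.data)
    # acknowledge so the message is not redelivered
    message.ack()

# dry-run the callback on a fake message, no Pub/Sub connection needed
msg = StubMessage(b"fake-alert-bytes")
callback(msg)
```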
Method B: Command line
SUBSCRIPTION="ztf-loop"
limit=1 # default=1
gcloud pubsub subscriptions pull $SUBSCRIPTION --auto-ack --limit=$limit