Pub/Sub Streams

Learning Objectives:

  1. Subscribe to one of our Pub/Sub streams

  2. Pull messages from your subscription

This tutorial covers subscribing to our Pub/Sub streams and pulling messages. It demonstrates two methods: Python, using the pgb-utils package, and the command line, using Google’s gcloud CLI.

For more information, see:

Prerequisites

  1. Complete the Initial Setup. Be sure to:

    • set your environment variables

    • enable the Pub/Sub API

    • install the pgb-utils package if you want to use Python

    • install the Google Cloud CLI (gcloud) if you want to use the command line

Create a subscription

The code below creates a Pub/Sub subscription in your GCP project and attaches it to a topic in our (Pitt-Google's) project. You only need to do this once for each topic you want to follow.

See Data Overview: Pub/Sub for a list of available topics.

After your subscription is created, messages we publish to the topic are immediately available in your subscription. They remain there until they are either pulled and acknowledged or expire (after at most 7 days). Messages published before your subscription was created are not available.

You can also view and manage the subscriptions in your GCP project at any time from the Subscriptions page of the Google Cloud Console (you may need to select your project from the dropdown at the top).

Method A: Python

import pgb_utils as pgb

# choose an existing Pitt-Google topic
topic_name = 'ztf-loop'

# name your subscription whatever you'd like
subscription_name = 'ztf-loop'

# create the subscription in your project, attached to the Pitt-Google topic
subscription = pgb.pubsub.subscribe(topic_name, subscription_name)
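If you would rather not depend on pgb-utils, the same subscription can likely be created with Google's `google-cloud-pubsub` client library. The sketch below assumes that library is installed and that you are authenticated; `topic_path`, `subscription_path`, and `create_subscription` are hypothetical helper names, and `my_project_id` stands for your own GCP project ID. The path helpers spell out Pub/Sub's fully qualified resource names, which is what the client expects.

```python
# Pitt-Google project that owns the public topics (from the command-line example).
PITTGOOGLE_PROJECT = "ardent-cycling-243415"


def topic_path(topic_name, project_id=PITTGOOGLE_PROJECT):
    """Fully qualified topic name: projects/<project>/topics/<topic>."""
    return f"projects/{project_id}/topics/{topic_name}"


def subscription_path(subscription_name, my_project_id):
    """Fully qualified name of a subscription in *your* GCP project."""
    return f"projects/{my_project_id}/subscriptions/{subscription_name}"


def create_subscription(topic_name, subscription_name, my_project_id):
    """Create the subscription with google-cloud-pubsub (needs credentials)."""
    # Imported here so the path helpers above work without the library installed.
    from google.cloud import pubsub_v1

    with pubsub_v1.SubscriberClient() as subscriber:
        return subscriber.create_subscription(
            request={
                "name": subscription_path(subscription_name, my_project_id),
                "topic": topic_path(topic_name),
            }
        )


print(topic_path("ztf-loop"))
# projects/ardent-cycling-243415/topics/ztf-loop
```

Calling `create_subscription("ztf-loop", "ztf-loop", my_project_id)` would then attach a subscription named `ztf-loop` in your project to the Pitt-Google `ztf-loop` topic.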

Method B: Command line

# choose an existing Pitt-Google topic
TOPIC_NAME="ztf-loop"

# name your subscription whatever you'd like
SUBSCRIPTION_NAME="ztf-loop"

# create the subscription
gcloud pubsub subscriptions create "$SUBSCRIPTION_NAME" \
    --topic="$TOPIC_NAME" \
    --topic-project="ardent-cycling-243415"  # Pitt-Google project ID
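If you drive gcloud from a Python script, one option is to build the command above as an argument list and run it with `subprocess`. This is a hypothetical sketch (`create_subscription_cmd` and `run` are illustrative names) that uses only the flags shown above; it prints the command and executes it only when `dry_run=False`, which requires gcloud to be installed and authenticated.

```python
import shlex
import subprocess

PITTGOOGLE_PROJECT = "ardent-cycling-243415"  # Pitt-Google project ID


def create_subscription_cmd(subscription_name, topic_name, topic_project=PITTGOOGLE_PROJECT):
    """Build the argv for `gcloud pubsub subscriptions create` as shown above."""
    return [
        "gcloud", "pubsub", "subscriptions", "create", subscription_name,
        f"--topic={topic_name}",
        f"--topic-project={topic_project}",
    ]


def run(cmd, dry_run=True):
    """Print the shell-quoted command; execute it only when dry_run is False."""
    print(shlex.join(cmd))
    if not dry_run:
        subprocess.run(cmd, check=True)


run(create_subscription_cmd("ztf-loop", "ztf-loop"))
# gcloud pubsub subscriptions create ztf-loop --topic=ztf-loop --topic-project=ardent-cycling-243415
```

Passing an argument list (rather than one shell string) avoids shell-quoting problems if a name ever contains unusual characters.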

Pull Messages

Method A: Python

Two options:

  1. Pull a fixed number of messages. Useful for testing.

import pgb_utils as pgb

# pull messages
subscription_name = 'ztf-loop'
max_messages = 5
msgs = pgb.pubsub.pull(subscription_name, max_messages=max_messages)  # list of bytes, one per message

# now process the messages. for example,
# convert the first message's bytes to a pandas DataFrame
df = pgb.pubsub.decode_ztf_alert(msgs[0], return_format='df')

  2. Pull messages continuously.

import pgb_utils as pgb

# create a function that executes your processing logic
# and then acknowledges the message
def callback(message):
    # your processing logic here. for example,
    # convert the bytes to a pandas DataFrame and print the first row
    df = pgb.pubsub.decode_ztf_alert(message.data, return_format='df')
    print(df.head(1))

    # acknowledge the message
    message.ack()

# open the connection and process the streaming messages
subscription_name = 'ztf-loop'
timeout = 5  # seconds to listen before closing the connection
pgb.pubsub.streamingPull(subscription_name, callback, timeout=timeout)
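In the callback above, any exception raised during processing escapes the callback before `message.ack()` runs. One common pattern, sketched below, is to catch processing errors inside the callback, acknowledge the message only on success, and call `message.nack()` otherwise so Pub/Sub can redeliver it. `make_callback`, `FakeMessage`, and `count_bytes` are hypothetical names; `FakeMessage` only mimics the `data`, `ack()`, and `nack()` members of a real Pub/Sub message so the sketch runs offline.

```python
def make_callback(process):
    """Wrap `process(data: bytes)` so messages are acked on success, nacked on failure."""
    def callback(message):
        try:
            process(message.data)
        except Exception as err:  # any processing failure means "redeliver later"
            print(f"processing failed, nacking: {err!r}")
            message.nack()
        else:
            message.ack()
    return callback


class FakeMessage:
    """Hypothetical offline stand-in for a Pub/Sub message."""

    def __init__(self, data):
        self.data = data
        self.status = None

    def ack(self):
        self.status = "acked"

    def nack(self):
        self.status = "nacked"


def count_bytes(data):
    """Toy processing step: reject empty payloads, otherwise print their size."""
    if not data:
        raise ValueError("empty message")
    print(len(data))


callback = make_callback(count_bytes)
good, bad = FakeMessage(b"alert"), FakeMessage(b"")
callback(good)
callback(bad)
print(good.status, bad.status)  # acked nacked
```

With the streaming pull above, you could pass `make_callback(your_function)` in place of `callback`, where `your_function` takes the message bytes (for example, a function that calls `pgb.pubsub.decode_ztf_alert` on them).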

Method B: Command line

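SUBSCRIPTION_NAME="ztf-loop"
LIMIT=1  # maximum number of messages to pull (gcloud's default is 1)
# --auto-ack acknowledges the pulled messages so they are not redelivered
gcloud pubsub subscriptions pull "$SUBSCRIPTION_NAME" --auto-ack --limit="$LIMIT"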
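If you want to use the command-line output from a script, gcloud's global `--format=json` flag produces machine-readable output. The sketch below parses it, assuming each element follows the Pub/Sub REST `ReceivedMessage` layout (an `ackId` plus a `message` whose `data` field is base64-encoded); `parse_pull_output` and the sample values are hypothetical. The decoded payload should presumably be the same alert bytes that `pgb.pubsub.pull` returns.

```python
import base64
import json


def parse_pull_output(json_text):
    """Return (message_id, payload_bytes) pairs from `gcloud ... pull --format=json`.

    Assumes the layout: [{"ackId": ..., "message": {"data": <base64>, "messageId": ...}}]
    """
    results = []
    for received in json.loads(json_text):
        msg = received["message"]
        payload = base64.b64decode(msg.get("data", ""))
        results.append((msg.get("messageId"), payload))
    return results


# Made-up sample shaped like the assumed layout above.
sample = json.dumps([
    {
        "ackId": "fake-ack-id",
        "message": {"data": base64.b64encode(b"hello").decode(), "messageId": "123"},
    }
])
print(parse_pull_output(sample))  # [('123', b'hello')]
```

To feed it real data, you might capture the command's output with `subprocess.run(["gcloud", "pubsub", "subscriptions", "pull", "ztf-loop", "--auto-ack", "--limit=5", "--format=json"], capture_output=True, text=True, check=True).stdout`.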