pgb_utils.pubsub

The pubsub module facilitates access to Pitt-Google Pub/Sub streams.

pgb_utils.pgb_utils.pubsub.create_subscription(topic_name, subscription_name=None, project_id=None)[source]

Create a subscription to a Pitt-Google Pub/Sub topic.

Wrapper for google.cloud.pubsub_v1.SubscriberClient().create_subscription() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.

See also: https://cloud.google.com/pubsub/docs/admin#manage_subs

Parameters
  • topic_name (str) – Name of a Pitt-Google Broker Pub/Sub topic to subscribe to.

  • project_id (Optional[str]) – User’s GCP project ID. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used. The subscription will be created in this account.

  • subscription_name (Optional[str]) – Name for the user’s Pub/Sub subscription. If None, topic_name will be used.

Return type

Subscription

Returns

A Pub/Sub Subscription instance

pgb_utils.pgb_utils.pubsub.decode_message(msg_bytes, return_alert_as='dict')[source]

Decode the message and return in requested format.

Parameters
  • msg_bytes (bytes) – a single alert

  • return_alert_as (str) – Format for the returned alert. One of “dict” (dictionary), “df” (Pandas DataFrame) or “table” (Astropy Table). Note that only the “dict” option returns the full packet. Using “df” or “table” drops some metadata and the cutouts (if present).

Return type

Union[dict, DataFrame, Table, Tuple[dict, dict], Tuple[DataFrame, dict], Tuple[Table, dict]]

Returns

If the message contains an alert packet only, it is returned in the requested format.

If the message contains an alert packet plus value added products, it is returned as a tuple where the first element is the alert packet in the requested format, and the second element is the value added products as a dict.

pgb_utils.pgb_utils.pubsub.delete_subscription(subscription_name, project_id=None)[source]

Delete a Pub/Sub subscription.

Wrapper for google.cloud.pubsub_v1.SubscriberClient().delete_subscription() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.

See also: https://cloud.google.com/pubsub/docs/admin#delete_subscription

Parameters
  • subscription_name (str) – Name for the user’s Pub/Sub subscription. If None, topic_name will be used.

  • project_id (Optional[str]) – User’s GCP project ID. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used. The subscription will be created in this account.

Return type

None

pgb_utils.pgb_utils.pubsub.publish(topic_name, message, project_id=None, attrs={})[source]

Publish messages to a Pub/Sub topic.

Wrapper for google.cloud.pubsub_v1.PublisherClient().publish() documented at https://googleapis.dev/python/pubsub/latest/publisher/api/client.html.

See also: https://cloud.google.com/pubsub/docs/publisher

Parameters
  • topic_name (str) – The Pub/Sub topic name for publishing alerts.

  • message (bytes) – The message to be published.

  • project_id (Optional[str]) – GCP project ID for the project containing the topic. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.

  • attrs (dict) – Message attributes to be published.

Return type

str

Returns

published message ID

pgb_utils.pgb_utils.pubsub.pull(subscription_name, max_messages=1, project_id=None, msg_only=True)[source]

Pull and acknowledge a fixed number of messages from a Pub/Sub topic.

Wrapper for the synchronous google.cloud.pubsub_v1.SubscriberClient().pull() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.

See also: https://cloud.google.com/pubsub/docs/pull

Parameters
  • subscription_name (str) – Name of the Pub/Sub subcription to pull from.

  • max_messages (int) – The maximum number of messages to pull.

  • project_id (Optional[str]) – GCP project ID for the project containing the subscription. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.

  • msg_only (bool) – Whether to return the message contents only or the full packet.

Return type

Union[List[bytes], List[ReceivedMessage]]

Returns

A list of messages. If msg_only is True, the messages are bytes containing the message data only. Otherwise the messages are the full message packets.

pgb_utils.pgb_utils.pubsub.streamingPull(subscription_name, callback, project_id=None, block=True)[source]

Pull and process Pub/Sub messages continuously in streaming mode.

Wrapper for the asynchronous google.cloud.pubsub_v1.SubscriberClient().subscribe() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.

See also: https://cloud.google.com/pubsub/docs/pull

Parameters
  • subscription_name (str) – The Pub/Sub subcription to pull from.

  • callback (Callable[[PubsubMessage], None]) – The callback function containing the message processing and acknowledgement logic.

  • project_id (Optional[str]) – GCP project ID for the project containing the subscription. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.

  • block (bool) – Whether to block while streaming messages or return the StreamingPullFuture object for the user to manage separately.

Return type

Optional[StreamingPullFuture]

Returns

If block is False, immediately returns the StreamingPullFuture object that manages the background thread that is pulling and processing messages. Call its cancel() method to stop streaming messages.

If block is True, the user’s thread is blocked until the streaming encounters an error. Use Control+C to stop streaming.