pgb_utils.pubsub¶
The pubsub module facilitates access to Pitt-Google Pub/Sub streams.
- pgb_utils.pgb_utils.pubsub.create_subscription(topic_name, subscription_name=None, project_id=None)[source]¶
Create a subscription to a Pitt-Google Pub/Sub topic.
Wrapper for google.cloud.pubsub_v1.SubscriberClient().create_subscription() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.
See also: https://cloud.google.com/pubsub/docs/admin#manage_subs
- Parameters
topic_name (
str
) – Name of a Pitt-Google Broker Pub/Sub topic to subscribe to.project_id (
Optional
[str
]) – User’s GCP project ID. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used. The subscription will be created in this account.subscription_name (
Optional
[str
]) – Name for the user’s Pub/Sub subscription. If None, topic_name will be used.
- Return type
Subscription
- Returns
A Pub/Sub Subscription instance
- pgb_utils.pgb_utils.pubsub.decode_message(msg_bytes, return_alert_as='dict')[source]¶
Decode the message and return in requested format.
- Parameters
msg_bytes (
bytes
) – a single alertreturn_alert_as (
str
) – Format for the returned alert. One of “dict” (dictionary), “df” (Pandas DataFrame) or “table” (Astropy Table). Note that only the “dict” option returns the full packet. Using “df” or “table” drops some metadata and the cutouts (if present).
- Return type
Union
[dict
,DataFrame
,Table
,Tuple
[dict
,dict
],Tuple
[DataFrame
,dict
],Tuple
[Table
,dict
]]- Returns
If the message contains an alert packet only, it is returned in the requested format.
If the message contains an alert packet plus value added products, it is returned as a tuple where the first element is the alert packet in the requested format, and the second element is the value added products as a dict.
- pgb_utils.pgb_utils.pubsub.delete_subscription(subscription_name, project_id=None)[source]¶
Delete a Pub/Sub subscription.
Wrapper for google.cloud.pubsub_v1.SubscriberClient().delete_subscription() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.
See also: https://cloud.google.com/pubsub/docs/admin#delete_subscription
- Parameters
subscription_name (
str
) – Name for the user’s Pub/Sub subscription. If None, topic_name will be used.project_id (
Optional
[str
]) – User’s GCP project ID. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used. The subscription will be created in this account.
- Return type
None
- pgb_utils.pgb_utils.pubsub.publish(topic_name, message, project_id=None, attrs={})[source]¶
Publish messages to a Pub/Sub topic.
Wrapper for google.cloud.pubsub_v1.PublisherClient().publish() documented at https://googleapis.dev/python/pubsub/latest/publisher/api/client.html.
See also: https://cloud.google.com/pubsub/docs/publisher
- Parameters
topic_name (
str
) – The Pub/Sub topic name for publishing alerts.message (
bytes
) – The message to be published.project_id (
Optional
[str
]) – GCP project ID for the project containing the topic. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.attrs (
dict
) – Message attributes to be published.
- Return type
str
- Returns
published message ID
- pgb_utils.pgb_utils.pubsub.pull(subscription_name, max_messages=1, project_id=None, msg_only=True)[source]¶
Pull and acknowledge a fixed number of messages from a Pub/Sub topic.
Wrapper for the synchronous google.cloud.pubsub_v1.SubscriberClient().pull() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.
See also: https://cloud.google.com/pubsub/docs/pull
- Parameters
subscription_name (
str
) – Name of the Pub/Sub subcription to pull from.max_messages (
int
) – The maximum number of messages to pull.project_id (
Optional
[str
]) – GCP project ID for the project containing the subscription. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.msg_only (
bool
) – Whether to return the message contents only or the full packet.
- Return type
Union
[List
[bytes
],List
[ReceivedMessage
]]- Returns
A list of messages. If msg_only is True, the messages are bytes containing the message data only. Otherwise the messages are the full message packets.
- pgb_utils.pgb_utils.pubsub.streamingPull(subscription_name, callback, project_id=None, block=True)[source]¶
Pull and process Pub/Sub messages continuously in streaming mode.
Wrapper for the asynchronous google.cloud.pubsub_v1.SubscriberClient().subscribe() documented at https://googleapis.dev/python/pubsub/latest/subscriber/api/client.html.
See also: https://cloud.google.com/pubsub/docs/pull
- Parameters
subscription_name (
str
) – The Pub/Sub subcription to pull from.callback (
Callable
[[PubsubMessage
],None
]) – The callback function containing the message processing and acknowledgement logic.project_id (
Optional
[str
]) – GCP project ID for the project containing the subscription. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.block (
bool
) – Whether to block while streaming messages or return the StreamingPullFuture object for the user to manage separately.
- Return type
Optional
[StreamingPullFuture
]- Returns
If block is False, immediately returns the StreamingPullFuture object that manages the background thread that is pulling and processing messages. Call its cancel() method to stop streaming messages.
If block is True, the user’s thread is blocked until the streaming encounters an error. Use Control+C to stop streaming.