broker_utils.gcp_utils

The gcp_utils module contains common functions used to interact with GCP resources.

broker.broker_utils.broker_utils.gcp_utils.cs_download_file(localdir, bucket_id, filename=None)[source]
Parameters
  • localdir (str) – Path to local directory where file(s) will be downloaded to.

  • bucket_id (str) – Name of the GCS bucket, not including the project ID. For example, pass ‘ztf-alert_avros’ for the bucket ‘ardent-cycling-243415-ztf-alert_avros’.

  • filename (Optional[str]) – Name or prefix of the file(s) in the bucket to download.
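The bucket_id convention described above (also used by cs_upload_file) can be illustrated with a minimal sketch. The helper names full_bucket_name and short_bucket_id are hypothetical and not part of gcp_utils. The sketch assumes, based only on the example given, that the full bucket name is the project ID and the bucket_id joined by a hyphen.

```python
def full_bucket_name(project_id: str, bucket_id: str) -> str:
    """Hypothetical helper: build the full bucket name (assumed '<project>-<bucket_id>')."""
    return f"{project_id}-{bucket_id}"


def short_bucket_id(project_id: str, bucket_name: str) -> str:
    """Hypothetical inverse: strip the project-ID prefix to get the value to pass as bucket_id."""
    prefix = f"{project_id}-"
    if not bucket_name.startswith(prefix):
        raise ValueError(f"{bucket_name!r} does not start with {prefix!r}")
    return bucket_name[len(prefix):]


# Values taken from the parameter description above.
name = full_bucket_name("ardent-cycling-243415", "ztf-alert_avros")
bucket_id = short_bucket_id("ardent-cycling-243415", name)
```

Here bucket_id is the short form that would be passed to cs_download_file or cs_upload_file.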

broker.broker_utils.broker_utils.gcp_utils.cs_upload_file(local_file, bucket_id, bucket_filename=None)[source]
Parameters
  • local_file (str) – Path of the file to upload.

  • bucket_id (str) – Name of the GCS bucket, not including the project ID. For example, pass ‘ztf-alert_avros’ for the bucket ‘ardent-cycling-243415-ztf-alert_avros’.

  • bucket_filename (Optional[str]) – Name to give the file in the bucket. If None, the name of the local file is used.

broker.broker_utils.broker_utils.gcp_utils.insert_rows_bigquery(table_id, rows)[source]

Insert rows into a table using the streaming API.

Parameters
  • table_id (str) – Identifier for the BigQuery table in the form {dataset}.{table}. For example, ‘ztf_alerts.alerts’.

  • rows (List[dict]) – Data to load into the table. Keys must include all required fields in the schema. Keys that do not correspond to a field in the schema are ignored.
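The rules for rows can be checked locally before calling insert_rows_bigquery. The sketch below is illustrative only: prepare_rows and its schema_fields and required_fields arguments are hypothetical names, and the field names and values in the usage are made up for the example. It mirrors the documented behavior (required keys must be present, unknown keys are ignored) without contacting BigQuery.

```python
from typing import Iterable, List


def prepare_rows(
    rows: List[dict],
    schema_fields: Iterable[str],
    required_fields: Iterable[str],
) -> List[dict]:
    """Hypothetical pre-check: fail on missing required keys, drop keys not in the schema."""
    schema = set(schema_fields)
    required = set(required_fields)
    prepared = []
    for i, row in enumerate(rows):
        missing = required - set(row)
        if missing:
            raise KeyError(f"row {i} is missing required fields: {sorted(missing)}")
        # Keep only keys that correspond to a schema field.
        prepared.append({key: value for key, value in row.items() if key in schema})
    return prepared


# Made-up row; "note" stands in for a key that is not in the table schema.
rows = [{"objectId": "ZTF18abc", "candid": 123, "note": "not in schema"}]
clean = prepare_rows(
    rows,
    schema_fields=["objectId", "candid"],
    required_fields=["objectId", "candid"],
)
```

The resulting clean list is what would then be passed as the rows argument.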

broker.broker_utils.broker_utils.gcp_utils.load_dataframe_bigquery(table_id, df, use_table_schema=True, logger=None)[source]

Load a dataframe to a table.

Parameters
  • table_id (str) – Identifier for the BigQuery table in the form {dataset}.{table}. For example, ‘ztf_alerts.alerts’.

  • df (DataFrame) – Data to load into the table. If the dataframe schema does not match the BigQuery table schema, a valid schema must be supplied.

  • use_table_schema (bool) – Conform the dataframe to the table schema by converting dtypes and dropping extra columns.

  • logger (Optional[Logger]) – If not None, messages are sent to the logger. Otherwise, they are printed.

broker.broker_utils.broker_utils.gcp_utils.publish_pubsub(topic_name, message, project_id=None, attrs=None, publisher=None)[source]

Publish messages to a Pub/Sub topic.

Wrapper for google.cloud.pubsub_v1.PublisherClient().publish(). See also: https://cloud.google.com/pubsub/docs/publisher#publishing_messages.

Parameters
  • topic_name (str) – The Pub/Sub topic name for publishing alerts.

  • message (Union[bytes, dict]) – The message to be published.

  • project_id (Optional[str]) – GCP project ID for the project containing the topic. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.

  • attrs (Optional[dict]) – Message attributes to be published.

  • publisher (Optional[PublisherClient]) – An instantiated PublisherClient. Use this kwarg if you are calling this function repeatedly. The publisher will automatically batch the messages over a small time window (currently 0.05 seconds) to avoid making too many separate requests to the service. This helps increase throughput. See https://googleapis.dev/python/pubsub/1.7.0/publisher/index.html#batching

Return type

str

Returns

published message ID
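The advice to reuse one publisher across repeated calls can be sketched as follows. This is a minimal illustration, not part of gcp_utils: publish_all is a hypothetical helper, fake_publish is a stand-in with the same signature as publish_pubsub so the sketch runs without GCP access, and the topic name is made up.

```python
from typing import Callable, Iterable, List, Optional


def publish_all(
    publish: Callable[..., str],
    topic_name: str,
    messages: Iterable[bytes],
    publisher: Optional[object] = None,
) -> List[str]:
    """Hypothetical helper: publish each message with one shared publisher, collect the IDs."""
    return [publish(topic_name, msg, publisher=publisher) for msg in messages]


# Stand-in for publish_pubsub; it records its calls and returns a fake message ID.
def fake_publish(topic_name, message, project_id=None, attrs=None, publisher=None):
    fake_publish.calls.append((topic_name, message, publisher))
    return str(len(fake_publish.calls))


fake_publish.calls = []

# In real use this would be an instantiated google.cloud.pubsub_v1.PublisherClient.
shared_publisher = object()
message_ids = publish_all(
    fake_publish, "my-topic", [b"alert-1", b"alert-2"], publisher=shared_publisher
)
```

In real use, publish_pubsub would be passed in place of fake_publish, so that every call goes through the same client and can benefit from its batching.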

broker.broker_utils.broker_utils.gcp_utils.pull_pubsub(subscription_name, max_messages=1, project_id=None, msg_only=True, callback=None, return_count=False)[source]

Pull and acknowledge a fixed number of messages from a Pub/Sub subscription.

Wrapper for the synchronous google.cloud.pubsub_v1.SubscriberClient().pull(). See also: https://cloud.google.com/pubsub/docs/pull#synchronous_pull

Parameters
  • subscription_name (str) – Name of the Pub/Sub subscription to pull from.

  • max_messages (int) – The maximum number of messages to pull.

  • project_id (Optional[str]) – GCP project ID for the project containing the subscription. If None, the module’s pgb_project_id will be used.

  • msg_only (bool) – Whether to work with and return only the message contents (True) or the full packet (False). If return_count is True, the count is returned instead of the messages, regardless of msg_only.

  • callback (Optional[Callable[[Union[ReceivedMessage, bytes]], bool]]) – Function used to process each message. Its input type is determined by the value of msg_only. It should return True if the message should be acknowledged, else False.

  • return_count (bool) – Whether to return the messages or just the total number of acknowledged messages.

Return type

Union[List[bytes], List[ReceivedMessage]]

Returns

A list of messages
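A callback for pull_pubsub can be sketched as below. With msg_only=True the callback receives the message contents as bytes and returns True to have the message acknowledged. The function name ack_if_valid_json is hypothetical, and the assumption that messages are JSON-encoded is made only for this example; real alerts may use a different encoding.

```python
import json


def ack_if_valid_json(message: bytes) -> bool:
    """Hypothetical callback: acknowledge only messages that decode to a JSON object."""
    try:
        alert = json.loads(message)
    except ValueError:
        # Covers both invalid JSON and bytes that cannot be decoded as text.
        return False
    return isinstance(alert, dict)


# Local checks with made-up payloads; no Pub/Sub access is needed.
accepted = ack_if_valid_json(b'{"objectId": "ZTF18abc"}')
rejected = ack_if_valid_json(b"not json")
```

Such a function would be passed as callback=ack_if_valid_json, together with a subscription name and max_messages.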

broker.broker_utils.broker_utils.gcp_utils.query_bigquery(query, project_id=None, job_config=None)[source]

Query BigQuery.

Parameters
  • query (str) – SQL query statement.

  • project_id (Optional[str]) – The GCP project id that will be used to make the API call. If not provided, the Pitt-Google production project id will be used.

  • job_config (Optional[QueryJobConfig]) – Optional job config to send with the query.

Example query:

``
query = (
    f'SELECT * '
    f'FROM {dataset_project_id}.{dataset}.{table} '
    f'WHERE objectId={objectId} '
)
``

Examples of working with the query_job:

``
# Cast it to a DataFrame:
query_job.to_dataframe()

# Iterate row-by-row
for r, row in enumerate(query_job):
    # row values can be accessed by field name or index
    print(f"objectId={row[0]}, candid={row['candid']}")
``

Return type

QueryJob

broker.broker_utils.broker_utils.gcp_utils.streamingPull_pubsub(subscription_name, callback, project_id=None, timeout=10, block=True, flow_control=None)[source]

Pull and process Pub/Sub messages continuously in a background thread.

Wrapper for the asynchronous google.cloud.pubsub_v1.SubscriberClient().subscribe(). See also: https://cloud.google.com/pubsub/docs/pull#asynchronous-pull

Parameters
  • subscription_name (str) – The Pub/Sub subscription to pull from.

  • callback (Callable[[PubsubMessage], None]) – The callback function containing the message processing and acknowledgement logic.

  • project_id (Optional[str]) – GCP project ID for the project containing the subscription. If None, the environment variable GOOGLE_CLOUD_PROJECT will be used.

  • timeout (int) – The number of seconds before the subscribe call times out and closes the connection.

  • block (bool) – Whether to block while streaming messages or return the StreamingPullFuture object for the user to manage separately.

Return type

Optional[StreamingPullFuture]

Returns

If block is False, immediately returns the StreamingPullFuture object that manages the background thread. Call its cancel() method to stop streaming messages. If block is True, returns None once the streaming encounters an error or timeout.
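A callback for streamingPull_pubsub receives the full PubsubMessage and is responsible for acknowledging it. The sketch below is a minimal illustration under stated assumptions: the payload is taken to be JSON (an assumption made for the example), and FakeMessage is a hypothetical stand-in exposing only the data attribute and the ack() and nack() methods, so the sketch runs without a Pub/Sub connection.

```python
import json


def callback(message) -> None:
    """Process one message; ack on success, nack if the payload cannot be parsed."""
    try:
        payload = json.loads(message.data)
    except ValueError:
        # nack() asks Pub/Sub to redeliver the message later.
        message.nack()
        return
    # ... application-specific processing of payload would go here ...
    message.ack()


class FakeMessage:
    """Hypothetical stand-in for a PubsubMessage, for local testing only."""

    def __init__(self, data: bytes):
        self.data = data
        self.status = None

    def ack(self) -> None:
        self.status = "ack"

    def nack(self) -> None:
        self.status = "nack"


good, bad = FakeMessage(b'{"candid": 1}'), FakeMessage(b"oops")
callback(good)
callback(bad)
```

Whether to nack or ack an unparseable message is a design choice: nack leads to redelivery, which is only useful if the failure is transient. With block=False, the returned StreamingPullFuture can be cancelled once enough messages have been processed.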