Skip to content

Cloud Pub/Sub

Cloud Pub/Sub is a messaging service for exchanging event data among applications and services. The CloudPublisher, CloudSubscriber, and Message classes in gcp-pilot provide high-level interfaces for interacting with Google Cloud Pub/Sub.

Installation

To use the Cloud Pub/Sub functionality, you need to install gcp-pilot with the pubsub extra:

pip install gcp-pilot[pubsub]

Usage

CloudPublisher

The CloudPublisher class allows you to create and manage topics, and publish messages to them.

Initialization

from gcp_pilot.pubsub import CloudPublisher

# Initialize with default credentials
publisher = CloudPublisher()

# Initialize with specific project
publisher = CloudPublisher(project_id="my-project")

# Initialize with message ordering enabled
publisher = CloudPublisher(enable_message_ordering=True)

# Initialize with service account impersonation
publisher = CloudPublisher(impersonate_account="service-account@project-id.iam.gserviceaccount.com")

Managing Topics

# Create a topic
topic = publisher.create_topic(
    topic_id="my-topic",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    exists_ok=True,  # Optional: if True, returns the existing topic if it already exists
    labels={"environment": "production"},  # Optional: labels to apply to the topic
)
print(f"Topic created: {topic.name}")

# Update a topic
topic = publisher.update_topic(
    topic_id="my-topic",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    labels={"environment": "staging"},  # New labels to apply to the topic
)
print(f"Topic updated: {topic.name}")

# Get a topic
topic = publisher.get_topic(
    topic_id="my-topic",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
print(f"Topic: {topic.name}")

# List topics
topics = publisher.list_topics(
    prefix="my-",  # Optional: filter topics by prefix
    suffix="-topic",  # Optional: filter topics by suffix
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
for topic in topics:
    print(f"Topic: {topic.name}")

Publishing Messages

# Publish a message to a topic
message_id = publisher.publish(
    message="Hello, world!",
    topic_id="my-topic",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    attributes={"key": "value"},  # Optional: attributes to attach to the message
)
print(f"Message published with ID: {message_id}")

# Publish a message to a topic that doesn't exist yet (it will be created automatically)
message_id = publisher.publish(
    message="Hello, world!",
    topic_id="new-topic",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
print(f"Message published with ID: {message_id}")

CloudSubscriber

The CloudSubscriber class allows you to create and manage subscriptions, and subscribe to messages.

Initialization

from gcp_pilot.pubsub import CloudSubscriber

# Initialize with default credentials
subscriber = CloudSubscriber()

# Initialize with specific project
subscriber = CloudSubscriber(project_id="my-project")

# Initialize with service account impersonation
subscriber = CloudSubscriber(impersonate_account="service-account@project-id.iam.gserviceaccount.com")

Managing Subscriptions

# Create a subscription
subscription = subscriber.create_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    exists_ok=True,  # Optional: if True, returns the existing subscription if it already exists
    auto_create_topic=True,  # Optional: if True, creates the topic if it doesn't exist
    enable_message_ordering=False,  # Optional: if True, enables message ordering
    push_to_url=None,  # Optional: URL to push messages to
    use_oidc_auth=False,  # Optional: if True, uses OIDC authentication for push
    dead_letter_topic_id=None,  # Optional: topic to send dead-letter messages to
    dead_letter_subscription_id=None,  # Optional: subscription for the dead-letter topic
    max_retries=None,  # Optional: maximum number of delivery attempts
    min_backoff=10,  # Optional: minimum backoff time in seconds
    max_backoff=600,  # Optional: maximum backoff time in seconds
    expiration_ttl=31,  # Optional: expiration time in days
    enable_exactly_once_delivery=False,  # Optional: if True, enables exactly-once delivery
    message_filter=None,  # Optional: filter for messages
)
print(f"Subscription created: {subscription.name}")

# Update a subscription
subscription = subscriber.update_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    push_to_url="https://example.com/push",  # Optional: URL to push messages to
    use_oidc_auth=True,  # Optional: if True, uses OIDC authentication for push
    dead_letter_topic_id="my-dead-letter-topic",  # Optional: topic to send dead-letter messages to
    dead_letter_subscription_id="my-dead-letter-subscription",  # Optional: subscription for the dead-letter topic
    max_retries=5,  # Optional: maximum number of delivery attempts
    min_backoff=30,  # Optional: minimum backoff time in seconds
    max_backoff=300,  # Optional: maximum backoff time in seconds
    expiration_ttl=None,  # Optional: expiration time in days (None means never expire)
    message_filter="attributes.key = \"value\"",  # Optional: filter for messages
)
print(f"Subscription updated: {subscription.name}")

# Create or update a subscription
subscription = subscriber.create_or_update_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    auto_create_topic=True,  # Optional: if True, creates the topic if it doesn't exist
    enable_message_ordering=False,  # Optional: if True, enables message ordering
    push_to_url="https://example.com/push",  # Optional: URL to push messages to
    use_oidc_auth=True,  # Optional: if True, uses OIDC authentication for push
    dead_letter_topic_id="my-dead-letter-topic",  # Optional: topic to send dead-letter messages to
    dead_letter_subscription_id="my-dead-letter-subscription",  # Optional: subscription for the dead-letter topic
    max_retries=5,  # Optional: maximum number of delivery attempts
    min_backoff=30,  # Optional: minimum backoff time in seconds
    max_backoff=300,  # Optional: maximum backoff time in seconds
    expiration_ttl=None,  # Optional: expiration time in days (None means never expire)
    message_filter="attributes.key = \"value\"",  # Optional: filter for messages
)
print(f"Subscription created or updated: {subscription.name}")

# Get a subscription
subscription = subscriber.get_subscription(
    subscription_id="my-subscription",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
print(f"Subscription: {subscription.name}")

# List subscriptions
subscriptions = subscriber.list_subscriptions(
    prefix="my-",  # Optional: filter subscriptions by prefix
    suffix="-subscription",  # Optional: filter subscriptions by suffix
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
for subscription in subscriptions:
    print(f"Subscription: {subscription.name}")

# Delete a subscription
subscriber.delete_subscription(
    subscription_id="my-subscription",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)

Subscribing to Messages

# Define a callback function to process messages
def process_message(message):
    print(f"Received message: {message.data}")
    print(f"Attributes: {message.attributes}")
    message.ack()  # Acknowledge the message

# Subscribe to a topic
subscriber.subscribe(
    topic_id="my-topic",
    subscription_id="my-subscription",
    callback=process_message,
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)

Message

The Message class represents a Pub/Sub message and provides methods for serialization and deserialization.

Loading Messages

from gcp_pilot.pubsub import Message
import json

# Load a message from a dictionary
message_dict = {
    "message": {
        "attributes": {"key": "value"},
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",  # Base64-encoded "Hello Cloud Pub/Sub! Here is my message!"
        "messageId": "136969346945",
    },
    "subscription": "projects/myproject/subscriptions/mysubscription",
}
message = Message.load(body=message_dict)
print(f"Message ID: {message.id}")
print(f"Message Data: {message.data}")
print(f"Message Attributes: {message.attributes}")
print(f"Subscription: {message.subscription}")

# Load a message from a JSON string
message_json = json.dumps(message_dict)
message = Message.load(body=message_json)
print(f"Message ID: {message.id}")
print(f"Message Data: {message.data}")

# Load a message from bytes
message_bytes = json.dumps(message_dict).encode()
message = Message.load(body=message_bytes)
print(f"Message ID: {message.id}")
print(f"Message Data: {message.data}")

# Load a message with a custom parser
def custom_parser(data):
    return data.upper()

message = Message.load(body=message_dict, parser=custom_parser)
print(f"Message Data: {message.data}")  # Will be uppercase

Dumping Messages

# Create a message
message = Message(
    id="136969346945",
    data="Hello, world!",
    attributes={"key": "value"},
    subscription="projects/myproject/subscriptions/mysubscription",
)

# Dump the message to a JSON string
message_json = message.dump()
print(f"Message JSON: {message_json}")

# Dump the message with a custom parser
def custom_parser(data):
    return data.upper()

message_json = message.dump(parser=custom_parser)
print(f"Message JSON: {message_json}")  # Data will be uppercase

Error Handling

The Pub/Sub classes handle common errors and convert them to more specific exceptions:

from gcp_pilot import exceptions

try:
    publisher = CloudPublisher()
    publisher.create_topic(topic_id="my-topic", exists_ok=False)
except exceptions.AlreadyExists:
    print("Topic already exists")

try:
    subscriber = CloudSubscriber()
    subscriber.get_subscription(subscription_id="non-existent-subscription")
except exceptions.NotFound:
    print("Subscription not found")

try:
    subscriber = CloudSubscriber()
    subscriber.create_subscription(
        topic_id="my-topic",
        subscription_id="my-subscription",
        max_retries=5,
        dead_letter_topic_id=None,
    )
except exceptions.ValidationError:
    print("max_retries requires dead_letter_topic_id")

Working with Service Account Impersonation

Service account impersonation allows you to act as a service account without having its key file. This is a more secure approach than downloading and storing service account keys.

# Initialize with service account impersonation
publisher = CloudPublisher(impersonate_account="service-account@project-id.iam.gserviceaccount.com")
subscriber = CloudSubscriber(impersonate_account="service-account@project-id.iam.gserviceaccount.com")

# Now all operations will be performed as the impersonated service account
publisher.create_topic(topic_id="my-topic")
subscriber.create_subscription(topic_id="my-topic", subscription_id="my-subscription")

For more information on service account impersonation, see the Authentication documentation.

Message Filtering

Pub/Sub allows you to filter messages based on their attributes. You can specify a filter when creating or updating a subscription:

# Create a subscription with a filter
subscription = subscriber.create_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    message_filter="attributes.key = \"value\"",
)

# Update a subscription with a filter
subscription = subscriber.update_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    message_filter="attributes.key = \"value\"",
)

The filter is a CEL (Common Expression Language) expression that evaluates to a boolean value. For more information on message filtering, see the Google Cloud Pub/Sub documentation.

Dead Letter Topics

Dead letter topics are used to store messages that could not be delivered to a subscription. You can specify a dead letter topic when creating or updating a subscription:

# Create a subscription with a dead letter topic
subscription = subscriber.create_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    dead_letter_topic_id="my-dead-letter-topic",
    dead_letter_subscription_id="my-dead-letter-subscription",
    max_retries=5,
)

# Update a subscription with a dead letter topic
subscription = subscriber.update_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    dead_letter_topic_id="my-dead-letter-topic",
    dead_letter_subscription_id="my-dead-letter-subscription",
    max_retries=5,
)

Push Subscriptions

Pub/Sub supports push subscriptions, which push messages to a webhook endpoint instead of requiring the client to pull messages. You can create a push subscription by specifying a push_to_url:

# Create a push subscription
subscription = subscriber.create_subscription(
    topic_id="my-topic",
    subscription_id="my-push-subscription",
    push_to_url="https://example.com/push",
    use_oidc_auth=True,  # Optional: if True, uses OIDC authentication for push
)

# Update a subscription to use push
subscription = subscriber.update_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    push_to_url="https://example.com/push",
    use_oidc_auth=True,
)

When using push subscriptions, Pub/Sub will send messages to the specified URL as HTTP POST requests. The message will be in the request body in the format expected by the Message.load method.

Exactly-Once Delivery

Pub/Sub supports exactly-once delivery, which ensures that each message is delivered exactly once to a subscription. You can enable exactly-once delivery when creating a subscription:

# Create a subscription with exactly-once delivery
subscription = subscriber.create_subscription(
    topic_id="my-topic",
    subscription_id="my-subscription",
    enable_exactly_once_delivery=True,
)

Note that exactly-once delivery is only supported for push subscriptions that use OIDC authentication.