Datastream¶
Google Cloud Datastream is a serverless, easy-to-use change data capture (CDC) and replication service. The Datastream
class in gcp-pilot provides a high-level interface for interacting with the Datastream API.
Installation¶
To use the Datastream functionality, you need to install gcp-pilot:
pip install gcp-pilot
Usage¶
Initialization¶
from gcp_pilot.datastream import Datastream
# Initialize with default credentials
datastream = Datastream()
# Initialize with specific project
datastream = Datastream(project_id="my-project")
# Initialize with specific location
datastream = Datastream(location="us-central1")
# Initialize with service account impersonation
datastream = Datastream(impersonate_account="service-account@project-id.iam.gserviceaccount.com")
Managing Streams¶
Listing Streams¶
# List all streams in a project
streams = datastream.get_streams()
for stream in streams:
    print(f"Stream: {stream['name']}")
# List streams in a specific location
streams = datastream.get_streams(location="us-central1")
for stream in streams:
    print(f"Stream: {stream['name']}")
Getting a Stream¶
# Get information about a stream
stream = datastream.get_stream(
stream_name="my-stream",
location="us-central1", # Optional: defaults to the default location
project_id="my-project", # Optional: defaults to the project associated with credentials
)
print(f"Stream: {stream['name']}")
print(f"State: {stream['state']}")
print(f"Source: {stream['sourceConfig']['sourceConnectionProfile']}")
print(f"Destination: {stream['destinationConfig']['destinationConnectionProfile']}")
Deleting a Stream¶
# Delete a stream
datastream.delete_stream(
    stream_name="my-stream",
    location="us-central1",  # Optional: defaults to the default location
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
Managing Stream Objects¶
Stream objects represent the database objects (tables, schemas) that are being replicated by a stream.
Listing Stream Objects¶
# List all objects in a stream
objects = datastream.get_objects(
    stream_name="my-stream",
    location="us-central1",  # Optional: defaults to the default location
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
for obj in objects:
print(f"Object: {obj['displayName']}")
print(f"Source Type: {obj['sourceObject']['postgresqlTable']['table']}")
print(f"Backfill Status: {obj['backfillJob']['state']}")
Getting a Stream Object¶
# Get information about a specific stream object
obj = datastream.get_object(
    object_id="my-object-id",
    stream_name="my-stream",
    location="us-central1",  # Optional: defaults to the default location
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
print(f"Object: {obj['displayName']}")
print(f"Source Type: {obj['sourceObject']['postgresqlTable']['table']}")
print(f"Backfill Status: {obj['backfillJob']['state']}")
Finding a Stream Object¶
# Find a stream object by schema and table name
obj = datastream.find_object(
schema="public",
table="users",
stream_name="my-stream",
location="us-central1", # Optional: defaults to the default location
project_id="my-project", # Optional: defaults to the project associated with credentials
)
print(f"Object: {obj['displayName']}")
Managing Backfill Jobs¶
Backfill jobs are used to replicate existing data from the source to the destination.
Starting a Backfill Job¶
# Start a backfill job for a specific table
response = datastream.start_backfill(
    schema="public",
    table="users",
    stream_name="my-stream",
    location="us-central1",  # Optional: defaults to the default location
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
print(f"Backfill job started: {response}")
Stopping a Backfill Job¶
# Stop a backfill job for a specific table
response = datastream.stop_backfill(
    schema="public",
    table="users",
    stream_name="my-stream",
    location="us-central1",  # Optional: defaults to the default location
    project_id="my-project",  # Optional: defaults to the project associated with credentials
)
print(f"Backfill job stopped: {response}")
Error Handling¶
The Datastream class handles common errors and converts them to more specific exceptions:
from gcp_pilot import exceptions
try:
    datastream.get_stream(stream_name="non-existent-stream")
except exceptions.NotFound:
    print("Stream not found")
try:
    datastream.find_object(schema="non-existent", table="non-existent", stream_name="my-stream")
except exceptions.NotFound:
    print("Object not found")
Working with PostgreSQL Sources¶
The examples above focus on PostgreSQL sources, which is what the current implementation supports. The find_object
method specifically looks for PostgreSQL identifiers. If you're working with other source types, you may need to modify the code or wait for future updates to the library.
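For example, you can use the identifier layout from the earlier examples to separate PostgreSQL tables from any other objects a stream might report. This sketch reuses the postgresqlTable key shown above and treats the schema key as an assumption.
# Split a stream's objects into PostgreSQL tables and everything else
objects = datastream.get_objects(stream_name="my-stream")
for obj in objects:
    identifier = obj.get("sourceObject", {}).get("postgresqlTable")
    if identifier:
        schema = identifier.get("schema", "<unknown schema>")  # the "schema" key is assumed
        print(f"PostgreSQL table: {schema}.{identifier['table']}")
    else:
        print(f"Non-PostgreSQL object: {obj['displayName']}")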