BigQuery¶
BigQuery is a fully managed, serverless data warehouse that enables scalable analysis over petabytes of data. The BigQuery
class in gcp-pilot provides a high-level interface for interacting with Google Cloud BigQuery.
Installation¶
To use the BigQuery functionality, you need to install gcp-pilot with the bigquery extra:
pip install "gcp-pilot[bigquery]"
Usage¶
Initialization¶
from gcp_pilot.big_query import BigQuery
# Initialize with default credentials
bq = BigQuery()
# Initialize with specific project
bq = BigQuery(project_id="my-project")
# Initialize with specific location
bq = BigQuery(location="us-central1")
# Initialize with service account impersonation
bq = BigQuery(impersonate_account="service-account@project-id.iam.gserviceaccount.com")
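These keyword arguments are independent; assuming the constructor accepts them together, a fully pinned client might look like this sketch:
# Combine the options above (sketch; assumes the constructor accepts them together)
bq = BigQuery(
    project_id="my-project",
    location="us-central1",
    impersonate_account="service-account@project-id.iam.gserviceaccount.com",
)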
Managing Datasets and Tables¶
Listing Datasets¶
# List all datasets in the project
datasets = bq.list_datasets()
for dataset in datasets:
print(f"Dataset: {dataset.dataset_id}")
Listing Tables¶
# List all tables in a dataset
tables = bq.list_tables(dataset_id="my_dataset")
for table in tables:
print(f"Table: {table.table_id}")
Getting a Table¶
# Get a table
table = bq.get_table(
table_name="my_table",
dataset_name="my_dataset",
project_id="my-project", # Optional: defaults to the project associated with credentials
)
print(f"Table: {table.table_id}")
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")
Creating a Table¶
from google.cloud import bigquery
# Define the schema
schema = [
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
]
# Create a table reference
dataset_ref = bq.client.dataset("my_dataset")
table_ref = dataset_ref.table("my_table")
# Create a table object
table = bigquery.Table(table_ref, schema=schema)
# Create the table
bq.create_table(table)
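Because create_table receives a google.cloud.bigquery.Table object, client-library options can presumably be set on it before creation. A sketch with daily time partitioning, assuming the schema also contains a TIMESTAMP column named created_at:
# Sketch: configure daily time partitioning before creating the table
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="created_at",  # hypothetical TIMESTAMP column used as the partition key
)
bq.create_table(table)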
Deleting a Table¶
from google.cloud import bigquery
# Create a table reference
dataset_ref = bq.client.dataset("my_dataset")
table_ref = dataset_ref.table("my_table")
# Create a table object
table = bigquery.Table(table_ref)
# Delete the table
bq.delete_table(table)
Executing Queries¶
# Execute a simple query
results = bq.execute("SELECT * FROM `my_dataset.my_table` LIMIT 10")
for row in results:
    print(row)
# Execute a query with parameters
results = bq.execute(
    "SELECT * FROM `my_dataset.my_table` WHERE name = @name",
    params={"name": "John"},
)
for row in results:
    print(row)
# Execute a query and write the results to a destination table
bq.execute(
"SELECT * FROM `my_dataset.my_table`",
destination_table_name="my_destination_table",
destination_dataset_name="my_destination_dataset",
destination_project="my-destination-project", # Optional: defaults to the project associated with credentials
truncate=True, # Optional: if True, truncates the destination table before writing
)
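If the rows returned by execute are google.cloud.bigquery Row objects (an assumption based on the underlying client library), they can be turned into plain dictionaries for further processing:
# Sketch: materialize query results as plain dictionaries
results = bq.execute("SELECT name, age FROM `my_dataset.my_table` LIMIT 10")
records = [dict(row) for row in results]
for record in records:
    print(record["name"], record["age"])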
Inserting Data¶
# Insert rows into a table
rows = [
{"name": "John", "age": 30, "email": "john@example.com"},
{"name": "Jane", "age": 25, "email": "jane@example.com"},
]
errors = bq.insert_rows(
dataset_name="my_dataset",
table_name="my_table",
rows=rows,
project_id="my-project", # Optional: defaults to the project associated with credentials
)
if errors:
print(f"Errors: {errors}")
Loading Data¶
from google.cloud import bigquery

# Load data from a local file
bq.load(
    table_name="my_table",
    filename="/path/to/data.csv",
    dataset_name="my_dataset",
    project_id="my-project",  # Optional: defaults to the project associated with credentials
    schema=[
        bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
    ],
    wait=True,  # Optional: if True, waits for the load job to complete
    truncate=True,  # Optional: if True, truncates the table before loading
)
# Load data from a file in Google Cloud Storage
bq.load(
table_name="my_table",
filename="gs://my-bucket/data.csv",
dataset_name="my_dataset",
project_id="my-project", # Optional: defaults to the project associated with credentials
schema=[
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
],
wait=True, # Optional: if True, waits for the load job to complete
)
# Load data from a local file, uploading it to Google Cloud Storage first
bq.load(
table_name="my_table",
filename="/path/to/data.csv",
dataset_name="my_dataset",
project_id="my-project", # Optional: defaults to the project associated with credentials
schema=[
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
],
gcs_bucket="my-bucket", # Uploads the file to this bucket before loading
wait=True, # Optional: if True, waits for the load job to complete
)
Copying Tables¶
# Copy a table
bq.copy(
    source_dataset_name="my_source_dataset",
    source_table_name="my_source_table",
    destination_dataset_name="my_destination_dataset",
    destination_table_name="my_destination_table",
    destination_project="my-destination-project",  # Optional: defaults to the project associated with credentials
    wait=True,  # Optional: if True, waits for the copy job to complete
)
Working with External Data Sources¶
# Add an external Google Cloud Storage data source
bq.add_external_gcs_source(
gcs_url="gs://my-bucket/data.csv",
dataset_name="my_dataset",
table_name="my_external_table",
skip_rows=1, # Optional: number of header rows to skip
delimiter=",", # Optional: field delimiter
quote='"', # Optional: quote character
source_format="CSV", # Optional: source format (CSV, NEWLINE_DELIMITED_JSON, AVRO, etc.)
project_id="my-project", # Optional: defaults to the project associated with credentials
)
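Once registered, the external table can be queried through execute like any native table:
# Query the external table just like a native table
results = bq.execute("SELECT * FROM `my_dataset.my_external_table` LIMIT 10")
for row in results:
    print(row)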
Utility Methods¶
# Convert a datetime to a string format suitable for BigQuery
from datetime import datetime
dt = datetime.now()
date_str = BigQuery.date_to_str(dt)
print(f"Date string: {date_str}")
# Convert a datetime to a string format suitable for BigQuery table suffixes
date_str = BigQuery.date_to_str(dt, table_suffix=True)
print(f"Table suffix: {date_str}")
Error Handling¶
The BigQuery class handles common errors and converts them to more specific exceptions:
from gcp_pilot import exceptions
try:
    bq.get_table(table_name="non_existent_table", dataset_name="my_dataset")
except exceptions.NotFound:
    print("Table not found")
try:
    bq.execute("SELECT * FROM `non_existent_dataset.non_existent_table`")
except exceptions.NotFound:
    print("Table or dataset not found")