2. Create the Cloud Function (ETL)
This Cloud Function is the engine of our pipeline. It will run a Python script to connect to a MySQL database, execute a query to denormalize and fetch data, and then load that data into our BigQuery table, overwriting the previous contents.
Prerequisites
- Enabled APIs: Ensure the Cloud Functions, Cloud Build, Artifact Registry, and BigQuery APIs are enabled (see the gcloud sketch after this list).
- MySQL Credentials:
  - DB_USER: student
  - DB_PASS: your project name (e.g., john-doe-pawait-1)
  - DB_NAME: cloud_mastery
  - DB_HOST: your Cloud SQL instance's public IP address.
- BigQuery Table ID: You need an existing BigQuery dataset and table into which the pipeline will load the data from our first step (the sketch after this list shows one way to create them), for example:
BQ_TABLE_ID=john-higi-pawait-1.john_higi_pawait_1.cloud_mastery
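If you prefer to handle both prerequisites from the command line, the sketch below is one way to do it; the service list matches the APIs above, and the project, dataset, and table names mirror the example BQ_TABLE_ID, so substitute your own values:

# Enable the required APIs (one-time, per project)
gcloud services enable cloudfunctions.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    bigquery.googleapis.com

# Create the dataset and an empty table; the load job in main.py infers
# the schema from the DataFrame, so no schema is needed here
bq mk --dataset john-higi-pawait-1:john_higi_pawait_1
bq mk --table john-higi-pawait-1:john_higi_pawait_1.cloud_mastery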
Step 1: Prepare Your Code
Your function needs a main.py file for the logic and a requirements.txt for dependencies.
main.py
import os
import pandas as pd
import pymysql
from google.cloud import bigquery
def denormalize_and_load(request):
    # Load credentials from environment variables
    db_user = os.environ['DB_USER']
    db_pass = os.environ['DB_PASS']
    db_name = os.environ['DB_NAME']
    db_host = os.environ['DB_HOST']
    table_id = os.environ['BQ_TABLE_ID']

    # SQL query to join the normalized tables into a single denormalized view
    query = """
        SELECT
            o.id AS order_id,
            o.orderNumber,
            CONCAT(c.firstName, ' ', c.lastName) AS customer_name,
            c.email,
            o.orderDate,
            od.product_id,
            p.name AS product_name,
            p.category,
            od.quantity,
            od.unitCost,
            od.totalCost,
            o.status,
            o.paymentMethod
        FROM orders o
        JOIN customers c ON o.customer_id = c.id
        JOIN order_details od ON o.id = od.order_id
        JOIN products p ON od.product_id = p.id
    """

    # Establish a connection and fetch the result set into a pandas DataFrame;
    # the connection is closed even if the query fails
    connection = pymysql.connect(
        host=db_host,
        user=db_user,
        password=db_pass,
        database=db_name,
        port=3306,
    )
    try:
        df = pd.read_sql(query, connection)
    finally:
        connection.close()

    # Ensure date columns have a proper datetime dtype for BigQuery
    if 'orderDate' in df.columns:
        df['orderDate'] = pd.to_datetime(df['orderDate'])

    # Load the DataFrame into BigQuery, replacing the table's previous contents
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        autodetect=True,
    )
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # Wait for the load job to complete

    return f"Loaded {len(df)} rows to {table_id}."
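Note that denormalize_and_load(request) matches the HTTP signature the Functions Framework expects; the incoming request is ignored, so any GET or POST to the function's URL triggers a full reload. Because the load job uses WRITE_TRUNCATE, re-running the function is idempotent: each run replaces the table with a fresh snapshot of the source database.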
requirements.txt
functions-framework==3.*
pandas==2.2.2
SQLAlchemy==2.0.30
PyMySQL==1.1.0
google-cloud-bigquery==3.21.0
pyarrow==20.0.0
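pyarrow is pinned because load_table_from_dataframe serializes the DataFrame to Parquet via pyarrow before uploading. SQLAlchemy is optional for this code path: main.py hands pandas a raw PyMySQL connection, so pandas will emit a UserWarning recommending a SQLAlchemy connectable, which is safe to ignore here. Before deploying, you can smoke-test the function locally; this is a minimal sketch, assuming your machine can reach the Cloud SQL public IP and you have application-default credentials for BigQuery (gcloud auth application-default login):

# Install the dependencies, ideally in a virtual environment
pip install -r requirements.txt

# Export the variables the function reads (example values from the prerequisites)
export DB_USER=student
export DB_PASS=john-doe-pawait-1
export DB_NAME=cloud_mastery
export DB_HOST=<your-instance-public-ip>
export BQ_TABLE_ID=john-higi-pawait-1.john_higi_pawait_1.cloud_mastery

# Serve the function on http://localhost:8080 ...
functions-framework --target denormalize_and_load --debug

# ... then trigger it from a second terminal
curl localhost:8080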