Data Engineering with Python
Data engineering moves, transforms, and validates data so analysts and ML models can trust it. Python is the lingua franca of data pipelines, from cron scripts to orchestration with Airflow and distributed processing with Spark.
ETL vs ELT
| Approach | Flow | When to use |
|---|---|---|
| ETL | Extract → Transform → Load | Legacy warehouses, strict schemas |
| ELT | Extract → Load → Transform | Cloud warehouses (Snowflake, BigQuery) |
Teams on modern cloud warehouses usually prefer ELT: load raw data first, then transform it inside the warehouse with SQL (often via dbt) or Python.
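To make the ELT order concrete, here is a minimal sketch that uses Python's built-in `sqlite3` as a stand-in for a cloud warehouse (an assumption made so the example runs anywhere; in Snowflake or BigQuery the SQL step would often live in a dbt model). Raw records are landed untouched, and the cleaning happens afterwards as SQL inside the database. Table names, columns, and sample rows are hypothetical.

```python
import sqlite3

# Hypothetical raw records as they might arrive from a source (untyped strings)
raw_records = [
    ("1", "10", "100.00", "1.10"),
    ("2", "", "50.00", "1.00"),    # missing customer_id
    ("3", "11", "-5.00", "1.00"),  # negative amount
]

conn = sqlite3.connect(":memory:")

# Load: land the data as-is in a raw table, with no cleaning in Python
conn.execute(
    "CREATE TABLE raw_orders (order_id TEXT, customer_id TEXT, amount TEXT, exchange_rate TEXT)"
)
conn.executemany("INSERT INTO raw_orders VALUES (?, ?, ?, ?)", raw_records)

# Transform: cast, compute, and filter inside the database with SQL
conn.execute("""
    CREATE TABLE orders_clean AS
    SELECT CAST(order_id AS INTEGER)                          AS order_id,
           CAST(customer_id AS INTEGER)                       AS customer_id,
           CAST(amount AS REAL) * CAST(exchange_rate AS REAL) AS total_usd
    FROM raw_orders
    WHERE customer_id <> ''
      AND CAST(amount AS REAL) > 0
""")

rows = conn.execute("SELECT order_id, customer_id, total_usd FROM orders_clean").fetchall()
print(rows)
```

The raw table still holds all three rows, so the transform can be changed and re-run later without extracting from the source again.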
Pipeline Architecture
```
Sources (APIs, DBs, files)
        ↓
Extract (Python scripts)
        ↓
Staging (S3 / GCS raw zone)
        ↓
Transform (pandas / Spark / SQL)
        ↓
Data Warehouse (Snowflake, BigQuery, Redshift)
        ↓
Analytics / ML / Dashboards
```
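The staging step pays off because transforms can be replayed from the raw zone without hitting the source again. A minimal sketch, assuming a local directory stands in for the S3/GCS raw zone and that extract, transform, and load are plain functions passed in (all names here are hypothetical):

```python
import json
import tempfile
from datetime import date
from pathlib import Path


def stage_raw(records: list[dict], raw_root: Path, source: str, run_date: date) -> Path:
    """Write extracted records unchanged to a date-partitioned raw-zone path."""
    path = raw_root / source / run_date.strftime("%Y/%m/%d") / "data.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(records))
    return path


def read_raw(path: Path) -> list[dict]:
    return json.loads(path.read_text())


def run_pipeline(extract, transform, load, raw_root: Path, run_date: date) -> Path:
    raw_path = stage_raw(extract(), raw_root, "orders", run_date)  # Extract -> Staging
    load(transform(read_raw(raw_path)))  # Staging -> Transform -> Warehouse
    return raw_path


# Usage with stand-in stages; a Python list plays the warehouse
warehouse: list[dict] = []
with tempfile.TemporaryDirectory() as tmp:
    raw_path = run_pipeline(
        extract=lambda: [{"order_id": 1, "amount": 5}, {"order_id": 2, "amount": -1}],
        transform=lambda rows: [r for r in rows if r["amount"] > 0],
        load=warehouse.extend,
        raw_root=Path(tmp),
        run_date=date(2024, 1, 15),
    )
    relative = raw_path.relative_to(tmp).as_posix()
    staged = read_raw(raw_path)  # the raw zone keeps every extracted row

print(relative, len(staged), warehouse)
```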
Extract: APIs and Databases
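```python
import requests
import pandas as pd
from sqlalchemy import create_engine

def extract_from_api(url: str, params: dict) -> pd.DataFrame:
    # Always set a timeout so a hung API cannot stall the pipeline
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()  # fail loudly on 4xx/5xx responses
    # Assumes the API wraps its records in a top-level "results" list
    return pd.DataFrame(response.json()["results"])

def extract_from_db(query: str, connection_string: str) -> pd.DataFrame:
    engine = create_engine(connection_string)
    try:
        return pd.read_sql(query, engine)
    finally:
        engine.dispose()  # release pooled connections
```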
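`extract_from_api` reads a single response, but many APIs paginate. The sketch below assumes a hypothetical response shape of `{"results": [...], "next": <page number or null>}` and takes the page fetcher as a parameter so it can run without a network; with `requests`, `fetch_page` could wrap `requests.get(url, params={**params, "page": page}, timeout=30).json()`.

```python
from typing import Callable, Iterator


def iter_pages(
    fetch_page: Callable[[int], dict], start_page: int = 1, max_pages: int = 1000
) -> Iterator[dict]:
    """Yield records page by page until the (hypothetical) "next" field is null."""
    page = start_page
    for _ in range(max_pages):  # hard cap guards against an API that never stops paginating
        body = fetch_page(page)
        yield from body["results"]
        if body.get("next") is None:
            return
        page = body["next"]
    raise RuntimeError(f"Stopped after {max_pages} pages; pagination may be looping")


# Usage with an in-memory fake API
fake_api = {
    1: {"results": [{"id": 1}, {"id": 2}], "next": 2},
    2: {"results": [{"id": 3}], "next": None},
}
records = list(iter_pages(fake_api.__getitem__))
print([r["id"] for r in records])  # [1, 2, 3]
```

The collected `records` can then be passed to `pd.DataFrame(records)` exactly as in `extract_from_api`.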
Use incremental extraction with watermarks: remember the latest `updated_at` you have loaded and fetch only rows newer than it.
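```python
from sqlalchemy import create_engine, text

last_sync = get_watermark("orders")  # hypothetical helper (state table or file)
query = text("SELECT * FROM orders WHERE updated_at > :last_sync")  # bound, not f-string
df = pd.read_sql(query, create_engine(DB_URL), params={"last_sync": last_sync})
if not df.empty:  # an empty batch has no max(); keep the previous watermark
    save_watermark("orders", df["updated_at"].max())
```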
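A runnable sketch of the watermark pattern end to end, assuming SQLite (stdlib `sqlite3`) for both the source table and a small hypothetical `etl_state` table that stores the watermark. Timestamps are ISO-8601 strings so they compare correctly as text; the helper names mirror the ones above but are illustrative.

```python
import sqlite3


def get_watermark(conn: sqlite3.Connection, table: str) -> str:
    row = conn.execute("SELECT value FROM etl_state WHERE name = ?", (table,)).fetchone()
    return row[0] if row else "1970-01-01T00:00:00"  # first run: extract everything


def save_watermark(conn: sqlite3.Connection, table: str, value: str) -> None:
    conn.execute("INSERT OR REPLACE INTO etl_state (name, value) VALUES (?, ?)", (table, value))


def extract_incremental(conn: sqlite3.Connection) -> list[tuple]:
    last_sync = get_watermark(conn, "orders")
    rows = conn.execute(
        "SELECT order_id, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
        (last_sync,),
    ).fetchall()
    if rows:  # only advance the watermark when something new arrived
        save_watermark(conn, "orders", rows[-1][1])
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE etl_state (name TEXT PRIMARY KEY, value TEXT)")
conn.execute("CREATE TABLE orders (order_id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2024-01-15T08:00:00"), (2, "2024-01-15T09:00:00")],
)

first = extract_incremental(conn)  # both existing rows
conn.execute("INSERT INTO orders VALUES (3, '2024-01-15T10:00:00')")
second = extract_incremental(conn)  # only the newly inserted row
print(len(first), len(second), get_watermark(conn, "orders"))
```

A strict `>` comparison can skip rows that share the watermark timestamp but are committed late; a common mitigation is to re-read a small overlap window and deduplicate on the primary key.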
Transform with pandas
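```python
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()  # never mutate the caller's frame
    # Parse timestamps as timezone-aware UTC
    df["order_date"] = pd.to_datetime(df["order_date"], utc=True)
    # Normalize amounts to USD
    df["total_usd"] = df["amount"] * df["exchange_rate"]
    # Drop rows without a customer and rows with non-positive totals
    df = df.dropna(subset=["customer_id"])
    df = df[df["total_usd"] > 0].copy()  # explicit copy avoids SettingWithCopyWarning
    # Period has no timezone, so drop it before bucketing by month
    df["order_month"] = df["order_date"].dt.tz_localize(None).dt.to_period("M")
    return df
```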
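The same steps can also be written as one method chain with `assign` and callable `.loc`, which avoids reassigning `df` between steps. A sketch run on a tiny batch; the column names follow `transform_orders`, while the function name and sample values are made up for illustration.

```python
import pandas as pd


def transform_orders_chained(raw: pd.DataFrame) -> pd.DataFrame:
    return (
        raw.assign(
            order_date=lambda d: pd.to_datetime(d["order_date"], utc=True),
            total_usd=lambda d: d["amount"] * d["exchange_rate"],
        )
        .dropna(subset=["customer_id"])
        .loc[lambda d: d["total_usd"] > 0]  # drop zero or negative totals
        .assign(order_month=lambda d: d["order_date"].dt.tz_localize(None).dt.to_period("M"))
    )


raw = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "customer_id": [10, None, 11, 12],  # order 2 has no customer
    "order_date": [
        "2024-01-15T10:00:00Z",
        "2024-01-16T11:00:00Z",
        "2024-02-01T09:30:00Z",
        "2024-02-03T08:00:00Z",
    ],
    "amount": [100.0, 50.0, 20.0, 0.0],  # order 4 totals zero
    "exchange_rate": [1.1, 1.0, 0.5, 1.0],
})

out = transform_orders_chained(raw)
print(out[["order_id", "total_usd", "order_month"]])
```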
Data Quality Checks
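```python
def validate(df: pd.DataFrame) -> None:
    # Raise explicit errors: `assert` statements are stripped when Python runs with -O
    if df.empty:
        raise ValueError("Empty batch")
    if df["customer_id"].isna().any():
        raise ValueError("Null customer IDs")
    if not df["total_usd"].ge(0).all():  # also catches missing (NaN) amounts
        raise ValueError("Negative amounts")
    duplicate_ids = df["order_id"].duplicated().sum()
    if duplicate_ids > 0:
        raise ValueError(f"{duplicate_ids} duplicate order IDs")
```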
Fail fast on bad data — never silently load corrupt batches.
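`validate` stops at the first failing check. A variant that runs every check and reports them together can make alerts easier to act on, while still refusing to load. This sketch keeps the same rules; `DataQualityError`, `check_batch`, and `guard_load` are hypothetical names.

```python
import pandas as pd


class DataQualityError(Exception):
    """Raised when a batch fails one or more checks."""


def check_batch(df: pd.DataFrame) -> list[str]:
    if df.empty:
        return ["empty batch"]  # remaining checks need columns to exist
    failures = []
    nulls = int(df["customer_id"].isna().sum())
    if nulls:
        failures.append(f"{nulls} null customer IDs")
    if not df["total_usd"].ge(0).all():
        failures.append("negative or missing amounts")
    dupes = int(df["order_id"].duplicated().sum())
    if dupes:
        failures.append(f"{dupes} duplicate order IDs")
    return failures


def guard_load(df: pd.DataFrame, load) -> None:
    failures = check_batch(df)
    if failures:
        raise DataQualityError("; ".join(failures))  # nothing is loaded
    load(df)


good = pd.DataFrame({"order_id": [1, 2], "customer_id": [10, 11], "total_usd": [5.0, 2.0]})
bad = pd.DataFrame({"order_id": [1, 1, 2], "customer_id": [10, None, 11], "total_usd": [5.0, 3.0, 2.0]})

loaded: list[pd.DataFrame] = []
guard_load(good, loaded.append)  # passes, so the loader is called
try:
    guard_load(bad, loaded.append)  # fails, so the loader is never called
except DataQualityError as exc:
    print(exc)  # 1 null customer IDs; 1 duplicate order IDs
```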
Load to Data Warehouse
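```python
from sqlalchemy.engine import Engine

def load_to_warehouse(df: pd.DataFrame, table: str, engine: Engine) -> None:
    df.to_sql(
        table,
        engine,
        if_exists="append",  # add rows; "replace" would drop and recreate the table
        index=False,         # don't write the pandas index as a column
        method="multi",      # multi-row INSERTs: fewer round trips
        chunksize=5000,      # lower this if the database caps bind parameters per statement
    )
```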
For large datasets, use warehouse-native loaders:
- BigQuery: `load_table_from_uri` from Parquet files in GCS
- Snowflake: `COPY INTO` from an S3 stage
- Redshift: the `COPY` command
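Table and stage names cannot be passed as bind parameters, so code that builds `COPY` statements usually validates identifiers before formatting them into SQL. A sketch for Snowflake, assuming Parquet files whose column names match the target table (hence `MATCH_BY_COLUMN_NAME`); the helper name, the stage `raw_stage`, and the table `analytics.orders` are hypothetical, and the resulting statement would be executed through your Snowflake connection.

```python
import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$.]*")
_PREFIX = re.compile(r"[A-Za-z0-9_\-/=.]*")


def build_copy_into(table: str, stage: str, prefix: str) -> str:
    """Build a Snowflake COPY INTO statement for Parquet files under a stage prefix."""
    for name in (table, stage):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Unsafe identifier: {name!r}")
    if not _PREFIX.fullmatch(prefix):
        raise ValueError(f"Unsafe path prefix: {prefix!r}")
    return (
        f"COPY INTO {table} "
        f"FROM @{stage}/{prefix} "
        "FILE_FORMAT = (TYPE = PARQUET) "
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
    )


sql = build_copy_into("analytics.orders", "raw_stage", "orders/2024/01/15/")
print(sql)
```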
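Parquet is a common columnar interchange format: it stores the schema with the data and compresses well.
```python
# Writing to s3:// paths requires pyarrow (or fastparquet) plus s3fs
df.to_parquet("s3://data-lake/orders/2024/01/15/data.parquet", index=False)
```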
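The path above encodes the partition date as `year/month/day`. Deriving it from the run's date, rather than from the wall clock, means a re-run or backfill for a given day writes to the same object. A small sketch; the helper name is hypothetical and the bucket and table come from the example path.

```python
from datetime import date


def partition_path(base: str, table: str, day: date) -> str:
    """Return the Parquet object path for one daily partition."""
    return f"{base.rstrip('/')}/{table}/{day:%Y/%m/%d}/data.parquet"


path = partition_path("s3://data-lake", "orders", date(2024, 1, 15))
print(path)  # s3://data-lake/orders/2024/01/15/data.parquet
```

Engines such as Spark and Athena also recognize Hive-style `key=value` directories (for example `dt=2024-01-15/`) as partition columns, so the layout choice depends on the query engine that reads the lake.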
Apache Airflow Orchestration
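```python
# dags/daily_orders.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

# Tasks run in separate processes, so they exchange data through storage
# (e.g. Parquet in the staging zone), not in-memory DataFrames.
# These wrappers are hypothetical; Airflow passes `ds` (the run's logical date).
def extract_orders_task(ds: str) -> None:
    """Extract the orders for `ds` and write raw Parquet to staging."""

def transform_orders_task(ds: str) -> None:
    """Read staged Parquet for `ds`, apply transform_orders and validate, write back."""

def load_orders_task(ds: str) -> None:
    """Load the transformed Parquet for `ds` into the warehouse."""

with DAG(
    "daily_orders_etl",
    default_args=default_args,
    schedule="0 2 * * *",  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract_orders", python_callable=extract_orders_task)
    transform = PythonOperator(task_id="transform_orders", python_callable=transform_orders_task)
    load = PythonOperator(task_id="load_orders", python_callable=load_orders_task)

    extract >> transform >> load
```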
Airflow handles scheduling, retries, task dependencies, and monitoring. You can run it on a managed service (Amazon MWAA, Google Cloud Composer) or self-host it.
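To make `retries=2` and `extract >> transform >> load` concrete, here is a toy runner. It is not how Airflow is implemented: it orders tasks with the standard library's `graphlib.TopologicalSorter` and gives each task one attempt plus `retries` more, sleeping `retry_delay` seconds in between. The task names mirror the DAG; the transient failure is simulated.

```python
import time
from graphlib import TopologicalSorter
from typing import Callable


def run_dag(
    tasks: dict[str, Callable[[], None]],
    upstream: dict[str, set[str]],
    retries: int = 2,
    retry_delay: float = 0.0,
) -> list[str]:
    """Run tasks in dependency order; each task gets 1 + retries attempts."""
    log = []
    for name in TopologicalSorter(upstream).static_order():
        for attempt in range(1, retries + 2):
            try:
                tasks[name]()
                log.append(f"{name}: success on attempt {attempt}")
                break
            except Exception:
                if attempt == retries + 1:
                    raise  # out of retries: the run fails and downstream tasks never start
                time.sleep(retry_delay)
    return log


calls = {"extract": 0}


def flaky_extract() -> None:
    calls["extract"] += 1
    if calls["extract"] == 1:
        raise ConnectionError("simulated transient API failure")


log = run_dag(
    tasks={"extract": flaky_extract, "transform": lambda: None, "load": lambda: None},
    upstream={"transform": {"extract"}, "load": {"transform"}},
)
print(log)
```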
Streaming with Apache Kafka
For real-time pipelines:
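```python
from confluent_kafka import Consumer, Producer
import json

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "order-processor",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # commit only after the result is produced
})
producer = Producer({"bootstrap.servers": "kafka:9092"})
consumer.subscribe(["orders.raw"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")  # use real logging in production
            continue
        event = json.loads(msg.value())
        enriched = enrich_order(event)  # hypothetical enrichment function
        producer.produce("orders.enriched", json.dumps(enriched).encode())
        producer.flush()  # block until delivered (simple but slow; batch in production)
        consumer.commit(message=msg, asynchronous=False)  # at-least-once delivery
finally:
    consumer.close()
```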
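Pair Kafka with Kafka Connect (source/sink connectors) to move data in and out of external systems, or with Faust (now developed as the community fork faust-streaming) for stream processing in Python.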
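Keeping the per-message logic in a pure function, separate from polling and committing, lets you unit-test it without a broker. A sketch of a hypothetical `enrich_order` and a bytes-in/bytes-out wrapper, assuming events carry `amount` and `exchange_rate` fields like the batch examples:

```python
import json


def enrich_order(event: dict) -> dict:
    """Hypothetical enrichment: add a USD total computed from the event's own fields."""
    return {**event, "total_usd": round(event["amount"] * event["exchange_rate"], 2)}


def process_message(value: bytes) -> bytes:
    """Decode one message value, enrich it, and re-encode it for the output topic."""
    return json.dumps(enrich_order(json.loads(value))).encode()


out = process_message(b'{"order_id": 7, "amount": 20.0, "exchange_rate": 1.5}')
print(out)  # b'{"order_id": 7, "amount": 20.0, "exchange_rate": 1.5, "total_usd": 30.0}'
```

Inside the consumer loop, the call then reduces to `producer.produce("orders.enriched", process_message(msg.value()))`.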
Data Lake Zones
```
s3://company-data/
├── raw/        # Immutable source copies
├── staging/    # Cleaned, typed data
├── curated/    # Business-ready tables
└── analytics/  # Aggregated metrics
```
Apply lifecycle policies per zone; for example, retain raw data for 90 days and curated data indefinitely.
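On S3, such a policy is a bucket lifecycle rule. The sketch below builds a rule for the `raw/` prefix and applies it with boto3's `put_bucket_lifecycle_configuration`; the bucket name comes from the tree above, the rule ID is hypothetical, and curated data is kept simply by having no rule for `curated/`. Note that this call replaces the bucket's entire existing lifecycle configuration, so merge in any rules you already have.

```python
def raw_zone_lifecycle(days: int = 90) -> dict:
    """Lifecycle configuration that expires objects under raw/ after `days` days."""
    return {
        "Rules": [
            {
                "ID": "expire-raw-zone",  # hypothetical rule name
                "Filter": {"Prefix": "raw/"},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


def apply_lifecycle(bucket: str, config: dict) -> None:
    import boto3  # imported lazily so the config can be built and tested without AWS

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration=config
    )


config = raw_zone_lifecycle()
# apply_lifecycle("company-data", config)  # requires AWS credentials
```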
Schema Management
Use explicit schemas to catch drift:
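```python
import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "order_id": pa.Column(int, unique=True),
    # coerce: customer_id arrives as float if the raw batch contained nulls
    "customer_id": pa.Column(int, nullable=False, coerce=True),
    "total_usd": pa.Column(float, pa.Check.ge(0)),
    # transform_orders produces timezone-aware UTC timestamps
    "order_date": pa.Column(pd.DatetimeTZDtype(tz="UTC"), nullable=False),
})
validated = schema.validate(df)  # raises pandera.errors.SchemaError on mismatch
```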
For warehouses, manage DDL with dbt models and version-controlled SQL.
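A schema check fails the batch; when it does, it also helps to report exactly what drifted. A stdlib sketch that compares an expected column-to-dtype contract with what actually arrived (for a DataFrame, `df.dtypes.astype(str).to_dict()` gives the actual mapping); the function name and contract format are hypothetical.

```python
def diff_schema(expected: dict[str, str], actual: dict[str, str]) -> dict[str, list]:
    """Return added, missing, and type-changed columns between two name -> dtype mappings."""
    return {
        "added": sorted(set(actual) - set(expected)),
        "missing": sorted(set(expected) - set(actual)),
        "changed": sorted(
            (col, expected[col], actual[col])
            for col in set(expected) & set(actual)
            if expected[col] != actual[col]
        ),
    }


expected = {"order_id": "int64", "customer_id": "int64", "total_usd": "float64"}
actual = {
    "order_id": "int64",
    "customer_id": "float64",  # nulls upstream turned the column into floats
    "total_usd": "float64",
    "coupon_code": "object",   # new column the source started sending
}
drift = diff_schema(expected, actual)
print(drift)
# {'added': ['coupon_code'], 'missing': [], 'changed': [('customer_id', 'int64', 'float64')]}
```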
Idempotency and Backfills
Pipelines must be safely re-runnable: processing the same partition twice should leave the same result as processing it once.
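```python
from datetime import date, timedelta
from sqlalchemy import text

def load_idempotent(df: pd.DataFrame, partition_date: date, engine) -> None:
    with engine.begin() as conn:  # one transaction: delete the day's rows, then re-insert
        conn.execute(text("DELETE FROM orders WHERE order_date >= :start AND order_date < :end"),
                     {"start": partition_date, "end": partition_date + timedelta(days=1)})
        df.to_sql("orders", conn, if_exists="append", index=False)
```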
Backfill historical data by looping over date partitions, either with Airflow’s catchup=True (which schedules every missed interval since start_date) or with manually triggered runs.
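A runnable sketch of delete-then-insert per partition plus a backfill loop, assuming SQLite in place of the warehouse and a hypothetical `fetch_partition` function that returns one day's rows. Running the backfill twice leaves the same row count, which is exactly the property idempotency is meant to guarantee.

```python
import sqlite3
from datetime import date, timedelta


def fetch_partition(day: date) -> list[tuple]:
    """Hypothetical source: two orders per day, with IDs derived from the date."""
    return [(f"{day.isoformat()}-{i}", day.isoformat(), 10.0 * i) for i in (1, 2)]


def load_partition(conn: sqlite3.Connection, day: date) -> None:
    rows = fetch_partition(day)
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute("DELETE FROM orders WHERE order_date = ?", (day.isoformat(),))
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)


def backfill(conn: sqlite3.Connection, start: date, end: date) -> None:
    day = start
    while day <= end:  # inclusive date range, one partition at a time
        load_partition(conn, day)
        day += timedelta(days=1)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT, order_date TEXT, total_usd REAL)")
backfill(conn, date(2024, 1, 1), date(2024, 1, 3))
backfill(conn, date(2024, 1, 1), date(2024, 1, 3))  # re-run: replaces, does not duplicate
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 6
```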
Monitoring and Alerting
| Check | Tool |
|---|---|
| Pipeline success/failure | Airflow email/Slack alerts |
| Row count anomalies | Great Expectations |
| Freshness | Custom SLA sensors in Airflow |
| Cost | Warehouse query monitoring |
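```python
# Great Expectations example (pre-1.0 fluent API; GX 1.x reorganized these calls)
import great_expectations as gx

context = gx.get_context()
validator = context.sources.pandas_default.read_dataframe(df)
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_table_row_count_to_be_between(1000, 1_000_000)  # example bounds
results = validator.validate()
if not results.success:  # block the load instead of only logging the result
    raise ValueError("Great Expectations validation failed")
```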
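The freshness and row-count rows of the table can also be covered with a few lines of plain Python, for example inside an Airflow task that fails when a check trips. A sketch under stated assumptions: the 6-hour SLA, the seven-run history, and the 50% tolerance are illustrative values, and both function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from statistics import median
from typing import Optional


def is_stale(latest_update: datetime, sla: timedelta, now: Optional[datetime] = None) -> bool:
    """True if the newest loaded record is older than the freshness SLA."""
    now = now or datetime.now(timezone.utc)
    return now - latest_update > sla


def row_count_anomaly(today: int, history: list[int], tolerance: float = 0.5) -> bool:
    """True if today's count deviates from the median of recent runs by more than `tolerance`."""
    if not history:
        return False  # nothing to compare against yet
    baseline = median(history)  # median resists a single bad day better than the mean
    if baseline == 0:
        return today != 0
    return abs(today - baseline) / baseline > tolerance


now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
history = [10_000, 9_800, 10_300, 9_900, 10_100, 10_050, 9_950]
print(is_stale(datetime(2024, 1, 15, 3, 0, tzinfo=timezone.utc), timedelta(hours=6), now))  # True
print(row_count_anomaly(4_000, history))  # True
print(row_count_anomaly(9_700, history))  # False
```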
Production Checklist
- Incremental loads with watermarks
- Data validation before warehouse load
- Idempotent writes per partition
- Secrets in environment variables, not code
- Parquet/Avro for storage efficiency
- Orchestrator with retry and alerting
- Documentation of schema and lineage
- Backfill procedure tested
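For the secrets item, a minimal pattern is to read connection strings from the environment at startup and fail with a clear message when one is missing. The helper and the variable name `WAREHOUSE_DB_URL` are hypothetical; in Airflow, Connections or a secrets backend serve the same purpose.

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable or fail fast with a message naming it (never its value)."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


try:
    db_url = require_env("WAREHOUSE_DB_URL")  # injected by the deployment, not hard-coded
except RuntimeError as exc:
    print(exc)
```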
Reliable data engineering is about contracts: defined schemas, validated transforms, and observable pipelines that teams can trust.