Data engineering moves, transforms, and validates data so analysts and ML models can trust it. Python is the lingua franca of data pipelines, from cron scripts to Airflow-orchestrated workflows and distributed processing with Spark (via PySpark).

ETL vs ELT

Approach   Flow                          When to use
ETL        Extract → Transform → Load    Legacy warehouses, strict schemas
ELT        Extract → Load → Transform    Cloud warehouses (Snowflake, BigQuery)

Teams on modern cloud warehouses usually favor ELT: load raw data first, then transform it inside the warehouse with SQL (often managed with dbt) or Python.
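A minimal sketch of the ELT pattern, assuming an in-memory SQLite database as a stand-in for the cloud warehouse: raw records are loaded untouched, then cleaned with SQL inside the database. Table names, columns, and data are illustrative.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for the cloud warehouse (illustration only).
conn = sqlite3.connect(":memory:")

# Extract + Load: land the raw records as-is, amounts still stored as text.
raw = pd.DataFrame(
    {
        "order_id": [1, 2, 3],
        "customer_id": [10, None, 12],
        "amount": ["19.99", "5.00", "-3.00"],
    }
)
raw.to_sql("raw_orders", conn, index=False)

# Transform: clean inside the database with SQL, the role a dbt model plays.
conn.execute(
    """
    CREATE TABLE orders AS
    SELECT order_id,
           CAST(customer_id AS INTEGER) AS customer_id,
           CAST(amount AS REAL) AS amount
    FROM raw_orders
    WHERE customer_id IS NOT NULL
      AND CAST(amount AS REAL) > 0
    """
)

clean = pd.read_sql("SELECT * FROM orders", conn)
print(clean)
```

Because the raw table is kept, the transform can be rewritten and rerun later without re-extracting from the source.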

Pipeline Architecture

  Sources (APIs, DBs, files)
        ↓
   Extract (Python scripts)
        ↓
   Raw landing zone (S3 / GCS)
        ↓
   Transform (pandas / Spark / SQL)
        ↓
   Data Warehouse (Snowflake, BigQuery, Redshift)
        ↓
   Analytics / ML / Dashboards
  
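The flow above can be sketched end to end in a few functions. This is an illustration under stated assumptions: a temporary local directory stands in for the S3/GCS raw zone, a dict stands in for the warehouse, and `extract`, `stage`, `transform`, and `load` are hypothetical names.

```python
import json
import tempfile
from pathlib import Path

import pandas as pd

# A local temp directory stands in for the S3/GCS raw zone.
lake = Path(tempfile.mkdtemp())


def extract() -> list[dict]:
    # Hypothetical source: in practice an API call or database query.
    return [
        {"order_id": 1, "amount": 20.0, "exchange_rate": 1.1},
        {"order_id": 2, "amount": 5.0, "exchange_rate": 0.9},
    ]


def stage(records: list[dict], run_date: str) -> Path:
    # Land the untouched payload under a date partition so it can be replayed later.
    path = lake / "raw" / "orders" / run_date / "data.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(records))
    return path


def transform(path: Path) -> pd.DataFrame:
    df = pd.DataFrame(json.loads(path.read_text()))
    df["total_usd"] = df["amount"] * df["exchange_rate"]
    return df


def load(df: pd.DataFrame, warehouse: dict[str, pd.DataFrame]) -> None:
    # A dict of DataFrames stands in for the warehouse.
    warehouse["orders"] = df


warehouse: dict[str, pd.DataFrame] = {}
raw_path = stage(extract(), "2024-01-15")
load(transform(raw_path), warehouse)
print(warehouse["orders"])
```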

Extract: APIs and Databases

  import requests
import pandas as pd
from sqlalchemy import create_engine

def extract_from_api(url: str, params: dict) -> pd.DataFrame:
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json()["results"])

def extract_from_db(query: str, connection_string: str) -> pd.DataFrame:
    engine = create_engine(connection_string)
    return pd.read_sql(query, engine)
  

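Use incremental extraction with a watermark: remember the latest updated_at value from the previous run and fetch only rows changed since then.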

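  from sqlalchemy import create_engine, text

last_sync = get_watermark("orders")  # reads the value saved by the last successful run
# Bound parameter instead of an f-string: no SQL injection, no quoting bugs
query = text("SELECT * FROM orders WHERE updated_at > :last_sync")
df = pd.read_sql(query, create_engine(DB_URL), params={"last_sync": last_sync})
if not df.empty:  # an empty batch has no max(); keep the previous watermark
    save_watermark("orders", df["updated_at"].max())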
  
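A runnable sketch of watermark-based incremental extraction, assuming an in-memory SQLite table as the source and a plain dict as the watermark store (hypothetical stand-ins for a real database and a persisted state table). ISO-8601 timestamps written in one fixed format compare correctly as text, which is what the `>` filter relies on here.

```python
import sqlite3

import pandas as pd

# Stand-ins: in-memory SQLite for the source DB, a dict for the watermark store.
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE orders (order_id INTEGER, updated_at TEXT)")
src.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2024-01-15T08:00:00"), (2, "2024-01-15T09:30:00")],
)
watermarks = {"orders": "1970-01-01T00:00:00"}


def extract_new_orders() -> pd.DataFrame:
    # Bound parameter (?) rather than string formatting.
    df = pd.read_sql(
        "SELECT * FROM orders WHERE updated_at > ? ORDER BY updated_at",
        src,
        params=(watermarks["orders"],),
    )
    if not df.empty:
        # Advance only when rows arrived; an empty batch keeps the old watermark.
        watermarks["orders"] = df["updated_at"].max()
    return df


first = extract_new_orders()  # both existing rows
src.execute("INSERT INTO orders VALUES (3, '2024-01-15T10:00:00')")
second = extract_new_orders()  # only the row added since the first run
```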

Transform with pandas

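  def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["order_date"] = pd.to_datetime(df["order_date"], utc=True)
    df["total_usd"] = df["amount"] * df["exchange_rate"]  # convert to USD
    # Drop rows without a customer and non-positive totals; .copy() avoids SettingWithCopyWarning
    df = df[df["customer_id"].notna() & (df["total_usd"] > 0)].copy()
    # "YYYY-MM" string: Period dtype drops the timezone (with a warning) and is poorly supported by to_sql
    df["order_month"] = df["order_date"].dt.strftime("%Y-%m")
    return df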
  
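The same cleaning steps can also be written as one pandas method chain, which never mutates an intermediate frame. A sketch, where `transform_orders_chained` and the sample data are hypothetical; this version stores `order_month` as a "YYYY-MM" string.

```python
import pandas as pd


def transform_orders_chained(df: pd.DataFrame) -> pd.DataFrame:
    # Each step returns a new frame, so the input is never modified.
    return (
        df.assign(
            order_date=lambda d: pd.to_datetime(d["order_date"], utc=True),
            total_usd=lambda d: d["amount"] * d["exchange_rate"],
        )
        .dropna(subset=["customer_id"])
        .query("total_usd > 0")
        .assign(order_month=lambda d: d["order_date"].dt.strftime("%Y-%m"))
    )


raw = pd.DataFrame(
    {
        "order_id": [1, 2, 3],
        "customer_id": [10, None, 12],
        "order_date": ["2024-01-15T10:00:00Z", "2024-01-20T12:00:00Z", "2024-02-01T00:00:00Z"],
        "amount": [100.0, 50.0, 0.0],
        "exchange_rate": [1.1, 1.0, 1.2],
    }
)
out = transform_orders_chained(raw)
print(out[["order_id", "total_usd", "order_month"]])
```

Order 2 is dropped for its missing customer and order 3 for its zero total.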

Data Quality Checks

  def validate(df: pd.DataFrame) -> None:
    # Explicit raises instead of assert: assert statements are skipped under python -O
    if df.empty:
        raise ValueError("Empty batch")
    if not df["customer_id"].notna().all():
        raise ValueError("Null customer IDs")
    if not df["total_usd"].ge(0).all():
        raise ValueError("Negative amounts")
    duplicate_ids = df["order_id"].duplicated().sum()
    if duplicate_ids > 0:
        raise ValueError(f"{duplicate_ids} duplicate order IDs")
  

Fail fast on bad data — never silently load corrupt batches.
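One way to enforce this is a gate that runs every check, reports all failures at once, and calls the load step only when the batch is clean. A sketch with hypothetical names (`run_batch`, and a list standing in for the warehouse):

```python
import pandas as pd


def run_batch(df: pd.DataFrame, sink: list) -> None:
    """Validate a batch and load it only if every check passes."""
    problems = []
    if df.empty:
        problems.append("empty batch")
    if df["customer_id"].isna().any():
        problems.append("null customer IDs")
    if df["order_id"].duplicated().any():
        problems.append("duplicate order IDs")
    if problems:
        # Raising fails the task, so the orchestrator can alert instead of loading bad rows.
        raise ValueError("; ".join(problems))
    sink.append(df)  # stand-in for the warehouse load


warehouse: list = []
rejected = None
good = pd.DataFrame({"order_id": [1, 2], "customer_id": [10, 11]})
bad = pd.DataFrame({"order_id": [1, 1], "customer_id": [10, None]})

run_batch(good, warehouse)
try:
    run_batch(bad, warehouse)
except ValueError as err:
    rejected = str(err)
    print(f"Batch rejected: {rejected}")
```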

Load to Data Warehouse

  def load_to_warehouse(df: pd.DataFrame, table: str, engine) -> None:
    df.to_sql(
        table,
        engine,
        if_exists="append",
        index=False,
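        method="multi",  # batch many rows into each INSERT statement
        chunksize=5000,  # rows per batch; chunksize × column count must stay under the database's bind-parameter limit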
    )
  
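With `method="multi"`, each INSERT binds rows × columns parameters, and databases cap that number. A sketch that sizes chunks from the limit, assuming SQLite as the target (999 is SQLite's default maximum before version 3.32.0; check your own database's limit). `load_to_sqlite` is a hypothetical name.

```python
import sqlite3

import pandas as pd

# Conservative SQLite bind-parameter limit (default before SQLite 3.32.0).
MAX_BIND_PARAMS = 999


def load_to_sqlite(df: pd.DataFrame, table: str, conn: sqlite3.Connection) -> None:
    # Each multi-row INSERT binds rows_per_chunk × len(columns) parameters.
    rows_per_chunk = max(1, MAX_BIND_PARAMS // len(df.columns))
    df.to_sql(
        table,
        conn,
        if_exists="append",
        index=False,
        method="multi",
        chunksize=rows_per_chunk,
    )


conn = sqlite3.connect(":memory:")
orders = pd.DataFrame({"order_id": range(2_000), "total_usd": [9.5] * 2_000})
load_to_sqlite(orders, "orders", conn)
count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)
```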

For large datasets, use warehouse-native loaders:

  • BigQuery: load_table_from_uri from Parquet files in GCS
  • Snowflake: COPY INTO from an S3 stage
  • Redshift: COPY command

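Parquet, a compressed columnar format, is the de facto standard for data-lake storage and interchange:

  df.to_parquet("s3://data-lake/orders/2024/01/15/data.parquet", index=False)  # needs pyarrow (or fastparquet) and s3fs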
  

Apache Airflow Orchestration

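  # dags/daily_orders.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical module. Each task callable accepts Airflow's `ds` (the run's logical date),
# reads its input from staging storage and writes its output back: tasks run as separate
# processes, so they cannot hand DataFrames to each other in memory.
from pipelines.orders import extract_orders_task, load_orders_task, transform_orders_task

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "daily_orders_etl",
    default_args=default_args,
    schedule="0 2 * * *",  # daily at 02:00; `schedule` replaced `schedule_interval` in Airflow 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id="extract_orders",
        python_callable=extract_orders_task,
    )
    transform = PythonOperator(
        task_id="transform_orders",
        python_callable=transform_orders_task,
    )
    load = PythonOperator(
        task_id="load_orders",
        python_callable=load_orders_task,
    )

    extract >> transform >> load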
  

Airflow handles scheduling, retries, task dependencies, and monitoring. Run it on a managed service (Amazon MWAA, Google Cloud Composer) or self-host it.
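Because Airflow runs each task in its own process, task callables usually exchange data through storage keyed by the run date. A sketch of two such callables, with a temporary directory standing in for staging storage and CSV used only to avoid a Parquet dependency; the function names and data are hypothetical.

```python
import tempfile
from pathlib import Path

import pandas as pd

# A temporary directory stands in for shared staging storage such as an S3 prefix.
STAGING = Path(tempfile.mkdtemp())


def extract_orders_task(ds: str) -> None:
    # Airflow passes `ds` (logical date, YYYY-MM-DD) to callables that declare it.
    df = pd.DataFrame({"order_id": [1, 2], "amount": [10.0, -1.0]})  # hypothetical extract
    df.to_csv(STAGING / f"orders_raw_{ds}.csv", index=False)


def transform_orders_task(ds: str) -> None:
    # Reads the extract task's output for the same date and writes its own output.
    df = pd.read_csv(STAGING / f"orders_raw_{ds}.csv")
    df[df["amount"] > 0].to_csv(STAGING / f"orders_clean_{ds}.csv", index=False)


# Outside Airflow, the callables can be exercised directly for a given date.
extract_orders_task("2024-01-15")
transform_orders_task("2024-01-15")
clean = pd.read_csv(STAGING / "orders_clean_2024-01-15.csv")
```

Keying every file by `ds` also makes retries and backfills write to the same place instead of piling up new outputs.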

Streaming with Apache Kafka

For real-time pipelines:

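  from confluent_kafka import Consumer, Producer
import json

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "order-processor",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # commit manually, only after the output is delivered
})
producer = Producer({"bootstrap.servers": "kafka:9092"})

consumer.subscribe(["orders.raw"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():  # broker and partition errors arrive as messages too
            print(f"Consumer error: {msg.error()}")
            continue
        event = json.loads(msg.value())
        enriched = enrich_order(event)  # your enrichment function
        producer.produce("orders.enriched", json.dumps(enriched).encode())
        producer.flush()  # wait for delivery (simple but slow); then commit: at-least-once
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()  # leave the consumer group cleanly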
  
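Keeping the per-message work in a pure function lets you unit-test it without a broker. A sketch, where `enrich_order` and `handle_message` are hypothetical and the event is assumed to carry `amount` and `exchange_rate`:

```python
import json


def enrich_order(event: dict) -> dict:
    # Hypothetical enrichment: derive a USD total from fields assumed to be in the event.
    return {**event, "total_usd": round(event["amount"] * event["exchange_rate"], 2)}


def handle_message(raw: bytes) -> bytes:
    # Decode, enrich, re-encode: the work between consumer.poll() and producer.produce().
    return json.dumps(enrich_order(json.loads(raw))).encode()


out = json.loads(handle_message(b'{"order_id": 7, "amount": 10.0, "exchange_rate": 1.25}'))
print(out)
```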

Pair Kafka with Kafka Connect for source/sink connectors, or with a Python stream-processing library such as Faust (now maintained as the community fork faust-streaming).

Data Lake Zones

  s3://company-data/
├── raw/          # Immutable source copies
├── staging/      # Cleaned, typed data
├── curated/      # Business-ready tables
└── analytics/    # Aggregated metrics
  

Apply lifecycle policies per zone, for example expiring raw data after 90 days while retaining curated data indefinitely.
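On S3, a policy like this is a bucket lifecycle configuration. A sketch that builds the rule for the `raw/` prefix and applies it with boto3's `put_bucket_lifecycle_configuration`; the rule ID and function names are hypothetical, and leaving `curated/` without a rule is what keeps that zone indefinitely. Note that this call replaces the bucket's entire existing lifecycle configuration.

```python
def lifecycle_rules(raw_days: int = 90) -> dict:
    # Expire objects under raw/ after raw_days; no rule for curated/, so it is kept.
    return {
        "Rules": [
            {
                "ID": "expire-raw-zone",
                "Filter": {"Prefix": "raw/"},
                "Status": "Enabled",
                "Expiration": {"Days": raw_days},
            }
        ]
    }


def apply_lifecycle(bucket: str) -> None:
    import boto3  # imported lazily so the rule builder works without AWS dependencies

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration=lifecycle_rules()
    )


# apply_lifecycle("company-data")  # needs AWS credentials with s3:PutLifecycleConfiguration
print(lifecycle_rules())
```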

Schema Management

Use explicit schemas to catch drift:

  import pandera as pa

schema = pa.DataFrameSchema({
    "order_id": pa.Column(int, unique=True),
    "customer_id": pa.Column(int, nullable=False),
    "total_usd": pa.Column(float, pa.Check.ge(0)),
    "order_date": pa.Column(pa.DateTime, nullable=False),
})

validated = schema.validate(df)
  
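If pandera is not available, a hand-rolled check comparing column names and dtypes against an expected contract catches the same kind of drift, though with far fewer checks. A sketch; `EXPECTED` and `schema_drift` are hypothetical names.

```python
import pandas as pd

# Expected column -> dtype contract (illustrative).
EXPECTED = {"order_id": "int64", "customer_id": "int64", "total_usd": "float64"}


def schema_drift(df: pd.DataFrame, expected: dict) -> list:
    """List differences between the frame's columns/dtypes and the expected contract."""
    actual = {col: str(dtype) for col, dtype in df.dtypes.items()}
    issues = []
    for col, dtype in expected.items():
        if col not in actual:
            issues.append(f"missing column {col}")
        elif actual[col] != dtype:
            issues.append(f"{col}: expected {dtype}, got {actual[col]}")
    for col in sorted(actual.keys() - expected.keys()):
        issues.append(f"unexpected column {col}")
    return issues


batch = pd.DataFrame({"order_id": [1], "customer_id": ["42"], "total_usd": [9.5], "coupon": ["X"]})
issues = schema_drift(batch, EXPECTED)
print(issues)
```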

In the warehouse, keep table definitions in version-controlled SQL, for example as dbt models that dbt materializes into tables and views.

Idempotency and Backfills

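Pipelines must be safely re-runnable: processing the same partition twice must leave the warehouse in the same state as processing it once.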

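  from sqlalchemy import text

def load_idempotent(df: pd.DataFrame, partition_date: str, engine) -> None:
    # One transaction: delete the partition, then re-insert it, so reruns never duplicate rows
    with engine.begin() as conn:
        conn.execute(text("DELETE FROM orders WHERE order_date = :d"), {"d": partition_date})  # assumes a DATE column
        df.to_sql("orders", conn, if_exists="append", index=False)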
  
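A runnable sketch of the delete-then-insert pattern against an in-memory SQLite database (an assumption standing in for the warehouse), showing that a rerun replaces the partition instead of duplicating it. `load_partition` is a hypothetical name.

```python
import sqlite3

import pandas as pd


def load_partition(df: pd.DataFrame, partition_date: str, conn: sqlite3.Connection) -> None:
    # Delete-then-insert inside one sqlite3 transaction block; a failure rolls both back.
    with conn:
        conn.execute("DELETE FROM orders WHERE order_date = ?", (partition_date,))
        df.to_sql("orders", conn, if_exists="append", index=False)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT)")
batch = pd.DataFrame({"order_id": [1, 2], "order_date": ["2024-01-15", "2024-01-15"]})

load_partition(batch, "2024-01-15", conn)
load_partition(batch, "2024-01-15", conn)  # rerun: replaces rows, does not duplicate them
rows = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(rows)
```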

Backfill historical data by rerunning the pipeline for each date partition, either with Airflow's catchup=True (which schedules every missed interval since start_date) or with manually triggered runs.
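Outside Airflow, a backfill reduces to a loop over date partitions. A sketch, where `date_partitions` and `backfill` are hypothetical helpers and `run_partition` is a hypothetical callable that processes one day; it must be idempotent so a partially failed backfill can simply be restarted.

```python
from datetime import date, timedelta


def date_partitions(start: str, end: str) -> list:
    """Inclusive list of YYYY-MM-DD partitions between start and end."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days + 1)]


def backfill(start: str, end: str, run_partition) -> list:
    # Process one day at a time, oldest first; rerunning a finished day is harmless.
    done = []
    for ds in date_partitions(start, end):
        run_partition(ds)
        done.append(ds)
    return done


processed = backfill("2024-01-30", "2024-02-02", run_partition=lambda ds: None)
print(processed)
```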

Monitoring and Alerting

Check                       Tool
Pipeline success/failure    Airflow email/Slack alerts
Row count anomalies         Great Expectations
Freshness                   Custom SLA sensors in Airflow
Cost                        Warehouse query monitoring

  # Great Expectations example (0.17/0.18 fluent API; GX 1.x replaced context.sources and this validator workflow)
import great_expectations as gx

context = gx.get_context()
validator = context.sources.pandas_default.read_dataframe(df)
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_table_row_count_to_be_between(1000, 1_000_000)
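results = validator.validate()
if not results.success:  # fail the run so the orchestrator alerts
    raise ValueError("Great Expectations validation failed")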
  
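Freshness and row-count checks can also be hand-rolled when a full framework is overkill. A sketch of both with hypothetical function names; the three-sigma threshold for row counts is an illustrative choice, not a standard.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean, stdev
from typing import List, Optional


def is_fresh(latest_update: datetime, max_lag: timedelta, now: Optional[datetime] = None) -> bool:
    # Freshness SLA: the newest loaded record must be no older than max_lag.
    now = now or datetime.now(timezone.utc)
    return now - latest_update <= max_lag


def row_count_anomaly(history: List[int], today: int, z: float = 3.0) -> bool:
    # Flag today's count if it lies more than z standard deviations from the recent mean.
    # stdev needs at least two historical counts.
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return today != mu
    return abs(today - mu) / sigma > z


now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
print(is_fresh(datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc), timedelta(hours=3), now))
print(row_count_anomaly([1000, 1020, 990, 1010], today=200))
```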

Production Checklist

  • Incremental loads with watermarks
  • Data validation before warehouse load
  • Idempotent writes per partition
  • Secrets in environment variables or a secrets manager, never in code
  • Parquet/Avro for storage efficiency
  • Orchestrator with retry and alerting
  • Documentation of schema and lineage
  • Backfill procedure tested
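For the secrets item, a small sketch of reading a connection string from the environment and failing with a clear message when it is missing; `ORDERS_DB_URL` and `get_db_url` are hypothetical names.

```python
import os


def get_db_url(var: str = "ORDERS_DB_URL") -> str:
    # The variable name is hypothetical; set it in the deployment environment
    # (or have a secrets manager inject it), never in source control.
    try:
        return os.environ[var]
    except KeyError:
        raise RuntimeError(f"Set the {var} environment variable") from None
```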

Reliable data engineering is about contracts: defined schemas, validated transforms, and observable pipelines that teams can trust.