The Silver layer is the cleansed, deduplicated, and conformed data tier of the Medallion Architecture. It sits as a critical bridge between the raw Bronze layer and the business-ready Gold layer. While Bronze contains the unmodified data dump from source systems, Silver applies enterprise data governance through schema enforcement, data quality rules, and business logic standardization.
The Silver layer embodies several key concepts. Silver tables are typically immutable once written and versioned through Delta Lake time travel, enabling reproducible analytics and regulatory compliance. A mature Silver layer reduces downstream cleansing work and provides a reliable foundation for the majority of analytical queries.
Unlike Bronze's flexible schema, Silver enforces a strict schema at write-time using Delta Lake's schema validation. This prevents silent data corruption and catches integration issues early.
from delta.tables import DeltaTable
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# Define explicit Silver schema
silver_schema = StructType([
StructField("customer_id", StringType(), False), # NOT NULL
StructField("customer_name", StringType(), False),
StructField("email", StringType(), True),
StructField("phone", StringType(), True),
StructField("address_line_1", StringType(), True),
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("zip_code", StringType(), True),
StructField("created_at", TimestampType(), False),
StructField("updated_at", TimestampType(), False),
StructField("_bronze_loaded_at", TimestampType(), False),
])
# Write with schema enforcement
cleaned_df = bronze_df.select(
F.col("id").cast("string").alias("customer_id"),
F.col("name").alias("customer_name"),
F.col("email"),
F.col("phone"),
F.col("address1").alias("address_line_1"),
F.col("city"),
F.col("state"),
F.col("postal_code").alias("zip_code"),
F.to_timestamp(F.col("created")).alias("created_at"),
F.current_timestamp().alias("updated_at"),
F.current_timestamp().alias("_bronze_loaded_at"),
)
# Enforce schema — Delta automatically validates writes against the table's
# stored schema (the explicit silver_schema above documents the contract;
# DataFrameWriter has no .schema() method)
cleaned_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "false") \
    .save("/mnt/data/silver/dim_customer")
Every column in Silver should have an explicit, non-negotiable type. Nullable columns are marked explicitly, and non-nullable business columns are validated before the write.
# Define clear type constraints with null validation
silver_df = bronze_df.select(
F.col("cust_id").cast("string").alias("customer_id"),
F.col("order_amount").cast("decimal(18,2)").alias("order_amount"),
F.col("order_date").cast("date").alias("order_date"),
F.coalesce(F.col("discount_pct"), F.lit(0)).cast("decimal(5,2)").alias("discount_percentage"),
)
# Validate no nulls in business keys
assert silver_df.filter(F.col("customer_id").isNull()).count() == 0, "NULL customer_ids found"
assert silver_df.filter(F.col("order_amount").isNull()).count() == 0, "NULL order amounts found"
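The rule behind these asserts is simply that business keys must never be null. A plain-Python model of the same rule — the helper name and record format are illustrative, not a Spark API — makes the policy explicit and easy to unit test:

```python
def null_key_violations(records, key_columns):
    """Count null business-key values per column in a single pass."""
    counts = {col: 0 for col in key_columns}
    for record in records:
        for col in key_columns:
            if record.get(col) is None:
                counts[col] += 1
    # Keep only the columns that actually violate the rule
    return {col: n for col, n in counts.items() if n > 0}

rows = [
    {"customer_id": "C1", "order_amount": 10.0},
    {"customer_id": None, "order_amount": 5.0},
    {"customer_id": "C2", "order_amount": None},
]
violations = null_key_violations(rows, ["customer_id", "order_amount"])
```

An empty result means the batch passes; a non-empty result can drive the same fail-fast behavior as the asserts above.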
The business key is the natural, immutable identifier for an entity. Unlike surrogate keys, business keys are meaningful and stable across systems. They're essential for deduplication and SCD implementations.
# Examples of business keys across domains
business_keys = {
"dim_customer": ["customer_id"], # Single attribute
"fact_order": ["order_id"],
"dim_product": ["sku"],
"fact_daily_inventory": ["warehouse_id", "product_sku", "date"], # Composite key
"fact_sales_transaction": ["store_id", "register_id", "transaction_timestamp"],
}
# Use business keys to identify and remove duplicates
from pyspark.sql import Window
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
dedup_df = silver_df.withColumn(
"row_num",
F.row_number().over(window_spec)
).filter(F.col("row_num") == 1).drop("row_num")
Null values must be handled consistently. Options include: forward-fill (coalesce with previous value), backward-fill, constant replacement, or marking records as quarantined.
from pyspark.sql import Window
# Strategy 1: Coalesce with fallback values
cleansed_df = bronze_df.select(
F.col("customer_id"),
F.coalesce(F.col("email"), F.lit("no-email@unknown.com")).alias("email"),
F.coalesce(F.col("phone"), F.lit("")).alias("phone"),
F.col("created_at"),
)
# Strategy 2: Forward-fill (carry the last known non-null value forward;
# the frame ends at the current row, so existing non-null values are preserved)
window_spec = Window.partitionBy("customer_id").orderBy("created_at").rowsBetween(
    Window.unboundedPreceding, 0
)
filled_df = bronze_df.withColumn(
"phone_filled",
F.last(F.col("phone"), ignorenulls=True).over(window_spec)
)
# Strategy 3: fillna with domain-specific defaults
domain_defaults = {
"email": "unknown@example.com",
"phone": "000-000-0000",
"address": "N/A",
"discount_pct": 0.0,
}
filled_df = bronze_df.fillna(domain_defaults)
# Strategy 4: Identify records with critical nulls and quarantine
critical_nulls = bronze_df.filter(
F.col("customer_id").isNull() |
F.col("created_at").isNull()
)
valid_df = bronze_df.filter(
F.col("customer_id").isNotNull() &
F.col("created_at").isNotNull()
)
# Log quarantined records
if critical_nulls.count() > 0:
    critical_nulls.write \
        .format("delta") \
        .mode("append") \
        .save("/mnt/data/quarantine/dim_customer_nulls")
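Filtering twice expresses a single logical operation: partition the batch into valid and quarantined records. A plain-Python sketch of that split (the function name and record format are illustrative, not a Spark API):

```python
def split_on_critical_nulls(records, critical_columns):
    """Partition records: any null critical column sends a record to quarantine."""
    valid, quarantined = [], []
    for record in records:
        if any(record.get(col) is None for col in critical_columns):
            quarantined.append(record)
        else:
            valid.append(record)
    return valid, quarantined

events = [
    {"customer_id": "C1", "created_at": "2026-01-01"},
    {"customer_id": None, "created_at": "2026-01-02"},
    {"customer_id": "C2", "created_at": None},
]
valid, quarantined = split_on_critical_nulls(events, ["customer_id", "created_at"])
```

Every record lands in exactly one of the two sets, which is the invariant the valid/quarantine filters above rely on.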
Cast columns to their target types with validation. Use try_cast patterns to gracefully handle type failures.
import pyspark.sql.functions as F
from pyspark.sql.types import DecimalType
# Safe type casting with validation
silver_df = bronze_df.select(
F.col("customer_id").cast("string").alias("customer_id"),
F.col("order_id").cast("long").alias("order_id"),
F.col("order_amount").cast(DecimalType(18, 2)).alias("order_amount"),
F.col("order_date").cast("date").alias("order_date"),
F.col("created_at").cast("timestamp").alias("created_at"),
)
# Identify casting failures: Column.try_cast (Spark 3.5+) yields NULL
# instead of raising when the value cannot be converted
def safe_cast_to_decimal(col_name, precision=18, scale=2):
    """Returns column cast to decimal; invalid values become NULL."""
    return F.col(col_name).try_cast(DecimalType(precision, scale)).alias(col_name)
result_df = bronze_df.select(
F.col("order_id"),
safe_cast_to_decimal("order_amount"),
safe_cast_to_decimal("tax_amount"),
)
# Log records with casting failures
cast_failures = result_df.filter(
F.col("order_amount").isNull() | F.col("tax_amount").isNull()
)
if cast_failures.count() > 0:
    cast_failures.write \
        .format("delta") \
        .mode("append") \
        .save("/mnt/data/quarantine/casting_failures")
Apply consistent transformations to text fields: trimming, case normalization, regex cleaning, and special character handling.
import re
import pyspark.sql.functions as F
from pyspark.sql.types import StringType  # needed for the UDF below
# Normalize customer names: trim, title case
normalized_df = bronze_df.select(
F.col("customer_id"),
F.initcap(F.trim(F.col("customer_name"))).alias("customer_name"),
F.lower(F.trim(F.col("email"))).alias("email"),
)
# Remove special characters and multiple spaces from addresses
normalized_df = bronze_df.select(
F.col("customer_id"),
F.regexp_replace(
F.col("address"),
r"[^\w\s\-\.,#]", # Keep alphanumeric, space, dash, period, comma, hash
""
).alias("address_clean"),
)
# Normalize phone numbers: extract digits only
normalized_df = bronze_df.select(
F.col("customer_id"),
F.regexp_replace(F.col("phone"), r"[^\d]", "").alias("phone_digits_only"),
)
# Define a UDF for complex string transformations
def normalize_company_name(name):
    """Remove common suffixes and standardize."""
    if name is None:
        return None
    name = name.strip().upper()
    name = re.sub(r"\s+(INC|LLC|LTD|CORP|CO)\.?$", "", name)
    name = re.sub(r"\s+", " ", name)  # Collapse multiple spaces
    return name
normalize_udf = F.udf(normalize_company_name, StringType())
normalized_df = bronze_df.select(
F.col("customer_id"),
normalize_udf(F.col("company_name")).alias("company_name_normalized"),
)
Parse dates from multiple formats and standardize to UTC timestamps with timezone awareness.
from datetime import datetime
from pyspark.sql import functions as F
# Handle multiple date formats
date_patterns = [
"yyyy-MM-dd",
"MM/dd/yyyy",
"dd-MMM-yyyy",
"yyyy-MM-dd'T'HH:mm:ss",
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
]
# Try each format: to_date yields NULL for a non-matching pattern
# (with ANSI mode off), so coalesce keeps the first successful parse
parsed_date = F.coalesce(
    *[F.to_date(F.col("date_string"), pattern) for pattern in date_patterns]
)
date_standardized_df = bronze_df.select(
F.col("transaction_id"),
F.to_timestamp(
F.col("created_at"),
"yyyy-MM-dd'T'HH:mm:ss"
).alias("created_at_utc"),
)
# Convert local wall-clock time to UTC: to_utc_timestamp interprets the
# input as America/New_York time and returns the UTC equivalent
# (a further from_utc_timestamp(..., "UTC") round-trip would be a no-op)
from_tz = "America/New_York"  # Source timezone
tz_converted_df = date_standardized_df.select(
    F.col("transaction_id"),
    F.to_utc_timestamp(F.col("created_at_utc"), from_tz).alias("created_at_utc_normalized"),
)
# Extract date components for partitioning and aggregation
enriched_df = tz_converted_df.select(
"*",
F.to_date(F.col("created_at_utc_normalized")).alias("created_date"),
F.year(F.col("created_at_utc_normalized")).alias("created_year"),
F.month(F.col("created_at_utc_normalized")).alias("created_month"),
F.dayofweek(F.col("created_at_utc_normalized")).alias("created_dayofweek"),
)
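The coalesce-over-patterns expression behaves like a plain-Python fallback chain: try each format in order and keep the first successful parse. An illustrative stdlib equivalent, with the patterns rewritten in strptime syntax:

```python
from datetime import datetime

# strptime equivalents of the Spark date patterns above
STRPTIME_PATTERNS = [
    "%Y-%m-%d",
    "%m/%d/%Y",
    "%d-%b-%Y",
    "%Y-%m-%dT%H:%M:%S",
]

def parse_multi_format(date_string):
    """Return the first successful parse, or None when no pattern matches."""
    for pattern in STRPTIME_PATTERNS:
        try:
            return datetime.strptime(date_string, pattern)
        except ValueError:
            continue
    return None
```

Just as in the Spark version, pattern order matters: put the most common source format first so most rows succeed on the first attempt.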
Use window functions to identify and remove duplicates based on business keys, keeping the most recent or highest-priority record.
from pyspark.sql import Window
from pyspark.sql.functions import row_number, dense_rank, rank
# Simple deduplication: keep the latest record per customer_id
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
dedup_df = bronze_df.withColumn(
"row_num",
F.row_number().over(window_spec)
).filter(F.col("row_num") == 1).drop("row_num")
# Composite key deduplication with precedence
window_spec = Window.partitionBy("customer_id", "account_id").orderBy(
F.desc("data_quality_score"), # Prefer high-quality records
F.desc("_bronze_loaded_at"), # Then most recent
)
dedup_df = bronze_df.withColumn(
"rn",
F.row_number().over(window_spec)
).filter(F.col("rn") == 1).drop("rn")
# Rank duplicates and log lower-rank versions
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
ranked_df = bronze_df.withColumn(
"dup_rank",
F.row_number().over(window_spec)
)
# Keep top 1, quarantine the rest
valid_df = ranked_df.filter(F.col("dup_rank") == 1).drop("dup_rank")
duplicate_records = ranked_df.filter(F.col("dup_rank") > 1)
duplicate_records.write \
.format("delta") \
.mode("append") \
.save("/mnt/data/quarantine/dim_customer_duplicates")
The simplest approach: remove rows that are identical across all columns, or specific business key columns.
# Remove complete row duplicates (all columns must match)
dedup_df = bronze_df.dropDuplicates()
# Remove duplicates based on specific columns (business keys)
dedup_df = bronze_df.dropDuplicates(["customer_id", "account_id"])
# dropDuplicates keeps the first occurrence (non-deterministic in distributed context)
# For deterministic behavior, prefer window functions
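What "keep the latest record per business key" means — the semantics the window-function approach pins down — can be modeled in plain Python; the explicit ordering column is exactly what makes the result deterministic (the helper is illustrative, not a Spark API):

```python
def dedup_latest(records, key, order_col):
    """Keep one record per key: the one with the greatest order_col value."""
    latest = {}
    for record in records:
        k = record[key]
        if k not in latest or record[order_col] > latest[k][order_col]:
            latest[k] = record
    return list(latest.values())

loads = [
    {"customer_id": "C1", "_bronze_loaded_at": "2026-01-01T00:00:00", "email": "old@x.com"},
    {"customer_id": "C1", "_bronze_loaded_at": "2026-01-02T00:00:00", "email": "new@x.com"},
    {"customer_id": "C2", "_bronze_loaded_at": "2026-01-01T00:00:00", "email": "c2@x.com"},
]
deduped = dedup_latest(loads, "customer_id", "_bronze_loaded_at")
```

Without the `order_col` comparison the survivor would depend on input order — the same arbitrariness `dropDuplicates` exhibits in a distributed shuffle.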
Handle near-duplicates using fuzzy matching on strings and allowing small numeric differences.
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, CountVectorizer
from difflib import SequenceMatcher
# Simple fuzzy match using string similarity
def string_similarity(s1, s2):
    """Return similarity ratio between 0 and 1."""
    if s1 is None or s2 is None:
        return 0.0
    return SequenceMatcher(None, s1, s2).ratio()
string_sim_udf = F.udf(string_similarity, "double")
# Find near-duplicate names
customer_df = bronze_df.select("customer_id", "customer_name")
# Self-join to find similar names
similar_df = customer_df.alias("a").join(
customer_df.alias("b"),
F.col("a.customer_id") < F.col("b.customer_id")
).select(
F.col("a.customer_id").alias("customer_id_a"),
F.col("a.customer_name").alias("name_a"),
F.col("b.customer_id").alias("customer_id_b"),
F.col("b.customer_name").alias("name_b"),
string_sim_udf(
F.lower(F.col("a.customer_name")),
F.lower(F.col("b.customer_name"))
).alias("similarity_score"),
)
# Filter for high-similarity pairs (>90%)
high_similarity = similar_df.filter(F.col("similarity_score") > 0.9)
# Flag and quarantine fuzzy duplicates
high_similarity.write \
.format("delta") \
.mode("append") \
.save("/mnt/data/quarantine/fuzzy_duplicates")
# Spark also ships built-in fuzzy-matching functions: F.levenshtein(col1, col2)
# for edit distance and F.soundex(col) for phonetic codes — prefer them over
# UDFs where they fit, since they avoid Python serialization overhead
# Alternative: hand-roll a phonetic algorithm (simplified Soundex below)
def soundex(name):
    """Generate Soundex code for fuzzy phonetic matching."""
    if not name:
        return None
    name = name.upper()
    first_letter = name[0]
    name = name[1:]
    mapping = {
        'B': '1', 'F': '1', 'P': '1', 'V': '1',
        'C': '2', 'G': '2', 'J': '2', 'K': '2', 'Q': '2', 'S': '2', 'X': '2', 'Z': '2',
        'D': '3', 'T': '3',
        'L': '4',
        'M': '5', 'N': '5',
        'R': '6',
    }
    code = first_letter
    prev_code = mapping.get(first_letter, '0')
    for char in name:
        digit = mapping.get(char, '0')
        if digit != '0' and digit != prev_code:
            code += digit
            if len(code) == 4:
                break
        prev_code = digit
    return code.ljust(4, '0')
soundex_udf = F.udf(soundex, "string")
phonetic_df = bronze_df.select(
F.col("customer_id"),
F.col("customer_name"),
soundex_udf(F.col("customer_name")).alias("name_soundex"),
)
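The pairwise self-join above is O(n²); phonetic codes enable blocking, where only names sharing a code are compared. A self-contained plain-Python sketch (the simplified Soundex logic is restated here so the example runs on its own):

```python
from collections import defaultdict

SOUNDEX_MAP = {
    'B': '1', 'F': '1', 'P': '1', 'V': '1',
    'C': '2', 'G': '2', 'J': '2', 'K': '2', 'Q': '2',
    'S': '2', 'X': '2', 'Z': '2',
    'D': '3', 'T': '3', 'L': '4', 'M': '5', 'N': '5', 'R': '6',
}

def soundex(name):
    """Simplified Soundex, mirroring the UDF above."""
    if not name:
        return None
    name = name.upper()
    code = name[0]
    prev = SOUNDEX_MAP.get(name[0], '0')
    for char in name[1:]:
        digit = SOUNDEX_MAP.get(char, '0')
        if digit != '0' and digit != prev:
            code += digit
            if len(code) == 4:
                break
        prev = digit
    return code.ljust(4, '0')

def phonetic_blocks(names):
    """Group names by Soundex code; only same-block pairs need fuzzy scoring."""
    blocks = defaultdict(list)
    for name in names:
        blocks[soundex(name)].append(name)
    return dict(blocks)

blocks = phonetic_blocks(["Smith", "Smyth", "Jones"])
```

In Spark the same idea becomes a join on the `name_soundex` column instead of the unrestricted `customer_id` inequality, shrinking the candidate pairs dramatically.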
Use Delta Lake MERGE for idempotent, incremental updates with deduplication logic.
from delta.tables import DeltaTable
# Create or reference Silver table
silver_path = "/mnt/data/silver/dim_customer"
silver_table = DeltaTable.forPath(spark, silver_path)
# New records from Bronze (deduplicated)
new_records = bronze_df.dropDuplicates(["customer_id"])
# MERGE: update if exists, insert if new
silver_table.alias("target") \
.merge(
new_records.alias("source"),
"target.customer_id = source.customer_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# With explicit deduplication priority (keep most recent)
from pyspark.sql import Window
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
dedup_new_records = new_records.withColumn(
"rn", F.row_number().over(window_spec)
).filter(F.col("rn") == 1).drop("rn")
silver_table.alias("target") \
.merge(
dedup_new_records.alias("source"),
"target.customer_id = source.customer_id"
) \
.whenMatchedUpdate(
set={
"customer_name": "source.customer_name",
"email": "source.email",
"updated_at": F.current_timestamp(),
}
) \
.whenNotMatchedInsert(
values={
"customer_id": "source.customer_id",
"customer_name": "source.customer_name",
"email": "source.email",
"created_at": F.current_timestamp(),
"updated_at": F.current_timestamp(),
}
) \
.execute()
For streaming Silver tables, use watermarks to allow late-arriving updates while preventing unbounded state.
from pyspark.sql import functions as F
# Streaming Silver with watermark and deduplication
streaming_silver_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/mnt/data/bronze/customer_events") \
.select("customer_id", "event_timestamp", "email", "phone")
# Add watermark to allow late-arriving updates (up to 1 hour late)
watermarked_df = streaming_silver_df.withWatermark("event_timestamp", "1 hour")
# Deduplication with watermark: drop duplicate keys arriving within the
# watermark delay (dropDuplicatesWithinWatermark requires Spark 3.5+)
dedup_streaming = watermarked_df.dropDuplicatesWithinWatermark(["customer_id"])
# For more control, use foreachBatch with a MERGE per micro-batch
def upsert_to_silver(batch_df, batch_id):
    """Called for each micro-batch."""
    from delta.tables import DeltaTable
    from pyspark.sql import Window
    silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")
    # Deduplicate within batch
    window_spec = Window.partitionBy("customer_id").orderBy(F.desc("event_timestamp"))
    batch_dedup = batch_df.withColumn(
        "rn", F.row_number().over(window_spec)
    ).filter(F.col("rn") == 1).drop("rn")
    # Merge into Silver
    silver_table.alias("target") \
        .merge(
            batch_dedup.alias("source"),
            "target.customer_id = source.customer_id"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
streaming_silver_df.writeStream \
.foreachBatch(upsert_to_silver) \
.option("checkpointLocation", "/mnt/data/.checkpoints/silver_customer") \
.start()
Delta Lake validates incoming data against the stored schema. By default, appends containing extra columns are rejected, while columns missing from the DataFrame are written as NULL (or raise an error if they carry a NOT NULL constraint).
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
# Define strict schema
silver_schema = StructType([
StructField("customer_id", StringType(), False),
StructField("customer_name", StringType(), False),
StructField("email", StringType(), True),
StructField("created_at", TimestampType(), False),
])
# Write with strict schema enforcement (default). Delta validates data
# against the table's stored schema; no writer-side .schema() call is needed.
df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "false") \
    .save("/mnt/data/silver/dim_customer")
# Attempt to write with extra column (will fail)
df_with_extra = df.withColumn("extra_col", F.lit("value"))
try:
    df_with_extra.write \
        .format("delta") \
        .mode("append") \
        .save("/mnt/data/silver/dim_customer")
except Exception as e:
    # Schema violation caught at write-time
    print(f"Schema mismatch: {e}")
Two strategies for schema changes: mergeSchema adds columns, overwriteSchema replaces entirely.
# mergeSchema: Add new columns, keep existing ones
df_with_new_col = bronze_df.withColumn("phone_normalized", F.regexp_replace(F.col("phone"), r"[^\d]", ""))
df_with_new_col.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/mnt/data/silver/dim_customer")
# overwriteSchema: Replace entire schema (destructive)
df_redesigned = bronze_df.select("customer_id", "customer_name", "created_at", "updated_at")
df_redesigned.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/mnt/data/silver/dim_customer")
# Preferred: Schema evolution via column renaming and type changes
from pyspark.sql.functions import col
# Rename and retype a column
evolved_df = bronze_df \
.drop("old_email") \
.withColumn("email_primary", col("email_address").cast("string"))
evolved_df.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/mnt/data/silver/dim_customer")
Column mapping decouples a column's logical name from its physical name in the data files, allowing renames and drops without rewriting data.
# Enable column mapping for safe refactoring
spark.sql("""
ALTER TABLE delta.`/mnt/data/silver/dim_customer`
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")
# Now safely rename columns
spark.sql("""
ALTER TABLE delta.`/mnt/data/silver/dim_customer`
RENAME COLUMN old_customer_id TO customer_id
""")
# Drop columns safely (requires column mapping mode 'name')
spark.sql("""
ALTER TABLE delta.`/mnt/data/silver/dim_customer`
DROP COLUMNS (legacy_phone_type, legacy_address_2)
""")
# Widen a column type in place (requires the Delta type widening table
# feature; without it, recreate the table as in the migration below)
spark.sql("""
ALTER TABLE delta.`/mnt/data/silver/dim_customer`
ALTER COLUMN discount_pct TYPE DECIMAL(5,3)  -- Increase precision
""")
# Breaking changes: use OVERWRITE with migration window
# 1. Create new table with new schema
spark.sql("""
CREATE TABLE silver.dim_customer_v2
USING DELTA
AS
SELECT
customer_id,
customer_name,
CAST(discount_pct AS DECIMAL(5,3)) AS discount_pct,
created_at
FROM silver.dim_customer
""")
# 2. Run shadow queries to validate
# 3. Switch over (update downstream queries)
# 4. Drop old table after cutover window
spark.sql("DROP TABLE silver.dim_customer")
spark.sql("ALTER TABLE silver.dim_customer_v2 RENAME TO dim_customer")
SCD Type 2 tracks the full history of dimensional changes with effective/end dates and current flags.
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from datetime import datetime
# Create SCD Type 2 Silver dimension with history
silver_table_path = "/mnt/data/silver/dim_customer_scd2"
# Initialize table with schema at the Silver path referenced below
spark.sql(f"""
CREATE TABLE IF NOT EXISTS silver.dim_customer_scd2 (
customer_id STRING NOT NULL,
customer_name STRING,
email STRING,
phone STRING,
address STRING,
effective_date DATE NOT NULL,
end_date DATE,
is_current BOOLEAN,
_scd_version INT,
_inserted_at TIMESTAMP,
_updated_at TIMESTAMP
)
USING DELTA
LOCATION '{silver_table_path}'
""")
# New records from Bronze
new_records = spark.sql("""
SELECT DISTINCT
customer_id,
customer_name,
email,
phone,
address,
CURRENT_DATE() as effective_date
FROM bronze.dim_customer
WHERE customer_id IS NOT NULL
""")
# SCD Type 2 MERGE logic
silver_table = DeltaTable.forPath(spark, silver_table_path)
silver_table.alias("target") \
.merge(
new_records.alias("source"),
"""
target.customer_id = source.customer_id
AND target.is_current = true
"""
) \
.whenMatchedUpdate(
# Null-safe change detection via the <=> operator
condition="""
NOT (target.customer_name <=> source.customer_name)
OR NOT (target.email <=> source.email)
OR NOT (target.phone <=> source.phone)
OR NOT (target.address <=> source.address)
""",
set={
"end_date": F.date_sub(F.current_date(), 1),
"is_current": F.lit(False),
"_updated_at": F.current_timestamp(),
}
) \
.whenNotMatchedInsert(
values={
"customer_id": "source.customer_id",
"customer_name": "source.customer_name",
"email": "source.email",
"phone": "source.phone",
"address": "source.address",
"effective_date": "source.effective_date",
"end_date": F.lit(None).cast("date"),
"is_current": F.lit(True),
"_scd_version": F.lit(1),
"_inserted_at": F.current_timestamp(),
"_updated_at": F.current_timestamp(),
}
) \
.execute()
# Insert new versions for changed records. The MERGE above expired their
# old versions, so changed customers are exactly those with no current row.
new_records.createOrReplaceTempView("new_records")
changed_records = spark.sql("""
SELECT
source.customer_id,
source.customer_name,
source.email,
source.phone,
source.address,
source.effective_date,
CAST(NULL AS DATE) as end_date,
true as is_current,
MAX(target._scd_version) + 1 as _scd_version,
CURRENT_TIMESTAMP() as _inserted_at,
CURRENT_TIMESTAMP() as _updated_at
FROM new_records source
JOIN silver.dim_customer_scd2 target
ON source.customer_id = target.customer_id
WHERE NOT EXISTS (
SELECT 1 FROM silver.dim_customer_scd2 cur
WHERE cur.customer_id = source.customer_id
AND cur.is_current = true
)
GROUP BY source.customer_id, source.customer_name, source.email,
source.phone, source.address, source.effective_date
""")
changed_records.write \
.format("delta") \
.mode("append") \
.save(silver_table_path)
A pure SQL approach to SCD Type 2 using common table expressions (CTEs). Note that a CTE is scoped to a single statement.
-- SCD Type 2: Mark old records as expired and insert new ones
WITH new_data AS (
SELECT DISTINCT
customer_id,
customer_name,
email,
phone,
address,
CURRENT_DATE() AS effective_date
FROM bronze.dim_customer
),
expiring_records AS (
SELECT
target.customer_key,
target.customer_id,
CURRENT_DATE() - INTERVAL 1 DAY AS end_date
FROM silver.dim_customer_scd2 target
INNER JOIN new_data source
ON target.customer_id = source.customer_id
WHERE target.is_current = true
AND (
target.customer_name != source.customer_name
OR target.email != source.email
OR target.phone != source.phone
OR target.address != source.address
)
)
MERGE INTO silver.dim_customer_scd2 target
USING (
SELECT customer_key, customer_id, end_date FROM expiring_records
) source
ON target.customer_key = source.customer_key
WHEN MATCHED THEN
UPDATE SET
end_date = source.end_date,
is_current = false,
_updated_at = CURRENT_TIMESTAMP()
;
-- Insert new and changed versions. A CTE is scoped to one statement,
-- so the incoming data is redefined here rather than reused from above.
INSERT INTO silver.dim_customer_scd2
WITH new_data AS (
SELECT DISTINCT
customer_id,
customer_name,
email,
phone,
address,
CURRENT_DATE() AS effective_date
FROM bronze.dim_customer
)
SELECT
source.customer_id,
source.customer_name,
source.email,
source.phone,
source.address,
source.effective_date,
CAST(NULL AS DATE) AS end_date,
true AS is_current,
COALESCE(MAX(target._scd_version), 0) + 1 AS _scd_version,
CURRENT_TIMESTAMP() AS _inserted_at,
CURRENT_TIMESTAMP() AS _updated_at
FROM new_data source
LEFT JOIN silver.dim_customer_scd2 target
ON source.customer_id = target.customer_id
WHERE NOT EXISTS (
SELECT 1 FROM silver.dim_customer_scd2 cur
WHERE cur.customer_id = source.customer_id
AND cur.is_current = true
)
GROUP BY source.customer_id, source.customer_name, source.email,
source.phone, source.address, source.effective_date;
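Whichever implementation runs, the resulting history should satisfy two invariants per business key: exactly one current row, and each version ending before its successor begins. A plain-Python checker sketch over collected rows (the record format is illustrative):

```python
def scd2_violations(history):
    """Check SCD Type 2 invariants for one business key's version history."""
    violations = []
    rows = sorted(history, key=lambda r: r["effective_date"])
    current = [r for r in rows if r["is_current"]]
    if len(current) != 1:
        violations.append(f"expected exactly 1 current row, found {len(current)}")
    for prev, nxt in zip(rows, rows[1:]):
        # Every non-final version must be closed before its successor starts
        if prev["end_date"] is None or prev["end_date"] >= nxt["effective_date"]:
            violations.append(
                f"version starting {prev['effective_date']} overlaps its successor"
            )
    return violations

good = [
    {"effective_date": "2026-01-01", "end_date": "2026-01-31", "is_current": False},
    {"effective_date": "2026-02-01", "end_date": None, "is_current": True},
]
bad = [
    {"effective_date": "2026-01-01", "end_date": None, "is_current": True},
    {"effective_date": "2026-02-01", "end_date": None, "is_current": True},
]
```

Running the same checks per key after each SCD load (e.g. over rows collected from the dimension) catches merge-ordering bugs before Gold consumers see them.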
Handle updates that arrive out of order, carrying an effective date in the past, by splicing them into the existing version history.
from pyspark.sql import functions as F
from pyspark.sql import Window
# Scenario: Receive update with effective_date in the past
late_update = spark.createDataFrame([
("CUST-001", "John Smith", "john.smith@example.com", "2026-02-15")
], ["customer_id", "customer_name", "email", "effective_date"])
# Find if this update falls between existing versions
late_update_scd = late_update.alias("new") \
.join(
spark.table("silver.dim_customer_scd2").alias("target"),
(F.col("new.customer_id") == F.col("target.customer_id"))
& (F.col("new.effective_date").between(
F.col("target.effective_date"),
F.coalesce(F.col("target.end_date"), F.current_date())
)),
"left"
)
# Case 1: Update falls within existing version (replace it)
# Case 2: Update is newer (standard SCD Type 2)
# Case 3: Update is between versions (splice history)
# Splice approach: rebuild the key's history with the late record inserted.
# Operates on collected rows, so it assumes per-key history is small.
from datetime import timedelta

def handle_late_update(customer_id, new_effective_date, new_data):
    """Rewrite SCD Type 2 history for one customer to absorb a late update.

    new_data: dict of attribute values for the late record.
    """
    history = spark.sql(f"""
SELECT * FROM silver.dim_customer_scd2
WHERE customer_id = '{customer_id}'
ORDER BY effective_date
""").collect()
    rows = [r.asDict() for r in history]
    # Find insertion point: first version starting after the late record
    insert_idx = next(
        (i for i, r in enumerate(rows)
         if r["effective_date"] > new_effective_date),
        len(rows)
    )
    # Close the predecessor version, which now has a successor
    if insert_idx > 0:
        rows[insert_idx - 1]["end_date"] = new_effective_date - timedelta(days=1)
        rows[insert_idx - 1]["is_current"] = False
    # The late record ends where its successor begins (open-ended if last)
    new_row = dict(new_data)
    new_row["customer_id"] = customer_id
    new_row["effective_date"] = new_effective_date
    if insert_idx < len(rows):
        new_row["end_date"] = rows[insert_idx]["effective_date"] - timedelta(days=1)
        new_row["is_current"] = False
    else:
        new_row["end_date"] = None
        new_row["is_current"] = True
    rows.insert(insert_idx, new_row)
    return rows
Use Great Expectations for comprehensive data validation with metrics logging and failure handling.
# Legacy (V2) Great Expectations dataset API; newer GE releases use a
# different validation workflow
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
# Load and validate Silver data
silver_df = spark.read.format("delta").load("/mnt/data/silver/dim_customer")
silver_ge = SparkDFDataset(silver_df)
# Define expectations
expectations = [
# Column presence
("expect_table_columns_to_match_ordered_list", {
"column_list": ["customer_id", "customer_name", "email", "created_at"]
}),
# Non-null expectations
("expect_column_values_to_not_be_null", {
"column": "customer_id"
}),
("expect_column_values_to_not_be_null", {
"column": "customer_name"
}),
# Value range expectations
("expect_column_values_to_be_between", {
"column": "discount_pct",
"min_value": 0.0,
"max_value": 1.0
}),
# Pattern matching
("expect_column_values_to_match_regex", {
"column": "email",
"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
}),
# Uniqueness
("expect_column_values_to_be_unique", {
"column": "customer_id"
}),
# Custom metric
("expect_table_row_count_to_be_between", {
"min_value": 1000,
"max_value": 10000000
}),
]
# Run expectations
results = []
for expectation_name, kwargs in expectations:
    try:
        result = getattr(silver_ge, expectation_name)(**kwargs)
        results.append({
            "expectation": expectation_name,
            "passed": result.success,
            "result": result.result if hasattr(result, "result") else None,
        })
    except Exception as e:
        results.append({
            "expectation": expectation_name,
            "passed": False,
            "error": str(e),
        })
# Log results
results_df = spark.createDataFrame(results)
results_df.write \
.format("delta") \
.mode("append") \
.save("/mnt/data/silver/_quality_checks")
# Fail pipeline if critical expectations fail
critical_failures = [r for r in results if not r["passed"] and r["expectation"] in [
"expect_column_values_to_not_be_null",
"expect_column_values_to_be_unique",
]]
if critical_failures:
    raise Exception(f"Data quality validation failed: {critical_failures}")
Use Databricks Delta Live Tables (DLT) expectations for declarative quality rules.
import dlt
from pyspark.sql import functions as F
# Declare the table, its location, and its expectations via decorators
@dlt.table(
    name="dim_customer",
    comment="Customer dimension with SCD Type 2",
    path="/mnt/data/silver/dim_customer",
)
@dlt.expect("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect("customer_id_length", "LENGTH(customer_id) <= 50")
@dlt.expect("email_format", "email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$' OR email IS NULL")
@dlt.expect_or_drop("positive_discount", "discount_pct >= 0 AND discount_pct <= 1")
@dlt.expect_or_fail("created_date_valid", "created_at <= CURRENT_TIMESTAMP()")
def dim_customer():
    """Cleansed customer dimension."""
    return (
        spark.read.format("delta").load("/mnt/data/bronze/customer")
        .select(
            F.col("id").cast("string").alias("customer_id"),
            F.initcap(F.trim(F.col("name"))).alias("customer_name"),
            F.lower(F.trim(F.col("email"))).alias("email"),
            F.col("discount").cast("decimal(5,2)").alias("discount_pct"),
            F.col("created_at").cast("timestamp").alias("created_at"),
        )
    )
# Quality metrics can also be materialized as a separate DLT table
@dlt.table(name="dim_customer_quality_metrics")
def quality_checks():
    return spark.sql("""
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) as null_emails,
MIN(created_at) as earliest_created,
MAX(created_at) as latest_created
FROM LIVE.dim_customer
""")
Build a reusable quality check framework with metrics logging and quarantine routing.
from pyspark.sql import functions as F
from pyspark.sql import Window
from datetime import datetime
class SilverQualityValidator:
    """Reusable Silver layer quality validation framework."""

    def __init__(self, spark_session, table_name, table_path):
        self.spark = spark_session
        self.table_name = table_name
        self.table_path = table_path
        self.checks_run = []
        self.quarantine_records = None

    def check_null_columns(self, column_list, severity="ERROR"):
        """Validate specified columns are not null."""
        df = self.spark.read.format("delta").load(self.table_path)
        for col in column_list:
            null_count = df.filter(F.col(col).isNull()).count()
            total_count = df.count()
            null_pct = (null_count / total_count * 100) if total_count > 0 else 0
            passed = null_count == 0
            self.checks_run.append({
                "check_name": f"null_check_{col}",
                "table_name": self.table_name,
                "severity": severity,
                "passed": passed,
                "null_count": null_count,
                "null_percentage": null_pct,
                "check_timestamp": datetime.utcnow().isoformat(),
            })
            if not passed and severity == "ERROR":
                quarantine_df = df.filter(F.col(col).isNull())
                self._quarantine_records(quarantine_df, f"null_{col}")

    def check_value_range(self, column, min_val, max_val, severity="ERROR"):
        """Validate column values are within range."""
        df = self.spark.read.format("delta").load(self.table_path)
        out_of_range = df.filter(
            (F.col(column) < min_val) | (F.col(column) > max_val)
        )
        out_of_range_count = out_of_range.count()
        total_count = df.count()
        out_of_range_pct = (out_of_range_count / total_count * 100) if total_count > 0 else 0
        passed = out_of_range_count == 0
        self.checks_run.append({
            "check_name": f"range_check_{column}",
            "table_name": self.table_name,
            "severity": severity,
            "passed": passed,
            "out_of_range_count": out_of_range_count,
            "out_of_range_percentage": out_of_range_pct,
            "expected_range": f"[{min_val}, {max_val}]",
            "check_timestamp": datetime.utcnow().isoformat(),
        })
        if not passed and severity == "ERROR":
            self._quarantine_records(out_of_range, f"range_violation_{column}")

    def check_uniqueness(self, column_list, severity="ERROR"):
        """Validate uniqueness of column(s)."""
        df = self.spark.read.format("delta").load(self.table_path)
        total_count = df.count()
        unique_count = df.select(*column_list).distinct().count()
        duplicates = total_count - unique_count
        passed = duplicates == 0
        self.checks_run.append({
            "check_name": f"uniqueness_{','.join(column_list)}",
            "table_name": self.table_name,
            "severity": severity,
            "passed": passed,
            "duplicate_count": duplicates,
            "duplicate_percentage": (duplicates / total_count * 100) if total_count > 0 else 0,
            "check_timestamp": datetime.utcnow().isoformat(),
        })
        if not passed and severity == "ERROR":
            window_spec = Window.partitionBy(*column_list).orderBy(F.desc("_bronze_loaded_at"))
            duplicate_df = df.withColumn(
                "rn", F.row_number().over(window_spec)
            ).filter(F.col("rn") > 1).drop("rn")
            self._quarantine_records(duplicate_df, f"duplicates_{','.join(column_list)}")

    def _quarantine_records(self, df, reason):
        """Write failing records to quarantine."""
        df.select("*", F.lit(reason).alias("quarantine_reason")) \
            .write \
            .format("delta") \
            .mode("append") \
            .save(f"/mnt/data/quarantine/{self.table_name}_{reason}")

    def report(self):
        """Return quality check report."""
        report_df = self.spark.createDataFrame(self.checks_run)
        report_df.write \
            .format("delta") \
            .mode("append") \
            .save(f"/mnt/data/silver/_quality_reports/{self.table_name}")
        failed_checks = [c for c in self.checks_run if not c["passed"]]
        if failed_checks:
            print(f"WARNING: {len(failed_checks)} quality checks failed")
            for check in failed_checks:
                print(f"  - {check['check_name']}: {check}")
        return report_df
# Usage
validator = SilverQualityValidator(
spark,
"dim_customer",
"/mnt/data/silver/dim_customer"
)
validator.check_null_columns(["customer_id", "customer_name"])
validator.check_value_range("discount_pct", 0.0, 1.0)
validator.check_uniqueness(["customer_id"])
validator.report()
Join fact tables with dimensions at the Silver layer for analytical readiness.
from pyspark.sql import functions as F
# Star schema: fact_order + dimensions
# Alias each DataFrame so qualified references like "fact_order.order_date" resolve below
fact_order = spark.read.format("delta").load("/mnt/data/silver/fact_order").alias("fact_order")
dim_customer = spark.read.format("delta").load("/mnt/data/silver/dim_customer").alias("dim_customer")
dim_product = spark.read.format("delta").load("/mnt/data/silver/dim_product").alias("dim_product")
dim_date = spark.read.format("delta").load("/mnt/data/silver/dim_date").alias("dim_date")
# Multi-dimensional join
enriched_orders = (
fact_order
.join(
dim_customer.filter(F.col("is_current") == True), # SCD Type 2: only current
on="customer_id",
how="left"
)
.join(
dim_product.filter(F.col("is_current") == True),
on="product_id",
how="left"
)
.join(
dim_date,
on=F.col("fact_order.order_date") == F.col("dim_date.date_key"),
how="left"
)
.select(
"fact_order.*",
F.col("dim_customer.customer_name"),
F.col("dim_customer.email"),
F.col("dim_product.product_name"),
F.col("dim_product.category"),
F.col("dim_product.price"),
F.col("dim_date.fiscal_year"),
F.col("dim_date.fiscal_quarter"),
)
)
enriched_orders.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("order_date") \
.save("/mnt/data/silver/fact_order_enriched")
Use broadcast hints to optimize joins when dimension tables are small enough to fit in memory.
from pyspark.sql import functions as F
# Broadcast small dimension tables
dim_country = spark.read.format("delta").load("/mnt/data/silver/dim_country") # ~300 rows
dim_category = spark.read.format("delta").load("/mnt/data/silver/dim_category") # ~100 rows
fact_order = spark.read.format("delta").load("/mnt/data/silver/fact_order")
# Explicit broadcast hint
enriched = fact_order.join(
F.broadcast(dim_country),
on="country_id",
how="left"
).join(
F.broadcast(dim_category),
on="product_category_id",
how="left"
)
# Or set Spark configuration for auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50mb")
# Check what got broadcast
enriched.explain() # Shows broadcast in the physical plan
When one value dominates a join column (data skew), add salt keys to distribute processing.
from pyspark.sql import functions as F
# Identify skew: count distribution
fact_order.groupBy("product_id").count().orderBy(F.desc("count")).show()
# If product_id=123 has 40% of rows, salt it
salt_factor = 10 # distribute skewed value across 10 partitions
salted_fact = fact_order.withColumn(
"salt_key",
F.when(
F.col("product_id") == "123",
F.expr(f"CAST(RAND() * {salt_factor} AS INT)")
).otherwise(F.lit(0))
)
salted_dim = dim_product.withColumn(
"salt_key",
F.explode(F.array(*[F.lit(i) for i in range(salt_factor)]))
)
# Join on (product_id, salt_key)
enriched = salted_fact.join(
salted_dim,
on=["product_id", "salt_key"],
how="left"
).drop("salt_key")
enriched.write.format("delta").mode("overwrite").save("/mnt/data/silver/fact_order_enriched")
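Manual salting works on any Spark version, but on Spark 3.x it is worth trying Adaptive Query Execution first: AQE can detect and split skewed partitions at runtime with no schema changes. A configuration sketch (the threshold values shown are Spark's defaults, listed here only as a starting point):

```python
# AQE skew-join handling: Spark splits oversized partitions at runtime,
# often removing the need for manual salt keys
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition is treated as skewed only if it exceeds BOTH thresholds
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```

If AQE resolves the skew, the salted-join code above can be dropped entirely.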
Join facts with SCD Type 2 dimensions to get the correct version at the fact's transaction date.
from pyspark.sql import functions as F
# Fact table with order_date
fact_order = spark.read.format("delta").load("/mnt/data/silver/fact_order").alias("fact_order")
# SCD Type 2 dimension (aliased so the qualified column references below resolve)
dim_customer_scd2 = spark.read.format("delta").load("/mnt/data/silver/dim_customer_scd2").alias("dim_customer_scd2")
# Join to get the correct customer version at order time
correct_version = (
fact_order
.join(
dim_customer_scd2,
on=[
F.col("fact_order.customer_id") == F.col("dim_customer_scd2.customer_id"),
F.col("fact_order.order_date").between(
F.col("dim_customer_scd2.effective_date"),
F.coalesce(F.col("dim_customer_scd2.end_date"), F.current_date())
)
],
how="left"
)
.select(
"fact_order.*",
F.col("dim_customer_scd2.customer_name"),
F.col("dim_customer_scd2._scd_version"),
)
)
# Alternative: lookup current version (is_current = true)
current_version = (
fact_order
.join(
dim_customer_scd2.filter(F.col("is_current") == True),
on="customer_id",
how="left"
)
.select(
"fact_order.*",
F.col("dim_customer_scd2.customer_name"),
)
)
Use Change Data Feed to capture only modified records, enabling efficient incremental processing.
from pyspark.sql import functions as F
# Enable CDF on Bronze table (if not already enabled)
spark.sql("""
ALTER TABLE delta.`/mnt/data/bronze/customer`
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read changes since last run; last_processed_version is read back from the
# pipeline metadata table that is updated at the end of this script
cdf_df = spark.read \
.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", last_processed_version) \
.load("/mnt/data/bronze/customer")
# CDF returns: _change_type (insert, update_preimage, update_postimage, delete), _commit_version, _commit_timestamp
changes = cdf_df.select(
"_change_type",
"_commit_version",
"_commit_timestamp",
"customer_id",
"customer_name",
"email",
)
# Separate insert/update from delete
inserts_updates = changes.filter(F.col("_change_type").isin("insert", "update_postimage"))
deletes = changes.filter(F.col("_change_type") == "delete")
# Process inserts/updates
# Alias each expression so MERGE column names match the Silver schema
cleansed = inserts_updates.select(
    F.col("customer_id").cast("string").alias("customer_id"),
    F.initcap(F.trim(F.col("customer_name"))).alias("customer_name"),
    F.lower(F.trim(F.col("email"))).alias("email"),
)
# Merge into Silver
from delta.tables import DeltaTable
silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")
silver_table.alias("target") \
.merge(
cleansed.alias("source"),
"target.customer_id = source.customer_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Handle deletes (mark as inactive or hard delete)
if deletes.count() > 0:
silver_table.alias("target") \
.merge(
deletes.select("customer_id").alias("source"),
"target.customer_id = source.customer_id"
) \
.whenMatchedUpdate(
set={"is_active": F.lit(False), "updated_at": F.current_timestamp()}
) \
.execute()
# Track last processed version
last_version = cdf_df.agg(F.max(F.col("_commit_version"))).collect()[0][0]
spark.sql(f"UPDATE _metadata SET last_cdf_version = {last_version} WHERE table_name = 'dim_customer'")
Implement complex MERGE logic with different handling per operation type.
from delta.tables import DeltaTable
from pyspark.sql import functions as F
silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/fact_order")
new_orders = spark.read.format("delta").load("/mnt/data/bronze/fact_order")
# Multi-condition MERGE
(
silver_table.alias("target")
.merge(
new_orders.alias("source"),
"target.order_id = source.order_id"
)
# Update existing: only if newer and quality improved
.whenMatchedUpdate(
condition="source.order_quality_score > target.order_quality_score",
set={
"order_amount": "source.order_amount",
"order_status": "source.order_status",
"updated_at": F.current_timestamp(),
}
)
# Insert new: filter out test orders
.whenNotMatchedInsert(
condition="source.order_type != 'TEST' AND source.order_amount > 0",
values={
"order_id": "source.order_id",
"customer_id": "source.customer_id",
"order_amount": "source.order_amount",
"order_date": "source.order_date",
"created_at": F.current_timestamp(),
}
)
.execute()
)
# SQL equivalent
spark.sql("""
MERGE INTO silver.fact_order target
USING bronze.fact_order source
ON target.order_id = source.order_id
WHEN MATCHED AND source.order_quality_score > target.order_quality_score THEN
UPDATE SET
order_amount = source.order_amount,
order_status = source.order_status,
updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED AND source.order_type != 'TEST' AND source.order_amount > 0 THEN
INSERT (order_id, customer_id, order_amount, order_date, created_at)
VALUES (source.order_id, source.customer_id, source.order_amount, source.order_date, CURRENT_TIMESTAMP())
""")
Process streaming Bronze data into Silver with exactly-once semantics via foreachBatch.
from pyspark.sql import functions as F
from pyspark.sql import Window  # needed for the per-batch dedup window below
from delta.tables import DeltaTable
def upsert_to_silver(batch_df, batch_id):
"""Called for each micro-batch during streaming."""
# Cleanse and deduplicate within batch
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("_bronze_loaded_at"))
cleansed_batch = (
batch_df
.select(
F.col("id").cast("string").alias("customer_id"),
F.initcap(F.trim(F.col("name"))).alias("customer_name"),
F.lower(F.trim(F.col("email"))).alias("email"),
F.col("created_at").cast("timestamp").alias("created_at"),
F.current_timestamp().alias("_silver_updated_at"),
)
.withColumn("rn", F.row_number().over(window_spec))
.filter(F.col("rn") == 1)
.drop("rn")
)
# Merge into Silver table
silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")
silver_table.alias("target") \
.merge(
cleansed_batch.alias("source"),
"target.customer_id = source.customer_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
print(f"Batch {batch_id}: upserted {cleansed_batch.count()} records to Silver")
# Start streaming
streaming_bronze = spark.readStream \
.format("delta") \
.option("startingVersion", 0) \
.load("/mnt/data/bronze/customer_events")
streaming_bronze \
.writeStream \
.foreachBatch(upsert_to_silver) \
.option("checkpointLocation", "/mnt/data/.checkpoints/silver_customer") \
.start()
Achieve idempotency through MERGE and deduplication strategies that handle retries gracefully.
from pyspark.sql import functions as F
# Add idempotency key to track duplicates across retries
def add_idempotency_key(df):
"""Add deterministic idempotency key (NOT random UUID)."""
# Use a hash of business key + timestamp + content
    return df.withColumn(
        "idempotency_key",
        F.md5(
            F.concat(
                F.col("customer_id").cast("string"),
                F.col("_bronze_loaded_at").cast("string"),  # cast so concat receives string inputs
                F.md5(F.to_json(F.struct("*"))),  # hash of the entire row content
            )
        )
    )
bronze_with_key = add_idempotency_key(bronze_df)
# MERGE with idempotency: duplicate batches are safe
silver_table = DeltaTable.forPath(spark, "/mnt/data/silver/dim_customer")
silver_table.alias("target") \
.merge(
bronze_with_key.alias("source"),
"""
target.customer_id = source.customer_id
AND target.idempotency_key = source.idempotency_key
"""
) \
.whenNotMatchedInsertAll() \
.execute()
# Alternative: use natural idempotency via business key + timestamp
# If the exact same record (customer_id, timestamp) arrives twice, MERGE ignores it
# Verify exactly-once with deduplication count
dedup_check = bronze_with_key.groupBy("idempotency_key").count()
duplicates_in_batch = dedup_check.filter(F.col("count") > 1).count()
if duplicates_in_batch > 0:
print(f"WARNING: Found {duplicates_in_batch} potential duplicates in batch")
# MERGE will still be idempotent
Establish consistent naming to enable discoverability and data governance.
# Naming convention: silver_{domain}_{entity_type}_{version}
# domain: customer, order, inventory, etc.
# entity_type: dim (dimension), fact (fact table), agg (aggregate)
# version: optional, for major schema changes
table_names = {
# Dimensions
"silver_customer_dim": "Customer dimension with SCD Type 2",
"silver_product_dim": "Product dimension with catalog",
"silver_date_dim": "Date dimension with fiscal calendar",
"silver_store_dim": "Store dimension with location hierarchy",
# Facts
"silver_order_fact": "Order fact table (transactional)",
"silver_daily_sales_fact": "Daily sales aggregate",
"silver_inventory_fact": "Inventory levels by warehouse/product",
# Supporting tables
"silver_order_lineitem_fact": "Order line items (normalized)",
}
# Register tables with metadata (a schemaless CREATE TABLE requires existing
# Delta data, so point each table at its Silver path)
for table_name, description in table_names.items():
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {table_name}
    USING DELTA
    LOCATION '/mnt/data/silver/{table_name}'
    COMMENT '{description}'
    TBLPROPERTIES (
        'owner' = 'data_engineering@company.com',
        'domain' = 'core',
        'data_classification' = 'internal',
        'retention_days' = '2555',
        'refresh_frequency' = 'daily',
        'sla_freshness_hours' = '24'
    )
    """)
# Add column-level comments
spark.sql("""
COMMENT ON COLUMN silver_customer_dim.customer_id
IS 'Business key: unique customer identifier from source system'
""")
spark.sql("""
COMMENT ON COLUMN silver_customer_dim._scd_version
IS 'SCD Type 2 version number, increments with each change'
""")
Implement role-based access control at the table and column level.
# Set table ownership
spark.sql("""
ALTER TABLE silver_customer_dim
SET OWNER TO `data_engineering_team@company.com`
""")
# Grant read access to analytics team
spark.sql("""
GRANT SELECT ON TABLE silver_customer_dim
TO GROUP `analytics_team@company.com`
""")
# Grant write access (append only) to data pipeline service
spark.sql("""
GRANT INSERT, UPDATE ON TABLE silver_customer_dim
TO `databricks_job_service@company.com`
""")
# Deny delete access (prevent accidental deletes)
spark.sql("""
DENY DELETE ON TABLE silver_customer_dim
TO GROUP `all_users@company.com`
""")
# Column-level access control: a mask function takes the column value as a
# parameter and returns either the clear or the masked value per caller
spark.sql("""
CREATE OR REPLACE FUNCTION email_mask(email STRING)
RETURNS STRING
RETURN CASE WHEN is_member('security/pii_viewers') THEN email ELSE '***@***.***' END
""")
# Dynamic column masking: attach the function to the column
spark.sql("""
ALTER TABLE silver_customer_dim
ALTER COLUMN email
SET MASK email_mask
""")
Partition Silver tables for query performance and lifecycle management.
# Strategy 1: Partition by business date (most common)
spark.sql("""
CREATE TABLE silver_order_fact
USING DELTA
PARTITIONED BY (order_date)
AS
SELECT
order_id,
customer_id,
order_amount,
order_date,
created_at
FROM bronze.order_fact
""")
# Strategy 2: Partition by year/month for large tables
spark.sql("""
CREATE TABLE silver_transaction_fact
USING DELTA
PARTITIONED BY (transaction_year, transaction_month)
AS
SELECT
transaction_id,
amount,
YEAR(transaction_date) as transaction_year,
MONTH(transaction_date) as transaction_month,
transaction_date
FROM bronze.transaction_fact
""")
# Strategy 3: Partition by ingestion date for incremental loads
spark.sql("""
CREATE TABLE silver_event_fact
USING DELTA
PARTITIONED BY (_ingestion_date)
AS
SELECT
event_id,
event_type,
event_timestamp,
CAST(event_timestamp AS DATE) as _ingestion_date
FROM bronze.event_fact
""")
# Partition pruning verification
spark.sql("""
SELECT *
FROM silver_order_fact
WHERE order_date >= '2026-01-01'
""").explain() # Should show "PushedFilters: [IsNotNull(order_date), GreaterThanOrEqual(order_date, 2026-01-01)]"
Use OPTIMIZE and ZORDER to organize data for expected query patterns.
# Basic OPTIMIZE: compact many small files into fewer large ones for faster scans
spark.sql("""
OPTIMIZE silver_customer_dim
""")
# OPTIMIZE with ZORDER: co-locate related data
# ZORDER on customer_id means queries filtering on customer_id will scan fewer files
spark.sql("""
OPTIMIZE silver_order_fact
ZORDER BY customer_id
""")
# ZORDER multiple columns: first column has most weight
spark.sql("""
OPTIMIZE silver_inventory_fact
ZORDER BY warehouse_id, product_id
""")
# Automated optimization with TBLPROPERTIES
spark.sql("""
ALTER TABLE silver_customer_dim
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Monitor optimization performance
spark.sql("""
DESCRIBE DETAIL silver_customer_dim
""").select("numFiles", "sizeInBytes").show()
# Before OPTIMIZE: 10,000 small files, 500MB
# After OPTIMIZE + ZORDER: 50 larger files, 500MB (much faster queries)
Implement metrics tracking and alerting for data pipeline health.
from pyspark.sql import functions as F
from datetime import datetime, timedelta
class SilverMonitor:
"""Monitor Silver layer data freshness and quality."""
def __init__(self, spark_session):
self.spark = spark_session
def monitor_freshness(self, table_path, timestamp_column, sla_hours=24):
"""Check if data is within SLA freshness."""
df = self.spark.read.format("delta").load(table_path)
latest_timestamp = df.agg(F.max(F.col(timestamp_column))).collect()[0][0]
hours_behind = (datetime.utcnow() - latest_timestamp).total_seconds() / 3600
freshness_ok = hours_behind <= sla_hours
return {
"table": table_path,
"latest_data_timestamp": latest_timestamp,
"hours_behind": hours_behind,
"sla_hours": sla_hours,
"freshness_ok": freshness_ok,
"check_timestamp": datetime.utcnow().isoformat(),
}
def monitor_completeness(self, table_path, expected_row_count, tolerance_pct=10):
"""Check if table has expected row count (within tolerance)."""
df = self.spark.read.format("delta").load(table_path)
actual_count = df.count()
expected_low = expected_row_count * (1 - tolerance_pct / 100)
expected_high = expected_row_count * (1 + tolerance_pct / 100)
completeness_ok = expected_low <= actual_count <= expected_high
return {
"table": table_path,
"actual_row_count": actual_count,
"expected_row_count": expected_row_count,
"tolerance_pct": tolerance_pct,
"completeness_ok": completeness_ok,
"check_timestamp": datetime.utcnow().isoformat(),
}
    def monitor_duplicates(self, table_path, key_columns):
        """Detect duplicate business keys."""
        df = self.spark.read.format("delta").load(table_path)
        dedup_count = df.select(*key_columns).distinct().count()
        total_count = df.count()
        duplicate_count = total_count - dedup_count
        return {
            "table": table_path,
            "key_columns": key_columns,
            "total_rows": total_count,
            "unique_keys": dedup_count,
            "duplicate_rows": duplicate_count,
            # Key name must match the "{check_type}_ok" lookup in report_all
            "duplicates_ok": duplicate_count == 0,
            "check_timestamp": datetime.utcnow().isoformat(),
        }
def report_all(self, tables_config):
"""Run all checks and generate report."""
results = []
for table_name, config in tables_config.items():
# Freshness check
freshness = self.monitor_freshness(
config["path"],
config["timestamp_col"],
config.get("sla_hours", 24)
)
results.append({"check_type": "freshness", **freshness})
# Completeness check
if "expected_row_count" in config:
completeness = self.monitor_completeness(
config["path"],
config["expected_row_count"],
config.get("tolerance_pct", 10)
)
results.append({"check_type": "completeness", **completeness})
# Duplicate check
if "key_columns" in config:
duplicates = self.monitor_duplicates(
config["path"],
config["key_columns"]
)
results.append({"check_type": "duplicates", **duplicates})
# Write to monitoring table
report_df = self.spark.createDataFrame(results)
report_df.write \
.format("delta") \
.mode("append") \
.save("/mnt/data/silver/_monitoring_metrics")
# Check for failures
failures = [r for r in results if not r.get(f"{r['check_type']}_ok", True)]
if failures:
self._send_alert(failures)
return report_df
def _send_alert(self, failures):
"""Send alert for monitoring failures."""
print(f"ALERT: {len(failures)} monitoring checks failed:")
for failure in failures:
print(f" {failure['check_type']}: {failure['table']}")
# In production: send to Slack, PagerDuty, email, etc.
# Usage
monitor = SilverMonitor(spark)
tables_config = {
"dim_customer": {
"path": "/mnt/data/silver/dim_customer",
"timestamp_col": "updated_at",
"sla_hours": 24,
"expected_row_count": 500000,
"tolerance_pct": 5,
"key_columns": ["customer_id"],
},
"fact_order": {
"path": "/mnt/data/silver/fact_order",
"timestamp_col": "created_at",
"sla_hours": 12,
"expected_row_count": 10000000,
"tolerance_pct": 10,
"key_columns": ["order_id"],
},
}
monitor.report_all(tables_config)
The Silver layer transforms raw Bronze data into a reliable, governed foundation for analytics. By implementing the design principles, cleansing strategies, deduplication patterns, and quality checks outlined here, you create a stable contract between data producers and consumers. SCD implementations, proper joins, and incremental processing patterns enable efficient downstream analytics at scale. Production patterns around naming, ownership, partitioning, and monitoring ensure the Silver layer remains maintainable and trustworthy as your data estate grows. The code patterns presented are battle-tested across enterprise data warehouses. Start with the foundational concepts—null handling, deduplication, and schema enforcement—then layer in advanced patterns like SCD Type 2 and streaming processing as your needs grow.
Silver is the conformed, cleansed, deduplicated representation of source entities — it is where raw events become well-typed business facts. The contract Silver promises is: enforced schema, no duplicates on the business key, nulls handled deliberately, and rejects routed to a quarantine table. Gold then assumes that contract and builds aggregates and dimensional models without re-doing data quality work on every consumer.
Use a window function: row_number() OVER (PARTITION BY business_key ORDER BY event_ts DESC, _ingestion_ts DESC), then filter to row_number = 1. The secondary sort on _ingestion_ts breaks ties when two events share the same event time. For incremental Silver, run this against a windowed slice of Bronze plus the current Silver state, then MERGE INTO on the business key — that keeps the operation O(delta) instead of full-table.
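The keep-latest-per-key semantics of that window can be sketched in plain Python (a toy model of the ROW_NUMBER logic, not how Spark executes it; the field names business_key, event_ts, and _ingestion_ts mirror the SQL above):

```python
def dedupe_latest(rows):
    """Keep one row per business_key: highest event_ts wins, and
    _ingestion_ts breaks ties (mirrors ROW_NUMBER() ... ORDER BY ... DESC)."""
    best = {}
    for row in rows:
        key = row["business_key"]
        rank = (row["event_ts"], row["_ingestion_ts"])
        current = best.get(key)
        if current is None or rank > (current["event_ts"], current["_ingestion_ts"]):
            best[key] = row
    return list(best.values())

rows = [
    {"business_key": "c1", "event_ts": 1, "_ingestion_ts": 10, "email": "old@x.com"},
    {"business_key": "c1", "event_ts": 2, "_ingestion_ts": 11, "email": "new@x.com"},
    {"business_key": "c1", "event_ts": 2, "_ingestion_ts": 12, "email": "late@x.com"},  # tie on event_ts
    {"business_key": "c2", "event_ts": 1, "_ingestion_ts": 10, "email": "only@y.com"},
]
deduped = {r["business_key"]: r["email"] for r in dedupe_latest(rows)}
```

The tie on event_ts for c1 is resolved by the later _ingestion_ts, exactly as the secondary sort key in the window specification does.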
SCD Type 1 overwrites the old value — no history, used when only the current state matters (e.g., a typo correction). Type 2 inserts a new row with effective_from/effective_to/is_current columns, preserving every historical version — used when downstream needs point-in-time joins (e.g., what was the customer's tier when they placed this order?). Type 3 keeps a single "previous value" column alongside the current one — rarely used because it only remembers one prior state. Type 2 is the default for dimensions; Type 1 is fine for slowly drifting attributes that nobody will ask "as of when."
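The Type 2 mechanics — expire the current row, append a new version — can be sketched as a toy in-memory model (illustrative only; in Delta this is a MERGE, and the column names here mirror the ones above):

```python
from datetime import date

def apply_scd2_change(history, business_key, new_attrs, change_date):
    """SCD Type 2: close out the current row, then append a new version."""
    for row in history:
        if row["business_key"] == business_key and row["is_current"]:
            row["is_current"] = False
            row["effective_to"] = change_date
    history.append({
        "business_key": business_key,
        **new_attrs,
        "effective_from": change_date,
        "effective_to": None,
        "is_current": True,
    })
    return history

history = [{"business_key": "c1", "tier": "bronze",
            "effective_from": date(2024, 1, 1), "effective_to": None, "is_current": True}]
apply_scd2_change(history, "c1", {"tier": "gold"}, date(2024, 6, 1))
```

After the change, both rows survive: a point-in-time join on an order dated 2024-03-01 finds the bronze-tier version, while a join on is_current finds gold.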
Define an explicit StructType (or DLT @dlt.expect_or_drop / @dlt.expect_all_or_fail) and apply it on read from Bronze; Delta enforces column types and nullability on write. Bad records should not silently disappear — route them to a quarantine table with the original payload, the failed validation rule, and a reject timestamp. That gives data engineering a feedback loop with upstream owners while keeping Silver clean for consumers.
Set mergeSchema=true on writes (or spark.databricks.delta.schema.autoMerge.enabled=true session-wide) — Delta will add the new column and backfill existing rows with NULL, transactionally. Type widening is also supported (e.g., int to long). For Silver, prefer explicit ALTER TABLE so changes are reviewed; auto-merge is fine for Bronze where you want resilience over control. Type narrowing or column drops always require an explicit ALTER TABLE ... DROP COLUMN with column mapping enabled.
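A minimal sketch of the additive write path described above, assuming updated_df is a DataFrame whose schema has gained a new column (the name and path are placeholders):

```python
# Additive schema evolution: Delta appends the new column to the table schema
# and existing rows read back as NULL for it, all within one transaction
(updated_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/mnt/data/silver/dim_customer"))
```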
The business key must be stable across source systems and meaningful to the business — e.g., the customer's verified email or a corporate customer_number, not the source database's auto-increment ID, which changes if the system is rebuilt or migrated. If multiple sources contribute to the same entity, Silver should compute a deterministic surrogate (sha2(concat(source_system, business_key), 256)) and carry both the natural key and the surrogate. Gold dimensions then carry their own integer surrogate for join performance, but the Silver-level key is what guarantees uniqueness across systems.
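The deterministic surrogate can be illustrated in plain Python; hashlib's sha256 mirrors Spark's sha2(concat(source_system, business_key), 256), so the same inputs always produce the same key across runs and engines (the source/key values below are hypothetical):

```python
import hashlib

def surrogate_key(source_system: str, business_key: str) -> str:
    """Deterministic surrogate: sha256 over source_system || business_key.
    Mirrors Spark's sha2(concat(source_system, business_key), 256)."""
    return hashlib.sha256((source_system + business_key).encode("utf-8")).hexdigest()

k1 = surrogate_key("crm", "CUST-0042")  # hypothetical source system and natural key
k2 = surrogate_key("erp", "CUST-0042")  # same natural key, different source system
```

Because the hash is a pure function of its inputs, re-running the pipeline never mints a new surrogate for an existing entity, and the same natural key from two systems yields two distinct keys.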