Data cleaning is the work that turns a Bronze layer (raw, append-only) into a Silver layer (validated, conformed, deduplicated) in a Medallion architecture. This page covers the core PySpark techniques: nulls, deduplication, schema enforcement, string normalization, outliers, dates and timezones, and validation. Each section has runnable code. See also Silver Layer.
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType,
DoubleType, TimestampType
)
spark = (SparkSession.builder
.appName("data-cleaning")
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate())
bronze = spark.createDataFrame([
(1, " Alice ", "alice@EXAMPLE.com", "29", "2026-04-01 09:15:00", 50000.0),
(1, "Alice", "alice@example.com", "29", "2026-04-02 10:00:00", 51000.0), # dup, newer
(2, "bob", None, "31", "2026-04-03 11:30:00", 70000.0),
(3, "carol", "carol@example.com", None, "2026/04/04 12:00:00", 9999999.0), # outlier
(4, " ", "dave@example.com", "200", "bad-date", None),
], ["user_id", "name", "email", "age_str", "ts_str", "salary"])
Three patterns cover most cases: drop, fill with a default, or coalesce from a fallback column.
# Drop rows missing critical columns
df = bronze.dropna(subset=["user_id", "email"])
# Fill defaults per column
df = df.fillna({"age_str": "0", "salary": 0.0, "name": "unknown"})
# Coalesce: take the first non-null
df = df.withColumn(
"contact",
F.coalesce(F.col("email"), F.lit("noreply@example.com"))
)
Be deliberate about fillna. Filling salary with 0 silently corrupts averages downstream. Better options: fill with a sentinel (-1) and exclude it in aggregations, or keep the null and rely on F.avg, which ignores nulls by default.
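A quick check of why that matters, using the toy bronze frame above:
# avg() skips nulls, so the unfilled column yields the mean of the known salaries;
# filling nulls with 0 first drags the average down
bronze.select(F.avg("salary").alias("avg_salary")).show()
bronze.fillna({"salary": 0.0}).select(F.avg("salary").alias("avg_salary")).show()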
# Count nulls per column - useful in profiling
null_counts = bronze.select([
F.sum(F.col(c).isNull().cast("int")).alias(c)
for c in bronze.columns
])
null_counts.show()
When any row from a duplicate group will do, dropDuplicates with a subset is enough.
# Dedupe on the natural key
df = bronze.dropDuplicates(subset=["user_id"])
Note: which row survives is non-deterministic (it depends on partitioning and task order). If it matters which row wins, do not use this.
The standard "keep the latest record per key" pattern uses a window with row_number. This is what most CDC and SCD Type 1 Silver jobs do.
w = Window.partitionBy("user_id").orderBy(F.col("ts_str").desc())
latest = (bronze
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn"))
Two variations worth knowing:
# Keep the latest non-null email per user. The frame must span the whole
# partition; with the default frame, first() only sees rows up to the current one.
w = (Window.partitionBy("user_id")
    .orderBy(F.col("ts_str").desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df = bronze.withColumn("email", F.first("email", ignorenulls=True).over(w))
# Tie-break with a secondary sort key
w = Window.partitionBy("user_id").orderBy(
F.col("ts_str").desc(),
F.col("salary").desc_nulls_last()
)
Bronze is permissive; Silver is strict. Cast explicitly and reject rows that fail.
silver_schema = StructType([
StructField("user_id", IntegerType(), False),
StructField("name", StringType(), False),
StructField("email", StringType(), True),
StructField("age", IntegerType(), True),
StructField("event_ts", TimestampType(), False),
StructField("salary", DoubleType(), True),
])
# Safe cast: with spark.sql.ansi.enabled=false, invalid values become null instead of failing the job
typed = (bronze
.withColumn("age", F.col("age_str").cast("int"))
.withColumn("event_ts", F.to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"))
.select("user_id", "name", "email", "age", "event_ts", "salary"))
# Quarantine rows that failed casting
quarantine = typed.filter(F.col("event_ts").isNull() | F.col("user_id").isNull())
clean = typed.filter(F.col("event_ts").isNotNull() & F.col("user_id").isNotNull())
Always quarantine; never drop silently. The Silver job should write rejects to a side table so downstream owners can investigate.
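One way to make the rejects actionable is to stamp a reject reason before writing them out; a minimal sketch (the quarantine table name is illustrative):
quarantine = (quarantine
    .withColumn("reject_reason",
        F.when(F.col("user_id").isNull(), "missing user_id")
         .otherwise("unparseable event_ts"))
    .withColumn("_rejected_at", F.current_timestamp()))
(quarantine.write
    .mode("append")
    .format("delta")
    .saveAsTable("quarantine.users"))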
Whitespace, casing, and stray characters are the most common cause of false-positive uniqueness violations.
cleaned = (bronze
.withColumn("name", F.trim(F.col("name")))
.withColumn("name", F.lower(F.col("name")))
.withColumn("email", F.lower(F.trim(F.col("email"))))
# Collapse internal whitespace
.withColumn("name", F.regexp_replace("name", r"\s+", " "))
# Keep only printable ASCII; note this also strips accented characters,
# so widen the range if those are legitimate
.withColumn("name", F.regexp_replace("name", r"[^\x20-\x7E]", ""))
# Drop empty-after-trim
.filter(F.length(F.col("name")) > 0)
)
# Validate email shape with regex
EMAIL_RE = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
cleaned = cleaned.withColumn(
"email_valid", F.col("email").rlike(EMAIL_RE)
)
Two approaches dominate at scale: IQR (robust to skew) and z-score (assumes roughly normal). For both, compute statistics with approxQuantile or aggregations to avoid full sorts.
q1, q3 = bronze.approxQuantile("salary", [0.25, 0.75], 0.01)
iqr = q3 - q1
low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
flagged = bronze.withColumn(
"salary_outlier",
(F.col("salary") < low) | (F.col("salary") > high)
)
stats = bronze.select(
F.mean("salary").alias("mu"),
F.stddev("salary").alias("sigma"),
).first()
flagged = bronze.withColumn(
"z_salary",
(F.col("salary") - F.lit(stats["mu"])) / F.lit(stats["sigma"])
).withColumn(
"salary_outlier", F.abs(F.col("z_salary")) > 3
)
Decision rule: flag outliers, do not delete them. Outliers often carry the most signal (fraud, anomalies, top customers). Drop only when domain experts confirm they are data errors.
Two rules: store everything in UTC, and parse with explicit format strings. Spark 3.0 replaced the lenient SimpleDateFormat-based parser with a stricter java.time one (spark.sql.legacy.timeParserPolicy controls the fallback), so patterns that quietly mis-parsed on 2.x now return null or raise.
# Try multiple known formats
def parse_ts(col):
    return F.coalesce(
        F.to_timestamp(col, "yyyy-MM-dd HH:mm:ss"),
        F.to_timestamp(col, "yyyy/MM/dd HH:mm:ss"),
        F.to_timestamp(col, "MM/dd/yyyy HH:mm:ss"),
        F.to_timestamp(col, "yyyy-MM-dd'T'HH:mm:ssXXX"),
    )
df = bronze.withColumn("event_ts", parse_ts("ts_str"))
# Convert from a source timezone to UTC, then store as UTC
df = df.withColumn(
"event_ts_utc",
F.to_utc_timestamp(F.col("event_ts"), "America/Los_Angeles")
)
# Derived date columns for partitioning
df = (df
.withColumn("event_date", F.to_date("event_ts_utc"))
.withColumn("event_year", F.year("event_ts_utc"))
.withColumn("event_month", F.month("event_ts_utc")))
Configure the cluster timezone explicitly:
spark.conf.set("spark.sql.session.timeZone", "UTC")
For lightweight checks, use Spark assertions inside the job. For heavyweight contract validation, use Great Expectations.
def assert_no_nulls(df, col):
    n = df.filter(F.col(col).isNull()).count()
    assert n == 0, f"Found {n} nulls in {col}"

def assert_unique(df, cols):
    total = df.count()
    distinct = df.dropDuplicates(cols).count()
    assert total == distinct, (
        f"Uniqueness violated on {cols}: {total - distinct} dupes"
    )
assert_no_nulls(silver, "user_id")
assert_unique(silver, ["user_id"])
import great_expectations as gx
context = gx.get_context()
batch = context.sources.add_or_update_spark("spark") \
.add_dataframe_asset("silver_users", dataframe=silver) \
.build_batch_request()
suite = context.add_or_update_expectation_suite("silver_users.warning")
validator = context.get_validator(batch_request=batch,
expectation_suite=suite)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_unique("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_value_lengths_to_be_between("email", 5, 254)
validator.save_expectation_suite()
results = validator.validate()
assert results.success, "Silver validation failed"
If you are on Databricks specifically, prefer DLT expectations — they integrate with the pipeline UI and route bad rows automatically.
import dlt
@dlt.table
@dlt.expect_or_drop("valid_age", "age BETWEEN 0 AND 120")
@dlt.expect_or_fail("user_id_set", "user_id IS NOT NULL")
def silver_users():
    return dlt.read("bronze_users")
A complete Silver job stitches the techniques above into a single pipeline.
def bronze_to_silver(bronze):
    # 1. Normalize strings
    df = (bronze
        .withColumn("name", F.lower(F.trim(F.col("name"))))
        .withColumn("email", F.lower(F.trim(F.col("email"))))
        .withColumn("name", F.regexp_replace("name", r"\s+", " "))
    )
    # 2. Type cast safely
    df = (df
        .withColumn("age", F.col("age_str").cast("int"))
        .withColumn("event_ts", F.to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"))
    )
    # 3. Quarantine bad rows (complementary filters; subtract() would
    #    deduplicate silently and force an extra shuffle)
    bad = df.filter(F.col("event_ts").isNull() | F.col("user_id").isNull())
    good = df.filter(F.col("event_ts").isNotNull() & F.col("user_id").isNotNull())
    # 4. Dedupe to latest per user
    w = Window.partitionBy("user_id").orderBy(F.col("event_ts").desc())
    good = (good
        .withColumn("rn", F.row_number().over(w))
        .filter(F.col("rn") == 1)
        .drop("rn"))
    # 5. Flag outliers, do not drop
    q1, q3 = good.approxQuantile("salary", [0.25, 0.75], 0.01)
    iqr = q3 - q1
    good = good.withColumn(
        "salary_outlier",
        (F.col("salary") < q1 - 1.5 * iqr) | (F.col("salary") > q3 + 1.5 * iqr)
    )
    # 6. Derive the partition column the write below expects
    good = good.withColumn("event_date", F.to_date("event_ts"))
    # 7. Validate
    assert good.filter(F.col("user_id").isNull()).count() == 0
    return good, bad
silver, quarantine = bronze_to_silver(bronze)
(silver.write
.mode("overwrite")
.partitionBy("event_date")
.format("delta")
.saveAsTable("silver.users"))
(quarantine.write
.mode("append")
.format("delta")
.saveAsTable("quarantine.users"))
That is the contract Silver promises to Gold: typed, deduplicated, normalized, validated, with rejects routed to a side table for follow-up. Anything fancier — SCD Type 2 history, late-arriving fact handling, surrogate keys — builds on top of this foundation.
dropna removes the row entirely — appropriate when the column is required and a null indicates a corrupt record. fillna substitutes a sentinel ("UNKNOWN", 0, the column median) — appropriate when downstream needs a non-null value and the field is genuinely optional. Quarantining moves the row to a side table with the original payload and a reject reason — the right choice for required fields when you want a feedback loop with the upstream system instead of silently losing data. Production pipelines almost always use a mix: required keys quarantine, optional descriptive fields fill, fully corrupt rows drop after logging.
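That mixed policy, sketched on the toy frame (the definition of "fully corrupt" here is illustrative):
# Required key missing -> quarantine
rejects = bronze.filter(F.col("user_id").isNull())
kept = bronze.filter(F.col("user_id").isNotNull())
# Optional descriptive field -> fill a sentinel
kept = kept.fillna({"name": "unknown"})
# Fully corrupt rows -> log the count, then drop
n_corrupt = kept.filter(F.col("email").isNull() & F.col("ts_str").isNull()).count()
print(f"dropping {n_corrupt} fully corrupt rows")
kept = kept.filter(F.col("email").isNotNull() | F.col("ts_str").isNotNull())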
Window function: Window.partitionBy("business_key").orderBy(col("event_ts").desc(), col("_ingestion_ts").desc()), add row_number(), then filter == 1. The secondary sort breaks ties when two events share the same event timestamp. dropDuplicates is faster but non-deterministic for which row it keeps, so it is only safe when the rows are truly identical. For incremental processing, run the window over the new batch unioned with the current Silver state, then MERGE back.
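A sketch of the incremental variant, assuming Delta Lake, a target table silver.users, and a hypothetical new_batch DataFrame already cleaned to the Silver schema:
from delta.tables import DeltaTable

# Dedupe the union of the new batch and current Silver state, then MERGE back
w = Window.partitionBy("user_id").orderBy(F.col("event_ts").desc())
candidates = (new_batch.unionByName(spark.table("silver.users"))
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))
(DeltaTable.forName(spark, "silver.users").alias("t")
    .merge(candidates.alias("s"), "t.user_id = s.user_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())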
Pass an explicit StructType to spark.read.schema(...) — Spark will then either parse to that type or fail fast (or write nulls in PERMISSIVE mode with bad rows captured to _corrupt_record). Inference is convenient but slow (it requires a full file scan) and unstable (a missing field in the sample causes the schema to drift). For Delta writes, schema enforcement is on by default; new columns require mergeSchema=true to be added intentionally. Always pair schema enforcement with a quarantine path so rejects are inspectable.
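A sketch of the read side, assuming JSON landing files at an illustrative path; the schema must include the _corrupt_record column for PERMISSIVE mode to keep the raw payload of bad rows:
bronze_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", DoubleType(), True),
    StructField("_corrupt_record", StringType(), True),
])
raw = (spark.read
    .schema(bronze_schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .json("/landing/users/"))
# Cache before isolating rejects: Spark disallows queries over raw files
# that reference only the corrupt-record column
raw.cache()
rejects = raw.filter(F.col("_corrupt_record").isNotNull())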
For symmetric distributions, compute mean and stddev in a single aggregation pass and flag values beyond ±3σ. For skewed distributions (revenue, latency), use the IQR rule: flag values outside [Q1 - 1.5·IQR, Q3 + 1.5·IQR], computed via approxQuantile, which uses a single pass and bounded memory. For domain-specific outliers (negative ages, future-dated transactions), just write explicit predicates; they are faster and clearer than statistical methods. Always log outlier rates over time: a sudden spike usually indicates an upstream pipeline change, not a data quality problem you can fix in Silver.
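Domain predicates from that list, sketched against the typed frame from earlier:
flagged = (typed
    .withColumn("age_invalid", (F.col("age") < 0) | (F.col("age") > 120))
    .withColumn("future_dated", F.col("event_ts") > F.current_timestamp()))
# Log the rate so a sudden spike is visible in monitoring
flagged.agg(F.avg(F.col("age_invalid").cast("int")).alias("age_invalid_rate")).show()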
Normalize aggressively in Silver and never again: trim whitespace, lowercase keys used for joins, NFC-normalize unicode (typically a small UDF wrapping unicodedata.normalize), and strip non-printable characters from free-text fields. Keep the original raw value in Bronze so you can always recover it. The mistake is normalizing inside every Gold query; it kills predicate pushdown and burns CPU repeatedly for the same answer.
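A sketch of that UDF; it runs in Python, so apply it once in the Silver job rather than per query:
import unicodedata

@F.udf(returnType=StringType())
def nfc_normalize(s):
    # NFC-compose the string; pass nulls through untouched
    return unicodedata.normalize("NFC", s) if s is not None else None

cleaned = cleaned.withColumn("name", nfc_normalize(F.col("name")))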
Run a fixed set of expectations as a final stage: row count delta within tolerance, primary key uniqueness (count == countDistinct), null rate per required column under threshold, referential integrity against parent dimensions, and value-range checks on numerics. In DLT this is @dlt.expect_or_drop / @dlt.expect_or_fail; in plain jobs use a small assertion helper that writes results to an audit table and raises on critical failures. Publish to Silver only on success — failed batches stay in a staging table for inspection.
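A sketch of that assertion helper for plain jobs, assuming an audit table named audit.silver_checks and an illustrative batch_id:
def run_checks(df, batch_id):
    # (check name, passed?, critical?)
    checks = [
        ("user_id_not_null", df.filter(F.col("user_id").isNull()).count() == 0, True),
        ("user_id_unique", df.count() == df.select("user_id").distinct().count(), True),
        ("age_in_range", df.filter((F.col("age") < 0) | (F.col("age") > 120)).count() == 0, False),
    ]
    audit = spark.createDataFrame(
        [(batch_id, name, passed, critical) for name, passed, critical in checks],
        ["batch_id", "check", "passed", "critical"])
    audit.write.mode("append").format("delta").saveAsTable("audit.silver_checks")
    failed = [name for name, passed, critical in checks if critical and not passed]
    if failed:
        raise ValueError(f"Critical checks failed: {failed}")

run_checks(silver, batch_id="2026-04-05")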