Bronze Layer — Raw Ingestion Topology

  RAW SOURCES (everything that produces data, no transformation yet)
    [ S3 / GCS ]   [ Kafka ]   [ CDC stream ]   [ REST API ]   [ SFTP ]
          │
          ▼  schema-on-read, exactly-once semantics
  INGESTION ENGINES (Databricks-native, all write to Delta)
    [ Auto Loader ]   [ DLT Streaming ]   [ Spark Structured Streaming ]   [ COPY INTO ]
          │
          ▼  appended (rarely upserted), partitioned by ingest_date
  BRONZE TABLES (Delta — raw + ingest metadata, never modified)
    [ raw_payload ]   [ ingest_ts ]   [ source_uri ]   [ _corrupt_record ]

  Bronze never deletes or updates rows. Replays use a new ingest_ts.
  The _corrupt_record column captures rows that fail schema validation — quarantined, not dropped.
  The downstream Silver layer reads Bronze incrementally via streaming or readChangeFeed.
Raw sources fan in through the ingestion engines and land as append-only Delta tables — never modified after write.
The Bronze layer is the foundational tier of the Medallion (Lakehouse) architecture, serving as the raw data landing zone for all ingested data. It operates on a "schema-on-read" philosophy, prioritizing raw data preservation over immediate transformation and cleansing. The Bronze layer captures data exactly as it arrives from source systems, maintaining full fidelity for audit trails, reprocessing capabilities, and forensic analysis.
The primary purpose of the Bronze layer is to preserve source data exactly as it was received, providing a durable, replayable system of record for auditing, reprocessing, and rebuilding downstream layers.
Bronze tables are typically write-heavy and append-only, designed for high-throughput streaming and batch ingestion. They trade query optimization for ingestion speed and data integrity.
Effective Bronze layer design follows these core principles:
Bronze tables use insert-only patterns to maintain immutability and enable efficient distributed writes. New data is appended in each run; deletes and updates are avoided to prevent partition rewrites and transaction conflicts.
Once written, Bronze records should not be modified. This immutability enables reliable replays and backfills, trustworthy audit trails, time travel to earlier table states, and safe concurrent reads by downstream consumers (see the replay sketch after these principles).
Bronze ingestion applies only technical metadata enrichment, not business logic. Parsing, validation, and cleansing happen at Silver and Gold layers. This separation enables independent evolution of transformation logic and raw data retention.
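As an illustration of what immutability makes possible, the sketch below re-reads a single ingestion run by _batch_id and reads the table as it looked at an earlier point via Delta time travel. The table name and batch identifier are placeholders; the metadata columns are described in the next section.

# Hypothetical replay sketch: assumes main.raw.bronze_events exists and carries
# a _batch_id metadata column (see the metadata conventions below).
replay_batch = "00000000-0000-0000-0000-000000000000"  # placeholder batch id

# Re-read one ingestion run without touching any other data
batch_df = (
    spark.read.table("main.raw.bronze_events")
    .filter(f"_batch_id = '{replay_batch}'")
)

# Delta time travel: read Bronze exactly as it looked at an earlier timestamp
as_of_df = (
    spark.read
    .option("timestampAsOf", "2026-04-01 00:00:00")
    .table("main.raw.bronze_events")
)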
Every Bronze record should include:
_ingestion_timestamp — UTC datetime when the record was written to Bronze
_source_file — original file path or topic name for source traceability
_batch_id — unique identifier for the ingestion run (e.g., UUID or run timestamp)
_raw_payload — original payload for failed records or schema evolution cases
_processing_timestamp — server-side insertion time for latency measurement

Bronze tables are partitioned by ingestion date (_ingestion_date) to enable partition pruning on date-range queries, keep writes confined to the current partition, and simplify retention and compaction.
Auto Loader (Databricks' cloudFiles API) is the preferred mechanism for Bronze ingestion from cloud storage. It provides automatic schema detection, schema evolution, file deduplication, and incremental ingestion.
from pyspark.sql.functions import (
col, current_timestamp, input_file_name,
from_json, schema_of_json, lit
)
from pyspark.sql.types import StructType
import uuid
from datetime import datetime
# Configuration
source_path = "s3://my-data-bucket/events/"
checkpoint_path = "/mnt/delta/checkpoints/bronze_events"
target_table = "main.raw.bronze_events"
batch_id = str(uuid.uuid4())
# Initialize Spark session with Delta Lake
spark.sql("CREATE CATALOG IF NOT EXISTS main")
spark.sql("CREATE SCHEMA IF NOT EXISTS main.raw")
# Auto Loader with streaming (continuous ingestion)
def ingest_events_streaming():
"""
Read events from S3 using Auto Loader with schema evolution.
"""
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("cloudFiles.rescuedDataColumn", "_rescue_data")
.option("pathGlobfilter", "*.json")
.option("ignoreChanges", "true")
.option("maxFilesPerTrigger", 100)
.load(source_path)
)
# Add Bronze metadata columns
enriched_df = (
df
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", input_file_name())
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date",
col("_ingestion_timestamp").cast("date"))
)
    # Write to Delta Bronze table with checkpointing
    query = (
        enriched_df
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .partitionBy("_ingestion_date")
        .toTable(target_table)  # toTable() starts the streaming query
    )
    query.awaitTermination()
if __name__ == "__main__":
ingest_events_streaming()
For scheduled batch jobs, use a run-to-completion trigger (Trigger.AvailableNow, or trigger-once on older runtimes) with Databricks Jobs:
from pyspark.sql.functions import (
col, current_timestamp, input_file_name, lit, to_date
)
import uuid
from datetime import datetime
# Configuration
source_path = "s3://my-data-bucket/events/"
checkpoint_path = "/mnt/delta/checkpoints/bronze_events_batch"
target_table = "main.raw.bronze_events"
batch_id = str(uuid.uuid4())
def ingest_events_batch():
    """
    Run-to-completion batch ingestion with Auto Loader and schema evolution.
    Designed to run on a short schedule (e.g., every 5 minutes) via Databricks Jobs.
    """
try:
spark.sql("CREATE CATALOG IF NOT EXISTS main")
spark.sql("CREATE SCHEMA IF NOT EXISTS main.raw")
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("cloudFiles.rescuedDataColumn", "_rescue_data")
.option("maxFilesPerTrigger", 500)
.option("maxBytes", "1gb")
.load(source_path)
)
enriched_df = (
df
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", input_file_name())
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date",
to_date(col("_ingestion_timestamp")))
)
        # Process all available files once, then stop (batch semantics)
        query = (
            enriched_df
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            .option("mergeSchema", "true")
            .partitionBy("_ingestion_date")
            .trigger(availableNow=True)  # use trigger(once=True) on older runtimes
            .toTable(target_table)  # toTable() starts the streaming query
        )
query.awaitTermination()
print(f"Batch {batch_id} completed successfully.")
except Exception as e:
print(f"Ingestion failed: {str(e)}")
raise
if __name__ == "__main__":
ingest_events_batch()
Auto Loader can accept pre-defined schema hints to accelerate initial schema detection and ensure minimal schema expectations:
# Provide schema hints so Auto Loader starts from known column types instead of
# inferring everything, while still discovering new columns as they appear.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
    .option(
        "cloudFiles.schemaHints",
        "event_id STRING, user_id BIGINT, event_type STRING, "
        "timestamp STRING, properties STRING"  # properties kept as a JSON string, parsed later
    )
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://my-data-bucket/events/")
)
For non-streaming sources (periodic exports, SFTP drops, FTP servers), batch ingestion leverages direct read operations with error handling.
from pyspark.sql.functions import (
col, current_timestamp, input_file_name, lit, to_date,
from_json, schema_of_json
)
from pyspark.sql.types import StringType
import uuid
batch_id = str(uuid.uuid4())
source_path = "s3://my-data-bucket/batch/customer_data/*.csv"
bad_records_path = f"/mnt/delta/quarantine/customer_data/{batch_id}"
target_table = "main.raw.bronze_customers"
def ingest_csv_batch():
"""
Ingest CSV with permissive mode (captures malformed records).
"""
try:
df = (
spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("mode", "PERMISSIVE")
.option("badRecordsPath", bad_records_path)
.option("multiLine", "true")
.option("escape", '"')
.load(source_path)
)
# Remove internal Spark columns from bad records
df = df.select([c for c in df.columns if not c.startswith("_corrupt")])
# Add Bronze metadata
enriched_df = (
df
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", input_file_name())
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
)
# Write to Bronze
enriched_df.write \
.format("delta") \
.mode("append") \
.partitionBy("_ingestion_date") \
.saveAsTable(target_table)
        # Log bad-record count (badRecordsPath only produces output when failures occur)
        try:
            bad_count = (
                spark.read.option("recursiveFileLookup", "true")
                .json(bad_records_path)
                .count()
            )
        except Exception:
            bad_count = 0
        print(f"Ingested {enriched_df.count()} good records, {bad_count} malformed.")
except Exception as e:
print(f"CSV ingestion failed: {str(e)}")
raise
if __name__ == "__main__":
ingest_csv_batch()
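The same approach works for newline-delimited JSON drops; only the reader format and the schema-merge write option change: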
from pyspark.sql.functions import col, current_timestamp, input_file_name, lit, to_date
import uuid
batch_id = str(uuid.uuid4())
source_path = "s3://my-data-bucket/batch/logs/*.jsonl"
target_table = "main.raw.bronze_logs"
def ingest_jsonl_batch():
"""
Ingest newline-delimited JSON with schema evolution.
"""
try:
df = (
spark.read
.format("json")
.option("inferSchema", "true")
.option("multiLine", "false")
.load(source_path)
)
enriched_df = (
df
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", input_file_name())
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
)
        enriched_df.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("_ingestion_date") \
            .option("mergeSchema", "true") \
            .saveAsTable(target_table)
print(f"Ingested {enriched_df.count()} JSON records.")
except Exception as e:
print(f"JSON ingestion failed: {str(e)}")
raise
if __name__ == "__main__":
ingest_jsonl_batch()
For high-volume batch ingestion, the SQL COPY INTO command provides efficient file processing with incremental tracking:
-- Create Bronze table if not exists
CREATE TABLE IF NOT EXISTS main.raw.bronze_orders (
order_id STRING,
customer_id LONG,
amount DECIMAL(12, 2),
order_date STRING,
_ingestion_timestamp TIMESTAMP,
_source_file STRING,
_batch_id STRING,
_ingestion_date DATE
)
PARTITIONED BY (_ingestion_date);
-- COPY INTO with automatic file tracking (no duplicates on reruns)
COPY INTO main.raw.bronze_orders
FROM (
  SELECT
    order_id,
    CAST(customer_id AS BIGINT) AS customer_id,
    CAST(amount AS DECIMAL(12, 2)) AS amount,
    order_date,
    current_timestamp() AS _ingestion_timestamp,
    input_file_name() AS _source_file,
    '2f5a8c3b-1234-5678-9012' AS _batch_id,
    to_date(current_timestamp()) AS _ingestion_date
  FROM 's3://my-data-bucket/batch/orders/*.csv'
)
FILEFORMAT = CSV
FORMAT_OPTIONS (
'header' = 'true',
'inferSchema' = 'true',
'nullValue' = '',
'mode' = 'PERMISSIVE'
)
COPY_OPTIONS (
'mergeSchema' = 'true'
);
Kafka integration enables real-time Bronze layer population from message brokers with exactly-once semantics.
from pyspark.sql.functions import (
    col, from_json, current_timestamp, lit, to_date
)
from pyspark.sql.types import StructType, StructField, StringType as ST, LongType
import json
import uuid
# Configuration
kafka_bootstrap = "kafka-broker-1.example.com:9092,kafka-broker-2.example.com:9092"
kafka_topic = "events"
checkpoint_path = "/mnt/delta/checkpoints/bronze_kafka_events"
target_table = "main.raw.bronze_kafka_events"
batch_id = str(uuid.uuid4())
# Kafka message schema
kafka_message_schema = StructType([
StructField("event_id", ST(), True),
StructField("user_id", LongType(), True),
StructField("event_type", ST(), True),
StructField("timestamp", ST(), True),
StructField("properties", ST(), True),
])
def ingest_kafka_streaming():
"""
Read from Kafka topic, parse JSON payloads, write to Delta Bronze.
Includes offset tracking for exactly-once semantics.
"""
try:
# Read from Kafka
df_kafka = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap)
.option("subscribe", kafka_topic)
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", 10000)
.option("kafka.group.id", f"bronze-{batch_id}")
.load()
)
# Parse Kafka value (JSON) and add metadata
df_parsed = (
df_kafka
.select(
col("key").cast(ST()).alias("partition_key"),
col("offset").alias("_kafka_offset"),
col("partition").alias("_kafka_partition"),
col("timestamp").alias("_kafka_timestamp"),
from_json(col("value").cast(ST()), kafka_message_schema).alias("payload")
)
.select(
col("partition_key"),
col("_kafka_offset"),
col("_kafka_partition"),
col("_kafka_timestamp"),
col("payload.*")
)
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", lit(f"kafka://{kafka_topic}"))
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
)
        # Write to Delta with checkpointing
        query = (
            df_parsed
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            .option("mergeSchema", "true")
            .partitionBy("_ingestion_date")
            .trigger(processingTime="1 minute")
            .toTable(target_table)  # toTable() starts the streaming query
        )
query.awaitTermination()
except Exception as e:
print(f"Kafka ingestion failed: {str(e)}")
raise
if __name__ == "__main__":
ingest_kafka_streaming()
Bronze layers handle schema evolution gracefully to accommodate source system changes without breaking pipelines.
Add New Columns Mode: Auto Loader automatically adds new columns found in arriving data as nullable columns.
# Auto Loader with schema evolution
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("cloudFiles.schemaLocation", "/mnt/delta/schema_hints/events")
.load("s3://my-bucket/events/")
)
# Writing with mergeSchema=true allows schema evolution at write time
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/mnt/checkpoints/events")
    .toTable("main.raw.bronze_events")  # toTable() starts the streaming query
)
When enabled, Auto Loader captures fields that don't match the detected schema in a _rescue_data column:
# Enable rescue column to capture unexpected fields
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.rescuedDataColumn", "_rescue_data")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.load("s3://my-bucket/events/")
)
# _rescue_data is a JSON string containing unmatched fields
# Later processing can parse it: from_json(col("_rescue_data"), ...)
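The rescued-data column is also a convenient signal for schema-drift monitoring: count how many records arrived with unexpected fields and persist a daily summary for alerting: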
from pyspark.sql.functions import col, from_json, size, json_tuple
from pyspark.sql.types import StructType, StructField, StringType
def detect_schema_drift():
"""
Monitor incoming data for unexpected fields (schema drift).
Log drift events for downstream awareness.
"""
# Read Bronze table with rescue data
df = spark.read.table("main.raw.bronze_events")
# Filter to records with rescue data (unexpected fields)
drift_df = df.filter(col("_rescue_data").isNotNull())
    # Count records with rescued (unexpected) fields per ingestion date
    drift_summary = (
        drift_df
        .select(col("_rescue_data"), col("_ingestion_timestamp"))
        .groupBy(col("_ingestion_timestamp").cast("date").alias("drift_date"))
        .count()
        .orderBy(col("drift_date").desc())
    )
    print(f"Schema drift detected in {drift_df.count()} records.")
    drift_summary.show()
# Persist drift summary to monitoring table
drift_summary.write \
.format("delta") \
.mode("append") \
.saveAsTable("main.monitoring.schema_drift_events")
if __name__ == "__main__":
detect_schema_drift()
Effective partitioning and file optimization ensure Bronze layer scalability and query performance.
Partition Bronze tables by _ingestion_date (not year/month/day hierarchy) to balance partition sizes and enable efficient date-range queries:
-- Create Bronze table with single-level date partitioning
CREATE TABLE IF NOT EXISTS main.raw.bronze_events (
event_id STRING,
user_id LONG,
event_type STRING,
payload STRING,
_ingestion_timestamp TIMESTAMP,
_source_file STRING,
_batch_id STRING,
_ingestion_date DATE
)
USING DELTA
PARTITIONED BY (_ingestion_date);
-- Query with partition pruning
SELECT * FROM main.raw.bronze_events
WHERE _ingestion_date >= '2026-04-01' AND _ingestion_date < '2026-05-01';
Maintain target file sizes of 128MB–1GB per file to optimize S3 list operations and Spark task distribution:
def optimize_bronze_files():
    """
    Compact Bronze table files to optimal sizes using OPTIMIZE.
    OPTIMIZE is transactional and records the rewrite with dataChange=false,
    so streaming readers of the table are not re-triggered by compaction.
    """
    # Compact recent partitions and co-locate rows by a frequently filtered column
    spark.sql("""
        OPTIMIZE main.raw.bronze_events
        WHERE _ingestion_date >= current_date() - 7
        ZORDER BY (user_id)
    """)
    print("Bronze files optimized.")
if __name__ == "__main__":
    optimize_bronze_files()
Periodically compact small files and remove old versions:
-- Compact files in recent partitions (improves query performance)
OPTIMIZE main.raw.bronze_events
WHERE _ingestion_date >= current_date() - 7
ZORDER BY _source_file;
-- Remove data files no longer referenced by the current table version
-- (default retention is 7 days; RETAIN is specified in hours)
VACUUM main.raw.bronze_events RETAIN 720 HOURS;  -- keep 30 days of history
-- Check table statistics
DESCRIBE EXTENDED main.raw.bronze_events;
Implement quality checks to quarantine malformed records and track data quality metrics.
from pyspark.sql.functions import (
col, current_timestamp, input_file_name, lit, to_date,
when, isnan, isnull
)
import uuid
batch_id = str(uuid.uuid4())
source_path = "s3://my-data-bucket/events/*.json"
target_table = "main.raw.bronze_events"
quarantine_table = "main.raw.quarantine_events"
def ingest_with_quarantine():
"""
Ingest events with validation and quarantine pattern.
Valid records go to Bronze; invalid ones go to Quarantine.
"""
try:
# Read raw data
df = (
spark.read
.format("json")
.option("inferSchema", "true")
.load(source_path)
)
# Add metadata
df = (
df
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", input_file_name())
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
)
        # Quality checks (null-safe so every row lands in exactly one table)
        is_valid = (
            col("event_id").isNotNull() &
            col("user_id").isNotNull() &
            col("event_type").isNotNull() &
            col("amount").isNotNull() & ~isnan(col("amount"))  # for numeric fields
        )
# Split into valid and quarantine
valid_df = df.filter(is_valid)
quarantine_df = df.filter(~is_valid).withColumn(
"_quarantine_reason",
when(col("event_id").isNull(), "null_event_id")
.when(col("user_id").isNull(), "null_user_id")
.when(col("event_type").isNull(), "null_event_type")
.otherwise("invalid_amount")
)
# Write valid records to Bronze
valid_df.write \
.format("delta") \
.mode("append") \
.partitionBy("_ingestion_date") \
.saveAsTable(target_table)
# Write invalid records to Quarantine for investigation
quarantine_df.write \
.format("delta") \
.mode("append") \
.partitionBy("_ingestion_date") \
.saveAsTable(quarantine_table)
print(f"Processed: {valid_df.count()} valid, {quarantine_df.count()} quarantined.")
except Exception as e:
print(f"Ingestion with quarantine failed: {str(e)}")
raise
if __name__ == "__main__":
ingest_with_quarantine()
For continuous data quality monitoring, use DLT expectations:
import dlt
from pyspark.sql.functions import col, current_timestamp, from_json, to_date
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
# Schema for the fields that the expectations below reference
events_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("amount", DoubleType(), True),
])
@dlt.table(
    name="bronze_events",
    comment="Raw events from Kafka with quality expectations",
    partition_cols=["_ingestion_date"]
)
@dlt.expect("valid_event_id", "event_id IS NOT NULL")
@dlt.expect("valid_user_id", "user_id > 0")
@dlt.expect("valid_timestamp", "timestamp IS NOT NULL")
@dlt.expect("reasonable_amount", "amount >= 0 AND amount <= 1000000")
def bronze_events():
    """
    Read from Kafka, keep the raw payload, and parse the fields that the
    expectations reference. DLT automatically tracks pass/fail metrics.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .option("startingOffsets", "latest")
        .load()
        .select(
            col("value").cast("string").alias("raw_payload"),
            col("timestamp").alias("_kafka_timestamp"),
            from_json(col("value").cast("string"), events_schema).alias("payload")
        )
        .select("raw_payload", "_kafka_timestamp", "payload.*")
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
    )
@dlt.table(
    name="quarantine_events",
    comment="Records that failed quality expectations"
)
def quarantine_events():
    """
    dlt.expect only records metrics; it does not create a quarantine table.
    Capture violations for investigation by filtering the inverse rules.
    """
    return (
        dlt.read_stream("bronze_events")
        .filter(
            "event_id IS NULL OR user_id <= 0 OR timestamp IS NULL "
            "OR amount < 0 OR amount > 1000000"
        )
    )
Handle incremental updates and late-arriving data with watermarks and merge patterns.
from pyspark.sql.functions import (
col, current_timestamp, from_json, window, lit
)
from pyspark.sql.types import StructType, StructField, StringType, LongType
# Define message schema
schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", LongType(), True),
StructField("timestamp", StringType(), True),
])
def process_with_watermark():
    """
    Process a Kafka stream with a watermark to handle late-arriving data.
    The watermark tolerates up to 10 minutes of lateness; events arriving
    later than that may be dropped from the windowed aggregation.
    """
df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "latest")
.load()
)
# Parse and add watermark
df_parsed = (
df
.select(from_json(col("value").cast("string"), schema).alias("payload"))
.select("payload.*")
.select(
col("event_id"),
col("user_id"),
col("timestamp").cast("timestamp").alias("event_time")
)
# Watermark: allow 10 minutes of late data before dropping
.withWatermark("event_time", "10 minutes")
)
# Aggregate with windowing
windowed_df = (
df_parsed
.groupBy(
window(col("event_time"), "1 hour", "30 minutes"),
col("user_id")
)
.count()
.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("user_id"),
col("count").alias("event_count"),
current_timestamp().alias("_ingestion_timestamp")
)
)
    # Write with append output mode (allowed here because of the watermark)
    query = (
        windowed_df
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/mnt/delta/checkpoints/windowed_events")
        .trigger(processingTime="5 minutes")
        .toTable("main.raw.bronze_windowed_events")
    )
    query.awaitTermination()
if __name__ == "__main__":
process_with_watermark()
For sources requiring idempotent upserts (deduplication), use Delta merge:
from pyspark.sql.functions import col, current_timestamp, lit
import uuid
batch_id = str(uuid.uuid4())
def merge_with_deduplication():
"""
Merge new events into Bronze table with deduplication.
If event_id already exists, update only if new record is newer.
"""
    # Load new data
    new_events = spark.read.json("s3://my-bucket/new_events.json")
    # Add metadata
    new_events = (
        new_events
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_batch_id", lit(batch_id))
    )
    # Register a temp view so the SQL MERGE below can reference it
    new_events.createOrReplaceTempView("new_events")
    # Merge into target table
    spark.sql("""
MERGE INTO main.raw.bronze_events AS target
USING (
SELECT
event_id,
user_id,
event_type,
timestamp,
_ingestion_timestamp,
_batch_id
FROM new_events
) AS source
ON target.event_id = source.event_id
WHEN MATCHED AND source._ingestion_timestamp > target._ingestion_timestamp THEN
UPDATE SET
user_id = source.user_id,
event_type = source.event_type,
timestamp = source.timestamp,
_ingestion_timestamp = source._ingestion_timestamp,
_batch_id = source._batch_id
WHEN NOT MATCHED THEN
INSERT (event_id, user_id, event_type, timestamp, _ingestion_timestamp, _batch_id)
VALUES (source.event_id, source.user_id, source.event_type, source.timestamp,
source._ingestion_timestamp, source._batch_id)
""")
print("Merge completed with deduplication.")
if __name__ == "__main__":
merge_with_deduplication()
Production Bronze layers employ multi-source architectures with source-specific tables and comprehensive monitoring.
Create separate Bronze tables for each data source to maintain independence and enable source-specific monitoring:
from pyspark.sql.functions import (
col, current_timestamp, input_file_name, lit, to_date
)
from datetime import datetime
import uuid
def setup_multi_source_bronze():
"""
Create Bronze layer with source-specific tables following naming convention.
"""
spark.sql("CREATE SCHEMA IF NOT EXISTS main.raw")
sources = [
{
"name": "sales_transactions",
"path": "s3://data-bucket/sources/salesforce/transactions/",
"format": "json",
"system": "salesforce"
},
{
"name": "customer_events",
"path": "s3://data-bucket/sources/kafka/events/",
"format": "json",
"system": "kafka"
},
{
"name": "erp_orders",
"path": "s3://data-bucket/sources/sap/orders/",
"format": "csv",
"system": "sap"
},
]
for source in sources:
batch_id = str(uuid.uuid4())
table_name = f"main.raw.bronze_{source['name']}"
checkpoint_path = f"/mnt/delta/checkpoints/bronze_{source['name']}"
print(f"Setting up Bronze table: {table_name}")
try:
# Read source
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", source["format"])
.option("cloudFiles.schemaLocation",
f"/mnt/delta/schema_hints/{source['name']}")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("cloudFiles.rescuedDataColumn", "_rescue_data")
.load(source["path"])
)
# Add source-specific metadata
enriched_df = (
df
.withColumn("_ingestion_timestamp", current_timestamp())
.withColumn("_source_file", input_file_name())
.withColumn("_source_system", lit(source["system"]))
.withColumn("_batch_id", lit(batch_id))
.withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))
)
            # Write to Bronze (run this source to completion, then move on)
            query = (
                enriched_df
                .writeStream
                .format("delta")
                .outputMode("append")
                .option("checkpointLocation", checkpoint_path)
                .option("mergeSchema", "true")
                .partitionBy("_ingestion_date")
                .trigger(availableNow=True)  # use trigger(once=True) on older runtimes
                .toTable(table_name)  # toTable() starts the streaming query
            )
            query.awaitTermination()
except Exception as e:
print(f"Failed to setup {table_name}: {str(e)}")
raise
if __name__ == "__main__":
setup_multi_source_bronze()
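Ingestion should also be idempotent so that retried jobs do not duplicate data. One approach is to hash each record's content and skip rows whose hash already exists in the target table: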
from pyspark.sql.functions import col, md5, concat_ws, current_timestamp, lit
import hashlib
def idempotent_ingest():
"""
Idempotent ingestion: compute content hash to detect duplicates
and skip re-ingesting identical data on retry.
"""
# Load new data
new_data = spark.read.json("s3://bucket/source_data.json")
# Create content hash for idempotency detection
new_data = (
new_data
.withColumn(
"_content_hash",
md5(concat_ws("||", *[col(c) for c in new_data.columns]))
)
.withColumn("_ingestion_timestamp", current_timestamp())
)
# Check if records already exist (by content hash)
existing_hashes = (
spark.read.table("main.raw.bronze_events")
.select("_content_hash")
.distinct()
)
# Anti-join to get only new records
new_records = (
new_data
.join(existing_hashes, "_content_hash", "leftanti")
)
# Write only new records
new_records.write \
.format("delta") \
.mode("append") \
.saveAsTable("main.raw.bronze_events")
print(f"Ingested {new_records.count()} new records (idempotent).")
if __name__ == "__main__":
idempotent_ingest()
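Operational monitoring closes the loop. The job below measures ingestion lag (the delay between the event timestamp in the payload and its arrival in Bronze) and persists the metrics for alerting: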
from pyspark.sql.functions import (
    col, max as spark_max, min as spark_min, current_timestamp,
    current_date, date_sub, unix_timestamp
)
def monitor_ingestion_lag():
"""
Monitor ingestion lag (delay between event occurrence and Bronze arrival).
Write metrics to monitoring table for alerting.
"""
# Read recent Bronze records
df = spark.read.table("main.raw.bronze_events")
    # Compute lag statistics over the last day of ingested data
    lag_metrics = (
        df
        .filter(col("_ingestion_date") >= date_sub(current_date(), 1))
        .select(
            col("_source_file"),
            ((unix_timestamp(col("_ingestion_timestamp"))
              - unix_timestamp(col("timestamp").cast("timestamp"))) / 60).alias("lag_minutes")
        )
.groupBy("_source_file")
.agg(
spark_max("lag_minutes").alias("max_lag_minutes"),
spark_min("lag_minutes").alias("min_lag_minutes"),
(spark_max("lag_minutes") - spark_min("lag_minutes")).alias("lag_range_minutes")
)
.withColumn("check_timestamp", current_timestamp())
)
# Persist metrics
lag_metrics.write \
.format("delta") \
.mode("append") \
.saveAsTable("main.monitoring.bronze_ingestion_lag")
# Alert if lag exceeds threshold
high_lag = lag_metrics.filter(col("max_lag_minutes") > 60)
if high_lag.count() > 0:
print("WARNING: High ingestion lag detected!")
high_lag.show()
if __name__ == "__main__":
monitor_ingestion_lag()
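Finally, organize the catalog so that source-specific Bronze tables, quarantine tables, and monitoring tables land in predictable schemas: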
-- Catalog structure for multi-source Bronze layer
CREATE CATALOG IF NOT EXISTS main;
CREATE SCHEMA IF NOT EXISTS main.raw;
-- Source-specific tables follow pattern: bronze_{source}_{logical_entity}
CREATE TABLE IF NOT EXISTS main.raw.bronze_salesforce_accounts (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_salesforce_opportunities (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_kafka_events (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_sap_purchase_orders (...);
CREATE TABLE IF NOT EXISTS main.raw.bronze_stripe_transactions (...);
-- Quarantine and monitoring schemas
CREATE SCHEMA IF NOT EXISTS main.quarantine;
CREATE SCHEMA IF NOT EXISTS main.monitoring;
-- List all Bronze tables
SELECT table_name, table_type
FROM main.information_schema.tables
WHERE table_schema = 'raw' AND table_name LIKE 'bronze_%';
The Bronze layer serves as the foundation of the Medallion architecture, prioritizing raw data preservation, schema flexibility, and audit trails. By following the design principles of append-only ingestion, minimal transformation, and comprehensive metadata enrichment, Bronze layers enable trustworthy, replayable data pipelines that scale to petabyte volumes.
Key takeaways:
Bronze is the raw landing zone — it captures source data verbatim, append-only, with minimal transformation, so the system has a replayable history of every record that ever arrived. The contract is durability and faithfulness, not cleanliness: column types may stay as strings, schemas may drift, and bad records are kept rather than dropped. Silver and Gold can always be rebuilt from Bronze, which is why Bronze is the only layer that absolutely cannot be lost.
Delta gives Bronze the ACID guarantees that raw landing zones historically lacked — concurrent writers do not corrupt the table, failed jobs roll back instead of leaving half-written files, and time travel lets you replay ingestion as of any point. Schema evolution via mergeSchema handles upstream additions without breaking the pipeline, and the transaction log provides metadata for incremental readers (CDF, streaming) that plain Parquet cannot. Plain Parquet on cloud storage has none of those properties and breaks the moment two writers race.
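A brief sketch of the incremental-reader point, assuming Change Data Feed has been enabled on the Bronze table (it is off by default); table name and version number are illustrative.

# Enable Change Data Feed on an existing Bronze table (changes tracked from here on)
spark.sql("""
    ALTER TABLE main.raw.bronze_events
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Silver can then consume only new Bronze commits, either as a stream...
incremental_df = (
    spark.readStream
    .option("readChangeFeed", "true")
    .table("main.raw.bronze_events")
)

# ...or as a bounded batch starting at a given version (illustrative version number)
changes_df = (
    spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 100)
    .table("main.raw.bronze_events")
)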
The simplest pattern is Auto Loader with cloudFiles.allowOverwrites=false and checkpointing — each file is processed exactly once because the checkpoint records which paths have been ingested. For non-file sources, hash the payload (e.g., SHA-256 over the canonical bytes) into a _record_hash column and use MERGE INTO ... WHEN NOT MATCHED INSERT keyed on that hash. COPY INTO also tracks ingested files in the table metadata, which is the cleanest option for batch loads from object storage.
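A minimal sketch of that hash-keyed, insert-only merge; the source path and target table are illustrative, and the target is assumed to already carry a _record_hash column.

from delta.tables import DeltaTable
from pyspark.sql.functions import sha2, struct, to_json

# Hypothetical non-file source dump
incoming = spark.read.json("s3://my-data-bucket/api_dump/")
incoming = incoming.withColumn(
    "_record_hash", sha2(to_json(struct(*incoming.columns)), 256)
)

# Insert-only merge keyed on the content hash: reruns skip already-ingested rows
bronze = DeltaTable.forName(spark, "main.raw.bronze_api_payloads")
(
    bronze.alias("t")
    .merge(incoming.alias("s"), "t._record_hash = s._record_hash")
    .whenNotMatchedInsertAll()
    .execute()
)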
Schema-on-read means the source bytes are stored as-is and interpreted only when consumed. At Bronze that usually translates to keeping the original payload in a _raw string or binary column alongside a best-effort parsed view, plus metadata columns (_ingestion_ts, _source_file, _batch_id). When upstream changes a field type or adds a column, Bronze still ingests it; Silver picks up the corrected schema on the next run without losing history. This is what makes Bronze resilient to upstream churn.
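A small sketch of that pattern, keeping the original payload verbatim in a _raw column and parsing only at read time; the table name and parse schema are illustrative.

from pyspark.sql.functions import col, current_timestamp, from_json, input_file_name

# Land each line verbatim, plus ingest metadata; no parsing at write time
raw_df = (
    spark.read.text("s3://my-data-bucket/events/")
    .withColumnRenamed("value", "_raw")
    .withColumn("_ingestion_ts", current_timestamp())
    .withColumn("_source_file", input_file_name())
)
raw_df.write.format("delta").mode("append").saveAsTable("main.raw.bronze_events_raw")

# Consumers interpret the bytes only when they read them (schema-on-read)
parsed_df = (
    spark.read.table("main.raw.bronze_events_raw")
    .withColumn("parsed", from_json(col("_raw"), "event_id STRING, user_id BIGINT"))
    .select("_ingestion_ts", "_source_file", "parsed.*", "_raw")
)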
Always partition Bronze by ingestion date, not event date — that keeps writes monotonic and avoids rewriting old partitions when data shows up late. Keep an event_ts column from the payload separately so Silver can reason about lateness. Downstream, Silver merges on the business key with a watermark on event_ts; structured streaming with withWatermark handles bounded lateness, while completely open-ended late data is reconciled by a periodic backfill job that re-reads recent Bronze partitions.
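A sketch of that periodic reconciliation job, assuming a Silver table keyed on event_id with an event_ts column (table and column names are illustrative).

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, date_sub

# Re-read only recent Bronze partitions; ingestion-date partitioning keeps this cheap
late_df = (
    spark.read.table("main.raw.bronze_events")
    .filter(col("_ingestion_date") >= date_sub(current_date(), 3))
    .select(
        "event_id", "user_id", "event_type",
        col("timestamp").cast("timestamp").alias("event_ts")
    )
)

# Upsert into Silver by business key, keeping only the newest version of each event
silver = DeltaTable.forName(spark, "main.silver.events")
(
    silver.alias("t")
    .merge(late_df.alias("s"), "t.event_id = s.event_id")
    .whenMatchedUpdateAll(condition="s.event_ts > t.event_ts")
    .whenNotMatchedInsertAll()
    .execute()
)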
Auto Loader is the right choice for high-volume, continuous file arrival — it scales to billions of files using file notification mode (SNS/EventGrid) instead of directory listing, and it natively supports schema inference and evolution. COPY INTO is better for idempotent batch loads of a known file set, manual backfills, and SQL-only environments where you want a single statement that is safe to retry. As a rough rule: streaming or hourly micro-batch from object storage means Auto Loader; one-shot bulk loads or replays mean COPY INTO.