The brief: build a pipeline that re-embeds 1 million documents per day with sub-minute freshness from source-of-truth update to vector index. Source is a Postgres OLTP database (orders, support tickets, knowledge base articles) with a typical write rate of 20 updates/sec, bursting to 200 during business hours. Downstream consumers are RAG and search systems that must see fresh embeddings within 60 seconds of an upstream write.
Embedding pipelines fail in two predictable ways: (1) they fall behind during bursts and never catch up, or (2) they cost more in GPU spend than the product generates in revenue. Both are solved by treating the pipeline as a streaming system with backpressure and a cost-quality knob, not a batch job that runs every 5 minutes.
| Metric | Target |
|---|---|
| End-to-end p50 lag (DB commit to searchable) | 15 s |
| End-to-end p95 lag | 60 s |
| End-to-end p99 lag | 5 min (degraded mode acceptable) |
| Sustained throughput | 1M docs/day = ~12 docs/sec avg |
| Burst tolerance | 10× sustained for 15 min without lag breach |
| Data loss tolerance | Zero (every commit must reach the vector index or DLQ) |
| Embedding cost ceiling | < $400/day at sustained rate |
Chunking and embedding count. 1M docs/day × ~5 chunks/doc average (most docs are short, some are long) ≈ 5M embeddings/day ≈ 58 embeddings/sec sustained, ~580/sec at 10× burst.
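That arithmetic is worth a two-line sanity check (plain Python; constants taken from this section):

```python
# Back-of-envelope sizing check, using the constants from this doc.
DOCS_PER_DAY = 1_000_000
CHUNKS_PER_DOC = 5           # average: most docs are short, some are long
BURST_FACTOR = 10
SECONDS_PER_DAY = 86_400

sustained_eps = DOCS_PER_DAY * CHUNKS_PER_DOC / SECONDS_PER_DAY  # embeddings/sec
burst_eps = sustained_eps * BURST_FACTOR

print(f"sustained ~{sustained_eps:.0f} emb/s, burst ~{burst_eps:.0f} emb/s")
```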
GPU sizing for embedding workers. bge-large-en-v1.5 served by vLLM on a single L4 GPU at batch 256 yields ~3,000 chunks/sec. We need ~58/sec sustained, ~580/sec burst. One L4 handles steady state with 50× headroom. Two L4s for burst plus one warm spare = 3 L4s in the pool.
Kafka throughput. 12 doc-events/sec sustained at ~1 KB per event = 12 KB/sec. Trivial; a 3-broker Kafka cluster sized for the rest of the platform absorbs this without a dedicated topic shard. Burst is 120 events/sec ≈ 120 KB/sec, still trivial.
Vector store write throughput. 580 upserts/sec at burst into pgvector with HNSW: each insert triggers an HNSW link insertion. Measured at ~2k inserts/sec on db.r6i.4xlarge for 1024-dim vectors; well within budget. The pgvector HNSW build cost is the bottleneck if you go above ~5k inserts/sec sustained — at that point switch to Qdrant or batch builds.
End-to-end lag budget (p95 target 60 s):
postgres_commit: 0 s # baseline
debezium_capture_to_kafka: 2 s # WAL polling + serialization
kafka_to_consumer: 1 s
chunking + dedupe: 2 s
batch_wait_for_embedder: 10 s # max batch fill timeout
embedding_inference: 3 s
vector_store_upsert: 5 s
hnsw_index_visibility: 12 s # pgvector immediate, but readers may pin
buffer_for_jitter: 25 s
---
total p95: 60 s
+-----------------+
|  Postgres OLTP  |  (source of truth)
+--------+--------+
         |
         | logical replication slot
         v
+-----------------+
|    Debezium     |  (CDC connector, reads WAL)
+--------+--------+
         |
         | one Kafka event per row change
         v
+-----------------+      +------------------+
|  Kafka topic    |----->| Schema Registry  |
|  doc-changes    |      +------------------+
+--------+--------+
         |
         v
+-----------------+
|     Chunker     |  (consumer group, 4 partitions)
|   (Python/Go)   |  - normalize, split, dedupe by content_hash
+--------+--------+
         |
         | chunk-tasks topic (partition by doc_id)
         v
+-----------------+      +-------------------+
|  Embed Worker   |<---->|  vLLM / Bedrock   |
|  Pool (3 L4s)   |      |  embedding endpt  |
+--------+--------+      +-------------------+
         |
         | upsert batches
         v
+-----------------+
|  Vector Store   |  (pgvector / Qdrant / Pinecone)
+--------+--------+
         |
         v
+-----------------+
|  RAG / Search   |  (downstream consumers)
+-----------------+

Out-of-band:

+-----------------+      +-------------------+
|  DLQ Topic      |----->|    Operator UI    |  manual replay
+-----------------+      +-------------------+

+-----------------+
|  Lag Exporter   |  publishes end-to-end lag to Prometheus
+-----------------+
Component responsibilities:

Chunker: consumes doc-changes, fetches the
full row if needed, splits text into chunks, computes a content hash for
dedupe, emits chunk-tasks.

Embed worker: consumes chunk-tasks,
batches up to 256 chunks or 50 ms, calls vLLM, upserts vectors.

The pipeline must be idempotent at every step. Re-delivery from Kafka, re-processing after a worker crash, replay from DLQ — all must produce the same final state in the vector store.
-- Source table (existing OLTP)
CREATE TABLE articles (
article_id BIGINT PRIMARY KEY,
title TEXT,
body TEXT,
updated_at TIMESTAMPTZ DEFAULT now(),
deleted_at TIMESTAMPTZ
);
-- Vector store (pgvector, same DB or a replica)
CREATE TABLE article_chunks (
chunk_id UUID PRIMARY KEY,
article_id BIGINT NOT NULL,
chunk_index INT NOT NULL,
content_hash BYTEA NOT NULL, -- sha256(chunk_text); skip embed if unchanged
text TEXT NOT NULL,
embedding VECTOR(1024),
embed_model TEXT NOT NULL,
embed_version SMALLINT NOT NULL,
source_lsn PG_LSN NOT NULL, -- watermark from CDC for idempotency + lag tracking
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (article_id, chunk_index)
);
CREATE INDEX article_chunks_hnsw ON article_chunks
USING hnsw (embedding vector_cosine_ops) WITH (m = 32, ef_construction = 200);
CREATE INDEX article_chunks_article ON article_chunks (article_id);
CREATE INDEX article_chunks_lsn ON article_chunks (source_lsn);
Two-layer idempotency. First, the Kafka consumer commits
offsets after a successful upsert, so crashes replay from the last
durable offset. Second, the upsert uses
ON CONFLICT (article_id, chunk_index) DO UPDATE SET ... WHERE EXCLUDED.source_lsn > article_chunks.source_lsn;
older replays cannot overwrite newer state, even out of order.
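The guard can be exercised without a database. A minimal in-memory model of the LSN-guarded upsert (names hypothetical) shows why out-of-order replays are harmless:

```python
# In-memory model of ON CONFLICT ... DO UPDATE ... WHERE EXCLUDED.source_lsn > stored.
# Keys mirror the UNIQUE (article_id, chunk_index) constraint.
store: dict[tuple[int, int], dict] = {}

def upsert_chunk(article_id: int, chunk_index: int, embedding: list, source_lsn: int) -> bool:
    """Apply the row only if it carries a strictly newer LSN than what's stored."""
    key = (article_id, chunk_index)
    row = store.get(key)
    if row is not None and source_lsn <= row["source_lsn"]:
        return False  # stale or duplicate replay: keep the newer state
    store[key] = {"embedding": embedding, "source_lsn": source_lsn}
    return True

# Out-of-order delivery: the newer write (lsn=200) lands first,
# then a replay of the older write (lsn=100) arrives late.
upsert_chunk(1, 0, [0.2], source_lsn=200)
assert upsert_chunk(1, 0, [0.1], source_lsn=100) is False
assert store[(1, 0)]["source_lsn"] == 200
```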
Content-hash short-circuit. Before embedding, compare
sha256(chunk_text) against the stored hash. If unchanged (e.g.
the row was touched but the text didn't change), skip the embedding call
entirely. In practice this filters 30–60% of CDC events, cutting GPU spend
by the same fraction.
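A sketch of that check, assuming a needs_embed helper (the name is an assumption) that compares the fresh hash against the stored one:

```python
import hashlib
from typing import Optional

def needs_embed(stored_hash: Optional[bytes], chunk_text: str) -> tuple[bool, bytes]:
    """Return (should_embed, new_hash): skip the GPU call when text is unchanged."""
    h = hashlib.sha256(chunk_text.encode("utf-8")).digest()
    return (h != stored_hash, h)

# A row touched by an UPDATE whose text didn't change: no embed call.
_, h = needs_embed(None, "same text")
assert needs_embed(h, "same text")[0] is False
assert needs_embed(h, "edited text")[0] is True
```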
from hashlib import sha256

from aiokafka import AIOKafkaConsumer

# DocChangeEvent, ChunkTask, ChunkRow, chunk_text, tombstone, current_hash_equals,
# deterministic_uuid and the producers/clients are application-level helpers;
# consumer.batches() is assumed to be a thin wrapper over consumer.getmany().

async def chunker_worker(consumer: AIOKafkaConsumer):
    async for batch in consumer.batches():
        chunks_to_embed = []
        for msg in batch:
            event = DocChangeEvent.parse(msg.value)
            if event.op == "delete":
                await tombstone(event.article_id, event.lsn)
                continue
            chunks = chunk_text(event.after.body, target_tokens=512, overlap=64)
            for i, text in enumerate(chunks):
                h = sha256(text.encode()).digest()
                # Short-circuit: skip unchanged chunks
                if await current_hash_equals(event.article_id, i, h):
                    continue
                chunks_to_embed.append(ChunkTask(
                    chunk_id=deterministic_uuid(event.article_id, i),
                    article_id=event.article_id,
                    chunk_index=i,
                    text=text,
                    content_hash=h,
                    source_lsn=event.lsn,
                ))
        if chunks_to_embed:
            await chunk_tasks_producer.send_batch(chunks_to_embed)
        await consumer.commit()  # only after enqueue succeeds

async def embed_worker(consumer: AIOKafkaConsumer):
    # A timer task (not shown) must also flush partial batches: ready() is only
    # polled when a new message arrives, so a trailing batch would otherwise stall.
    batcher = MicroBatcher(max_size=256, max_wait_ms=50)
    async for msg in consumer:
        task = ChunkTask.parse(msg.value)
        await batcher.add(task)
        if batcher.ready():
            batch = await batcher.flush()
            vectors = await vllm_client.embed([t.text for t in batch])
            await pg.upsert_chunks([
                ChunkRow(
                    chunk_id=t.chunk_id,
                    article_id=t.article_id,
                    chunk_index=t.chunk_index,
                    content_hash=t.content_hash,
                    text=t.text,
                    embedding=v,
                    embed_model="bge-large-en-v1.5",
                    embed_version=2,
                    source_lsn=t.source_lsn,
                ) for t, v in zip(batch, vectors)
            ])
            await consumer.commit()
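The worker above assumes a MicroBatcher helper. One plausible sketch of its semantics (flush when full or when max_wait_ms has elapsed since the first item; class shape is an assumption):

```python
import asyncio
import time

class MicroBatcher:
    """Collect items until max_size is reached or max_wait_ms has elapsed
    since the first item of the current batch arrived."""

    def __init__(self, max_size: int = 256, max_wait_ms: int = 50):
        self.max_size = max_size
        self.max_wait = max_wait_ms / 1000.0
        self._items = []
        self._first_at = None

    async def add(self, item) -> None:
        if not self._items:
            self._first_at = time.monotonic()  # start the wait clock on first item
        self._items.append(item)

    def ready(self) -> bool:
        if not self._items:
            return False
        if len(self._items) >= self.max_size:
            return True
        return time.monotonic() - self._first_at >= self.max_wait

    async def flush(self) -> list:
        batch, self._items, self._first_at = self._items, [], None
        return batch

async def demo():
    b = MicroBatcher(max_size=3, max_wait_ms=50)
    for i in range(3):
        await b.add(i)
    assert b.ready()             # size threshold hit
    assert await b.flush() == [0, 1, 2]
    await b.add("late")
    assert not b.ready()         # below size, timer not yet expired
    await asyncio.sleep(0.06)
    assert b.ready()             # timer expired

asyncio.run(demo())
```

Because ready() is only checked when a message arrives, a production worker pairs this with a periodic timer that force-flushes stragglers.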
Deterministic chunk IDs. deterministic_uuid(article_id, i)
produces the same UUID for the same (article, chunk_index) pair, so re-delivery
becomes a natural upsert without orphan rows. Chunk count changes (a
shorter article producing fewer chunks) are handled by a tail-deletion step
that removes chunk_index >= new_chunk_count.
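One plausible implementation of deterministic_uuid is a name-based UUIDv5; the namespace choice below is an assumption, the only requirement is that it never changes:

```python
import uuid

def deterministic_uuid(article_id: int, chunk_index: int) -> uuid.UUID:
    """Name-based UUIDv5: the same (article_id, chunk_index) pair always
    yields the same chunk_id, so re-delivery becomes a natural upsert."""
    return uuid.uuid5(uuid.NAMESPACE_URL, f"article:{article_id}:chunk:{chunk_index}")

assert deterministic_uuid(42, 0) == deterministic_uuid(42, 0)  # stable across workers
assert deterministic_uuid(42, 0) != deterministic_uuid(42, 1)
```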
Backpressure. Kafka's consumer-group lag is the source of
truth. The lag exporter compares the latest consumed offset to the latest
produced offset; when lag exceeds 1 min, the autoscaler adds embed workers.
When the embed worker pool itself is saturated (queue depth on the vLLM
endpoint > 100), the chunker pauses consumption (drops Kafka poll()
rate) rather than building a backlog inside the pipeline.
Cost-quality knob. Three operating modes, switched by a single config flag based on lag:
| Mode | Trigger | Embed model | Batch | Throughput |
|---|---|---|---|---|
| QUALITY | lag < 30 s | bge-large (1024d) | 256 | 3,000 chunks/sec |
| BALANCED | 30 s ≤ lag < 5 min | bge-base (768d) | 512 | 8,000 chunks/sec |
| SHED | lag ≥ 5 min | bge-base (768d) | 1024 | 15,000 chunks/sec; downsample updates < 1h old by 50% |
SHED mode embeds a sampled subset of recent updates and queues the rest for backfill once lag recovers. The choice of which updates to drop is policy: "low-priority tenants first," "update events only (not creates)," etc. The key is that the system never silently drops — deferred events are queued in a backfill topic, not lost.
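The mode switch reduces to a threshold function over measured lag. A minimal sketch (thresholds and model/batch values from the table above; names are assumptions):

```python
from enum import Enum

class Mode(Enum):
    # (embed model, micro-batch size), from the cost-quality table
    QUALITY = ("bge-large-en-v1.5", 256)
    BALANCED = ("bge-base-en-v1.5", 512)
    SHED = ("bge-base-en-v1.5", 1024)

def select_mode(lag_seconds: float) -> Mode:
    """Map end-to-end lag onto the three operating modes."""
    if lag_seconds < 30:
        return Mode.QUALITY
    if lag_seconds < 300:
        return Mode.BALANCED
    return Mode.SHED

assert select_mode(5) is Mode.QUALITY
assert select_mode(120) is Mode.BALANCED
assert select_mode(600) is Mode.SHED
```

In practice the transitions need hysteresis (for example, only return to QUALITY after lag has stayed under 30 s for a few minutes) so the pipeline doesn't flap between models at the boundary.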
Bottlenecks and failure modes at scale, in order:

1. Replication slot backlog. If the pipeline stalls, Postgres retains WAL until the slot's pg_replication_slots.confirmed_flush_lsn advances; alert if WAL retention exceeds 10 GB.
2. Low-traffic batching latency. max_wait_ms=50 means a single isolated event waits 50 ms before embedding. Below ~10 docs/sec, the wait dominates inference latency. Acceptable: this is sub-second by construction.
3. Poison messages. Failed tasks land in chunks-dlq with the full task payload, error, and timestamp. An operator UI lets a human inspect and replay; a common cause is a malformed chunk that triggers a vLLM crash, fixed by a sanitizer regex.
4. Downstream outage. Postgres buffers WAL until confirmed_flush_lsn advances, so no data is lost as long as disk holds.
5. Postgres failover. Logical replication slots created via pg_create_logical_replication_slot do not carry over to a promoted standby, and Debezium needs to seek to the right LSN. Practical mitigation: use a hot standby with replication slots already configured, or an AWS DMS / RDS Blue/Green deployment that handles slot promotion.

DLQ replay protocol. The operator UI shows DLQ messages
grouped by error fingerprint; once a fix is deployed, "replay" emits the
messages back to chunk-tasks with a header indicating retry. Replays
go through the same idempotency path; the worst case is wasted GPU on a
re-embed that produces the same vector.
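Grouping "by error fingerprint" can be as simple as hashing the error type plus a normalized first line of the message, so volatile details (ids, offsets) collapse into one bucket. A sketch (function name assumed):

```python
import hashlib
import re

def error_fingerprint(exc_type: str, message: str) -> str:
    """Group DLQ messages by error shape: replace digits with a placeholder
    so two instances of the same bug share one fingerprint."""
    first_line = message.splitlines()[0] if message else ""
    normalized = re.sub(r"\d+", "N", first_line)
    return hashlib.sha1(f"{exc_type}:{normalized}".encode()).hexdigest()[:12]

# Two instances of the same malformed-chunk crash share a fingerprint.
a = error_fingerprint("ValueError", "bad token at offset 1234 in doc 42")
b = error_fingerprint("ValueError", "bad token at offset 99 in doc 7")
assert a == b
assert a != error_fingerprint("TimeoutError", "embed call timed out")
```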
Per 1M source events, with 50% short-circuited by content-hash dedupe (so 500k actually embedded):
debezium_msk: $1.00 # amortized small Kafka cluster
chunker_workers: $0.50 # 2 small instances
embed_workers_l4_gpu: $4.80 # ~2 hrs of the 3-L4 pool @ $0.80/hr each; a 24/7 pool is $57.60/day, still well under the ceiling
# alternative: bedrock_titan: ~$50.00 (10x more for managed)
vector_store_writes: $0.30 # pgvector amortized
storage_growth: $0.10 # ~5GB/day at 1M new chunks
monitoring + s3 dlq: $0.20
---
total_per_1M_events: $6.90
sustained_daily_cost: $6.90 # at 1M docs/day
sustained_monthly_cost: $207
budget_ceiling: $400/day # comfortably under
The dominant variable is self-hosted vLLM vs a managed embedding API. At our scale (1M docs/day), self-hosted L4 is ~10× cheaper than Bedrock Titan or OpenAI text-embedding-3. The crossover where managed becomes cheaper is below roughly 50k docs/day, where the L4 sits idle most of the time.
Debezium vs application dual-write vs polling. Debezium reads the WAL, so it captures every change exactly once with no application changes required. Dual-write (app writes to DB and Kafka) is faster to implement but breaks every time someone forgets to add the Kafka publish. Polling is simple but trades off lag against DB load. We pick Debezium because it makes the "every commit reaches Kafka" guarantee a property of the database, not the application code.
Kafka vs Kinesis vs Pulsar. Kafka has the richest connector ecosystem (Debezium first-class) and the best operational tooling. Kinesis is fine if you're already AWS-heavy and don't need Kafka's exact features; the 24-hour retention default is dangerous for replay scenarios. Pulsar's per-topic geo-replication is nice but the tooling lags Kafka. We pick Kafka.
vLLM vs Bedrock for embedding. Self-hosted vLLM wins on cost and latency at our scale; loses on operational simplicity. Bedrock Titan embedding wins when traffic is bursty and idle GPUs are wasted spend. The hybrid approach — self-hosted for steady state, Bedrock burst on overflow — sounds clever but introduces an embedding-version inconsistency that breaks vector search recall. Pick one model and stick to it.
Snowflake / Databricks streams instead of Postgres CDC. If the source of truth is a warehouse rather than an OLTP DB, Snowflake Streams or Delta Live Tables give the same change-capture semantics with less operational burden — no logical replication slots to manage. The lag floor is higher (warehouse commit cadence is typically minutes, not seconds), so the freshness SLO has to relax to ~5 min p95.
Re-embed everything on model upgrade. When bumping
embed_version, a naive re-embed of 5M chunks takes ~30 min at 3k chunks/sec
and costs well under a dollar of L4 time. Do it as a side pipeline that writes
to a new table and swaps the index pointer atomically; never re-embed in place,
because search recall drops if v1 and v2 vectors coexist in the same index.
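The wall-clock side of that claim follows directly from the sizing numbers used earlier:

```python
# Full re-embed time for a model bump, from this doc's measured rate.
TOTAL_CHUNKS = 5_000_000
CHUNKS_PER_SEC = 3_000   # one L4 running bge-large via vLLM

reembed_minutes = TOTAL_CHUNKS / CHUNKS_PER_SEC / 60
print(f"full re-embed: ~{reembed_minutes:.0f} min on one L4")
```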
Why CDC beats dual-write. CDC moves the "every commit gets captured" guarantee from application code into the database. Application dual-write is a distributed transaction across Postgres and Kafka without a coordinator; it will lose events the first time someone forgets to add the publish call to a new endpoint, or when a transaction rolls back after the Kafka publish succeeded. Debezium reading the WAL is, by construction, exactly the events that committed. The cost is operational (one more system to run) but the correctness gain is large.
Exactly-once delivery. Honestly, you don't guarantee it — you ensure effectively-once via
idempotency. Kafka offers at-least-once delivery; the embed worker may
re-process a message after a crash. The deterministic chunk_id and
ON CONFLICT ... WHERE source_lsn > ... upsert ensure that
re-processing produces the same final state. Sales calls it "exactly once";
engineers call it "at-least-once with idempotent sinks," which is what every
modern streaming system actually does.
Bursts beyond GPU capacity. The chunker detects sustained backlog and switches to BALANCED then SHED mode. The embed worker pool autoscales up to its hard cap (say 10 L4s). Beyond that, the chunker downsamples low-priority updates and routes them to a backfill topic with a relaxed SLO. After the burst subsides, a backfill consumer drains the deferred topic. The pipeline never stops accepting events; it gracefully degrades freshness for the tail of the backlog while keeping sub-minute freshness for the leading edge.
Measuring end-to-end lag. The lag exporter periodically (every 5 s) reads SELECT pg_current_wal_lsn()
from the source DB and SELECT MAX(source_lsn) FROM article_chunks
from the vector store. The difference, expressed in WAL bytes, divided by the
recent write rate, gives a time-equivalent lag. Exposed as a Prometheus
gauge with a single alert at > 60 s for 5 min. Beats trying to measure each
stage's contribution separately.
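A sketch of that lag computation, assuming pg_lsn values arrive in their textual X/Y form (function names are assumptions):

```python
def parse_lsn(lsn: str) -> int:
    """Postgres pg_lsn text form 'X/Y' -> absolute WAL byte position."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def lag_seconds(current_lsn: str, indexed_lsn: str, wal_bytes_per_sec: float) -> float:
    """Byte distance between the DB head and the newest indexed row,
    converted to time using the recent WAL write rate."""
    delta = parse_lsn(current_lsn) - parse_lsn(indexed_lsn)
    return delta / wal_bytes_per_sec if wal_bytes_per_sec > 0 else float("inf")

# 1 MiB of un-indexed WAL at 64 KiB/s of writes is 16 s of time-equivalent lag.
assert parse_lsn("0/16B3748") == 0x16B3748
assert lag_seconds("0/200000", "0/100000", 65536.0) == 16.0
```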
Deletes. Debezium emits a delete event with the primary key. The chunker enqueues a
tombstone task; the embed worker (or a dedicated tombstone worker) issues
DELETE FROM article_chunks WHERE article_id = $1. pgvector marks
the HNSW points as deleted immediately; readers stop returning them on the
next query. The 60-second SLO is met as long as the CDC pipeline is healthy.
For GDPR Art. 17 timing (72 h), we additionally run a weekly index-rebuild job
to physically reclaim the tombstoned vectors.
Model upgrade runbook. (1) Spin up a parallel pipeline writing to article_chunks_v2
with the new model; the existing v1 pipeline keeps running. (2) Backfill
v2 by replaying the historical CDC log (or by full table scan); takes ~30 min
for 5M chunks at 3k/sec. (3) Run the eval suite on both indexes; only proceed
if v2 wins on the relevant retrieval metrics. (4) Atomically swap the search
index pointer in app config (a single config-server update). (5) Keep v1 for
a week as rollback insurance, then drop. The whole upgrade is online —
no readers see a moment of mixed-version vectors.