This page documents the design and implementation of a high-performance search and classification platform built on top of the YouTube Data API. The system was originally built at a brand-safety platform vendor to power AI-driven brand-safety and contextual-targeting products that allow advertisers to run campaigns adjacent to suitable, on-brand video content within a walled-garden ecosystem (YouTube). It indexes a multi-billion-video corpus and exposes ranked, classified results through a REST API consumed by ad-targeting systems, sales tooling, and machine-learning training pipelines.
The platform combines four capabilities: quota-aware crawling of the YouTube Data API, NLP feature extraction over titles, descriptions, captions, and comments, BM25F-style ranked retrieval over a sharded Xapian index, and brand-safety / contextual classification against per-advertiser policies.
Programming & infrastructure stack: Scala (services, Spark jobs), C++ (Xapian extensions, hot-path tokenizers and BM25 modifications), Python (offline NLP training, evaluation), Xapian probabilistic search libraries, Cassandra (engagement time-series and feature store), Kinesis (event ingestion), Kafka (internal fan-out), AWS (EC2, S3, EMR, MSK, ECS).
Advertisers spending on YouTube face a structural problem: they cannot freely crawl or instrument the ad-serving environment the way they can on the open web. They depend on the platform's own classification, which is coarse, opaque, and (historically) inconsistent. Major brand-safety incidents — extremist content monetized with mainstream ads, brand logos appearing next to graphic videos — repeatedly forced advertisers to pause YouTube spend.
The product sold by the platform addresses this with an independent, advertiser-aligned classification layer: every public video and channel is scored against advertiser-specific suitability policies and contextual segments, and the scores are compiled into campaign-level inclusion and exclusion lists.
The ranking and classification engine described on this page is what generates and continuously refreshes those lists across YouTube's full public corpus.
The system is organized into five layers. Each layer is independently scalable and exposes well-defined APIs to the next.
Crawler workers write each raw API response to a Kinesis stream partitioned by channel_id; a Lambda consumer hydrates them into S3 (Parquet, hourly partitions) and Cassandra (current-state row per video).

The crawl layer uses the following YouTube Data API endpoints:

- GET /youtube/v3/search — Discovery seed: query by topic, region, date windows. Returns lightweight video / channel resource IDs.
- GET /youtube/v3/videos — Hydration: part=snippet,contentDetails,statistics,topicDetails,status,liveStreamingDetails. Batched 50 IDs per call.
- GET /youtube/v3/channels — Channel metadata, uploads playlist ID, total view counts, country, custom URL.
- GET /youtube/v3/playlistItems — Walk a channel's uploads playlist to enumerate every public video.
- GET /youtube/v3/captions + caption download — Fetch closed-caption tracks (when permitted) for transcript-level NLP.
- GET /youtube/v3/commentThreads — Top-level comments and reply threads, used for engagement-tone signals.
- GET /youtube/v3/videoCategories — YouTube's own coarse categories (used as a weak prior, not as ground truth).

The YouTube Data API enforces a daily quota measured in "units" — a search.list call costs 100 units, while videos.list with parts costs ~7 units per call (for 50 videos batched). With a default 10,000-unit daily quota per project, naive crawling is impossible at scale. We addressed this with:

- Multiple API projects, with crawl work sharded across them by channel_id hash; a coordinator service routes outbound requests through whichever project still has quota.
- search.list calls used only for cold-start discovery and trending topics; the steady-state crawl walks uploads playlists (1 unit each) and batches videos.list calls.

package com.brandsafe.crawl
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import io.circe.parser._
final case class VideoId(value: String) extends AnyVal
final case class ChannelId(value: String) extends AnyVal
class YouTubeClient(
apiKeyPool: ApiKeyPool,
rateLimiter: TokenBucket,
http: HttpExt
)(implicit ec: ExecutionContext, sys: ActorSystem[_]) {
private val Base = "https://www.googleapis.com/youtube/v3"
/** Hydrate up to 50 video IDs in a single videos.list call. */
def videosBatch(ids: Seq[VideoId]): Future[Seq[VideoResource]] = {
require(ids.size <= 50, "videos.list max 50 ids per call")
rateLimiter.acquire().flatMap { _ =>
val key = apiKeyPool.checkout(quotaCost = 7)
val parts = "snippet,contentDetails,statistics,topicDetails,status"
val uri = Uri(s"$Base/videos")
.withQuery(Uri.Query(
"part" -> parts,
"id" -> ids.map(_.value).mkString(","),
"maxResults" -> "50",
"key" -> key
))
http.singleRequest(HttpRequest(uri = uri))
.flatMap(handleResponse[VideosListResponse])
.map(_.items)
.recoverWith(quotaAware(key))
}
}
/** Walk a channel's uploads playlist via playlistItems pagination. */
def channelUploads(uploadsPlaylistId: String): Source[VideoId, _] =
Source.unfoldAsync[Option[String], Seq[VideoId]](Some("")) {
case None => Future.successful(None)
case Some(pageToken) =>
playlistPage(uploadsPlaylistId, pageToken).map { page =>
Some((page.nextPageToken, page.videoIds))
}
}.mapConcat(identity)
}
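The ApiKeyPool and TokenBucket referenced by the client handle quota routing and request pacing. Below is a minimal sketch of the pool; ApiKeyPool.checkout(quotaCost) matches the call in the client above, while ApiProject, resetDaily, and the roll-back loop are illustrative assumptions rather than the production coordinator (the TokenBucket used for pacing is omitted).

```scala
import java.util.concurrent.atomic.AtomicLong

final case class ApiProject(apiKey: String, dailyQuotaUnits: Long = 10000L)

/** Hands out an API key from whichever project still has daily quota left. */
class ApiKeyPool(projects: Seq[ApiProject]) {
  private val remaining: Map[String, AtomicLong] =
    projects.map(p => p.apiKey -> new AtomicLong(p.dailyQuotaUnits)).toMap

  /** Reserve `quotaCost` units and return the key to use for this request. */
  def checkout(quotaCost: Long): String = {
    val it = projects.iterator
    while (it.hasNext) {
      val p       = it.next()
      val counter = remaining(p.apiKey)
      if (counter.addAndGet(-quotaCost) >= 0) return p.apiKey
      counter.addAndGet(quotaCost) // project exhausted: roll back and try the next one
    }
    // All projects exhausted: the crawler parks work until the daily quota resets.
    throw new IllegalStateException("all YouTube API projects out of quota")
  }

  /** Called when Google resets per-project quota for the day. */
  def resetDaily(): Unit =
    projects.foreach(p => remaining(p.apiKey).set(p.dailyQuotaUnits))
}
```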
// Each crawler worker writes raw API responses to Kinesis with channel_id
// as the partition key, guaranteeing per-channel ordering downstream.
import scala.jdk.FutureConverters._
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.model.{ProvisionedThroughputExceededException, PutRecordRequest}
val record = PutRecordRequest.builder()
.streamName("youtube-raw-v1")
.partitionKey(channelId.value)
.data(SdkBytes.fromUtf8String(rawJson))
.build()
kinesisAsync.putRecord(record).asScala
.recover {
case e: ProvisionedThroughputExceededException =>
// Kinesis backpressure: park the record on a local SQS DLQ
dlq.enqueue(channelId, rawJson)
}
Cassandra was chosen for the serving-side feature store and engagement time-series
because the access pattern is overwhelmingly point reads keyed by
video_id, with high write fan-out from streaming engagement
updates. It scales linearly, tolerates AZ failures with QUORUM writes across three
replicas, and avoids the read-amplification problems we hit with HBase on the same
workload.
-- Current-state video record. One row per video.
CREATE TABLE yt.video_current (
video_id text PRIMARY KEY,
channel_id text,
title text,
description text,
published_at timestamp,
duration_seconds int,
language text,
category_id int,
view_count bigint,
like_count bigint,
comment_count bigint,
topic_ids set<text>,
caption_available boolean,
last_seen_at timestamp
) WITH compaction = {'class': 'LeveledCompactionStrategy'};
-- Engagement time-series for trend detection.
CREATE TABLE yt.video_engagement_ts (
video_id text,
bucket_hr timestamp,
views bigint,
likes bigint,
comments bigint,
PRIMARY KEY ((video_id), bucket_hr)
) WITH CLUSTERING ORDER BY (bucket_hr DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'HOURS',
'compaction_window_size': 24};
-- Feature store consumed by the classifier.
CREATE TABLE yt.video_features (
video_id text PRIMARY KEY,
tokens frozen<list<text>>,
pos_tags frozen<list<text>>,
entities frozen<map<text, text>>, -- entity -> type (PERSON, ORG, LOC, ...)
embedding frozen<list<float>>, -- 384-dim sentence embedding
toxicity_score float,
language_conf float,
computed_at timestamp
);
-- Brand-safety verdicts written back by the classifier.
CREATE TABLE yt.video_safety (
video_id text PRIMARY KEY,
overall_verdict text, -- safe | risky | unsafe
iab_categories frozen<map<text, float>>,
policy_flags frozen<set<text>>, -- violence, hate, adult, profanity, ...
copyright_flag boolean,
computed_at timestamp,
classifier_version text
);
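The dominant serving-side access pattern against these tables is a single-partition point read by video_id. A sketch with the DataStax Java driver 4.x from Scala; the helper name and session wiring are illustrative:

```scala
import com.datastax.oss.driver.api.core.{CqlSession, DefaultConsistencyLevel}
import com.datastax.oss.driver.api.core.cql.{Row, SimpleStatement}

/** Point read of the current-state row for one video at QUORUM. */
def readVideoCurrent(session: CqlSession, videoId: String): Option[Row] = {
  val stmt = SimpleStatement
    .builder("SELECT * FROM yt.video_current WHERE video_id = ?")
    .addPositionalValue(videoId)
    .setConsistencyLevel(DefaultConsistencyLevel.QUORUM)
    .build()
  Option(session.execute(stmt).one()) // None if the video has never been crawled
}
```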
S3 is the durable system of record. Cassandra is rebuildable from S3 within the SLA window.
s3://brandsafe-yt-prod/
bronze/yt_raw/dt=YYYY-MM-DD/hh=HH/ # raw JSON, snappy parquet
silver/yt_videos/dt=YYYY-MM-DD/ # parsed + deduped
silver/yt_features/dt=YYYY-MM-DD/ # NLP outputs
gold/yt_safety_verdicts/dt=YYYY-MM-DD/ # daily classifier output
index/xapian/version=YYYYMMDDHHMM/shard=NN/ # rsyncable shard snapshots
models/safety/version=vN/ # trained classifier weights
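The rebuild path is a batch job that replays the silver Parquet into Cassandra. A minimal sketch, assuming the Spark Cassandra Connector is used for the write; the connector choice and column mapping are assumptions, as the production rebuild job is not shown on this page:

```scala
import org.apache.spark.sql.SparkSession

// Rebuild yt.video_current from the deduped silver layer after a Cassandra loss.
val spark = SparkSession.builder().appName("yt-cassandra-rebuild").getOrCreate()

spark.read
  .parquet("s3://brandsafe-yt-prod/silver/yt_videos/")   // all dt= partitions
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "yt", "table" -> "video_current"))
  .mode("append")                                        // upsert semantics: last write wins per video_id
  .save()
```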
The offline NLP pipeline computes, per video:

- Tokenization, POS tagging, and named-entity extraction, persisted as tokens in the feature store.
- A 384-dim sentence embedding over the title + description + first 1k caption tokens for nearest-neighbour search and semantic clustering.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.brandsafe.nlp._
val spark = SparkSession.builder().appName("yt-nlp").getOrCreate()
import spark.implicits._
val raw = spark.read.parquet("s3://brandsafe-yt-prod/bronze/yt_raw/dt=2026-04-21/")
val text = raw.selectExpr(
"video_id",
"concat_ws(' ', title, description, coalesce(captions, '')) as raw_text",
"language as declared_lang"
)
val tokenized = text
.withColumn("language", detectLanguageUDF($"raw_text"))
.filter($"language" === "en")
.withColumn("tokens", tokenizeUDF($"raw_text"))
.withColumn("pos_tags", posTagUDF($"tokens"))
.withColumn("entities", nerUDF($"tokens", $"pos_tags"))
.withColumn("embedding", sentenceEmbedUDF($"raw_text"))
.withColumn("toxicity_score", toxicityProbeUDF($"tokens"))
tokenized.write
.mode("overwrite")
.parquet("s3://brandsafe-yt-prod/silver/yt_features/dt=2026-04-21/") // dt partition is encoded in the path
// tokenizer.cpp — used inside the C++ classifier service for sub-ms tokenization.
#include <string>
#include <string_view>
#include <vector>
#include <unicode/unistr.h>
#include <unicode/normalizer2.h>
namespace brandsafe::nlp {
class FastTokenizer {
public:
explicit FastTokenizer(const VocabTrie& vocab) : vocab_(vocab) {}
std::vector<TokenId> tokenize(std::string_view text) const {
icu::UnicodeString u = icu::UnicodeString::fromUTF8(
icu::StringPiece(text.data(), text.size()));
UErrorCode err = U_ZERO_ERROR;
const auto* nfkc = icu::Normalizer2::getNFKCInstance(err);
icu::UnicodeString normalized;
nfkc->normalize(u, normalized, err);
std::vector<TokenId> out;
out.reserve(text.size() / 5); // heuristic: ~5 chars/token
int32_t i = 0;
while (i < normalized.length()) {
auto [token_id, consumed] = vocab_.longest_match(normalized, i);
if (consumed == 0) {
out.push_back(VocabTrie::kUnk);
i += U16_LENGTH(normalized.char32At(i));
} else {
out.push_back(token_id);
i += consumed;
}
}
return out;
}
private:
const VocabTrie& vocab_;
};
} // namespace brandsafe::nlp
Xapian is a mature C++ probabilistic IR library implementing BM25 (and several other
weighting schemes) with very fast posting-list traversal and an extensible
Weight subclassing API. The reasons we picked it over Elasticsearch /
Lucene at the time:
- Xapian::Weight subclasses let us inject brand-safety priors directly into the score, rather than rescoring after retrieval.

Each Xapian document represents one video, with prefixed terms per field so a query can target a single field (e.g., T:tesla matches only titles).
| Prefix | Field | Boost |
|---|---|---|
| T | Title | 3.0 |
| D | Description | 1.0 |
| C | Captions | 1.5 |
| K | Keywords / tags | 2.0 |
| E | Named entities | 2.5 |
| XCH | Channel ID (filter) | — |
| XLN | Language (filter) | — |
| XCAT | Safety verdict (filter) | — |
// indexer.cpp — bulk-loads a Xapian shard from a Parquet partition.
#include <xapian.h>
#include "feature_reader.h"
void index_shard(const std::string& shard_path,
FeatureReader& reader) {
Xapian::WritableDatabase db(shard_path,
Xapian::DB_CREATE_OR_OPEN | Xapian::DB_BACKEND_GLASS);
Xapian::TermGenerator tg;
tg.set_stemmer(Xapian::Stem("en"));
tg.set_stemming_strategy(Xapian::TermGenerator::STEM_SOME);
while (auto rec = reader.next()) {
Xapian::Document doc;
doc.set_data(rec->video_id);
doc.add_value(SLOT_PUBLISHED_TS,
Xapian::sortable_serialise(rec->published_unix));
doc.add_value(SLOT_VIEWS,
Xapian::sortable_serialise(rec->view_count));
tg.set_document(doc);
tg.index_text(rec->title, 3, "T");
tg.index_text(rec->description, 1, "D");
tg.index_text(rec->captions, 2, "C");
for (const auto& kw : rec->tags) doc.add_term("K" + kw, 2);
for (const auto& en : rec->entities) doc.add_term("E" + en, 3);
doc.add_boolean_term("XCH" + rec->channel_id);
doc.add_boolean_term("XLN" + rec->language);
doc.add_boolean_term("XCAT" + rec->safety_verdict);
db.add_document(doc);
}
db.commit();
}
The corpus is sharded by hash(channel_id) % N. Each indexer node owns
~10 shards of ~50M videos each; a query fans out to all shards in parallel through
Xapian::Database's multi-shard support. We chose channel-based sharding
(rather than video-id) so that channel-scoped queries hit a single shard, and so that a
hot channel's writes serialize on one indexer rather than thundering across the fleet.
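A minimal sketch of the routing rule; ChannelId mirrors the wrapper from the crawler code, and the helper name and hash choice are illustrative (any stable hash works as long as the indexers and the query fan-out agree on it):

```scala
import scala.util.hashing.MurmurHash3

final case class ChannelId(value: String) extends AnyVal

/** All of a channel's videos map to the same shard, so channel-scoped queries stay local. */
def shardFor(channel: ChannelId, numShards: Int): Int = {
  val h = MurmurHash3.stringHash(channel.value)
  ((h % numShards) + numShards) % numShards // non-negative modulo
}
```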
BM25 scores a document D against a query Q as:
score(D, Q) = Σ_{q ∈ Q} IDF(q) · (f(q,D) · (k1+1))
/ (f(q,D) + k1 · (1 - b + b · |D|/avgdl))
where:
f(q, D) = term frequency of q in D
|D| = length of D in tokens
avgdl = average document length across the corpus
k1, b = tunables (defaults 1.2 and 0.75)
IDF(q) = log((N - n(q) + 0.5) / (n(q) + 0.5) + 1)
For YouTube content, two properties of standard BM25 hurt us:
- Field-agnostic scoring lumps title, description, and caption terms into one bag, even though a title match is a much stronger relevance signal than a caption match.
- Length normalization with the default b=0.75 overpenalizes legitimate long
descriptions on educational and tutorial content.

We addressed this with a BM25F-style per-field formulation: each
field gets its own k1, b, and weight, with field-level length
normalization. Title used b=0.0 (no length penalty) and a 3× weight;
description used b=0.75 and 1×; captions used b=0.5 and 1.5×.
The exact values were swept against a labeled relevance set of 50k judged
<query, video, label> triples.
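Written in the same notation as the single-field formula above, the per-field variant is roughly as follows (a sketch of the BM25F-style formulation described here, not a literal transcription of the production code):

score(D, Q) = Σ_{q ∈ Q} IDF(q) · Σ_{fld ∈ {title, desc, captions}} w_fld · (f(q,D_fld) · (k1_fld + 1))
              / (f(q,D_fld) + k1_fld · (1 - b_fld + b_fld · |D_fld|/avgdl_fld))

with (w_fld, k1_fld, b_fld) per field: title (3.0, 1.2, 0.0), description (1.0, 1.5, 0.75), captions (1.5, 1.4, 0.5).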
// brand_safety_weight.cpp
// Wraps standard BM25 and folds in a per-document safety prior so that risky
// content is downweighted at retrieval time, not just filtered after scoring.
#include <xapian.h>
#include "safety_score_table.h"
class BrandSafetyWeight : public Xapian::BM25Weight {
public:
BrandSafetyWeight(double k1, double k2, double k3, double b,
double min_normlen,
const SafetyScoreTable& safety,
double safety_lambda)
: Xapian::BM25Weight(k1, k2, k3, b, min_normlen),
safety_(safety),
lambda_(safety_lambda) {}
std::string name() const override { return "BrandSafetyWeight"; }
BrandSafetyWeight* clone() const override {
return new BrandSafetyWeight(*this);
}
double get_sumpart(Xapian::termcount wdf,
Xapian::termcount doclen,
Xapian::termcount uniqterms) const override {
double bm25 = Xapian::BM25Weight::get_sumpart(wdf, doclen, uniqterms);
// Safety prior is in [0, 1]; lambda controls how aggressively unsafe
// documents are demoted in the ranking.
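// NOTE: get_docid_for_current_post() comes from our internal Xapian extensions;
// stock Xapian::Weight does not expose the current docid inside get_sumpart().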
Xapian::docid did = get_docid_for_current_post();
double safety = safety_.lookup(did); // 1.0 = safe, 0.0 = unsafe
return bm25 * (1.0 - lambda_ * (1.0 - safety));
}
private:
const SafetyScoreTable& safety_;
double lambda_;
};
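The net effect is a multiplicative demotion of the BM25 score: score' = bm25 · (1 − λ·(1 − safety)). For example, with the production λ = 0.5, a document with safety prior 0.2 keeps only 1 − 0.5·0.8 = 0.6 of its BM25 score, while a fully safe document (safety = 1.0) is untouched.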
We swept k1 ∈ {0.9, 1.2, 1.5, 1.8}, b ∈ {0.25, 0.5, 0.75},
and lambda ∈ {0.0, 0.25, 0.5, 0.75} against a held-out judged set,
measuring nDCG@10 for relevance and a custom "advertiser-safe-precision@10" metric:
the fraction of top-10 results that human reviewers labeled as safe for a default
brand-suitability profile. Final production values: title k1=1.2 b=0.0,
description k1=1.5 b=0.75, captions k1=1.4 b=0.5,
lambda=0.5.
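For reference, the suitability metric is a straightforward precision-at-k over reviewer labels. A sketch; the Judgment type and field names are illustrative:

```scala
final case class Judgment(videoId: String, reviewerSafe: Boolean)

/** advertiser-safe-precision@10: fraction of the top-10 results that reviewers labeled safe. */
def safePrecisionAt10(ranked: Seq[String], judgments: Map[String, Judgment]): Double = {
  val top = ranked.take(10)
  if (top.isEmpty) 0.0
  // Results missing from the judged set are treated as unsafe (conservative default).
  else top.count(id => judgments.get(id).exists(_.reviewerSafe)).toDouble / top.size
}
```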
Two parallel taxonomies are scored on every video: a brand-suitability taxonomy (policy flags such as violence, hate, adult, and profanity, rolled up into the overall safe / risky / unsafe verdict) and an IAB-style contextual taxonomy (segments such as automotive.electric_vehicles) used for contextual targeting.

The classifier is a hybrid: hand-curated keyword/entity rules (domain-expert-authored) combined with a trained gradient-boosted model over the feature-store signals (tokens, named entities, the 384-dim embedding, toxicity score), per-policy Xapian retrieval scores, engagement-velocity and comment-toxicity features, and a channel-level prior.
The hot inference path lives in a C++ service that loads the Xapian shard and the
gradient-boosted model (compiled to a flat binary via treelite). A Scala
front-end exposes the REST API and handles request validation, auth, and result
hydration from Cassandra. The boundary is a thin gRPC interface.
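On the Scala side, the boundary is a generated gRPC stub. A sketch assuming an Akka gRPC generated client for the same ClassifierService proto as the C++ handler below; the client class name, config key, and field names are assumptions:

```scala
import akka.actor.typed.ActorSystem
import akka.grpc.GrpcClientSettings
import scala.concurrent.Future

class ClassifierClient(implicit system: ActorSystem[_]) {
  // Generated from the ClassifierService proto by Akka gRPC.
  private val stub = ClassifierServiceClient(GrpcClientSettings.fromConfig("classifier-service"))

  /** One round trip into the C++ classifier; the REST layer hydrates the rest from Cassandra. */
  def classify(videoId: String): Future[ClassifyResponse] =
    stub.classify(ClassifyRequest(videoId = videoId))
}
```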
// classifier_service.cpp — gRPC handler.
grpc::Status ClassifierServiceImpl::Classify(
grpc::ServerContext* ctx,
const ClassifyRequest* req,
ClassifyResponse* resp) {
const auto& vid = req->video_id();
auto features = feature_cache_.get_or_load(vid);
if (!features) {
return grpc::Status(grpc::StatusCode::NOT_FOUND, "unknown video_id");
}
// 1. Score against each safety policy bundle via Xapian queries.
for (const auto& [policy, bundle] : safety_bundles_) {
Xapian::Query q = bundle.build_query();
Xapian::Enquire enq(*shard_db_);
enq.set_query(q);
enq.set_weighting_scheme(Xapian::BM25Weight());
auto mset = enq.get_mset(0, 1, nullptr,
single_doc_filter(features->xapian_docid));
double score = mset.empty() ? 0.0 : mset[0].get_weight();
features->safety_features[policy] = normalize(score, bundle);
}
// 2. Concatenate features and run the GBDT.
auto vec = features->to_dense_vector();
auto preds = gbdt_.predict(vec);
// 3. Apply policy thresholds to produce verdicts.
populate_response(*features, preds, resp);
return grpc::Status::OK;
}
Engagement metrics flow in through video_engagement_ts. We compute
rolling 24h / 7d / 30d view-velocity, like/dislike ratios where available, comment
volume per minute of video, and a comment-toxicity ratio (fraction of comments with
toxicity probe > 0.7). High-velocity videos are re-classified preferentially because
they are the ones most likely to be served against ads imminently.
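A sketch of the 24h view-velocity feature over an export of yt.video_engagement_ts into a DataFrame with the same columns; the 7d and 30d windows follow the same pattern:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/** Views gained over the trailing 24 hours, per video and hourly bucket. */
def viewVelocity24h(engagement: DataFrame): DataFrame = {
  val trailing24h = Window
    .partitionBy("video_id")
    .orderBy(col("bucket_hr").cast("long")) // seconds since epoch
    .rangeBetween(-24 * 3600L, 0L)

  engagement
    .withColumn("views_start", first("views").over(trailing24h)) // cumulative count 24h ago
    .withColumn("view_velocity_24h", col("views") - col("views_start"))
}
```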
Channel-declared country, video-level geo-tags (rare), and per-region view-share distributions feed a "regional appropriateness" feature: a video may be safe in one region and restricted in another. This matters operationally because campaigns are geo-targeted and an advertiser running in Germany may have stricter requirements around political content than the same advertiser running in Brazil.
A separate fingerprinting subsystem (Chromaprint-style audio fingerprints + perceptual
video hashes for keyframes) flags videos containing matched copyrighted material. The
flag is surfaced as copyright_flag = true in video_safety and
is hard-blocked from inclusion lists for advertisers in regulated verticals (music
labels, film studios, broadcasters).
- GET /v1/videos/{video_id} — Hydrated video record + safety verdict.
- GET /v1/videos/{video_id}/safety — Just the safety verdict and per-policy scores.
- POST /v1/search — Ranked search with brand-suitability filter. Request body specifies query terms, advertiser policy ID, and contextual segments.
- POST /v1/lists/inclusion — Generate or refresh an inclusion list for a campaign given a policy + contextual seed.
- POST /v1/lists/exclusion — Symmetric; generate exclusions.
- POST /v1/classify — On-demand classification for a batch of video IDs (used by ad-server pre-bid systems).

# POST /v1/search
query: "electric vehicle road trip"
policy_id: "advertiser-acme-strict-v3"
contextual_segments:
- automotive.electric_vehicles
- travel.road_trips
filters:
language: ["en"]
region: ["US", "CA"]
min_view_count: 10000
published_after: "2025-01-01"
limit: 50
ranking:
scheme: "bm25f_safety"
safety_lambda: 0.6
# 200 OK
results:
- video_id: "dQw4w9WgXcQ"
channel_id: "UCabcdef"
title: "Cross-country in a Model 3 — 4,200 miles"
score: 18.42
safety_verdict: "safe"
safety_policy_flags: []
contextual_scores:
automotive.electric_vehicles: 0.93
travel.road_trips: 0.88
- ...
total_matched: 8421
took_ms: 47
shard_stats:
shards_queried: 32
shards_with_hits: 28
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class SearchRoute(svc: SearchService)(implicit ec: ExecutionContext) {
val route =
pathPrefix("v1") {
path("search") {
post {
entity(as[SearchRequest]) { req =>
withRequestTimeout(2.seconds) {
onSuccess(svc.search(req)) { resp =>
complete(StatusCodes.OK -> resp)
}
}
}
}
} ~
path("classify") {
post {
entity(as[ClassifyBatchRequest]) { req =>
validate(req.videoIds.size <= 1000, "max 1000 ids per call") {
onSuccess(svc.classifyBatch(req)) { resp =>
complete(StatusCodes.OK -> resp)
}
}
}
}
} ~
path("videos" / Segment / "safety") { vid =>
get {
onSuccess(svc.safetyVerdict(VideoId(vid))) {
case Some(v) => complete(v)
case None => complete(StatusCodes.NotFound)
}
}
}
}
}
Demand-side platforms call POST /v1/classify in the pre-bid window
(typically <100ms budget) to decide whether to bid on an impression on a given
YouTube video. Our SLO for this path is p99 < 80ms server-side,
which the C++ classifier comfortably meets when the feature row is hot in Cassandra.
The cold-feature path (video never seen before) falls back to a fast classifier operating only on the snippet returned by the YouTube ad server and a channel-level prior.
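A sketch of that branching in the Scala front-end. The FeatureStore / FastPathClassifier traits and the verdict shape are illustrative, not the production types:

```scala
import scala.concurrent.{ExecutionContext, Future}

trait FeatureStore       { def hasFeatures(videoId: String): Future[Boolean] }
trait FullClassifier     { def classify(videoId: String): Future[String] }           // gRPC into the C++ service
trait FastPathClassifier { def verdict(snippet: String, channelId: String): String }  // snippet + channel prior only

final case class PreBidVerdict(videoId: String, verdict: String, coldPath: Boolean)

class PreBidClassifier(features: FeatureStore, full: FullClassifier, fast: FastPathClassifier)
                      (implicit ec: ExecutionContext) {

  def classify(videoId: String, snippet: String, channelId: String): Future[PreBidVerdict] =
    features.hasFeatures(videoId).flatMap {
      case true  => full.classify(videoId).map(v => PreBidVerdict(videoId, v, coldPath = false))
      case false =>
        // Never-crawled video: answer from the snippet and channel prior so we
        // stay inside the pre-bid latency budget instead of timing out.
        Future.successful(PreBidVerdict(videoId, fast.verdict(snippet, channelId), coldPath = true))
    }
}
```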
Backpressure: the front-end Akka HTTP service uses a bounded request queue and sheds load with 503 responses (Retry-After: 1) when downstream Cassandra latency degrades.
The DSP integration is configured to treat a 503 as "do not bid" — failing closed is
the brand-safe default.
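A sketch of the shedding wrapper; the degradation probe and its threshold are illustrative, and the production service keys this off its bounded request queue rather than a latency callback:

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

/** Wraps a route; answers 503 + Retry-After: 1 while the Cassandra path looks degraded. */
def shedWhenDegraded(cassandraDegraded: () => Boolean)(inner: Route): Route =
  extractRequest { _ =>
    if (cassandraDegraded())
      respondWithHeader(RawHeader("Retry-After", "1")) {
        complete(StatusCodes.ServiceUnavailable) // DSPs treat this as "do not bid"
      }
    else inner
  }

// Usage: shedWhenDegraded(() => cassandraLatencyP99Ms() > 200)(new SearchRoute(svc).route)
```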
A few operational results:

- Cassandra: RF=3, QUORUM reads/writes, ~120 nodes at peak.
- Uploads-playlist crawling and tight part selection cut quota burn ~70% versus a naive crawler.
- The custom Weight subclass eliminated the need for an expensive rescoring pass and cut p99 latency by ~40%.