A reference design for an enterprise HR data platform and API integration framework supporting People Systems and employee lifecycle analytics. The integration layer ingests candidates, employees, and recruitment records from Greenhouse (ATS) and Workday (HCM), lands them in a Medallion lakehouse, and serves real-time dashboards for hiring, attrition, and diversity analytics.
HTTP clients use `requests` with `tenacity` for retries, with async `httpx` variants for high-fan-out polling. Core entities (candidate, application, employee, job_requisition) are modeled with SCD Type 2 history. Greenhouse uses HTTP Basic Auth with a base64-encoded API key as the username and an empty password. Keys are scoped per-integration with read-only permissions set in the Greenhouse admin console.
import base64
import os
import requests
GH_BASE = "https://harvest.greenhouse.io/v1"  # Greenhouse Harvest API root (v1)
def greenhouse_headers() -> dict:
    """Build request headers for the Greenhouse Harvest API.

    Harvest expects HTTP Basic auth where the API key is the username and
    the password is empty, hence the base64 encoding of "<key>:".
    """
    raw_key = os.environ["GREENHOUSE_API_KEY"]
    encoded = base64.b64encode(f"{raw_key}:".encode()).decode()
    headers = {
        "Authorization": f"Basic {encoded}",
        "On-Behalf-Of": os.environ["GREENHOUSE_USER_ID"],  # audit trail
        "Accept": "application/json",
    }
    return headers
Workday issues short-lived access tokens via an Integration System User (ISU) registered as an OAuth client. Tokens are cached in a secrets manager and refreshed proactively before expiry.
import time
import requests
class WorkdayAuth:
    """OAuth client-credentials helper for a Workday Integration System User.

    Caches the access token in-process and fetches a fresh one when fewer
    than 60 seconds remain before expiry.
    """

    def __init__(self, tenant: str, client_id: str, client_secret: str, token_url: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token = None      # cached bearer token; None until first fetch
        self._expires_at = 0    # epoch seconds at which the cached token expires

    def token(self) -> str:
        """Return a valid access token, refreshing via the token endpoint if needed."""
        still_fresh = bool(self._token) and time.time() < self._expires_at - 60
        if still_fresh:
            return self._token
        # Client-credentials grant; the ISU id/secret ride in the Basic auth header.
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=15,
        )
        response.raise_for_status()
        body = response.json()
        self._token = body["access_token"]
        self._expires_at = time.time() + body["expires_in"]
        return self._token
Greenhouse Harvest endpoints:
- GET /v1/candidates — Candidate profiles; supports updated_after for incremental sync.
- GET /v1/applications — Application records linking candidates to jobs.
- GET /v1/jobs — Job requisitions with departments and offices.
- GET /v1/job_posts — Public job postings (external/internal visibility).
- GET /v1/offers — Offer details, compensation, status.
- GET /v1/scheduled_interviews — Interview scheduling events.
- GET /v1/scorecards — Structured interviewer feedback.
- GET /v1/users — Recruiters and hiring managers.
- GET /v1/departments, GET /v1/offices — Organizational dimensions.

Pagination uses Link headers (rel="next"). The rate limit is 50 requests per 10 seconds per API key; the server returns 429 with a Retry-After header when it is exceeded.
Workday endpoints:
- GET /ccx/api/v1/{tenant}/workers — Worker records (employees + contingent workers).
- GET /ccx/api/v1/{tenant}/workers/{id}/personalData — PII details (diversity, address).
- GET /ccx/api/v1/{tenant}/workers/{id}/jobData — Position, manager, location.
- GET /ccx/api/v1/{tenant}/workers/{id}/compensation — Pay, grade, bonus targets.
- GET /ccx/service/customreport2/{tenant}/{owner}/{report}?format=json — RaaS custom reports for headcount snapshots, attrition cohorts, diversity slices.
- POST /ccx/api/privacy/v1/{tenant}/subject/export — GDPR/CCPA data-subject exports.

All API clients wrap requests in a tenacity retry decorator with exponential backoff + jitter, honor Retry-After, and cap concurrency with a token-bucket limiter to stay under published quotas.
import random
import time
from typing import Iterator
import requests
from tenacity import (
retry, retry_if_exception_type, stop_after_attempt,
wait_exponential_jitter, before_sleep_log
)
import logging
log = logging.getLogger("hr_api")
class RateLimiter:
    """Token-bucket limiter (no locking; adequate for single-driver Spark jobs)."""

    def __init__(self, rate_per_sec: float, burst: int):
        self.rate = rate_per_sec        # tokens added per second
        self.capacity = burst           # bucket ceiling
        self.tokens = burst             # current balance; bucket starts full
        self.last = time.monotonic()    # timestamp of the last refill

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            now = time.monotonic()
            # Refill in proportion to elapsed time, never above capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens < 1:
                # Sleep exactly long enough for the deficit to refill, then retry.
                time.sleep((1 - self.tokens) / self.rate)
                continue
            self.tokens -= 1
            return
# ~4.5 req/s with burst 10 keeps all Harvest calls under the 50 req / 10 s quota.
GH_LIMITER = RateLimiter(rate_per_sec=4.5, burst=10)  # under 50/10s
class RetryableHTTPError(Exception):
    """Transient HTTP failure (429 / 5xx) that the tenacity decorator should retry."""
@retry(
    reraise=True,
    retry=retry_if_exception_type((RetryableHTTPError, requests.ConnectionError, requests.Timeout)),
    wait=wait_exponential_jitter(initial=1, max=30),
    stop=stop_after_attempt(6),
    before_sleep=before_sleep_log(log, logging.WARNING),
)
def gh_get(path: str, params: dict | None = None) -> requests.Response:
    """GET a Greenhouse Harvest endpoint with rate limiting and retries.

    Args:
        path: Endpoint path starting with "/", appended to GH_BASE.
        params: Optional query parameters.

    Returns:
        The successful requests.Response.

    Raises:
        RetryableHTTPError: on 429 or 5xx; consumed by the @retry decorator
            until attempts are exhausted, then re-raised.
        requests.HTTPError: on any other non-2xx status.
    """
    GH_LIMITER.acquire()
    resp = requests.get(f"{GH_BASE}{path}", headers=greenhouse_headers(),
                        params=params, timeout=30)
    if resp.status_code == 429:
        # Honor Retry-After before tenacity's own backoff kicks in.
        # Fix: Retry-After may be an HTTP-date rather than a delta-seconds
        # integer (RFC 9110 §10.2.3); the old int(...) crashed with
        # ValueError in that case. Fall back to 2s instead.
        raw_retry_after = resp.headers.get("Retry-After", "2")
        try:
            delay = float(raw_retry_after)
        except ValueError:
            delay = 2.0
        time.sleep(delay + random.random())
        raise RetryableHTTPError("429 rate limited")
    if 500 <= resp.status_code < 600:
        raise RetryableHTTPError(f"{resp.status_code} server error")
    resp.raise_for_status()
    return resp
def gh_paginate(path: str, params: dict | None = None) -> Iterator[dict]:
    """Yield every record from a paginated Greenhouse Harvest listing.

    Follows Link headers (rel="next") until exhausted. Query params are
    sent only with the first request; later pages carry their params
    inside the Link URL itself.
    """
    url = f"{GH_BASE}{path}"
    params = dict(params or {}, per_page=500)
    while url:
        # Fix: str.replace() would rewrite EVERY occurrence of GH_BASE in
        # the URL (e.g. one embedded in a query parameter), not just the
        # leading prefix; removeprefix strips only the prefix.
        resp = gh_get(url.removeprefix(GH_BASE), params=params)
        yield from resp.json()
        url = _parse_next_link(resp.headers.get("Link", ""))
        params = None  # subsequent pages have params baked into the Link URL
def _parse_next_link(header: str) -> str | None:
for part in header.split(","):
if 'rel="next"' in part:
return part.split(";")[0].strip().strip("<>")
return None
from datetime import datetime, timezone
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, input_file_name
spark = SparkSession.builder.getOrCreate()  # shared session for the jobs below
def land_greenhouse_candidates(run_id: str, updated_after: str) -> int:
    """Pull candidates updated since *updated_after* into the Bronze Delta table.

    Each record is stored as a raw JSON payload plus lineage columns.
    Returns the number of records landed (0 when there is nothing new).
    """
    pulled_at = datetime.now(timezone.utc).isoformat()
    rows = [
        {
            "source_system": "greenhouse",
            "entity": "candidate",
            "source_id": str(record["id"]),
            "pulled_at": pulled_at,
            "run_id": run_id,
            "payload": json.dumps(record),
        }
        for record in gh_paginate("/candidates", params={"updated_after": updated_after})
    ]
    if not rows:
        return 0
    frame = spark.createDataFrame(rows)
    writer = frame.write.mode("append").format("delta")
    writer.saveAsTable("hr_prod.bronze.greenhouse_candidates")
    return len(rows)
Silver applies regex normalization (phone/email), fuzzy-match dedup on candidate identity, and business-rule engines that map Greenhouse stages and Workday job codes onto a conformed enterprise taxonomy.
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
def normalize_email(col):
    """Canonicalize an email column: trim surrounding whitespace, then lowercase."""
    trimmed = F.trim(col)
    return F.lower(trimmed)
def normalize_phone(col):
    """Strip every non-digit character and keep the last 10 digits (US numbers).

    NOTE(review): output is bare digits, not E.164 (+1...), even though the
    downstream column is named phone_e164 — confirm the intended format.
    """
    digits_only = F.regexp_replace(col, r"\D", "")
    return digits_only.substr(-10, 10)
# Parse raw candidate payloads out of Bronze into typed, normalized columns.
bronze = spark.read.table("hr_prod.bronze.greenhouse_candidates")
parsed = bronze.selectExpr(
    "source_id",
    "pulled_at",
    # Explicit from_json schema: payload fields not listed here are dropped.
    "from_json(payload, 'id BIGINT, first_name STRING, last_name STRING, "
    "email_addresses ARRAY<STRUCT<value:STRING,type:STRING>>, "
    "phone_numbers ARRAY<STRUCT<value:STRING,type:STRING>>, "
    "updated_at TIMESTAMP') AS c"
).select(
    F.col("source_id"),
    F.col("c.first_name").alias("first_name"),
    F.col("c.last_name").alias("last_name"),
    # Only element [0] of each contact array is kept — presumably the primary
    # entry; TODO confirm Greenhouse orders these with the primary first.
    normalize_email(F.expr("c.email_addresses[0].value")).alias("email"),
    normalize_phone(F.expr("c.phone_numbers[0].value")).alias("phone_e164"),
    F.col("c.updated_at").alias("source_updated_at"),
    F.col("pulled_at"),
)
# MERGE into Silver with SCD Type 2.
#
# Fix: the previous single-pass MERGE closed the old "current" row when a
# tracked attribute changed but never inserted the replacement version, so a
# changed candidate vanished from the current view; it also used
# whenNotMatchedInsertAll(), which left is_current/valid_to NULL on fresh
# inserts. Standard Delta SCD2 pattern: stage each changed record twice —
# once keyed by source_id (to close the existing row) and once with a NULL
# merge key (to fall through to the insert clause as the new current row).
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "hr_prod.silver.candidate_scd2")

# Condition under which a new version must be cut.
_CHANGE_COND = (
    "t.email <> s.email OR t.phone_e164 <> s.phone_e164 "
    "OR t.first_name <> s.first_name OR t.last_name <> s.last_name"
)

# Incoming rows whose tracked attributes differ from the current Silver row.
_current = target.toDF().filter("is_current = true")
_changed = (
    parsed.alias("s")
    .join(_current.alias("t"), "source_id")
    .where(_CHANGE_COND)
    .select("s.*")
)

# Staged source: all incoming rows keyed by source_id (close/ignore existing,
# insert brand-new candidates) unioned with changed rows keyed by NULL so
# they always reach the insert clause.
_staged = (
    parsed.withColumn("merge_key", F.col("source_id"))
    .unionByName(_changed.withColumn("merge_key", F.lit(None)))
)

(target.alias("t")
 .merge(
     _staged.alias("s"),
     "t.source_id = s.merge_key AND t.is_current = true"
 )
 .whenMatchedUpdate(
     condition=_CHANGE_COND,
     set={"is_current": "false", "valid_to": "s.source_updated_at"}
 )
 # Explicit values so the inserted row is marked current.
 # NOTE(review): assumes the target has is_current/valid_to columns (seen in
 # the original MERGE) and that any unlisted columns (e.g. valid_from) are
 # nullable or defaulted — confirm against the table schema.
 .whenNotMatchedInsert(values={
     "source_id": "s.source_id",
     "first_name": "s.first_name",
     "last_name": "s.last_name",
     "email": "s.email",
     "phone_e164": "s.phone_e164",
     "source_updated_at": "s.source_updated_at",
     "pulled_at": "s.pulled_at",
     "is_current": "true",
     "valid_to": "CAST(NULL AS TIMESTAMP)",
 })
 .execute())
Custom detectors flag data-quality issues: duplicate candidate identities across sources, outlier time-to-hire, missing EEOC fields on diverse-hire cohorts, and interview-feedback scores that deviate >3σ from interviewer baselines.
# Flag interview scores more than 3 standard deviations away from each
# interviewer's own historical baseline.
from pyspark.sql import Window
from pyspark.sql import functions as F

scorecards = spark.table("hr_prod.silver.scorecards")

per_interviewer = Window.partitionBy("interviewer_id")
stats = (
    scorecards
    .withColumn("mu", F.avg("overall_score").over(per_interviewer))
    .withColumn("sigma", F.stddev_pop("overall_score").over(per_interviewer))
)

anomalies = (
    stats
    .withColumn("z", (F.col("overall_score") - F.col("mu")) / F.col("sigma"))
    .filter(F.abs("z") > 3)
    .select("scorecard_id", "interviewer_id", "candidate_id",
            "overall_score", "mu", "sigma", "z")
)

(anomalies.write
 .mode("overwrite")
 .saveAsTable("hr_prod.gold.anomaly_scorecard_outliers"))
Workday webhooks and Greenhouse event exports are fanned into a Kafka topic; a Structured Streaming job micro-batches events into Bronze, then declarative streaming pipelines cascade updates into Silver and Gold aggregates (hiring velocity, open requisitions, diversity dashboards) with end-to-end latency under 2 minutes.
from pyspark.sql import functions as F

# Kafka -> Bronze: land raw HR lifecycle events via a 30s micro-batch stream.
raw_events = (spark.readStream
    .format("kafka")
    .option("subscribe", "hr.events.v1")
    .option("kafka.bootstrap.servers", "{{ bootstrap }}")
    .option("startingOffsets", "latest")
    .load())

event_schema = (
    "event_id STRING, source STRING, entity STRING, op STRING, "
    "payload STRING, emitted_at TIMESTAMP"
)

decoded = (raw_events
    .select(F.col("value").cast("string").alias("json"),
            F.col("timestamp").alias("event_ts"))
    .select(F.from_json("json", event_schema).alias("e"), "event_ts")
    .select("e.*", "event_ts"))

(decoded.writeStream
 .format("delta")
 .option("checkpointLocation", "/mnt/chk/hr_events_bronze")
 .outputMode("append")
 .trigger(processingTime="30 seconds")
 .toTable("hr_prod.bronze.hr_events"))
Governance: tables follow the hr_prod.{bronze|silver|gold}.{table} naming convention. Row-level access is scoped by current_user() membership (HRBPs see only their business units), and sensitive columns (ssn, dob, home_address) are masked for non-privileged readers. Each API call is logged with source, endpoint, http_status, latency_ms, retry_count, and rate_limit_remaining for observability.