Challenge: Design the OldHost Sync Layer (compress 300+ ops/sec down to 2.5 ops/sec)
Quick reference version:
| Metric | Value |
|---|---|
| Units managed | 50,000 |
| Peak input rate | 300+ ops/sec bursts (10K events in 30 min, about 5.6 ops/sec sustained) |
| OldHost throughput | 2.5 ops/sec (400ms latency, single-threaded) |
| OldHost constraints | Single TCP socket, sequential nonce (N → N+1 → N+2) |
| Nonce violation penalties | Gap = 400 error, Replay = 15-minute ban |
The Core Problem: We need to compress 300+ ops/sec down to 2.5 ops/sec while maintaining correctness.
-- Shadow State (instant reads)
units (
    id, name, property_id,
    status,       -- Clean/Dirty/Cleaning
    sync_status,  -- PENDING_SYNC / SYNCED / SYNC_FAILED
    synced_at
)

-- Transactional Outbox (durable writes)
outbox_events (
    id, idempotency_key UNIQUE,
    entity_type, entity_id,
    event_type, payload JSON,
    priority_class,  -- EMERGENCY / TXN / LWW
    created_at, processed_at
)

-- Nonce Ledger (crash-safe nonce state)
nonce_ledger (
    id DEFAULT 1,  -- Singleton row
    next_nonce BIGINT,
    last_applied_nonce BIGINT
)

nonce_requests (
    nonce BIGINT PRIMARY KEY,
    idempotency_key UNIQUE,
    status,  -- RESERVED / SENT / APPLIED / FAILED
    created_at, sent_at, completed_at
)

-- Leader Election
writer_lease (
    id DEFAULT 1,  -- Singleton row
    owner_id, epoch INT,  -- Fencing token
    acquired_at, expires_at
)
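The nonce tables above imply that a nonce is allocated and recorded as RESERVED before anything goes on the wire. A minimal sketch of that step, assuming stdlib `sqlite3` in place of MariaDB and a reduced column set; `make_ledger`, `reserve_nonce`, and `mark` are hypothetical names, not part of the design:

```python
import sqlite3


def make_ledger():
    """In-memory stand-in for nonce_ledger / nonce_requests (reduced columns)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE nonce_ledger (id INTEGER PRIMARY KEY, next_nonce INTEGER);
        CREATE TABLE nonce_requests (
            nonce INTEGER PRIMARY KEY, idempotency_key TEXT UNIQUE, status TEXT
        );
        INSERT INTO nonce_ledger VALUES (1, 501);
        """
    )
    return conn


def reserve_nonce(conn, idempotency_key):
    """Allocate the next nonce and record it as RESERVED in one transaction."""
    # "with conn" commits on success and rolls back on any exception, so a
    # rejected insert never leaves next_nonce advanced (which would be a gap).
    with conn:
        nonce = conn.execute(
            "SELECT next_nonce FROM nonce_ledger WHERE id = 1"
        ).fetchone()[0]
        conn.execute("UPDATE nonce_ledger SET next_nonce = next_nonce + 1 WHERE id = 1")
        conn.execute(
            "INSERT INTO nonce_requests (nonce, idempotency_key, status)"
            " VALUES (?, ?, 'RESERVED')",
            (nonce, idempotency_key),
        )
    return nonce


def mark(conn, nonce, status):
    """Move a request through RESERVED / SENT / APPLIED / FAILED."""
    with conn:
        conn.execute(
            "UPDATE nonce_requests SET status = ? WHERE nonce = ?", (status, nonce)
        )
```

The read-then-increment is only safe here because a single writer owns the ledger; with concurrent writers the read would need a row lock.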
POST /api/units/{id}/status
    Body: { status: "Clean" }
    Response: { sync_status: "PENDING_SYNC" }  // < 200ms

POST /oldhost/sync
    Headers: X-Nonce: 501
    Body: { unit_id, status, ... }
    Response: 200 OK | 400 (gap/replay)

GET /oldhost/expected-nonce
    Response: { expected_nonce: 502 }
┌─────────────────────────────────────────────────────────────────┐
│                        HIGH LEVEL DESIGN                        │
└─────────────────────────────────────────────────────────────────┘

┌──────────┐      ┌──────────────────────────────────┐
│  Agents  │─────▶│           Shadow State           │◀─── UI reads (< 200ms)
│ (300/s)  │      │  (MariaDB: status, sync_status)  │
└──────────┘      └─────────────────┬────────────────┘
                                    │
                                    ▼
                           ┌─────────────────┐
                           │  Outbox Table   │
                           │ (same DB, txn)  │
                           └────────┬────────┘
                                    │
                                    ▼
                           ┌─────────────────┐
                           │   Classifier    │
                           └───┬────┬────┬───┘
                               │    │    │
                ┌──────────────┘    │    └──────────────┐
                ▼                   ▼                   ▼
         ┌──────────────┐    ┌──────────────┐    ┌──────────────┐
         │ Emergency Q  │    │Transactional │    │LWW Coalescer │
         │    (~2%)     │    │    (~18%)    │    │    (~80%)    │
         │   SLO: 30s   │    │ No compress  │    │ Debounce 2m  │
         └──────┬───────┘    └──────┬───────┘    └──────┬───────┘
                │                   │                   │
                └───────────────────┼───────────────────┘
                                    ▼
                        ┌───────────────────────┐
                        │  Weighted Scheduler   │
                        │ Emergency = ABSOLUTE  │
                        │   Txn : LWW = 3 : 1   │
                        └───────────┬───────────┘
                                    │
                                    ▼
                 ┌─────────────────────────────────────┐
                 │          SINGLETON WRITER           │
                 │  • Lease + Epoch (fencing)          │
                 │  • Nonce Ledger (crash-safe)        │
                 │  • Circuit Breaker (ban)            │
                 │  • Single TCP Socket                │
                 └──────────────────┬──────────────────┘
                                    │
                                    ▼
                           ┌─────────────────┐
                           │     OldHost     │
                           │  (2.5 ops/sec)  │
                           └─────────────────┘
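One way to read the scheduler box in the diagram: emergencies always drain first, and the remaining send slots follow a repeating pattern of three transactional ops per LWW op. A minimal sketch under that reading, assuming in-memory deques; the class and its method names are illustrative, not taken from the design:

```python
from collections import deque


class WeightedScheduler:
    """Hypothetical sketch: strict priority for emergencies, 3:1 Txn:LWW otherwise."""

    def __init__(self, txn_weight=3, lww_weight=1):
        self.emergency = deque()
        self.txn = deque()
        self.lww = deque()
        # Repeating slot pattern, e.g. ["txn", "txn", "txn", "lww"]
        self.pattern = ["txn"] * txn_weight + ["lww"] * lww_weight
        self.slot = 0

    def next_op(self):
        # Emergency traffic is served before anything else.
        if self.emergency:
            return self.emergency.popleft()
        queues = {"txn": self.txn, "lww": self.lww}
        # Walk the pattern at most once, skipping slots whose queue is empty
        # so an idle class never wastes OldHost capacity.
        for _ in range(len(self.pattern)):
            name = self.pattern[self.slot]
            self.slot = (self.slot + 1) % len(self.pattern)
            if queues[name]:
                return queues[name].popleft()
        return None  # nothing to send
```

Skipping empty slots keeps the writer busy whenever any queue has work, which matters when OldHost capacity is the bottleneck.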
| Decision | Rationale |
|---|---|
| Shadow State | Decouples UI speed from OldHost latency |
| Outbox Pattern | Atomic: business write + event publish in one transaction |
| LWW Coalescing | Compresses 8,000 status updates → 2,500 ops |
| 3 Priority Queues | Prevents housekeeping from starving emergencies |
| Singleton Writer | OldHost requires single socket + sequential nonces |
| Lease + Epoch | Prevents split-brain after failover |
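The outbox row in the table relies on the business write and the event insert sharing one transaction. A minimal sketch of that property, assuming stdlib `sqlite3` as a stand-in for MariaDB and a reduced column set; `make_db` and `update_status` are hypothetical names:

```python
import json
import sqlite3
import uuid


def make_db():
    """In-memory stand-in for the units and outbox_events tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE units (id INTEGER PRIMARY KEY, status TEXT, sync_status TEXT);
        CREATE TABLE outbox_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            idempotency_key TEXT UNIQUE,
            entity_id INTEGER, payload TEXT, priority_class TEXT
        );
        """
    )
    return conn


def update_status(conn, unit_id, status, idempotency_key=None):
    """Write the shadow state and the outbox event atomically."""
    key = idempotency_key or str(uuid.uuid4())
    # "with conn" commits both statements together, or rolls both back on error.
    with conn:
        conn.execute(
            "UPDATE units SET status = ?, sync_status = 'PENDING_SYNC' WHERE id = ?",
            (status, unit_id),
        )
        conn.execute(
            "INSERT INTO outbox_events"
            " (idempotency_key, entity_id, payload, priority_class)"
            " VALUES (?, ?, ?, 'LWW')",
            (key, unit_id, json.dumps({"status": status})),
        )
    return key
```

If the insert is rejected (here, by a duplicate idempotency key), the status update is rolled back with it, so the shadow state never shows a change that has no queued event.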
Peak: 10,000 events in 30 minutes

Event Classification:
├── LWW (80%): 8,000 events → ~2,500 unique entities → 2,500 ops
├── Transactional: 1,800 events → 1,800 ops (no compression)
└── Emergency: 200 events → 200 ops (no compression)

Total after coalescing: 2,500 + 1,800 + 200 = 4,500 ops
Capacity: 2.5 ops/sec × 1,800 sec = 4,500 ops (EXACTLY AT LIMIT)
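The budget above can be checked with a few lines of arithmetic. This is a sketch with the document's figures as defaults; the function name and the returned keys are illustrative:

```python
def capacity_budget(lww_events=8_000, lww_unique=2_500, txn_events=1_800,
                    emergency_events=200, window_s=1_800, ops_per_s=2.5):
    """Compare ops required after coalescing against OldHost capacity."""
    # Only LWW traffic compresses: one op per unique entity.
    required = lww_unique + txn_events + emergency_events
    capacity = ops_per_s * window_s
    return {
        "raw_events": lww_events + txn_events + emergency_events,
        "required_ops": required,
        "capacity_ops": capacity,
        "headroom_ops": capacity - required,
        "utilization": required / capacity,
    }
```

With the defaults the headroom is zero, so the result is sensitive to the estimate of about 2,500 unique entities: `capacity_budget(lww_unique=3_000)` already leaves the window 500 ops short.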
from threading import Timer

class LWWCoalescer:
    def __init__(self):
        self.buffer = {}  # entity_key → (event, timestamp, timer)
        self.debounce_s = 120.0  # 2 minutes (threading.Timer takes seconds)

    def add(self, event):
        key = f"{event.entity_type}:{event.entity_id}"
        if key in self.buffer:
            old_event, _, timer = self.buffer[key]
            if event.timestamp <= old_event.timestamp:
                return  # Discard older event (Last Write Wins); pending flush stays armed
            timer.cancel()  # Newer event: cancel pending flush
        # Store latest, start new debounce timer
        timer = Timer(self.debounce_s, self.flush, args=(key,))
        self.buffer[key] = (event, event.timestamp, timer)
        timer.start()
-- Acquire lease (atomic compare-and-swap)
UPDATE writer_lease
SET owner_id    = :new_owner,
    epoch       = epoch + 1,  -- Fencing token increments!
    acquired_at = NOW(),
    expires_at  = NOW() + INTERVAL 30 SECOND
WHERE expires_at < NOW()      -- Only if expired
   OR owner_id = :new_owner;  -- Or we already own it
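To see the compare-and-swap behave, here is a minimal sketch that runs the same shape of statement against stdlib `sqlite3`, with integer timestamps passed in instead of `NOW()`. `make_lease_db`, `try_acquire`, and `LEASE_TTL_S` are hypothetical names, and the affected-row count stands in for whatever success signal the real driver exposes:

```python
import sqlite3

LEASE_TTL_S = 30  # matches the 30-second interval in the statement above


def make_lease_db():
    """In-memory stand-in for the singleton writer_lease row."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE writer_lease (id INTEGER PRIMARY KEY, owner_id TEXT,"
        " epoch INTEGER, expires_at INTEGER)"
    )
    conn.execute("INSERT INTO writer_lease VALUES (1, NULL, 0, 0)")
    conn.commit()
    return conn


def try_acquire(conn, owner_id, now):
    """Return the new epoch if the lease was taken or renewed, else None."""
    with conn:
        cur = conn.execute(
            "UPDATE writer_lease"
            " SET owner_id = ?, epoch = epoch + 1, expires_at = ?"
            " WHERE id = 1 AND (expires_at < ? OR owner_id = ?)",
            (owner_id, now + LEASE_TTL_S, now, owner_id),
        )
        if cur.rowcount == 0:
            return None  # someone else holds an unexpired lease
        return conn.execute(
            "SELECT epoch FROM writer_lease WHERE id = 1"
        ).fetchone()[0]
```

As in the statement above, a renewal by the current owner also bumps the epoch, so the holder has to adopt the returned value as its fencing token each time.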
class SingletonWriter:
    def send_to_oldhost(self, request):
        # Before EVERY request, verify our epoch is still valid
        current_epoch = db.query(
            "SELECT epoch FROM writer_lease WHERE owner_id = ?",
            self.owner_id
        )
        if current_epoch != self.my_epoch:
            raise FencedOutError("Another writer took over!")
        return self.socket.send(request)
1. Writer A crashes after sending nonce 500
   DB state: nonce_requests[500] = SENT
   OldHost: May or may not have processed it
2. Writer B acquires lease (epoch = old_epoch + 1)
3. Writer B runs recovery:
   a. Query: SELECT * FROM nonce_requests WHERE status IN ('RESERVED', 'SENT')
   b. Found: nonce 500, status SENT
   c. Call: GET /expected-nonce → returns 501
   d. 501 > 500, so nonce 500 was applied!
   e. Update: nonce_requests[500].status = APPLIED
   f. Continue from nonce 501
def recover_from_crash():
    pending = db.query(
        "SELECT * FROM nonce_requests WHERE status IN ('RESERVED', 'SENT') ORDER BY nonce"
    )
    if not pending:
        return  # Clean state
    # Call OldHost once to reconcile
    expected = oldhost.get_expected_nonce()
    for req in pending:
        if req.nonce < expected:
            # OldHost processed it
            db.update("nonce_requests", {"status": "APPLIED"}, where={"nonce": req.nonce})
        elif req.nonce == expected:
            # Lost in flight, resend
            resend(req)
            expected += 1  # Assumes resend() blocks until OldHost applies the request
        else:
            # req.nonce > expected = GAP (should never happen!)
            alert_ops("P0", f"Nonce gap: expected {expected}, have {req.nonce}")
            raise CriticalError()
import time

class BanCircuitBreaker:
    def __init__(self):
        self.state = "CLOSED"
        self.ban_duration = 15 * 60  # 15 minutes, in seconds
        self.opened_at = None

    def on_replay_detected(self):
        self.state = "OPEN"
        self.opened_at = time.monotonic()  # Monotonic clock: unaffected by wall-clock changes
        alert_ops("P0", "Nonce replay! Circuit breaker OPEN for 15 min")

    def is_open(self):
        if self.state == "CLOSED":
            return False
        elapsed = time.monotonic() - self.opened_at
        if elapsed >= self.ban_duration:
            self.state = "CLOSED"
            return False
        return True
Ban: 15 min = 900 seconds
Incoming: 5.5 ops/sec × 900 = 4,950 raw events
After coalescing: ~2,000 events
Drain after ban: 2,000 / 2.5 = 800 sec = ~13 min
Total recovery: 15 + 13 = 28 minutes (within 4h SLA)
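The same estimate as a small function, with the figures above as defaults. This is a sketch; `ban_recovery` is a hypothetical name, and the coalesced backlog of about 2,000 is taken as given rather than derived:

```python
def ban_recovery(ban_s=900, raw_rate=5.5, coalesced_backlog=2_000, drain_rate=2.5):
    """Return (raw events buffered during the ban, drain seconds, total minutes)."""
    raw_events = raw_rate * ban_s
    # Assumes the full OldHost rate is available to the backlog once the ban lifts.
    drain_s = coalesced_backlog / drain_rate
    total_min = (ban_s + drain_s) / 60
    return raw_events, drain_s, total_min
```

The drain time counts only the backlog. Events that keep arriving while it drains are not modeled, so the 28-minute figure is a lower bound that holds best when the ban falls outside the peak window, where coalesced arrivals already match OldHost capacity.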
| Component | Purpose |
|---|---|
| Shadow State | Instant UI response (< 200ms) |
| Outbox Pattern | Atomic, durable event capture |
| 3 Priority Queues | Emergency (30s SLO), Transactional, LWW |
| LWW Coalescer | Compresses 80% of traffic |
| Weighted Scheduler | Fair scheduling with emergency priority |
| Singleton Writer | Single socket + sequential nonces |
| Lease + Epoch | Fencing for crash recovery |
| Nonce Ledger | Crash-safe nonce state |
| Circuit Breaker | Ban recovery |