Design a streaming algorithm to emit, at the end of each calendar month in a specified time zone, the counts and percentage shares of requests from NEW vs RETURNING users. A request is NEW if its month equals the user's first‑ever request month; otherwise RETURNING. Constraints: up to 1B distinct users; 50K requests/sec; 8 GB RAM; events arrive mostly time‑ordered but can be up to 7 days late; duplicates may occur; you may keep limited state per user. Describe: (1) data structures (e.g., compact first‑seen month store, Bloom filters, HLL/Count‑Min) and their memory footprints; (2) exact vs approximate designs and error bounds ensuring ≤0.5 percentage‑point error in monthly shares; (3) handling late and out‑of‑order events and monthly watermarking; (4) deduplication strategy; (5) time/space complexity; (6) recovery/checkpointing for fault tolerance.
Solution
# Solution overview
We need event-time, calendar-month aggregates in a specified time zone, with correctness under out-of-order arrivals and limited RAM. The two core problems are:
- Classifying each event as NEW vs RETURNING: requires knowing each user's first-ever month.
- Emitting final monthly counts only when late data have been processed (watermarking).
I propose two designs:
- Exact (recommended): disk-backed per-user first-month state (e.g., RocksDB) with per-user-month counters for in-flight months; exact classification and counts.
- Approximate (RAM-first): replace the per-user map with a compact “ever-seen” membership filter (Bloom/Cuckoo), with tunable false-positive rate to bound share error ≤ 0.5 pp.
Both designs partition by user_id across tasks to meet throughput and memory constraints.
Assumptions (explicit):
- Each event has: user_id (string or 64-bit), event_time (UTC), request_id (UUID or a stable hash; if missing, we derive a surrogate from stable fields).
- Processing is distributed across P partitions by consistent hashing on user_id; memory and rates below are per partition unless noted.
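- We emit final month M once max_seen_event_time ≥ end_of_month(M) + 7 days, i.e., once the watermark W (defined below) reaches end_of_month(M). Events for an already-finalized month (necessarily more than 7 days late) are dropped or logged.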
# Architecture (common to both designs)
- Partitioning: keyBy(user_id) → each task handles a disjoint subset of users; state is local and small.
- Event-time conversion: event_time → target time zone → month_index = months since epoch (e.g., 1970-01 → 0). Use calendar-aligned tumbling windows by month in that time zone.
- Watermark: W = max_seen_event_time - 7 days. Finalize month m when W ≥ end_of_month(m).
- Aggregates per month (task-local):
- req_new[m]: 64-bit counter of NEW requests.
- req_ret[m]: 64-bit counter of RETURNING requests.
- Optionally, per-user-month counts for unfinalized months, to support retractions when a late event reveals an earlier first month.
# Design A — Exact (recommended)
## Data structures and memory footprints
- User → first_month store (disk-backed KV, e.g., RocksDB):
- Key: 64-bit hash of user_id.
- Value: 2-byte month_index (0..65535 covers ~5,400 years). Optionally 1 byte for a version/check.
- On-disk bytes/entry: ~12–24 bytes (key, value, index/LSM overhead, compression). For 1B users globally and P partitions, each task stores ~1B/P entries; with P=1024, ~1M entries → ~12–24 MB per task on disk. Block cache 0.5–2 GB RAM per task is plenty.
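- Per-user-month counts (open months only, disk-backed): key=(user_hash, month_index), value=int count; this keeps reclassification exact under late data. The key count is bounded by active users per open month: at most ~1B/P ≈ 1M per task per month with P=1024, and at most two months are open at once (the current month plus the previous month during its 7-day grace period), so ≤ ~2M keys → ~50–100 MB on disk at ~24–48 bytes/key. (Each task sees ~126M events/month, i.e., ~130 requests per user-month if every user is active.)
- Dedup ring (see Dedup section): time-bucketed Cuckoo filters (e.g., 15-min buckets retained for 8 days); per task ~60–90 MB RAM for ~0.05–0.2% FP.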
- Monthly counters: a few 64-bit integers per open month (negligible RAM).
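As a back-of-envelope check of these per-task sizes, the sketch below assumes P = 1024, 30-day months, at most two open months, ~12–24 bytes per first_month entry, ~24–48 bytes per per-user-month entry, and 12–14-bit Cuckoo fingerprints at 0.95 load. The per-entry byte counts are rough estimates, not measured RocksDB figures, and `per_task_footprint` is a hypothetical helper.

```python
def per_task_footprint(users=1_000_000_000, rate=50_000, partitions=1024,
                       days_per_month=30, open_months=2):
    """Rough per-task sizes for Design A (decimal MB); inputs are assumptions."""
    users_per_task = users / partitions
    events_per_month = rate * 86_400 * days_per_month / partitions
    # Upper bound: one key per user per open month (every user active in each).
    user_month_keys = open_months * users_per_task
    # 15-minute buckets (900 s) kept for 8 days x 96 buckets/day.
    dedup_items = rate / partitions * 900 * 8 * 96
    return {
        "users_per_task": users_per_task,
        "first_month_disk_mb": (users_per_task * 12 / 1e6, users_per_task * 24 / 1e6),
        "events_per_month": events_per_month,
        "user_month_keys_max": user_month_keys,
        "user_month_disk_mb": (user_month_keys * 24 / 1e6, user_month_keys * 48 / 1e6),
        "dedup_items": dedup_items,
        # Cuckoo fingerprints of 12 or 14 bits at a 0.95 load factor.
        "dedup_fingerprint_mb": (dedup_items * 12 / 0.95 / 8 / 1e6,
                                 dedup_items * 14 / 0.95 / 8 / 1e6),
    }


fp = per_task_footprint()
for name, value in fp.items():
    print(name, value)
```

With the defaults this gives ~11.7–23.4 MB for first_month, at most ~1.95M per-user-month keys (~47–94 MB), and ~33.75M dedup items (~53–62 MB of fingerprints) per task.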
## Event processing (pseudocode)
1) Normalize event_time to month m in the target time zone. If m is already finalized (W ≥ end_of_month(m)), drop or quarantine the event as too late.
2) Deduplicate by request_id (see Dedup below). If duplicate → drop.
3) Lookup user_hash in user→first_month store.
- Not found: set first_month = m (write), classify NEW.
- Found = fm:
- if m == fm → NEW.
- if m > fm → RETURNING.
- if m < fm → Out-of-order earlier first event discovered:
- Reclassification: the only month in which this user was counted NEW is fm, and those requests must now become RETURNING. Using the per-user-month counts, retract: req_new[fm] -= count(user, fm); req_ret[fm] += count(user, fm). Then set fm := m and classify the current event NEW.
4) Increment per-user-month count for (user, m) and increment req_new[m] or req_ret[m].
5) Watermark advancement: when W ≥ end_of_month(m), emit final counts for m and drop per-user-month state for m.
Notes
- We only need retractions for a user’s originally recorded first_month; later months don’t require reclassification because they were RETURNING already.
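- With at most 7 days of lateness, such fm back-shifts are rare, and they always happen while month fm is still open: the late event's month m is not yet finalized, and fm > m finalizes later. The per-user-month counts for fm are therefore still available, so the retraction is exact.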
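A minimal in-memory sketch of steps 2–5 and the retraction rule, assuming plain Python dicts stand in for the RocksDB stores and an exact set stands in for the dedup ring. Month indices are assumed precomputed in the target time zone, and the late-drop and watermark checks are left to the caller. The names `ExactNewReturning`, `process`, and `finalize` are hypothetical.

```python
from collections import defaultdict


class ExactNewReturning:
    """Design A sketch for one task; dicts stand in for RocksDB-backed stores."""

    def __init__(self):
        self.first_month = {}               # user_hash -> first-ever month index
        self.user_month = defaultdict(int)  # (user_hash, month) -> requests (open months)
        self.req_new = defaultdict(int)     # month -> NEW requests
        self.req_ret = defaultdict(int)     # month -> RETURNING requests
        self.seen_ids = set()               # exact stand-in for the dedup ring

    def process(self, request_id, user_hash, m):
        """Handle one event in open month m: True=NEW, False=RETURNING, None=duplicate."""
        if request_id in self.seen_ids:
            return None
        self.seen_ids.add(request_id)

        fm = self.first_month.get(user_hash)
        if fm is None:
            self.first_month[user_hash] = m
            is_new = True
        elif m == fm:
            is_new = True
        elif m > fm:
            is_new = False
        else:
            # m < fm: a late event reveals an earlier first month. Only month fm had
            # this user's requests counted NEW, so move exactly those to RETURNING.
            moved = self.user_month.get((user_hash, fm), 0)
            self.req_new[fm] -= moved
            self.req_ret[fm] += moved
            self.first_month[user_hash] = m
            is_new = True

        self.user_month[(user_hash, m)] += 1
        if is_new:
            self.req_new[m] += 1
        else:
            self.req_ret[m] += 1
        return is_new

    def finalize(self, m):
        """Emit (new, returning) for month m once W >= end_of_month(m); drop its per-user state."""
        for key in [k for k in self.user_month if k[1] == m]:
            del self.user_month[key]
        return self.req_new.pop(m, 0), self.req_ret.pop(m, 0)
```

Processing an Apr event before a late Mar event from the same user first counts Apr as NEW; the Mar event then retracts it, leaving req_new[Apr] = 0 and req_ret[Apr] = 1, while first_month keeps only 2 bytes of month per user.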
## Deduplication strategy
- Use request_id (or a stable surrogate hash on immutable fields).
- Ring of Cuckoo filters by event_time bucket, retained for 8 days (lateness + slack). E.g., 15-min buckets:
- Per task items/bucket ≈ (50K/sec ÷ P) × 900 sec.
- With P=1024 → ~49 × 900 ≈ 44K items/bucket.
- Buckets retained: 8 days × 96 buckets/day = 768 buckets.
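- Total items ≈ 44K × 768 ≈ 33.8M per task. With 12–14-bit fingerprints at 0.95 load (≈ 12.6–14.7 bits/item), the fingerprints take ≈ 53–62 MB per task; budget 60–90 MB including per-filter overhead. With 4-entry buckets the FP rate is ≈ 8/2^f ≈ 0.05–0.2%.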
- Optional: Put a tiny LRU hash (e.g., last few minutes) in front to further cut lookups.
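A minimal sketch of the time-bucketed ring, assuming event times in epoch seconds and that a duplicate always carries the same event_time as its original, so only one bucket is probed. Exact Python sets stand in for the per-bucket Cuckoo filters, so this version has no false positives but also none of the memory savings; `DedupRing` is a hypothetical name.

```python
BUCKET_SECONDS = 15 * 60              # 15-minute buckets
RETENTION_SECONDS = 8 * 24 * 3600     # 7-day lateness + 1 day slack


class DedupRing:
    """Time-bucketed dedup ring; exact per-bucket sets stand in for Cuckoo filters."""

    def __init__(self):
        self.buckets = {}     # bucket index -> set of request_ids
        self.max_seen = 0.0   # max event time (epoch seconds) observed

    def is_duplicate(self, request_id: str, event_ts: float) -> bool:
        """Return True if request_id was already seen; otherwise record it and return False."""
        b = int(event_ts // BUCKET_SECONDS)  # duplicates share event_time, hence bucket
        ids = self.buckets.setdefault(b, set())
        if request_id in ids:
            return True
        ids.add(request_id)
        if event_ts > self.max_seen:
            self.max_seen = event_ts
            self._evict()
        return False

    def _evict(self) -> None:
        # Drop whole buckets older than the retention window (cheap, no per-item scans).
        oldest_kept = int((self.max_seen - RETENTION_SECONDS) // BUCKET_SECONDS)
        for b in [b for b in self.buckets if b < oldest_kept]:
            del self.buckets[b]
```

Evicting whole buckets keeps expiry O(1) per bucket, which is the main reason to bucket by time rather than keep one large filter.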
## Watermarking and finalization
- Watermark W = max_event_time_seen - 7 days.
- Finalize month m when W ≥ end_of_month(m) in the specified time zone.
- Emit counts and shares: share_new[m] = req_new[m] / (req_new[m] + req_ret[m]).
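One way to express the watermark and finalization rule for a single task, assuming month indices are already computed and that events for an already-finalized month are the ones treated as too late. For brevity `utc_end_of_month` uses UTC month boundaries; a deployment would substitute a zone-aware end_of_month for the specified time zone. All names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

ALLOWED_LATENESS = timedelta(days=7)


def utc_end_of_month(m: int) -> datetime:
    """End of month index m (1970-01 -> 0), using UTC boundaries for brevity."""
    year_off, month0 = divmod(m + 1, 12)
    return datetime(1970 + year_off, month0 + 1, 1, tzinfo=timezone.utc)


class MonthlyAggregator:
    """Per-task NEW/RETURNING counters finalized by an event-time watermark."""

    def __init__(self, end_of_month=utc_end_of_month):
        self.end_of_month = end_of_month
        self.max_seen = None  # max event_time observed
        self.req_new = {}     # open month -> NEW request count
        self.req_ret = {}     # open month -> RETURNING request count

    def watermark(self):
        return None if self.max_seen is None else self.max_seen - ALLOWED_LATENESS

    def add(self, event_time: datetime, month: int, is_new: bool):
        """Count one classified, deduplicated event; return months finalized as a result."""
        wm = self.watermark()
        if wm is not None and wm >= self.end_of_month(month):
            return []  # month already finalized: too late, drop or quarantine
        counts = self.req_new if is_new else self.req_ret
        counts[month] = counts.get(month, 0) + 1
        if self.max_seen is None or event_time > self.max_seen:
            self.max_seen = event_time
        return self._finalize_ready()

    def _finalize_ready(self):
        wm = self.watermark()
        emitted = []
        for m in sorted(set(self.req_new) | set(self.req_ret)):
            if wm >= self.end_of_month(m):
                new, ret = self.req_new.pop(m, 0), self.req_ret.pop(m, 0)
                emitted.append((m, new, ret, new / (new + ret)))  # (month, new, ret, share_new)
        return emitted
```

In this sketch finalization only advances when events arrive; a real streaming engine also needs a policy for idle partitions so their watermark keeps moving.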
## Error
- Classification/counting are exact, assuming:
- First_month store is durable.
- Per-user-month counts enable exact retractions before watermark closure.
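- Dedup is exact (e.g., request_ids kept in a keyed store). If dedup instead uses the Cuckoo ring with false-positive rate p_dup, falsely dropped requests bias the share; in the worst case (all false drops in one class) the share error is ≤ p_dup/(1 − p_dup) ≈ p_dup. Choose p_dup ≤ 0.2%.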
- Hence we can meet ≤ 0.5 percentage-point error easily (effectively 0 in practice).
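The dedup term can be derived directly. Assume N distinct requests, of which N_new are NEW (true share s = N_new/N), and D = p_dup·N of them falsely dropped as duplicates, all from the NEW class (the worst case):

$$
s = \frac{N_{\text{new}}}{N}, \qquad s' = \frac{N_{\text{new}} - D}{N - D}, \qquad D = p_{\text{dup}}\,N
$$

$$
s - s' = \frac{D\,(N - N_{\text{new}})}{N\,(N - D)} = \frac{p_{\text{dup}}\,(1 - s)}{1 - p_{\text{dup}}} \le \frac{p_{\text{dup}}}{1 - p_{\text{dup}}}
$$

If all false drops are RETURNING instead, s' − s = p_dup·s/(1 − p_dup), so either way the error is at most p_dup/(1 − p_dup); for p_dup = 0.2% that is ≈ 0.2004%, i.e., about 0.2 pp.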
## Time and space complexity
- Per event: dedup O(1); KV get/put O(log n) across LSM levels, effectively O(1) amortized when the block cache and per-file Bloom filters absorb most reads; counter update O(1).
- Memory (per task):
- RocksDB block cache: 0.5–2 GB.
- Dedup ring: ~60–90 MB.
- Aggregation buffers: ~10s–100s MB depending on concurrency.
- Total ≤ 8 GB.
- Disk (per task): tens to hundreds of MBs for first_month + per-user-month state; this shrinks in proportion to 1/P.
## Fault tolerance and checkpointing
- Use a streaming engine with exactly-once state and source commits (e.g., Flink with RocksDB state backend):
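- Checkpoint atomically, together with source offsets: the user→first_month store, per-user-month counts, monthly counters, the current watermark, and the dedup ring's filter contents (not just its metadata; otherwise duplicates inside the 8-day window slip through after recovery).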
- Incremental checkpoints for RocksDB reduce I/O.
- On recovery, restore watermark and resume; retractions replay deterministically.
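Flink implements this with distributed barrier snapshots. As a single-process analogue, the sketch below assumes the state fits in a JSON-serializable dict and shows the key invariant: state and the source offsets it reflects are persisted together and atomically (write a temp file, fsync, then `os.replace`). `write_checkpoint` and `read_checkpoint` are hypothetical names.

```python
import json
import os
import tempfile


def write_checkpoint(path: str, state: dict, source_offsets: dict) -> None:
    """Persist operator state and the source offsets it reflects in one atomic step."""
    payload = {"state": state, "offsets": source_offsets}
    directory = os.path.dirname(os.path.abspath(path))
    # Temp file in the same directory so os.replace stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".ckpt-")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
            f.flush()
            os.fsync(f.fileno())    # make the bytes durable before publishing
        os.replace(tmp_path, path)  # atomic rename: old or new checkpoint, never partial
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def read_checkpoint(path: str):
    """Restore state and offsets; the source then resumes reading from these offsets."""
    with open(path) as f:
        payload = json.load(f)
    return payload["state"], payload["offsets"]
```

Because offsets and state are restored as a pair, replaying from the saved offsets re-applies exactly the events the restored state has not yet seen.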
# Design B — Approximate (RAM-first) with bounded error
Replace the per-user map with an “ever-seen-before-current-month” membership filter. Classification becomes approximate but we can bound the impact on share.
## Idea
- Maintain an “ever seen before month m” Cuckoo/Bloom filter SeenBefore_m over user_ids. A hit means the user was seen in some month < m, or is a filter false positive.
- For an event in month m:
- If user ∈ SeenBefore_m → RETURNING.
- Else → NEW.
- Users active in month m are inserted only after month m finalizes, which turns SeenBefore_m into SeenBefore_{m+1} = SeenBefore_m ∪ Users_in_month_m. Delaying insertion prevents a user's later requests within their first month from flipping to RETURNING.
- Keep per-user-month counts (disk-backed) for open months to enable corrections due to late data within the 7-day window.
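A minimal Python sketch of the Bloom variant and the lagged-insertion rule, assuming string user_ids and BLAKE2b-based double hashing (h1 + i·h2) to derive the k bit positions. `BloomFilter`, `classify`, and `close_month` are hypothetical names; a production system would use a compact native implementation rather than a Python bytearray.

```python
import hashlib
import math


class BloomFilter:
    """Plain Bloom filter sized from (n_items, fp_rate); no deletions, so no false negatives."""

    def __init__(self, n_items: int, fp_rate: float):
        ln2 = math.log(2)
        self.m = math.ceil(-n_items * math.log(fp_rate) / ln2**2)  # total bits
        self.k = max(1, round(self.m / n_items * ln2))             # number of hash functions
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, key: str):
        # Double hashing: position_i = h1 + i*h2 (mod m), from one 128-bit digest.
        digest = hashlib.blake2b(key.encode(), digest_size=16).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:], "little") | 1  # odd step
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, key: str) -> None:
        for p in self._positions(key):
            self.bits[p >> 3] |= 1 << (p & 7)

    def __contains__(self, key: str) -> bool:
        return all(self.bits[p >> 3] >> (p & 7) & 1 for p in self._positions(key))


def classify(user_id, seen_before):
    """RETURNING iff the user (probably) appeared in an earlier, finalized month."""
    return "RETURNING" if user_id in seen_before else "NEW"


def close_month(seen_before, users_in_month):
    """Lagged insertion at finalization: SeenBefore_{m+1} = SeenBefore_m ∪ Users_in_month_m."""
    for user_id in users_in_month:
        seen_before.add(user_id)
```

Because `close_month` runs only at finalization, repeated requests from a user within their first month keep classifying as NEW.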
## Data structures and memory
- SeenBefore (Bloom or Cuckoo filter): n ≤ 1B items, target FP p_BF chosen so share error ≤ 0.5 pp.
- Bloom filter bits/item m/n = -ln(p_BF)/(ln 2)^2.
- For p_BF = 0.003 (0.3 pp): m/n ≈ 12.1 bits. Memory ≈ 1.51 GB.
- For p_BF = 0.005 (0.5 pp): m/n ≈ 11.0 bits. Memory ≈ 1.38 GB.
- Hash count k = (m/n) ln 2 ≈ 7.6–8.4; use k = 8.
- Per-user-month counts: same as Design A, for in-flight months.
- Dedup ring: same as Design A; set p_dup small.
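The sizing figures above follow from the standard optimal-Bloom formulas; this sketch (hypothetical `bloom_size` helper, decimal GB) recomputes them for 1B users.

```python
import math


def bloom_size(n_items: float, fp_rate: float):
    """Optimal Bloom sizing: bits/item = -ln p / (ln 2)^2 and k = bits/item * ln 2."""
    bits_per_item = -math.log(fp_rate) / math.log(2) ** 2
    total_gb = n_items * bits_per_item / 8 / 1e9  # decimal gigabytes
    k = bits_per_item * math.log(2)
    return bits_per_item, total_gb, k


for p in (0.003, 0.005):
    bpi, gb, k = bloom_size(1e9, p)
    print(f"p_BF={p}: {bpi:.1f} bits/item, {gb:.2f} GB, k={k:.1f}")
```

It reproduces ≈ 12.1 and 11.0 bits/item (≈ 1.51 and 1.38 GB) and gives k ≈ 8.4 and 7.6, so k = 8 is a reasonable choice for either target.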
## Classification errors and bound
- Bloom/Cuckoo filter can only cause false positives (classify NEW as RETURNING). False negatives don’t occur because we don’t remove and we lag insertion until month close.
- Let true monthly share be s_true = N_new / (N_new + N_ret). Observed share:
- N_new_obs = N_new × (1 - p_BF) in expectation, since each truly new user falsely hits the filter with probability ≈ p_BF.
- N_total_obs ≈ N_total if dedup is exact; with dedup FP p_dup worst-case all drops hit one class.
- Absolute error bound (worst-case, union bound):
- |s_obs - s_true| ≤ p_BF + p_dup.
- Choose p_BF ≤ 0.003 and p_dup ≤ 0.002 → total ≤ 0.5 percentage points.
- In typical traffic, drops impact both classes and error is much smaller.
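Written out, the expected effect of filter false positives on the share is as follows, assuming exact dedup, N_new + N_ret = N, and that each truly new user is falsely reported as seen with probability p_BF:

$$
\hat N_{\text{new}} = (1 - p_{\text{BF}})\,N_{\text{new}}, \qquad \hat N_{\text{ret}} = N_{\text{ret}} + p_{\text{BF}}\,N_{\text{new}}, \qquad \hat N_{\text{new}} + \hat N_{\text{ret}} = N
$$

$$
s_{\text{obs}} = \frac{\hat N_{\text{new}}}{N} = (1 - p_{\text{BF}})\,s_{\text{true}} \;\Rightarrow\; |s_{\text{obs}} - s_{\text{true}}| = p_{\text{BF}}\,s_{\text{true}} \le p_{\text{BF}}
$$

So the filter term is p_BF·s_true ≤ p_BF: with s_true = 10% and p_BF = 0.3%, the expected error is 0.03 pp. Because a false positive applies to every request of that user, a few heavy new users can push the realized error above this expectation in a given month.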
## Handling late/out-of-order
- Watermarking identical to Design A.
- Because SeenBefore_m is built only when month m-1 finalizes, events for month m that arrive earlier (during m-1's 7-day grace period) are classified against SeenBefore_{m-1}, which lacks users first seen in m-1; those users' month-m requests are temporarily misclassified NEW. We correct this by keeping per-user-month counts for open months and applying retractions at m-1 finalization:
- When month m-1 finalizes, we know Users_in_month_{m-1}. For any user first seen in m-1, reclassify their month m counts (if any) from NEW to RETURNING using the per-user-month store.
- Filter false positives therefore remain the only source of approximation; lateness adds no error.
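A sketch of the month-close correction, assuming per-user-month counts live in a dict keyed by (user_id, month), the counters are `defaultdict(int)`, and any object supporting `in` and `.add` serves as the filter (a plain set in the usage below, standing in for the Bloom/Cuckoo filter). `finalize_month` is a hypothetical name.

```python
from collections import defaultdict


def finalize_month(prev, user_month, seen_before, req_new, req_ret):
    """Design B: finalize month `prev`, correct month prev+1, and update the filter.

    user_month:  (user_id, month) -> request count for open months
    seen_before: membership filter holding SeenBefore_prev on entry
    req_new/req_ret: defaultdict(int) counters keyed by month
    """
    cur = prev + 1
    users_prev = {u for (u, mo) in user_month if mo == prev}
    for u in users_prev:
        if u not in seen_before:
            # First seen in `prev`: its month-`cur` requests were classified against
            # SeenBefore_prev and counted NEW, so move them to RETURNING.
            moved = user_month.get((u, cur), 0)
            if moved:
                req_new[cur] -= moved
                req_ret[cur] += moved
    for u in users_prev:
        seen_before.add(u)         # SeenBefore_cur = SeenBefore_prev ∪ Users_in_month_prev
        del user_month[(u, prev)]  # per-user state for `prev` is no longer needed
    return req_new.pop(prev, 0), req_ret.pop(prev, 0)
```

Re-checking membership against the same filter that classified the month-`cur` events keeps the correction consistent: a user that was a false positive then is still a false positive now, so its counts are left alone.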
## Dedup, complexity, and FT
- Same as Design A.
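- Memory fits within 8 GB per task even if one task held SeenBefore for all 1B users (≈ 1.4–1.5 GB at p_BF = 0.3–0.5%; ≈ 1/P of that per task when partitioned by user_id), plus the dedup ring (≈ 60–90 MB), caches (≤ 2 GB), and overhead.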
# Time zone, windowing, and correctness details
- Month boundaries must be aligned to the specified time zone, including DST transitions. Compute month_index by converting event_time to local time, then taking (year, month).
- Define watermark on event-time (not processing time). Drop or quarantine events beyond allowed lateness.
- Emit final metrics: count_new, count_returning, share_new = count_new / (count_new + count_returning), share_returning = 1 - share_new.
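A minimal sketch of the month_index and month-end computation in a named time zone, assuming Python 3.9+ `zoneinfo` with an available IANA tz database (system files or the `tzdata` package); the function names are hypothetical.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def month_index(event_time_utc: datetime, tz_name: str) -> int:
    """Months since 1970-01 (-> 0) of the local calendar month in IANA zone tz_name."""
    if event_time_utc.tzinfo is None:
        raise ValueError("event_time must be timezone-aware")
    local = event_time_utc.astimezone(ZoneInfo(tz_name))  # applies the DST offset in effect
    return (local.year - 1970) * 12 + (local.month - 1)


def end_of_month(m: int, tz_name: str) -> datetime:
    """UTC instant of local midnight on the 1st of month m+1, i.e., where month m ends."""
    year_off, month0 = divmod(m + 1, 12)
    local_start = datetime(1970 + year_off, month0 + 1, 1, tzinfo=ZoneInfo(tz_name))
    return local_start.astimezone(timezone.utc)
```

For example, 2025-04-01 06:30 UTC is 2025-03-31 23:30 PDT, so it falls in March (index 662) in America/Los_Angeles but in April (663) in UTC. Month ends in that zone land at 07:00 UTC during PDT and at 08:00 UTC during PST.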
# Where HLL / Count-Min Sketch fit
- We don’t need HLL for the main task (requests, not distinct users), but it’s useful for auxiliary monitoring (e.g., distinct users per month) with tiny memory.
- Count-Min Sketch is not ideal for “first month” because it’s a minimum-over-time problem, not a frequency sum; you could build a multi-hash “min-sketch” for earliest month, but it’s biased toward earlier months and complicates error bounds. Prefer the exact RocksDB map (Design A) or the Bloom/Cuckoo membership (Design B) for clear guarantees.
# Guardrails and validation
- Backfill/bootstrapping: initialize SeenBefore or the first_month store from historical batch data, then switch to streaming updates. Otherwise, every pre-existing user's first request after cutover is classified NEW, inflating NEW counts in the first month(s) after cutover.
- Monitor: late-data rate, dedup FP rate (via sample replays), Bloom FPR (via canary partition with exact map), and reconcile monthly aggregates vs. offline batch recomputations.
- SLA knobs: allowed lateness (7d), dedup window (8d retention), BF target p_BF, dedup p_dup. Tune to maintain ≤ 0.5 pp error.
# Small numeric example
- User U has events (local time in America/Los_Angeles; PDT, since DST began on 2025-03-09):
- 2025-03-31 23:50 (month=Mar): first-ever → NEW in Mar.
- 2025-04-01 00:10 (month=Apr): RETURNING in Apr.
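- A late event timestamped 2025-03-25 arrives on 2025-04-03. It is still accepted because March stays open until the watermark reaches 2025-04-01 00:00 local (i.e., until events stamped 2025-04-08 00:00 or later have been seen). Its month equals first_month (Mar), so Design A counts it NEW in Mar and leaves first_month unchanged. Retraction matters when arrival order is reversed: if the Mar 31 event arrived after the Apr 1 event, Design A would first set first_month = Apr and count the Apr event NEW, then, on the late Mar event (m < fm), retract it (req_new[Apr] -= 1; req_ret[Apr] += 1) and set first_month := Mar. Design B reaches the same final counts, but its Apr NEW→RETURNING flip is applied from the per-user-month counts when Mar finalizes.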
# Summary
- Design A (exact) uses a disk-backed user→first_month map plus per-user-month counts and yields exact monthly shares (practically 0 error), meeting scale via partitioning and 8 GB/task RAM.
- Design B (approximate) replaces the map with a Bloom/Cuckoo membership filter with FP rate p_BF and guarantees |share error| ≤ p_BF + p_dup ≤ 0.5 pp with ≈ 1.4–1.5 GB RAM for 1B users, while keeping late-event corrections exact via per-user-month counts.
- Both handle 50K req/s globally when partitioned by user_id (P ≈ 1024 in the sizing above), deduplicate via a time-bucketed Cuckoo ring, finalize with a 7-day watermark, and checkpoint state for exactly-once recovery.