Design out-of-order windowed stream processor
Company: Box
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Onsite
##### Question
Design an event processor for an unbounded, infinite stream of events. Each event has the fields `id`, `timestamp`, `payload` (a string), and `checksum`. Events may arrive out of timestamp order, and they should drive a rolling aggregate over a 1-minute (60-second) sliding window.
Your processor must:
1. **Validate each event by its checksum** and discard any event whose checksum does not match its payload.
2. **Drop late and out-of-window events** — any valid event whose timestamp falls more than 60 seconds behind the latest observed timestamp (the right edge of the current window) is discarded.
3. **Handle duplicates** — if the same event `id` is seen more than once, it must only be counted once.
4. **Maintain and report the average payload length over the latest 60 seconds**, updating continuously as new events arrive and as old events fall out of the window.
5. Keep the **overall time complexity no worse than O(n log n)** across `n` events.
Describe the data structures you would use, how you keep the window current as the clock advances, and how you handle late arrivals and duplicates.
Quick Answer: This Box onsite coding question asks you to design a stream processor for an unbounded, out-of-order event stream — validating checksums, dropping late and duplicate events, and maintaining the average payload length over a 60-second sliding window in O(n log n). The solution covers the data structures (timestamp-ordered window, dedupe set, running sum/count), continuous window updates, and late/duplicate handling.
Solution
### Approach
The processor is a streaming pipeline: **validate → dedupe → admit-to-window → evict-expired → report average**. The key insight is that we only ever need a running *sum of payload lengths* and a *count of events* currently inside the window; the average is just `sum / count` in O(1) at any moment. The work is in keeping the window's membership correct as events arrive out of order.
### Core state
- `window` — a structure of in-window events ordered by timestamp so we can evict the oldest efficiently. A **min-heap keyed on timestamp**, or a balanced BST / `SortedList` keyed on timestamp, works because events arrive out of order (a plain queue/deque is insufficient since insertions are not monotonic).
- `seen_ids` — a hash set of accepted event `id`s for O(1) duplicate detection. (If memory is a concern over an infinite stream, this can be bounded — see *Scaling*.)
- `sum_len` — running sum of payload lengths of all events currently in the window.
- `count` — number of events currently in the window.
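- `max_ts` — the largest timestamp observed so far; this defines the right edge of the window. The window covers the half-open interval `(max_ts - 60s, max_ts]`, so an event exactly 60 seconds older than `max_ts` counts as expired. If "more than 60 seconds behind" in the requirements must be read literally, switch the drop and evict comparisons from `<=` to `<`.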
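Why an ordered structure is needed can be checked with a small sketch. Timestamps here are plain integers in seconds and the window is 60 (both assumptions for readability); `surviving_after_eviction` and `deque_front_eviction` are hypothetical helper names. Both apply the same eviction rule (remove entries at or before `max_ts - 60`), but one uses a `heapq` min-heap and the other a FIFO `collections.deque`.

```python
import heapq
from collections import deque


def surviving_after_eviction(arrivals, window=60):
    """Min-heap version: the top is always the oldest timestamp,
    regardless of arrival order. Returns surviving timestamps, oldest first."""
    heap = []
    max_ts = None
    for ts, event_id in arrivals:
        heapq.heappush(heap, (ts, event_id))
        max_ts = ts if max_ts is None else max(max_ts, ts)
        while heap and heap[0][0] <= max_ts - window:
            heapq.heappop(heap)
    return [ts for ts, _ in sorted(heap)]


def deque_front_eviction(arrivals, window=60):
    """FIFO version: eviction can only inspect the earliest *arrival*."""
    q = deque()
    max_ts = None
    for ts, event_id in arrivals:
        q.append((ts, event_id))
        max_ts = ts if max_ts is None else max(max_ts, ts)
        while q and q[0][0] <= max_ts - window:
            q.popleft()
    return sorted(ts for ts, _ in q)


arrivals = [(100, "a"), (50, "b"), (130, "c"), (75, "d")]  # out of order
print(surviving_after_eviction(arrivals))  # [75, 100, 130]
print(deque_front_eviction(arrivals))      # [50, 75, 100, 130]
```

When 130 arrives the cutoff becomes 70 and the heap evicts the expired 50 immediately. The deque stops at its front entry (100, still valid), so the stale 50 stays behind it and remains in the aggregate.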
### Per-event processing
For each incoming event `e = (id, timestamp, payload, checksum)`:
1. **Checksum validation.** Recompute the checksum over `payload` (and whatever fields the protocol covers) and compare to `e.checksum`. If it does not match, discard `e`.
2. **Duplicate check.** If `e.id` is already in `seen_ids`, discard `e`. Otherwise mark it seen.
3. **Advance the window edge.** If `e.timestamp > max_ts`, set `max_ts = e.timestamp`.
4. **Late / out-of-window check.** If `e.timestamp <= max_ts - 60s`, the event is already older than the window — drop it (do not insert).
5. **Admit.** Insert `e` into `window`, add `len(e.payload)` to `sum_len`, increment `count`.
6. **Evict expired.** Repeatedly remove the oldest event from `window` while its timestamp `<= max_ts - 60s`, subtracting its payload length from `sum_len` and decrementing `count`. (Eviction is triggered whenever `max_ts` advances.)
7. **Report.** The current average payload length is `sum_len / count` if `count > 0`, else 0 (or undefined).
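The source leaves the checksum algorithm open. A minimal sketch of step 1, assuming (hypothetically) that `checksum` is the hex SHA-256 digest of the UTF-8 `payload` and covers no other fields; `expected_checksum` and `is_valid` are illustrative names:

```python
import hashlib


def expected_checksum(payload: str) -> str:
    # Assumption: the checksum is the hex SHA-256 digest of the UTF-8 payload.
    # Replace with whatever algorithm (and fields) the real protocol specifies.
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def is_valid(event: dict) -> bool:
    # Step 1: recompute from the payload and compare with the carried checksum.
    return expected_checksum(event["payload"]) == event["checksum"]


good = {"id": 1, "timestamp": 0, "payload": "hello", "checksum": expected_checksum("hello")}
bad = dict(good, payload="hellp")  # payload corrupted in transit, checksum unchanged
print(is_valid(good), is_valid(bad))  # True False
```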
### Why O(n log n)
Each of the `n` events is inserted into the ordered window once and removed at most once. Heap or balanced-BST insert and delete-min cost O(log w), where w ≤ n is the number of events currently in the window, and the hash-set lookups used for deduplication are O(1) expected. Summed over the stream that is **O(n log n)** total, within the required bound. The running `sum_len`/`count` makes each average report O(1).
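The same accounting written as a sum, where $w$ is the largest number of events in the window at any time (so $w \le n$), each eviction is charged to the event it removes, and the per-event checksum cost is treated as constant (an assumption that payloads have bounded size):

$$
T(n) \;\le\; \sum_{i=1}^{n} \Big( \underbrace{O(\log w)}_{\text{push}} + \underbrace{O(\log w)}_{\text{at most one pop}} + \underbrace{O(1)}_{\text{dedupe, sum, count}} \Big) \;=\; O(n \log w) \;\subseteq\; O(n \log n), \qquad w \le n.
$$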
### Late & duplicate handling, summarized
- **Late but still in-window** events (timestamp within 60s of `max_ts`) are inserted normally, even though they arrive after newer events — the timestamp-ordered window absorbs them correctly.
- **Late events beyond the window** (timestamp at or before `max_ts - 60s`) are dropped at step 4 and never affect the aggregate.
- **Duplicates** are rejected at step 2 via `seen_ids`, so a re-sent event never double-counts.
- **Corrupt** events fail the checksum at step 1.
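The four cases can be exercised with a compact, self-contained sketch of the same pipeline. For a short demo it assumes integer-second timestamps, a 60-second window, and a deliberately toy checksum (`checksum == len(payload)`) so a corrupt event is easy to fake; `OutcomeDemo` and `classify` are hypothetical names, and `classify` returns which rule handled each event.

```python
import heapq

WINDOW = 60  # seconds; timestamps are plain integers in this demo


class OutcomeDemo:
    def __init__(self):
        self.heap = []      # (timestamp, id) min-heap of in-window events
        self.lengths = {}   # id -> payload length for in-window events
        self.seen = set()   # ids already processed
        self.sum_len = 0
        self.count = 0
        self.max_ts = None

    def classify(self, event_id, ts, payload, checksum):
        if checksum != len(payload):  # step 1 with a toy checksum (assumption)
            return "corrupt"
        if event_id in self.seen:     # step 2
            return "duplicate"
        self.seen.add(event_id)
        if self.max_ts is None or ts > self.max_ts:  # step 3
            self.max_ts = ts
        cutoff = self.max_ts - WINDOW
        if ts <= cutoff:              # step 4
            return "too late"
        heapq.heappush(self.heap, (ts, event_id))    # step 5
        self.lengths[event_id] = len(payload)
        self.sum_len += len(payload)
        self.count += 1
        while self.heap and self.heap[0][0] <= cutoff:  # step 6
            _, old_id = heapq.heappop(self.heap)
            self.sum_len -= self.lengths.pop(old_id)
            self.count -= 1
        return "admitted"

    def average(self):                # step 7
        return self.sum_len / self.count if self.count else 0.0


demo = OutcomeDemo()
print(demo.classify("a", 100, "xxxx", 4))   # admitted
print(demo.classify("b", 130, "xx", 2))     # admitted
print(demo.classify("c", 90, "xxxxxx", 6))  # admitted (late, but 90 > cutoff 70)
print(demo.classify("a", 100, "xxxx", 4))   # duplicate
print(demo.classify("d", 120, "xxx", 99))   # corrupt
print(demo.classify("e", 60, "x", 1))       # too late (60 <= 70)
print(demo.average())                       # 4.0
```

Event `c` arrives after a newer event yet is admitted because its timestamp is still above the cutoff, while `e` is dropped; the average covers only `a`, `b`, and `c`: (4 + 2 + 6) / 3.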
### Reference implementation (Python, heap-based)
```python
import heapq
import zlib

WINDOW = 60_000  # 60 seconds, in ms (timestamps assumed to be milliseconds)


def compute_checksum(payload):
    # Placeholder: the real checksum is protocol-specific.
    # CRC-32 of the UTF-8 payload is assumed here purely for illustration.
    return zlib.crc32(payload.encode("utf-8"))


class StreamProcessor:
    def __init__(self):
        self.heap = []         # (timestamp, id) min-heap by timestamp
        self.len_by_id = {}    # id -> payload length (in-window members)
        self.seen_ids = set()  # ids already processed (dedupe)
        self.sum_len = 0
        self.count = 0
        self.max_ts = None

    def _valid_checksum(self, e):
        return compute_checksum(e['payload']) == e['checksum']

    def process(self, e):
        # 1. checksum
        if not self._valid_checksum(e):
            return
        # 2. dedupe
        if e['id'] in self.seen_ids:
            return
        # 3. advance edge
        ts = e['timestamp']
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        cutoff = self.max_ts - WINDOW
        # 4. drop if already out of window
        if ts <= cutoff:
            self.seen_ids.add(e['id'])  # still record so a resend can't sneak in later
            return
        # 5. admit
        self.seen_ids.add(e['id'])
        plen = len(e['payload'])
        self.len_by_id[e['id']] = plen
        heapq.heappush(self.heap, (ts, e['id']))
        self.sum_len += plen
        self.count += 1
        # 6. evict expired (the heap top is always the oldest in-window event)
        while self.heap and self.heap[0][0] <= cutoff:
            _, old_id = heapq.heappop(self.heap)
            self.sum_len -= self.len_by_id.pop(old_id)
            self.count -= 1

    def average_payload_length(self):
        # 7. report
        return self.sum_len / self.count if self.count else 0.0
```
### Scaling notes
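- Over a truly infinite stream `seen_ids` grows without bound, so in practice it has to be bounded. If a re-sent event always carries its original timestamp, an id only needs to be remembered while that timestamp is inside the window: once `timestamp <= max_ts - 60s`, step 4 drops any resend regardless of `seen_ids`, because `max_ts` never decreases. Retaining id records for a margin such as `2 * WINDOW` is a conservative version of that rule. Alternatively, use a probabilistic structure (Bloom or Cuckoo filter) that accepts a tiny false-duplicate rate, depending on how strict the duplicate-correctness requirement is.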
- If the window edge should advance with wall-clock time rather than only on event arrival, run the eviction loop on a timer/tick as well, not just inside `process`.
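- The same design parallelizes by sharding on `id`, which sends every copy of a given event to one shard so its duplicate check stays local. Each shard keeps its own `sum_len` and `count`, and the global average is the sum of the shards' `sum_len` divided by the sum of their `count`, provided every shard evicts against the same `max_ts`. Partitioning events by timestamp range is another option for higher throughput.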
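The three scaling notes can be combined into one small sketch. `TickingWindow`, `advance_to`, `retention`, `shard_for`, and `merged_average` are hypothetical names; timestamps and clock ticks are assumed to share units, and a re-sent event is assumed to carry its original timestamp. Checksum validation is left out to keep the focus on bounded state, timer-driven eviction, and merging shard results.

```python
import heapq
import zlib


class TickingWindow:
    """Sliding-window average whose right edge can also be advanced by a clock
    tick, with a dedupe set that forgets ids older than edge - retention."""

    def __init__(self, window, retention):
        self.window = window
        self.retention = retention
        self.events = []     # (timestamp, id, payload_length) min-heap
        self.id_expiry = []  # (timestamp, id) min-heap used to forget old ids
        self.seen = set()
        self.sum_len = 0
        self.edge = None     # max of observed timestamps and clock ticks

    def advance_to(self, now):
        # Called on every event and from a periodic timer; never moves backward.
        if self.edge is None or now > self.edge:
            self.edge = now
        cutoff = self.edge - self.window
        while self.events and self.events[0][0] <= cutoff:
            _, _, plen = heapq.heappop(self.events)
            self.sum_len -= plen
        horizon = self.edge - self.retention
        while self.id_expiry and self.id_expiry[0][0] <= horizon:
            _, old_id = heapq.heappop(self.id_expiry)
            self.seen.discard(old_id)

    def process(self, event_id, ts, payload):
        if event_id in self.seen:
            return False  # duplicate
        self.seen.add(event_id)
        heapq.heappush(self.id_expiry, (ts, event_id))
        self.advance_to(ts)
        if ts <= self.edge - self.window:
            return False  # late: already outside the window
        heapq.heappush(self.events, (ts, event_id, len(payload)))
        self.sum_len += len(payload)
        return True

    def average(self):
        return self.sum_len / len(self.events) if self.events else 0.0


def shard_for(event_id, num_shards):
    # Stable routing (CRC-32 of the id) so every copy of an id lands on one shard.
    return zlib.crc32(str(event_id).encode("utf-8")) % num_shards


def merged_average(shards):
    # shards: per-shard (sum_len, count) pairs, all evicted against the same max_ts.
    total_sum = sum(s for s, _ in shards)
    total_count = sum(c for _, c in shards)
    return total_sum / total_count if total_count else 0.0


w = TickingWindow(window=60, retention=120)  # retention = 2 * window
w.process("a", 10, "abcd")
w.process("b", 20, "abcdefgh")
print(w.average())                        # 6.0
w.advance_to(75)                          # tick with no new events: ts=10 expires
print(w.average())                        # 8.0
w.advance_to(200)                         # horizon 80: ids "a" and "b" forgotten
print(w.average(), len(w.seen))           # 0.0 0
print(merged_average([(12, 3), (8, 1)]))  # 5.0
```

Averaging the per-shard averages instead (4.0 and 8.0, giving 6.0) would weight both shards equally even though one holds three events and the other one, which is why the merge sums `sum_len` and `count` separately.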
Explanation
Streaming validate -> dedupe -> admit -> evict pipeline. Keep a timestamp-ordered window (min-heap or balanced BST) plus a running sum of payload lengths and a count, so the average is O(1) per report. Out-of-order arrivals are handled by the ordered structure; events older than (max_ts - 60s) are dropped; duplicate ids are rejected via a hash set; corrupt events fail checksum. Each event is inserted/removed at most once at O(log n), giving O(n log n) overall.