Design a Streaming Job Scheduler
Company: Scale AI
Role: Software Engineer
Category: System Design
Difficulty: easy
Interview Round: Technical Screen
Design and incrementally build a **streaming job scheduler**: a service (and supporting class) that continuously ingests tasks and dispatches them to workers, respecting dependencies and deadlines.
This was a coding round that progressively grew into a mini system design. You will first implement a single-process class, then add dependency handling, then discuss how it becomes a distributed, fault-tolerant service.
Each task is defined as:
- `id`: unique task identifier
- `deadline`: an execution deadline (orderable, e.g. an epoch timestamp)
- `prerequisites`: a list of task `id`s that must complete before this task can run (empty in Part 1; used from Part 2 onward)
### Constraints & Assumptions
- **Ordering rule:** among *runnable* tasks (all prerequisites completed), the scheduler dispatches the one with the **earliest deadline** first; ties broken deterministically (e.g. by `id`).
- **Scale (Part 3):** assume large task volumes and many concurrent workers, with ingest arriving in bursts.
- **At-least-once execution** is acceptable for individual tasks (workers must be idempotent), but the **same task must never be leased to two workers simultaneously**.
- For Parts 1–2 a **single-process, single-threaded** implementation is fine. Concurrency, persistence, and distribution enter only in Part 3.
- The interview allowed Python or Go; pseudocode is acceptable here as long as the data structures and complexity are explicit.
---
### Part 1 — In-memory scheduler with deadline ordering
Implement a `Scheduler` class with two operations (no prerequisites yet):
- `add_tasks(tasks)`: accept one task or a batch of tasks.
- `consume_task()`: remove and return the task with the **earliest deadline**. If no task is available, return a sentinel such as `"no task"` (or `None`).
State the time complexity of each operation.
```hint What does "earliest deadline first" demand?
Both operations need to repeatedly find the *minimum* `deadline` among everything currently held. Which classic data structure makes "insert" and "extract-min" both cheap, and what does that imply for the per-operation complexity you're asked to state?
```
```hint Determinism and later lookups
Two tasks can share a `deadline`; think about what to add to the ordering key so the result is total and reproducible. Separately, if a later part needs to find or remove a specific task by `id`, ask yourself whether your chosen structure can address an element by key — and what auxiliary structure would cover that gap.
```
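One way to realize the Part 1 interface is a binary min-heap keyed on `(deadline, id)`. The sketch below is illustrative rather than a reference solution: the `Task` dataclass and its field types are assumptions, and `None` is used as the "no task" sentinel.

```python
import heapq
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    # Hypothetical task record; the field names follow the problem statement.
    id: str
    deadline: float
    prerequisites: tuple = ()  # unused in Part 1


class Scheduler:
    """Earliest-deadline-first queue backed by a binary min-heap."""

    def __init__(self):
        self._heap = []  # entries are (deadline, id, task)

    def add_tasks(self, tasks):
        # Accept either a single Task or an iterable of Tasks.
        if isinstance(tasks, Task):
            tasks = [tasks]
        for task in tasks:
            # ids are unique, so (deadline, id) is a total order: ties on
            # deadline resolve by id and the Task itself is never compared.
            heapq.heappush(self._heap, (task.deadline, task.id, task))

    def consume_task(self):
        # Return the earliest-deadline task, or None when the heap is empty.
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]
```

With `n` tasks held, `add_tasks` costs $O(\log n)$ per inserted task (so $O(k \log n)$ for a batch of `k`), and `consume_task` costs $O(\log n)$.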
---
### Part 2 — Add prerequisites (dependency-aware dispatch)
Each task now carries `prerequisites`. A task becomes **runnable** only after *all* of its prerequisites have completed. `consume_task()` must return the earliest-deadline task **among runnable tasks only** — never a task whose prerequisites are still pending.
You also need a `complete_task(id)` operation so the scheduler learns a task finished and can unblock its dependents.
Explain how you keep this efficient as tasks complete, and how the structure relates to a topological order.
```hint Avoid recomputing readiness
Recomputing "are all prerequisites done?" by scanning the dependency graph on every `consume` is wasteful. What single per-task counter could you maintain so that "runnable" becomes an $O(1)$ check, and when would that counter change?
```
```hint Reacting to a completion cheaply
When a task finishes, only a small set of other tasks can possibly become newly runnable. Which set, and what would you need to store at insert time so that on completion you can reach exactly those tasks without a graph walk? Think about which direction your edges should point. This unblocking pattern has a well-known name in topological sorting — relating your design to it is part of a strong answer.
```
```hint Contracts and edge cases
Consider a task whose prerequisites are already satisfied the moment it arrives — where should it land? And decide your contract for a prerequisite `id` you have never seen: is it a forward reference you'll wait for, or an error you reject? State the rule; it changes both the code and the validation.
```
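The hints above point toward a per-task counter of unfinished prerequisites plus a reverse map from each prerequisite to its dependents, which is the same bookkeeping Kahn's algorithm uses. A minimal sketch follows, under two assumed contracts that the problem leaves open: forward references are allowed (a task waits for a prerequisite that has not arrived yet), and `complete_task` rejects ids that were never dispatched. The class and method names are hypothetical, and self-dependencies and cycles are not validated here.

```python
import heapq
from collections import defaultdict


class DependencyScheduler:
    """EDF dispatch restricted to tasks whose prerequisites have completed."""

    def __init__(self):
        self._runnable = []                   # heap of (deadline, id)
        self._pending = {}                    # id -> unfinished prerequisite count
        self._deadline = {}                   # id -> deadline, for every known task
        self._dependents = defaultdict(list)  # prerequisite id -> ids waiting on it
        self._running = set()                 # dispatched but not yet completed
        self._completed = set()

    def add_task(self, task_id, deadline, prerequisites=()):
        if task_id in self._deadline:
            raise ValueError(f"duplicate task id: {task_id}")
        self._deadline[task_id] = deadline
        # Only prerequisites that have not completed yet count against the task.
        unfinished = {p for p in prerequisites if p not in self._completed}
        for p in unfinished:
            self._dependents[p].append(task_id)
        if unfinished:
            self._pending[task_id] = len(unfinished)
        else:
            # Already satisfied on arrival: goes straight to the runnable heap.
            heapq.heappush(self._runnable, (deadline, task_id))

    def consume_task(self):
        if not self._runnable:
            return None
        task_id = heapq.heappop(self._runnable)[1]
        self._running.add(task_id)
        return task_id

    def complete_task(self, task_id):
        if task_id not in self._running:
            raise KeyError(f"task was not dispatched: {task_id}")
        self._running.remove(task_id)
        self._completed.add(task_id)
        # Touch only the direct dependents; no graph walk is needed.
        for dep in self._dependents.pop(task_id, []):
            self._pending[dep] -= 1
            if self._pending[dep] == 0:
                del self._pending[dep]
                heapq.heappush(self._runnable, (self._deadline[dep], dep))
```

In this sketch `consume_task` is $O(\log n)$, `add_task` is $O(p + \log n)$ for `p` prerequisites, and `complete_task` is $O(d \log n)$ for `d` direct dependents. The order in which tasks leave the runnable heap is a topological order of the dependency graph, with ties among simultaneously runnable tasks resolved by deadline.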
---
### Part 3 — Validation, DAG verification, concurrency, and streaming (mini system design)
Now turn the class into a service. Beyond Parts 1–2, the system must:
1. Accept tasks in **batches or as a continuous stream**.
2. **Validate** task definitions: malformed input, duplicate `id`s, self-dependencies, and prerequisite references that don't (and never will) resolve.
3. **Verify the dependency graph is a DAG** — reject submissions that would introduce a cycle.
4. Dispatch only runnable tasks (Part 2 semantics) in earliest-deadline order.
5. Support **many workers consuming concurrently** without ever assigning the same task twice.
6. Handle **worker crashes, retries, and recovery** (a dispatched task whose worker dies must become runnable again).
7. **Scale** to the volumes described under Constraints & Assumptions (large task counts, many concurrent workers, bursty ingest) and provide **useful monitoring**.
Discuss API design, storage model, scheduling logic, DAG-validation strategy (batch vs. streaming), concurrency control, fault tolerance, and the scaling approach.
```hint DAG check: batch vs. streaming
For a full batch/workflow submitted together, cycle detection is **Kahn's algorithm** or DFS coloring in $O(V+E)$. For *streaming* edge-by-edge inserts, checking "does adding $u \to v$ create a cycle?" reduces to "is $u$ already reachable from $v$?" — a bounded DFS/BFS in the affected workflow. Note the cost difference and consider scoping cycle checks per `workflow_id`.
```
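The two checks named in the hint can be sketched side by side. Both functions below are illustrative: the names `has_cycle` and `would_create_cycle` are hypothetical, edges are assumed to be `(prerequisite, dependent)` pairs, and `has_cycle` assumes every edge endpoint appears in `nodes`.

```python
from collections import defaultdict, deque


def has_cycle(edges, nodes):
    """Batch check with Kahn's algorithm, O(V + E)."""
    indegree = {n: 0 for n in nodes}
    out = defaultdict(list)
    for u, v in edges:
        out[u].append(v)
        indegree[v] += 1
    queue = deque(n for n, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        u = queue.popleft()
        visited += 1
        for v in out[u]:
            indegree[v] -= 1
            if indegree[v] == 0:
                queue.append(v)
    # Any node never reaching in-degree 0 sits on (or behind) a cycle.
    return visited != len(indegree)


def would_create_cycle(out, u, v):
    """Streaming check: adding u -> v closes a cycle iff u is reachable from v.

    `out` maps each node to a list of its current successors.
    """
    if u == v:
        return True  # self-dependency
    stack, seen = [v], {v}
    while stack:
        node = stack.pop()
        for nxt in out.get(node, ()):
            if nxt == u:
                return True
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return False
```

The batch check runs once per submission, while the streaming check can traverse up to the whole reachable subgraph on every inserted edge, which is why scoping it to a single workflow matters.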
```hint Lease, don't pop
In a distributed setting "pop a task" becomes "**lease** a task": atomically move the earliest-deadline runnable task to a `LEASED` state with a `lease_owner` and `lease_expiry`. Think about what primitive makes this atomic — e.g. SQL `SELECT ... FOR UPDATE SKIP LOCKED`, or a Redis `ZPOPMIN` inside a Lua script.
```
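To make the lease idea concrete without committing to a particular store, here is a single-process stand-in in which a `threading.Lock` plays the role that `SELECT ... FOR UPDATE SKIP LOCKED` or a Lua script would play in a real deployment. The `LeaseQueue` class, its method names, and the 30-second default lease are assumptions made for illustration.

```python
import heapq
import threading
import time


class LeaseQueue:
    """In-process sketch: select-and-mark happens under one lock."""

    def __init__(self, lease_seconds=30.0):
        self._lock = threading.Lock()
        self._runnable = []   # heap of (deadline, id)
        self._leases = {}     # id -> (lease_owner, lease_expiry)
        self._lease_seconds = lease_seconds

    def add(self, task_id, deadline):
        with self._lock:
            heapq.heappush(self._runnable, (deadline, task_id))

    def lease(self, worker_id, now=None):
        now = time.monotonic() if now is None else now
        # Picking the task and recording its owner form one critical section,
        # so two workers can never observe the same task as available.
        with self._lock:
            if not self._runnable:
                return None
            _, task_id = heapq.heappop(self._runnable)
            self._leases[task_id] = (worker_id, now + self._lease_seconds)
            return task_id
```

The property to preserve when moving this to a database or cache is the same: the read of "earliest runnable task" and the write of "now leased to this worker" must be one atomic step.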
```hint Crash recovery & idempotency
A worker crash = an expired lease. A background sweeper returns `LEASED` tasks past `lease_expiry` to `RUNNABLE`. Pair this with **idempotency keys** on submit and a **lease/fence token** on complete so duplicate or late messages can't double-create tasks or unblock dependents twice.
```
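The sweeper and fence-token ideas from this hint can be sketched in a few lines. This is a single-threaded illustration with time passed in explicitly; the `LeaseTable` class, its method names, and the monotonically increasing integer token are assumptions, not a prescribed design.

```python
import heapq


class LeaseTable:
    """Leases carry a fence token; expired leases return to the runnable heap."""

    def __init__(self, lease_seconds=30.0):
        self.runnable = []     # heap of (deadline, id)
        self.deadline = {}     # id -> deadline
        self.leases = {}       # id -> (fence_token, lease_expiry)
        self.completed = set()
        self._next_token = 0
        self._lease_seconds = lease_seconds

    def add(self, task_id, deadline):
        self.deadline[task_id] = deadline
        heapq.heappush(self.runnable, (deadline, task_id))

    def lease(self, now):
        if not self.runnable:
            return None
        _, task_id = heapq.heappop(self.runnable)
        self._next_token += 1  # every lease gets a fresh, larger token
        self.leases[task_id] = (self._next_token, now + self._lease_seconds)
        return task_id, self._next_token

    def sweep(self, now):
        # Return tasks whose lease has expired to the runnable heap.
        expired = [t for t, (_, expiry) in self.leases.items() if expiry <= now]
        for task_id in expired:
            del self.leases[task_id]
            heapq.heappush(self.runnable, (self.deadline[task_id], task_id))
        return expired

    def complete(self, task_id, token):
        lease = self.leases.get(task_id)
        if lease is None or lease[0] != token:
            return False  # stale or duplicate completion: ignore it
        del self.leases[task_id]
        self.completed.add(task_id)
        return True
```

Because `complete` succeeds only for the token of the current lease, a worker that lost its lease cannot complete the task late, and a repeated completion message is a no-op, so dependents would be unblocked at most once.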
```hint Scaling the runnable set
Partition by `workflow_id` (or hash of `id`) so DAG edges and dependency updates stay local to a shard; each shard keeps its own deadline-ordered runnable index (Redis sorted set, or a DB index on `(state, deadline)`).
```
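As a rough illustration of the partitioning idea, the sketch below routes every task of a workflow to one shard and keeps a separate deadline-ordered heap per shard. The names `shard_for` and `ShardedRunnableIndex` are hypothetical, and in-memory heaps stand in for the Redis sorted sets or database indexes mentioned in the hint.

```python
import hashlib
import heapq


def shard_for(workflow_id, num_shards):
    """Stable shard index for a workflow.

    Python's built-in hash() for strings is salted per process, so a
    cryptographic digest is used to keep routing consistent across nodes.
    """
    digest = hashlib.sha256(workflow_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


class ShardedRunnableIndex:
    def __init__(self, num_shards):
        self._shards = [[] for _ in range(num_shards)]  # one heap per shard

    def add(self, workflow_id, task_id, deadline):
        shard = self._shards[shard_for(workflow_id, len(self._shards))]
        heapq.heappush(shard, (deadline, task_id))

    def pop_from(self, shard_index):
        # Earliest-deadline task within one shard, or None if it is empty.
        shard = self._shards[shard_index]
        return heapq.heappop(shard)[1] if shard else None
```

Note the tradeoff this makes visible: earliest-deadline order holds within a shard, not globally, so a worker pulling from one shard may run a task with a later deadline than one waiting on another shard.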
### Clarifying Questions to Ask
- Can a task reference a prerequisite that **hasn't been submitted yet** (forward references), or must every prerequisite already exist at submission time?
- Are tasks grouped into independent **workflows/DAGs**, or is there one global dependency graph? (This decides the blast radius of cycle checks and partitioning.)
- Is **at-least-once** execution acceptable (workers idempotent), or do we need exactly-once semantics?
- What should happen to a task's **dependents when a prerequisite permanently fails** — cancel the subtree, or skip and continue?
- Is the deadline a **hard SLA** (drop/alert if missed) or merely a priority hint?
- What are the expected ingest rate, total live-task count, and max fan-in/fan-out per task?
### What a Strong Answer Covers
- **Correct Part-1/2 core:** a priority-queue-based runnable frontier with deterministic ordering, plus an efficient incremental scheme for unblocking dependents as tasks complete — with stated complexities for each operation.
- **Clean state machine** for a task (`WAITING → RUNNABLE → LEASED → COMPLETED / FAILED`) and where each transition happens.
- **Atomic leasing** that provably prevents double-assignment (the chosen DB/cache primitive named, not hand-waved).
- **Validation + DAG strategy** with an explicit batch-vs-streaming tradeoff, not just "run Kahn's".
- **Fault tolerance:** lease expiry recovery, retries with backoff/attempt cap, idempotent submit and complete, and how the scheduler rebuilds state after restart from the durable store.
- **Scaling story:** partitioning rationale, per-shard runnable queues, separation of ingest / schedule / completion paths.
- **Observability:** the specific metrics that reveal backlog, starvation, and correctness bugs (duplicate dispatch, lease timeouts).
- **Tradeoff awareness:** acknowledging that fully general streaming DAG validation is expensive and proposing pragmatic restrictions.
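The task state machine listed above can be written down as an explicit transition table, which makes it easy to say where each transition happens and to reject illegal ones. This is a sketch under assumptions: the `LEASED → RUNNABLE` edge models lease expiry or a retry, and whether `WAITING` tasks can move directly to `FAILED` when a prerequisite fails permanently is left out because the problem treats it as an open question.

```python
from enum import Enum


class State(Enum):
    WAITING = "WAITING"
    RUNNABLE = "RUNNABLE"
    LEASED = "LEASED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Allowed transitions; terminal states have no outgoing edges.
ALLOWED = {
    State.WAITING: {State.RUNNABLE},        # last prerequisite completed
    State.RUNNABLE: {State.LEASED},         # a worker took a lease
    State.LEASED: {
        State.COMPLETED,                    # worker reported success
        State.FAILED,                       # attempt cap exhausted
        State.RUNNABLE,                     # lease expired or retry scheduled
    },
    State.COMPLETED: set(),
    State.FAILED: set(),
}


def transition(current, target):
    """Return the new state, or raise if the move is not in the table."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target
```

Routing every state change through one function like `transition` also gives a natural place to emit metrics for the observability points above.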
### Follow-up Questions
- A worker takes a lease, then hangs for hours without crashing (a "zombie"). How do you distinguish it from a legitimately slow task, and how do leases + heartbeats handle it without prematurely re-dispatching?
- Tasks with earlier deadlines keep arriving, so a low-priority task with a far-future deadline never runs. How would you prevent **starvation** while still honoring earliest-deadline-first?
- How do you support **task cancellation** or **deadline updates** for a task already sitting in the runnable heap (which can't address entries by key)?
- If a single workflow contains millions of tasks, how does partitioning by `workflow_id` break down, and what would you do instead?
Quick Answer: This question evaluates task scheduling, deadline-based prioritization, dependency management, and distributed coordination competencies, encompassing algorithmic/data-structure reasoning as well as correctness under concurrency and failure conditions.