Implement topological sort from string input
Company: Netflix
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Onsite
Quick Answer: This question evaluates a candidate's skills in parsing unstructured dependency strings into directed graphs, producing a valid topological order, detecting and reporting minimal cycles, and reasoning about robustness against duplicate, unknown, or malformed inputs as well as time and space complexity.
Part 1: Parse Dependency Strings into a Directed Graph
Constraints
- 0 <= len(lines) <= 10^4
- Total input length across all lines <= 2 * 10^5 characters
- Task names contain only letters, digits, and underscores
Examples
Input: ['install A after B', 'C->A', 'B before D']
Expected Output: {'A': [], 'B': ['A', 'D'], 'C': ['A'], 'D': []}
Explanation: Three different input formats all contribute edges to the same graph.
Input: [' Task1 -> Task2 ']
Expected Output: {'Task1': ['Task2'], 'Task2': []}
Explanation: Spaces around the arrow should be ignored.
Input: []
Expected Output: {}
Explanation: Edge case: no lines means no tasks and no edges.
Input: ['A before A']
Expected Output: {'A': ['A']}
Explanation: A self-dependency is still a valid directed edge at parse time.
Solution
def solution(lines):
    def valid(task):
        # A task name must be non-empty and use only letters, digits, and underscores.
        return bool(task) and all(ch.isalnum() or ch == '_' for ch in task)

    graph = {}

    def ensure(node):
        if node not in graph:
            graph[node] = set()

    for line in lines:
        s = line.strip()
        if not s:
            continue
        edge = None
        if '->' in s:
            # Arrow format: "X -> Y" means X comes before Y.
            parts = s.split('->')
            if len(parts) == 2:
                a, b = parts[0].strip(), parts[1].strip()
                if valid(a) and valid(b):
                    edge = (a, b)
        else:
            tokens = s.split()
            if len(tokens) == 3 and tokens[1] == 'before' and valid(tokens[0]) and valid(tokens[2]):
                edge = (tokens[0], tokens[2])
            elif len(tokens) == 4 and tokens[0] == 'install' and tokens[2] == 'after' and valid(tokens[1]) and valid(tokens[3]):
                # "install X after Y" reverses the direction: Y -> X.
                edge = (tokens[3], tokens[1])
        if edge is None:
            continue  # the line matched no supported format
        u, v = edge
        ensure(u)
        ensure(v)
        graph[u].add(v)  # the set drops duplicate edges
    return {node: sorted(graph[node]) for node in graph}
Time complexity: O(L + V + E log E), where L is the total number of characters, V is the number of discovered tasks, and E is the number of unique edges. Space complexity: O(V + E).
Hints
- Normalize each line with strip() before parsing.
- The phrase 'install X after Y' reverses the edge direction: Y -> X.
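As an alternative to the token-based checks in the solution, the three formats from the examples can be matched with regular expressions. The sketch below is illustrative only: `parse_edge`, `_NAME`, and `_PATTERNS` are hypothetical names, and the character class is ASCII-only, whereas `str.isalnum()` in the solution also accepts non-ASCII letters and digits.

```python
import re

# Hypothetical helper names; ASCII-only task names are an assumption of this sketch.
_NAME = r"[A-Za-z0-9_]+"
_PATTERNS = [
    # (compiled pattern, whether the captured pair must be reversed)
    (re.compile(rf"({_NAME})\s*->\s*({_NAME})"), False),             # X -> Y
    (re.compile(rf"({_NAME})\s+before\s+({_NAME})"), False),         # X before Y
    (re.compile(rf"install\s+({_NAME})\s+after\s+({_NAME})"), True),  # install X after Y
]


def parse_edge(line):
    """Return (u, v), meaning u must come before v, or None if no format matches."""
    s = line.strip()
    for pattern, reverse in _PATTERNS:
        m = pattern.fullmatch(s)
        if m:
            a, b = m.group(1), m.group(2)
            # "install X after Y" yields the edge Y -> X.
            return (b, a) if reverse else (a, b)
    return None


print(parse_edge('install A after B'))  # ('B', 'A')
print(parse_edge(' Task1 -> Task2 '))   # ('Task1', 'Task2')
print(parse_edge('bad line'))           # None
```

Using `fullmatch` rejects lines with trailing tokens such as 'A -> B -> C', which mirrors the `len(parts) == 2` check in the solution.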
Part 2: Return One Valid Topological Ordering
Constraints
- 0 <= number of tasks <= 2 * 10^5
- 0 <= number of edges <= 2 * 10^5
- Task names are strings
- Duplicate task names and duplicate edges may appear
Examples
Input: (['A', 'B', 'C', 'D'], [('B', 'A'), ('C', 'A'), ('B', 'D')])
Expected Output: ['B', 'C', 'A', 'D']
Explanation: Both B and C are initially available, so pick B first because it is lexicographically smaller.
Input: (['X'], [])
Expected Output: ['X']
Explanation: Edge case: a single isolated task is already a valid topological order.
Input: ([], [])
Expected Output: []
Explanation: Edge case: the empty graph has an empty topological ordering.
Input: (['A', 'B'], [('A', 'B'), ('B', 'A')])
Expected Output: None
Explanation: A cycle makes topological ordering impossible.
Input: (['A', 'B', 'C'], [('A', 'C'), ('A', 'C'), ('B', 'C')])
Expected Output: ['A', 'B', 'C']
Explanation: Duplicate edges should not change the result.
Solution
def solution(data):
    import heapq
    tasks, edges = data
    # Collect every task, including ones that appear only in edges.
    nodes = set(tasks)
    for u, v in edges:
        nodes.add(u)
        nodes.add(v)
    adj = {node: set() for node in nodes}
    indegree = {node: 0 for node in nodes}
    for u, v in edges:
        if v not in adj[u]:  # count each distinct edge once
            adj[u].add(v)
            indegree[v] += 1
    adj = {node: sorted(neighbors) for node, neighbors in adj.items()}
    # A min-heap of ready tasks always yields the lexicographically smallest one.
    heap = [node for node in nodes if indegree[node] == 0]
    heapq.heapify(heap)
    order = []
    while heap:
        u = heapq.heappop(heap)
        order.append(u)
        for v in adj[u]:
            indegree[v] -= 1
            if indegree[v] == 0:
                heapq.heappush(heap, v)
    # If a cycle exists, some tasks never reach indegree 0.
    return order if len(order) == len(nodes) else None
Time complexity: O((V + E) log V). Space complexity: O(V + E).
Hints
- Kahn's algorithm is a natural fit here.
- Use a min-heap instead of a queue if you want the lexicographically smallest valid order.
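To make the difference between the two hints concrete, here is a minimal sketch of Kahn's algorithm with a plain FIFO queue. The name `kahn_fifo` is hypothetical, and the sketch assumes every edge endpoint already appears in `nodes`. It returns some valid order, not necessarily the lexicographically smallest one.

```python
from collections import deque


def kahn_fifo(nodes, edges):
    """Kahn's algorithm with a FIFO queue (hypothetical helper).

    Assumes every endpoint in `edges` is listed in `nodes`.
    Returns a valid topological order, or None if the graph has a cycle.
    """
    adj = {n: set() for n in nodes}
    indegree = {n: 0 for n in nodes}
    for u, v in edges:
        if v not in adj[u]:  # ignore duplicate edges
            adj[u].add(v)
            indegree[v] += 1
    # Iterate over the dict keys so duplicate names in `nodes` are seen once.
    queue = deque(n for n in indegree if indegree[n] == 0)
    order = []
    while queue:
        u = queue.popleft()
        order.append(u)
        for v in adj[u]:
            indegree[v] -= 1
            if indegree[v] == 0:
                queue.append(v)
    return order if len(order) == len(indegree) else None


print(kahn_fifo(['A', 'B', 'C', 'D'], [('B', 'A'), ('C', 'A'), ('B', 'D')]))
# ['B', 'C', 'D', 'A']
```

On the first example the queue emits D before A because D becomes ready first, while the min-heap solution returns ['B', 'C', 'A', 'D']. Both are valid topological orders; only the heap version is deterministic by name.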
Part 3: Detect and Report a Minimal Directed Cycle
Constraints
- 1 <= number of tasks <= 200 for the intended shortest-cycle approach
- 0 <= number of edges <= 5000
- Duplicate edges may appear
Examples
Input: (['A', 'B', 'C', 'D'], [('A', 'B'), ('B', 'C'), ('C', 'A'), ('C', 'D')])
Expected Output: ['A', 'B', 'C', 'A']
Explanation: The only cycle is A -> B -> C -> A.
Input: (['A', 'B', 'C'], [('A', 'A'), ('A', 'B'), ('B', 'C'), ('C', 'A')])
Expected Output: ['A', 'A']
Explanation: A self-loop is a cycle of length 1, which is minimal.
Input: (['A', 'B', 'C'], [('A', 'B'), ('B', 'C')])
Expected Output: []
Explanation: This graph is acyclic.
Input: (['B', 'A', 'D', 'C'], [('A', 'B'), ('B', 'A'), ('C', 'D'), ('D', 'C')])
Expected Output: ['A', 'B', 'A']
Explanation: There are two shortest 2-cycles, but ['A', 'B', 'A'] is lexicographically smaller than ['C', 'D', 'C'].
Solution
def solution(data):
    from collections import deque
    tasks, edges = data
    nodes = set(tasks)
    for u, v in edges:
        nodes.add(u)
        nodes.add(v)
    if not nodes:
        return []
    adj = {node: set() for node in nodes}
    incoming = {node: set() for node in nodes}
    for u, v in edges:
        if v not in adj[u]:
            adj[u].add(v)
            incoming[v].add(u)
    adj = {node: sorted(neighbors) for node, neighbors in adj.items()}
    incoming = {node: sorted(parents) for node, parents in incoming.items()}

    def canonical(cycle):
        # Rotate the cycle so that its lexicographically smallest rotation comes first.
        base = cycle[:-1]
        n = len(base)
        best = None
        for i in range(n):
            rotated = base[i:] + base[:i]
            candidate = tuple(rotated + [rotated[0]])
            if best is None or candidate < best:
                best = candidate
        return list(best)

    best_cycle = None
    best_len = float('inf')
    for start in sorted(nodes):
        # BFS from start gives shortest paths to every reachable node.
        dist = {start: 0}
        parent = {start: None}
        q = deque([start])
        while q:
            u = q.popleft()
            for v in adj[u]:
                if v not in dist:
                    dist[v] = dist[u] + 1
                    parent[v] = u
                    q.append(v)
        # Any reachable predecessor of start closes a cycle through start.
        for pred in incoming[start]:
            if pred not in dist:
                continue
            path = []
            cur = pred
            while cur is not None:
                path.append(cur)
                cur = parent[cur]
            path.reverse()
            candidate = path + [start]
            cand_len = len(candidate) - 1
            canon = canonical(candidate)
            if cand_len < best_len:
                best_len = cand_len
                best_cycle = canon
            elif cand_len == best_len and best_cycle is not None and tuple(canon) < tuple(best_cycle):
                # Tie on length: keep the lexicographically smaller cycle.
                best_cycle = canon
    return best_cycle or []
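Time complexity: O(V * (V + E)) for the BFS passes after graph construction, plus sorting adjacency once. As written, the solution also rebuilds and canonicalizes a candidate for every reachable predecessor, which can add up to O(E * V^2) work in the worst case; skipping candidates longer than the current best before canonicalizing reduces that cost. Space complexity: O(V + E).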
Hints
- A shortest directed cycle through a start node can be found by looking for the shortest path from that start node to one of its predecessors.
- BFS is useful because all edges have equal weight.
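The two hints can be sketched for a single start node as follows. This is a variant of the solution above, not a copy of it: it stops at the first edge that leads back to `start` instead of finishing the BFS and then scanning predecessors. The name `shortest_cycle_through` and the `adj` mapping (node to list of successors) are assumptions of this sketch.

```python
from collections import deque


def shortest_cycle_through(start, adj):
    """Return a shortest cycle [start, ..., start], or None if start is on no cycle.

    `adj` maps each node to an iterable of its successors (assumed input shape).
    """
    parent = {start: None}
    queue = deque([start])
    while queue:
        u = queue.popleft()
        for v in adj.get(u, ()):
            if v == start:
                # u is a predecessor of start. BFS pops nodes in order of
                # distance, so the first such u closes a shortest cycle.
                path = [start]
                cur = u
                while cur is not None:
                    path.append(cur)
                    cur = parent[cur]
                path.reverse()
                return path
            if v not in parent:
                parent[v] = u
                queue.append(v)
    return None


adj = {'A': ['B'], 'B': ['C'], 'C': ['A', 'D'], 'D': []}
print(shortest_cycle_through('A', adj))  # ['A', 'B', 'C', 'A']
print(shortest_cycle_through('D', adj))  # None
```

Running this from every node and keeping the shortest result gives a shortest cycle in the whole graph; lexicographic tie-breaking, as in the solution, would need an extra comparison step.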
Part 4: Robust Dependency Parsing with Duplicate, Unknown, and Malformed Input Handling
Constraints
- 0 <= len(known_tasks) <= 10^4
- 0 <= len(lines) <= 10^4
- Total input length <= 2 * 10^5 characters
Examples
Input: (['A', 'B', 'C', 'D'], ['install A after B', 'B before D', 'B before D', 'X->A', 'bad line', 'C->A'])
Expected Output: {'graph': {'A': [], 'B': ['A', 'D'], 'C': ['A'], 'D': []}, 'ignored_duplicates': 1, 'unknown_tasks': ['X'], 'malformed_lines': [4]}
Explanation: One duplicate edge is ignored, one unknown task is reported, and one malformed line is recorded.
Input: (['A', 'B'], [])
Expected Output: {'graph': {'A': [], 'B': []}, 'ignored_duplicates': 0, 'unknown_tasks': [], 'malformed_lines': []}
Explanation: Edge case: no lines means no errors and an empty graph over the known tasks.
Input: (['A', 'B'], ['A before', 'Y before Z', 'install A after B'])
Expected Output: {'graph': {'A': [], 'B': ['A']}, 'ignored_duplicates': 0, 'unknown_tasks': ['Y', 'Z'], 'malformed_lines': [0]}
Explanation: The first line is malformed, the second line references only unknown tasks, and the third line is valid.
Input: (['A'], ['', 'A->A', 'A->A'])
Expected Output: {'graph': {'A': ['A']}, 'ignored_duplicates': 1, 'unknown_tasks': [], 'malformed_lines': []}
Explanation: Blank lines are skipped without being reported as malformed. The self-edge A -> A is stored once, and its repeat counts as one ignored duplicate.
Solution
def solution(data):
    known_tasks, lines = data
    known = set(known_tasks)
    graph = {task: set() for task in known}
    ignored_duplicates = 0
    unknown = set()
    malformed = []

    def valid(task):
        return bool(task) and all(ch.isalnum() or ch == '_' for ch in task)

    def parse_line(s):
        # Return (u, v) for a supported format, or None if the line is malformed.
        if '->' in s:
            parts = s.split('->')
            if len(parts) == 2:
                a, b = parts[0].strip(), parts[1].strip()
                if valid(a) and valid(b):
                    return (a, b)
            return None
        tokens = s.split()
        if len(tokens) == 3 and tokens[1] == 'before' and valid(tokens[0]) and valid(tokens[2]):
            return (tokens[0], tokens[2])
        if len(tokens) == 4 and tokens[0] == 'install' and tokens[2] == 'after' and valid(tokens[1]) and valid(tokens[3]):
            return (tokens[3], tokens[1])
        return None

    for idx, line in enumerate(lines):
        s = line.strip()
        if not s:
            continue  # blank lines are neither edges nor errors
        edge = parse_line(s)
        if edge is None:
            malformed.append(idx)  # record the 0-based line index
            continue
        u, v = edge
        # Report every unknown endpoint, then drop the edge.
        bad = False
        if u not in known:
            unknown.add(u)
            bad = True
        if v not in known:
            unknown.add(v)
            bad = True
        if bad:
            continue
        if v in graph[u]:
            ignored_duplicates += 1
        else:
            graph[u].add(v)
    return {
        'graph': {task: sorted(graph[task]) for task in sorted(graph)},
        'ignored_duplicates': ignored_duplicates,
        'unknown_tasks': sorted(unknown),
        'malformed_lines': malformed
    }
Time complexity: O(L + V + E log E + U log U), where U is the number of unique unknown tasks. Space complexity: O(V + E + U).
Hints
- Separate parsing from validation. First detect whether a line matches one of the supported formats, then check whether both tasks are known.
- A set is useful both for deduplicating edges and for collecting unique unknown task names.
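One way to keep parsing and validation separate, as the first hint suggests, is to have the parser return only an edge or None and to classify that result in a second step. The sketch below is a hypothetical illustration: `classify` and its status strings are not part of the solution above.

```python
def classify(edge, known):
    """Classify a parser result without re-reading the raw line.

    `edge` is a (u, v) tuple from the parser, or None when no format matched.
    Returns a (status, unknown_names) pair; the status strings are hypothetical.
    """
    if edge is None:
        return 'malformed', []
    # dict.fromkeys keeps first-seen order and drops a repeated endpoint.
    missing = [task for task in dict.fromkeys(edge) if task not in known]
    if missing:
        return 'unknown', missing
    return 'ok', []


known = {'A', 'B'}
print(classify(None, known))        # ('malformed', [])
print(classify(('Y', 'Z'), known))  # ('unknown', ['Y', 'Z'])
print(classify(('B', 'A'), known))  # ('ok', [])
```

Because the format check and the known-task check never mix, a well-formed line such as 'Y before Z' is reported under unknown tasks and never as malformed, which matches the third example.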
Part 5: Compute Time and Space Estimates for Dependency Processing
Constraints
- 0 <= L, N, M <= 10^9
- lexicographic is a boolean
Examples
Input: (25, 4, 3, False)
Expected Output: (35, 11, 'O(L + N + M)', 'O(N + M)')
Explanation: Time = 25 parse + 3 build + 4 queue removals + 3 relaxations = 35.
Input: (25, 4, 3, True)
Expected Output: (55, 11, 'O(L + M + N log N)', 'O(N + M)')
Explanation: ceil(log2(5)) = 3, so heap work is 2 * 4 * 3 = 24 and the total is 25 + 3 + 24 + 3 = 55.
Input: (0, 0, 0, False)
Expected Output: (0, 0, 'O(L + N + M)', 'O(N + M)')
Explanation: Edge case: an empty dataset has zero exact work and zero exact storage.
Input: (10, 1, 0, True)
Expected Output: (12, 2, 'O(L + M + N log N)', 'O(N + M)')
Explanation: ceil(log2(2)) = 1, so heap work is 2 * 1 * 1 = 2 and the total is 10 + 0 + 2 + 0 = 12.
Solution
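def solution(data):
    import math
    L, N, M, lexicographic = data
    # Exact storage count used by the examples: 2 cells per task plus 1 per edge.
    space_cells = 2 * N + M
    if lexicographic:
        # Each task is pushed and popped once, each costing ceil(log2(N + 1)).
        heap_cost = 0 if N == 0 else math.ceil(math.log2(N + 1))
        time_ops = L + M + (2 * N * heap_cost) + M
        asymptotic_time = 'O(L + M + N log N)'
    else:
        # L parse + M build + N queue removals + M relaxations.
        time_ops = L + M + N + M
        asymptotic_time = 'O(L + N + M)'
    return (time_ops, space_cells, asymptotic_time, 'O(N + M)')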
Time complexity: O(1). Space complexity: O(1).
Hints
- Be careful to include both graph-building work and topological-processing work.
- For heap mode, use ceil(log2(N + 1)); when N = 0, the heap cost is 0.
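For non-negative integers, ceil(log2(N + 1)) equals the number of bits needed to write N, so the heap cost can also be computed without floating point. The sketch below compares both forms; `heap_cost_int` and `heap_cost_float` are hypothetical names used only for this illustration.

```python
import math


def heap_cost_float(n):
    # Formula from the hint, with the N = 0 case handled explicitly.
    return 0 if n == 0 else math.ceil(math.log2(n + 1))


def heap_cost_int(n):
    # If 2**(k-1) <= n < 2**k, then n.bit_length() == k == ceil(log2(n + 1)).
    # For n == 0, bit_length() is 0, so no special case is needed.
    return n.bit_length()


for n in (0, 1, 4, 7, 8):
    print(n, heap_cost_float(n), heap_cost_int(n))
# 0 0 0
# 1 1 1
# 4 3 3
# 7 3 3
# 8 4 4
```

With N = 4 both forms give 3, which reproduces the 2 * 4 * 3 = 24 heap operations in the second example.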
Part 6: Batched Topological Sort for Large Graphs
Constraints
- 0 <= number of tasks <= 2 * 10^5
- 0 <= number of edges <= 2 * 10^5
- 1 <= batch_size
- Duplicate edges may appear
Examples
Input: (['A', 'B', 'C', 'D', 'E'], [('A', 'D'), ('B', 'D'), ('C', 'E')], 2)
Expected Output: [['A', 'B'], ['C', 'D'], ['E']]
Explanation: The first batch takes the two smallest ready tasks, A and B.
Input: (['A', 'B', 'C', 'D', 'E'], [('A', 'D'), ('B', 'D'), ('C', 'E')], 1)
Expected Output: [['A'], ['B'], ['C'], ['D'], ['E']]
Explanation: With batch size 1, this becomes a deterministic one-at-a-time topological sort.
Input: ([], [], 3)
Expected Output: []
Explanation: Edge case: an empty graph produces no batches.
Input: (['A', 'B'], [('A', 'B'), ('B', 'A')], 2)
Expected Output: None
Explanation: A and B each wait on the other, so neither ever becomes ready and no complete batching exists.
Solution
def solution(data):
    import heapq
    tasks, edges, batch_size = data
    nodes = set(tasks)
    for u, v in edges:
        nodes.add(u)
        nodes.add(v)
    if not nodes:
        return []
    adj = {node: set() for node in nodes}
    indegree = {node: 0 for node in nodes}
    for u, v in edges:
        if v not in adj[u]:  # count each distinct edge once
            adj[u].add(v)
            indegree[v] += 1
    adj = {node: sorted(neighbors) for node, neighbors in adj.items()}
    heap = [node for node in nodes if indegree[node] == 0]
    heapq.heapify(heap)
    batches = []
    processed = 0
    while heap:
        # Pop up to batch_size of the smallest ready tasks before releasing dependents.
        take = min(batch_size, len(heap))
        batch = [heapq.heappop(heap) for _ in range(take)]
        batches.append(batch)
        for u in batch:
            processed += 1
            for v in adj[u]:
                indegree[v] -= 1
                if indegree[v] == 0:
                    heapq.heappush(heap, v)
    # Tasks on or behind a cycle are never processed.
    return batches if processed == len(nodes) else None
Time complexity: O((V + E) log V). Space complexity: O(V + E).
Hints
- This is still Kahn's algorithm; the main change is that you pop several ready nodes at once.
- Use a min-heap so each batch is lexicographically deterministic.
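A small checker can help when testing a batched result. The sketch below assumes, based on how the solution releases dependents only after a whole batch has been popped, that every task must land in a strictly later batch than each of its prerequisites. The function name `batches_respect_edges` is hypothetical.

```python
def batches_respect_edges(batches, edges, batch_size):
    """Check batch sizes, uniqueness, and dependency order (hypothetical helper).

    Assumes a dependent must appear in a strictly later batch than its prerequisite.
    """
    position = {}
    for index, batch in enumerate(batches):
        if not batch or len(batch) > batch_size:
            return False  # empty or oversized batch
        for task in batch:
            if task in position:
                return False  # task scheduled twice
            position[task] = index
    for u, v in edges:
        if u not in position or v not in position:
            return False  # an endpoint was never scheduled
        if position[u] >= position[v]:
            return False  # prerequisite not finished in an earlier batch
    return True


edges = [('A', 'D'), ('B', 'D'), ('C', 'E')]
print(batches_respect_edges([['A', 'B'], ['C', 'D'], ['E']], edges, 2))  # True
print(batches_respect_edges([['A', 'D'], ['B', 'C'], ['E']], edges, 2))  # False
```

The checker confirms validity only. It does not verify that each batch holds the lexicographically smallest ready tasks, which is the extra property the min-heap provides.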