Implement cluster status tracker
Company: Anthropic
Role: Machine Learning Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Technical Screen
Quick Answer: This question evaluates a candidate's competence in data structures and algorithms for time-ordered state tracking, time-series indexing, conflict resolution (last-write-wins), memory-efficient storage, and concurrent system behavior under high update rates.
Part 1: Implement the cluster status tracker
Constraints
- 1 <= ttl <= 10^9
- 0 <= node_id, timestamp, t, cutoff <= 10^9
- 1 <= len(operations) <= 2 * 10^4
- Status strings are non-empty and never equal to 'OFFLINE'
- A node is counted in a summary at time t only if it has at least one update with timestamp <= t
Examples
Input: (5, [('update', 1, 'OK', 10), ('update', 2, 'WARN', 8), ('update', 1, 'ERROR', 7), ('getCurrent', 1), ('getAt', 1, 8), ('getClusterSummaryAt', 10)])
Expected Output: ['OK', 'ERROR', {'OK': 1, 'WARN': 1}]
Explanation: Node 1's newest update is at time 10 with status OK. Its update at time 7 arrived out of order but is the latest one at or before time 8, so its effective status at time 8 is ERROR.
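Example 1 hinges on finding a node's latest update at or before the query time. The following is a minimal sketch of that per-node lookup. It assumes the history is kept as two parallel lists sorted by timestamp. The helper name `effective_status` is hypothetical, and its OFFLINE rule (`t - last_ts > ttl`) mirrors the reference solution for this part.

```python
from bisect import bisect_right
from typing import List, Optional


def effective_status(times: List[int], statuses: List[str], t: int, ttl: int) -> Optional[str]:
    """Return a node's status at time t (hypothetical helper).

    times must be sorted ascending and aligned with statuses.
    Returns None if the node has no update at or before t, and
    'OFFLINE' if its latest such update is more than ttl old.
    """
    i = bisect_right(times, t) - 1  # index of the latest update with timestamp <= t
    if i < 0:
        return None
    if t - times[i] > ttl:
        return 'OFFLINE'
    return statuses[i]


# Node 1 from Example 1 after its out-of-order updates (times 7 and 10), ttl = 5.
times, statuses = [7, 10], ['ERROR', 'OK']
print(effective_status(times, statuses, 8, 5))   # ERROR
print(effective_status(times, statuses, 10, 5))  # OK
print(effective_status(times, statuses, 6, 5))   # None
```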
Input: (3, [('update', 1, 'OK', 5), ('update', 1, 'OK', 5), ('update', 2, 'WARN', 1), ('getAt', 2, 5), ('getClusterSummaryAt', 5)])
Expected Output: ['OFFLINE', {'OFFLINE': 1, 'OK': 1}]
Explanation: The duplicate update for node 1 changes nothing. Node 2 last reported at time 1, which is 4 time units before time 5 and exceeds the TTL of 3, so it is OFFLINE.
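Example 2 combines the TTL rule with the counting rule from the constraints. Every node that has reported at or before t contributes exactly one entry, either its status or 'OFFLINE'. The following is a minimal sketch using `collections.Counter`. It assumes histories are stored as a dict of parallel sorted lists, as in the solution, and `cluster_summary` is a hypothetical name.

```python
from bisect import bisect_right
from collections import Counter
from typing import Dict, List, Tuple


def cluster_summary(histories: Dict[int, Tuple[List[int], List[str]]], t: int, ttl: int) -> Dict[str, int]:
    """Count effective statuses at time t across all nodes (hypothetical helper)."""
    counts = Counter()
    for times, statuses in histories.values():
        i = bisect_right(times, t) - 1  # latest update with timestamp <= t
        if i < 0:
            continue  # node has not reported yet, so it is not counted
        counts['OFFLINE' if t - times[i] > ttl else statuses[i]] += 1
    # Sorted keys give a deterministic dict order, matching the expected outputs.
    return dict(sorted(counts.items()))


# Example 2 after its updates: node 1 reported OK at 5, node 2 reported WARN at 1; ttl = 3.
histories = {1: ([5], ['OK']), 2: ([1], ['WARN'])}
print(cluster_summary(histories, 5, 3))  # {'OFFLINE': 1, 'OK': 1}
print(cluster_summary(histories, 3, 3))  # {'WARN': 1}
```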
Input: (5, [('update', 1, 'OK', 1), ('update', 1, 'WARN', 4), ('compact', 4), ('getAt', 1, 3), ('getAt', 1, 4), ('getClusterSummaryAt', 4)])
Expected Output: [None, 'WARN', {'WARN': 1}]
Explanation: After compacting at cutoff 4, queries before time 4 must return None.
Input: (2, [('getCurrent', 99), ('getAt', 99, 10), ('getClusterSummaryAt', 10)])
Expected Output: [None, None, {}]
Explanation: Edge case: no node has ever reported.
Solution
from bisect import bisect_left, bisect_right


def solution(ttl, operations):
    # node_id -> [sorted timestamps, statuses aligned with those timestamps]
    histories = {}
    # Queries for times before the latest compaction cutoff return None.
    valid_from = float('-inf')
    answers = []

    for op in operations:
        kind = op[0]
        if kind == 'update':
            node, status, ts = op[1], op[2], op[3]
            times, statuses = histories.setdefault(node, [[], []])
            i = bisect_left(times, ts)
            if i < len(times) and times[i] == ts:
                # Same timestamp: last write wins.
                statuses[i] = status
            else:
                # Out-of-order updates are inserted in timestamp order.
                times.insert(i, ts)
                statuses.insert(i, status)
        elif kind == 'getCurrent':
            node = op[1]
            if node not in histories or not histories[node][0]:
                answers.append(None)
            else:
                # Status of the update with the largest timestamp (no TTL check).
                answers.append(histories[node][1][-1])
        elif kind == 'getAt':
            node, t = op[1], op[2]
            if t < valid_from or node not in histories or not histories[node][0]:
                answers.append(None)
                continue
            times, statuses = histories[node]
            # Latest update with timestamp <= t.
            i = bisect_right(times, t) - 1
            if i < 0:
                answers.append(None)
            elif t - times[i] > ttl:
                answers.append('OFFLINE')
            else:
                answers.append(statuses[i])
        elif kind == 'getClusterSummaryAt':
            t = op[1]
            if t < valid_from:
                answers.append(None)
                continue
            summary = {}
            for times, statuses in histories.values():
                i = bisect_right(times, t) - 1
                if i < 0:
                    continue  # node had not reported by time t
                state = 'OFFLINE' if t - times[i] > ttl else statuses[i]
                summary[state] = summary.get(state, 0) + 1
            answers.append(dict(sorted(summary.items())))
        elif kind == 'compact':
            cutoff = op[1]
            if cutoff <= valid_from:
                continue  # a cutoff at least this large was already applied
            valid_from = cutoff
            new_histories = {}
            for node, (times, statuses) in histories.items():
                if not times:
                    continue
                # Keep the last update before cutoff plus everything at or after it.
                idx = bisect_left(times, cutoff)
                if idx == 0:
                    kept_times = times[:]
                    kept_statuses = statuses[:]
                elif idx == len(times):
                    kept_times = [times[-1]]
                    kept_statuses = [statuses[-1]]
                else:
                    kept_times = [times[idx - 1]] + times[idx:]
                    kept_statuses = [statuses[idx - 1]] + statuses[idx:]
                new_histories[node] = [kept_times, kept_statuses]
            histories = new_histories
    return answers

Time complexity: update: O(m), where m is the number of retained updates for that node, because Python list insertion may shift elements; getCurrent: O(1); getAt: O(log m); getClusterSummaryAt: O(u log m) across u known nodes, plus O(u log u) to sort the summary keys; compact: O(total retained updates). Space complexity: O(total retained updates).
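The O(m) update cost comes from inserting an out-of-order timestamp into the middle of a Python list. The sketch below isolates that step in a hypothetical `upsert` helper. It also shows why the duplicate update in Example 2 is harmless: an equal timestamp overwrites in place instead of adding an entry.

```python
from bisect import bisect_left
from typing import List


def upsert(times: List[int], statuses: List[str], ts: int, status: str) -> None:
    """Record (ts, status) in a node's sorted history (hypothetical helper).

    An existing entry with the same timestamp is overwritten (last write wins);
    otherwise the entry is inserted so times stays sorted. list.insert shifts
    later elements, which is where the O(m) worst case comes from.
    """
    i = bisect_left(times, ts)
    if i < len(times) and times[i] == ts:
        statuses[i] = status
    else:
        times.insert(i, ts)
        statuses.insert(i, status)


times, statuses = [], []
upsert(times, statuses, 10, 'OK')
upsert(times, statuses, 7, 'ERROR')  # out of order: lands before 10
upsert(times, statuses, 10, 'OK')    # duplicate: no new entry
print(times, statuses)  # [7, 10] ['ERROR', 'OK']
```

When updates arrive in timestamp order, `bisect_left` returns `len(times)` and the insert behaves like an append, so the worst case only applies to late-arriving updates.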
Hints
- For each node, keep timestamps sorted so you can binary-search the latest update not after t.
- During compaction, keeping the last update before the cutoff plus all updates at or after the cutoff is enough to answer future queries for times >= cutoff.
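The compaction rule from the second hint fits in a few lines. This is a minimal sketch on a single node's parallel lists, with `compact_history` as a hypothetical name. Slicing from `max(idx - 1, 0)` covers the same three cases the solution handles separately: nothing before the cutoff, everything before the cutoff, and a mix.

```python
from bisect import bisect_left
from typing import List, Tuple


def compact_history(times: List[int], statuses: List[str], cutoff: int) -> Tuple[List[int], List[str]]:
    """Drop updates that can no longer affect queries at times >= cutoff (hypothetical helper).

    Keeps the last update strictly before cutoff (it may still be the effective
    update at time cutoff) plus every update at or after cutoff.
    """
    idx = bisect_left(times, cutoff)  # first update with timestamp >= cutoff
    start = max(idx - 1, 0)
    return times[start:], statuses[start:]


print(compact_history([1, 2, 4, 9], ['OK', 'WARN', 'ERROR', 'OK'], 4))
# ([2, 4, 9], ['WARN', 'ERROR', 'OK'])
print(compact_history([1, 2], ['OK', 'WARN'], 10))
# ([2], ['WARN'])
```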
Part 2: Add efficient range queries
Constraints
- 1 <= ttl <= 10^5
- 0 <= minute <= 2 * 10^5
- 1 <= len(events) + len(queries) <= 2 * 10^5
- The number of distinct non-OFFLINE statuses is at most 20
- Status strings are non-empty and never equal to 'OFFLINE'
- For every query, 0 <= end_minute - k + 1 <= end_minute <= 2 * 10^5
Examples
Input: (2, [(1, 'OK', 1), (2, 'WARN', 2), (1, 'ERROR', 4)], [(4, 4)])
Expected Output: [[{'OK': 1}, {'OK': 1, 'WARN': 1}, {'OK': 1, 'WARN': 1}, {'ERROR': 1, 'WARN': 1}]]
Explanation: The query asks for summaries at minutes 1, 2, 3, and 4.
Input: (1, [(1, 'OK', 3), (1, 'WARN', 3), (2, 'OK', 1)], [(3, 3)])
Expected Output: [[{'OK': 1}, {'OK': 1}, {'OFFLINE': 1, 'WARN': 1}]]
Explanation: At minute 3, node 1 is WARN because later input wins at the same minute, and node 2 is OFFLINE.
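The "later input wins at the same minute" rule can be applied before any timeline work by keying a dict on `(node, minute)`. The sketch below does this with a hypothetical `dedupe_events` helper that follows the same steps as the solution's first loops. Dict assignment replaces the stored value, so the last event for a key survives.

```python
from typing import Dict, List, Tuple


def dedupe_events(events: List[Tuple[int, str, int]]) -> Dict[int, List[Tuple[int, str]]]:
    """Group events by node, keeping only the last status reported per (node, minute).

    Hypothetical helper: later events in the input overwrite earlier ones at the
    same minute because dict assignment replaces the stored value.
    """
    latest: Dict[Tuple[int, int], str] = {}
    for node, status, minute in events:
        latest[(node, minute)] = status
    by_node: Dict[int, List[Tuple[int, str]]] = {}
    for (node, minute), status in latest.items():
        by_node.setdefault(node, []).append((minute, status))
    for updates in by_node.values():
        updates.sort()  # chronological order per node; minutes are unique after dedup
    return by_node


print(dedupe_events([(1, 'OK', 3), (1, 'WARN', 3), (2, 'OK', 1)]))
# {1: [(3, 'WARN')], 2: [(1, 'OK')]}
```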
Input: (5, [], [(3, 2)])
Expected Output: [[{}, {}]]
Explanation: Edge case: with no events, every summary is empty.
Input: (2, [(1, 'OK', 0), (1, 'WARN', 5), (2, 'OK', 1)], [(2, 3), (6, 2)])
Expected Output: [[{'OK': 1}, {'OK': 2}, {'OK': 2}], [{'OFFLINE': 1, 'WARN': 1}, {'OFFLINE': 1, 'WARN': 1}]]
Explanation: The first query returns minutes 0, 1, 2. The second returns minutes 5 and 6.
Solution
def solution(ttl, events, queries):
    if not queries:
        return []
    max_time = max(end_minute for end_minute, _ in queries)
    if not events:
        return [[{} for _ in range(k)] for end_minute, k in queries]

    # Deduplicate: for the same (node, minute), the later event wins.
    latest = {}
    for node, status, minute in events:
        latest[(node, minute)] = status

    by_node = {}
    statuses = set()
    for (node, minute), status in latest.items():
        by_node.setdefault(node, []).append((minute, status))
        statuses.add(status)
    statuses = sorted(statuses)

    # Difference arrays: known_diff counts nodes that have reported at least once;
    # status_diff[s] counts nodes whose effective status is s.
    known_diff = [0] * (max_time + 2)
    status_diff = {status: [0] * (max_time + 2) for status in statuses}
    for updates in by_node.values():
        updates.sort()
        first_minute = updates[0][0]
        if first_minute <= max_time:
            # Once a node reports, it stays known for the rest of the timeline.
            known_diff[first_minute] += 1
            known_diff[max_time + 1] -= 1
        for i, (minute, status) in enumerate(updates):
            if minute > max_time:
                break
            # Active until TTL expiry or the minute before the next update.
            end = min(max_time, minute + ttl)
            if i + 1 < len(updates):
                end = min(end, updates[i + 1][0] - 1)
            if minute <= end:
                diff = status_diff[status]
                diff[minute] += 1
                diff[end + 1] -= 1

    # Prefix sums: build every minute's summary exactly once.
    current_counts = {status: 0 for status in statuses}
    summaries = [{} for _ in range(max_time + 1)]
    known_nodes = 0
    for minute in range(max_time + 1):
        known_nodes += known_diff[minute]
        summary = {}
        online_nodes = 0
        for status in statuses:
            current_counts[status] += status_diff[status][minute]
            if current_counts[status] > 0:
                summary[status] = current_counts[status]
                online_nodes += current_counts[status]
        # Known nodes without an active status have expired.
        offline = known_nodes - online_nodes
        if offline > 0:
            summary['OFFLINE'] = offline
        summaries[minute] = dict(sorted(summary.items()))

    # Each query is a slice of the precomputed timeline (copied so callers can mutate it).
    result = []
    for end_minute, k in queries:
        start = end_minute - k + 1
        result.append([dict(summaries[minute]) for minute in range(start, end_minute + 1)])
    return result

Time complexity: Preprocessing: O(e log e + M * s), where e is the number of events after deduplication, M is the maximum queried minute, and s is the number of distinct statuses. Answering all queries costs O(K * s), where K is the total number of returned minute summaries, because each summary is copied. Space complexity: O(e + M * s).
Hints
- For one node, each update creates a time interval during which that status is active: from its minute until the earlier of TTL expiry or the next update.
- Difference arrays plus prefix sums let you build every minute's cluster summary once, then each query becomes a slice of the precomputed timeline.
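The two hints combine into a short pipeline for the per-status part of the timeline: turn each update into an active interval, mark the interval endpoints in a difference array, and take a running sum with `itertools.accumulate`. The sketch below assumes deduplicated, sorted per-node updates as input, and `status_counts_per_minute` is a hypothetical name. It leaves out the OFFLINE count, which the full solution derives separately from its `known_diff` array.

```python
from itertools import accumulate
from typing import Dict, List, Tuple


def status_counts_per_minute(
    by_node: Dict[int, List[Tuple[int, str]]], ttl: int, max_time: int
) -> Dict[str, List[int]]:
    """For each status, count the nodes holding it at every minute 0..max_time.

    Hypothetical helper. by_node maps node -> chronologically sorted, deduplicated
    (minute, status) updates. Each update is active from its minute through the
    earlier of minute + ttl and the minute before the node's next update.
    """
    diffs: Dict[str, List[int]] = {}
    for updates in by_node.values():
        for i, (minute, status) in enumerate(updates):
            if minute > max_time:
                break
            end = min(max_time, minute + ttl)
            if i + 1 < len(updates):
                end = min(end, updates[i + 1][0] - 1)
            if minute <= end:
                diff = diffs.setdefault(status, [0] * (max_time + 2))
                diff[minute] += 1   # interval opens
                diff[end + 1] -= 1  # interval closes after `end`
    # Prefix sums turn interval endpoints into per-minute counts.
    return {status: list(accumulate(diff))[: max_time + 1] for status, diff in diffs.items()}


# Example 4: ttl = 2, node 1 reports OK at 0 and WARN at 5, node 2 reports OK at 1.
counts = status_counts_per_minute({1: [(0, 'OK'), (5, 'WARN')], 2: [(1, 'OK')]}, 2, 6)
print(counts['OK'])    # [1, 2, 2, 1, 0, 0, 0]
print(counts['WARN'])  # [0, 0, 0, 0, 0, 1, 1]
# A query (end_minute=2, k=3) is then just a slice of the precomputed timeline.
print(counts['OK'][0:3])  # [1, 2, 2]
```

The OFFLINE count at minute m is the number of nodes whose first update is at or before m, minus the sum of these per-status counts.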
Part 3: Schedule thread-safe sharded requests
Constraints
- 1 <= num_shards <= 10^4
- 0 <= node_id <= 10^9
- 0 <= len(requests) <= 2 * 10^4
- Each request mode is either 'R' or 'W'
- Each request may list the same node more than once, but a shard should only be locked once per request
Examples
Input: (4, [('R', [1, 5]), ('R', [2]), ('W', [6]), ('W', [1, 2])])
Expected Output: {'waves': 3, 'plan': [[[1], 0], [[2], 0], [[2], 1], [[1, 2], 2]]}
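Explanation: The first two reads share wave 0. The write on shard 2 (node 6 mod 4) conflicts with the read of shard 2 in wave 0, so it opens wave 1. The write on shards 1 and 2 conflicts with both earlier waves, so it opens wave 2.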
Input: (3, [('W', [1]), ('W', [2]), ('R', [4])])
Expected Output: {'waves': 2, 'plan': [[[1], 0], [[2], 0], [[1], 1]]}
Explanation: The two writes touch different shards, so they can share wave 0. The final read touches shard 1 (node 4 mod 3), which is being written in wave 0, so it moves to wave 1.
Input: (2, [('R', [1, 3, 3]), ('W', [0, 2]), ('R', [5])])
Expected Output: {'waves': 1, 'plan': [[[1], 0], [[0], 0], [[1], 0]]}
Explanation: Duplicate nodes in one request do not change the lock set. All three requests fit in one wave because the write uses shard 0 while the reads use shard 1.
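Example 3 shows that a request's lock set is a set of shards, not a list of nodes. If each shard were guarded by a real lock, acquiring a request's distinct shards in ascending order would rule out lock-ordering deadlocks, because no two threads can wait on each other in a cycle. The following is a minimal sketch that assumes one `threading.Lock` per shard. The `lock_order` and `locked_shards` helpers are hypothetical, and this sketch does not distinguish readers from writers.

```python
import threading
from contextlib import contextmanager
from typing import Iterable, Iterator, List


def lock_order(nodes: Iterable[int], num_shards: int) -> List[int]:
    """Distinct shards touched by a request, in the global (ascending) lock order."""
    return sorted({node % num_shards for node in nodes})


@contextmanager
def locked_shards(locks: List[threading.Lock], shards: List[int]) -> Iterator[None]:
    """Hold the given shard locks; acquiring in ascending order prevents deadlock cycles."""
    acquired = []
    try:
        for shard in shards:
            locks[shard].acquire()
            acquired.append(shard)
        yield
    finally:
        for shard in reversed(acquired):
            locks[shard].release()


num_shards = 2
locks = [threading.Lock() for _ in range(num_shards)]
counters = [0] * num_shards


def worker(nodes: List[int]) -> None:
    shards = lock_order(nodes, num_shards)
    for _ in range(1000):
        with locked_shards(locks, shards):
            for shard in shards:
                counters[shard] += 1  # safe: this shard's lock is held


threads = [threading.Thread(target=worker, args=(nodes,)) for nodes in ([1, 3, 3], [0, 2], [2, 1])]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(lock_order([1, 3, 3], 2))  # [1]
print(counters)                  # [2000, 2000]
```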
Input: (5, [])
Expected Output: {'waves': 0, 'plan': []}
Explanation: Edge case: no requests.
Solution
def solution(num_shards, requests):
    # Each wave tracks which shards currently have readers and which have writers.
    waves = []
    plan = []
    for mode, nodes in requests:
        # Distinct shards in ascending order: the deadlock-free lock order.
        shard_list = sorted({node % num_shards for node in nodes})
        shard_set = set(shard_list)
        placed_wave = None
        # Greedy: place the request in the first wave it does not conflict with.
        for wave_index, wave in enumerate(waves):
            if mode == 'R':
                # Reads only conflict with writers.
                if wave['write'].isdisjoint(shard_set):
                    wave['read'].update(shard_set)
                    placed_wave = wave_index
                    break
            else:
                # Writes conflict with both readers and writers.
                if wave['write'].isdisjoint(shard_set) and wave['read'].isdisjoint(shard_set):
                    wave['write'].update(shard_set)
                    placed_wave = wave_index
                    break
        if placed_wave is None:
            # No compatible wave exists: open a new one.
            placed_wave = len(waves)
            new_wave = {'read': set(), 'write': set()}
            if mode == 'R':
                new_wave['read'].update(shard_set)
            else:
                new_wave['write'].update(shard_set)
            waves.append(new_wave)
        plan.append([shard_list, placed_wave])
    return {'waves': len(waves), 'plan': plan}

Time complexity: O(r * w * d), where r is the number of requests, w is the number of waves created by the greedy policy, and d is the number of distinct shards touched by a request, plus O(n + d log d) per request to build its sorted shard list from its n listed nodes. Space complexity: O(r + w * d).
Hints
- Convert each request to the set of shards it touches first. The deadlock-free lock order is just the sorted list of those shards.
- For every wave, track which shards already have readers and which already have writers. Reads only need to avoid writers; writes need both sets to be disjoint.
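The read/write rule from the second hint can be isolated as a single predicate. This is a minimal sketch with a hypothetical `fits_wave` name, using the same `{'read': set, 'write': set}` wave shape as the solution.

```python
from typing import Dict, Set


def fits_wave(wave: Dict[str, Set[int]], mode: str, shards: Set[int]) -> bool:
    """Can a request join this wave without a read/write conflict? (hypothetical helper)

    Reads may share shards with other reads, so they only need to avoid writers.
    Writes need exclusive access, so they must avoid both readers and writers.
    """
    if mode == 'R':
        return wave['write'].isdisjoint(shards)
    return wave['write'].isdisjoint(shards) and wave['read'].isdisjoint(shards)


# Wave 0 of Example 1 after the first two reads: readers on shards 1 and 2.
wave0 = {'read': {1, 2}, 'write': set()}
print(fits_wave(wave0, 'R', {2}))  # True: readers can share shard 2
print(fits_wave(wave0, 'W', {2}))  # False: the write on shard 2 conflicts with a reader
print(fits_wave(wave0, 'W', {3}))  # True: shard 3 is untouched
```

The greedy solution places each request in the first wave for which this check succeeds and opens a new wave otherwise.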