Implement test failure analytics APIs
Company: Vanta
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Technical Screen
Quick Answer: This question evaluates a candidate's ability to design efficient data structures and algorithms for time-series event logging, interval computation, and tracking concurrent failure states across distinct identifiers.
Part 1: Minimum Fix Time per Test
Constraints
- 0 <= n <= 2 * 10^5
- 0 <= len(queries) <= 2 * 10^5
- len(test_ids) == len(timestamps) == len(statuses)
- timestamps are strictly increasing
- Each status is either 'pass' or 'fail'
Examples
Input: (['A', 'A', 'A', 'A', 'A'], [1, 2, 5, 8, 10], ['fail', 'fail', 'pass', 'fail', 'pass'], ['A'])
Expected Output: [2]
Explanation: Test A has two completed failure blocks: [1 -> 5] with duration 4, and [8 -> 10] with duration 2. The minimum is 2.
Input: (['A', 'B', 'A', 'B', 'A', 'A'], [1, 2, 4, 7, 8, 9], ['fail', 'fail', 'pass', 'pass', 'fail', 'pass'], ['A', 'B', 'C'])
Expected Output: [1, 5, -1]
Explanation: A has fix times 3 and 1, so answer is 1. B has one fix time 5. C never appears, so answer is -1.
Input: (['A', 'A', 'B'], [1, 3, 4], ['pass', 'fail', 'fail'], ['A', 'B'])
Expected Output: [-1, -1]
Explanation: A starts failing at time 3 but never passes afterward. B also never passes after failing.
Input: ([], [], [], ['A', 'B'])
Expected Output: [-1, -1]
Explanation: With no logs, no queried test has a completed fail->pass transition.
Solution
def solution(test_ids, timestamps, statuses, queries):
best = {}
failing_since = {}
is_failing = set()
for test_id, ts, status in zip(test_ids, timestamps, statuses):
if status == 'fail':
if test_id not in is_failing:
is_failing.add(test_id)
failing_since[test_id] = ts
else:
if test_id in is_failing:
duration = ts - failing_since[test_id]
if test_id not in best or duration < best[test_id]:
best[test_id] = duration
is_failing.remove(test_id)
del failing_since[test_id]
return [best.get(q, -1) for q in queries]Time complexity: O(n + q). Space complexity: O(u), where u is the number of distinct tests seen in the logs.
Hints
- Track, for each test, whether it is currently in a failure block and when that block started.
- When a 'pass' arrives for a test that is currently failing, compute one candidate duration and update that test's minimum.
Part 2: Longest Interval with At Least K Concurrent Failures
Constraints
- 0 <= n <= 2 * 10^5
- 0 <= len(min_tests_queries) <= 2 * 10^5
- len(test_ids) == len(timestamps) == len(statuses)
- timestamps are strictly increasing
- Each status is either 'pass' or 'fail'
- 1 <= min_tests_queries[i] <= 2 * 10^5
Examples
Input: (['A', 'B', 'C', 'A', 'B', 'C'], [1, 2, 4, 5, 7, 8], ['fail', 'fail', 'fail', 'pass', 'pass', 'pass'], [1, 2, 3, 4])
Expected Output: [[1, 8], [2, 7], [4, 5], [-1, -1]]
Explanation: The failure counts on segments are: [1,2):1, [2,4):2, [4,5):3, [5,7):2, [7,8):1. So the best intervals are [1,8) for k=1, [2,7) for k=2, [4,5) for k=3, and none for k=4.
Input: (['A', 'A', 'B', 'B'], [1, 3, 5, 7], ['fail', 'pass', 'fail', 'pass'], [1, 2])
Expected Output: [[1, 3], [-1, -1]]
Explanation: There are two length-2 intervals with at least 1 failing test: [1,3) and [5,7). The tie is broken by earliest start, so [1,3) is returned.
Input: (['A', 'A', 'B', 'A', 'B'], [1, 2, 3, 6, 9], ['fail', 'fail', 'fail', 'pass', 'pass'], [1, 2, 3])
Expected Output: [[1, 9], [3, 6], [-1, -1]]
Explanation: Repeated 'fail' for A at time 2 does not change the state. At least one test is failing through the whole observed window [1,9), and at least two tests are failing only on [3,6).
Input: (['A'], [10], ['fail'], [1])
Expected Output: [[-1, -1]]
Explanation: A single log creates no positive-length observed interval because there is no later timestamp.
Input: (['A', 'B'], [1, 4], ['pass', 'pass'], [1])
Expected Output: [[-1, -1]]
Explanation: No test is ever failing, so no interval satisfies the query.
Solution
def solution(test_ids, timestamps, statuses, min_tests_queries):
n = len(timestamps)
if n < 2:
return [[-1, -1] for _ in min_tests_queries]
failing = set()
seg_starts = []
seg_ends = []
seg_counts = []
for i, (test_id, ts, status) in enumerate(zip(test_ids, timestamps, statuses)):
if status == 'fail':
failing.add(test_id)
else:
failing.discard(test_id)
if i + 1 < n:
seg_starts.append(ts)
seg_ends.append(timestamps[i + 1])
seg_counts.append(len(failing))
m = len(seg_counts)
max_count = max(seg_counts, default=0)
if max_count == 0:
return [[-1, -1] for _ in min_tests_queries]
buckets = [[] for _ in range(max_count + 1)]
for i, count in enumerate(seg_counts):
if count > 0:
buckets[count].append(i)
parent = list(range(m))
dsu_size = [1] * m
active = [False] * m
comp_len = [seg_ends[i] - seg_starts[i] for i in range(m)]
comp_start = seg_starts[:]
comp_end = seg_ends[:]
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
ra, rb = find(a), find(b)
if ra == rb:
return ra
if dsu_size[ra] < dsu_size[rb]:
ra, rb = rb, ra
parent[rb] = ra
dsu_size[ra] += dsu_size[rb]
comp_len[ra] += comp_len[rb]
comp_start[ra] = min(comp_start[ra], comp_start[rb])
comp_end[ra] = max(comp_end[ra], comp_end[rb])
return ra
def better(root, best):
root = find(root)
candidate = (comp_len[root], comp_start[root], comp_end[root])
if best is None:
return candidate
if candidate[0] > best[0]:
return candidate
if candidate[0] == best[0] and candidate[1] < best[1]:
return candidate
return best
best = None
answers = [None] * (max_count + 1)
for k in range(max_count, 0, -1):
for idx in buckets[k]:
active[idx] = True
best = better(idx, best)
if idx > 0 and active[idx - 1]:
root = union(idx, idx - 1)
best = better(root, best)
if idx + 1 < m and active[idx + 1]:
root = union(idx, idx + 1)
best = better(root, best)
if best is not None and best[0] > 0:
answers[k] = [best[1], best[2]]
result = []
for k in min_tests_queries:
if 1 <= k <= max_count and answers[k] is not None:
result.append(answers[k])
else:
result.append([-1, -1])
return resultTime complexity: O(n * alpha(n) + q). Space complexity: O(n).
Hints
- First convert the logs into consecutive time segments where the number of currently failing tests is constant.
- Then think of each query k as asking for the longest contiguous run of adjacent segments whose failure count is at least k. You can answer all k values efficiently by processing thresholds from high to low.