Design test logging and failure-window queries
Company: Vanta
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Technical Screen
Quick Answer: This question evaluates streaming data structures, time-series event processing, interval/temporal query algorithms, and complexity analysis as applied to a software engineer role.
Part 1: Minimum fail-to-pass transition time
Constraints
- 1 <= len(operations) <= 200000
- 1 <= timestamp <= 10^9
- All 'log' timestamps are strictly increasing across the entire input
- status is always 'pass' or 'fail'
- test_id is a non-empty string of printable characters
Examples
Input: [('log', 'A', 1, 'fail'), ('log', 'A', 4, 'fail'), ('log', 'A', 7, 'pass'), ('log', 'A', 10, 'fail'), ('log', 'A', 12, 'pass'), ('getMinPassTransition', 'A')]
Expected Output: [2]
Explanation: The first failure segment is from 1 to 7, so its duration is 6. The second is from 10 to 12, so its duration is 2. The minimum is 2.
Input: [('log', 'A', 2, 'pass'), ('log', 'A', 5, 'fail'), ('getMinPassTransition', 'A'), ('log', 'A', 11, 'pass'), ('getMinPassTransition', 'A'), ('getMinPassTransition', 'B')]
Expected Output: [None, 6, None]
Explanation: A 'pass' without an open failure segment does nothing. After the fail at 5 is closed by pass at 11, A has one completed duration of 6. B has no completed transition.
Input: [('getMinPassTransition', 'X'), ('log', 'B', 3, 'fail'), ('log', 'C', 4, 'fail'), ('log', 'B', 8, 'pass'), ('getMinPassTransition', 'B'), ('getMinPassTransition', 'C')]
Expected Output: [None, 5, None]
Explanation: Querying before any completed segment returns None. B completes one fail->pass segment of length 5. C is still failing, so it has no completed segment yet.
Input: [('log', 'T', 1, 'fail'), ('log', 'T', 6, 'pass'), ('log', 'T', 9, 'fail'), ('log', 'T', 15, 'pass'), ('log', 'T', 20, 'fail'), ('log', 'T', 23, 'pass'), ('getMinPassTransition', 'T')]
Expected Output: [3]
Explanation: The completed durations are 5, 6, and 3, so the minimum returned is 3.
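The durations quoted in these explanations can be reproduced with a small helper. This is only an illustrative sketch: `completed_durations` is a hypothetical name that is not part of the required interface, and it assumes the events of a single test are supplied as (timestamp, status) pairs in increasing timestamp order.

```python
def completed_durations(events):
    """Return the duration of every completed fail->pass segment.

    `events` holds (timestamp, status) pairs for ONE test, in increasing
    timestamp order (an assumption of this sketch).
    """
    durations = []
    fail_start = None  # start of the currently open failure segment, if any
    for timestamp, status in events:
        if status == 'fail':
            # Only the first fail in a run of consecutive fails opens a segment.
            if fail_start is None:
                fail_start = timestamp
        elif fail_start is not None:
            # A pass closes the open segment; a pass with no open segment is ignored.
            durations.append(timestamp - fail_start)
            fail_start = None
    return durations


events_t = [(1, 'fail'), (6, 'pass'), (9, 'fail'), (15, 'pass'), (20, 'fail'), (23, 'pass')]
print(completed_durations(events_t))       # [5, 6, 3]
print(min(completed_durations(events_t)))  # 3
```

A test that is still failing contributes no duration, which is why a query on it returns None.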
Solution
def solution(operations):
    active_fail_start = {}  # test_id -> timestamp at which its open failure segment began
    min_duration = {}       # test_id -> shortest completed fail->pass duration
    answers = []
    for op in operations:
        if op[0] == 'log':
            _, test_id, timestamp, status = op
            if status == 'fail':
                # Only the first fail in a run of consecutive fails opens a segment.
                if test_id not in active_fail_start:
                    active_fail_start[test_id] = timestamp
            else:  # status == 'pass'
                # A pass closes the open segment, if any; otherwise it is ignored.
                if test_id in active_fail_start:
                    duration = timestamp - active_fail_start[test_id]
                    if test_id not in min_duration or duration < min_duration[test_id]:
                        min_duration[test_id] = duration
                    del active_fail_start[test_id]
        else:  # getMinPassTransition
            _, test_id = op
            answers.append(min_duration.get(test_id))
    return answers
Time complexity: O(n), where n is the number of operations. Space complexity: O(t), where t is the number of distinct test IDs that have appeared.
Hints
- For each test, you only need to know whether it is currently in a failure segment and, if so, when that segment started.
- Only update the best answer when a 'pass' closes an existing failure segment.
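The two hints above can also be read as a description of an incremental, per-call interface rather than a batch function. The sketch below is one possible shape under that assumption; the class name `PassTransitionTracker` and its method names are hypothetical and are not given by the problem statement.

```python
class PassTransitionTracker:
    """Tracks, per test, the shortest completed fail->pass duration."""

    def __init__(self):
        self._fail_start = {}  # test_id -> start of its open failure segment
        self._best = {}        # test_id -> shortest completed duration so far

    def log(self, test_id, timestamp, status):
        if status == 'fail':
            # setdefault keeps the earliest fail of a run of consecutive fails.
            self._fail_start.setdefault(test_id, timestamp)
            return
        # status == 'pass': close the open segment if there is one.
        start = self._fail_start.pop(test_id, None)
        if start is None:
            return  # pass without an open failure segment does nothing
        duration = timestamp - start
        if test_id not in self._best or duration < self._best[test_id]:
            self._best[test_id] = duration

    def get_min_pass_transition(self, test_id):
        return self._best.get(test_id)


tracker = PassTransitionTracker()
tracker.log('A', 2, 'pass')
tracker.log('A', 5, 'fail')
print(tracker.get_min_pass_transition('A'))  # None (segment still open)
tracker.log('A', 11, 'pass')
print(tracker.get_min_pass_transition('A'))  # 6
print(tracker.get_min_pass_transition('B'))  # None
```

Each call does a constant number of dictionary operations, matching the O(1)-per-operation behavior of the batch solution.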
Part 2: Longest interval with at least k concurrent failures
Constraints
- 1 <= len(operations) <= 20000
- 1 <= timestamp <= 10^9
- All 'log' timestamps are strictly increasing across the entire input
- 1 <= min_tests <= 10^5
- status is always 'pass' or 'fail'
Examples
Input: [('log', 'A', 1, 'fail'), ('log', 'B', 2, 'fail'), ('log', 'A', 5, 'pass'), ('log', 'B', 6, 'pass'), ('getMaxConcurrentFailures', 2), ('getMinPassTransition', 'A')]
Expected Output: [{'start_timestamp': 2, 'end_timestamp': 5}, 4]
Explanation: A fails on [1, 5) and B fails on [2, 6). At least 2 tests are failing together on [2, 5). A's only completed fail->pass duration is 4.
Input: [('log', 'A', 1, 'fail'), ('log', 'B', 3, 'fail'), ('log', 'C', 8, 'pass'), ('getMaxConcurrentFailures', 2), ('getMinPassTransition', 'A')]
Expected Output: [{'start_timestamp': 3, 'end_timestamp': 8}, None]
Explanation: A and B are both still failing when the query is asked. The latest logged timestamp is 8, so their open segments are treated as [1, 8) and [3, 8). Their overlap is [3, 8). A has no completed fail->pass transition yet.
Input: [('log', 'A', 1, 'fail'), ('log', 'B', 2, 'fail'), ('log', 'A', 4, 'pass'), ('log', 'B', 5, 'pass'), ('log', 'A', 6, 'fail'), ('log', 'B', 7, 'fail'), ('log', 'A', 9, 'pass'), ('log', 'B', 10, 'pass'), ('getMaxConcurrentFailures', 2)]
Expected Output: [{'start_timestamp': 2, 'end_timestamp': 4}]
Explanation: There are two longest intervals with 2 concurrent failures: [2, 4) and [7, 9), both of length 2. The earliest one must be returned.
Input: [('log', 'A', 1, 'fail'), ('log', 'B', 2, 'fail'), ('log', 'A', 4, 'fail'), ('log', 'B', 6, 'pass'), ('log', 'A', 8, 'pass'), ('getMaxConcurrentFailures', 2), ('getMinPassTransition', 'A'), ('getMaxConcurrentFailures', 3)]
Expected Output: [{'start_timestamp': 2, 'end_timestamp': 6}, 7, None]
Explanation: A's consecutive fails at 1 and 4 are one segment, so A fails on [1, 8). B fails on [2, 6). Their overlap is [2, 6). A's completed duration is 7. No interval has 3 concurrent failures.
Input: [('getMaxConcurrentFailures', 1), ('getMinPassTransition', 'Z')]
Expected Output: [None, None]
Explanation: With no logs at all, there is no failure interval and no completed fail->pass transition.
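The explanations above describe each failure segment as a half-open interval, with still-open segments temporarily ended at the latest logged timestamp. A minimal sketch of that conversion follows; `failure_intervals` is a hypothetical helper name, and it assumes the logs arrive as (test_id, timestamp, status) tuples in increasing timestamp order.

```python
def failure_intervals(logs):
    """Turn log events into half-open failure intervals (test_id, start, end).

    Segments still open after the last log are closed at the latest logged
    timestamp, mirroring how the examples treat them at query time.
    """
    open_start = {}  # test_id -> start of its open failure segment
    intervals = []
    latest = None    # most recent log timestamp seen
    for test_id, timestamp, status in logs:
        latest = timestamp
        if status == 'fail':
            open_start.setdefault(test_id, timestamp)
        elif test_id in open_start:
            intervals.append((test_id, open_start.pop(test_id), timestamp))
    for test_id, start in open_start.items():
        if start < latest:  # skip zero-length segments
            intervals.append((test_id, start, latest))
    return intervals


logs = [('A', 1, 'fail'), ('B', 3, 'fail'), ('C', 8, 'pass')]
print(failure_intervals(logs))  # [('A', 1, 8), ('B', 3, 8)]
```

Note that the pass logged for C opens no segment, but it still advances the latest timestamp to 8, which is what extends A's and B's open segments.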
Solution
from collections import defaultdict

def solution(operations):
    active_fail_start = {}   # test_id -> start of its open failure segment
    min_duration = {}        # test_id -> shortest completed fail->pass duration
    closed_intervals = []    # completed failure segments as (start, end)
    latest_timestamp = None  # most recent 'log' timestamp seen so far
    answers = []

    def get_max_interval(min_tests):
        if latest_timestamp is None:
            return None
        # Boundary deltas: +1 where a failure segment starts, -1 where it ends.
        delta = defaultdict(int)
        for start, end in closed_intervals:
            if start < end:
                delta[start] += 1
                delta[end] -= 1
        # Open segments are temporarily closed at the latest logged timestamp.
        for start in active_fail_start.values():
            if start < latest_timestamp:
                delta[start] += 1
                delta[latest_timestamp] -= 1
        if not delta:
            return None
        count = 0
        current_start = None
        best_start = None
        best_end = None
        best_length = 0
        # Sweep the boundaries in time order. The deltas sum to zero, so the
        # count drops to 0 at the last boundary and closes any open span
        # (min_tests >= 1 by the constraints).
        for t in sorted(delta):
            count += delta[t]
            if count >= min_tests:
                if current_start is None:
                    current_start = t
            elif current_start is not None:
                # The span [current_start, t) just ended. Spans are visited in
                # increasing start order, so strict '>' keeps the earliest on ties.
                length = t - current_start
                if length > best_length:
                    best_start = current_start
                    best_end = t
                    best_length = length
                current_start = None
        if best_length == 0:
            return None
        return {'start_timestamp': best_start, 'end_timestamp': best_end}

    for op in operations:
        cmd = op[0]
        if cmd == 'log':
            _, test_id, timestamp, status = op
            latest_timestamp = timestamp
            if status == 'fail':
                if test_id not in active_fail_start:
                    active_fail_start[test_id] = timestamp
            else:  # status == 'pass'
                if test_id in active_fail_start:
                    start = active_fail_start.pop(test_id)
                    closed_intervals.append((start, timestamp))
                    duration = timestamp - start
                    if test_id not in min_duration or duration < min_duration[test_id]:
                        min_duration[test_id] = duration
        elif cmd == 'getMinPassTransition':
            _, test_id = op
            answers.append(min_duration.get(test_id))
        else:  # getMaxConcurrentFailures
            _, min_tests = op
            answers.append(get_max_interval(min_tests))
    return answers
Time complexity: log and getMinPassTransition are O(1) per operation; getMaxConcurrentFailures is O(m log m) per query, where m is the number of failure segments known at query time (completed segments plus currently open ones). Space complexity: O(m + t), where t is the number of distinct test IDs.
Hints
- Think of each failure segment as a half-open interval [start, end). A max-concurrency query becomes a sweep-line problem over interval boundaries.
- For an open failure segment at query time, use the latest seen log timestamp as its temporary end. While sweeping, merge adjacent spans as long as the failing-test count stays at least `min_tests`.
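The sweep described in these hints can be isolated from the logging logic. The following is a minimal sketch under stated assumptions: `longest_span_with_k` is a hypothetical helper name, the input is a list of half-open (start, end) intervals (open segments already closed at the latest timestamp), and k is at least 1.

```python
from collections import defaultdict


def longest_span_with_k(intervals, k):
    """Return the earliest longest (start, end) span covered by >= k intervals.

    Returns None when no span of positive length qualifies. Assumes k >= 1.
    """
    # +1 at each interval start, -1 at each interval end.
    delta = defaultdict(int)
    for start, end in intervals:
        if start < end:
            delta[start] += 1
            delta[end] -= 1

    best = None
    best_length = 0
    count = 0
    span_start = None  # start of the current span with count >= k, if any
    for t in sorted(delta):
        count += delta[t]
        if count >= k:
            # Adjacent spans merge automatically: the span stays open as long
            # as the count never drops below k.
            if span_start is None:
                span_start = t
        elif span_start is not None:
            # Strict '>' keeps the earliest span when lengths tie.
            if t - span_start > best_length:
                best, best_length = (span_start, t), t - span_start
            span_start = None
    return best


print(longest_span_with_k([(1, 5), (2, 6)], 2))                   # (2, 5)
print(longest_span_with_k([(1, 4), (2, 5), (6, 9), (7, 10)], 2))  # (2, 4)
print(longest_span_with_k([(1, 8), (2, 6)], 3))                   # None
```

Because the deltas sum to zero, the count returns to 0 at the final boundary, so no separate step is needed to close a span that is still open at the end of the sweep.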