Design streaming test logger and queries
Company: Vanta
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Technical Screen
Quick Answer: This question tests whether a candidate can design streaming data structures that track per-test state transitions and answer interval queries over them. It covers online event processing, interval-overlap aggregation, and algorithmic complexity analysis within the Coding & Algorithms domain.
Part 1: Build normalized failure intervals from log()
Constraints
- 0 <= len(logs) <= 200000
- Each log entry is `(test_id, timestamp, status)`
- Timestamps are integers and strictly increasing across the entire stream
- status is always either 'failing' or 'passing'
Examples
Input: ([('T1', 1, 'failing'), ('T1', 4, 'passing')],)
Expected Output: {'T1': [(1, 4)]}
Explanation: A single failing streak starts at 1 and resolves at 4.
Input: ([('T1', 1, 'failing'), ('T1', 2, 'failing'), ('T1', 5, 'passing'), ('T1', 8, 'failing')],)
Expected Output: {'T1': [(1, 5), (8, None)]}
Explanation: The first two failing logs are one streak. The final failure is unresolved.
Input: ([('A', 1, 'passing'), ('B', 2, 'failing'), ('C', 3, 'passing'), ('B', 6, 'passing'), ('A', 7, 'failing')],)
Expected Output: {'A': [(7, None)], 'B': [(2, 6)], 'C': []}
Explanation: Passing logs with no open failure do nothing, but those tests still appear in the result.
Input: ([],)
Expected Output: {}
Explanation: Edge case: no logs means no tests and no intervals.
Solution
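def solution(logs):
    intervals = {}      # test_id -> list of (start, end) failure intervals
    current_start = {}  # test_id -> start of the open failure, or None
    for test_id, timestamp, status in logs:
        # Register every test on first sight so it appears in the result.
        if test_id not in intervals:
            intervals[test_id] = []
            current_start[test_id] = None
        if status == 'failing':
            # Only the first failing log of a streak opens an interval.
            if current_start[test_id] is None:
                current_start[test_id] = timestamp
        else:
            # A passing log closes the open failure, if there is one.
            if current_start[test_id] is not None:
                intervals[test_id].append((current_start[test_id], timestamp))
                current_start[test_id] = None
    # Failures still open at the end of the stream are reported with end None.
    for test_id, start in current_start.items():
        if start is not None:
            intervals[test_id].append((start, None))
    return intervals
Time complexity: O(n), where n is the number of log events. Space complexity: O(m + r), where m is the number of distinct tests and r is the number of returned intervals.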
Hints
- Track, for each test, whether a failure is currently open and when it started.
- A 'failing' log only matters if the test was not already failing; a 'passing' log only matters if it was.
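The two hints above also describe an incremental design in which each event is processed as it arrives. The following is a minimal sketch under that assumption; the class name `FailureTracker` and the `intervals()` method are hypothetical names chosen for illustration and are not part of the original problem statement.

```python
class FailureTracker:
    """Hypothetical streaming wrapper; the names here are illustrative assumptions."""

    def __init__(self):
        self._closed = {}  # test_id -> list of resolved (start, end) intervals
        self._open = {}    # test_id -> start timestamp of the open failure

    def log(self, test_id, timestamp, status):
        # Every test that logs anything appears in the result, even with no failures.
        self._closed.setdefault(test_id, [])
        if status == 'failing':
            # Repeated failing logs keep the original start of the streak.
            self._open.setdefault(test_id, timestamp)
        elif test_id in self._open:
            # A passing log only matters if a failure is currently open.
            self._closed[test_id].append((self._open.pop(test_id), timestamp))

    def intervals(self):
        # Snapshot: resolved intervals plus any open failure as (start, None).
        result = {t: list(iv) for t, iv in self._closed.items()}
        for test_id, start in self._open.items():
            result[test_id].append((start, None))
        return result


tracker = FailureTracker()
for entry in [('T1', 1, 'failing'), ('T1', 2, 'failing'),
              ('T1', 5, 'passing'), ('T1', 8, 'failing')]:
    tracker.log(*entry)
print(tracker.intervals())  # {'T1': [(1, 5), (8, None)]}
```

Each `log()` call does constant expected work, and `intervals()` can be called at any point in the stream without disturbing the tracked state.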
Part 2: Minimum fix time for one test
Constraints
- 0 <= len(logs) <= 200000
- Each log entry is `(test_id, timestamp, status)`
- Timestamps are integers and strictly increasing across the entire stream
- status is always either 'failing' or 'passing'
Examples
Input: ([('A', 1, 'failing'), ('A', 3, 'passing'), ('A', 5, 'failing'), ('A', 9, 'passing')], 'A')
Expected Output: 2
Explanation: Resolved failure lengths are 2 and 4, so the minimum is 2.
Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 4, 'failing'), ('A', 6, 'passing'), ('B', 8, 'passing'), ('A', 10, 'failing'), ('A', 12, 'passing')], 'A')
Expected Output: 2
Explanation: For A, the streak from 1 to 6 has length 5 because the extra failing at 4 does not restart it. The second resolved streak has length 2.
Input: ([('A', 1, 'passing'), ('A', 2, 'passing')], 'A')
Expected Output: None
Explanation: Edge case: the test never enters a failing state.
Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('B', 3, 'passing')], 'A')
Expected Output: None
Explanation: A fails but never resolves to passing.
Input: ([], 'A')
Expected Output: None
Explanation: Edge case: an empty stream has no resolved failures.
Solution
def solution(logs, test_id):
    current_start = None  # start of the open failure for test_id, if any
    best = None           # shortest resolved failure seen so far
    for event_test_id, timestamp, status in logs:
        # Events for other tests do not affect the answer.
        if event_test_id != test_id:
            continue
        if status == 'failing':
            # Repeated failing logs do not restart the streak.
            if current_start is None:
                current_start = timestamp
        else:
            if current_start is not None:
                duration = timestamp - current_start
                if best is None or duration < best:
                    best = duration
                current_start = None
    return best
Time complexity: O(n), where n is the number of log events. Space complexity: O(1).
Hints
- You only need to track the current open failure for the queried test, not for every test.
- Ignore consecutive 'failing' logs until you see a 'passing' log that closes the active failure.
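If the same query is asked repeatedly against a live stream, rescanning the logs each time costs O(n) per query. One possible alternative, sketched below, keeps a running minimum per test so that each query is a dictionary lookup. The class `MinFixTracker` and its method names are hypothetical, and this trades the O(1) space of the single-test scan for O(m) space over m distinct tests.

```python
class MinFixTracker:
    """Hypothetical helper: answers min-fix-time queries without rescanning."""

    def __init__(self):
        self._open = {}  # test_id -> start of the currently open failure
        self._best = {}  # test_id -> shortest resolved failure seen so far

    def log(self, test_id, timestamp, status):
        if status == 'failing':
            # Consecutive failing logs keep the first start of the streak.
            self._open.setdefault(test_id, timestamp)
        elif test_id in self._open:
            # A passing log closes the active failure and yields a duration.
            duration = timestamp - self._open.pop(test_id)
            best = self._best.get(test_id)
            if best is None or duration < best:
                self._best[test_id] = duration

    def min_fix_time(self, test_id):
        # None when the test has no resolved failure yet.
        return self._best.get(test_id)


fix_tracker = MinFixTracker()
for entry in [('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 4, 'failing'),
              ('A', 6, 'passing'), ('B', 8, 'passing'),
              ('A', 10, 'failing'), ('A', 12, 'passing')]:
    fix_tracker.log(*entry)
print(fix_tracker.min_fix_time('A'))  # 2
```

For test A the resolved streaks last 5 (from 1 to 6) and 2 (from 10 to 12), so the running minimum ends at 2, matching the second example above.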
Part 3: Longest period with at least k (`min_tests`) concurrent failing tests
Constraints
- 0 <= len(logs) <= 200000
- Each log entry is `(test_id, timestamp, status)`
- Timestamps are integers and strictly increasing across the entire stream
- status is always either 'failing' or 'passing'
- 1 <= min_tests <= 200000
Examples
Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 5, 'passing'), ('B', 7, 'passing')], 2)
Expected Output: {'start_timestamp': 2, 'end_timestamp': 5}
Explanation: A fails on [1, 5), B fails on [2, 7), so both are failing together on [2, 5).
Input: ([('A', 1, 'failing'), ('A', 2, 'failing'), ('B', 3, 'failing'), ('C', 4, 'failing'), ('B', 6, 'passing'), ('A', 8, 'passing'), ('C', 10, 'passing')], 2)
Expected Output: {'start_timestamp': 3, 'end_timestamp': 8}
Explanation: At least two tests are failing continuously from 3 to 8. The extra failing log for A at 2 does not create a new interval.
Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 4, 'passing'), ('B', 5, 'passing'), ('A', 7, 'failing'), ('B', 8, 'failing'), ('A', 10, 'passing'), ('B', 11, 'passing')], 2)
Expected Output: {'start_timestamp': 2, 'end_timestamp': 4}
Explanation: There are two maximum-length periods: [2, 4) and [8, 10). The tie is broken by earliest start.
Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 3, 'passing'), ('B', 4, 'passing')], 3)
Expected Output: None
Explanation: Edge case: the threshold is never reached.
Input: ([('A', 1, 'failing'), ('B', 3, 'failing')], 2)
Expected Output: None
Explanation: Both tests are failing only after the final log, so there is no finite between-log segment with at least two failing tests.
Solution
def solution(logs, min_tests):
    if not logs or min_tests <= 0:
        return None
    failing = set()      # tests currently in the failing state
    active_start = None  # start of the current qualifying window, if any
    best_start = None
    best_end = None
    best_length = -1
    n = len(logs)
    for i, (test_id, timestamp, status) in enumerate(logs):
        if status == 'failing':
            failing.add(test_id)
        else:
            failing.discard(test_id)
        # The last log has no following segment, so it never opens or closes a window here.
        if i + 1 < n:
            if len(failing) >= min_tests:
                if active_start is None:
                    active_start = timestamp
            else:
                if active_start is not None:
                    length = timestamp - active_start
                    if length > best_length or (length == best_length and (best_start is None or active_start < best_start)):
                        best_start = active_start
                        best_end = timestamp
                        best_length = length
                    active_start = None
    # A window still open at the end is clipped at the final log timestamp.
    if active_start is not None:
        end_timestamp = logs[-1][1]
        length = end_timestamp - active_start
        if length > 0 and (length > best_length or (length == best_length and (best_start is None or active_start < best_start))):
            best_start = active_start
            best_end = end_timestamp
            best_length = length
    if best_start is None:
        return None
    return {'start_timestamp': best_start, 'end_timestamp': best_end}
Time complexity: O(n), where n is the number of log events. Space complexity: O(m), where m is the number of distinct tests currently being tracked as failing.
Hints
- The number of failing tests can only change at log timestamps, so scan the stream segment by segment between consecutive timestamps.
- Maintain the current set of failing tests and the start of the current qualifying window where the failing count is at least `min_tests`.
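The segment-by-segment idea in the first hint can be written quite literally by pairing each log with its successor. The sketch below is an alternative formulation, not the reference solution; the function name `longest_window_by_segments` is a hypothetical choice. It assumes, as the examples do, that only finite segments between consecutive logs count and that ties go to the earliest start.

```python
def longest_window_by_segments(logs, min_tests):
    """Alternative sketch: treat each [ts, next_ts) segment between logs explicitly."""
    failing = set()   # tests failing on the current segment
    run_start = None  # start of the current run of qualifying segments
    best = None       # (start, end) of the longest run found so far
    # zip pairs each log with the next one; the last log has no segment after it.
    for (test_id, ts, status), (_, next_ts, _) in zip(logs, logs[1:]):
        if status == 'failing':
            failing.add(test_id)
        else:
            failing.discard(test_id)
        # The failing set is constant on the segment [ts, next_ts).
        if len(failing) >= min_tests:
            if run_start is None:
                run_start = ts
            # Strict '>' keeps the earliest window when lengths tie.
            if best is None or next_ts - run_start > best[1] - best[0]:
                best = (run_start, next_ts)
        else:
            run_start = None
    if best is None:
        return None
    return {'start_timestamp': best[0], 'end_timestamp': best[1]}


example = [('A', 1, 'failing'), ('B', 2, 'failing'),
           ('A', 5, 'passing'), ('B', 7, 'passing')]
print(longest_window_by_segments(example, 2))
# {'start_timestamp': 2, 'end_timestamp': 5}
```

Because the best window is extended one segment at a time, no separate end-of-stream step is needed: a run that is still qualifying at the final log has already been recorded up to that log's timestamp.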