PracHub
QuestionsPremiumCoachesLearningGuidesInterview Prep

Quick Overview

This question evaluates a candidate's ability to design streaming data structures and time-series algorithms to track state transitions and compute interval queries, covering competencies in online/event-processing algorithms, interval overlap aggregation, and algorithmic complexity analysis within the Coding & Algorithms domain.

  • Medium
  • Vanta
  • Coding & Algorithms
  • Software Engineer

Design streaming test logger and queries

Company: Vanta

Role: Software Engineer

Category: Coding & Algorithms

Difficulty: Medium

Interview Round: Technical Screen

Implement a streaming logger and two query operations for test runs with globally strictly increasing timestamps. 1) Implement log(test_id, timestamp, status) where status ∈ {'failing','passing'}. 2) Implement get_min_fix_time(test_id) -> number|null: For the specified test, return the minimal duration from the start of a failing state to the next subsequent passing state. If multiple failing statuses occur consecutively, treat them as a single failure starting at the first failure; if the failure never resolves to passing, or the test never fails, return null. 3) Implement get_max_concurrent_failure_period(min_tests) -> {start_timestamp, end_timestamp}|null: Return the longest contiguous period [start_timestamp, end_timestamp) during which at least min_tests distinct tests are failing simultaneously; if no such period exists, return null. Assume timestamps are integers and strictly increasing across all log calls. Describe your data structures, algorithms, and time/space complexities.

Quick Answer: This question evaluates a candidate's ability to design streaming data structures and time-series algorithms to track state transitions and compute interval queries, covering competencies in online/event-processing algorithms, interval overlap aggregation, and algorithmic complexity analysis within the Coding & Algorithms domain.

Part 1: Build normalized failure intervals from log()

You are given a chronologically ordered stream of log events `(test_id, timestamp, status)`, where `status` is either `'failing'` or `'passing'` and timestamps are globally strictly increasing. Process the stream as if `log(test_id, timestamp, status)` were called for each event. For every test ID that appears in the stream, return its normalized failure intervals. A failure interval starts at the first `'failing'` event in a consecutive failing streak and ends at the next later `'passing'` event for the same test. Consecutive `'failing'` events while the test is already failing do not start new intervals. Consecutive `'passing'` events while the test is not failing do nothing. If a failure is still open at the end of the stream, its end should be `None`.

Constraints

  • 0 <= len(logs) <= 200000
  • Each log entry is `(test_id, timestamp, status)`
  • Timestamps are integers and strictly increasing across the entire stream
  • status is always either 'failing' or 'passing'

Examples

Input: ([('T1', 1, 'failing'), ('T1', 4, 'passing')],)

Expected Output: {'T1': [(1, 4)]}

Explanation: A single failing streak starts at 1 and resolves at 4.

Input: ([('T1', 1, 'failing'), ('T1', 2, 'failing'), ('T1', 5, 'passing'), ('T1', 8, 'failing')],)

Expected Output: {'T1': [(1, 5), (8, None)]}

Explanation: The first two failing logs are one streak. The final failure is unresolved.

Input: ([('A', 1, 'passing'), ('B', 2, 'failing'), ('C', 3, 'passing'), ('B', 6, 'passing'), ('A', 7, 'failing')],)

Expected Output: {'A': [(7, None)], 'B': [(2, 6)], 'C': []}

Explanation: Passing logs with no open failure do nothing, but those tests still appear in the result.

Input: ([],)

Expected Output: {}

Explanation: Edge case: no logs means no tests and no intervals.

Solution

def solution(logs):
    intervals = {}
    current_start = {}

    for test_id, timestamp, status in logs:
        if test_id not in intervals:
            intervals[test_id] = []
            current_start[test_id] = None

        if status == 'failing':
            if current_start[test_id] is None:
                current_start[test_id] = timestamp
        else:
            if current_start[test_id] is not None:
                intervals[test_id].append((current_start[test_id], timestamp))
                current_start[test_id] = None

    for test_id, start in current_start.items():
        if start is not None:
            intervals[test_id].append((start, None))

    return intervals

Time complexity: O(n), where n is the number of log events. Space complexity: O(m + r), where m is the number of distinct tests and r is the number of returned intervals.

Hints

  1. Track, for each test, whether a failure is currently open and when it started.
  2. A 'failing' log only matters if the test was not already failing; a 'passing' log only matters if it was.

Part 2: Minimum fix time for one test

You are given a chronologically ordered stream of log events `(test_id, timestamp, status)` and a target `test_id`. For that one test, compute the minimum time needed to recover from failure. A failure starts at the first `'failing'` event in a consecutive failing streak and ends at the next later `'passing'` event for the same test. Consecutive `'failing'` logs while already failing belong to the same failure. If the test never has a resolved failure, return `None`.

Constraints

  • 0 <= len(logs) <= 200000
  • Each log entry is `(test_id, timestamp, status)`
  • Timestamps are integers and strictly increasing across the entire stream
  • status is always either 'failing' or 'passing'

Examples

Input: ([('A', 1, 'failing'), ('A', 3, 'passing'), ('A', 5, 'failing'), ('A', 9, 'passing')], 'A')

Expected Output: 2

Explanation: Resolved failure lengths are 2 and 4, so the minimum is 2.

Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 4, 'failing'), ('A', 6, 'passing'), ('B', 8, 'passing'), ('A', 10, 'failing'), ('A', 12, 'passing')], 'A')

Expected Output: 2

Explanation: For A, the streak from 1 to 6 has length 5 because the extra failing at 4 does not restart it. The second resolved streak has length 2.

Input: ([('A', 1, 'passing'), ('A', 2, 'passing')], 'A')

Expected Output: None

Explanation: Edge case: the test never enters a failing state.

Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('B', 3, 'passing')], 'A')

Expected Output: None

Explanation: A fails but never resolves to passing.

Input: ([], 'A')

Expected Output: None

Explanation: Edge case: an empty stream has no resolved failures.

Solution

def solution(logs, test_id):
    current_start = None
    best = None

    for event_test_id, timestamp, status in logs:
        if event_test_id != test_id:
            continue

        if status == 'failing':
            if current_start is None:
                current_start = timestamp
        else:
            if current_start is not None:
                duration = timestamp - current_start
                if best is None or duration < best:
                    best = duration
                current_start = None

    return best

Time complexity: O(n), where n is the number of log events. Space complexity: O(1).

Hints

  1. You only need to track the current open failure for the queried test, not for every test.
  2. Ignore consecutive 'failing' logs until you see a 'passing' log that closes the active failure.

Part 3: Longest period with at least k concurrent failing tests

You are given a chronologically ordered stream of log events `(test_id, timestamp, status)` and an integer `min_tests`. A test is considered failing from the first `'failing'` log in a failing streak until the next `'passing'` log for that same test. Consecutive `'failing'` logs while already failing do not change the state, and consecutive `'passing'` logs while not failing do nothing. After processing a log at timestamp `t_i`, the resulting failure set is stable on the half-open segment `[t_i, t_{i+1})`, where `t_{i+1}` is the next log timestamp. Return the longest contiguous period `[start_timestamp, end_timestamp)` during which at least `min_tests` distinct tests are failing simultaneously on every point of the period. If multiple periods have the same maximum length, return the earliest one. If no such finite period exists, return `None`.

Constraints

  • 0 <= len(logs) <= 200000
  • Each log entry is `(test_id, timestamp, status)`
  • Timestamps are integers and strictly increasing across the entire stream
  • status is always either 'failing' or 'passing'
  • 1 <= min_tests <= 200000

Examples

Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 5, 'passing'), ('B', 7, 'passing')], 2)

Expected Output: {'start_timestamp': 2, 'end_timestamp': 5}

Explanation: A fails on [1, 5), B fails on [2, 7), so both are failing together on [2, 5).

Input: ([('A', 1, 'failing'), ('A', 2, 'failing'), ('B', 3, 'failing'), ('C', 4, 'failing'), ('B', 6, 'passing'), ('A', 8, 'passing'), ('C', 10, 'passing')], 2)

Expected Output: {'start_timestamp': 3, 'end_timestamp': 8}

Explanation: At least two tests are failing continuously from 3 to 8. The extra failing log for A at 2 does not create a new interval.

Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 4, 'passing'), ('B', 5, 'passing'), ('A', 7, 'failing'), ('B', 8, 'failing'), ('A', 10, 'passing'), ('B', 11, 'passing')], 2)

Expected Output: {'start_timestamp': 2, 'end_timestamp': 4}

Explanation: There are two maximum-length periods: [2, 4) and [8, 10). The tie is broken by earliest start.

Input: ([('A', 1, 'failing'), ('B', 2, 'failing'), ('A', 3, 'passing'), ('B', 4, 'passing')], 3)

Expected Output: None

Explanation: Edge case: the threshold is never reached.

Input: ([('A', 1, 'failing'), ('B', 3, 'failing')], 2)

Expected Output: None

Explanation: Both tests are failing only after the final log, so there is no finite between-log segment with at least two failing tests.

Solution

def solution(logs, min_tests):
    if not logs or min_tests <= 0:
        return None

    failing = set()
    active_start = None
    best_start = None
    best_end = None
    best_length = -1
    n = len(logs)

    for i, (test_id, timestamp, status) in enumerate(logs):
        if status == 'failing':
            failing.add(test_id)
        else:
            failing.discard(test_id)

        if i + 1 < n:
            if len(failing) >= min_tests:
                if active_start is None:
                    active_start = timestamp
            else:
                if active_start is not None:
                    length = timestamp - active_start
                    if length > best_length or (length == best_length and (best_start is None or active_start < best_start)):
                        best_start = active_start
                        best_end = timestamp
                        best_length = length
                    active_start = None

    if active_start is not None:
        end_timestamp = logs[-1][1]
        length = end_timestamp - active_start
        if length > 0 and (length > best_length or (length == best_length and (best_start is None or active_start < best_start))):
            best_start = active_start
            best_end = end_timestamp
            best_length = length

    if best_start is None:
        return None

    return {'start_timestamp': best_start, 'end_timestamp': best_end}

Time complexity: O(n), where n is the number of log events. Space complexity: O(m), where m is the number of distinct tests currently being tracked as failing.

Hints

  1. The number of failing tests can only change at log timestamps, so scan the stream segment by segment between consecutive timestamps.
  2. Maintain the current set of failing tests and the start of the current qualifying window where the failing count is at least `min_tests`.
Last updated: May 29, 2026

Loading coding console...

PracHub

Master your tech interviews with 8,500+ real questions from top companies.

Product

  • Questions
  • Learning Tracks
  • Interview Guides
  • Resources
  • Premium
  • For Universities
  • Student Access

Browse

  • By Company
  • By Role
  • By Category
  • Topic Hubs
  • SQL Questions
  • Compare Platforms
  • Discord Community

Support

  • support@prachub.com
  • (916) 541-4762

Legal

  • Privacy Policy
  • Terms of Service
  • About Us

© 2026 PracHub. All rights reserved.

Related Coding Questions

  • Implement a uniq-like function - Vanta (medium)
  • Design test-run logging and query functions - Vanta (Medium)
  • Implement test failure analytics APIs - Vanta (Medium)
  • Design test logging and failure-window queries - Vanta (Medium)