PracHub

Quick Overview

This question evaluates streaming data structures, time-series event processing, interval/temporal query algorithms, and complexity analysis as applied to a software engineer role.

Design test logging and failure-window queries

Company: Vanta

Role: Software Engineer

Category: Coding & Algorithms

Difficulty: Medium

Interview Round: Technical Screen

Build an in-memory service to log test run results and answer two queries. Assume events arrive with strictly increasing timestamps across all test IDs.

Implement:

  1. log(test_id, timestamp, status): Record a status update, where status ∈ {'pass', 'fail'}.
  2. getMinPassTransition(test_id): Return the minimum elapsed time for this test to change from failing to passing. Treat consecutive 'fail' reports as one failure segment that begins at the first 'fail' in that run. If a failure segment never transitions to 'pass', ignore it. Return the duration as a number in the timestamp units, or null if no complete fail→pass transition exists.

Follow-up: Keep the same logging function and add:

  3. getMaxConcurrentFailures(min_tests): Find the longest contiguous time interval during which at least min_tests distinct tests are failing simultaneously. Return an object { start_timestamp: <inclusive>, end_timestamp: <exclusive> }. If no such interval exists, return null. Clarify tie-breaking (e.g., return the earliest longest interval).

Describe the data structures and the update/query algorithms, and analyze time and space complexity under streaming inserts.

Part 1: Minimum fail-to-pass transition time

You are given a stream of operations for an in-memory test logging service. Log operations arrive in strictly increasing timestamp order across all test IDs.

Implement two operations:

  • ('log', test_id, timestamp, status): record a status update, where status is either 'pass' or 'fail'.
  • ('getMinPassTransition', test_id): return the minimum elapsed time for that test to change from failing to passing among all completed failure segments seen so far.

Failure-segment rules:

  • A failure segment starts at the first 'fail' for a test while it is not already failing.
  • Consecutive 'fail' logs for the same test belong to the same segment and do not reset the start time.
  • A segment is only counted if it later receives a 'pass'.

If no completed fail->pass segment exists for the queried test, return None. Process the operations in order and return the results of all getMinPassTransition queries in order.

Constraints

  • 1 <= len(operations) <= 200000
  • 1 <= timestamp <= 10^9
  • All 'log' timestamps are strictly increasing across the entire input
  • status is always 'pass' or 'fail'
  • test_id is a non-empty string of printable characters

Examples

Input: [('log', 'A', 1, 'fail'), ('log', 'A', 4, 'fail'), ('log', 'A', 7, 'pass'), ('log', 'A', 10, 'fail'), ('log', 'A', 12, 'pass'), ('getMinPassTransition', 'A')]

Expected Output: [2]

Explanation: The first failure segment is from 1 to 7, so its duration is 6. The second is from 10 to 12, so its duration is 2. The minimum is 2.

Input: [('log', 'A', 2, 'pass'), ('log', 'A', 5, 'fail'), ('getMinPassTransition', 'A'), ('log', 'A', 11, 'pass'), ('getMinPassTransition', 'A'), ('getMinPassTransition', 'B')]

Expected Output: [None, 6, None]

Explanation: A 'pass' without an open failure segment does nothing. After the fail at 5 is closed by pass at 11, A has one completed duration of 6. B has no completed transition.

Input: [('getMinPassTransition', 'X'), ('log', 'B', 3, 'fail'), ('log', 'C', 4, 'fail'), ('log', 'B', 8, 'pass'), ('getMinPassTransition', 'B'), ('getMinPassTransition', 'C')]

Expected Output: [None, 5, None]

Explanation: Querying before any completed segment returns None. B completes one fail->pass segment of length 5. C is still failing, so it has no completed segment yet.

Input: [('log', 'T', 1, 'fail'), ('log', 'T', 6, 'pass'), ('log', 'T', 9, 'fail'), ('log', 'T', 15, 'pass'), ('log', 'T', 20, 'fail'), ('log', 'T', 23, 'pass'), ('getMinPassTransition', 'T')]

Expected Output: [3]

Explanation: The completed durations are 5, 6, and 3, so the minimum returned is 3.

Solution

def solution(operations):
    active_fail_start = {}
    min_duration = {}
    answers = []

    for op in operations:
        if op[0] == 'log':
            _, test_id, timestamp, status = op

            if status == 'fail':
                if test_id not in active_fail_start:
                    active_fail_start[test_id] = timestamp
            else:  # status == 'pass'
                if test_id in active_fail_start:
                    duration = timestamp - active_fail_start[test_id]
                    if test_id not in min_duration or duration < min_duration[test_id]:
                        min_duration[test_id] = duration
                    del active_fail_start[test_id]
        else:  # getMinPassTransition
            _, test_id = op
            answers.append(min_duration.get(test_id))

    return answers

Time complexity: O(n), where n is the number of operations. Space complexity: O(t), where t is the number of distinct test IDs that have appeared.

Hints

  1. For each test, you only need to know whether it is currently in a failure segment and, if so, when that segment started.
  2. Only update the best answer when a 'pass' closes an existing failure segment.
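
The original prompt describes a service with separate log and query calls rather than a batch of operation tuples. The sketch below shows how the same two dictionaries could sit behind such an interface. The class name `FailureLog` and the snake_case method names are assumptions made for illustration, not part of the problem statement.

```python
from typing import Dict, Optional


class FailureLog:
    """Hypothetical streaming wrapper around the Part 1 state."""

    def __init__(self) -> None:
        self._fail_start: Dict[str, int] = {}    # start of the open segment per test
        self._min_duration: Dict[str, int] = {}  # best completed duration per test

    def log(self, test_id: str, timestamp: int, status: str) -> None:
        if status == 'fail':
            # Only the first 'fail' of a run opens a segment; later ones are ignored.
            self._fail_start.setdefault(test_id, timestamp)
        elif test_id in self._fail_start:
            # A 'pass' closes the open segment, if there is one.
            duration = timestamp - self._fail_start.pop(test_id)
            best = self._min_duration.get(test_id)
            if best is None or duration < best:
                self._min_duration[test_id] = duration

    def get_min_pass_transition(self, test_id: str) -> Optional[int]:
        return self._min_duration.get(test_id)


logger = FailureLog()
for test_id, ts, status in [('A', 1, 'fail'), ('A', 4, 'fail'), ('A', 7, 'pass'),
                            ('A', 10, 'fail'), ('A', 12, 'pass')]:
    logger.log(test_id, ts, status)

print(logger.get_min_pass_transition('A'))  # 2, matching the first example
print(logger.get_min_pass_transition('B'))  # None
```

Both calls stay O(1), and the state is still bounded by the number of distinct test IDs.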

Part 2: Longest interval with at least k concurrent failures

You are given a stream of operations for an in-memory test logging service. Log operations arrive in strictly increasing timestamp order across all test IDs.

Support these operations:

  • ('log', test_id, timestamp, status)
  • ('getMinPassTransition', test_id)
  • ('getMaxConcurrentFailures', min_tests)

Failure-segment rules are the same as in Part 1:

  • A failure segment starts at the first 'fail' for a test while it is not already failing.
  • Consecutive 'fail' logs for the same test stay in the same segment.
  • A segment ends when that test receives a 'pass'.

Query behavior:

  1. getMinPassTransition(test_id): return the minimum completed fail->pass duration for that test so far, or None if none exists.
  2. getMaxConcurrentFailures(min_tests): return the longest contiguous time interval during which at least `min_tests` distinct tests are failing simultaneously.

Use half-open intervals [start, end):

  • A completed failure segment contributes [fail_start, pass_timestamp).
  • If a test is still failing when the query is asked, treat its current open segment as [fail_start, latest_logged_timestamp), where `latest_logged_timestamp` is the timestamp of the most recent log seen so far.

If multiple intervals have the same maximum length, return the earliest one. If no positive-length interval exists, return None. Process the operations in order and return the results of all query operations in order.
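
The half-open interval rules can be made concrete with a small helper that turns the logger's state into the list of intervals a query would see. This is only a sketch: the function name `effective_intervals` and its parameters are hypothetical, chosen for this illustration.

```python
from typing import Dict, List, Optional, Tuple

Interval = Tuple[int, int]


def effective_intervals(
    closed: List[Interval],
    open_starts: Dict[str, int],
    latest_timestamp: Optional[int],
) -> List[Interval]:
    """Return the positive-length [start, end) failure intervals visible at query time."""
    intervals = [(start, end) for start, end in closed if start < end]
    if latest_timestamp is not None:
        for start in open_starts.values():
            # An open segment that began at the latest log has zero length; skip it.
            if start < latest_timestamp:
                intervals.append((start, latest_timestamp))
    return sorted(intervals)


# A and B are still failing and the most recent log arrived at t=8.
print(effective_intervals([], {'A': 1, 'B': 3}, 8))  # [(1, 8), (3, 8)]
```

Clipping open segments at the latest logged timestamp means a test that started failing on the most recent log contributes nothing until a later log arrives.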

Constraints

  • 1 <= len(operations) <= 20000
  • 1 <= timestamp <= 10^9
  • All 'log' timestamps are strictly increasing across the entire input
  • 1 <= min_tests <= 10^5
  • status is always 'pass' or 'fail'

Examples

Input: [('log', 'A', 1, 'fail'), ('log', 'B', 2, 'fail'), ('log', 'A', 5, 'pass'), ('log', 'B', 6, 'pass'), ('getMaxConcurrentFailures', 2), ('getMinPassTransition', 'A')]

Expected Output: [{'start_timestamp': 2, 'end_timestamp': 5}, 4]

Explanation: A fails on [1, 5) and B fails on [2, 6). At least 2 tests are failing together on [2, 5). A's only completed fail->pass duration is 4.

Input: [('log', 'A', 1, 'fail'), ('log', 'B', 3, 'fail'), ('log', 'C', 8, 'pass'), ('getMaxConcurrentFailures', 2), ('getMinPassTransition', 'A')]

Expected Output: [{'start_timestamp': 3, 'end_timestamp': 8}, None]

Explanation: A and B are both still failing when the query is asked. The latest logged timestamp is 8, so their open segments are treated as [1, 8) and [3, 8). Their overlap is [3, 8). A has no completed fail->pass transition yet.

Input: [('log', 'A', 1, 'fail'), ('log', 'B', 2, 'fail'), ('log', 'A', 4, 'pass'), ('log', 'B', 5, 'pass'), ('log', 'A', 6, 'fail'), ('log', 'B', 7, 'fail'), ('log', 'A', 9, 'pass'), ('log', 'B', 10, 'pass'), ('getMaxConcurrentFailures', 2)]

Expected Output: [{'start_timestamp': 2, 'end_timestamp': 4}]

Explanation: There are two longest intervals with 2 concurrent failures: [2, 4) and [7, 9), both of length 2. The earliest one must be returned.

Input: [('log', 'A', 1, 'fail'), ('log', 'B', 2, 'fail'), ('log', 'A', 4, 'fail'), ('log', 'B', 6, 'pass'), ('log', 'A', 8, 'pass'), ('getMaxConcurrentFailures', 2), ('getMinPassTransition', 'A'), ('getMaxConcurrentFailures', 3)]

Expected Output: [{'start_timestamp': 2, 'end_timestamp': 6}, 7, None]

Explanation: A's consecutive fails at 1 and 4 are one segment, so A fails on [1, 8). B fails on [2, 6). Their overlap is [2, 6). A's completed duration is 7. No interval has 3 concurrent failures.

Input: [('getMaxConcurrentFailures', 1), ('getMinPassTransition', 'Z')]

Expected Output: [None, None]

Explanation: With no logs at all, there is no failure interval and no completed fail->pass transition.

Solution

def solution(operations):
    from collections import defaultdict

    active_fail_start = {}
    min_duration = {}
    closed_intervals = []
    latest_timestamp = None
    answers = []

    def get_max_interval(min_tests):
        if latest_timestamp is None:
            return None

        delta = defaultdict(int)

        for start, end in closed_intervals:
            if start < end:
                delta[start] += 1
                delta[end] -= 1

        for start in active_fail_start.values():
            if start < latest_timestamp:
                delta[start] += 1
                delta[latest_timestamp] -= 1

        if not delta:
            return None

        times = sorted(delta)
        count = 0
        current_start = None
        best_start = None
        best_end = None
        best_length = 0

        # Sweep the boundaries in time order. The last boundary is skipped
        # here because every interval has closed by then.
        for t in times[:-1]:
            count += delta[t]

            if count >= min_tests:
                if current_start is None:
                    current_start = t
            elif current_start is not None:
                length = t - current_start
                # Strict '>' keeps the earliest interval on ties, because
                # runs are visited in increasing start order.
                if length > best_length:
                    best_start = current_start
                    best_end = t
                    best_length = length
                current_start = None

        # Close a run that is still open at the final boundary.
        if current_start is not None:
            end_time = times[-1]
            length = end_time - current_start
            if length > best_length:
                best_start = current_start
                best_end = end_time
                best_length = length

        if best_length == 0:
            return None

        return {'start_timestamp': best_start, 'end_timestamp': best_end}

    for op in operations:
        cmd = op[0]

        if cmd == 'log':
            _, test_id, timestamp, status = op
            latest_timestamp = timestamp

            if status == 'fail':
                if test_id not in active_fail_start:
                    active_fail_start[test_id] = timestamp
            else:  # status == 'pass'
                if test_id in active_fail_start:
                    start = active_fail_start.pop(test_id)
                    closed_intervals.append((start, timestamp))
                    duration = timestamp - start
                    if test_id not in min_duration or duration < min_duration[test_id]:
                        min_duration[test_id] = duration

        elif cmd == 'getMinPassTransition':
            _, test_id = op
            answers.append(min_duration.get(test_id))

        else:  # getMaxConcurrentFailures
            _, min_tests = op
            answers.append(get_max_interval(min_tests))

    return answers

Time complexity: log is O(1) per operation; getMinPassTransition is O(1) per query; getMaxConcurrentFailures is O(m log m) per query, where m is the number of completed failure segments plus currently open ones. Space complexity: O(m + t), where t is the number of distinct test IDs.
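
Because timestamps are strictly increasing, the failing-test count can also be recorded as a step function while logs arrive, so a query needs no sort. The sketch below is one possible alternative to the solution above, not the reference approach. The class name `ConcurrentFailureTracker` and its method names are assumptions, and it relies on the stated constraint min_tests >= 1.

```python
from typing import Dict, List, Optional, Set, Tuple


class ConcurrentFailureTracker:
    """Hypothetical tracker that appends one breakpoint per change in the failing count."""

    def __init__(self) -> None:
        self._failing: Set[str] = set()
        # (timestamp, number of failing tests from that timestamp onward)
        self._steps: List[Tuple[int, int]] = []
        self._latest: Optional[int] = None

    def log(self, test_id: str, timestamp: int, status: str) -> None:
        self._latest = timestamp
        if status == 'fail' and test_id not in self._failing:
            self._failing.add(test_id)
            self._steps.append((timestamp, len(self._failing)))
        elif status == 'pass' and test_id in self._failing:
            self._failing.remove(test_id)
            self._steps.append((timestamp, len(self._failing)))

    def get_max_concurrent_failures(self, min_tests: int) -> Optional[Dict[str, int]]:
        best: Optional[Tuple[int, int]] = None
        best_length = 0
        run_start: Optional[int] = None
        for timestamp, count in self._steps:
            if count >= min_tests:
                if run_start is None:
                    run_start = timestamp
            elif run_start is not None:
                # Strict '>' keeps the earliest run when lengths tie.
                if timestamp - run_start > best_length:
                    best = (run_start, timestamp)
                    best_length = timestamp - run_start
                run_start = None
        # A run still open at query time ends at the latest logged timestamp.
        if run_start is not None and self._latest - run_start > best_length:
            best = (run_start, self._latest)
        if best is None:
            return None
        return {'start_timestamp': best[0], 'end_timestamp': best[1]}


tracker = ConcurrentFailureTracker()
for test_id, ts, status in [('A', 1, 'fail'), ('B', 3, 'fail'), ('C', 8, 'pass')]:
    tracker.log(test_id, ts, status)
print(tracker.get_max_concurrent_failures(2))  # {'start_timestamp': 3, 'end_timestamp': 8}
```

Under these assumptions a log stays O(1) and a query becomes a single O(b) pass, where b is the number of recorded breakpoints (at most two per failure segment).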

Hints

  1. Think of each failure segment as a half-open interval [start, end). A max-concurrency query becomes a sweep-line problem over interval boundaries.
  2. For an open failure segment at query time, use the latest seen log timestamp as its temporary end. While sweeping, merge adjacent spans as long as the failing-test count stays at least `min_tests`.
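
A sweep-line implementation like the one these hints describe can be sanity-checked against a brute-force reference on small inputs. The sketch below counts coverage slot by slot, which is valid only because timestamps are integers, and it is far too slow for timestamps near 10^9. The function name `brute_force_longest` is an assumption, and min_tests >= 1 is assumed as in the constraints.

```python
from typing import List, Optional, Tuple

Interval = Tuple[int, int]


def brute_force_longest(intervals: List[Interval], min_tests: int) -> Optional[Interval]:
    """Earliest longest [start, end) run covered by at least min_tests intervals."""
    if not intervals:
        return None
    lo = min(start for start, _ in intervals)
    hi = max(end for _, end in intervals)
    best: Optional[Interval] = None
    best_length = 0
    run_start: Optional[int] = None
    # No interval covers the slot starting at hi, so any open run closes there.
    for t in range(lo, hi + 1):
        # Number of half-open intervals that contain the unit slot [t, t + 1).
        covered = sum(1 for start, end in intervals if start <= t < end)
        if covered >= min_tests:
            if run_start is None:
                run_start = t
        elif run_start is not None:
            if t - run_start > best_length:
                best = (run_start, t)
                best_length = t - run_start
            run_start = None
    return best


print(brute_force_longest([(1, 4), (2, 5), (6, 9), (7, 10)], 2))  # (2, 4)
```

Comparing this output with the sweep-line result on randomly generated small streams is one way to catch off-by-one errors at interval boundaries.
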
Last updated: May 29, 2026

Related Coding Questions

  • Implement a uniq-like function - Vanta (medium)
  • Design test-run logging and query functions - Vanta (Medium)
  • Implement test failure analytics APIs - Vanta (Medium)
  • Design streaming test logger and queries - Vanta (Medium)