Solve sliding-window and heap problems
Company: Amazon
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Onsite
Quick Answer: This question evaluates algorithm design and data structure proficiency, focusing on sliding-window techniques for fixed-length subarray aggregates and heap- or hashmap-based approaches for maintaining medians in a real-time integer stream, along with considerations for concurrent execution.
Constraints
- 1 <= len(nums) <= 200000
- 1 <= k <= len(nums)
- -10^9 <= nums[i] <= 10^9
- Return medians as floats; for even k, use (a + b) / 2.0 with no rounding beyond floating-point defaults
- Target time complexity: O(n log k), space: O(k)
Solution
from typing import List
import heapq
from collections import defaultdict
def sliding_window_median(nums: List[int], k: int) -> List[float]:
small = [] # max-heap (store negatives)
large = [] # min-heap
delayed = defaultdict(int) # value -> count to lazily delete
s_size = 0 # number of valid elements in small
l_size = 0 # number of valid elements in large
def prune(heap):
# Remove elements at the top that are marked for deletion
while heap:
if heap is small:
num = -heap[0]
else:
num = heap[0]
if delayed.get(num, 0):
heapq.heappop(heap)
delayed[num] -= 1
if delayed[num] == 0:
del delayed[num]
else:
break
def make_balance():
nonlocal s_size, l_size
prune(small)
prune(large)
# Ensure size property: s_size >= l_size and at most 1 larger
if s_size > l_size + 1:
val = -heapq.heappop(small)
heapq.heappush(large, val)
s_size -= 1
l_size += 1
elif s_size < l_size:
val = heapq.heappop(large)
heapq.heappush(small, -val)
s_size += 1
l_size -= 1
prune(small)
prune(large)
def add_num(num: int):
nonlocal s_size, l_size
if not small or num <= -small[0]:
heapq.heappush(small, -num)
s_size += 1
else:
heapq.heappush(large, num)
l_size += 1
make_balance()
def remove_num(num: int):
nonlocal s_size, l_size
delayed[num] += 1
# Decide which heap the number belongs to
if small and num <= -small[0]:
s_size -= 1
if small and -small[0] == num:
prune(small)
else:
l_size -= 1
if large and large[0] == num:
prune(large)
make_balance()
def get_median() -> float:
prune(small)
prune(large)
if k % 2 == 1:
return float(-small[0])
else:
return (-small[0] + large[0]) / 2.0
res: List[float] = []
for i, num in enumerate(nums):
add_num(num)
if i >= k:
remove_num(nums[i - k])
if i >= k - 1:
res.append(get_median())
return res
Explanation
Time complexity: O(n log k). Space complexity: O(k).
Hints
- Maintain two heaps: a max-heap for the lower half and a min-heap for the upper half of the window.
- Balance heaps so that the max-heap has either the same number of valid elements as the min-heap or one more.
- Use a hashmap for lazy deletion to remove elements that slide out without direct heap deletion.
- When computing the median, prune invalid (delayed) elements from heap tops.
- For even k, average the max of the lower half and the min of the upper half.