Compute sliding window median
Company: Snapchat
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Technical Screen
##### Question
LeetCode 480. Sliding Window Median: Given an array nums and an integer k, find the median of each sliding window of size k as the window moves from left to right across the array.
https://leetcode.com/problems/sliding-window-median/description/
Quick Answer: This question evaluates the ability to maintain a running median over a sliding window, testing competencies in data structures, order statistics, dynamic updates, and algorithmic efficiency within the Coding & Algorithms domain.
Given an integer array nums and an integer k, compute the median of each contiguous subarray (sliding window) of length k as the window moves one position to the right at a time. For even k, the median is the average of the two middle elements. Return the sequence of medians as floats in the order of the window's starting index.
Constraints
- 1 <= len(nums) <= 200000
- 1 <= k <= len(nums)
- -10^9 <= nums[i] <= 10^9
- Output each median as a float (no rounding beyond standard floating-point behavior)
Solution
from typing import List
from heapq import heappush, heappop
from collections import defaultdict
def sliding_window_median(nums: List[int], k: int) -> List[float]:
n = len(nums)
if k <= 0 or k > n:
return []
# Max-heap (store negatives) for lower half, min-heap for upper half
lo = [] # max-heap via negatives
hi = [] # min-heap
delayed = defaultdict(int) # value -> count of delayed removals
lo_size = 0 # effective size (excluding delayed) of lo
hi_size = 0 # effective size (excluding delayed) of hi
def prune(heap):
# Remove elements from heap top that are marked for deletion
while heap:
val = -heap[0] if heap is lo else heap[0]
if delayed.get(val, 0) > 0:
heappop(heap)
delayed[val] -= 1
if delayed[val] == 0:
del delayed[val]
else:
break
def balance():
nonlocal lo_size, hi_size
# Ensure lo has either the same number of elements as hi or one more
if lo_size > hi_size + 1:
moved = -heappop(lo)
lo_size -= 1
heappush(hi, moved)
hi_size += 1
prune(lo)
elif lo_size < hi_size:
moved = heappop(hi)
hi_size -= 1
heappush(lo, -moved)
lo_size += 1
prune(hi)
def add(num: int):
nonlocal lo_size, hi_size
if not lo or num <= -lo[0]:
heappush(lo, -num)
lo_size += 1
else:
heappush(hi, num)
hi_size += 1
balance()
def remove(num: int):
nonlocal lo_size, hi_size
delayed[num] += 1
# Decide which heap the number would belong to
if lo and num <= -lo[0]:
lo_size -= 1
if lo and -lo[0] == num:
prune(lo)
else:
hi_size -= 1
if hi and hi[0] == num:
prune(hi)
balance()
def get_median() -> float:
prune(lo)
prune(hi)
if k % 2 == 1:
return float(-lo[0])
return (-lo[0] + hi[0]) / 2.0
result: List[float] = []
# Initialize first window
for i in range(k):
add(nums[i])
result.append(get_median())
# Slide the window
for i in range(k, n):
add(nums[i])
remove(nums[i - k])
result.append(get_median())
return result
Explanation
Maintain two heaps to represent the lower and upper halves of the current window. The lower half is a max-heap (implemented via negatives) and the upper half is a min-heap. Insertions go to one of the heaps based on value, followed by rebalancing so the size difference is at most one (with the lower half allowed one extra when k is odd). Elements leaving the window are lazily deleted: record them in a counter and physically remove them only when they reach the top of their heap. The median is either the top of the lower half (odd k) or the average of both tops (even k). This yields O(n log k) time and O(k) space.
Time complexity: O(n log k). Space complexity: O(k).
Hints
- Maintain two heaps: a max-heap for the lower half and a min-heap for the upper half.
- Balance the heaps so that their sizes differ by at most one, with the lower half allowed to have one extra when k is odd.
- Use lazy deletion: mark outgoing elements in a hash map and remove them from the top of a heap only when they reach the top.
- Median is the top of the max-heap for odd k, and the average of both tops for even k.