Compute sliding window median
Company: Snapchat
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Technical Screen
##### Question
LeetCode 480. Sliding Window Median: Given an array nums and an integer k, find the median of each sliding window of size k as the window moves from left to right across the array.
https://leetcode.com/problems/sliding-window-median/description/
Quick Answer: This question evaluates the ability to maintain a running median over a sliding window, testing competencies in data structures, order statistics, dynamic updates, and algorithmic efficiency within the Coding & Algorithms domain.
Given an integer array nums and an integer k, compute the median of each contiguous subarray (sliding window) of length k as the window moves one position to the right at a time. For even k, the median is the average of the two middle elements. Return the sequence of medians as floats in the order of the window's starting index.
Constraints
- 1 <= len(nums) <= 200000
- 1 <= k <= len(nums)
- -10^9 <= nums[i] <= 10^9
- Output each median as a float (no rounding beyond standard floating-point behavior)
Solution
from typing import List
from heapq import heappush, heappop
from collections import defaultdict
def sliding_window_median(nums: List[int], k: int) -> List[float]:
n = len(nums)
if k <= 0 or k > n:
return []
# Max-heap (store negatives) for lower half, min-heap for upper half
lo = [] # max-heap via negatives
hi = [] # min-heap
delayed = defaultdict(int) # value -> count of delayed removals
lo_size = 0 # effective size (excluding delayed) of lo
hi_size = 0 # effective size (excluding delayed) of hi
def prune(heap):
# Remove elements from heap top that are marked for deletion
while heap:
val = -heap[0] if heap is lo else heap[0]
if delayed.get(val, 0) > 0:
heappop(heap)
delayed[val] -= 1
if delayed[val] == 0:
del delayed[val]
else:
break
def balance():
nonlocal lo_size, hi_size
# Ensure lo has either the same number of elements as hi or one more
if lo_size > hi_size + 1:
moved = -heappop(lo)
lo_size -= 1
heappush(hi, moved)
hi_size += 1
prune(lo)
elif lo_size < hi_size:
moved = heappop(hi)
hi_size -= 1
heappush(lo, -moved)
lo_size += 1
prune(hi)
def add(num: int):
nonlocal lo_size, hi_size
if not lo or num <= -lo[0]:
heappush(lo, -num)
lo_size += 1
else:
heappush(hi, num)
hi_size += 1
balance()
def remove(num: int):
nonlocal lo_size, hi_size
delayed[num] += 1
# Decide which heap the number would belong to
if lo and num <= -lo[0]:
lo_size -= 1
if lo and -lo[0] == num:
prune(lo)
else:
hi_size -= 1
if hi and hi[0] == num:
prune(hi)
balance()
def get_median() -> float:
prune(lo)
prune(hi)
if k % 2 == 1:
return float(-lo[0])
return (-lo[0] + hi[0]) / 2.0
result: List[float] = []
# Initialize first window
for i in range(k):
add(nums[i])
result.append(get_median())
# Slide the window
for i in range(k, n):
add(nums[i])
remove(nums[i - k])
result.append(get_median())
return result