Implement Data Structure for Top-K Elements in Streams
Company: Pinterest
Role: Data Scientist
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Onsite
##### Scenario
An analytics feature must continuously report the K largest numbers seen so far.
##### Question
Implement a data structure that ingests a stream of integers and returns the top-K elements at any moment. How do you extend it to very large K or distributed streams?
##### Hints
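Maintain a min-heap of size K. For distributed streams, merge the local heaps from each shard. Count-min sketches apply when the goal is approximate top-K by frequency (heavy hitters) rather than by value.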
Quick Answer: This question evaluates competency in streaming algorithms, dynamic data structures for maintaining top-K elements in real time, and scalability considerations for handling large or distributed input streams.
Design a function that maintains the K largest integers seen from a stream. You are given an integer k and a list of operations. Each operation is either ['add', x] to insert integer x into the stream, or ['get'] to query the current top-k numbers. For every 'get' operation, return a list of up to k largest numbers seen so far in descending (non-increasing) order. Duplicates are allowed and should appear as separate entries if present. If k = 0, every 'get' returns an empty list. Implement the function to process all operations and return the results of the 'get' operations in order.
Constraints
- 1 <= len(operations) <= 200000
- -10^9 <= x <= 10^9 for any added value
- 0 <= k <= 100000
- operations[i] is either ['add', x] or ['get']
- Return one list per 'get' operation, each sorted in descending order
- Use O(k) additional space
Solution
from typing import List
import heapq

def top_k_stream(operations: list, k: int) -> list:
    """
    Process a stream of operations maintaining the top-k largest integers.
    operations: list of ['add', x] or ['get']
    Returns a list of lists: the answer for each 'get'.
    """
    heap: List[int] = []  # min-heap storing current top-k
    outputs: List[List[int]] = []
    if k < 0:
        k = 0  # normalize; though constraints say k >= 0
    for op in operations:
        if not op:
            continue
        cmd = op[0] if isinstance(op, (list, tuple)) else op
        if cmd == 'add':
            if k == 0:
                continue  # nothing to store
            x = op[1]
            if len(heap) < k:
                heapq.heappush(heap, x)
            elif x > heap[0]:
                heapq.heapreplace(heap, x)
            else:
                # x is not large enough to enter top-k
                pass
        elif cmd == 'get':
            outputs.append(sorted(heap, reverse=True))
        else:
            # Unknown command; ignore per spec assumptions
            pass
    return outputs
Explanation
Keep a fixed-size min-heap of size at most K. Each new value is pushed if the heap is not full. Once full, only values larger than the heap's minimum replace it, ensuring the heap always contains the current K largest elements. For a 'get' query, copy and sort the heap in descending order to return the top-K values observed so far. This preserves duplicates and returns fewer than K values if fewer have been seen.
Time complexity: O(log K) per 'add'; O(K log K) per 'get'. Space complexity: O(K).
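Because the original question asks for a data structure rather than a batch function, the same idea can be wrapped in a small class. The following is a minimal sketch, not a reference implementation: the class name `TopKStream` and its `add`/`get` methods are illustrative names chosen here, and treating a negative k as 0 is an assumption carried over from the solution above.

```python
import heapq
from typing import List


class TopKStream:
    """Keeps the k largest integers seen so far in a size-bounded min-heap."""

    def __init__(self, k: int) -> None:
        self.k = max(k, 0)          # assumption: a negative k behaves like 0
        self._heap: List[int] = []  # min-heap; _heap[0] is the smallest kept value

    def add(self, x: int) -> None:
        if self.k == 0:
            return                             # nothing is ever stored
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, x)      # still filling up: O(log k)
        elif x > self._heap[0]:
            heapq.heapreplace(self._heap, x)   # evict the current minimum: O(log k)
        # otherwise x cannot be among the k largest, so it is dropped

    def get(self) -> List[int]:
        # sorted() builds a new list, so the heap itself is left untouched
        return sorted(self._heap, reverse=True)


stream = TopKStream(3)
for value in [5, 1, 5, 9, 2]:
    stream.add(value)
top_three = stream.get()  # [9, 5, 5]: the duplicate 5 is kept, 1 and 2 are dropped
```

In this trace the heap fills with 5, 1, 5, then 9 evicts the minimum 1, and 2 is rejected because it is not larger than the new minimum 5.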
Hints
- Maintain a min-heap of size at most K: push until size K; thereafter, replace the heap minimum only if the new value is larger.
- For 'get', copy and sort the heap in descending order to return the current top-K.
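- For very large K or distributed streams, keep a local top-K min-heap on each shard and combine the shard results with a single size-K min-heap or a k-way merge of the sorted lists. Count-Min Sketch addresses a related problem, approximate heavy hitters (top-K by frequency), rather than the K largest values.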
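The shard-merge hint can be sketched as follows. This is one possible approach under stated assumptions: each shard has already computed its own top-k list, and the function name `merge_shard_top_k` and the sample shard data are hypothetical. It relies on the fact that every value in the global top-k must also appear in the top-k of the shard that saw it, so merging the local results loses nothing.

```python
import heapq
from typing import Iterable, List


def merge_shard_top_k(shard_results: Iterable[List[int]], k: int) -> List[int]:
    """Combine per-shard top-k lists into the global top-k, largest first."""
    if k <= 0:
        return []
    heap: List[int] = []  # min-heap holding at most k candidates
    for local in shard_results:
        for x in local:
            if len(heap) < k:
                heapq.heappush(heap, x)
            elif x > heap[0]:
                heapq.heapreplace(heap, x)
    return sorted(heap, reverse=True)


# Hypothetical local results from three shards, each computed with k = 3.
shards = [[9, 7, 3], [8, 8, 1], [10]]
global_top = merge_shard_top_k(shards, 3)  # [10, 9, 8]
```

With S shards the merge examines at most S * k values, each in O(log k) time, so the coordinator never needs the raw streams.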