Implement Data Structure for Top-K Elements in Streams
Company: Pinterest
Role: Data Scientist
Category: Coding & Algorithms
Difficulty: Medium
Interview Round: Onsite
##### Scenario
Analytics feature that must constantly report the K largest numbers seen so far.
##### Question
Implement a data structure that ingests a stream of integers and returns the top-K elements at any moment. How do you extend it to very large K or distributed streams?
##### Hints
Maintain a min-heap of size K; for distribution, merge local heaps or use count-min sketches.
Quick Answer: This question evaluates competency in streaming algorithms, dynamic data structures for maintaining top-K elements in real time, and scalability considerations for handling large or distributed input streams.
Design a function that maintains the K largest integers seen from a stream. You are given an integer k and a list of operations. Each operation is either ['add', x] to insert integer x into the stream, or ['get'] to query the current top-k numbers. For every 'get' operation, return a list of up to k largest numbers seen so far in descending (non-increasing) order. Duplicates are allowed and should appear as separate entries if present. If k = 0, every 'get' returns an empty list. Implement the function to process all operations and return the results of the 'get' operations in order.
Constraints
- 1 <= len(operations) <= 200000
- -10^9 <= x <= 10^9 for any added value
- 0 <= k <= 100000
- operations[i] is either ['add', x] or ['get']
- Return one list per 'get' operation, each sorted in descending order
- Use O(k) additional space
Solution
from typing import List
import heapq
def top_k_stream(operations: list, k: int) -> list:
"""
Process a stream of operations maintaining the top-k largest integers.
operations: list of ['add', x] or ['get']
Returns a list of lists: the answer for each 'get'.
"""
heap: List[int] = [] # min-heap storing current top-k
outputs: List[List[int]] = []
if k < 0:
k = 0 # normalize; though constraints say k >= 0
for op in operations:
if not op:
continue
cmd = op[0] if isinstance(op, (list, tuple)) else op
if cmd == 'add':
if k == 0:
continue # nothing to store
x = op[1]
if len(heap) < k:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x)
else:
# x is not large enough to enter top-k
pass
elif cmd == 'get':
outputs.append(sorted(heap, reverse=True))
else:
# Unknown command; ignore per spec assumptions
pass
return outputs