Implement streaming RLE and bit-packed codec
Company: Databricks
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: hard
Interview Round: Onsite
This question evaluates understanding of data compression algorithms, bit-level manipulation, two's‑complement integer representation, streaming algorithm design, and encoder/decoder state management for 32-bit signed integers.
Company: Databricks
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: hard
Interview Round: Onsite
Loading coding console...
Quick Answer: This question evaluates understanding of data compression algorithms, bit-level manipulation, two's‑complement integer representation, streaming algorithm design, and encoder/decoder state management for 32-bit signed integers.
Input: ('R', (5, 4))
Expected Output: {'type': 'R', 'value': 5, 'count': 4}
Explanation: A run-length block is stored directly.
Input: ('B', [0, 1, -1])
Expected Output: {'type': 'B', 'bitWidth': 2, 'count': 3, 'words': [52]}
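Explanation: The minimum common width is 2 bits: 0 -> 00, 1 -> 01, -1 -> 11. Values are packed starting at the least-significant bit of the word, so the first value occupies the lowest bits: 0b11_01_00 = 52.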
Input: ('B', [-2147483648, 2147483647])
Expected Output: {'type': 'B', 'bitWidth': 32, 'count': 2, 'words': [2147483648, 2147483647]}
Explanation: The full 32-bit signed range requires width 32, so each value occupies one full word.
Input: ('B', [])
Expected Output: {'type': 'B', 'bitWidth': 0, 'count': 0, 'words': []}
Explanation: Edge case: empty bit-packed input.
Input: ('B', [7, 7, 7, 7, 7, 7, 7, 7, 7])
Expected Output: {'type': 'B', 'bitWidth': 4, 'count': 9, 'words': [2004318071, 7]}
Explanation: Nine 4-bit values need 36 bits, so the payload spills into a second word.
def solution(block_type, data):
    """Encode a single block as a run-length or bit-packed record.

    Args:
        block_type: ``'R'`` for a run-length block. Any other value is
            treated as a bit-packed block.
        data: For ``'R'``, a ``(value, count)`` pair. Otherwise an iterable
            of signed integers.

    Returns:
        ``{'type': 'R', 'value': value, 'count': count}`` for a run-length
        block, or ``{'type': 'B', 'bitWidth': w, 'count': n, 'words': [...]}``
        for a bit-packed block, where ``w`` is the smallest two's-complement
        width that fits every value and ``words`` holds the values packed
        least-significant-bit first into unsigned 32-bit words. Empty
        bit-packed input yields ``bitWidth`` 0 and no words.

    Time complexity: O(n + number_of_words), which is O(n) because each value
    uses at most 32 bits. Space complexity: O(number_of_words).
    """

    def min_bits_signed(x):
        # Two's-complement width: magnitude bits plus one sign bit. For a
        # negative x, ~x == -x - 1 is the non-negative value with the same
        # magnitude bits. Zero has bit_length 0 and therefore needs 1 bit.
        magnitude = x if x >= 0 else ~x
        return magnitude.bit_length() + 1

    def pack(values, w):
        words = []
        mask = (1 << w) - 1
        acc = 0       # bits not yet flushed to `words`, lowest bit first
        pending = 0   # number of valid bits currently held in `acc`
        for v in values:
            # Masking turns a negative value into its w-bit two's-complement
            # bit pattern.
            acc |= (v & mask) << pending
            pending += w
            while pending >= 32:
                words.append(acc & 0xFFFFFFFF)
                acc >>= 32
                pending -= 32
        if pending:
            # Final partial word; unused high bits stay zero.
            words.append(acc)
        return words

    if block_type == 'R':
        value, count = data
        return {'type': 'R', 'value': value, 'count': count}

    values = list(data)
    if not values:
        return {'type': 'B', 'bitWidth': 0, 'count': 0, 'words': []}

    bit_width = max(min_bits_signed(v) for v in values)
    return {
        'type': 'B',
        'bitWidth': bit_width,
        'count': len(values),
        'words': pack(values, bit_width),
    }
Input: ([], 3, 128)
Expected Output: []
Explanation: Edge case: empty input produces no blocks.
Input: ([1, 2, 3], 3, 128)
Expected Output: [{'type': 'B', 'bitWidth': 3, 'count': 3, 'words': [209]}]
Explanation: No run is long enough for RLE, so the whole sequence becomes one BP block.
Input: ([7, 7, 7, 7], 3, 128)
Expected Output: [{'type': 'R', 'value': 7, 'count': 4}]
Explanation: A maximal run of length 4 qualifies for RLE.
Input: ([5, 5, 5, 1, 2, 2, 3, 3, 3, 4], 3, 128)
Expected Output: [{'type': 'R', 'value': 5, 'count': 3}, {'type': 'B', 'bitWidth': 3, 'count': 3, 'words': [145]}, {'type': 'R', 'value': 3, 'count': 3}, {'type': 'B', 'bitWidth': 4, 'count': 1, 'words': [4]}]
Explanation: The 5s and 3s become RLE blocks; the short middle segment and final 4 become BP blocks.
Input: ([0, 0, 1, 1, 2, 2], 3, 4)
Expected Output: [{'type': 'B', 'bitWidth': 2, 'count': 4, 'words': [80]}, {'type': 'B', 'bitWidth': 3, 'count': 2, 'words': [18]}]
Explanation: No run reaches length 3, so everything is BP. max_bp_block = 4 forces the data to split into two BP blocks.
def solution(values, rle_min_run=3, max_bp_block=128):
    """Encode a stream of signed integers as run-length and bit-packed blocks.

    The input is consumed in a single pass, so any iterable (list, iterator,
    generator) is accepted. Each maximal run of equal values whose length is
    at least ``rle_min_run`` becomes an ``'R'`` block. All other values are
    buffered and emitted as ``'B'`` blocks of at most ``max_bp_block`` values.

    Args:
        values: Iterable of signed integers.
        rle_min_run: Minimum run length that is emitted as a run-length block.
        max_bp_block: Maximum number of values in one bit-packed block.

    Returns:
        A list of block dicts in stream order: either
        ``{'type': 'R', 'value': v, 'count': c}`` or
        ``{'type': 'B', 'bitWidth': w, 'count': n, 'words': [...]}`` with the
        values packed least-significant-bit first into unsigned 32-bit words.

    Raises:
        ValueError: If values need bit-packing and ``max_bp_block`` is below 1.

    Time complexity: O(n). Space complexity: O(max_bp_block + output_size).
    """
    from collections import deque

    def min_bits_signed(x):
        # Two's-complement width: magnitude bits plus one sign bit.
        magnitude = x if x >= 0 else ~x
        return magnitude.bit_length() + 1

    def pack(vals, w):
        words = []
        mask = (1 << w) - 1
        acc = 0       # bits not yet flushed to `words`, lowest bit first
        held = 0      # number of valid bits currently in `acc`
        for v in vals:
            acc |= (v & mask) << held
            held += w
            while held >= 32:
                words.append(acc & 0xFFFFFFFF)
                acc >>= 32
                held -= 32
        if held:
            words.append(acc)
        return words

    def make_bp(chunk):
        bit_width = max(min_bits_signed(v) for v in chunk)
        return {
            'type': 'B',
            'bitWidth': bit_width,
            'count': len(chunk),
            'words': pack(chunk, bit_width),
        }

    blocks = []
    pending = deque()  # values waiting to be emitted as bit-packed blocks

    def flush(include_partial):
        # Emit full-size chunks; also emit the short remainder when a
        # run-length block (or the end of input) forces the buffer to drain.
        while pending and (include_partial or len(pending) >= max_bp_block):
            size = min(len(pending), max_bp_block)
            chunk = [pending.popleft() for _ in range(size)]
            blocks.append(make_bp(chunk))

    def emit_run(value, length):
        if length >= rle_min_run:
            # Buffered values precede this run in the stream, so they must be
            # written out first to preserve order.
            flush(include_partial=True)
            blocks.append({'type': 'R', 'value': value, 'count': length})
            return
        if max_bp_block < 1:
            raise ValueError('max_bp_block must be at least 1')
        pending.extend([value] * length)
        flush(include_partial=False)

    run_value = None
    run_len = 0
    for v in values:
        if run_len and v == run_value:
            run_len += 1
            continue
        if run_len:
            emit_run(run_value, run_len)
        run_value, run_len = v, 1
    if run_len:
        emit_run(run_value, run_len)

    flush(include_partial=True)
    return blocks
Input: []
Expected Output: []
Explanation: Edge case: no blocks means no output values.
Input: [{'type': 'R', 'value': 5, 'count': 4}]
Expected Output: [5, 5, 5, 5]
Explanation: An RLE block expands into repeated values.
Input: [{'type': 'B', 'bitWidth': 3, 'count': 3, 'words': [209]}]
Expected Output: [1, 2, 3]
Explanation: The 3-bit values are 001, 010, 011, packed starting at the least-significant bit of the word: 0b011_010_001 = 209.
Input: [{'type': 'B', 'bitWidth': 32, 'count': 2, 'words': [2147483647, 2147483648]}, {'type': 'R', 'value': -1, 'count': 2}]
Expected Output: [2147483647, -2147483648, -1, -1]
Explanation: This checks both 32-bit signed decoding and iteration across different block types.
Input: [{'type': 'B', 'bitWidth': 5, 'count': 7, 'words': [4294967295, 7]}]
Expected Output: [-1, -1, -1, -1, -1, -1, -1]
Explanation: Seven 5-bit values cross the 32-bit boundary; every 5-bit pattern is 11111, which is -1.
def solution(blocks):
    """Decode a sequence of run-length and bit-packed blocks into integers.

    Args:
        blocks: Iterable of block dicts. A block whose ``'type'`` is ``'R'``
            carries ``'value'`` and ``'count'``. Any other block is read as
            bit-packed and carries ``'bitWidth'``, ``'count'`` and ``'words'``
            (unsigned 32-bit words, values packed least-significant-bit
            first as two's-complement fields of ``bitWidth`` bits).

    Returns:
        A list of the decoded signed integers in block order.

    Raises:
        ValueError: If a bit-packed block has a nonzero count but a bit width
            below 1.
        IndexError: If ``'words'`` is too short for ``count`` values.

    Time complexity: O(total_decoded_values) for bounded bit widths.
    Space complexity: O(total_decoded_values).
    """
    result = []
    for block in blocks:
        if block['type'] == 'R':
            result.extend([block['value']] * block['count'])
            continue

        w = block['bitWidth']
        count = block['count']
        words = block['words']
        if count == 0:
            continue
        if w < 1:
            raise ValueError('bitWidth must be at least 1 for a non-empty block')

        sign_bit = 1 << (w - 1)
        modulus = 1 << w
        bit_pos = 0
        for _ in range(count):
            # Gather the w-bit field piece by piece. A field may start
            # mid-word and, for widths above 32, cover more than two words.
            u = 0
            got = 0
            while got < w:
                word_index, offset = divmod(bit_pos, 32)
                take = min(w - got, 32 - offset)
                piece = (words[word_index] >> offset) & ((1 << take) - 1)
                u |= piece << got
                got += take
                bit_pos += take
            # Sign-extend: a set top bit means the field encodes u - 2**w.
            result.append(u - modulus if u & sign_bit else u)
    return result
Input: ([[], [1, 2, 3], [7, 7, 7, 7], [5, 5, 5, 1, 2, 2, 3, 3, 3], [2147483647, -2147483648, -1]], 3)
Expected Output: {'simple_bp': True, 'long_run_rle': True, 'alternating_switch': True, 'has_int_min': True, 'has_int_max': True, 'has_negative': True, 'empty_input': True, 'complete': True}
Explanation: This suite covers every required category.
Input: ([[1, 2], [3, 3, 3]], 3)
Expected Output: {'simple_bp': True, 'long_run_rle': True, 'alternating_switch': False, 'has_int_min': False, 'has_int_max': False, 'has_negative': False, 'empty_input': False, 'complete': False}
Explanation: The suite has BP-only and RLE-only cases, but no empty case, no boundary values, and no single sequence that switches strategies.
Input: ([[]], 3)
Expected Output: {'simple_bp': False, 'long_run_rle': False, 'alternating_switch': False, 'has_int_min': False, 'has_int_max': False, 'has_negative': False, 'empty_input': True, 'complete': False}
Explanation: Edge case: only the empty-input requirement is satisfied.
Input: ([[-1, -2], [4, 4, 4, 5, 6, 6, 6], [2147483647]], 3)
Expected Output: {'simple_bp': True, 'long_run_rle': True, 'alternating_switch': True, 'has_int_min': False, 'has_int_max': True, 'has_negative': True, 'empty_input': False, 'complete': False}
Explanation: This suite is close, but it still misses INT_MIN and an empty test.
Input: ([[], [1, 2], [3, 3, 3], [2147483647, -2147483648, -1]], 3)
Expected Output: {'simple_bp': True, 'long_run_rle': True, 'alternating_switch': False, 'has_int_min': True, 'has_int_max': True, 'has_negative': True, 'empty_input': True, 'complete': False}
Explanation: Even with all other categories present, alternating_switch is still missing because no single sequence uses both block types.
def solution(test_sequences, rle_min_run=3):
    """Summarize which required test categories a suite of sequences covers.

    A run is a maximal stretch of equal adjacent values. A run of length at
    least ``rle_min_run`` counts as run-length encodable; shorter runs count
    as bit-packed.

    Args:
        test_sequences: Iterable of integer sequences (each must support
            ``len()`` and indexing).
        rle_min_run: Minimum run length treated as run-length encodable.

    Returns:
        A dict of booleans:
            ``simple_bp``: some non-empty sequence has no long run.
            ``long_run_rle``: some sequence has a long run.
            ``alternating_switch``: some single sequence has both a long run
                and a short run.
            ``has_int_min`` / ``has_int_max``: some sequence contains
                -2147483648 / 2147483647.
            ``has_negative``: some sequence contains a negative value.
            ``empty_input``: some sequence is empty.
            ``complete``: all of the above are true.

    Time complexity: O(total number of integers across the whole suite).
    Space complexity: O(1) extra space, excluding the returned dictionary.
    """
    INT_MIN = -2147483648
    INT_MAX = 2147483647

    def analyze_sequence(seq):
        # Report whether the sequence contains a long run and/or a short run.
        has_long = False
        has_short = False
        start = 0
        n = len(seq)
        while start < n:
            end = start + 1
            while end < n and seq[end] == seq[start]:
                end += 1
            if end - start >= rle_min_run:
                has_long = True
            else:
                has_short = True
            start = end
        return has_long, has_short

    summary = {
        'simple_bp': False,
        'long_run_rle': False,
        'alternating_switch': False,
        'has_int_min': False,
        'has_int_max': False,
        'has_negative': False,
        'empty_input': False,
    }

    for seq in test_sequences:
        if not seq:
            summary['empty_input'] = True
        if INT_MIN in seq:
            summary['has_int_min'] = True
        if INT_MAX in seq:
            summary['has_int_max'] = True
        if any(v < 0 for v in seq):
            summary['has_negative'] = True

        has_long, has_short = analyze_sequence(seq)
        if seq and not has_long:
            summary['simple_bp'] = True
        if has_long:
            summary['long_run_rle'] = True
            if has_short:
                summary['alternating_switch'] = True

    # 'complete' is added last so it is derived from the seven category flags
    # only and keeps its position as the final key.
    summary['complete'] = all(summary.values())
    return summary