Implement multi-level task manager APIs
Company: Ramp
Role: Software Engineer
Category: Coding & Algorithms
Difficulty: medium
Interview Round: Take-home Project
Quick Answer: This question evaluates implementation skills for stateful in-memory data structures, API evolution handling, substring search and prioritized sorting, and time-windowed assignment logic with quota enforcement in the Coding & Algorithms domain, and is primarily a practical application problem rather than a purely conceptual one.
Part 1: Task CRUD Simulator
Constraints
- 0 <= len(operations) <= 10000
- Timestamps are non-decreasing, but they do not affect Level 1 behavior
- 0 <= priority <= 10^9
- Task IDs must be generated exactly as task_id_1, task_id_2, ...
Examples
Input: [('add', 1, 'write', 3), ('get', 2, 'task_id_1'), ('update', 3, 'task_id_1', 'write docs', 5), ('get', 4, 'task_id_1')]
Expected Output: ['task_id_1', '{"name":"write","priority":3}', True, '{"name":"write docs","priority":5}']
Explanation: Create one task, read it, update it, then read the updated version.
Input: [('add', 1, 'bug', 1), ('add', 2, 'bug', 1), ('get', 3, 'task_id_2')]
Expected Output: ['task_id_1', 'task_id_2', '{"name":"bug","priority":1}']
Explanation: Tasks with identical fields are still different because IDs are unique.
Input: [('update', 1, 'task_id_9', 'x', 1), ('get', 2, 'task_id_9')]
Expected Output: [False, None]
Explanation: Updating or reading a missing task should fail cleanly.
Input: []
Expected Output: []
Explanation: Edge case: no operations.
Solution
def solution(operations):
import json
tasks = {}
next_id = 1
result = []
for op in operations:
kind = op[0]
if kind == 'add':
_, timestamp, name, priority = op
task_id = f'task_id_{next_id}'
next_id += 1
tasks[task_id] = {'name': name, 'priority': priority}
result.append(task_id)
elif kind == 'update':
_, timestamp, task_id, name, priority = op
if task_id in tasks:
tasks[task_id]['name'] = name
tasks[task_id]['priority'] = priority
result.append(True)
else:
result.append(False)
elif kind == 'get':
_, timestamp, task_id = op
if task_id in tasks:
task = tasks[task_id]
result.append(json.dumps({'name': task['name'], 'priority': task['priority']}, separators=(',', ':')))
else:
result.append(None)
else:
raise ValueError('Unknown operation: ' + str(kind))
return resultTime complexity: O(n), where n is the number of operations. Space complexity: O(t), where t is the number of created tasks.
Hints
- Use a dictionary keyed by task_id so update and get are O(1).
- Keep a counter for the next numeric suffix, and use a JSON serializer to produce the exact string format.
Part 2: Search and Sort Tasks
Constraints
- 0 <= len(operations) <= 2000
- 0 <= priority <= 10^9
- Timestamps are non-decreasing, but they do not affect sorting logic here
- Substring matching is case-sensitive
- Sorting rule: higher priority first, then earlier creation first
Examples
Input: [('add', 1, 'alpha', 5), ('add', 2, 'beta', 2), ('add', 3, 'alphabet', 5), ('search', 4, 'alpha', 5), ('list', 5, 2)]
Expected Output: ['task_id_1', 'task_id_2', 'task_id_3', ['task_id_1', 'task_id_3'], ['task_id_1', 'task_id_3']]
Explanation: Both matching tasks have priority 5, so creation order breaks the tie.
Input: [('add', 1, 'Task', 4), ('add', 2, 'task', 4), ('search', 3, 'Task', 10), ('search', 4, 'task', 10)]
Expected Output: ['task_id_1', 'task_id_2', ['task_id_1'], ['task_id_2']]
Explanation: Search is case-sensitive.
Input: [('add', 1, 'x', 1), ('search', 2, 'x', 0), ('list', 3, 0), ('search', 4, '', -2)]
Expected Output: ['task_id_1', [], [], []]
Explanation: Edge cases with non-positive limits should return empty lists.
Input: [('add', 1, 'ab', 1), ('add', 2, 'cd', 5), ('search', 3, 'z', 3), ('list', 4, 5)]
Expected Output: ['task_id_1', 'task_id_2', [], ['task_id_2', 'task_id_1']]
Explanation: No search matches, and the global list is sorted by priority descending.
Solution
def solution(operations):
tasks = []
next_id = 1
result = []
for op in operations:
kind = op[0]
if kind == 'add':
_, timestamp, name, priority = op
task_id = f'task_id_{next_id}'
next_id += 1
tasks.append({
'id': task_id,
'name': name,
'priority': priority,
'order': len(tasks)
})
result.append(task_id)
elif kind == 'search':
_, timestamp, name_filter, max_results = op
if max_results <= 0:
result.append([])
continue
matches = [task for task in tasks if name_filter in task['name']]
matches.sort(key=lambda task: (-task['priority'], task['order']))
result.append([task['id'] for task in matches[:max_results]])
elif kind == 'list':
_, timestamp, limit = op
if limit <= 0:
result.append([])
continue
ordered = sorted(tasks, key=lambda task: (-task['priority'], task['order']))
result.append([task['id'] for task in ordered[:limit]])
else:
raise ValueError('Unknown operation: ' + str(kind))
return resultTime complexity: Worst-case O(n^2 log n) over all operations, because each search/list may sort the current tasks. Space complexity: O(t), where t is the number of created tasks.
Hints
- Store a creation index when each task is added.
- A sort key like (-priority, creation_index) directly matches the required order.
Part 3: Users and Time-Windowed Assignments
Constraints
- 0 <= len(operations) <= 2000
- Timestamps are non-decreasing
- 0 <= priority <= 10^9
- 0 <= quota <= 10^9
- Every assign operation satisfies finish_time > t
Examples
Input: [('add_task', 0, 'A', 1), ('add_task', 1, 'B', 2), ('add_user', 2, 'alice', 1), ('assign', 3, 'task_id_1', 'alice', 10), ('assign', 4, 'task_id_2', 'alice', 8), ('get_user_tasks', 5, 'alice'), ('get_user_tasks', 10, 'alice')]
Expected Output: ['task_id_1', 'task_id_2', True, True, False, ['task_id_1'], []]
Explanation: Alice's quota is 1, so the second assignment fails while the first one is still active. The interval is half-open, so the task is not active at time 10.
Input: [('add_task', 0, 'A', 1), ('add_task', 1, 'B', 1), ('add_user', 2, 'u', 1), ('assign', 2, 'task_id_1', 'u', 3), ('assign', 3, 'task_id_2', 'u', 6), ('get_user_tasks', 3, 'u')]
Expected Output: ['task_id_1', 'task_id_2', True, True, True, ['task_id_2']]
Explanation: At time 3, the first assignment has already expired, so quota is available for the second one.
Input: [('add_user', 0, 'u', 2), ('add_user', 1, 'u', 5), ('assign', 2, 'task_id_1', 'u', 4), ('get_user_tasks', 2, 'ghost')]
Expected Output: [True, False, False, []]
Explanation: Duplicate users are rejected, assigning a missing task fails, and querying an unknown user returns an empty list.
Input: []
Expected Output: []
Explanation: Edge case: no operations.
Solution
def solution(operations):
tasks = {}
users = {}
assignments = []
next_id = 1
result = []
for op in operations:
kind = op[0]
if kind == 'add_task':
_, t, name, priority = op
task_id = f'task_id_{next_id}'
next_id += 1
tasks[task_id] = {'name': name, 'priority': priority}
result.append(task_id)
elif kind == 'add_user':
_, t, user_id, quota = op
if user_id in users:
result.append(False)
else:
users[user_id] = quota
result.append(True)
elif kind == 'assign':
_, t, task_id, user_id, finish_time = op
if task_id not in tasks or user_id not in users:
result.append(False)
continue
active_count = 0
for assignment in assignments:
if assignment['user_id'] == user_id and assignment['start'] <= t < assignment['finish']:
active_count += 1
if active_count >= users[user_id]:
result.append(False)
else:
assignments.append({
'task_id': task_id,
'user_id': user_id,
'start': t,
'finish': finish_time
})
result.append(True)
elif kind == 'get_user_tasks':
_, t, user_id = op
if user_id not in users:
result.append([])
else:
active_tasks = []
for assignment in assignments:
if assignment['user_id'] == user_id and assignment['start'] <= t < assignment['finish']:
active_tasks.append(assignment['task_id'])
result.append(active_tasks)
else:
raise ValueError('Unknown operation: ' + str(kind))
return resultTime complexity: O(n^2) worst-case over all operations, because each assign/query may scan earlier assignments. Space complexity: O(t + u + a), for tasks, users, and assignments.
Hints
- To check whether an assignment fits, count how many assignments for that user satisfy start <= t < finish at the assignment timestamp.
- If you store assignment records in insertion order, you can scan them later to build get_user_tasks in the required order.
Part 4: Task Completion and Overdue Detection
Constraints
- 0 <= len(operations) <= 2000
- Timestamps are non-decreasing
- 0 <= priority <= 10^9
- 0 <= quota <= 10^9
- Every assign operation satisfies finish_time > t
Examples
Input: [('add_task', 0, 'A', 1), ('add_task', 1, 'B', 2), ('add_user', 1, 'alice', 1), ('assign', 2, 'task_id_1', 'alice', 5), ('complete', 3, 'task_id_1', 'alice'), ('assign', 3, 'task_id_2', 'alice', 6), ('overdue', 10, 'alice')]
Expected Output: ['task_id_1', 'task_id_2', True, True, True, True, ['task_id_2']]
Explanation: Completing task_id_1 at time 3 frees Alice's only slot immediately, so task_id_2 can be assigned at the same timestamp. The second task later becomes overdue.
Input: [('add_task', 0, 'A', 1), ('add_user', 0, 'u', 1), ('assign', 1, 'task_id_1', 'u', 4), ('complete', 4, 'task_id_1', 'u'), ('overdue', 4, 'u')]
Expected Output: ['task_id_1', True, True, False, ['task_id_1']]
Explanation: The assignment is active on [1, 4), so completion at time 4 fails and the task is already overdue at time 4.
Input: [('add_user', 0, 'u', 1), ('complete', 1, 'task_id_1', 'u'), ('overdue', 2, 'ghost')]
Expected Output: [True, False, []]
Explanation: Completing a missing task fails, and querying overdue tasks for an unknown user returns an empty list.
Input: []
Expected Output: []
Explanation: Edge case: no operations.
Solution
def solution(operations):
tasks = {}
users = {}
assignments = []
next_id = 1
result = []
def is_active(assignment, t):
return (
assignment['start'] <= t < assignment['finish']
and (assignment['completed_at'] is None or t < assignment['completed_at'])
)
for op in operations:
kind = op[0]
if kind == 'add_task':
_, t, name, priority = op
task_id = f'task_id_{next_id}'
next_id += 1
tasks[task_id] = {'name': name, 'priority': priority}
result.append(task_id)
elif kind == 'add_user':
_, t, user_id, quota = op
if user_id in users:
result.append(False)
else:
users[user_id] = quota
result.append(True)
elif kind == 'assign':
_, t, task_id, user_id, finish_time = op
if task_id not in tasks or user_id not in users:
result.append(False)
continue
active_count = 0
for assignment in assignments:
if assignment['user_id'] == user_id and is_active(assignment, t):
active_count += 1
if active_count >= users[user_id]:
result.append(False)
else:
assignments.append({
'task_id': task_id,
'user_id': user_id,
'start': t,
'finish': finish_time,
'completed_at': None
})
result.append(True)
elif kind == 'complete':
_, t, task_id, user_id = op
if task_id not in tasks or user_id not in users:
result.append(False)
continue
done = False
for assignment in assignments:
if assignment['task_id'] == task_id and assignment['user_id'] == user_id and is_active(assignment, t):
assignment['completed_at'] = t
done = True
break
result.append(done)
elif kind == 'overdue':
_, t, user_id = op
if user_id not in users:
result.append([])
else:
overdue_tasks = []
for assignment in assignments:
if (
assignment['user_id'] == user_id
and assignment['finish'] <= t
and (assignment['completed_at'] is None or assignment['completed_at'] >= assignment['finish'])
):
overdue_tasks.append(assignment['task_id'])
result.append(overdue_tasks)
else:
raise ValueError('Unknown operation: ' + str(kind))
return resultTime complexity: O(n^2) worst-case over all operations, because assignment, completion, and overdue queries may scan earlier assignments. Space complexity: O(t + u + a), for tasks, users, and assignments.
Hints
- Store completion time per assignment. An assignment is active at time t only if it has started, not finished, and t is strictly earlier than its completion time.
- Overdue is a property of each assignment record, not just of a task ID, so do not merge repeated assignments.