Problem
You are given a multithreaded task runner. Each task has:
-
task_id: int
-
task_type: "gen" | "score"
-
duration: int
(seconds)
Each task runs in its own thread and calls a shared lock before and after doing work.
from dataclasses import dataclass
from typing import Literal
@dataclass
class SampleTask:
task_id: int
task_type: Literal["gen", "score"]
duration: int
class TaskLock:
def acquire(self, task: SampleTask) -> None:
raise NotImplementedError
def release(self, task: SampleTask) -> None:
raise NotImplementedError
The task runner does:
-
lock.acquire(task)
-
sleeps for
task.duration
-
lock.release(task)
Task
Implement a MutexTaskLock(TaskLock) such that:
-
Multiple tasks of the
same
task_type
may run
concurrently
.
-
Tasks of
different
task_type
must
not
run at the same time (i.e., if any
gen
task is running, no
score
task may run, and vice versa).
-
The implementation must be thread-safe and avoid deadlocks.
You may use Python threading primitives such as threading.Lock, threading.Condition, or semaphores.
Clarifications / Expected behavior
-
If no tasks are running, either type may acquire the lock.
-
If tasks of one type are running, tasks of the other type must block until all running tasks of the current type finish and release.
-
It is acceptable for waiting tasks to wake up and re-check conditions (i.e., use a loop around
wait
).