We’re designing a lightweight task scheduler for a single-machine application. The scheduler should execute functions at specific times and optionally repeat them.
Tasks may run once at a scheduled time, repeatedly at a fixed interval, or repeatedly with a delay after each run completes.
To keep the scope focused, it is in-memory only on a single machine, with no persistence and no distributed coordination. Tasks execute concurrently on worker threads.
The API is minimal:
add(fn, name, start_time, interval=None, delay=None)
start()
Here are some initial ideas that you might have:
Busy polling
Continuously scan the entire task list looking for runnable tasks. The issues are wasted CPU, poor scalability as the task count grows, and unnecessary wakeups.
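To make the cost concrete, here is a minimal sketch of busy polling. The `busy_poll` function, its `(start_time, name)` pairs and the `deadline_seconds` safety cap are all hypothetical names for illustration, not part of the scheduler's API.

```python
import time
from datetime import datetime, timedelta, timezone


def busy_poll(tasks, deadline_seconds=1.0):
    """Naive scheduler: rescan every pending task on every loop iteration."""
    scans = 0
    ran = []
    pending = list(tasks)  # (start_time, name) pairs
    stop_at = time.monotonic() + deadline_seconds  # safety cap for the demo
    while pending and time.monotonic() < stop_at:
        scans += 1
        now = datetime.now(timezone.utc)
        for item in list(pending):  # O(n) scan on every pass
            start_time, name = item
            if start_time <= now:
                ran.append(name)
                pending.remove(item)
    return ran, scans


now = datetime.now(timezone.utc)
ran, scans = busy_poll([(now + timedelta(milliseconds=20), "a")])
print(f"ran={ran} after {scans} scans")
```

Even with a single task due 20 ms away, the loop spins through many scans before anything is runnable, and each scan walks the whole list.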
One thread per task
Create a thread per task that sleeps until that task's execution time. The issues are thread explosion, memory overhead, and no centralised scheduling control.
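A rough sketch of the thread-per-task idea, using the standard library's `threading.Timer`. The helper name `schedule_with_own_thread` is made up for this example.

```python
import threading
from datetime import datetime, timedelta, timezone


def schedule_with_own_thread(fn, start_time):
    """Start a dedicated timer thread that waits until start_time, then calls fn."""
    delay = max((start_time - datetime.now(timezone.utc)).total_seconds(), 0)
    timer = threading.Timer(delay, fn)
    timer.start()
    return timer


results = []
now = datetime.now(timezone.utc)
timers = [
    schedule_with_own_thread(
        lambda i=i: results.append(i),
        now + timedelta(milliseconds=10 * i),
    )
    for i in range(5)
]
for t in timers:
    t.join()  # wait for every timer thread to fire

print(f"{len(timers)} threads for {len(results)} tasks")
```

Five tasks cost five threads here, and nothing owns the overall ordering, which is the problem described above.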
What must always be true
Before designing structures, define correctness rules.
- Tasks execute no earlier than their scheduled time
- The earliest scheduled task runs first
- Concurrent task execution must not corrupt scheduler state
- Recurring tasks must reschedule themselves correctly
- Task scheduling must remain efficient as task count grows
Choosing the right way to store
Scheduling always depends on which task should run next. The natural model is to order tasks by start_time so that the earliest task can be retrieved efficiently.
A min heap provides exactly this behavior. Inserting a task is O(log n), peeking at the earliest task is O(1), and removing the earliest task is O(log n). The heap always keeps the earliest scheduled task at the root.
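The three operations map directly onto Python's `heapq` module. This small sketch uses plain `(start, name)` tuples with integer start times as stand-ins for real tasks.

```python
import heapq

heap = []
for start, name in [(30, "c"), (10, "a"), (20, "b")]:
    heapq.heappush(heap, (start, name))  # insert: O(log n)

earliest = heap[0]            # peek at the root: O(1)
popped = heapq.heappop(heap)  # remove the root: O(log n)

print(earliest, popped, heap[0])
```

After the pop, the root is the next earliest entry, so the scheduler only ever needs to look at `heap[0]`.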
State Model
Task
- fn: Callable
- name: str
- start_time: datetime
- interval: Optional[int]
- delay: Optional[int]
TaskScheduler
- tasks: List[Task] (min heap)
- lock: Lock
- max_workers: int
Task
Represents a scheduled unit of work. Its responsibilities are to store execution metadata, determine when execution is allowed, and produce the next task if it is recurring.
The next execution time for recurring tasks follows two scheduling models:
- Interval tasks: The next run time is computed relative to the previous scheduled start time. Next run = Current start time + Interval
- Delay tasks: The next run time is computed relative to the completion time of the current execution. Next run = Completion time + Delay
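A quick worked example of the two models. The helper names `next_interval_start` and `next_delay_start` are illustrative only, and the run is assumed to take 4 seconds.

```python
from datetime import datetime, timedelta, timezone


def next_interval_start(scheduled_start, interval_seconds):
    # Anchored to the scheduled start, regardless of how long the run took.
    return scheduled_start + timedelta(seconds=interval_seconds)


def next_delay_start(completed_at, delay_seconds):
    # Anchored to the moment the run finished.
    return completed_at + timedelta(seconds=delay_seconds)


scheduled = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
completed = scheduled + timedelta(seconds=4)  # assume the run took 4 seconds

interval_next = next_interval_start(scheduled, 10)  # 12:00:10
delay_next = next_delay_start(completed, 10)        # 12:00:14
```

With the same 10 second setting, the interval task keeps a steady cadence while the delay task drifts by the run's duration each time.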
For the sake of simplicity I'm using wall-clock time here, even though monotonic time would be more reliable.
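If you did want monotonic time, one possible approach is to convert the wall-clock start_time into a `time.monotonic()` deadline once, when the task is added. This is a sketch of that idea, not what the implementation in this post does; `to_monotonic_deadline` and `is_due` are hypothetical helpers.

```python
import time
from datetime import datetime, timedelta, timezone


def to_monotonic_deadline(start_time):
    """Translate a wall-clock start_time into a time.monotonic() deadline."""
    seconds_from_now = (start_time - datetime.now(timezone.utc)).total_seconds()
    return time.monotonic() + max(seconds_from_now, 0.0)


def is_due(deadline):
    # Compares against the monotonic clock, which never jumps backwards.
    return time.monotonic() >= deadline


past = to_monotonic_deadline(datetime.now(timezone.utc) - timedelta(seconds=5))
future = to_monotonic_deadline(datetime.now(timezone.utc) + timedelta(seconds=60))
print(is_due(past), is_due(future))
```

The wall clock is read only at conversion time, so later adjustments to the system clock would not move the deadline.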
TaskScheduler
It is responsible for maintaining the task heap, retrieving runnable tasks, submitting them to worker threads for execution, and rescheduling recurring tasks.
Concurrency Design
You have to think about concurrency from two sides: the scheduler thread and the worker threads. The scheduler manages task ordering while the workers execute jobs.
Heap lock
The task heap is shared state accessed by both the scheduler thread and worker threads. Without synchronization, concurrent push and pop operations could corrupt the heap structure and introduce race conditions.
A lock ensures that all heap operations are performed atomically, so modifications occur in a serialized manner regardless of whether they originate from the scheduler loop or worker threads.
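Here is a small standalone sketch of that pattern: several threads push onto one shared heap, and every push and every check-then-pop step happens under the same lock. Integers stand in for tasks, and `push_many` is a hypothetical helper.

```python
import heapq
import threading

heap = []
lock = threading.Lock()


def push_many(start, count):
    for value in range(start, start + count):
        with lock:  # serialize every modification of the shared heap
            heapq.heappush(heap, value)


threads = [threading.Thread(target=push_many, args=(i * 100, 100)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

drained = []
while True:
    with lock:  # the emptiness check and the pop happen as one step
        if not heap:
            break
        drained.append(heapq.heappop(heap))

print(len(drained), drained[:3])
```

The lock matters most for compound steps such as "peek at the root, then pop it", where another thread could otherwise change the root in between.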
Sleeping strategy
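The scheduler avoids busy polling. Instead it sleeps until the next task is due, which keeps CPU usage low while idle. One caveat: a task added while the scheduler is asleep is not noticed until that sleep ends, even if it is due sooner.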
Algorithm:
- Look at the task at the top of the heap (the earliest scheduled task).
- Compute the time difference between the task’s start_time and the current time.
- Sleep for that duration.
- When the scheduler wakes up, check the heap again and execute any task whose scheduled time has arrived.
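The steps above can be sketched with a `threading.Condition` in place of a plain sleep, so that adding a task wakes the scheduler and it recomputes how long to wait. This is a variation on the algorithm, not the implementation used later in this post; `WakeableQueue` is a hypothetical helper and it uses monotonic deadlines for brevity.

```python
import heapq
import threading
import time


class WakeableQueue:
    """Time-ordered heap whose waiter wakes early when something is pushed."""

    def __init__(self):
        self._heap = []  # (monotonic deadline, name)
        self._cond = threading.Condition()

    def push(self, deadline, name):
        with self._cond:
            heapq.heappush(self._heap, (deadline, name))
            self._cond.notify()  # wake the waiter so it recomputes its sleep

    def pop_when_due(self):
        """Block until the earliest entry is due, then remove and return its name."""
        with self._cond:
            while True:
                if not self._heap:
                    self._cond.wait()  # nothing scheduled: wait for a push
                    continue
                deadline, name = self._heap[0]
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    heapq.heappop(self._heap)
                    return name
                # Sleep until the root is due, or until a push notifies us.
                self._cond.wait(timeout=remaining)


queue = WakeableQueue()
now = time.monotonic()
queue.push(now + 0.15, "late")
# While the waiter sleeps for "late", another thread adds an earlier entry.
threading.Timer(0.02, queue.push, args=(now + 0.05, "early")).start()

first = queue.pop_when_due()
second = queue.pop_when_due()
print(first, second)
```

The waiter initially plans to sleep about 0.15 seconds, gets notified by the push, and shortens its wait to match the new root.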
Minimal Class Diagram
classDiagram
class Task {
fn: Callable
name: str
start_time: datetime
interval: int
delay: int
execute()
is_scheduled_time()
next_interval_task()
next_delay_task()
}
class TaskScheduler {
tasks: List~Task~
add()
start()
}
TaskScheduler o-- Task
Implementation (Python)
import time
import heapq
from threading import Lock
from typing import Callable, Optional
from datetime import UTC, datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
class Task:
    def __init__(
        self,
        fn: Callable,
        name: str,
        start_time: datetime,
        interval: Optional[int] = None,
        delay: Optional[int] = None,
    ) -> None:
        if interval and delay:
            raise ValueError("Can't do both interval and delay on same task")
        self.fn: Callable = fn
        self.name = name
        self.start_time = start_time
        self.interval = interval
        self.delay = delay

    def next_interval_task(self) -> "Task | None":
        # Next run is anchored to the scheduled start time, not the finish time.
        if self.interval:
            return Task(
                fn=self.fn,
                name=self.name,
                start_time=self.start_time + timedelta(seconds=self.interval),
                interval=self.interval,
                delay=None,
            )
        return None

    def next_delay_task(self) -> "Task | None":
        # Called after execution, so "now" is the completion time.
        if self.delay:
            return Task(
                fn=self.fn,
                name=self.name,
                start_time=datetime.now(UTC) + timedelta(seconds=self.delay),
                interval=None,
                delay=self.delay,
            )
        return None

    def execute(self):
        self.fn()

    def is_scheduled_time(self) -> bool:
        return self.start_time <= datetime.now(UTC)

    def __lt__(self, other):
        # heapq orders tasks by start_time through this comparison.
        if not isinstance(other, Task):
            return NotImplemented
        return self.start_time < other.start_time
class TaskScheduler:
    def __init__(self, max_workers: int = 5) -> None:
        self.max_workers = max_workers
        self.tasks: list[Task] = []
        self.lock = Lock()
        self.running = False

    def _add_task(self, task: Task):
        with self.lock:
            heapq.heappush(self.tasks, task)

    def _get_next_scheduled_task(self) -> Task | None:
        # Peek and pop under one lock so the root cannot change in between.
        with self.lock:
            potential_task: Task | None = self.tasks[0] if self.tasks else None
            next_task = potential_task if potential_task and potential_task.is_scheduled_time() else None
            if next_task:
                heapq.heappop(self.tasks)
                return next_task
            return None

    def _run_job(self, task: Task):
        # Interval tasks are rescheduled before running, delay tasks after.
        next_task = task.next_interval_task()
        if next_task:
            self._add_task(task=next_task)
        print(f"Executing task - {task.name}")
        task.execute()
        print(f"Finished executing task - {task.name}")
        next_task = task.next_delay_task()
        if next_task:
            self._add_task(task=next_task)

    def _get_sleep_time(self):
        with self.lock:
            if not self.tasks:
                return 1  # nothing scheduled: check again in one second
            next_task = self.tasks[0]
            now = datetime.now(UTC)
            diff = (next_task.start_time - now).total_seconds()
            return max(diff, 0)

    def add(
        self,
        fn: Callable,
        name: str,
        start_time: datetime,
        interval: Optional[int] = None,
        delay: Optional[int] = None,
    ):
        t = Task(
            fn=fn,
            name=name,
            start_time=start_time,
            interval=interval,
            delay=delay,
        )
        self._add_task(t)

    def start(self):
        with ThreadPoolExecutor(max_workers=self.max_workers) as t:
            while True:
                next_task = self._get_next_scheduled_task()
                if next_task:
                    t.submit(self._run_job, next_task)
                else:
                    time.sleep(self._get_sleep_time())
Limits and Extensions
Task persistence
The current design loses tasks on restart. Production systems typically use a database or persistent queues.
Task prioritisation
Currently tasks are ordered only by time. An extension could order them by (priority, start_time), allowing urgent tasks to run first.
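As a rough sketch of what that ordering could look like, a dataclass with `order=True` compares fields in declaration order, which gives (priority, start_time) for free. `PrioritizedTask` is a hypothetical class, and "lower number means more urgent" is an assumption of this example.

```python
import heapq
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass(order=True)
class PrioritizedTask:
    priority: int                     # assumption: lower number = more urgent
    start_time: datetime              # tie-breaker within the same priority
    name: str = field(compare=False)  # excluded from ordering


base = datetime(2025, 1, 1, tzinfo=timezone.utc)
heap = []
heapq.heappush(heap, PrioritizedTask(5, base, "routine-early"))
heapq.heappush(heap, PrioritizedTask(1, base + timedelta(seconds=30), "urgent"))
heapq.heappush(heap, PrioritizedTask(5, base + timedelta(seconds=10), "routine-late"))

order = [heapq.heappop(heap).name for _ in range(3)]
print(order)
```

One thing to watch: with priority first, the heap root is no longer guaranteed to be the earliest task, so the peek-the-root sleeping logic would need revisiting. Applying the priority only among tasks that are already due is one option.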