Functional Scope (In-Scope)
- Advanced Job Execution: Schedule both one-time and interval-based recurring tasks.
- Exponential Backoff Retries: Delay failed executions exponentially (delay = base × 2^attempt).
- Dynamic Cancellations: Cancel scheduled tasks in a thread-safe manner, addressing each task by its unique string identifier.
- Observer Monitoring Telemetry: Emit scheduling and execution state updates through registered observers.
Explicit Boundaries (Out-of-Scope)
- No Cluster Auto-scaling: Excludes automatic VM deployment to handle high CPU throughput.
- No Leader-Election Heartbeats: Excludes distributed consensus protocols (e.g., Paxos or Raft).
Clean reference designs demonstrating prioritized dispatch loops in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Callback contract for components that want to follow a job through its lifecycle.
 *
 * <p>The scheduler in this file invokes these callbacks from both the thread that
 * submits a job and its worker threads, so implementations should be safe to call
 * from more than one thread.
 */
interface JobObserver {

    /**
     * Reports that a job was handed to the scheduler.
     *
     * @param jobId identifier of the submitted job
     */
    void onJobSubmitted(String jobId);

    /**
     * Reports that a run of the job's task returned without throwing.
     *
     * @param jobId identifier of the job that ran
     */
    void onJobCompleted(String jobId);

    /**
     * Reports that a run of the job's task threw an exception.
     *
     * @param jobId    identifier of the job that failed
     * @param attempt  1-based number of the failed run within the current retry cycle
     * @param errorMsg message of the thrown exception; may be {@code null} when the
     *                 exception carries no message
     */
    void onJobFailed(String jobId, int attempt, String errorMsg);

    /**
     * Reports that a failed job was given a new fire time for a retry.
     *
     * @param jobId      identifier of the job being retried
     * @param attempt    number of retries scheduled so far in the current cycle (1 for the first)
     * @param nextFireMs wall-clock time, in epoch milliseconds, of the retry
     */
    void onJobRetried(String jobId, int attempt, long nextFireMs);
}
/**
 * Immutable description of a unit of work: what to run, how urgent it is, how often
 * it may be retried, and whether it recurs.
 */
class Job {
    private final String id;
    private final Runnable task;
    private final int priority;
    private final int maxRetries;
    private final long intervalMs; // 0 for one-time

    /**
     * Creates a job description.
     *
     * @param id         identifier used to refer to the job (for example when cancelling); must not be null
     * @param task       the work to execute; must not be null
     * @param priority   tie-breaker between jobs due at the same instant; larger values run first
     * @param maxRetries number of retries allowed after a failed run; values {@code <= 0} disable retries
     * @param intervalMs pause between successful runs of a recurring job, in milliseconds;
     *                   {@code 0} (or any non-positive value) marks a one-time job
     * @throws NullPointerException if {@code id} or {@code task} is null
     */
    public Job(String id, Runnable task, int priority, int maxRetries, long intervalMs) {
        // Fail fast: a null id or task would otherwise only surface later, on the
        // scheduler's own threads, far away from the code that built the job.
        this.id = Objects.requireNonNull(id, "id");
        this.task = Objects.requireNonNull(task, "task");
        this.priority = priority;
        this.maxRetries = maxRetries;
        this.intervalMs = intervalMs;
    }

    /** @return the job identifier, never null */
    public String getId() {
        return id;
    }

    /** @return the work to execute, never null */
    public Runnable getTask() {
        return task;
    }

    /** @return the priority; larger values win ties between jobs due at the same time */
    public int getPriority() {
        return priority;
    }

    /** @return the number of retries allowed after a failed run */
    public int getMaxRetries() {
        return maxRetries;
    }

    /** @return the recurrence interval in milliseconds; non-positive for one-time jobs */
    public long getIntervalMs() {
        return intervalMs;
    }
}
/**
 * Queue entry that pairs a {@link Job} with the wall-clock time of its next run and
 * the number of retries consumed in the current cycle.
 *
 * <p>Natural ordering is "earliest fire time first"; entries due at the same
 * millisecond are ordered by descending job priority. Instances are mutable and not
 * synchronized: callers must not change the fire time while the entry sits inside a
 * sorted collection, and must provide their own cross-thread visibility.
 */
class ScheduledJob implements Comparable<ScheduledJob> {
    /** Largest exponent for which {@code 1L << n} is still a positive {@code long}. */
    private static final int MAX_SHIFT = 62;

    private final Job job;
    private long nextFireTime;
    private int retryCount = 0;

    /**
     * @param job      the job this entry schedules
     * @param fireTime first fire time, in epoch milliseconds
     */
    public ScheduledJob(Job job, long fireTime) {
        this.job = job;
        this.nextFireTime = fireTime;
    }

    /** @return the job this entry schedules */
    public Job getJob() {
        return job;
    }

    /** @return the next fire time, in epoch milliseconds */
    public long getNextFireTime() {
        return nextFireTime;
    }

    /** @return the number of retries scheduled since the last successful run */
    public int getRetryCount() {
        return retryCount;
    }

    /**
     * Counts one more retry and pushes the fire time out by
     * {@code baseMs * 2^retryCount}, measured from now. The counter is incremented
     * first, so the first retry waits {@code 2 * baseMs}.
     *
     * <p>Both the multiplication and the addition saturate at {@link Long#MAX_VALUE}
     * rather than wrapping, so a very large retry count can never produce a fire
     * time in the past.
     *
     * @param baseMs base delay in milliseconds
     */
    public void delayForRetry(long baseMs) {
        retryCount++;
        long backoff = backoffMillis(baseMs, retryCount);
        nextFireTime = saturatingAdd(System.currentTimeMillis(), backoff);
    }

    /**
     * Sets the fire time to one job interval from now and clears the retry counter,
     * starting a fresh retry cycle for the next run.
     */
    public void scheduleNextExecution() {
        nextFireTime = saturatingAdd(System.currentTimeMillis(), job.getIntervalMs());
        retryCount = 0; // Reset retry count for the next cycle
    }

    @Override
    public int compareTo(ScheduledJob other) {
        int byTime = Long.compare(this.nextFireTime, other.nextFireTime);
        if (byTime != 0) {
            return byTime;
        }
        // Same instant: the higher priority sorts first, hence the swapped operands.
        return Integer.compare(other.job.getPriority(), this.job.getPriority());
    }

    /** Computes {@code baseMs * 2^attempt}, clamping positive overflow to {@code Long.MAX_VALUE}. */
    private static long backoffMillis(long baseMs, int attempt) {
        long factor = attempt > MAX_SHIFT ? Long.MAX_VALUE : 1L << attempt;
        if (baseMs > 0 && baseMs > Long.MAX_VALUE / factor) {
            return Long.MAX_VALUE;
        }
        return baseMs * factor;
    }

    /** Adds {@code delta} to {@code now}, clamping positive overflow to {@code Long.MAX_VALUE}. */
    private static long saturatingAdd(long now, long delta) {
        if (delta > 0 && now > Long.MAX_VALUE - delta) {
            return Long.MAX_VALUE;
        }
        return now + delta;
    }
}
class JobScheduler {
private final PriorityQueue<ScheduledJob> queue = new PriorityQueue<>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition entryAdded = lock.newCondition();
private final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
);
private final List<JobObserver> observers = new CopyOnWriteArrayList<>();
private final Set<String> cancelledJobs = ConcurrentHashMap.newKeySet();
private Thread dispatcherThread;
private volatile boolean running = true;
public JobScheduler() {
dispatcherThread = new Thread(this::dispatcherLoop);
dispatcherThread.start();
}
public void addObserver(JobObserver obs) { observers.add(obs); }
public void schedule(Job job, long delayMs) {
lock.lock();
try {
long fireTime = System.currentTimeMillis() + delayMs;
queue.add(new ScheduledJob(job, fireTime));
for (JobObserver obs : observers) obs.onJobSubmitted(job.getId());
entryAdded.signal();
} finally {
lock.unlock();
}
}
public void cancel(String jobId) {
cancelledJobs.add(jobId);
}
private void dispatcherLoop() {
while (running) {
lock.lock();
try {
while (queue.isEmpty() && running) {
entryAdded.await();
}
if (!running) break;
long now = System.currentTimeMillis();
ScheduledJob head = queue.peek();
if (head.getNextFireTime() <= now) {
ScheduledJob sj = queue.poll();
if (!cancelledJobs.contains(sj.getJob().getId())) {
workerPool.submit(() -> executeJob(sj));
} else {
cancelledJobs.remove(sj.getJob().getId());
}
} else {
long sleepMs = head.getNextFireTime() - now;
entryAdded.await(sleepMs, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} finally {
lock.unlock();
}
}
}
private void executeJob(ScheduledJob sj) {
Job job = sj.getJob();
try {
job.getTask().run();
for (JobObserver obs : observers) obs.onJobCompleted(job.getId());
// Handle recurring jobs
if (job.getIntervalMs() > 0) {
sj.scheduleNextExecution();
lock.lock();
try {
queue.add(sj);
entryAdded.signal();
} finally {
lock.unlock();
}
}
} catch (Exception e) {
for (JobObserver obs : observers) {
obs.onJobFailed(job.getId(), sj.getRetryCount() + 1, e.getMessage());
}
if (sj.getRetryCount() < job.getMaxRetries()) {
sj.delayForRetry(1000);
for (JobObserver obs : observers) {
obs.onJobRetried(job.getId(), sj.getRetryCount(), sj.getNextFireTime());
}
lock.lock();
try {
queue.add(sj);
entryAdded.signal();
} finally {
lock.unlock();
}
}
}
}
public void shutdown() {
running = false;
lock.lock();
try {
entryAdded.signalAll();
} finally {
lock.unlock();
}
workerPool.shutdown();
}
}