Functional Scope (In-Scope)
- DAG Execution Engine: Runs the steps of a directed acyclic graph (DAG) in dependency order, using in-degree-based topological scheduling.
- Parallel Execution Branches: Automatically executes independent steps in parallel as soon as all of their parent dependencies have completed successfully.
- Saga Compensation Rollbacks: On execution failure, trigger compensating actions for all completed steps in reverse execution order.
- ExecutionContext State: Shares a thread-safe context map across parallel steps, allowing downstream steps to consume the outputs of previously completed steps.
Explicit Boundaries (Out-of-Scope)
- No Native Kubernetes Pod Launchers: Does not handle containerization, Kubernetes pod spawning, or low-level virtual machine allocation.
- No Distributed Network Consensus: Excludes distributed leader election or multi-node workflow shard management.
Clean reference designs demonstrating topological scheduling in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Lifecycle markers the scheduler attaches to each workflow step.
 */
enum StepStatus {
    /** The step has been registered but has not started running. */
    PENDING,
    /** The step has been picked up by a worker and is being executed. */
    RUNNING,
    /** The step executed successfully. */
    COMPLETED,
    /** The step failed and caused the workflow to fail. */
    FAILED,
    /** The step's compensating action has been run during rollback. */
    COMPENSATED
}
// Abstract class defining execution, retry counts, and Saga rollback action
/**
 * Base type for a single unit of work in a workflow.
 *
 * <p>A step carries an identifier and a retry budget, knows how to perform its
 * action ({@link #execute(Map)}) and how to undo it ({@link #compensate(Map)}).
 */
abstract class Step {
    private final String id;
    private final int maxRetries;

    /**
     * Creates a step.
     *
     * @param id identifier of the step; used as a map key by the scheduler, so it must not be null
     * @param maxRetries how many times the step may be retried after its first attempt
     *     (the scheduler in this file performs at most {@code maxRetries + 1} attempts)
     * @throws NullPointerException if {@code id} is null
     * @throws IllegalArgumentException if {@code maxRetries} is negative
     */
    public Step(String id, int maxRetries) {
        // Fail at construction time instead of deep inside the scheduler's concurrent maps.
        this.id = Objects.requireNonNull(id, "id");
        if (maxRetries < 0) {
            // A negative budget would make the scheduler's retry loop skip the step entirely.
            throw new IllegalArgumentException("maxRetries must be >= 0, got " + maxRetries);
        }
        this.maxRetries = maxRetries;
    }

    /** @return the identifier given at construction time */
    public String getId() {
        return id;
    }

    /** @return the retry budget given at construction time */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * Performs the step's action.
     *
     * @param context shared key/value store that steps read from and write their outputs to
     * @return {@code true} when the step succeeded, {@code false} otherwise
     * @throws Exception if the action fails
     */
    public abstract boolean execute(Map<String, Object> context) throws Exception;

    /**
     * Undoes the effects of a previously successful {@link #execute(Map)}.
     *
     * @param context the same shared store that was passed to {@code execute}
     */
    public abstract void compensate(Map<String, Object> context);
}
// Concrete Step with a mock payment action whose outcome is fixed at construction time
class PaymentStep extends Step {
    // When true, every execute() call throws instead of producing a transaction id.
    private final boolean shouldFail;

    public PaymentStep(String id, int maxRetries, boolean shouldFail) {
        super(id, maxRetries);
        this.shouldFail = shouldFail;
    }

    /**
     * Logs the attempt, then either throws (when configured to fail) or stores a
     * freshly generated transaction id under {@code "payment.txId"} in the context.
     */
    @Override
    public boolean execute(Map<String, Object> context) throws Exception {
        System.out.println("[STEP] Executing PaymentStep: " + getId());
        if (!shouldFail) {
            // Only the first 8 characters of the UUID are kept for the mock id.
            String shortToken = UUID.randomUUID().toString().substring(0, 8);
            context.put("payment.txId", "TX-" + shortToken);
            return true;
        }
        throw new RuntimeException("Payment processor unavailable.");
    }

    /** Logs a refund for the transaction id found in the context (if any). */
    @Override
    public void compensate(Map<String, Object> context) {
        String stepId = getId();
        Object txId = context.get("payment.txId");
        String message = "[COMPENSATION] Refunding transaction via PaymentStep: " + stepId
                + " | TxId: " + txId;
        System.out.println(message);
    }
}
// Concrete Step with a mock inventory reservation whose outcome is fixed at construction time
class InventoryStep extends Step {
    // When true, every execute() call throws instead of reserving stock.
    private final boolean shouldFail;

    public InventoryStep(String id, int maxRetries, boolean shouldFail) {
        super(id, maxRetries);
        this.shouldFail = shouldFail;
    }

    /**
     * Logs the attempt, then either throws (when configured to fail) or flags
     * {@code "inventory.reserved"} as true in the context.
     */
    @Override
    public boolean execute(Map<String, Object> context) throws Exception {
        System.out.println("[STEP] Executing InventoryStep: " + getId());
        if (!shouldFail) {
            context.put("inventory.reserved", Boolean.TRUE);
            return true;
        }
        throw new RuntimeException("Inventory item out of stock.");
    }

    /** Logs the release of the reservation made by this step. */
    @Override
    public void compensate(Map<String, Object> context) {
        String message = "[COMPENSATION] Releasing inventory reservations in InventoryStep: " + getId();
        System.out.println(message);
    }
}
// Concrete Step that mocks registering an order; it always succeeds
class OrderStep extends Step {
    public OrderStep(String id, int maxRetries) {
        super(id, maxRetries);
    }

    /** Logs the attempt and flags {@code "order.registered"} as true in the context. */
    @Override
    public boolean execute(Map<String, Object> context) {
        String banner = "[STEP] Executing OrderStep: " + getId();
        System.out.println(banner);
        context.put("order.registered", Boolean.TRUE);
        return true;
    }

    /** Logs the removal of the order registration made by this step. */
    @Override
    public void compensate(Map<String, Object> context) {
        String banner = "[COMPENSATION] Deleting order registration from OrderStep: " + getId();
        System.out.println(banner);
    }
}
// Concrete Step that mocks requesting a shipment; it always succeeds
class ShipmentStep extends Step {
    public ShipmentStep(String id, int maxRetries) {
        super(id, maxRetries);
    }

    /** Logs the attempt and records the carrier under {@code "shipment.carrier"} in the context. */
    @Override
    public boolean execute(Map<String, Object> context) {
        String banner = "[STEP] Executing ShipmentStep: " + getId();
        System.out.println(banner);
        context.put("shipment.carrier", "FedEx");
        return true;
    }

    /** Logs the cancellation of the shipment request made by this step. */
    @Override
    public void compensate(Map<String, Object> context) {
        String banner = "[COMPENSATION] Canceling shipment request via ShipmentStep: " + getId();
        System.out.println(banner);
    }
}
// Global parallel execution manager
/**
 * Runs registered {@link Step}s in dependency order on a fixed thread pool and, if the
 * run fails, compensates every step that executed successfully, in reverse order of completion.
 *
 * <p>An instance is single-use: {@link #executeWorkflow()} consumes the in-degree counters
 * and shuts the thread pool down before it returns.
 */
class DAGScheduler {
    /** Number of worker threads used to run steps in parallel. */
    private static final int WORKER_THREADS = 4;
    /** Upper bound, in seconds, on how long {@link #executeWorkflow()} waits for all steps. */
    private static final long WORKFLOW_TIMEOUT_SECONDS = 10;
    /** Grace period, in seconds, for in-flight steps to wind down before compensation starts. */
    private static final long DRAIN_TIMEOUT_SECONDS = 5;

    private final Map<String, Step> steps = new ConcurrentHashMap<>();
    // parent id -> ids of the steps that depend on it
    private final Map<String, List<String>> adjList = new ConcurrentHashMap<>();
    // child id -> ids of the steps it depends on (populated, but not read by this class)
    private final Map<String, List<String>> reverseAdjList = new ConcurrentHashMap<>();
    // step id -> number of parents that have not completed yet
    private final Map<String, AtomicInteger> inDegree = new ConcurrentHashMap<>();
    // ids of steps whose execute() succeeded, in order of completion
    private final List<String> completedSteps = new CopyOnWriteArrayList<>();
    private final Map<String, StepStatus> stepStatus = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS);
    private final ReentrantLock schedulerLock = new ReentrantLock();
    // volatile: written by whichever thread detects the failure, polled by every worker.
    private volatile boolean workflowFailed = false;

    /**
     * Registers a step (replacing any step previously registered under the same id)
     * and marks it {@link StepStatus#PENDING}.
     *
     * @param step the step to register
     */
    public void addStep(Step step) {
        steps.put(step.getId(), step);
        inDegree.putIfAbsent(step.getId(), new AtomicInteger(0));
        stepStatus.put(step.getId(), StepStatus.PENDING);
    }

    /**
     * Declares that {@code childId} may only start after {@code parentId} has completed.
     * Both ids must be registered via {@link #addStep(Step)} before the workflow is executed.
     *
     * @param parentId id of the prerequisite step
     * @param childId id of the dependent step
     */
    public void addDependency(String parentId, String childId) {
        adjList.computeIfAbsent(parentId, k -> new CopyOnWriteArrayList<>()).add(childId);
        reverseAdjList.computeIfAbsent(childId, k -> new CopyOnWriteArrayList<>()).add(parentId);
        inDegree.computeIfAbsent(childId, k -> new AtomicInteger(0)).incrementAndGet();
    }

    /**
     * Executes the whole workflow and blocks until it finishes, fails, or times out.
     *
     * <p>An invalid graph (cycle, or a dependency on an unregistered step) is rejected
     * before any step runs. On failure, timeout or interruption, every step that
     * executed successfully is compensated in reverse order of completion.
     *
     * @return {@code true} if every step completed successfully, {@code false} otherwise
     */
    public boolean executeWorkflow() {
        System.out.println("=== Starting Workflow DAG Execution ===");
        Map<String, Object> context = new ConcurrentHashMap<>();

        Optional<String> graphProblem = findGraphProblem();
        if (graphProblem.isPresent()) {
            // Nothing has run yet, so there is nothing to compensate.
            System.out.println("[INVALID] " + graphProblem.get() + " No steps were executed.");
            executor.shutdown();
            return false;
        }

        // Find roots (in-degree == 0)
        List<String> roots = new ArrayList<>();
        for (String id : steps.keySet()) {
            if (inDegree.get(id).get() == 0) {
                roots.add(id);
            }
        }

        CountDownLatch latch = new CountDownLatch(steps.size());
        for (String root : roots) {
            submitStep(root, context, latch);
        }

        boolean interrupted = false;
        boolean abandoned = false;
        try {
            // await() returns false on timeout; that must count as a failure, not a success.
            if (!latch.await(WORKFLOW_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.out.println("Workflow execution timed out or was interrupted.");
                workflowFailed = true;
                abandoned = true;
            }
        } catch (InterruptedException e) {
            System.out.println("Workflow execution timed out or was interrupted.");
            workflowFailed = true;
            abandoned = true;
            interrupted = true;
        }

        try {
            if (!workflowFailed) {
                executor.shutdown();
                System.out.println("\n[SUCCESS] Workflow execution succeeded perfectly!");
                return true;
            }

            if (abandoned) {
                // Steps may be stuck; interrupt them rather than waiting politely.
                executor.shutdownNow();
            } else {
                executor.shutdown();
            }
            // Let parallel branches finish so that every completed step is known before rollback.
            if (awaitInFlightSteps()) {
                interrupted = true;
            }

            System.out.println("\n[SAGA TRIGGERED] Workflow failed. Executing Compensation rollback...");
            // Execute compensation in reverse order of completion
            List<String> toCompensate = new ArrayList<>(completedSteps);
            Collections.reverse(toCompensate);
            for (String stepId : toCompensate) {
                try {
                    steps.get(stepId).compensate(context);
                    stepStatus.put(stepId, StepStatus.COMPENSATED);
                } catch (Exception e) {
                    // Best effort: keep rolling back the remaining steps, but report the cause.
                    System.out.println("Compensation failed for step: " + stepId + " (" + e + ")");
                }
            }
            return false;
        } finally {
            if (interrupted) {
                // Restore the interrupt flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Checks that every dependency refers to registered steps and that the graph is acyclic.
     *
     * @return a description of the first problem found, or empty if the graph is runnable
     */
    private Optional<String> findGraphProblem() {
        for (Map.Entry<String, List<String>> edges : adjList.entrySet()) {
            if (!steps.containsKey(edges.getKey())) {
                return Optional.of("Dependency references unregistered step: " + edges.getKey() + ".");
            }
            for (String child : edges.getValue()) {
                if (!steps.containsKey(child)) {
                    return Optional.of("Dependency references unregistered step: " + child + ".");
                }
            }
        }

        // Dry-run of the in-degree scheduling on a copy of the counters: any step that is
        // never reached sits on (or behind) a cycle.
        Map<String, Integer> remaining = new HashMap<>();
        Deque<String> ready = new ArrayDeque<>();
        for (String id : steps.keySet()) {
            int degree = inDegree.get(id).get();
            remaining.put(id, degree);
            if (degree == 0) {
                ready.add(id);
            }
        }
        int visited = 0;
        while (!ready.isEmpty()) {
            String id = ready.poll();
            visited++;
            for (String child : adjList.getOrDefault(id, Collections.emptyList())) {
                if (remaining.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        if (visited != steps.size()) {
            return Optional.of("Dependency graph contains a cycle.");
        }
        return Optional.empty();
    }

    /**
     * Waits for already-submitted step tasks to finish, forcing a shutdown if they
     * do not finish within the grace period.
     *
     * @return {@code true} if the calling thread was interrupted while waiting
     */
    private boolean awaitInFlightSteps() {
        try {
            if (!executor.awaitTermination(DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            return true;
        }
    }

    /** Hands a step to the thread pool; a rejected submission only happens after a failure. */
    private void submitStep(String stepId, Map<String, Object> context, CountDownLatch latch) {
        try {
            executor.submit(() -> runStep(stepId, context, latch));
        } catch (RejectedExecutionException e) {
            // The pool was shut down because the workflow already failed; just make sure
            // nobody keeps waiting on the latch.
            releaseLatch(latch);
        }
    }

    /** Runs one step with retries, then either schedules its children or fails the workflow. */
    private void runStep(String stepId, Map<String, Object> context, CountDownLatch latch) {
        Step step = steps.get(stepId);
        stepStatus.put(stepId, StepStatus.RUNNING);
        boolean success = false;
        int attempts = 0;
        while (attempts <= step.getMaxRetries() && !workflowFailed) {
            attempts++;
            try {
                success = step.execute(context);
                if (success) {
                    break;
                }
            } catch (InterruptedException e) {
                // The pool is being shut down; stop retrying and keep the interrupt visible.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                System.out.println(" [RETRY] Step " + stepId + " failed attempt " + attempts + ": " + e.getMessage());
            }
        }

        if (success) {
            // Record the step even if another branch failed in the meantime: its side
            // effects have happened and must be compensated during rollback.
            stepStatus.put(stepId, StepStatus.COMPLETED);
            completedSteps.add(stepId);
        }

        if (success && !workflowFailed) {
            latch.countDown();
            // Process downstream steps
            for (String child : adjList.getOrDefault(stepId, Collections.emptyList())) {
                if (inDegree.get(child).decrementAndGet() == 0) {
                    submitStep(child, context, latch);
                }
            }
            return;
        }

        if (!success) {
            markWorkflowFailed(stepId);
        }
        // Force latch reduction to unblock main thread
        releaseLatch(latch);
    }

    /** Flags the workflow as failed; only the first failing step is reported and marked FAILED. */
    private void markWorkflowFailed(String stepId) {
        schedulerLock.lock();
        try {
            if (!workflowFailed) {
                workflowFailed = true;
                stepStatus.put(stepId, StepStatus.FAILED);
                System.out.println("[FAILED] Step " + stepId + " permanently failed. Canceling running branches.");
            }
        } finally {
            schedulerLock.unlock();
        }
    }

    /** Counts the latch down to zero so the thread blocked in executeWorkflow wakes up. */
    private void releaseLatch(CountDownLatch latch) {
        while (latch.getCount() > 0) {
            latch.countDown();
        }
    }
}
/**
 * Demo entry point: runs the same four-step order workflow twice, once with every
 * step succeeding and once with the inventory step configured to fail.
 */
public class Main {
    public static void main(String[] args) {
        System.out.println("--- Scenario 1: Successful DAG execution ---");
        DAGScheduler successScheduler = buildOrderWorkflow(1, false);
        successScheduler.executeWorkflow();

        System.out.println("\n--- Scenario 2: Failed DAG execution with Saga compensation rollback ---");
        // Inventory step configured to FAIL
        DAGScheduler failureScheduler = buildOrderWorkflow(2, true);
        failureScheduler.executeWorkflow();
    }

    /**
     * Builds a scheduler wired as: order -> payment & inventory (parallel) -> shipment.
     *
     * @param inventoryRetries retry budget given to the inventory step
     * @param inventoryFails whether the inventory step should fail on every attempt
     */
    private static DAGScheduler buildOrderWorkflow(int inventoryRetries, boolean inventoryFails) {
        DAGScheduler scheduler = new DAGScheduler();

        Step[] workflowSteps = {
            new OrderStep("order", 1),
            new PaymentStep("payment", 2, false),
            new InventoryStep("inventory", inventoryRetries, inventoryFails),
            new ShipmentStep("shipment", 1)
        };
        for (Step workflowStep : workflowSteps) {
            scheduler.addStep(workflowStep);
        }

        // Each pair is {parent, child}.
        String[][] edges = {
            {"order", "payment"},
            {"order", "inventory"},
            {"payment", "shipment"},
            {"inventory", "shipment"}
        };
        for (String[] edge : edges) {
            scheduler.addDependency(edge[0], edge[1]);
        }
        return scheduler;
    }
}