Machine Coding Problem

Workflow Engine

maco · 60 · maco · All · infrastructure · dag-execution · callback
Commonly Asked By: Temporal, Uber, Netflix, Stripe

Functional Scope (In-Scope)

  • DAG Execution Engine: Run a directed acyclic graph (DAG) of steps utilizing topological sorting algorithms.
  • Parallel Execution Branches: Automatically execute steps in parallel once all parent dependencies have registered success.
  • Saga Compensation Rollbacks: On execution failure, trigger compensating actions for all completed steps in reverse execution order.
  • ExecutionContext State: Pass thread-safe contextual outputs across parallel nodes, allowing downstream steps to consume previous outputs.

Explicit Boundaries (Out-of-Scope)

  • No Native Kubernetes Pod Launchers: Bypasses containerization, Kubernetes pod spawning, or low-level virtual machine allocation.
  • No Distributed Network Consensus: Excludes distributed leader election or multi-node workflow shard management.

Clean reference designs demonstrating topological scheduling in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Lifecycle states recorded by the scheduler for each workflow step.
enum StepStatus {
    // Registered with the scheduler but not yet picked up by a worker.
    PENDING,
    // A worker has started executing the step.
    RUNNING,
    // The step's execute() call reported success.
    COMPLETED,
    // The step was marked as the cause of a workflow failure.
    FAILED,
    // The step's compensating action ran during rollback.
    COMPENSATED
}

// Abstract class defining execution, retry counts, and Saga rollback action
abstract class Step {
    private final String id;
    private final int maxRetries;

    /**
     * Creates a step.
     *
     * @param id         identifier used to register the step and to wire dependencies; must not be null
     * @param maxRetries number of additional attempts allowed after the first one; must be >= 0
     * @throws NullPointerException     if {@code id} is null
     * @throws IllegalArgumentException if {@code maxRetries} is negative
     */
    public Step(String id, int maxRetries) {
        // Fail fast: a negative retry budget would make a retry loop of the form
        // "attempts <= maxRetries" skip the step entirely and report it as failed.
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0, got " + maxRetries);
        }
        this.id = Objects.requireNonNull(id, "id");
        this.maxRetries = maxRetries;
    }

    /** Returns the identifier this step was created with. */
    public String getId() { return id; }

    /** Returns how many retries are allowed in addition to the first attempt. */
    public int getMaxRetries() { return maxRetries; }

    /**
     * Performs the step's forward action.
     *
     * @param context shared key/value map the step may read from and write to
     * @return {@code true} if the step succeeded, {@code false} otherwise
     * @throws Exception if the attempt failed
     */
    public abstract boolean execute(Map<String, Object> context) throws Exception;

    /**
     * Undoes the effect of a previously successful {@link #execute(Map)} call.
     *
     * @param context the same shared map that was passed to {@code execute}
     */
    public abstract void compensate(Map<String, Object> context);
}

// Concrete Step that mocks a payment charge and prints a refund on rollback
class PaymentStep extends Step {
    // When true, every execute() attempt throws instead of recording a transaction.
    private final boolean simulateFailure;

    public PaymentStep(String id, int maxRetries, boolean shouldFail) {
        super(id, maxRetries);
        simulateFailure = shouldFail;
    }

    // Stores a short random transaction id under "payment.txId", or throws when configured to fail.
    @Override
    public boolean execute(Map<String, Object> context) throws Exception {
        System.out.println("[STEP] Executing PaymentStep: " + getId());
        if (!simulateFailure) {
            String shortUuid = UUID.randomUUID().toString().substring(0, 8);
            context.put("payment.txId", "TX-" + shortUuid);
            return true;
        }
        throw new RuntimeException("Payment processor unavailable.");
    }

    // Prints the refund message, including whatever transaction id the context holds.
    @Override
    public void compensate(Map<String, Object> context) {
        StringBuilder message = new StringBuilder("[COMPENSATION] Refunding transaction via PaymentStep: ");
        message.append(getId());
        message.append(" | TxId: ");
        message.append(context.get("payment.txId"));
        System.out.println(message);
    }
}

// Concrete Step that mocks an inventory reservation and prints a release on rollback
class InventoryStep extends Step {
    // When true, every execute() attempt throws instead of reserving stock.
    private final boolean simulateFailure;

    public InventoryStep(String id, int maxRetries, boolean shouldFail) {
        super(id, maxRetries);
        simulateFailure = shouldFail;
    }

    // Flags "inventory.reserved" in the context, or throws when configured to fail.
    @Override
    public boolean execute(Map<String, Object> context) throws Exception {
        System.out.println("[STEP] Executing InventoryStep: " + getId());
        if (!simulateFailure) {
            context.put("inventory.reserved", Boolean.TRUE);
            return true;
        }
        throw new RuntimeException("Inventory item out of stock.");
    }

    // Prints the reservation-release message; the context is not consulted.
    @Override
    public void compensate(Map<String, Object> context) {
        String stepId = getId();
        System.out.println("[COMPENSATION] Releasing inventory reservations in InventoryStep: " + stepId);
    }
}

// Concrete Step that mocks registering an order; it always succeeds
class OrderStep extends Step {
    public OrderStep(String id, int maxRetries) {
        super(id, maxRetries);
    }

    // Flags "order.registered" in the context and reports success.
    @Override
    public boolean execute(Map<String, Object> context) {
        String stepId = getId();
        System.out.println("[STEP] Executing OrderStep: " + stepId);
        context.put("order.registered", Boolean.TRUE);
        return true;
    }

    // Prints the order-deletion message; the context is not consulted.
    @Override
    public void compensate(Map<String, Object> context) {
        String stepId = getId();
        System.out.println("[COMPENSATION] Deleting order registration from OrderStep: " + stepId);
    }
}

// Concrete Step that mocks booking a shipment; it always succeeds
class ShipmentStep extends Step {
    public ShipmentStep(String id, int maxRetries) {
        super(id, maxRetries);
    }

    // Records the carrier under "shipment.carrier" and reports success.
    @Override
    public boolean execute(Map<String, Object> context) {
        String stepId = getId();
        System.out.println("[STEP] Executing ShipmentStep: " + stepId);
        String carrier = "FedEx";
        context.put("shipment.carrier", carrier);
        return true;
    }

    // Prints the shipment-cancellation message; the context is not consulted.
    @Override
    public void compensate(Map<String, Object> context) {
        String stepId = getId();
        System.out.println("[COMPENSATION] Canceling shipment request via ShipmentStep: " + stepId);
    }
}

// Global parallel execution manager
/**
 * Runs a DAG of {@link Step}s on a fixed pool of four worker threads.
 *
 * <p>A step is submitted once every parent it depends on has completed. If any step exhausts its
 * retries, the wait times out, or the calling thread is interrupted, the workflow is flagged as
 * failed, in-flight steps are drained, and every step that completed is compensated in reverse
 * completion order.
 *
 * <p>Usage: register all steps and dependencies first, then call {@link #executeWorkflow()} once.
 * The internal executor is shut down at the end of that call, so an instance is single-use.
 */
class DAGScheduler {
    /** Upper bound on how long {@link #executeWorkflow()} waits for the graph to finish. */
    private static final long WORKFLOW_TIMEOUT_SECONDS = 10;
    /** Grace period granted to in-flight steps while the executor is being drained. */
    private static final long DRAIN_TIMEOUT_SECONDS = 5;

    private final Map<String, Step> steps = new ConcurrentHashMap<>();
    private final Map<String, List<String>> adjList = new ConcurrentHashMap<>();
    private final Map<String, List<String>> reverseAdjList = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> inDegree = new ConcurrentHashMap<>();
    private final List<String> completedSteps = new CopyOnWriteArrayList<>();
    private final Map<String, StepStatus> stepStatus = new ConcurrentHashMap<>();

    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    // Guards the "record outcome + schedule children" decision against the failure flag.
    private final ReentrantLock schedulerLock = new ReentrantLock();
    // volatile: written by worker threads and the caller, polled lock-free inside retry loops.
    private volatile boolean workflowFailed = false;

    /**
     * Registers a step. Re-registering an id replaces the previous step.
     *
     * @param step the step to add
     */
    public void addStep(Step step) {
        steps.put(step.getId(), step);
        // putIfAbsent keeps an in-degree that addDependency may already have counted.
        inDegree.putIfAbsent(step.getId(), new AtomicInteger(0));
        stepStatus.put(step.getId(), StepStatus.PENDING);
    }

    /**
     * Declares that {@code childId} may only start after {@code parentId} has completed.
     * Both ids must be registered via {@link #addStep(Step)} before the workflow is executed;
     * the order of the two calls does not matter.
     *
     * @param parentId id of the prerequisite step
     * @param childId  id of the dependent step
     */
    public void addDependency(String parentId, String childId) {
        adjList.computeIfAbsent(parentId, k -> new CopyOnWriteArrayList<>()).add(childId);
        reverseAdjList.computeIfAbsent(childId, k -> new CopyOnWriteArrayList<>()).add(parentId);
        inDegree.computeIfAbsent(childId, k -> new AtomicInteger(0)).incrementAndGet();
    }

    /**
     * Executes the registered graph and blocks until it finishes, fails, or times out.
     *
     * <p>If the calling thread is interrupted while waiting, the workflow is treated as failed
     * and the thread's interrupt status is restored before returning.
     *
     * @return {@code true} if every step completed; {@code false} if the graph is invalid
     *         (unknown step id or cycle), a step failed permanently, the wait timed out, or the
     *         caller was interrupted
     */
    public boolean executeWorkflow() {
        System.out.println("=== Starting Workflow DAG Execution ===");
        Map<String, Object> context = new ConcurrentHashMap<>();

        // Reject graphs that could never finish instead of stalling until the timeout.
        if (!isRunnableGraph()) {
            executor.shutdown();
            return false;
        }

        // Find roots (in-degree == 0)
        List<String> roots = new ArrayList<>();
        for (String id : steps.keySet()) {
            if (inDegree.get(id).get() == 0) {
                roots.add(id);
            }
        }

        CountDownLatch latch = new CountDownLatch(steps.size());

        for (String root : roots) {
            submitStep(root, context, latch);
        }

        boolean interrupted = false;
        boolean cancelRunning = false;
        try {
            // await() returns false on timeout; that must count as a failure, not a success.
            if (!latch.await(WORKFLOW_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.out.println("Workflow execution timed out or was interrupted.");
                flagWorkflowFailed();
                cancelRunning = true;
            }
        } catch (InterruptedException e) {
            System.out.println("Workflow execution timed out or was interrupted.");
            flagWorkflowFailed();
            interrupted = true;
            cancelRunning = true;
        }

        // Let in-flight steps settle so that everything they completed is visible for rollback.
        if (drainExecutor(cancelRunning)) {
            interrupted = true;
        }

        boolean failed = workflowFailed;
        if (failed) {
            System.out.println("\n[SAGA TRIGGERED] Workflow failed. Executing Compensation rollback...");
            compensateCompletedSteps(context);
        } else {
            System.out.println("\n[SUCCESS] Workflow execution succeeded perfectly!");
        }

        if (interrupted) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return !failed;
    }

    // Checks that every dependency endpoint is a registered step and that the graph has no cycle.
    private boolean isRunnableGraph() {
        for (Map.Entry<String, List<String>> edge : adjList.entrySet()) {
            if (!steps.containsKey(edge.getKey())) {
                System.out.println("[INVALID] Dependency references unknown step: " + edge.getKey());
                return false;
            }
            for (String child : edge.getValue()) {
                if (!steps.containsKey(child)) {
                    System.out.println("[INVALID] Dependency references unknown step: " + child);
                    return false;
                }
            }
        }

        // Dry-run of the in-degree countdown on a private copy; the live counters stay untouched.
        Map<String, Integer> remaining = new HashMap<>();
        Deque<String> ready = new ArrayDeque<>();
        for (String id : steps.keySet()) {
            int degree = inDegree.get(id).get();
            remaining.put(id, degree);
            if (degree == 0) {
                ready.add(id);
            }
        }
        int reachable = 0;
        while (!ready.isEmpty()) {
            String id = ready.poll();
            reachable++;
            List<String> children = adjList.get(id);
            if (children == null) {
                continue;
            }
            for (String child : children) {
                if (remaining.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        if (reachable != steps.size()) {
            System.out.println("[INVALID] Dependency graph contains a cycle; nothing was executed.");
            return false;
        }
        return true;
    }

    // Sets the failure flag under the scheduler lock so no further children get submitted.
    private void flagWorkflowFailed() {
        schedulerLock.lock();
        try {
            workflowFailed = true;
        } finally {
            schedulerLock.unlock();
        }
    }

    // Shuts the pool down and waits for running steps; returns true if the wait was interrupted.
    private boolean drainExecutor(boolean cancelRunning) {
        if (cancelRunning) {
            executor.shutdownNow();
        } else {
            executor.shutdown();
        }
        try {
            if (!executor.awaitTermination(DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    System.out.println("[WARN] Some steps did not stop; steps finishing from now on will not be compensated.");
                }
            }
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            return true;
        }
    }

    // Runs compensate() for every completed step, most recently completed first.
    private void compensateCompletedSteps(Map<String, Object> context) {
        // Snapshot: a straggler that ignored cancellation must not shift indices mid-rollback.
        List<String> rollbackOrder = new ArrayList<>(completedSteps);
        Collections.reverse(rollbackOrder);
        for (String stepId : rollbackOrder) {
            try {
                steps.get(stepId).compensate(context);
                stepStatus.put(stepId, StepStatus.COMPENSATED);
            } catch (Exception e) {
                // Best effort: keep rolling back the remaining steps, but report the cause.
                System.out.println("Compensation failed for step: " + stepId + " (" + e + ")");
            }
        }
    }

    // Queues one step for execution on the worker pool.
    private void submitStep(String stepId, Map<String, Object> context, CountDownLatch latch) {
        executor.submit(() -> runStep(stepId, context, latch));
    }

    // Executes a step with retries, records the outcome, and schedules children that became ready.
    private void runStep(String stepId, Map<String, Object> context, CountDownLatch latch) {
        if (workflowFailed) {
            // Queued before the failure was flagged: never started, so it stays PENDING.
            return;
        }

        Step step = steps.get(stepId);
        stepStatus.put(stepId, StepStatus.RUNNING);

        boolean success = false;
        int attempts = 0;

        // One initial attempt plus up to maxRetries retries; stop early once the workflow failed.
        while (!success && attempts <= step.getMaxRetries() && !workflowFailed) {
            attempts++;
            try {
                success = step.execute(context);
                if (!success) {
                    System.out.println("  [RETRY] Step " + stepId + " failed attempt " + attempts + ": returned false");
                }
            } catch (InterruptedException e) {
                // The pool is cancelling us; keep the flag set and stop retrying.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                System.out.println("  [RETRY] Step " + stepId + " failed attempt " + attempts + ": " + e.getMessage());
            }
        }

        schedulerLock.lock();
        try {
            if (success) {
                // Always record a successful step, even if the workflow failed meanwhile:
                // its side effects exist and must be rolled back.
                stepStatus.put(stepId, StepStatus.COMPLETED);
                completedSteps.add(stepId);

                // Process downstream steps, unless the workflow is already being torn down.
                List<String> children = adjList.get(stepId);
                if (children != null && !workflowFailed) {
                    for (String child : children) {
                        int remainingDependencies = inDegree.get(child).decrementAndGet();
                        if (remainingDependencies == 0) {
                            submitStep(child, context, latch);
                        }
                    }
                }
            } else {
                // attempts == 0 means the failure flag was raised before the first attempt.
                stepStatus.put(stepId, attempts > 0 ? StepStatus.FAILED : StepStatus.PENDING);
                if (!workflowFailed) {
                    workflowFailed = true;
                    System.out.println("[FAILED] Step " + stepId + " permanently failed. Canceling running branches.");
                }
            }
        } finally {
            schedulerLock.unlock();
        }

        if (success) {
            latch.countDown();
        } else {
            // Force latch reduction to unblock main thread
            while (latch.getCount() > 0) {
                latch.countDown();
            }
        }
    }
}

// Demo driver: runs the order workflow once with all steps succeeding and once with a failing inventory step.
public class Main {
    public static void main(String[] args) {
        System.out.println("--- Scenario 1: Successful DAG execution ---");
        DAGScheduler successScheduler = buildOrderWorkflow(1, false);
        successScheduler.executeWorkflow();

        System.out.println("\n--- Scenario 2: Failed DAG execution with Saga compensation rollback ---");
        // Inventory step configured to FAIL
        DAGScheduler failureScheduler = buildOrderWorkflow(2, true);
        failureScheduler.executeWorkflow();
    }

    // Wires the diamond graph: order -> payment & inventory (parallel) -> shipment.
    private static DAGScheduler buildOrderWorkflow(int inventoryRetries, boolean inventoryFails) {
        DAGScheduler scheduler = new DAGScheduler();

        scheduler.addStep(new OrderStep("order", 1));
        scheduler.addStep(new PaymentStep("payment", 2, false));
        scheduler.addStep(new InventoryStep("inventory", inventoryRetries, inventoryFails));
        scheduler.addStep(new ShipmentStep("shipment", 1));

        String[][] edges = {
            {"order", "payment"},
            {"order", "inventory"},
            {"payment", "shipment"},
            {"inventory", "shipment"},
        };
        for (String[] edge : edges) {
            scheduler.addDependency(edge[0], edge[1]);
        }
        return scheduler;
    }
}