Machine Coding Problem

Distributed Lock Manager

Tags: maco, All, infrastructure, lease-management, fencing
Commonly Asked By: Uber, Google, Amazon, HashiCorp

Functional Scope (In-Scope)

  • Monotonic Fencing Tokens Engine: Issues a strictly increasing numeric token with every new lease so that a resource can reject writes from clients holding an outdated lease.
  • Lease-Based Auto-Expiration: Releases locks automatically on expiration to prevent system lockouts if a client crashes.
  • Compare-And-Swap (CAS) Mechanics: Applies in-memory lock state changes atomically so that concurrent requests cannot both acquire the same lock.
  • Deadlock Wait-For Dependency Graph: Records which client is waiting on which lock holder and detects cycles, aborting a request immediately if it would create a deadlock.

Explicit Boundaries (Out-of-Scope)

  • Network Packet Transport Sockets: Ignores physical RPC protocol formats, REST endpoints, and UDP packets.
  • Continuous Hard Disk Logging: Skips file sync write barriers and sequential WAL records.

Production reference implementations demonstrating fencing validation, CAS acquisitions, and cycle-based deadlock prevention in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A single named, lease-based, reentrant lock held in memory.
 *
 * <p>All state is guarded by {@code internalLock}. The caller supplies the current time
 * ({@code now}) explicitly, so expiry decisions are deterministic for a given input.
 * A lease is treated as expired once {@code leaseExpiryTime < now}.
 *
 * <p>Every fresh acquisition (free or expired lock) is stamped with a new, strictly
 * increasing fencing token; reentrant acquisitions by the current holder reuse the
 * active token.
 */
class DistributedLock {
    private final String lockId;
    private final ReentrantLock internalLock = new ReentrantLock();

    private String holderClientId;
    private long leaseExpiryTime;
    private int reentrancyCount = 0;
    private final AtomicLong fencingTokenCounter = new AtomicLong(0);
    private long activeFencingToken = 0;

    /**
     * Creates an unheld lock.
     *
     * @param lockId identifier reported by {@link #getLockId()}
     */
    public DistributedLock(String lockId) {
        this.lockId = lockId;
    }

    /** @return the identifier this lock was created with */
    public String getLockId() {
        return lockId;
    }

    /**
     * @return the client recorded as holder, or {@code null} when released; the value is
     *         not cleared by lease expiry alone, only by release or a new acquisition
     */
    public String getHolderClientId() {
        internalLock.lock();
        try {
            return holderClientId;
        } finally {
            internalLock.unlock();
        }
    }

    /** @return the timestamp after which the current lease counts as expired (0 once released) */
    public long getLeaseExpiryTime() {
        internalLock.lock();
        try {
            return leaseExpiryTime;
        } finally {
            internalLock.unlock();
        }
    }

    /** @return the fencing token issued by the most recent fresh acquisition (0 if never acquired) */
    public long getActiveFencingToken() {
        internalLock.lock();
        try {
            return activeFencingToken;
        } finally {
            internalLock.unlock();
        }
    }

    /**
     * Attempts to take the lock, or to re-enter it if {@code clientId} already holds it.
     *
     * @param clientId  client requesting the lock
     * @param ttlMillis lease duration added to {@code now}
     * @param now       current time in the same unit as {@code ttlMillis}
     * @return the fencing token for the lease, or {@code -1} if another client holds an
     *         unexpired lease
     */
    public long tryAcquire(String clientId, long ttlMillis, long now) {
        internalLock.lock();
        try {
            if (holderClientId == null || leaseExpiryTime < now) {
                // Free or expired lock acquisition: start a new lease with a new token.
                this.holderClientId = clientId;
                this.leaseExpiryTime = expiryFor(now, ttlMillis);
                this.reentrancyCount = 1;
                this.activeFencingToken = fencingTokenCounter.incrementAndGet();
                return activeFencingToken;
            } else if (holderClientId.equals(clientId)) {
                // Reentrant acquire: same lease, same token, extended expiry.
                this.reentrancyCount++;
                this.leaseExpiryTime = expiryFor(now, ttlMillis);
                return activeFencingToken;
            }
            return -1; // Held by a different client whose lease is still valid.
        } finally {
            internalLock.unlock();
        }
    }

    /**
     * Undoes one acquisition by {@code clientId}. The lock becomes free only when the
     * reentrancy count drops to zero.
     *
     * @param clientId client releasing the lock
     * @return {@code true} if {@code clientId} is the recorded holder, {@code false} otherwise
     */
    public boolean tryRelease(String clientId) {
        internalLock.lock();
        try {
            if (holderClientId != null && holderClientId.equals(clientId)) {
                reentrancyCount--;
                if (reentrancyCount == 0) {
                    this.holderClientId = null;
                    this.leaseExpiryTime = 0;
                }
                return true;
            }
            return false;
        } finally {
            internalLock.unlock();
        }
    }

    /**
     * Extends the lease if {@code clientId} still holds an unexpired lease.
     *
     * @param clientId  client renewing the lease
     * @param ttlMillis new lease duration added to {@code now}
     * @param now       current time in the same unit as {@code ttlMillis}
     * @return {@code true} if the lease was extended, {@code false} if the client is not
     *         the holder or the lease has already expired
     */
    public boolean tryRenew(String clientId, long ttlMillis, long now) {
        internalLock.lock();
        try {
            if (holderClientId != null && holderClientId.equals(clientId) && leaseExpiryTime >= now) {
                this.leaseExpiryTime = expiryFor(now, ttlMillis);
                return true;
            }
            return false;
        } finally {
            internalLock.unlock();
        }
    }

    /**
     * Computes {@code now + ttlMillis}, saturating at the {@code long} bounds.
     *
     * <p>A plain addition wraps to a negative value for a very large TTL (for example
     * {@code Long.MAX_VALUE} used as "never expire"), which would make the lease look
     * expired immediately and let another client take the lock.
     */
    private static long expiryFor(long now, long ttlMillis) {
        if (ttlMillis > 0 && now > Long.MAX_VALUE - ttlMillis) {
            return Long.MAX_VALUE;
        }
        if (ttlMillis < 0 && now < Long.MIN_VALUE - ttlMillis) {
            return Long.MIN_VALUE;
        }
        return now + ttlMillis;
    }
}

/**
 * Resource-side guard that rejects writes carrying a fencing token older than the
 * newest token already accepted for that resource.
 */
class FencingTokenValidator {
    // Resource ID -> highest fencing token accepted so far
    private final ConcurrentHashMap<String, Long> resourceTokens = new ConcurrentHashMap<>();

    /**
     * Accepts the write if {@code incomingFencingToken} is not older than the last token
     * accepted for {@code resourceId}, and records it as the new high-water mark.
     *
     * <p>A token equal to the last accepted one is accepted: it belongs to the same lease,
     * so the current holder must be able to write more than once (a reentrant acquire
     * also returns the same token). Only strictly smaller tokens are stale.
     *
     * @param resourceId           resource being written
     * @param incomingFencingToken token the writer obtained with its lease
     * @param payload              data to write; only echoed in the log line here
     * @return {@code true} if the write was accepted, {@code false} if the token is stale
     */
    public boolean validateAndWrite(String resourceId, long incomingFencingToken, String payload) {
        // compute() runs the check-and-update atomically per key; the one-element array
        // carries the decision out of the lambda.
        final boolean[] accepted = { false };
        resourceTokens.compute(resourceId, (id, currentToken) -> {
            if (currentToken == null || incomingFencingToken >= currentToken) {
                accepted[0] = true;
                return incomingFencingToken;
            }
            return currentToken;
        });

        if (accepted[0]) {
            System.out.println("  [WRITE ACCEPTED] Token " + incomingFencingToken + " wrote: " + payload);
            return true;
        } else {
            System.out.println("  [WRITE REJECTED] Stale Token " + incomingFencingToken + " blocked for write: " + payload);
            return false;
        }
    }
}

/**
 * Directed wait-for graph between clients with cycle detection.
 * An edge A -> B means "client A is waiting for client B". All public methods are
 * {@code synchronized} on this instance.
 */
class DeadlockDetector {
    // waiter -> set of clients it is waiting for
    private final Map<String, Set<String>> waitForGraph = new ConcurrentHashMap<>();

    /** Records that {@code clientA} is waiting for {@code clientB}. */
    public synchronized void addWaitRelation(String clientA, String clientB) {
        Set<String> blockers = waitForGraph.get(clientA);
        if (blockers == null) {
            blockers = new HashSet<>();
            waitForGraph.put(clientA, blockers);
        }
        blockers.add(clientB);
    }

    /**
     * Removes the edge {@code clientA -> clientB} if present, and drops {@code clientA}
     * from the graph once it has no outgoing edges left.
     */
    public synchronized void removeWaitRelation(String clientA, String clientB) {
        // Returning null from the remapping function removes the key.
        waitForGraph.computeIfPresent(clientA, (waiter, blockers) -> {
            blockers.remove(clientB);
            return blockers.isEmpty() ? null : blockers;
        });
    }

    /** @return {@code true} if the wait-for graph currently contains at least one cycle */
    public synchronized boolean detectDeadlock() {
        final Set<String> explored = new HashSet<>();
        final Set<String> currentPath = new HashSet<>();
        return waitForGraph.keySet().stream()
                .anyMatch(start -> closesCycle(start, explored, currentPath));
    }

    /**
     * Depth-first walk from {@code client}. Reaching a node that is still on the current
     * path means a cycle; a node explored earlier and no longer on the path cannot lead
     * to one.
     */
    private boolean closesCycle(String client, Set<String> explored, Set<String> currentPath) {
        if (currentPath.contains(client)) {
            return true;
        }
        if (!explored.add(client)) {
            return false;
        }
        currentPath.add(client);

        Set<String> blockers = waitForGraph.get(client);
        if (blockers != null) {
            for (String blocker : blockers) {
                if (closesCycle(blocker, explored, currentPath)) {
                    return true;
                }
            }
        }

        currentPath.remove(client);
        return false;
    }
}

/**
 * Entry point that owns the named locks and keeps a wait-for graph of blocked clients
 * so that a request which would close a wait cycle is rejected.
 *
 * <p>Acquisition is non-blocking: a failed attempt returns {@code -1} and records that
 * the caller is waiting on the current holder. That wait record is dropped when the
 * waiter eventually acquires the lock, when the holder fully releases it, or when the
 * request is rejected as a deadlock. Without this bookkeeping, stale edges would remain
 * in the graph and later produce false deadlock reports.
 */
class DistributedLockManager {
    private final ConcurrentHashMap<String, DistributedLock> lockStore = new ConcurrentHashMap<>();
    private final DeadlockDetector deadlockDetector = new DeadlockDetector();

    // lockId -> (waiting clientId -> holder that client was blocked on). Guarded by waitMonitor.
    private final Map<String, Map<String, String>> pendingWaits = new HashMap<>();
    private final Object waitMonitor = new Object();

    /**
     * Tries to acquire (or re-enter) {@code lockId} for {@code clientId}.
     *
     * @param lockId    name of the lock; created on first use
     * @param clientId  requesting client
     * @param ttlMillis lease duration in milliseconds
     * @return the fencing token on success, or {@code -1} if another client holds the lock
     * @throws IllegalStateException if waiting on the current holder would create a cycle
     *                               in the wait-for graph
     */
    public long acquireLock(String lockId, String clientId, long ttlMillis) {
        DistributedLock lock = lockStore.computeIfAbsent(lockId, DistributedLock::new);
        long now = System.currentTimeMillis();

        long token = lock.tryAcquire(clientId, ttlMillis, now);
        if (token != -1) {
            // The client is no longer waiting on this lock.
            synchronized (waitMonitor) {
                forgetWait(lockId, clientId);
            }
            return token;
        }

        // Wait-for graph checking
        String currentHolder = lock.getHolderClientId();
        if (currentHolder != null && !currentHolder.equals(clientId)) {
            synchronized (waitMonitor) {
                // The holder may have changed since an earlier failed attempt.
                forgetWait(lockId, clientId);
                deadlockDetector.addWaitRelation(clientId, currentHolder);
                pendingWaits.computeIfAbsent(lockId, k -> new HashMap<>()).put(clientId, currentHolder);
                if (deadlockDetector.detectDeadlock()) {
                    forgetWait(lockId, clientId);
                    throw new IllegalStateException("Deadlock cycle detected! Locking aborted.");
                }
            }
        }
        return -1;
    }

    /**
     * Releases one acquisition of {@code lockId} held by {@code clientId}.
     *
     * @return {@code true} if {@code clientId} was the holder, {@code false} if the lock
     *         is unknown or held by someone else
     */
    public boolean releaseLock(String lockId, String clientId) {
        DistributedLock lock = lockStore.get(lockId);
        if (lock == null) {
            return false;
        }
        boolean released = lock.tryRelease(clientId);
        // A reentrant release leaves the client as holder; only a full release unblocks waiters.
        if (released && !clientId.equals(lock.getHolderClientId())) {
            synchronized (waitMonitor) {
                Map<String, String> waiters = pendingWaits.get(lockId);
                if (waiters != null) {
                    // Iterate over a copy because forgetWait mutates the map.
                    for (String waiter : new ArrayList<>(waiters.keySet())) {
                        if (clientId.equals(waiters.get(waiter))) {
                            forgetWait(lockId, waiter);
                        }
                    }
                }
            }
        }
        return released;
    }

    /**
     * Extends the lease on {@code lockId} for {@code clientId}.
     *
     * @return {@code true} if the lease was extended, {@code false} if the lock is unknown,
     *         held by someone else, or already expired
     */
    public boolean renewLock(String lockId, String clientId, long ttlMillis) {
        DistributedLock lock = lockStore.get(lockId);
        if (lock != null) {
            return lock.tryRenew(clientId, ttlMillis, System.currentTimeMillis());
        }
        return false;
    }

    /**
     * Drops the recorded wait of {@code waiter} on {@code lockId} and removes the matching
     * graph edge, unless the same waiter is still blocked on the same holder through
     * another lock. Caller must hold {@code waitMonitor}.
     */
    private void forgetWait(String lockId, String waiter) {
        Map<String, String> waiters = pendingWaits.get(lockId);
        if (waiters == null) {
            return;
        }
        String holder = waiters.remove(waiter);
        if (waiters.isEmpty()) {
            pendingWaits.remove(lockId);
        }
        if (holder != null && !isStillWaitingOn(waiter, holder)) {
            deadlockDetector.removeWaitRelation(waiter, holder);
        }
    }

    /** Reports whether any recorded wait of {@code waiter} targets {@code holder}. */
    private boolean isStillWaitingOn(String waiter, String holder) {
        for (Map<String, String> waiters : pendingWaits.values()) {
            if (holder.equals(waiters.get(waiter))) {
                return true;
            }
        }
        return false;
    }
}

/**
 * Console walkthrough of the lock manager: reentrant locking, rejection of a stale
 * fencing token after a lease expires, and deadlock detection.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== STARTING DISTRIBUTED LOCK MANAGER SIMULATION ===");
        final DistributedLockManager manager = new DistributedLockManager();
        final FencingTokenValidator storageValidator = new FencingTokenValidator();

        demonstrateReentrancy(manager);
        demonstrateFencing(manager, storageValidator);
        demonstrateDeadlockDetection(manager);

        System.out.println("=== DISTRIBUTED LOCK MANAGER SIMULATION COMPLETE ===");
    }

    /** Scenario 1: the same client acquires lock-1 twice and releases it twice. */
    private static void demonstrateReentrancy(DistributedLockManager manager) {
        System.out.println("--- Testing Reentrant Locking ---");
        long firstToken = manager.acquireLock("lock-1", "client-A", 1000);
        System.out.println("  Acquired lock-1 (Token: " + firstToken + ")");
        long secondToken = manager.acquireLock("lock-1", "client-A", 1000);
        System.out.println("  Reentered lock-1 successfully (Token: " + secondToken + ")");
        for (int i = 0; i < 2; i++) {
            manager.releaseLock("lock-1", "client-A");
        }
        System.out.println("  Released reentrant lock.");
    }

    /**
     * Scenario 2: client A's 100 ms lease lapses during a simulated pause, client B takes
     * the lock and writes, then client A's late write is submitted with its old token.
     */
    private static void demonstrateFencing(DistributedLockManager manager,
                                           FencingTokenValidator storageValidator)
            throws InterruptedException {
        System.out.println("\n--- Testing Fencing Token Race Condition Resolution ---");

        long staleToken = manager.acquireLock("lock-2", "client-A", 100);
        System.out.println("  Client A acquires lock-2 (Token: " + staleToken + ")");

        // Sleep longer than the 100 ms TTL so the lease is expired when B asks for it.
        System.out.println("  [STW GC] Client A enters deep GC pause...");
        Thread.sleep(200);

        long freshToken = manager.acquireLock("lock-2", "client-B", 1000);
        System.out.println("  Client B acquires lock-2 (Token: " + freshToken + ")");

        storageValidator.validateAndWrite("db-row-777", freshToken, "Client B Update");

        System.out.println("  [GC OVER] Client A wakes up and attempts stale write...");
        storageValidator.validateAndWrite("db-row-777", staleToken, "Client A Stale Update");
    }

    /**
     * Scenario 3: client-1 holds X and asks for Y, client-2 holds Y and asks for X; the
     * second request closes the wait cycle.
     */
    private static void demonstrateDeadlockDetection(DistributedLockManager manager) {
        System.out.println("\n--- Testing Deadlock Detection ---");
        manager.acquireLock("resource-X", "client-1", 5000);
        manager.acquireLock("resource-Y", "client-2", 5000);

        System.out.println("  Constructing wait-for relations...");
        try {
            manager.acquireLock("resource-Y", "client-1", 5000);
        } catch (Exception e) {
            System.out.println("  Immediate request tracking failed or blocked.");
        }

        try {
            System.out.println("  Client 2 attempting lock on resource-X...");
            manager.acquireLock("resource-X", "client-2", 5000);
        } catch (IllegalStateException e) {
            System.out.println("  [DEADLOCK BLOCKED] " + e.getMessage());
        }
    }
}