Machine Coding Problem

Real-time Dashboard

Tags: maco · All · devops · websocket-push · materialized-view
Commonly Asked By: Datadog, Grafana, AWS

Functional Scope (In-Scope)

  • WebSocket Streaming Pushes: Connects live dashboard sessions, pushing incremental updates to clients instead of relying on polling.
  • Materialized Snapshot View: Caches frequently queried metrics in a materialized view store, serving instant dashboard state immediately on connect.
  • Adaptive Throttle & Snapshots: Caps push rates per metric (e.g. max N/sec) to avoid web UI freezes; updates that arrive faster than the cap are coalesced so that only the latest value per metric is pushed once the throttle window reopens.
  • Lifecycle Connection Management: Monitors active clients, cleanly freeing subscription bindings and resources upon connection teardowns.

Explicit Boundaries (Out-of-Scope)

  • Custom Alert Policies: Relies on external notification engines to evaluate boundary checks and routing.
  • True Socket Frameworks: Excludes deep physical network configuration and socket-io protocol keepalives, modeling channels as mock connections.

Production reference implementations demonstrating real-time WebSocket pushes, materialized cache views, and adaptive throttle loops in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;

/**
 * Immutable sample of one metric: the metric's name, the reading, and the
 * millisecond timestamp attached to it by whoever created the sample.
 */
class MetricValue {
    private final String metricName;
    private final double value;
    private final long timestampMs;

    /**
     * Creates a sample.
     *
     * @param metricName name of the metric the reading belongs to
     * @param value      the reading itself
     * @param timestampMs timestamp of the reading, in milliseconds
     */
    public MetricValue(String metricName, double value, long timestampMs) {
        this.metricName = metricName;
        this.value = value;
        this.timestampMs = timestampMs;
    }

    /** @return the name of the metric this sample belongs to */
    public String getMetricName() {
        return this.metricName;
    }

    /** @return the reading carried by this sample */
    public double getValue() {
        return this.value;
    }

    /** @return the timestamp of this sample, in milliseconds */
    public long getTimestampMs() {
        return this.timestampMs;
    }
}

/**
 * Outbound channel to a single dashboard client.
 *
 * <p>Callers hand a {@link MetricValue} to {@link #sendUpdate(MetricValue)};
 * how the value actually reaches the client is up to the implementation.
 */
interface ClientConnection {
    /**
     * Delivers one metric sample to the client behind this connection.
     *
     * @param val the sample to deliver
     */
    void sendUpdate(MetricValue val);
}

/**
 * Stand-in connection that "delivers" an update by printing one line to
 * standard output, tagged with the client id given at construction.
 */
class MockClientConnection implements ClientConnection {
    private final String clientId;

    /**
     * @param clientId label printed with every pushed update
     */
    public MockClientConnection(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Prints the update as {@code PUSH -> Client: <id> | Metric: <name> = <value>}.
     *
     * @param val the sample to print
     */
    @Override
    public void sendUpdate(MetricValue val) {
        StringBuilder line = new StringBuilder("PUSH -> Client: ");
        line.append(clientId);
        line.append(" | Metric: ");
        line.append(val.getMetricName());
        line.append(" = ");
        line.append(val.getValue());
        System.out.println(line);
    }
}

/**
 * State kept for one connected dashboard client: its id, the connection used
 * to reach it, the metrics it subscribed to, and two per-metric maps (time of
 * the last push, and the update waiting to be pushed).
 *
 * <p>The collection getters hand out the live, mutable collections rather
 * than copies.
 */
class DashboardSession {
    private final String sessionId;
    private final ClientConnection connection;
    private final Set<String> subscribedMetrics;
    private final ConcurrentHashMap<String, Long> lastPushAtByMetric;
    private final ConcurrentHashMap<String, MetricValue> pendingByMetric;

    /**
     * @param sessionId  identifier of this session
     * @param connection connection used to reach the client
     */
    public DashboardSession(String sessionId, ClientConnection connection) {
        this.sessionId = sessionId;
        this.connection = connection;
        this.subscribedMetrics = ConcurrentHashMap.newKeySet();
        this.lastPushAtByMetric = new ConcurrentHashMap<>();
        this.pendingByMetric = new ConcurrentHashMap<>();
    }

    /** @return the identifier of this session */
    public String getSessionId() {
        return this.sessionId;
    }

    /** @return the connection used to reach the client */
    public ClientConnection getConnection() {
        return this.connection;
    }

    /** @return the live set of metric names this session is subscribed to */
    public Set<String> getSubscribedMetrics() {
        return this.subscribedMetrics;
    }

    /** @return the live map from metric name to the time of its last push */
    public ConcurrentHashMap<String, Long> getLastPushTimeMap() {
        return this.lastPushAtByMetric;
    }

    /** @return the live map from metric name to the update waiting to be pushed */
    public ConcurrentHashMap<String, MetricValue> getPendingUpdates() {
        return this.pendingByMetric;
    }

    /**
     * Records interest in a metric.
     *
     * @param metricName metric to subscribe to
     */
    public void subscribe(String metricName) {
        this.subscribedMetrics.add(metricName);
    }

    /**
     * Drops interest in a metric together with the per-metric state held for it
     * (last push time and any waiting update).
     *
     * @param metricName metric to unsubscribe from
     */
    public void unsubscribe(String metricName) {
        this.subscribedMetrics.remove(metricName);
        this.lastPushAtByMetric.remove(metricName);
        this.pendingByMetric.remove(metricName);
    }
}

/**
 * Keeps the most recent {@link MetricValue} per metric name so the current
 * state can be read without waiting for a new sample.
 */
class MaterializedViewCache {
    private final ConcurrentHashMap<String, MetricValue> latestByMetric = new ConcurrentHashMap<>();

    /**
     * Replaces the stored sample for a metric with a new one stamped with the
     * current wall-clock time.
     *
     * @param metricName metric being updated
     * @param value      the new reading
     */
    public void update(String metricName, double value) {
        long recordedAt = System.currentTimeMillis();
        MetricValue latest = new MetricValue(metricName, value, recordedAt);
        latestByMetric.put(metricName, latest);
    }

    /**
     * Looks up the stored sample for a metric.
     *
     * @param metricName metric to look up
     * @return the stored sample, or {@code null} if none has been recorded
     */
    public MetricValue get(String metricName) {
        MetricValue latest = latestByMetric.get(metricName);
        return latest;
    }
}

/**
 * Fans incoming metric samples out to subscribed dashboard sessions.
 *
 * <p>Every sample is first written to the materialized view cache. Each
 * subscriber then receives it immediately if at least {@code throttleIntervalMs}
 * has passed since that session's last push for the metric; otherwise the sample
 * replaces the session's pending update for that metric, and a background task
 * delivers the pending update once the interval has elapsed.
 *
 * <p>Thread-safety: per-session throttle state and all pushes are guarded by
 * the session's monitor. Subscription changes and teardown take the same
 * monitor so a session cannot be re-bound after it has been released.
 *
 * <p>Failure policy: a connection that throws while an ingested or throttled
 * update is being pushed is logged to {@code System.err}; the update stays
 * pending and is retried no sooner than one throttle interval later. Such a
 * failure never reaches the producer, other subscribers, or the scheduler.
 */
class RealtimeDashboardService {
    /** Initial delay and period of the background flush task, in milliseconds. */
    private static final long FLUSH_PERIOD_MS = 100;

    private final ConcurrentHashMap<String, DashboardSession> activeSessions = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Set<DashboardSession>> metricSubscriptions = new ConcurrentHashMap<>();
    private final MaterializedViewCache viewCache = new MaterializedViewCache();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long throttleIntervalMs;

    /**
     * Creates the service and starts the background flush task.
     *
     * @param throttleIntervalMs minimum time, in milliseconds, between two pushes
     *                           of the same metric to the same session
     */
    public RealtimeDashboardService(long throttleIntervalMs) {
        this.throttleIntervalMs = throttleIntervalMs;
        this.scheduler.scheduleAtFixedRate(
                this::flushThrottledUpdates, FLUSH_PERIOD_MS, FLUSH_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Makes a session eligible for subscriptions and pushes.
     *
     * <p>If a different session was registered under the same id, it is replaced
     * and its subscription bindings are released so it does not linger in the
     * subscriber sets.
     *
     * @param session the session to register
     */
    public void registerSession(DashboardSession session) {
        DashboardSession replaced = activeSessions.put(session.getSessionId(), session);
        if (replaced != null && replaced != session) {
            releaseBindings(replaced);
        }
        System.out.println("Session connected: " + session.getSessionId());
    }

    /**
     * Removes a session and frees everything held on its behalf: its entries in
     * the per-metric subscriber sets and its throttle state.
     * Unknown ids are ignored.
     *
     * @param sessionId id of the session to disconnect
     */
    public void disconnectSession(String sessionId) {
        DashboardSession session = activeSessions.remove(sessionId);
        if (session == null) {
            return;
        }
        releaseBindings(session);
        System.out.println("Session disconnected: " + sessionId);
    }

    /**
     * Subscribes a registered session to a metric and, if the cache already holds
     * a value for it, pushes that value right away as the initial snapshot.
     * Unknown or already-disconnected sessions are ignored.
     *
     * <p>The snapshot push does not start the throttle window, and an exception
     * thrown by the connection during the snapshot propagates to the caller.
     *
     * @param sessionId  id of the subscribing session
     * @param metricName metric to subscribe to
     */
    public void subscribeClient(String sessionId, String metricName) {
        DashboardSession session = activeSessions.get(sessionId);
        if (session == null) {
            return;
        }

        synchronized (session) {
            // Re-check under the lock: a concurrent disconnect may have removed the
            // session after the lookup above, and binding it now would leak it.
            if (!isActive(session)) {
                return;
            }

            session.subscribe(metricName);
            // compute() creates-or-reuses the set and adds to it atomically, so the
            // add cannot land in a set that a concurrent teardown just pruned.
            metricSubscriptions.compute(metricName, (name, subs) -> {
                Set<DashboardSession> target = subs;
                if (target == null) {
                    target = ConcurrentHashMap.newKeySet();
                }
                target.add(session);
                return target;
            });

            MetricValue cachedVal = viewCache.get(metricName);
            if (cachedVal != null) {
                session.getConnection().sendUpdate(cachedVal);
            }
        }
    }

    /**
     * Unsubscribes a registered session from a metric, dropping both the
     * session-side state for that metric and the service-side binding.
     * Unknown sessions are ignored.
     *
     * @param sessionId  id of the unsubscribing session
     * @param metricName metric to unsubscribe from
     */
    public void unsubscribeClient(String sessionId, String metricName) {
        DashboardSession session = activeSessions.get(sessionId);
        if (session == null) {
            return;
        }
        synchronized (session) {
            session.unsubscribe(metricName);
            removeBinding(session, metricName);
        }
    }

    /**
     * Records a new reading and distributes it to the metric's subscribers,
     * pushing immediately where the throttle allows and deferring otherwise.
     *
     * <p>The value distributed is the one read back from the cache after the
     * update, so when two ingests for the same metric race, both may distribute
     * the newer sample.
     *
     * @param metricName metric the reading belongs to
     * @param value      the reading
     */
    public void ingestMetric(String metricName, double value) {
        viewCache.update(metricName, value);
        MetricValue metricValue = viewCache.get(metricName);

        Set<DashboardSession> subscribers = metricSubscriptions.get(metricName);
        if (subscribers == null || subscribers.isEmpty()) {
            return;
        }

        long now = System.currentTimeMillis();

        for (DashboardSession session : subscribers) {
            synchronized (session) {
                // The iteration may still see a session whose teardown is in flight.
                if (!isActive(session)) {
                    continue;
                }
                long lastPush = session.getLastPushTimeMap().getOrDefault(metricName, 0L);
                if (now - lastPush >= throttleIntervalMs) {
                    pushOrDefer(session, metricName, metricValue, now);
                } else {
                    // Inside the throttle window: keep only the newest value.
                    session.getPendingUpdates().put(metricName, metricValue);
                }
            }
        }
    }

    /**
     * Background task: pushes every pending update whose throttle window has
     * elapsed.
     *
     * <p>Nothing is allowed to escape this method: an exception thrown by a task
     * passed to {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    private void flushThrottledUpdates() {
        try {
            long now = System.currentTimeMillis();
            for (DashboardSession session : activeSessions.values()) {
                synchronized (session) {
                    if (!isActive(session)) {
                        continue;
                    }
                    // ConcurrentHashMap iteration tolerates the removals done by
                    // pushOrDefer, so no separate "to remove" list is needed.
                    for (Map.Entry<String, MetricValue> pending : session.getPendingUpdates().entrySet()) {
                        String metricName = pending.getKey();
                        long lastPush = session.getLastPushTimeMap().getOrDefault(metricName, 0L);
                        if (now - lastPush >= throttleIntervalMs) {
                            pushOrDefer(session, metricName, pending.getValue(), now);
                        }
                    }
                }
            }
        } catch (RuntimeException e) {
            // Backstop only; individual send failures are handled in sendSafely.
            System.err.println("Throttled flush failed: " + e);
        }
    }

    /**
     * Attempts a push and updates the session's throttle state accordingly.
     * Must be called while holding the session's monitor.
     *
     * <p>The throttle clock advances whether or not the push succeeded, so a
     * failing connection is retried at most once per throttle interval. On
     * failure the value is kept as the pending update.
     */
    private void pushOrDefer(DashboardSession session, String metricName, MetricValue value, long now) {
        boolean delivered = sendSafely(session, value);
        session.getLastPushTimeMap().put(metricName, now);
        if (delivered) {
            session.getPendingUpdates().remove(metricName);
        } else {
            session.getPendingUpdates().put(metricName, value);
        }
    }

    /**
     * Sends one update, containing any runtime failure of the connection.
     *
     * @return {@code true} if the connection accepted the update
     */
    private boolean sendSafely(DashboardSession session, MetricValue value) {
        try {
            session.getConnection().sendUpdate(value);
            return true;
        } catch (RuntimeException e) {
            System.err.println("PUSH FAILED -> Session: " + session.getSessionId()
                    + " | Metric: " + value.getMetricName() + " | Cause: " + e);
            return false;
        }
    }

    /** Tells whether this exact session object is the one currently registered under its id. */
    private boolean isActive(DashboardSession session) {
        return activeSessions.get(session.getSessionId()) == session;
    }

    /** Removes the session from every subscriber set it is in and clears its throttle state. */
    private void releaseBindings(DashboardSession session) {
        synchronized (session) {
            for (String metric : session.getSubscribedMetrics()) {
                removeBinding(session, metric);
            }
            session.getPendingUpdates().clear();
            session.getLastPushTimeMap().clear();
        }
    }

    /** Removes one session from one metric's subscriber set, dropping the set once it is empty. */
    private void removeBinding(DashboardSession session, String metricName) {
        metricSubscriptions.computeIfPresent(metricName, (name, subs) -> {
            subs.remove(session);
            return subs.isEmpty() ? null : subs;
        });
    }

    /**
     * Stops the background flush task. Pending updates that have not been
     * flushed yet are not delivered.
     */
    public void shutdown() {
        scheduler.shutdown();
    }
}

/**
 * Console walkthrough of the dashboard service: one client subscribes to
 * {@code cpu.usage}, three readings are ingested back to back, and the run
 * waits long enough for the throttled reading to be flushed.
 */
public class Main {
    /**
     * Runs the simulation.
     *
     * @param args unused
     * @throws InterruptedException if the wait for the throttled flush is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== JAVA REALTIME DASHBOARD SIMULATION ===");
        RealtimeDashboardService service = new RealtimeDashboardService(500);

        // The service owns a scheduler thread; shut it down on every exit path,
        // otherwise a failure below would leave that thread keeping the JVM alive.
        try {
            MockClientConnection conn = new MockClientConnection("client-1");
            DashboardSession session = new DashboardSession("session-1", conn);

            service.registerSession(session);
            service.subscribeClient("session-1", "cpu.usage");

            System.out.println("Ingesting first metric update...");
            service.ingestMetric("cpu.usage", 45.2);

            System.out.println("Ingesting second update immediately (should be throttled)...");
            service.ingestMetric("cpu.usage", 48.7);

            System.out.println("Ingesting third update immediately (should overwrite second in queue)...");
            service.ingestMetric("cpu.usage", 52.1);

            // Longer than the 500 ms throttle interval so the pending reading is flushed.
            Thread.sleep(600);

            service.disconnectSession("session-1");
        } finally {
            service.shutdown();
        }
        System.out.println("=== END OF JAVA SIMULATION ===");
    }
}