Machine Coding Problem

Live Reactions System

Tags: maco · All · real-time · aggregator · high-write-load
Commonly Asked By: Meta, ByteDance, Twitter, LinkedIn

Functional Scope (In-Scope)

  • High-Throughput Reaction buffering: Uses an in-memory buffer to batch reaction events rather than writing them directly to the database.
  • Sharded Counter Architectures: Distributes concurrent write requests across independent counter shards to prevent lock contention.
  • Periodic 100ms Delta aggregations: Flushes, resets, and broadcasts aggregated delta counts at regular 100ms intervals.
  • Stateless Fan-out broadcasting: Pushes small delta batches to active viewers instead of re-broadcasting total cumulative stats.

Explicit Boundaries (Out-of-Scope)

  • Direct Client WebSocket connections: Mocks socket dispatch endpoints, connection servers, and browser clients.
  • Physical Time-Series Database layers: Mocks persistence using in-memory queues instead of integrating concrete InfluxDB or TimescaleDB instances.

Production reference implementations demonstrating lock-free sharding, periodic flushing, time-series history tracking, and delta broadcasting in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * The closed set of reaction kinds that can be counted for a stream.
 * Declaration order is significant only to code that iterates {@code values()}.
 */
enum EmojiType {
    LIKE,
    HEART,
    LAUGH,
    WOW,
    SAD,
    FIRE
}

/**
 * Immutable description of a single reaction: the sending user, the target stream,
 * the emoji chosen, and the wall-clock time at which this object was constructed.
 */
class ReactionEvent {
    private final String userId;
    private final String streamId;
    private final EmojiType emojiType;
    // Set once in the constructor from System.currentTimeMillis(); not supplied by the caller.
    private final long timestampMs;

    /**
     * Creates an event stamped with the current system time in milliseconds.
     *
     * @param userId    identifier of the reacting user
     * @param streamId  identifier of the stream being reacted to
     * @param emojiType the reaction kind
     */
    public ReactionEvent(String userId, String streamId, EmojiType emojiType) {
        this.userId = userId;
        this.streamId = streamId;
        this.emojiType = emojiType;
        this.timestampMs = System.currentTimeMillis();
    }

    /** @return the identifier of the reacting user, as passed to the constructor */
    public String getUserId() {
        return this.userId;
    }

    /** @return the identifier of the target stream, as passed to the constructor */
    public String getStreamId() {
        return this.streamId;
    }

    /** @return the reaction kind, as passed to the constructor */
    public EmojiType getEmojiType() {
        return this.emojiType;
    }

    /** @return the construction time of this event in epoch milliseconds */
    public long getTimestampMs() {
        return this.timestampMs;
    }
}

/**
 * One independent slice of a stream's reaction counters.
 *
 * <p>Writers are spread over several shards so that concurrent increments for the same
 * emoji do not all hit the same {@link AtomicLong}. Each shard holds exactly one counter
 * per {@link EmojiType}.
 *
 * <p>Thread-safety: the key set is populated in the constructor and never structurally
 * modified afterwards, so the backing {@link EnumMap} is only ever read; all mutation
 * happens through the atomic operations of the {@code AtomicLong} values.
 */
class CounterShard {
    private final Map<EmojiType, AtomicLong> counts = new EnumMap<>(EmojiType.class);

    /** Creates a shard with a zeroed counter for every {@link EmojiType}. */
    public CounterShard() {
        for (EmojiType type : EmojiType.values()) {
            counts.put(type, new AtomicLong());
        }
    }

    /**
     * Atomically adds one to the counter for {@code type}.
     *
     * @param type the reaction kind to count; must not be {@code null}
     * @throws NullPointerException if {@code type} is {@code null}
     */
    public void increment(EmojiType type) {
        Objects.requireNonNull(type, "type");
        counts.get(type).incrementAndGet();
    }

    /**
     * Reads and zeroes every counter, returning the values that were read.
     *
     * <p>Each counter is swapped to zero with a single atomic {@code getAndSet}, so an
     * increment racing with the flush lands either in this snapshot or in the next one.
     * The counters are flushed one after another, so the snapshot as a whole is not a
     * single point-in-time view across emoji types.
     *
     * @return a new mutable map containing an entry for every {@link EmojiType},
     *         iterated in enum declaration order
     */
    public Map<EmojiType, Long> flushAndGet() {
        Map<EmojiType, Long> snapshot = new EnumMap<>(EmojiType.class);
        for (Map.Entry<EmojiType, AtomicLong> entry : counts.entrySet()) {
            snapshot.put(entry.getKey(), entry.getValue().getAndSet(0L));
        }
        return snapshot;
    }
}

/**
 * In-memory reaction buffer for a single stream, backed by a fixed array of
 * {@link CounterShard}s. Writes go to a randomly chosen shard; a flush drains
 * every shard and sums the results per emoji.
 */
class StreamReactionBuffer {
    private final CounterShard[] shards;
    private final int shardCount;

    /**
     * Creates a buffer with the given number of shards.
     *
     * @param shardCount number of independent counter shards; must be at least 1
     * @throws IllegalArgumentException if {@code shardCount} is less than 1
     */
    public StreamReactionBuffer(int shardCount) {
        // Fail at construction: a zero-shard buffer would otherwise be created successfully
        // and only blow up later, on the first recordReaction call, inside nextInt(0).
        if (shardCount < 1) {
            throw new IllegalArgumentException("shardCount must be >= 1, got " + shardCount);
        }
        this.shardCount = shardCount;
        this.shards = new CounterShard[shardCount];
        for (int i = 0; i < shardCount; i++) {
            shards[i] = new CounterShard();
        }
    }

    /**
     * Counts one reaction of the given kind.
     *
     * @param type the reaction kind to count
     */
    public void recordReaction(EmojiType type) {
        // Uniform random routing spreads concurrent writers across shards.
        int shardIndex = ThreadLocalRandom.current().nextInt(shardCount);
        shards[shardIndex].increment(type);
    }

    /**
     * Drains every shard and returns the per-emoji sum of what was drained.
     *
     * <p>Shards are drained one after another, so reactions recorded while this method
     * runs may appear either in this result or in the next call's result.
     *
     * @return a new map with an entry for every {@link EmojiType} (zero when no reactions
     *         of that kind were drained), iterated in enum declaration order
     */
    public Map<EmojiType, Long> collectAndReset() {
        Map<EmojiType, Long> aggregated = new EnumMap<>(EmojiType.class);
        for (EmojiType type : EmojiType.values()) {
            aggregated.put(type, 0L);
        }

        for (CounterShard shard : shards) {
            for (Map.Entry<EmojiType, Long> entry : shard.flushAndGet().entrySet()) {
                aggregated.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return aggregated;
    }
}

/**
 * Accepts reactions for any number of streams, buffers them in memory, and on a fixed
 * 100 ms schedule turns each stream's buffered counts into a delta batch that is added
 * to the stream's cumulative totals, appended to its in-memory history, and broadcast.
 *
 * <p>Flushing runs on a single scheduler thread. Callers must invoke {@link #shutdown()}
 * when done; the scheduler thread is created by
 * {@link Executors#newSingleThreadScheduledExecutor()} and is not stopped otherwise.
 *
 * <p>NOTE(review): per-stream buffers and history queues are never removed or trimmed in
 * this class, so memory grows with the number of streams and non-empty flush windows.
 */
class LiveReactionsService {
    private static final int DEFAULT_SHARD_COUNT = 4;
    private static final long FLUSH_INTERVAL_MS = 100;
    // Upper bound on how long shutdown() waits for an in-flight flush before giving up.
    private static final long SHUTDOWN_WAIT_MS = 1_000;

    private final ConcurrentHashMap<String, StreamReactionBuffer> streamBuffers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Map<EmojiType, Long>>> timeSeriesStore = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Map<EmojiType, Long>> cumulativeTotals = new ConcurrentHashMap<>();
    private final ScheduledExecutorService aggregatorScheduler = Executors.newSingleThreadScheduledExecutor();

    /** Creates the service and starts the periodic flush, first run after 100 ms. */
    public LiveReactionsService() {
        aggregatorScheduler.scheduleAtFixedRate(
                this::flushAllBuffers, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Buffers one reaction for the given stream, creating the stream's buffer on first use.
     * The reaction becomes visible in totals, history and broadcasts at the next flush.
     *
     * @param userId   identifier of the reacting user; currently not used by this method
     * @param streamId identifier of the target stream; must not be {@code null}
     * @param emoji    the reaction kind; must not be {@code null}
     * @throws NullPointerException if {@code streamId} or {@code emoji} is {@code null}
     */
    public void submitReaction(String userId, String streamId, EmojiType emoji) {
        Objects.requireNonNull(streamId, "streamId");
        Objects.requireNonNull(emoji, "emoji");
        streamBuffers
                .computeIfAbsent(streamId, id -> new StreamReactionBuffer(DEFAULT_SHARD_COUNT))
                .recordReaction(emoji);
    }

    /**
     * Returns the totals accumulated for a stream by all flushes so far.
     *
     * @param streamId identifier of the stream
     * @return a read-only snapshot of the totals, or an empty map if nothing has been
     *         flushed for this stream yet; later flushes do not change the returned map
     */
    public Map<EmojiType, Long> getCumulativeStats(String streamId) {
        Map<EmojiType, Long> totals = cumulativeTotals.get(streamId);
        if (totals == null) {
            return Collections.emptyMap();
        }
        // Copy so callers can neither mutate the live totals nor observe them mid-update.
        return readOnlyCopy(totals);
    }

    /**
     * Returns the delta batches recorded for a stream, oldest first.
     *
     * @param streamId identifier of the stream
     * @return a new list of the stored batches, or an empty list if none were recorded
     */
    public List<Map<EmojiType, Long>> getTimeSeriesHistory(String streamId) {
        ConcurrentLinkedQueue<Map<EmojiType, Long>> queue = timeSeriesStore.get(streamId);
        return queue != null ? new ArrayList<>(queue) : Collections.emptyList();
    }

    /**
     * Flushes every known stream once. A failure while flushing one stream is reported and
     * does not stop the remaining streams from being flushed.
     */
    private void flushAllBuffers() {
        for (Map.Entry<String, StreamReactionBuffer> entry : streamBuffers.entrySet()) {
            String streamId = entry.getKey();
            try {
                flushStream(streamId, entry.getValue());
            } catch (RuntimeException e) {
                // Must not propagate: scheduleAtFixedRate cancels all subsequent runs as soon
                // as one run throws, which would silently stop flushing for every stream.
                System.err.println("Flush failed for stream " + streamId + ": " + e);
            }
        }
    }

    /** Drains one stream's buffer and, if anything was drained, records and broadcasts it. */
    private void flushStream(String streamId, StreamReactionBuffer buffer) {
        Map<EmojiType, Long> deltaBatch = buffer.collectAndReset();

        // Skip totals, history and broadcast entirely for an idle window.
        boolean hasReactions = deltaBatch.values().stream().anyMatch(v -> v > 0);
        if (!hasReactions) {
            return;
        }

        Map<EmojiType, Long> totals = cumulativeTotals.computeIfAbsent(streamId, id -> new ConcurrentHashMap<>());
        for (Map.Entry<EmojiType, Long> delta : deltaBatch.entrySet()) {
            totals.merge(delta.getKey(), delta.getValue(), Long::sum);
        }

        // Store a read-only copy so history cannot be altered through getTimeSeriesHistory.
        timeSeriesStore.computeIfAbsent(streamId, k -> new ConcurrentLinkedQueue<>()).add(readOnlyCopy(deltaBatch));

        broadcastDeltaSnapshot(streamId, deltaBatch);
    }

    /** Copies {@code source} into an unmodifiable map keyed in enum declaration order. */
    private static Map<EmojiType, Long> readOnlyCopy(Map<EmojiType, Long> source) {
        // putAll rather than the EnumMap copy constructor, which rejects an empty non-EnumMap.
        Map<EmojiType, Long> copy = new EnumMap<>(EmojiType.class);
        copy.putAll(source);
        return Collections.unmodifiableMap(copy);
    }

    /** Stand-in for the network fan-out: prints the stream id and its delta batch. */
    private void broadcastDeltaSnapshot(String streamId, Map<EmojiType, Long> delta) {
        System.out.println("BROADCAST -> Stream: " + streamId + " Delta: " + delta);
    }

    /**
     * Stops the periodic flush and then flushes once more so reactions buffered since the
     * last tick are not dropped.
     *
     * <p>The final flush runs on the calling thread, and only after the scheduler has fully
     * terminated, so it never overlaps a scheduled flush. If the scheduler does not
     * terminate within the wait bound, or the caller is interrupted, the final flush is
     * skipped; on interruption the thread's interrupt flag is restored.
     */
    public void shutdown() {
        aggregatorScheduler.shutdown();
        try {
            if (aggregatorScheduler.awaitTermination(SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) {
                flushAllBuffers();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Small driver: submits three reactions to one stream, waits long enough for the
 * service's periodic flush to run, prints the cumulative totals, and shuts down.
 */
public class Main {
    /**
     * Runs the simulation.
     *
     * @param args unused
     * @throws InterruptedException if the wait for the flush is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== JAVA LIVE REACTIONS SIMULATION ===");
        LiveReactionsService service = new LiveReactionsService();

        // shutdown() must run even if the sleep is interrupted; otherwise the service's
        // scheduler thread stays alive and keeps the JVM from exiting.
        try {
            service.submitReaction("user1", "stream123", EmojiType.FIRE);
            service.submitReaction("user2", "stream123", EmojiType.HEART);
            service.submitReaction("user3", "stream123", EmojiType.FIRE);

            // Longer than the service's 100 ms flush period, so at least one flush has run.
            Thread.sleep(250);

            System.out.println("Cumulative totals: " + service.getCumulativeStats("stream123"));
        } finally {
            service.shutdown();
        }

        System.out.println("=== END OF JAVA SIMULATION ===");
    }
}