Machine Coding Problem

Live Comments System

Tags: maco · All · real-time · buffer-management · ordering
Commonly Asked By: Meta, YouTube, Twitch, ByteDance

Functional Scope (In-Scope)

  • Strict Server-Side Comment Sequencing: Assigns monotonic sequence IDs to prevent out-of-order comments.
  • Synchronous Word Filtering: Moderates content using quick, lock-free keyword matching algorithms.
  • In-Memory Catch-Up Buffers: Keeps a per-stream circular buffer of the last 50 approved comments to bootstrap joining viewers.
  • Historical VOD replay aggregates: Archives approved content to support indexed replays at specific stream times.

Explicit Boundaries (Out-of-Scope)

  • WebSocket Framing Protocol: Socket connections and protocol upgrades (Socket.IO or raw ws) are mocked rather than implemented.
  • Machine Learning Moderation Models: Image classification and complex deep-learning NLP pipelines are left out; moderation is simplified to keyword matching.

Production reference implementations demonstrating sequence generators, circular buffers, moderation filters, and VOD query engines in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Moderation verdict attached to a {@code Comment}.
 *
 * <p>Declaration order is part of the contract ({@code ordinal()} / {@code values()}),
 * so new constants should only ever be appended.
 */
enum ModerationStatus {
    /** Verdict not yet decided. Not assigned by any code visible in this file. */
    PENDING,

    /** The comment passed moderation. */
    APPROVED,

    /** The comment was blocked by moderation. */
    REJECTED
}

/**
 * Immutable value object describing one comment posted to a stream.
 *
 * <p>Every field is {@code final}. All values are supplied by the caller except the
 * timestamp, which is read from the system clock while the instance is being constructed.
 */
class Comment {
    // Identity and routing.
    private final String id;
    private final String streamId;
    private final String authorId;

    // Payload.
    private final String text;

    // Ordering and moderation metadata.
    private final long timestampMs;
    private final long sequenceNumber;
    private final ModerationStatus status;

    /**
     * Creates a comment and stamps it with the current wall-clock time.
     *
     * @param id             identifier of the comment
     * @param streamId       identifier of the stream the comment belongs to
     * @param authorId       identifier of the author
     * @param text           comment body
     * @param sequenceNumber sequence number assigned by the caller
     * @param status         moderation verdict for this comment
     */
    public Comment(String id, String streamId, String authorId, String text, long sequenceNumber, ModerationStatus status) {
        this.id = id;
        this.streamId = streamId;
        this.authorId = authorId;
        this.text = text;
        this.sequenceNumber = sequenceNumber;
        this.status = status;
        // Captured here rather than passed in: it records construction time in epoch millis.
        this.timestampMs = System.currentTimeMillis();
    }

    /** @return the comment identifier */
    public String getId() {
        return id;
    }

    /** @return the identifier of the owning stream */
    public String getStreamId() {
        return streamId;
    }

    /** @return the identifier of the author */
    public String getAuthorId() {
        return authorId;
    }

    /** @return the comment body */
    public String getText() {
        return text;
    }

    /** @return construction time in milliseconds, as reported by {@link System#currentTimeMillis()} */
    public long getTimestampMs() {
        return timestampMs;
    }

    /** @return the sequence number supplied at construction */
    public long getSequenceNumber() {
        return sequenceNumber;
    }

    /** @return the moderation verdict supplied at construction */
    public ModerationStatus getStatus() {
        return status;
    }
}

/**
 * In-memory live-comment pipeline: keyword moderation, per-stream sequencing, a bounded
 * catch-up buffer for joining viewers, and an archive used for VOD replay queries.
 *
 * <p><b>Thread-safety.</b> All state lives in concurrent collections. The publication step of
 * {@link #ingestComment} (sequence assignment, archive append, buffer update, broadcast) runs
 * under a per-stream monitor, so for any one stream those four orders always agree. Different
 * streams never contend with each other. Read methods take no lock and return weakly
 * consistent snapshots.
 */
class LiveCommentsService {
    /** Catch-up capacity used by the no-arg constructor. */
    private static final int DEFAULT_CATCH_UP_BUFFER_SIZE = 50;

    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Comment>> commentsStore = new ConcurrentHashMap<>(); // VOD Replay store
    private final ConcurrentHashMap<String, ConcurrentLinkedDeque<Comment>> catchUpBuffers = new ConcurrentHashMap<>(); // Circular memory buffers for catch-up
    // Per-stream counters. Each AtomicLong also serves as that stream's publication monitor;
    // the instances never leave this class, so nothing else can lock on them.
    private final ConcurrentHashMap<String, AtomicLong> sequenceGenerators = new ConcurrentHashMap<>();
    private final Set<String> moderationBlacklist = ConcurrentHashMap.newKeySet();
    private final int catchUpBufferSize;

    /** Creates a service whose catch-up buffers hold the last 50 approved comments per stream. */
    public LiveCommentsService() {
        this(DEFAULT_CATCH_UP_BUFFER_SIZE);
    }

    /**
     * Creates a service with a custom catch-up buffer capacity.
     *
     * @param catchUpBufferSize maximum number of approved comments retained per stream for
     *                          joining viewers; must be positive
     * @throws IllegalArgumentException if {@code catchUpBufferSize} is not positive
     */
    public LiveCommentsService(int catchUpBufferSize) {
        if (catchUpBufferSize <= 0) {
            throw new IllegalArgumentException("catchUpBufferSize must be positive: " + catchUpBufferSize);
        }
        this.catchUpBufferSize = catchUpBufferSize;
    }

    /**
     * Adds a word to the moderation blacklist. Matching is case-insensitive substring matching.
     *
     * <p>Blank words (empty or whitespace-only) are ignored: an empty entry would be a
     * substring of every comment and reject all traffic.
     *
     * @param word the word to block
     * @throws NullPointerException if {@code word} is {@code null}
     */
    public void addBlacklistWord(String word) {
        Objects.requireNonNull(word, "word");
        if (word.trim().isEmpty()) {
            return;
        }
        // Locale.ROOT keeps case folding independent of the JVM's default locale.
        moderationBlacklist.add(word.toLowerCase(Locale.ROOT));
    }

    /**
     * Ingestion pipeline: fast-path moderation -> sequencing -> archive -> buffer -> broadcast.
     *
     * <p>Rejected comments are returned with sequence number {@code -1} and are neither stored
     * nor broadcast. Approved comments receive the next sequence number of their stream
     * (starting at 1).
     *
     * @param authorId identifier of the author
     * @param streamId identifier of the target stream
     * @param text     comment body
     * @return the resulting comment, with status {@code APPROVED} or {@code REJECTED}
     * @throws NullPointerException if {@code streamId} or {@code text} is {@code null}
     */
    public Comment ingestComment(String authorId, String streamId, String text) {
        // Fail fast, before any state is touched.
        Objects.requireNonNull(streamId, "streamId");
        Objects.requireNonNull(text, "text");

        // 1. Moderation filter block
        if (runModerationFilter(text) == ModerationStatus.REJECTED) {
            return new Comment(UUID.randomUUID().toString(), streamId, authorId, text, -1, ModerationStatus.REJECTED);
        }

        AtomicLong sequencer = sequenceGenerators.computeIfAbsent(streamId, k -> new AtomicLong(0));
        // Generated outside the critical section to keep the lock hold time short.
        String commentId = UUID.randomUUID().toString();

        // Sequence assignment and publication must be one atomic step per stream. Otherwise a
        // thread holding sequence N+1 can append before the thread holding N, which would
        // reorder the archive, the catch-up buffer and the broadcast.
        synchronized (sequencer) {
            // 2. Server-side atomic sequencing
            long sequence = sequencer.incrementAndGet();
            Comment comment = new Comment(commentId, streamId, authorId, text, sequence, ModerationStatus.APPROVED);

            // 3. Persist to VOD archive list
            commentsStore.computeIfAbsent(streamId, k -> new ConcurrentLinkedQueue<>()).add(comment);

            // 4. Update circular catch-up buffer. Evicting before appending means a lock-free
            //    reader never sees more than catchUpBufferSize entries. size() is O(n) on
            //    ConcurrentLinkedDeque, but n is bounded by the capacity.
            ConcurrentLinkedDeque<Comment> buffer = catchUpBuffers.computeIfAbsent(streamId, k -> new ConcurrentLinkedDeque<>());
            while (buffer.size() >= catchUpBufferSize) {
                buffer.pollFirst(); // evict oldest
            }
            buffer.addLast(comment);

            // 5. Broadcast to online viewers (inside the monitor so delivery order matches sequence order)
            broadcastComment(comment);

            return comment;
        }
    }

    /**
     * Catch-up path for users joining a stream.
     *
     * @param streamId identifier of the stream
     * @return a copy of the stream's catch-up buffer, oldest first; an empty list if the stream
     *         has no approved comments yet
     */
    public List<Comment> getJoinerCatchUpSnapshot(String streamId) {
        ConcurrentLinkedDeque<Comment> buffer = catchUpBuffers.get(streamId);
        if (buffer == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(buffer);
    }

    /**
     * VOD replay: retrieves the archived comments of a stream whose timestamp lies in the
     * given window, ordered by sequence number.
     *
     * <p>This is a linear scan over the stream's archive.
     *
     * @param streamId         identifier of the stream
     * @param startTimestampMs inclusive lower bound, compared against {@link Comment#getTimestampMs()}
     * @param endTimestampMs   inclusive upper bound, compared against {@link Comment#getTimestampMs()}
     * @return matching comments in ascending sequence order; empty if the stream is unknown or
     *         nothing falls inside the window
     */
    public List<Comment> getVODReplayRange(String streamId, long startTimestampMs, long endTimestampMs) {
        ConcurrentLinkedQueue<Comment> store = commentsStore.get(streamId);
        if (store == null) {
            return Collections.emptyList();
        }
        return store.stream()
            .filter(c -> c.getTimestampMs() >= startTimestampMs && c.getTimestampMs() <= endTimestampMs)
            .sorted(Comparator.comparingLong(Comment::getSequenceNumber))
            .collect(Collectors.toList());
    }

    /**
     * Rejects the text if it contains any blacklisted word as a case-insensitive substring.
     *
     * @param text comment body, non-null
     * @return {@code REJECTED} on the first match, otherwise {@code APPROVED}
     */
    private ModerationStatus runModerationFilter(String text) {
        // Fold with the same locale used when blacklist entries were stored.
        String lowerContent = text.toLowerCase(Locale.ROOT);
        for (String badWord : moderationBlacklist) {
            if (lowerContent.contains(badWord)) {
                return ModerationStatus.REJECTED;
            }
        }
        return ModerationStatus.APPROVED;
    }

    /** Mock broadcast: prints the comment to standard output instead of pushing it to sockets. */
    private void broadcastComment(Comment comment) {
        System.out.println("BROADCAST COMMENT -> Stream: " + comment.getStreamId() + " [" + comment.getSequenceNumber() + "] " + comment.getAuthorId() + ": " + comment.getText());
    }
}

/**
 * Console demo for {@code LiveCommentsService}: blacklists one word, ingests three comments
 * on a single stream, then prints the catch-up snapshot a joining viewer would receive.
 */
public class Main {
    public static void main(String[] args) {
        System.out.println("=== JAVA LIVE COMMENTS SIMULATION ===");

        LiveCommentsService comments = new LiveCommentsService();
        comments.addBlacklistWord("spam");

        final String stream = "streamA";
        comments.ingestComment("user1", stream, "Hello world!");
        // Contains the blacklisted word, so the moderation filter rejects it.
        comments.ingestComment("user2", stream, "This is spam content");
        comments.ingestComment("user3", stream, "Great stream!");

        List<Comment> catchUp = comments.getJoinerCatchUpSnapshot(stream);
        System.out.println("Catch-up size: " + catchUp.size());
        catchUp.forEach(entry -> System.out.println("Catch-up Comment: " + entry.getText()));

        System.out.println("=== END OF JAVA SIMULATION ===");
    }
}