Functional Scope (In-Scope)
- High-Throughput Reaction buffering: Uses an in-memory buffer to batch reaction events rather than writing them directly to the database.
- Sharded Counter Architectures: Distributes concurrent write requests across independent counter shards to prevent lock contention.
- Periodic 100ms Delta aggregations: Flushes, resets, and broadcasts aggregated delta counts at regular 100ms intervals.
- Stateless Fan-out broadcasting: Pushes small delta batches to active viewers instead of re-broadcasting total cumulative stats.
Explicit Boundaries (Out-of-Scope)
- Direct Client WebSocket connections: Socket dispatch endpoints, connection servers, and browser clients are mocked rather than implemented.
- Physical Time-Series Database layers: Mocks persistence using in-memory queues instead of integrating concrete InfluxDB or TimescaleDB instances.
The following production reference implementations demonstrate lock-free sharding, periodic flushing, time-series history tracking, and delta broadcasting in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * The kinds of reaction that can be recorded against a stream.
 * Declaration order is preserved; counters are created once per constant.
 */
enum EmojiType {
    LIKE,
    HEART,
    LAUGH,
    WOW,
    SAD,
    FIRE
}
/**
 * Immutable record of a single reaction: who sent it, on which stream,
 * which emoji, and the wall-clock time at which the event object was created.
 */
class ReactionEvent {

    private final String userId;
    private final String streamId;
    private final EmojiType emojiType;
    private final long timestampMs;

    /**
     * Creates an event stamped with {@link System#currentTimeMillis()} at construction time.
     *
     * @param userId    identifier of the reacting user
     * @param streamId  identifier of the stream being reacted to
     * @param emojiType the reaction kind
     */
    public ReactionEvent(String userId, String streamId, EmojiType emojiType) {
        this.timestampMs = System.currentTimeMillis();
        this.emojiType = emojiType;
        this.streamId = streamId;
        this.userId = userId;
    }

    /** @return identifier of the reacting user */
    public String getUserId() {
        return this.userId;
    }

    /** @return identifier of the stream being reacted to */
    public String getStreamId() {
        return this.streamId;
    }

    /** @return the reaction kind */
    public EmojiType getEmojiType() {
        return this.emojiType;
    }

    /** @return creation time of this event in epoch milliseconds */
    public long getTimestampMs() {
        return this.timestampMs;
    }
}
/**
 * One shard of a per-stream reaction buffer: holds an independent
 * {@link AtomicLong} per {@link EmojiType} so that writers routed to
 * different shards do not contend on the same counter.
 */
class CounterShard {

    private final ConcurrentHashMap<EmojiType, AtomicLong> counts = new ConcurrentHashMap<>();

    /** Pre-registers a zeroed counter for every emoji type. */
    public CounterShard() {
        EmojiType[] emojis = EmojiType.values();
        for (int i = 0; i < emojis.length; i++) {
            counts.put(emojis[i], new AtomicLong(0));
        }
    }

    /**
     * Adds one to the counter of the given emoji type.
     *
     * @param type the reaction kind to count
     */
    public void increment(EmojiType type) {
        AtomicLong counter = counts.get(type);
        counter.incrementAndGet();
    }

    /**
     * Drains this shard: every counter is read and reset to zero.
     * Each counter is swapped individually with {@code getAndSet(0)}, so no
     * increment is lost, but the snapshot as a whole is not taken at a single instant.
     *
     * @return a new map holding the drained value for every emoji type
     */
    public Map<EmojiType, Long> flushAndGet() {
        Map<EmojiType, Long> drained = new HashMap<>();
        for (EmojiType emoji : EmojiType.values()) {
            AtomicLong counter = counts.get(emoji);
            long pending = counter.getAndSet(0);
            drained.put(emoji, pending);
        }
        return drained;
    }
}
/**
 * Write buffer for a single stream, made of several {@link CounterShard}s.
 * Each recorded reaction is routed to a randomly chosen shard; a collector
 * periodically drains all shards and sums them into one delta map.
 */
class StreamReactionBuffer {

    private final CounterShard[] shards;
    private final int shardCount;

    /**
     * Creates a buffer with the given number of independent shards.
     *
     * @param shardCount number of shards; must be at least 1
     * @throws IllegalArgumentException if {@code shardCount} is zero or negative
     */
    public StreamReactionBuffer(int shardCount) {
        // Fail at construction: a zero-shard buffer would otherwise be created
        // successfully and only blow up later inside nextInt(0) on the write path.
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive, got " + shardCount);
        }
        this.shardCount = shardCount;
        this.shards = new CounterShard[shardCount];
        for (int i = 0; i < shardCount; i++) {
            shards[i] = new CounterShard();
        }
    }

    /**
     * Counts one reaction of the given type on a randomly selected shard.
     *
     * @param type the reaction kind to count
     */
    public void recordReaction(EmojiType type) {
        // Random routing spreads concurrent writers across the shards.
        int shardIndex = ThreadLocalRandom.current().nextInt(shardCount);
        shards[shardIndex].increment(type);
    }

    /**
     * Drains every shard and sums the drained values per emoji type.
     *
     * @return a new map containing an entry for every {@link EmojiType}
     *         (zero when nothing was recorded), iterated in enum declaration order
     */
    public Map<EmojiType, Long> collectAndReset() {
        Map<EmojiType, Long> aggregated = new EnumMap<>(EmojiType.class);
        // Seed zeros so callers always see every emoji type, even with no activity.
        for (EmojiType type : EmojiType.values()) {
            aggregated.put(type, 0L);
        }
        for (CounterShard shard : shards) {
            for (Map.Entry<EmojiType, Long> entry : shard.flushAndGet().entrySet()) {
                aggregated.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return aggregated;
    }
}
/**
 * In-memory live reactions aggregator.
 *
 * <p>Reactions are counted into per-stream {@link StreamReactionBuffer}s. A
 * single scheduler thread drains every buffer every 100 ms, adds the drained
 * delta to the stream's cumulative totals, appends it to an in-memory history
 * queue, and passes it to a mock broadcast.
 */
class LiveReactionsService {

    private final ConcurrentHashMap<String, StreamReactionBuffer> streamBuffers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Map<EmojiType, Long>>> timeSeriesStore = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Map<EmojiType, Long>> cumulativeTotals = new ConcurrentHashMap<>();
    private final ScheduledExecutorService aggregatorScheduler = Executors.newSingleThreadScheduledExecutor();
    private final int defaultShardCount = 4;

    /** Starts the periodic flush: first run after 100 ms, then every 100 ms. */
    public LiveReactionsService() {
        aggregatorScheduler.scheduleAtFixedRate(this::flushAllBuffers, 100, 100, TimeUnit.MILLISECONDS);
    }

    /**
     * Hot write path: finds (or lazily creates) the stream's buffer and counts one reaction.
     *
     * @param userId   identifier of the reacting user; currently not used by the aggregation
     * @param streamId stream receiving the reaction; must not be null
     * @param emoji    reaction kind; must not be null
     * @throws NullPointerException if {@code streamId} or {@code emoji} is null
     */
    public void submitReaction(String userId, String streamId, EmojiType emoji) {
        // Validate before touching the map so a bad call cannot leave an empty buffer behind.
        Objects.requireNonNull(streamId, "streamId");
        Objects.requireNonNull(emoji, "emoji");
        StreamReactionBuffer buffer =
                streamBuffers.computeIfAbsent(streamId, id -> new StreamReactionBuffer(defaultShardCount));
        buffer.recordReaction(emoji);
    }

    /**
     * Returns the running totals for a stream as of the most recent flush.
     *
     * @param streamId stream to query
     * @return a read-only view of the totals (it keeps reflecting later flushes),
     *         or an empty map if the stream has had no flushed reactions
     */
    public Map<EmojiType, Long> getCumulativeStats(String streamId) {
        Map<EmojiType, Long> totals = cumulativeTotals.get(streamId);
        if (totals == null) {
            return Collections.emptyMap();
        }
        // Wrap so callers cannot mutate the service's internal totals.
        return Collections.unmodifiableMap(totals);
    }

    /**
     * Returns the flushed delta windows recorded for a stream, oldest first.
     *
     * @param streamId stream to query
     * @return a new list copied from the history queue, or an empty list if none exists
     */
    public List<Map<EmojiType, Long>> getTimeSeriesHistory(String streamId) {
        ConcurrentLinkedQueue<Map<EmojiType, Long>> queue = timeSeriesStore.get(streamId);
        return queue != null ? new ArrayList<>(queue) : Collections.emptyList();
    }

    /**
     * Scheduled job: flushes every known stream buffer.
     *
     * <p>Failures are contained per stream. An exception escaping a task passed
     * to {@code scheduleAtFixedRate} suppresses all subsequent executions, which
     * would silently stop aggregation for every stream.
     */
    private void flushAllBuffers() {
        for (Map.Entry<String, StreamReactionBuffer> entry : streamBuffers.entrySet()) {
            try {
                flushStream(entry.getKey(), entry.getValue());
            } catch (RuntimeException e) {
                System.err.println("Flush failed for stream " + entry.getKey() + ": " + e);
            }
        }
    }

    /** Drains one stream's buffer and, if anything was recorded, updates totals, history and broadcast. */
    private void flushStream(String streamId, StreamReactionBuffer buffer) {
        Map<EmojiType, Long> deltaBatch = buffer.collectAndReset();
        // Skip all bookkeeping and the broadcast when the window is empty.
        boolean hasReactions = deltaBatch.values().stream().anyMatch(v -> v > 0);
        if (!hasReactions) {
            return;
        }
        // Fold the window into the running totals.
        Map<EmojiType, Long> totals = cumulativeTotals.computeIfAbsent(streamId, id -> new ConcurrentHashMap<>());
        for (Map.Entry<EmojiType, Long> delta : deltaBatch.entrySet()) {
            totals.merge(delta.getKey(), delta.getValue(), Long::sum);
        }
        // Persist a private copy of the window so later changes to deltaBatch cannot alter history.
        timeSeriesStore.computeIfAbsent(streamId, k -> new ConcurrentLinkedQueue<>()).add(new HashMap<>(deltaBatch));
        // Mock network broadcast to subscribers.
        broadcastDeltaSnapshot(streamId, deltaBatch);
    }

    /** Mock broadcast: prints the stream id and its delta window to standard output. */
    private void broadcastDeltaSnapshot(String streamId, Map<EmojiType, Long> delta) {
        System.out.println("BROADCAST -> Stream: " + streamId + " Delta: " + delta);
    }

    /**
     * Stops the periodic flush. No final flush is performed here, so reactions
     * recorded after the last completed flush stay in their buffers.
     */
    public void shutdown() {
        aggregatorScheduler.shutdown();
    }
}
/** Small driver that submits a few reactions, waits for the flush job, and prints the totals. */
public class Main {

    /**
     * Runs the simulation.
     *
     * @param args unused
     * @throws InterruptedException if the wait for the flush job is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== JAVA LIVE REACTIONS SIMULATION ===");
        LiveReactionsService service = new LiveReactionsService();
        try {
            service.submitReaction("user1", "stream123", EmojiType.FIRE);
            service.submitReaction("user2", "stream123", EmojiType.HEART);
            service.submitReaction("user3", "stream123", EmojiType.FIRE);
            // Wait past at least two 100 ms flush ticks so the totals are populated.
            Thread.sleep(250);
            System.out.println("Cumulative totals: " + service.getCumulativeStats("stream123"));
        } finally {
            // Always stop the scheduler: its worker thread would otherwise keep
            // the JVM alive if the sleep above is interrupted.
            service.shutdown();
        }
        System.out.println("=== END OF JAVA SIMULATION ===");
    }
}