Functional Scope (In-Scope)
- Time-Series Sample Ingestion: Captures high-frequency raw float samples decorated with metadata labels.
- Tiered Multi-Resolution Aggregators: Computes tiered windowed summaries (1-minute, 5-minute, and 1-hour rolls) using pyramidal consolidation.
- Approximate Percentiles (Histogram Buckets): Employs boundary arrays and linear interpolation algorithms to resolve p50, p95, and p99 metrics under low memory overhead.
- Range Query API: Filters, maps, and returns consolidated stats across custom intervals and resolution levels.
Explicit Boundaries (Out-of-Scope)
- Full Alerting Engine: Excludes threshold evaluation loops and integration webhooks.
- Cold Data Compression: Skips filesystem serialization and Gorillas/Chimp XOR float delta-compression techniques to prioritize memory structure and interpolation mechanics.
Production reference implementations demonstrating pyramidal aggregation windows, histogram bucketing, and linear percentile interpolation in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MetricSample {
private final String metricName;
private final double value;
private final long timestampMs;
private final Map<String, String> tags;
public MetricSample(String metricName, double value, long timestampMs, Map<String, String> tags) {
this.metricName = metricName;
this.value = value;
this.timestampMs = timestampMs;
this.tags = new HashMap<>(tags);
}
public String getMetricName() { return metricName; }
public double getValue() { return value; }
public long getTimestampMs() { return timestampMs; }
public Map<String, String> getTags() { return tags; }
}
class RollupBucket {
private final long startTimeMs;
private final long endTimeMs;
private double min;
private double max;
private double sum;
private long count;
private final double[] bucketBoundaries;
private final long[] histogramCounts;
public RollupBucket(long startTimeMs, long endTimeMs, double[] bucketBoundaries) {
this.startTimeMs = startTimeMs;
this.endTimeMs = endTimeMs;
this.min = Double.MAX_VALUE;
this.max = Double.MIN_VALUE;
this.sum = 0.0;
this.count = 0;
this.bucketBoundaries = bucketBoundaries.clone();
this.histogramCounts = new long[bucketBoundaries.length + 1];
}
public synchronized void recordValue(double val) {
this.count++;
this.sum += val;
if (val < this.min) this.min = val;
if (val > this.max) this.max = val;
int bucketIndex = bucketBoundaries.length;
for (int i = 0; i < bucketBoundaries.length; i++) {
if (val <= bucketBoundaries[i]) {
bucketIndex = i;
break;
}
}
this.histogramCounts[bucketIndex]++;
}
public synchronized void merge(RollupBucket other) {
if (other == null || other.count == 0) return;
this.count += other.count;
this.sum += other.sum;
this.min = Math.min(this.min, other.min);
this.max = Math.max(this.max, other.max);
for (int i = 0; i < this.histogramCounts.length; i++) {
this.histogramCounts[i] += other.histogramCounts[i];
}
}
public long getStartTimeMs() { return startTimeMs; }
public long getEndTimeMs() { return endTimeMs; }
public synchronized double getMin() { return count == 0 ? 0.0 : min; }
public synchronized double getMax() { return count == 0 ? 0.0 : max; }
public synchronized double getSum() { return sum; }
public synchronized long getCount() { return count; }
public double[] getBucketBoundaries() { return bucketBoundaries; }
public synchronized long[] getHistogramCounts() { return histogramCounts.clone(); }
}
class PercentileCalculator {
public static double computePercentile(RollupBucket bucket, double percentile) {
if (bucket == null || bucket.getCount() == 0) {
return 0.0;
}
if (percentile < 0.0 || percentile > 100.0) {
throw new IllegalArgumentException("Percentile must be between 0 and 100");
}
double target = bucket.getCount() * (percentile / 100.0);
double[] boundaries = bucket.getBucketBoundaries();
long[] counts = bucket.getHistogramCounts();
double cumulativeCount = 0.0;
double prevBoundary = 0.0;
for (int i = 0; i < counts.length; i++) {
double currentBoundary;
if (i < boundaries.length) {
currentBoundary = boundaries[i];
} else {
currentBoundary = bucket.getMax() * 1.5;
if (currentBoundary <= prevBoundary) {
currentBoundary = prevBoundary + 100.0;
}
}
long bucketCount = counts[i];
if (cumulativeCount + bucketCount >= target) {
if (bucketCount == 0) {
return prevBoundary;
}
double needed = target - cumulativeCount;
double ratio = needed / bucketCount;
return prevBoundary + (currentBoundary - prevBoundary) * ratio;
}
cumulativeCount += bucketCount;
prevBoundary = currentBoundary;
}
return bucket.getMax();
}
}
class MetricsAggregator {
private final ConcurrentHashMap<String, Queue<MetricSample>> rawStores = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<RollupBucket>> rollups1m = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<RollupBucket>> rollups5m = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<RollupBucket>> rollups1h = new ConcurrentHashMap<>();
private final double[] bucketBoundaries = {10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0};
public void recordSample(String metricName, double value, Map<String, String> tags) {
rawStores.computeIfAbsent(metricName, k -> new ConcurrentLinkedQueue<>())
.offer(new MetricSample(metricName, value, System.currentTimeMillis(), tags));
}
public void runRollup1m(String metricName, long windowStartMs) {
long windowEndMs = windowStartMs + 60000;
Queue<MetricSample> queue = rawStores.get(metricName);
if (queue == null) return;
RollupBucket bucket = new RollupBucket(windowStartMs, windowEndMs, bucketBoundaries);
while (true) {
MetricSample sample = queue.peek();
if (sample == null) break;
if (sample.getTimestampMs() >= windowEndMs) {
break; // belongs to a future window
}
queue.poll();
if (sample.getTimestampMs() >= windowStartMs) {
bucket.recordValue(sample.getValue());
}
}
rollups1m.computeIfAbsent(metricName, k -> new CopyOnWriteArrayList<>()).add(bucket);
}
public void runRollup5m(String metricName, long windowStartMs) {
long windowEndMs = windowStartMs + 300000;
List<RollupBucket> buckets1m = rollups1m.get(metricName);
if (buckets1m == null) return;
RollupBucket bucket5m = new RollupBucket(windowStartMs, windowEndMs, bucketBoundaries);
for (RollupBucket b1m : buckets1m) {
if (b1m.getStartTimeMs() >= windowStartMs && b1m.getEndTimeMs() <= windowEndMs) {
bucket5m.merge(b1m);
}
}
rollups5m.computeIfAbsent(metricName, k -> new CopyOnWriteArrayList<>()).add(bucket5m);
}
public void runRollup1h(String metricName, long windowStartMs) {
long windowEndMs = windowStartMs + 3600000;
List<RollupBucket> buckets5m = rollups5m.get(metricName);
if (buckets5m == null) return;
RollupBucket bucket1h = new RollupBucket(windowStartMs, windowEndMs, bucketBoundaries);
for (RollupBucket b5m : buckets5m) {
if (b5m.getStartTimeMs() >= windowStartMs && b5m.getEndTimeMs() <= windowEndMs) {
bucket1h.merge(b5m);
}
}
rollups1h.computeIfAbsent(metricName, k -> new CopyOnWriteArrayList<>()).add(bucket1h);
}
public List<RollupBucket> query(String metricName, long startTimeMs, long endTimeMs, String resolution) {
List<RollupBucket> source;
if ("1m".equals(resolution)) {
source = rollups1m.get(metricName);
} else if ("5m".equals(resolution)) {
source = rollups5m.get(metricName);
} else {
source = rollups1h.get(metricName);
}
if (source == null) return Collections.emptyList();
List<RollupBucket> result = new ArrayList<>();
for (RollupBucket b : source) {
if (b.getStartTimeMs() >= startTimeMs && b.getEndTimeMs() <= endTimeMs) {
result.add(b);
}
}
return result;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== JAVA METRICS AGGREGATOR SIMULATION ===");
MetricsAggregator aggregator = new MetricsAggregator();
Map<String, String> tags = Map.of("env", "prod");
long startTime = System.currentTimeMillis();
aggregator.recordSample("api.latency", 120.5, tags);
aggregator.recordSample("api.latency", 85.0, tags);
aggregator.recordSample("api.latency", 450.0, tags);
aggregator.recordSample("api.latency", 12.0, tags);
aggregator.runRollup1m("api.latency", startTime - 10000);
List<RollupBucket> buckets = aggregator.query("api.latency", startTime - 20000, startTime + 100000, "1m");
System.out.println("Query returned buckets: " + buckets.size());
if (!buckets.isEmpty()) {
RollupBucket b = buckets.get(0);
System.out.println("Min: " + b.getMin());
System.out.println("Max: " + b.getMax());
System.out.println("Count: " + b.getCount());
System.out.println("p50: " + PercentileCalculator.computePercentile(b, 50.0));
System.out.println("p95: " + PercentileCalculator.computePercentile(b, 95.0));
System.out.println("p99: " + PercentileCalculator.computePercentile(b, 99.0));
}
System.out.println("=== END OF JAVA SIMULATION ===");
}
}