Functional Specifications
- Low-Latency Online Serving: A fast feature-lookup interface backed by in-memory stores, with a serving latency budget of under 10 ms.
- Point-in-Time Correctness (AS-OF Joins): Assemble historical training arrays using, for each row, the feature values that were in effect as of that row's timestamp. This eliminates data leakage from values recorded later.
- Offline Ledger Storage: Maintain deep columnar logs that record every feature update in timestamp order.
- Dynamic Pipeline Registration: Register processing pipelines together with their schemas, update intervals, and freshness SLA configurations.
Production reference implementations demonstrating online caches, offline time-series records, binary floor searches, and training join checks:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
/**
 * Immutable metadata record describing one feature: its name, the kind of
 * entity it is attached to, and a free-text description.
 */
class FeatureDefinition {

    private final String name;
    private final String entityType;
    private final String description;

    /**
     * Creates a definition from the three supplied strings, stored as given.
     *
     * @param name        feature name
     * @param entityType  type of entity the feature belongs to
     * @param description free-text description of the feature
     */
    public FeatureDefinition(String name, String entityType, String description) {
        this.description = description;
        this.entityType = entityType;
        this.name = name;
    }

    /** @return the feature name passed to the constructor */
    public String getName() {
        return this.name;
    }

    /** @return the entity type passed to the constructor */
    public String getEntityType() {
        return this.entityType;
    }

    /** @return the description passed to the constructor */
    public String getDescription() {
        return this.description;
    }
}
/**
 * Immutable pairing of a feature's value with the timestamp it was recorded at.
 */
class FeatureValue {

    private final long timestamp;
    private final Object value;

    /**
     * Creates a timestamped value; both arguments are stored as given.
     *
     * @param value     the feature value (may be any object)
     * @param timestamp the timestamp associated with the value
     */
    public FeatureValue(Object value, long timestamp) {
        this.timestamp = timestamp;
        this.value = value;
    }

    /** @return the value passed to the constructor */
    public Object getValue() {
        return this.value;
    }

    /** @return the timestamp passed to the constructor */
    public long getTimestamp() {
        return this.timestamp;
    }
}
/**
 * In-memory feature store exposing two views of ingested data:
 * an online view holding the latest value per (entity, feature), and an
 * offline view holding the full timestamp-ordered history per (entity, feature).
 *
 * <p>All shared state is kept in concurrent collections. The per-feature
 * history uses a {@link ConcurrentSkipListMap} rather than a plain
 * {@code TreeMap}, because {@code TreeMap} is not synchronized and would be
 * written by {@link #ingestFeature} and read by {@link #getHistoricalFeatures}
 * without any lock. The online and offline updates of a single ingest are each
 * atomic on their own but are not applied together as one transaction.
 */
class FeatureStore {
    // Registered feature definitions, keyed by feature name.
    private final ConcurrentHashMap<String, FeatureDefinition> registry = new ConcurrentHashMap<>();
    // Online Store: entityId -> (featureName -> FeatureValue)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, FeatureValue>> onlineStore =
            new ConcurrentHashMap<>();
    // Offline Store: entityId -> (featureName -> history sorted by timestamp).
    // Values are wrapped in FeatureValue because ConcurrentSkipListMap rejects null
    // values, and a null feature value must remain storable.
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentSkipListMap<Long, FeatureValue>>> offlineStore =
            new ConcurrentHashMap<>();

    /**
     * Registers a feature under its name, replacing any definition previously
     * registered under the same name.
     *
     * @param definition the feature definition to register
     * @throws NullPointerException if {@code definition} or its name is null
     */
    public void registerFeature(FeatureDefinition definition) {
        Objects.requireNonNull(definition, "definition");
        registry.put(definition.getName(), definition);
    }

    /**
     * Records a feature value for an entity in both the online and offline stores.
     *
     * @param entityId    identifier of the entity the value belongs to
     * @param featureName name of a previously registered feature
     * @param value       the feature value; may be null
     * @param timestamp   timestamp of the observation
     * @throws IllegalArgumentException if {@code featureName} is not registered
     * @throws NullPointerException     if {@code featureName} or {@code entityId} is null
     */
    public void ingestFeature(String entityId, String featureName, Object value, long timestamp) {
        Objects.requireNonNull(featureName, "featureName");
        if (!registry.containsKey(featureName)) {
            throw new IllegalArgumentException("Feature not registered: " + featureName);
        }
        Objects.requireNonNull(entityId, "entityId");

        // FeatureValue is immutable, so one instance can safely back both stores.
        final FeatureValue incoming = new FeatureValue(value, timestamp);

        // 1. Online store: keep whichever value carries the latest timestamp.
        //    On a timestamp tie the newly ingested value wins.
        onlineStore.computeIfAbsent(entityId, k -> new ConcurrentHashMap<>())
                .compute(featureName, (k, existing) -> {
                    if (existing == null || timestamp >= existing.getTimestamp()) {
                        return incoming;
                    }
                    return existing;
                });

        // 2. Offline store: add to the time series used for point-in-time lookups.
        //    A second ingest at the same timestamp overwrites the earlier entry.
        offlineStore.computeIfAbsent(entityId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(featureName, k -> new ConcurrentSkipListMap<>())
                .put(timestamp, incoming);
    }

    /**
     * Returns the latest known value of each requested feature for an entity.
     *
     * @param entityId     identifier of the entity to look up
     * @param featureNames names of the features to fetch
     * @return a new map from feature name to latest value; features with no
     *         stored value are omitted, and the map is empty when the entity
     *         is unknown
     */
    public Map<String, Object> getOnlineFeatures(String entityId, List<String> featureNames) {
        Map<String, Object> result = new HashMap<>();
        Map<String, FeatureValue> entityFeatures = onlineStore.get(entityId);
        if (entityFeatures == null) {
            return result;
        }
        for (String name : featureNames) {
            FeatureValue val = entityFeatures.get(name);
            if (val != null) {
                result.put(name, val.getValue());
            }
        }
        return result;
    }

    /**
     * Returns, for each requested feature, the value that was current at
     * {@code asOfTimestamp}: the entry with the greatest timestamp that is
     * less than or equal to {@code asOfTimestamp}.
     *
     * @param entityId      identifier of the entity to look up
     * @param featureNames  names of the features to fetch
     * @param asOfTimestamp the point in time to evaluate the features at
     * @return a new map from feature name to the as-of value; features with no
     *         history at or before {@code asOfTimestamp} are omitted, and the
     *         map is empty when the entity is unknown
     */
    public Map<String, Object> getHistoricalFeatures(String entityId, List<String> featureNames, long asOfTimestamp) {
        Map<String, Object> result = new HashMap<>();
        Map<String, ConcurrentSkipListMap<Long, FeatureValue>> entityFeatures = offlineStore.get(entityId);
        if (entityFeatures == null) {
            return result;
        }
        for (String name : featureNames) {
            ConcurrentSkipListMap<Long, FeatureValue> history = entityFeatures.get(name);
            if (history != null) {
                // floorEntry yields the greatest timestamp <= asOfTimestamp, so values
                // recorded after the requested instant can never be returned.
                Map.Entry<Long, FeatureValue> entry = history.floorEntry(asOfTimestamp);
                if (entry != null) {
                    result.put(name, entry.getValue().getValue());
                }
            }
        }
        return result;
    }
}
/**
 * Console demo that registers two features, ingests three snapshots for one
 * user, then prints the online view and a series of point-in-time lookups.
 */
public class Main {

    private static final String USER = "user_123";
    private static final String SESSIONS = "session_count";
    private static final String RISK = "fraud_risk";

    public static void main(String[] args) {
        System.out.println("=== ML FEATURE STORE SIMULATION RUNNER ===");
        FeatureStore featureStore = new FeatureStore();

        registerDemoFeatures(featureStore);
        ingestDemoHistory(featureStore);

        List<String> wanted = Arrays.asList(SESSIONS, RISK);

        // Online view: expected to reflect the most recent ingest.
        System.out.println("\n--- ONLINE FEATURE SERVING (Low latency) ---");
        Map<String, Object> latest = featureStore.getOnlineFeatures(USER, wanted);
        System.out.println("Online Features for user_123: " + latest);

        // Offline view: probe before, on, between and after the ingested timestamps.
        System.out.println("\n--- HISTORICAL POINT-IN-TIME RETRIEVAL (AS-OF JOINS) ---");
        long[] probes = { 500L, 1000L, 1500L, 2000L, 2500L, 3500L };
        for (int i = 0; i < probes.length; i++) {
            long asOf = probes[i];
            Map<String, Object> snapshot = featureStore.getHistoricalFeatures(USER, wanted, asOf);
            System.out.println("AS-OF Timestamp " + asOf + " -> " + snapshot);
        }

        System.out.println("\nSimulation completed successfully.");
    }

    /** Registers the two demo features and announces them. */
    private static void registerDemoFeatures(FeatureStore featureStore) {
        FeatureDefinition sessions = new FeatureDefinition(SESSIONS, "user", "Total sessions of the user");
        FeatureDefinition risk = new FeatureDefinition(RISK, "user", "ML model calculated fraud probability");
        featureStore.registerFeature(sessions);
        featureStore.registerFeature(risk);
        System.out.println("Registered features: session_count, fraud_risk");
    }

    /** Ingests three timestamped snapshots of both features for the demo user. */
    private static void ingestDemoHistory(FeatureStore featureStore) {
        System.out.println("\nIngesting historical feature data...");
        long[] stamps = { 1000L, 2000L, 3000L };
        int[] sessionCounts = { 5, 6, 8 };
        double[] riskScores = { 0.02, 0.15, 0.82 };
        for (int i = 0; i < stamps.length; i++) {
            featureStore.ingestFeature(USER, SESSIONS, sessionCounts[i], stamps[i]);
            featureStore.ingestFeature(USER, RISK, riskScores[i], stamps[i]);
        }
        System.out.println("Ingestion complete.");
    }
}