
Retention and Log Compaction

Summary

Deep dive into time-based and size-based retention policies, log compaction mechanics, and production configuration strategies

Retention Policies and Mechanisms

Time-Based Retention Deep Dive

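Kafka applies time-based retention at the segment level. A closed segment becomes eligible for deletion once the newest record timestamp it contains is older than the retention period; the file modification time serves only as a fallback when a segment carries no record timestamps: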

Retention Timeline

Stage              Time             What happens
Segment Creation   t=0              New segment created
Active Writing     t=segment.ms     Segment is full or its time limit is reached
Sealed Segment     t=sealed         Writes stop; a new segment starts
Retention Check    t=retention.ms   Segment becomes eligible for cleanup, based on its age
Deletion           t=cleanup        Segment files are deleted

Advanced Time-Based Configuration:

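# Production retention configuration (broker-level, server.properties)
# 7 days (takes precedence over log.retention.minutes and log.retention.hours)
log.retention.ms=604800000
# Check for deletable segments every 5 minutes
log.retention.check.interval.ms=300000
# Roll a new segment daily (24 hours); the topic-level equivalent is segment.ms
log.roll.ms=86400000

# Retention vs segment interaction
# Retention = 7 days, Segment = 1 day
# Result: about 7 closed segments retained, each holding 1 day of data, plus the active segment.
# A segment is deleted as a whole, and only after its newest record has expired,
# so the oldest records can outlive retention by up to one segment interval.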
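
The interaction between retention and segment rolling can be estimated with a few lines of arithmetic. The following is a minimal sketch, not Kafka's implementation: the class and method names are hypothetical, and it assumes segments roll on time only and that a segment is removed at the first retention check after its newest record expires.

```java
import java.time.Duration;

/** Hypothetical helper that estimates how retention and segment rolling interact. */
final class RetentionTiming {

    /**
     * Worst-case age of a record at deletion time. A record written at the start of a
     * segment waits for the segment to roll (rollMs), then for the newest record in that
     * segment to expire (retentionMs), then for the next retention check (checkIntervalMs).
     */
    static Duration worstCaseRecordAge(long retentionMs, long rollMs, long checkIntervalMs) {
        return Duration.ofMillis(retentionMs + rollMs + checkIntervalMs);
    }

    /** Approximate number of closed segments kept on disk (ceiling of retention / roll). */
    static long closedSegmentsRetained(long retentionMs, long rollMs) {
        return (retentionMs + rollMs - 1) / rollMs;
    }
}
```

With 7-day retention, daily segments, and a 5-minute check interval, `worstCaseRecordAge` comes out at 8 days and 5 minutes, which is why time-based retention should be read as a lower bound on how long data is kept.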

Size-Based Retention Implementation

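public class SizeBasedRetentionCalculator {

    private static final long BYTES_PER_GB = 1024L * 1024L * 1024L;

    // Estimates the total on-disk footprint of a topic across the cluster.
    // This is a capacity figure, not a retention.bytes value.
    public long calculateClusterStorageBytes(
            long dailyIngressGB,
            int retentionDays,
            int replicationFactor,
            double compressionRatio) {

        // Base calculation
        long dailyBytes = dailyIngressGB * BYTES_PER_GB;
        long totalUncompressed = dailyBytes * retentionDays;
        long totalCompressed = (long) (totalUncompressed / compressionRatio);
        long totalWithReplication = totalCompressed * replicationFactor;

        // Add safety margin (20% buffer)
        return (long) (totalWithReplication * 1.2);
    }

    // retention.bytes is enforced per partition on each replica, so replication is
    // excluded and the compressed total is divided by the partition count.
    // Assumes data is spread evenly across partitions.
    public long calculateRetentionBytesPerPartition(
            long dailyIngressGB,
            int retentionDays,
            int partitionCount,
            double compressionRatio) {

        double totalCompressed =
            dailyIngressGB * BYTES_PER_GB * retentionDays / compressionRatio;
        return (long) (totalCompressed / partitionCount * 1.2); // 20% buffer
    }

    void exampleCalculation() {
        // Example: Financial transaction logging
        // 100 GB/day ingress, 30 days retention, 3x replication, 2.5:1 compression
        long clusterBytes = calculateClusterStorageBytes(100, 30, 3, 2.5);

        // Result:
        // 100GB * 30 days = 3TB raw
        // 3TB / 2.5 compression = 1.2TB compressed
        // 1.2TB * 3 replication = 3.6TB total
        // 3.6TB * 1.2 safety = 4.32TB of disk across the cluster

        // Per-partition limit, assuming 12 partitions:
        // 1.2TB / 12 partitions * 1.2 safety = 120GB retention.bytes
        long retentionBytes = calculateRetentionBytesPerPartition(100, 30, 12, 2.5);
    }
}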

Policy Combination Strategies

Multi-Policy Retention Configuration:

# Both time and size limits (broker defaults; per-topic overrides use retention.ms and retention.bytes)
# 30 days
log.retention.ms=2592000000
# 1000 GiB (about 1TB) per partition
log.retention.bytes=1073741824000

# Cleanup triggers when EITHER condition is met:
# - Data older than 30 days, OR
# - Partition size exceeds 1TB

# Use cases:
# - Burst traffic protection (size limit)
# - Compliance requirements (time limit)
# - Storage cost control (size limit)
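
The either-or rule above can be sketched as a single pass over a partition's closed segments, oldest first. This is an illustrative model rather than Kafka's code: `CombinedRetention` and its `Segment` type are hypothetical, the active segment is assumed to be excluded from the list, and its bytes are ignored for simplicity.

```java
import java.util.List;

/** Hypothetical model of combined time and size retention for one partition. */
final class CombinedRetention {

    /** A closed segment: its size and the timestamp of its newest record. */
    static final class Segment {
        final long sizeBytes;
        final long largestTimestampMs;

        Segment(long sizeBytes, long largestTimestampMs) {
            this.sizeBytes = sizeBytes;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    /**
     * Returns how many of the oldest segments would be deleted.
     * Segments must be ordered oldest first. A limit of -1 disables that rule.
     */
    static int segmentsToDelete(List<Segment> segments, long nowMs,
                                long retentionMs, long retentionBytes) {
        long totalBytes = 0;
        for (Segment s : segments) {
            totalBytes += s.sizeBytes;
        }
        int deleted = 0;
        for (Segment s : segments) {
            // Time rule: the newest record in the segment is older than retentionMs
            boolean tooOld = retentionMs >= 0 && nowMs - s.largestTimestampMs > retentionMs;
            // Size rule: delete only if the remaining log still meets the size limit
            boolean tooBig = retentionBytes >= 0 && totalBytes - s.sizeBytes >= retentionBytes;
            if (!tooOld && !tooBig) {
                break; // Deletion always proceeds from the oldest segment, so stop here
            }
            totalBytes -= s.sizeBytes;
            deleted++;
        }
        return deleted;
    }
}
```

Note that the size rule never shrinks the partition below the configured limit: a segment is removed only when the data left behind is still at least `retentionBytes`.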

Production Retention Patterns:

import java.util.Properties;

// Per-topic patterns use topic-level config names (retention.ms, retention.bytes, segment.ms);
// the log.* names are broker-wide defaults.
public class RetentionPatternCatalog {

    // Pattern 1: High-value transactional data
    void configureFinancialRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "31536000000");          // 365 days
        config.put("retention.bytes", "-1");                // No size limit
        config.put("segment.ms", "86400000");               // Daily segments
        // High durability; needs a replication factor above 3, otherwise
        // acks=all writes fail whenever a single replica is offline
        config.put("min.insync.replicas", "3");
    }

    // Pattern 2: High-volume logs with cost control
    void configureLogRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "604800000");            // 7 days
        config.put("retention.bytes", "107374182400");      // 100GB per partition
        config.put("segment.ms", "3600000");                // Hourly segments
        config.put("compression.type", "lz4");              // Reduce storage
    }

    // Pattern 3: Real-time metrics (short retention)
    void configureMetricsRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "86400000");             // 1 day
        config.put("retention.bytes", "10737418240");       // 10GB per partition
        config.put("segment.ms", "300000");                 // 5-minute segments
    }
}

Log Compaction Deep Dive

Compaction Algorithm and Process Flow

Log compaction maintains the latest value for each key while removing outdated records:

Compaction Process

Original Log:
[key1:v1][key2:v1][key1:v2][key3:v1][key2:v2][key1:v3]
   t1       t2       t3       t4       t5       t6

After Compaction:
[key3:v1][key2:v2][key1:v3]
   t4       t5       t6

Only the latest value per key is retained; surviving records keep their original offsets and order.
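
The same result can be reproduced with a two-pass sketch: first record the latest offset per key, then copy only the records that match it. This is a simplified illustration, not the log cleaner itself; `CompactionSketch` and `LogRecord` are hypothetical names, and the input is assumed to be ordered by offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory illustration of key-based compaction. */
final class CompactionSketch {

    /** A record with its position in the log. */
    static final class LogRecord {
        final long offset;
        final String key;
        final String value;

        LogRecord(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Keeps only the latest record per key, preserving offset order. */
    static List<LogRecord> compact(List<LogRecord> log) {
        // Pass 1: remember the highest offset seen for each key
        Map<String, Long> latestOffset = new HashMap<>();
        for (LogRecord r : log) {
            latestOffset.put(r.key, r.offset);
        }
        // Pass 2: copy a record only if it is the latest one for its key
        List<LogRecord> compacted = new ArrayList<>();
        for (LogRecord r : log) {
            if (latestOffset.get(r.key) == r.offset) {
                compacted.add(r);
            }
        }
        return compacted;
    }
}
```

Feeding in the six records from the diagram (offsets 1 to 6) yields key3:v1, key2:v2, key1:v3, in that order.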

Log Cleaner Architecture:

Log Cleaner Internal Flow

Dirty Logs (uncompacted)  ->  Log Cleaner  ->  Clean Logs (compacted)

Dirty Logs:   head segment, tail segments
Log Cleaner:  key index (OffsetMap), thread pool (configurable parallelism)
Clean Logs:   tail segments holding the latest values

Advanced Compaction Configuration

Production Compaction Tuning:

# Log cleaner configuration
log.cleanup.policy=compact
# Parallel cleaner threads
log.cleaner.threads=8
# I/O throttle: 1000 MiB/s (about 1GB/s)
log.cleaner.io.max.bytes.per.second=1048576000
# 128MB dedupe buffer, shared across all cleaner threads
log.cleaner.dedupe.buffer.size=134217728

# Compaction triggers
# Wait at least 1 minute before a record can be compacted
log.cleaner.min.compaction.lag.ms=60000
# Make records eligible for compaction within 24h
log.cleaner.max.compaction.lag.ms=86400000
# 24h segments (the topic-level equivalent is segment.ms)
log.roll.ms=86400000
# Compact when 50% of the log is dirty
log.cleaner.min.cleanable.ratio=0.5

# Advanced settings
# 512KB I/O buffer
log.cleaner.io.buffer.size=524288
# Maximum fill fraction of the dedupe buffer
log.cleaner.io.buffer.load.factor=0.9
# Sleep time when there are no logs to clean
log.cleaner.backoff.ms=15000
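
The dedupe buffer size bounds how many distinct keys one cleaner thread can index in a single pass. The sketch below estimates that bound under two stated assumptions: the buffer is divided evenly among cleaner threads, and each offset-map entry costs 24 bytes (a 16-byte key hash plus an 8-byte offset), as described in Kafka's compaction documentation. The class and method names are hypothetical.

```java
/** Hypothetical sizing helper for the log cleaner's dedupe buffer. */
final class DedupeBufferSizing {

    /** Assumed cost of one offset-map entry: 16-byte key hash + 8-byte offset. */
    static final int BYTES_PER_ENTRY = 24;

    /** Estimated number of distinct keys one cleaner thread can track per pass. */
    static long keysPerCleanerPass(long dedupeBufferBytes, int cleanerThreads, double loadFactor) {
        long perThreadBytes = dedupeBufferBytes / cleanerThreads; // buffer is shared by all threads
        long usableBytes = (long) (perThreadBytes * loadFactor);  // only this fraction may fill up
        return usableBytes / BYTES_PER_ENTRY;
    }
}
```

With the values above (128MB buffer, 8 threads, load factor 0.9), each thread can index roughly 629,000 keys per pass. Raising the thread count without raising the buffer size therefore shrinks the key space each pass can cover.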

Compaction Performance Analysis:

public class CompactionPerformanceMonitor {

    public CompactionMetrics analyzeCompactionEffectiveness() {
        return CompactionMetrics.builder()
            .dirtyRatio(calculateDirtyRatio())
            .compactionRate(getCompactionRate())
            .keySpaceReduction(getKeySpaceReduction())
            .storageReclaimed(getStorageReclaimed())
            .ioImpact(getIOImpact())
            .build();
    }

    private double calculateDirtyRatio() {
        // dirty.ratio = dirty_bytes / (clean_bytes + dirty_bytes)
        // A range of roughly 0.3 - 0.7 is a reasonable starting point, not a hard rule

        long dirtyBytes = getDirtyLogSize();
        long cleanBytes = getCleanLogSize();
        long totalBytes = dirtyBytes + cleanBytes;
        if (totalBytes == 0) {
            return 0.0; // Empty log: avoid a NaN result
        }
        return (double) dirtyBytes / totalBytes;
    }

    void optimizeCompactionScheduling() {
        double dirtyRatio = calculateDirtyRatio();

        // Note: log.cleaner.min.cleanable.ratio decides when a log is picked for cleaning;
        // the settings below change how soon records are eligible and how much capacity exists.
        if (dirtyRatio > 0.8) {
            // High dirty ratio - make records eligible sooner and add cleaner capacity
            updateConfig("log.cleaner.min.compaction.lag.ms", "30000");  // 30s
            updateConfig("log.cleaner.threads", "12");                   // More threads

        } else if (dirtyRatio < 0.2) {
            // Low dirty ratio - delay eligibility and release cleaner capacity
            updateConfig("log.cleaner.min.compaction.lag.ms", "300000"); // 5min
            updateConfig("log.cleaner.threads", "4");                    // Fewer threads
        }
    }
}

Key-Based Compaction Strategies

Effective Key Design Patterns:

public class CompactionKeyStrategies {

    // Pattern 1: Entity state management
    void entityStateKeys() {
        // Key: user_id
        // Value: Complete user state (JSON/Avro)
        // Result: Latest state per user maintained

        String key = "user_123";
        UserState value = UserState.builder()
            .userId("user_123")
            .email("user@example.com")
            .lastLogin(Instant.now())
            .build();
    }

    // Pattern 2: Configuration management
    void configurationKeys() {
        // Key: config_category:setting_name
        // Value: Setting value
        // Result: Latest config per setting

        String key = "database:connection_timeout";
        String value = "30000";
    }

    // Pattern 3: State machine events (anti-pattern)
    void stateMachineAntiPattern() {
        // WRONG: State transition events with timestamp keys
        // Key: user_123:timestamp  ← Each event has unique key
        // Result: No compaction benefit, all events retained

        // CORRECT: Current state with entity key
        // Key: user_123  ← Same key for all state updates
        // Result: Only latest state retained
    }
}

Segment Management and Cleanup

Segment Lifecycle Management

Segment Creation and Rollover:

Segment Lifecycle

Active Segment  --(rollover)-->  Sealed Segment  --(time/size)-->  Eligible for Cleanup
.log                             .log (sealed)                     .log (old)
.index                           .index                            .index
.timeindex                       .timeindex                        .timeindex

Rollover triggers: size, time, force
Cleanup triggers: retention, compact

Production Segment Configuration:

# Segment size management
# 1GB segments (balance between size and granularity)
log.segment.bytes=1073741824
# 10MB index files
log.index.size.max.bytes=10485760
# Index entry every 4KB
log.index.interval.bytes=4096

# Segment time management
# 24h segments (daily rollover); the topic-level equivalent is segment.ms
log.roll.ms=86400000
# Up to 1h jitter (spreads rollover load)
log.roll.jitter.ms=3600000

# Segment cleanup optimization
# Check every 5 minutes
log.retention.check.interval.ms=300000
# 1 minute delay before deletion
log.segment.delete.delay.ms=60000
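
How the size, time, and jitter settings combine into a rollover decision can be sketched as a single predicate. This is a simplified model with hypothetical names; it omits other triggers such as a full index, and it treats jitter as a per-segment value already drawn from the range 0 to log.roll.jitter.ms.

```java
/** Hypothetical model of the decision to roll the active segment. */
final class SegmentRollSketch {

    /**
     * @param segmentBytes    bytes already written to the active segment
     * @param incomingBytes   size of the batch about to be appended
     * @param segmentAgeMs    time elapsed since the segment's first record
     * @param maxSegmentBytes size limit (log.segment.bytes)
     * @param maxSegmentMs    time limit (segment roll interval)
     * @param jitterMs        jitter subtracted from the time limit for this segment
     */
    static boolean shouldRoll(long segmentBytes, long incomingBytes, long segmentAgeMs,
                              long maxSegmentBytes, long maxSegmentMs, long jitterMs) {
        // Size trigger: the incoming batch would not fit in the current segment
        boolean wouldOverflow = segmentBytes + incomingBytes > maxSegmentBytes;
        // Time trigger: only non-empty segments roll on age, and jitter shortens the limit
        boolean timeReached = segmentBytes > 0 && segmentAgeMs > maxSegmentMs - jitterMs;
        return wouldOverflow || timeReached;
    }
}
```

Because jitter is subtracted, a 1-hour jitter on a 24-hour roll interval means segments roll somewhere between 23 and 24 hours after their first record, which staggers rollovers across partitions.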

File System Layout and Optimization

Kafka Directory Structure:

# Optimal file system layout
/var/kafka-logs/
└── topic-partition-0/
    ├── 00000000000000000000.log        # Segment 0 (oldest)
    ├── 00000000000000000000.index      # Offset index
    ├── 00000000000000000000.timeindex  # Time index
    ├── 00000000000000100000.log        # Segment 1
    ├── 00000000000000100000.index
    ├── 00000000000000100000.timeindex
    ├── 00000000000000200000.log        # Segment 2 (active)
    ├── 00000000000000200000.index
    ├── 00000000000000200000.timeindex
    └── leader-epoch-checkpoint         # Leadership changes
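
Each segment file is named after the offset of its first record, zero-padded to 20 digits, which is why the files sort in offset order. A minimal sketch of that naming scheme, using hypothetical helper names:

```java
import java.util.Locale;

/** Hypothetical helpers for the segment file naming scheme shown above. */
final class SegmentFileNames {

    /** Builds the .log file name for a segment starting at the given base offset. */
    static String logFileName(long baseOffset) {
        return String.format(Locale.ROOT, "%020d.log", baseOffset);
    }

    /** Recovers the base offset from a segment file name such as an .index or .log file. */
    static long baseOffsetOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }
}
```

For example, `logFileName(100000)` produces the name of Segment 1 in the listing, and `baseOffsetOf` applied to any of that segment's three files returns 100000. A broker can therefore locate the segment that holds a given offset by comparing file names alone.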

File System Optimization:

# Production file system tuning
# Mount options for Kafka log directories
mount -o noatime,nodiratime /dev/sdb1 /var/kafka-logs

# File system selection
# XFS: Recommended for large files, better performance
# EXT4: Acceptable alternative, wider compatibility

# I/O scheduler optimization
# Multi-queue kernels name this scheduler mq-deadline; older kernels use deadline
echo mq-deadline > /sys/block/sdb/queue/scheduler  # Better for sequential I/O

# File descriptor limits
echo "kafka soft nofile 100000" >> /etc/security/limits.conf
echo "kafka hard nofile 100000" >> /etc/security/limits.conf

Cleanup Process Optimization

public class SegmentCleanupOptimizer {

    public void optimizeCleanupScheduling() {
        // Cleanup process phases:
        // 1. Identify segments eligible for deletion
        // 2. Mark segments for deletion (rename to .deleted)
        // 3. Asynchronous deletion after delay

        Properties config = new Properties();

        // Balance cleanup frequency vs I/O impact
        config.put("log.retention.check.interval.ms", "300000");    // 5 minutes
        config.put("log.segment.delete.delay.ms", "60000");         // 1 minute delay

        // Spread cleanup load across time
        config.put("log.roll.jitter.ms", "3600000");                // 1 hour jitter
    }

    public void monitorCleanupHealth() {
        // Key metrics to track:
        long pendingDeletes = countPendingDeletes();
        double deletionRate = calculateDeletionRate();
        long diskSpaceReclaimed = getDiskSpaceReclaimed();

        // Baseline rate observed during normal operation
        // (getExpectedDeletionRate() is assumed to exist alongside the helpers above)
        double expectedDeletionRate = getExpectedDeletionRate();

        // Alert conditions:
        if (pendingDeletes > 1000) {
            alert("High number of pending segment deletes");
        }

        if (deletionRate < expectedDeletionRate * 0.5) {
            alert("Segment deletion falling behind retention schedule");
        }
    }
}

Capacity Planning and Cost Optimization

Storage Capacity Modeling

public class StorageCapacityPlanner {

    public StorageRequirements calculateStorageNeeds(
            TopicProfile profile,
            RetentionPolicy retention,
            ClusterConfig cluster) {

        // Base storage calculation
        long dailyBytes = profile.messagesPerDay * profile.avgMessageSizeBytes;
        long retentionBytes = dailyBytes * retention.retentionDays;

        // Apply compression
        long compressedBytes = (long)(retentionBytes / profile.compressionRatio);

        // Apply replication
        long replicatedBytes = compressedBytes * cluster.replicationFactor;

        // Add overhead (indexes, metadata)
        double overheadRatio = 0.15; // 15% overhead typical
        long totalBytes = (long)(replicatedBytes * (1 + overheadRatio));

        // Add growth and safety margins
        long safetyMargin = (long)(totalBytes * 0.25); // 25% safety
        long growthProjection = calculateGrowthProjection(totalBytes, 12); // 12 months

        return StorageRequirements.builder()
            .baseRequirement(totalBytes)
            .withSafetyMargin(totalBytes + safetyMargin)
            .withGrowthProjection(totalBytes + safetyMargin + growthProjection)
            .build();
    }

    public void optimizeStorageCosts() {
        // Cost optimization strategies:

        // 1. Right-size retention policies
        optimizeRetentionPolicies();

        // 2. Implement tiered storage
        configureTieredStorage();

        // 3. Optimize compression
        tuneCompressionSettings();

        // 4. Monitor and alert on storage growth
        setupStorageAlerting();
    }

    private void optimizeRetentionPolicies() {
        // Analyze actual data access patterns
        Map<String, AccessPattern> accessPatterns = analyzeAccessPatterns();

        for (String topic : accessPatterns.keySet()) {
            AccessPattern pattern = accessPatterns.get(topic);

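            if (pattern.getLastAccessDays() < 7 && pattern.getCurrentRetentionDays() > 30) {
                // Assuming getLastAccessDays() is the age of the oldest data consumers still read:
                // nothing older than 7 days is accessed, yet more than 30 days are kept,
                // so recommend shortening retention to 7 days
                recommendRetentionChange(topic, 7);
            }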
        }
    }
}
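
To make the model above concrete, the sketch below runs the same steps on fixed numbers. It is a standalone illustration with hypothetical names; it uses integer percentages instead of floating-point ratios so the arithmetic is exact, and it leaves out the growth projection.

```java
/** Hypothetical worked example of the capacity steps: compression, replication, overhead, safety. */
final class CapacityExample {

    /**
     * @param compressionRatioPercent compression ratio times 100 (200 means 2:1)
     * @param overheadPercent         index and metadata overhead, e.g. 15
     * @param safetyPercent           safety margin, e.g. 25
     */
    static long requiredBytes(long messagesPerDay, long avgMessageBytes, int retentionDays,
                              int compressionRatioPercent, int replicationFactor,
                              int overheadPercent, int safetyPercent) {
        long raw = messagesPerDay * avgMessageBytes * retentionDays;
        long compressed = raw * 100 / compressionRatioPercent;
        long replicated = compressed * replicationFactor;
        long withOverhead = replicated * (100 + overheadPercent) / 100;
        return withOverhead * (100 + safetyPercent) / 100;
    }
}
```

For 100 million messages per day at 1,024 bytes each, 7 days of retention, 2:1 compression, and 3x replication, the steps give 716.8GB raw, 358.4GB compressed, 1,075.2GB replicated, 1,236.48GB with 15% overhead, and 1,545.6GB with the 25% safety margin (decimal gigabytes).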

Cost Model and Analysis

Storage Cost Breakdown:

public class StorageCostAnalyzer {

    public CostBreakdown analyzeMonthlyCosts(
            long totalStorageGB,
            StorageTier tier,
            Region region) {

        // Base storage costs (illustrative AWS figures in USD per GB-month; verify current pricing)
        double gpSSDCostPerGB = 0.10;    // General-purpose SSD (gp2/gp3 class)
        double ioPSSDCostPerGB = 0.125;  // io2 SSD
        double s3CostPerGB = 0.023;      // S3 Standard

        double monthlyCost = switch(tier) {
            case HOT_SSD -> totalStorageGB * gpSSDCostPerGB;
            case PERFORMANCE_SSD -> totalStorageGB * ioPSSDCostPerGB;
            case COLD_S3 -> totalStorageGB * s3CostPerGB;
        };

        // Add I/O costs
        double ioCosts = calculateIOCosts(totalStorageGB, tier);

        // Add network costs for replication
        double networkCosts = calculateNetworkCosts(totalStorageGB);

        return CostBreakdown.builder()
            .storageCosts(monthlyCost)
            .ioCosts(ioCosts)
            .networkCosts(networkCosts)
            .totalCosts(monthlyCost + ioCosts + networkCosts)
            .build();
    }

    public void generateCostOptimizationReport() {
        // Identify cost optimization opportunities:

        // 1. Topics with high storage cost but low access
        List<String> overRetainedTopics = findOverRetainedTopics();

        // 2. Topics suitable for compression optimization
        List<String> compressionCandidates = findCompressionCandidates();

        // 3. Topics suitable for tiered storage
        List<String> tieringCandidates = findTieringCandidates();

        // 4. Unused or low-value topics
        List<String> unusedTopics = findUnusedTopics();

        generateReport(overRetainedTopics, compressionCandidates,
                      tieringCandidates, unusedTopics);
    }
}

Monitoring and Troubleshooting

Essential Retention Metrics

public class RetentionMonitoring {

    public void setupRetentionAlerting() {
        // Critical storage alerts
        // Size is reported per partition (tags: topic, partition); sum per broker before comparing
        alertOnMetric("kafka.log:type=Log,name=Size",
                     totalBytes -> totalBytes > getMaxDiskCapacity() * 0.85,
                     "Disk usage critical - approaching capacity limit");

        alertOnMetric("kafka.log:type=LogManager,name=OfflineLogDirectoryCount",
                     count -> count > 0,
                     "Log directory offline - data loss risk");

        // Retention process health
        alertOnMetric("kafka.log:type=LogCleanerManager,name=uncleanable-partitions-count",
                     count -> count > 0,
                     "Partitions unable to be cleaned");

        // max-dirty-percent is reported on a 0-100 scale
        alertOnMetric("kafka.log:type=LogCleanerManager,name=max-dirty-percent",
                     percent -> percent > 80,
                     "Log cleaner falling behind - high dirty ratio");

        // Log flush health
        alertOnMetric("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs",
                     rate -> rate < expectedFlushRate * 0.5,
                     "Log flush rate degraded");
    }

    public RetentionHealthReport generateHealthReport() {
        return RetentionHealthReport.builder()
            .storageUtilization(calculateStorageUtilization())
            .retentionCompliance(checkRetentionCompliance())
            .compactionEffectiveness(measureCompactionEffectiveness())
            .segmentHealth(assessSegmentHealth())
            .cleanupPerformance(measureCleanupPerformance())
            .build();
    }
}

Troubleshooting Playbook

Common Issues and Solutions:

public class RetentionTroubleshooting {

    public void troubleshootHighDiskUsage() {
        System.out.println("DISK USAGE TROUBLESHOOTING:");
        System.out.println("1. Check retention policies:");
        System.out.println("   kafka-topics --bootstrap-server <broker:port> --describe --topic <topic>");
        System.out.println("2. Verify cleanup is running:");
        System.out.println("   Check JMX: kafka.log:type=LogCleaner,name=cleaner-recopy-percent");
        System.out.println("3. Look for stuck segments:");
        System.out.println("   ls -la /var/kafka-logs/*/");
        System.out.println("4. Check for errors in the broker's application log directory:");
        System.out.println("   grep ERROR <kafka.logs.dir>/log-cleaner.log");
    }

    public void troubleshootCompactionIssues() {
        System.out.println("COMPACTION TROUBLESHOOTING:");
        System.out.println("1. Check compaction lag:");
        System.out.println("   JMX: kafka.log:type=LogCleaner,name=max-compaction-delay-secs");
        System.out.println("2. Verify key distribution:");
        System.out.println("   kafka-dump-log --print-data-log --files /var/kafka-logs/topic-0/00000.log");
        System.out.println("3. Check cleaner thread status:");
        System.out.println("   JMX: kafka.log:type=LogCleaner,name=DeadThreadCount");
        System.out.println("4. Review memory allocation:");
        System.out.println("   Check log.cleaner.dedupe.buffer.size setting");
    }

    public void troubleshootSlowCleanup() {
        // Step 1: Check I/O throttling
        double ioThrottle = getIOThrottleLimit();
        if (ioThrottle < 100_000_000) { // 100MB/s
            System.out.println("Consider increasing log.cleaner.io.max.bytes.per.second");
        }

        // Step 2: Check thread allocation
        int cleanerThreads = getCleanerThreadCount();
        int availableCores = Runtime.getRuntime().availableProcessors();
        if (cleanerThreads < availableCores / 4) {
            System.out.println("Consider increasing log.cleaner.threads");
        }

        // Step 3: Check segment sizing
        analyzeSegmentSizing();
    }
}

This chapter covered time-based and size-based retention, log compaction, segment management, capacity planning, and troubleshooting for large-scale production Kafka deployments.

See Also: [[Producer_Mechanics]], [[Consumer_Groups]], [[Event_Sourcing_Patterns]], [[Tiered_Storage_Strategies]]
