
Checkpointing

Summary

Periodically saving processing state to enable recovery from failures without reprocessing all data from the beginning

TL;DR

Checkpointing is the process of periodically saving the processing state of a stream processing application, so that after a failure it can resume from the last checkpoint instead of restarting from scratch. It is essential for exactly-once semantics in systems like Apache Flink, Spark Streaming, and Kafka Streams.

Visual Overview

Checkpointing Overview
WITHOUT CHECKPOINTING (Restart from beginning)

  Stream Processing Job                         
                                                
  T0: Start processing from offset 0            
  T1: Processed 1,000,000 messages              
  T2: Processed 5,000,000 messages              
  T3: CRASH! ⚡                                  
  T4: Restart from offset 0
       → Reprocess all 5,000,000 messages
       → Hours of lost work
       → Possible duplicate outputs


WITH CHECKPOINTING (Resume from last checkpoint)

  Stream Processing Job with Checkpoints        
                                                
  T0: Start, checkpoint offset=0                
  T1: Process 1M messages, checkpoint offset=1M 
  T2: Process 4M more, checkpoint offset=5M     
  T3: CRASH! ⚡                                  
  T4: Restore from checkpoint
       → Resume from offset=5M
       → Only reprocess messages since the last checkpoint
       → Fast recovery (seconds vs hours)
       → No duplicate state updates (exactly-once state)


CHECKPOINT LIFECYCLE

  Checkpoint Coordinator                        
                                               
  1. Trigger Checkpoint                         
     Insert barrier into stream                 
                                               
  Stream: [msg1, msg2, BARRIER, msg3, msg4]     
                                               
  2. Operator receives barrier                  
     - Save current state to storage            
     - Save current offset (source operators)
     - Acknowledge checkpoint                   
                                               
  3. All operators acknowledged?
     YES → Checkpoint COMPLETE
     NO  → Wait, or abort on timeout
                                               
  4. Commit checkpoint metadata                 
     Store: {                                   
       checkpoint_id: 123                       
       timestamp: T                             
       state_location: s3://...                 
       offsets: {partition0: 5000, ...}         
     }                                          
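The lifecycle above can be sketched as a small in-process coordinator. This is a toy model under simplifying assumptions: task IDs and state handles are hypothetical strings, acknowledgements arrive as method calls rather than RPCs, only one checkpoint is in flight at a time, and "committing metadata" just appends a dict to a list. The 600-second default timeout mirrors the Flink configuration shown later; everything else (`CheckpointCoordinator`, `PendingCheckpoint`) is illustrative, not any framework's API.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class PendingCheckpoint:
    checkpoint_id: int
    started_at: float
    waiting_for: Set[str]                        # tasks that have not acked yet
    state_handles: Dict[str, str] = field(default_factory=dict)
    offsets: Dict[str, int] = field(default_factory=dict)


class CheckpointCoordinator:
    """Toy coordinator: trigger -> collect acks -> commit metadata."""

    def __init__(self, task_ids: List[str], timeout_s: float = 600.0):
        self.task_ids = list(task_ids)
        self.timeout_s = timeout_s
        self.next_id = 1
        self.pending: Optional[PendingCheckpoint] = None
        self.completed: List[dict] = []          # committed metadata records

    def trigger(self) -> int:
        # 1. Start a checkpoint (a real system would inject barriers at the
        #    sources here). Any unfinished checkpoint is superseded.
        cp = PendingCheckpoint(self.next_id, time.time(), set(self.task_ids))
        self.pending = cp
        self.next_id += 1
        return cp.checkpoint_id

    def acknowledge(self, task_id: str, checkpoint_id: int, state_handle: str,
                    offsets: Optional[Dict[str, int]] = None) -> bool:
        """2. A task reports its saved state; True if this ack completes the checkpoint."""
        cp = self.pending
        if cp is None or cp.checkpoint_id != checkpoint_id:
            return False                         # stale or unknown checkpoint
        cp.waiting_for.discard(task_id)
        cp.state_handles[task_id] = state_handle
        cp.offsets.update(offsets or {})
        if cp.waiting_for:
            return False                         # 3. still waiting for other tasks
        # 4. All tasks acknowledged: commit the checkpoint metadata
        self.completed.append({
            "checkpoint_id": cp.checkpoint_id,
            "timestamp": time.time(),
            "state_handles": dict(cp.state_handles),
            "offsets": dict(cp.offsets),
        })
        self.pending = None
        return True

    def check_timeout(self) -> bool:
        """Abort the pending checkpoint if it has exceeded the timeout."""
        if self.pending is not None and \
                time.time() - self.pending.started_at > self.timeout_s:
            self.pending = None
            return True
        return False

    def latest(self) -> Optional[dict]:
        """Latest completed checkpoint: the one recovery would restore."""
        return self.completed[-1] if self.completed else None
```

A checkpoint only counts once every task has acknowledged it; until then `latest()` keeps returning the previous completed checkpoint, which is what makes partially written checkpoints harmless.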


RECOVERY FLOW

  Job Failure Detected                          
                                               
  1. Find Latest Successful Checkpoint          
     checkpoint_id: 123                         
     timestamp: 2 minutes ago                   
                                               
  2. Restore Operator State                     
     Load state from: s3://checkpoint-123/      
     - Counters, windows, caches                
     - Application state                        
                                               
  3. Reset Stream Positions                     
     Kafka offsets: {partition0: 5000, ...}     
                                               
  4. Resume Processing                          
     Start from checkpoint offsets              
     Reprocess messages since checkpoint        
                                                
  Recovery Time Objective (RTO), worst case:
  - Checkpoint interval: 1 minute
  - State restore: 10 seconds
  - Reprocess: up to 1 minute of data
  Total: ~90 seconds downtime (including failure detection and restart)
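The RTO figures above can be checked with a back-of-the-envelope model. A sketch, assuming the worst case (the failure hits just before the next checkpoint, so a full interval must be replayed), a hypothetical failure-detection delay, and a hypothetical catch-up factor describing how much faster than real time the job can replay its backlog. None of these parameters come from a specific system.

```python
def estimate_rto(interval_s: float, restore_s: float,
                 detection_s: float = 0.0, catchup_factor: float = 1.0) -> dict:
    """Worst-case recovery time for a checkpointed job (rough model).

    interval_s:     checkpoint interval; worst case, this much data is replayed
    restore_s:      time to load state from the checkpoint
    detection_s:    time to notice the failure and restart tasks (assumed)
    catchup_factor: replay speed relative to real time (1.0 = real time, assumed)
    """
    replay_s = interval_s / catchup_factor
    total_s = detection_s + restore_s + replay_s
    return {"replay_s": replay_s, "total_s": total_s}


# Diagram numbers: 1-minute interval, 10 s restore, replay at real-time speed
print(estimate_rto(60, 10))
# Adding ~20 s of (assumed) detection and restart time gives the ~90 s figure
print(estimate_rto(60, 10, detection_s=20))
```

The first call gives 70 seconds and the second 90 seconds. A job that can replay twice as fast as real time (`catchup_factor=2.0`) halves the replay term, which is often a cheaper lever than checkpointing more frequently.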

Core Explanation

What is Checkpointing?

Checkpointing is a fault-tolerance mechanism that periodically saves the state of a distributed computation to durable storage. For stream processing, this includes:

  1. Application State: Counters, aggregations, windows, caches
  2. Input Positions: Kafka offsets, file positions, database cursors
  3. Metadata: Checkpoint ID, timestamp, version

Key Property: Consistency

A checkpoint must capture a consistent snapshot across all parallel operators, ensuring that after recovery:

  • No message's effect is lost (completeness)
  • No message's effect on state is applied twice (exactly-once state)
  • State is consistent across operators

Checkpoint Barriers (a Variant of the Chandy-Lamport Algorithm)

Stream processing systems use barriers to coordinate consistent snapshots without stopping the stream:

How Barriers Work:

Source Operator injects barriers into stream:

  Stream leaving the Kafka source
  [msg1, msg2, msg3, BARRIER_10, msg4]
                                     
  Operator A processes messages         
  When BARRIER_10 arrives:              
  1. Save state (e.g., count=150)       
  2. Forward barrier downstream         
  3. Continue processing                
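The single-input case can be sketched in a few lines. This is a toy model, assuming in-memory lists stand in for streams and a dict stands in for durable snapshot storage; `Barrier`, `CountingOperator`, and `snapshots` are hypothetical names. It shows the three steps above: snapshot on barrier, forward the barrier, keep processing.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple, Union


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


Event = Union[str, Barrier]


class CountingOperator:
    """Counts messages and snapshots the count when a barrier passes through."""

    def __init__(self, name: str, snapshots: Dict[Tuple[str, int], int]):
        self.name = name
        self.count = 0
        self.snapshots = snapshots          # stand-in for durable storage

    def process(self, event: Event) -> List[Event]:
        if isinstance(event, Barrier):
            # 1. Save state for this checkpoint
            self.snapshots[(self.name, event.checkpoint_id)] = self.count
            # 2. Forward the barrier downstream
            return [event]
        # 3. Regular message: update state and pass it on
        self.count += 1
        return [event]


snapshots: Dict[Tuple[str, int], int] = {}
op_a = CountingOperator("A", snapshots)
op_b = CountingOperator("B", snapshots)

stream: List[Event] = ["msg1", "msg2", "msg3", Barrier(10), "msg4"]
for event in stream:
    for out in op_a.process(event):         # events flow A -> B in stream order
        op_b.process(out)

print(snapshots)   # {('A', 10): 3, ('B', 10): 3}
```

Both operators record a count of 3 for checkpoint 10 even though they finish at 4: the barrier's position in the stream, not wall-clock time, decides what each snapshot contains.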


Multiple Input Streams (Barrier Alignment):

  Stream 1: [msg1, BARRIER_10, msg2]    
  Stream 2: [msg3, msg4, BARRIER_10]    
                                      
  Join Operator                         
                                       
  Wait for BARRIER_10 from BOTH streams 
  (buffer messages from faster stream)  
                                       
  All barriers received → Save state


Why Barriers?
 ✓ No global pause of the stream (only inputs that already delivered a barrier are briefly held during alignment)
 ✓ Consistent snapshot across distributed operators
 ✓ Low impact on throughput (often under 5% overhead, depending on workload)

Checkpoint Interval Trade-offs

Frequent Checkpoints (e.g., every 10 seconds):
Pros:
 ✓ Fast recovery (less data to reprocess)
 ✓ Small recovery window
Cons:
 ✕ Higher overhead (I/O, CPU for serialization)
 ✕ More storage costs
 ✕ Can slow down processing

Infrequent Checkpoints (e.g., every 10 minutes):
Pros:
 ✓ Lower overhead
 ✓ Better throughput
Cons:
 ✕ Slower recovery (more data to reprocess)
 ✕ Larger recovery window
 ✕ The source must retain more data for replay (if retention expires first, data is lost)

Recommended: 1-5 minutes as a starting point for most applications
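To make the trade-off concrete, here is a rough calculation, assuming full (non-incremental) snapshots of a hypothetical 10 GB state, replay at real-time speed, and failures equally likely at any point in an interval. It compares how much each interval writes per hour against how much stream time must be replayed after a failure.

```python
def interval_tradeoff(interval_s: int, state_gb: float) -> dict:
    """Rough cost/benefit of a checkpoint interval (full snapshots assumed)."""
    checkpoints_per_hour = 3600 / interval_s
    return {
        "interval_s": interval_s,
        "checkpoints_per_hour": checkpoints_per_hour,
        "gb_written_per_hour": checkpoints_per_hour * state_gb,
        "avg_replay_s": interval_s / 2,    # failures land mid-interval on average
        "worst_replay_s": interval_s,      # failure just before the next checkpoint
    }


for interval in (10, 60, 300, 600):
    print(interval_tradeoff(interval, state_gb=10))
```

Under these assumptions a 10-second interval writes 3,600 GB per hour with at most 10 seconds of replay, while a 10-minute interval writes only 60 GB per hour but may replay up to 10 minutes of data. Incremental checkpointing changes the write side of this equation substantially.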

Checkpoint Storage

Where to Store Checkpoints:

Checkpoint Storage Options
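1. Distributed File Systems / Object Stores:
 - HDFS, S3, GCS, Azure Blob
 - Pros: Scalable, durable, cost-effective
 - Cons: Higher latency (on the order of 100ms per write)
 - Use case: Production systems

2. Distributed Databases:
 - Cassandra, DynamoDB
 - Pros: Fast writes (on the order of 10ms)
 - Cons: Higher cost, operational complexity
 - Use case: Low-latency requirements
 - Note: RocksDB is an embedded local store, not a distributed database. Flink and Kafka Streams keep working state in local RocksDB and make it durable by uploading snapshots to a file system/object store (Flink) or by replicating changes to a changelog topic (Kafka Streams)

3. In-Memory (with replication):
 - Redis with persistence enabled (Memcached has no built-in persistence)
 - Pros: Very fast (~1ms)
 - Cons: Limited capacity, expensive
 - Use case: Small state, ultra-low latency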

Storage Structure:
checkpoint_dir/
 checkpoint-000010/
  operator-state-0
  operator-state-1
  metadata
 checkpoint-000011/
  ...
 _metadata (latest checkpoint info)
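The layout above only works if a checkpoint becomes visible after it is completely written. A common way to get that on a local or POSIX-like file system is write-to-temp-then-rename: the sketch below assumes a local directory and relies on `os.replace`, which is atomic on POSIX when source and target are on the same file system. Object stores such as S3 do not offer rename in the same way, so this illustrates the idea rather than how any particular framework implements it. `commit_checkpoint` and `latest_checkpoint` are hypothetical helpers; the file names follow the layout above.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Dict, Optional


def commit_checkpoint(root: str, checkpoint_id: int, files: Dict[str, bytes]) -> Path:
    """Write a checkpoint directory, then atomically publish it as the latest."""
    root_path = Path(root)
    root_path.mkdir(parents=True, exist_ok=True)
    final_dir = root_path / f"checkpoint-{checkpoint_id:06d}"

    # 1. Write everything into a hidden temporary directory first
    tmp_dir = Path(tempfile.mkdtemp(dir=root_path, prefix=".inprogress-"))
    for name, data in files.items():
        with open(tmp_dir / name, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())            # flush file contents to storage
    (tmp_dir / "metadata").write_text(json.dumps({"checkpoint_id": checkpoint_id}))

    # 2. Move the finished directory into place under its final name
    os.replace(tmp_dir, final_dir)

    # 3. Swap the pointer atomically: readers see the old or the new value,
    #    never a half-written one
    pointer_tmp = root_path / "_metadata.tmp"
    pointer_tmp.write_text(json.dumps({"latest": final_dir.name}))
    os.replace(pointer_tmp, root_path / "_metadata")
    return final_dir


def latest_checkpoint(root: str) -> Optional[Path]:
    pointer = Path(root) / "_metadata"
    if not pointer.exists():
        return None
    return Path(root) / json.loads(pointer.read_text())["latest"]
```

If the process dies during step 1 or 2, only an orphaned `.inprogress-*` directory remains and `_metadata` still names the previous checkpoint, so recovery never sees a partial checkpoint.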

Incremental Checkpointing

Full Checkpoint (naive approach):
Every checkpoint saves entire state
State size: 10 GB
Checkpoint interval: 1 minute
I/O: 10 GB/minute = 166 MB/s ✕ (expensive!)

Incremental Checkpoint (RocksDB-based):
Only save changed state since last checkpoint

 Checkpoint 10: Save full state (10GB) 
 Checkpoint 11: Save delta (100MB) 
 Checkpoint 12: Save delta (150MB) 
 Checkpoint 13: Save delta (120MB) 
  
 Recovery: Restore checkpoint 10 
 + Apply deltas 11,12,13 


Benefits:
 ✓ Can reduce checkpoint I/O by 10-100x when only a small fraction of state changes between checkpoints
 ✓ Faster checkpoint completion
 ✓ Lower storage costs

Implementation: Apache Flink RocksDB state backend
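The base-plus-deltas idea can be sketched with plain dictionaries. This is a simplified model, assuming state is a key-value map, that every write goes through `put`/`delete` so changes can be tracked, and that deletions are recorded as tombstones. Flink's RocksDB backend works at the level of RocksDB's immutable SST files rather than individual keys, so treat this as the concept only; `IncrementalCheckpointer` and `TOMBSTONE` are hypothetical names.

```python
from typing import Dict, List, Optional

TOMBSTONE = object()   # marks a key deleted since the last checkpoint


class IncrementalCheckpointer:
    def __init__(self, full_every: int = 10):
        self.full_every = full_every           # full snapshot every N checkpoints
        self.checkpoints: List[dict] = []      # stand-in for durable storage
        self.dirty: Dict[str, object] = {}     # changes since the last checkpoint

    def put(self, state: Dict[str, int], key: str, value: int) -> None:
        state[key] = value
        self.dirty[key] = value

    def delete(self, state: Dict[str, int], key: str) -> None:
        state.pop(key, None)
        self.dirty[key] = TOMBSTONE

    def checkpoint(self, state: Dict[str, int]) -> dict:
        if len(self.checkpoints) % self.full_every == 0:
            cp = {"kind": "full", "data": dict(state)}       # entire state
        else:
            cp = {"kind": "delta", "data": dict(self.dirty)}  # changed keys only
        self.checkpoints.append(cp)
        self.dirty.clear()
        return cp

    def restore(self) -> Dict[str, int]:
        # Start from the latest full checkpoint, then apply later deltas in order
        last_full: Optional[int] = None
        for i, cp in enumerate(self.checkpoints):
            if cp["kind"] == "full":
                last_full = i
        if last_full is None:
            return {}
        state = dict(self.checkpoints[last_full]["data"])
        for cp in self.checkpoints[last_full + 1:]:
            for key, value in cp["data"].items():
                if value is TOMBSTONE:
                    state.pop(key, None)
                else:
                    state[key] = value
        return state
```

The `full_every` knob bounds how many deltas recovery must replay: more frequent full snapshots cost more I/O but make restores shorter.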

Exactly-Once Semantics with Checkpointing

How Checkpointing Enables Exactly-Once:

1. Atomic Commits:
 
   Process messages 1-100              
   Update state (count += 100)         
                                      
   Checkpoint:
   - Save state: count=100
   - Save offset: 100
   - State and offset are committed together (atomically)

   If crash before checkpoint:
    → Restore offset=0, count=0
    → Reprocess messages 1-100

   If crash after checkpoint:
    → Restore offset=100, count=100
    → Skip messages 1-100
 

2. Two-Phase Commit (for sinks):
 
   Phase 1: Pre-commit                 
   - Write to staging table            
   - Don't make visible yet            
                                      
   Checkpoint completes               
                                      
   Phase 2: Commit                     
   - Move from staging to production   
   - Make writes visible               
                                       
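   If crash before the checkpoint completes:
    → Staged writes discarded
    → Reprocess and retry

   If crash after the checkpoint completes but before Phase 2:
    → Staged writes are committed during recovery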
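The two-phase commit steps can be sketched with an in-memory staging area. This is a simplified model, assuming a single sink instance, a list standing in for the production table, and hypothetical method names (`pre_commit`, `commit`, `recover`). Flink's `TwoPhaseCommitSinkFunction` follows a similar pre-commit / commit split, but its actual API differs.

```python
from typing import Dict, List


class TwoPhaseCommitSink:
    """Buffers writes per checkpoint and makes them visible only after commit."""

    def __init__(self):
        self.production: List[str] = []          # visible to readers
        self.current: List[str] = []             # writes since the last barrier
        self.staged: Dict[int, List[str]] = {}   # pre-committed, not yet visible

    def write(self, record: str) -> None:
        self.current.append(record)

    def pre_commit(self, checkpoint_id: int) -> None:
        # Phase 1 (on the barrier): stage this checkpoint's writes, keep them hidden
        self.staged[checkpoint_id] = self.current
        self.current = []

    def commit(self, checkpoint_id: int) -> None:
        # Phase 2 (after the checkpoint completes): publish the staged writes.
        # pop() makes a repeated commit for the same checkpoint a no-op.
        self.production.extend(self.staged.pop(checkpoint_id, []))

    def recover(self, last_completed_id: int) -> None:
        # Staged writes of a completed checkpoint are committed (the restored
        # offsets already skip their input). Newer writes are discarded; they
        # are regenerated when the source replays from the restored offsets.
        for cp_id in sorted(self.staged):
            if cp_id <= last_completed_id:
                self.production.extend(self.staged[cp_id])
        self.staged.clear()
        self.current = []
```

Making `commit` idempotent matters in practice: the completion notification can be delivered more than once, or again after a restart, and a second call must not duplicate output.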
 

Real Systems Using Checkpointing

System | Checkpoint Mechanism | Default Interval | State Backend | Use Case
Apache Flink | Barrier snapshots (Chandy-Lamport variant) | Disabled (must be enabled) | RocksDB, Heap | Real-time analytics, ETL
Spark Structured Streaming | Micro-batch checkpointing (offset and commit logs) | Every micro-batch (no fixed default interval) | State store; checkpoint location on HDFS, S3 | Batch + streaming
Kafka Streams | State stores + offset commits | 30 seconds (100 ms with exactly-once) | RocksDB | Stream processing
Apache Storm | Record-level acking | N/A (acking-based) | N/A | Low-latency streaming
Apache Samza | Changelog-based | 1 minute | RocksDB + Kafka | Stateful streaming

Apache Flink Checkpointing
Flink Checkpoint Configuration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60000);

// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once mode (vs at-least-once)
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum pause between checkpoints (prevent overload)
config.setMinPauseBetweenCheckpoints(30000);

// Checkpoint timeout (fail if takes too long)
config.setCheckpointTimeout(600000);

// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

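// Keep externalized checkpoints when the job is cancelled, so it can later
// be resumed from them (similar to resuming from a savepoint)
config.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// State backend (where to store state).
// Note: RocksDBStateBackend is the pre-1.13 API and its constructor throws
// IOException; newer Flink versions split this into EmbeddedRocksDBStateBackend
// plus a separately configured checkpoint storage location.
env.setStateBackend(new RocksDBStateBackend("s3://checkpoints/"));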

Checkpoint Flow in Flink:

 1. JobManager triggers checkpoint 
 - Increment checkpoint ID 
 - Send trigger to sources 
  
 2. Sources inject barriers 
 - Insert barrier into stream 
 - Save current offset 
  
 3. Barriers flow through operators 
 - Each operator saves state 
 - Acknowledges checkpoint 
  
 4. Sinks receive barriers 
 - Pre-commit external writes 
 - Acknowledge checkpoint 
  
 5. JobManager collects acks
 - All tasks acknowledged? → checkpoint complete
 - Commit checkpoint metadata
 - Notify tasks; sinks finalize (commit) external writes


Recovery in Flink:

1. Job fails (e.g., task exception, node crash)
2. JobManager restarts job from latest checkpoint
3. All tasks restore state from checkpoint
4. Sources reset to checkpoint offsets
5. Processing resumes (exactly-once for Flink state; end-to-end exactly-once also requires replayable sources and transactional sinks)

Case Study: Kafka Streams Checkpointing

Kafka Streams uses:
- Local RocksDB for state storage
- Kafka topics for changelog (state replication)
- Kafka offsets for position tracking

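Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // required; example address
config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

// Commit interval (checkpoint frequency). 30000 ms is the default under
// at-least-once; with exactly-once processing the default drops to 100 ms.
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);

// Delay before deleting local state of tasks that migrated to another instance
config.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 600000);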

How it works:

 State Changes: 
 1. Write to local RocksDB 
 2. Write to changelog topic (Kafka) 
 3. Commit offset every 30s 
  
 Recovery: 
 1. Restore RocksDB from changelog 
 2. Resume from committed offset 
 3. Replay uncommitted messages 
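The changelog mechanism can be sketched with a list standing in for the Kafka changelog topic and a dict standing in for the local RocksDB store. This is a toy model under those assumptions (no Kafka client; `ChangelogStore` is a hypothetical name); it only shows why a lost local store can be rebuilt by replaying the changelog in order.

```python
from typing import Dict, List, Optional, Tuple

# (key, value) records; value None is a tombstone meaning "key deleted"
Changelog = List[Tuple[str, Optional[int]]]


class ChangelogStore:
    def __init__(self, changelog: Changelog):
        self.local: Dict[str, int] = {}   # stand-in for local RocksDB
        self.changelog = changelog        # stand-in for the changelog topic

    def put(self, key: str, value: Optional[int]) -> None:
        # 1. Write to the local store
        if value is None:
            self.local.pop(key, None)
        else:
            self.local[key] = value
        # 2. Append the same change to the changelog
        self.changelog.append((key, value))

    def get(self, key: str) -> Optional[int]:
        return self.local.get(key)

    @classmethod
    def restore(cls, changelog: Changelog) -> "ChangelogStore":
        # Recovery: rebuild the local store by replaying the changelog in order
        store = cls(changelog)
        for key, value in changelog:
            if value is None:
                store.local.pop(key, None)
            else:
                store.local[key] = value
        return store
```

Because only the latest value per key matters on replay, Kafka Streams changelog topics are log-compacted, which keeps restore time proportional to the number of live keys rather than the full update history.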


Exactly-once in Kafka Streams (EOS):

- Transactional writes to output topics
- Atomic offset commits
- Changelog updates in same transaction
- Result: End-to-end exactly-once

When to Use Checkpointing

✓ Perfect Use Cases

Long-Running Stream Processing Jobs

Scenario: Real-time analytics running 24/7
Requirement: No data loss, exactly-once semantics
Solution: Checkpoint every 1 minute to S3
Benefit: Recover from failures in under 2 minutes

Stateful Aggregations

Scenario: Counting events per user over 24-hour windows
State size: 100 GB
Solution: Incremental checkpointing with RocksDB
Benefit: Preserve state across restarts, avoid recomputation

Complex Event Processing

Scenario: Multi-stage pipeline with joins, enrichment
Requirement: Consistent state across operators
Solution: Distributed checkpointing with barriers
Benefit: Consistent snapshots without pausing stream

✕ When NOT to Use (or Use Carefully)

Stateless Processing

Problem: No state to checkpoint, pure transformation
Example: Filter, map, simple parsing
Alternative: Only track source offsets and reprocess from the last committed offset (no state snapshots needed)

Ultra-Low Latency Requirements (< 1ms)

Problem: Checkpointing adds latency (barrier alignment)
Alternative: At-least-once processing with deduplication
Example: High-frequency trading, real-time bidding
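The at-least-once-plus-deduplication alternative can be sketched as an idempotent consumer that remembers recently seen message IDs. A minimal sketch, assuming every message carries a unique ID and duplicates arrive within a bounded window; `Deduplicator` and `max_ids` are hypothetical names, and a real deployment would also need to persist or rebuild the ID set after a restart.

```python
from collections import OrderedDict
from typing import Callable


class Deduplicator:
    """Drops messages whose ID is among the last `max_ids` IDs seen."""

    def __init__(self, handler: Callable[[str], None], max_ids: int = 100_000):
        self.handler = handler
        self.max_ids = max_ids
        self.seen: "OrderedDict[str, None]" = OrderedDict()

    def on_message(self, msg_id: str, payload: str) -> bool:
        if msg_id in self.seen:
            return False                     # redelivered duplicate: skip
        self.handler(payload)
        self.seen[msg_id] = None
        if len(self.seen) > self.max_ids:
            self.seen.popitem(last=False)    # evict the oldest remembered ID
        return True
```

The window size is the key trade-off: a duplicate that arrives after its ID has been evicted is processed again, so `max_ids` must cover the longest plausible redelivery delay.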

Small Batch Jobs (< 1 minute)

Problem: Checkpoint overhead > job duration
Alternative: Just rerun the job on failure
Example: Scheduled micro-batch jobs

Interview Application

Common Interview Question

Q: “Design a real-time fraud detection system processing millions of transactions/second. How would you ensure exactly-once processing and fast recovery from failures?”

Strong Answer:

“I’d design a checkpointed stream processing system:

Architecture:

  • Framework: Apache Flink (for exactly-once guarantees)
  • State Backend: RocksDB with incremental checkpoints
  • Checkpoint Storage: S3 (durable, scalable)
  • Checkpoint Interval: 1 minute (balance overhead vs recovery time)

State Management:

  • User Profile Cache: Latest transactions per user (for pattern detection)
  • Fraud Rules State: Configurable thresholds, ML model parameters
  • Windows: 5-minute sliding windows for transaction aggregations
  • State Size Estimate: 50 GB (10M users × 5KB profile)

Checkpointing Strategy:

  1. Incremental Checkpoints:
    • Full checkpoint hourly
    • Incremental deltas every minute
    • Reduces I/O from 50 GB/min to ~500 MB/min
  2. Asynchronous Snapshots:
    • Checkpoint in background threads
    • Minimal impact on throughput (less than 5%)
  3. Barrier Alignment:
    • Use Chandy-Lamport barriers for consistency
    • Align barriers across join inputs so snapshots stay consistent

Exactly-Once Guarantees:

  1. Source: Replayable Kafka source, with offsets stored in each checkpoint
  2. Processing: Flink exactly-once mode
  3. Sink: Two-phase commit to output database
    • Pre-commit during checkpoint
    • Finalize after checkpoint complete

Recovery Flow:

  1. Failure Detection: Flink JobManager detects task failure
  2. Checkpoint Restore:
    • Restore state from latest checkpoint (S3)
    • Reset Kafka offsets to checkpoint position
    • Time: ~30 seconds
  3. Reprocessing:
    • Replay 1 minute of messages since checkpoint
    • Time: ~30 seconds (depends on throughput)
  4. Total RTO: ~90 seconds (the two steps above plus failure detection and task restart)

Optimization:

  • Local State: RocksDB on SSD for fast access
  • State Sharding: Partition state by user_id (key-by user_id)
  • Compression: Enable Snappy compression for checkpoints
  • Monitoring: Alert on checkpoint duration > 60s

Trade-offs:

  • Checkpoint interval 1 min: 5% overhead, 90s recovery
  • Faster checkpoints (30s): 8% overhead, 60s recovery
  • Slower checkpoints (5 min): 2% overhead, 5-6 min recovery
  • Chose 1 min as balance for fraud detection SLA

Disaster Recovery:

  • Savepoints: Manual checkpoints for version upgrades
  • Cross-region replication: Replicate checkpoints to DR region
  • Retention: Keep last 10 checkpoints (~10 minutes of history at a 1-minute interval)”
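The sizing numbers in this answer can be double-checked with a few lines of arithmetic. The 1% change rate per interval is an assumption implied by the ~500 MB/min incremental figure, not a measured value; decimal units (1 GB = 1000 MB) are used throughout.

```python
USERS = 10_000_000
PROFILE_KB = 5
INTERVAL_MIN = 1
CHANGE_RATE = 0.01      # assumed fraction of state modified per interval
RETAINED = 10           # checkpoints kept

state_gb = USERS * PROFILE_KB / 1_000_000          # KB -> GB (decimal)
full_mb_per_min = state_gb * 1000 / INTERVAL_MIN   # full snapshot every interval
incr_mb_per_min = full_mb_per_min * CHANGE_RATE    # only changed state per interval
retention_min = RETAINED * INTERVAL_MIN

print(f"state size:         {state_gb:.0f} GB")
print(f"full checkpoints:   {full_mb_per_min:,.0f} MB/min ({full_mb_per_min / 60:.0f} MB/s)")
print(f"incremental deltas: {incr_mb_per_min:,.0f} MB/min ({incr_mb_per_min / 60:.1f} MB/s)")
print(f"retention window:   {retention_min} minutes")
```

The same arithmetic applies to retention: at a 1-minute interval, 10 retained checkpoints span about 10 minutes of history, so longer rollback windows need either more retained checkpoints or periodic savepoints.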

Code Example

Simple Checkpointing Implementation

import time
import json
import hashlib
from typing import Dict, Any
from pathlib import Path

class CheckpointManager:
    """
    Simple checkpointing system for stream processing
    """
    def __init__(self, checkpoint_dir: str, interval_seconds: int = 60):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.interval_seconds = interval_seconds
        self.last_checkpoint_time = time.time()  # start the interval timer at startup
        self.checkpoint_id = 0

    def should_checkpoint(self) -> bool:
        """Check if it's time to create a checkpoint"""
        return time.time() - self.last_checkpoint_time >= self.interval_seconds

    def create_checkpoint(self, state: Dict[str, Any], offset: int) -> int:
        """
        Create a checkpoint by saving state and offset
        Returns checkpoint ID
        """
        self.checkpoint_id += 1
        checkpoint_path = self.checkpoint_dir / f"checkpoint-{self.checkpoint_id}"
        checkpoint_path.mkdir(parents=True, exist_ok=True)

        # Save application state
        state_file = checkpoint_path / "state.json"
        with open(state_file, 'w') as f:
            json.dump(state, f, indent=2)

        # Save stream offset
        offset_file = checkpoint_path / "offset.txt"
        with open(offset_file, 'w') as f:
            f.write(str(offset))

        # Save checkpoint metadata
        metadata = {
            'checkpoint_id': self.checkpoint_id,
            'timestamp': time.time(),
            'offset': offset,
            'state_size': state_file.stat().st_size,
            'checksum': self._compute_checksum(state_file)
        }

        metadata_file = checkpoint_path / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)

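        # Update the latest-checkpoint pointer atomically: write a temp file,
        # then rename it, so a crash never leaves a half-written pointer
        latest_file = self.checkpoint_dir / "_latest"
        tmp_file = self.checkpoint_dir / "_latest.tmp"
        with open(tmp_file, 'w') as f:
            f.write(str(self.checkpoint_id))
        tmp_file.replace(latest_file)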

        self.last_checkpoint_time = time.time()

        print(f"✓ Checkpoint {self.checkpoint_id} created: "
              f"offset={offset}, state_size={metadata['state_size']} bytes")

        return self.checkpoint_id

    def restore_latest_checkpoint(self) -> tuple[Dict[str, Any], int]:
        """
        Restore state and offset from latest checkpoint
        Returns (state, offset) tuple
        """
        latest_file = self.checkpoint_dir / "_latest"

        if not latest_file.exists():
            print("No checkpoint found, starting from beginning")
            return {}, 0

        # Read latest checkpoint ID
        with open(latest_file, 'r') as f:
            checkpoint_id = int(f.read().strip())

        checkpoint_path = self.checkpoint_dir / f"checkpoint-{checkpoint_id}"

        # Verify checkpoint integrity
        metadata_file = checkpoint_path / "metadata.json"
        with open(metadata_file, 'r') as f:
            metadata = json.load(f)

        state_file = checkpoint_path / "state.json"
        checksum = self._compute_checksum(state_file)

        if checksum != metadata['checksum']:
            raise RuntimeError(f"Checkpoint {checkpoint_id} corrupted: "
                               f"checksum mismatch")

        # Restore state
        with open(state_file, 'r') as f:
            state = json.load(f)

        # Restore offset
        offset_file = checkpoint_path / "offset.txt"
        with open(offset_file, 'r') as f:
            offset = int(f.read().strip())

        self.checkpoint_id = checkpoint_id

        print(f"✓ Restored checkpoint {checkpoint_id}: "
              f"offset={offset}, state_size={len(state)} items")

        return state, offset

    def _compute_checksum(self, file_path: Path) -> str:
        """Compute SHA256 checksum of file"""
        sha256 = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def cleanup_old_checkpoints(self, keep_last: int = 3):
        """Remove old checkpoints, keeping only the newest keep_last"""
        checkpoints = sorted(
            [d for d in self.checkpoint_dir.iterdir()
             if d.is_dir() and d.name.startswith('checkpoint-')],
            key=lambda d: int(d.name.split('-')[1])
        )

        # Slice by count: checkpoints[:-keep_last] would delete nothing
        # when keep_last == 0
        for checkpoint in checkpoints[:max(len(checkpoints) - keep_last, 0)]:
            print(f"Removing old checkpoint: {checkpoint.name}")
            for file in checkpoint.iterdir():
                file.unlink()
            checkpoint.rmdir()

# Stream Processor with Checkpointing
class StreamProcessor:
    """
    Stream processor with checkpoint support
    """
    def __init__(self, checkpoint_dir: str):
        self.checkpoint_manager = CheckpointManager(checkpoint_dir)
        self.state: Dict[str, int] = {}  # word -> count
        self.current_offset = 0

        # Restore from last checkpoint
        self.state, self.current_offset = \
            self.checkpoint_manager.restore_latest_checkpoint()

    def process_message(self, message: str):
        """Process a single message (word count)"""
        words = message.lower().split()

        for word in words:
            self.state[word] = self.state.get(word, 0) + 1

        self.current_offset += 1

        # Checkpoint if interval elapsed
        if self.checkpoint_manager.should_checkpoint():
            self.checkpoint()

    def checkpoint(self):
        """Create a checkpoint"""
        self.checkpoint_manager.create_checkpoint(
            self.state,
            self.current_offset
        )

    def get_word_count(self, word: str) -> int:
        """Query current state"""
        return self.state.get(word, 0)

# Usage Example
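if __name__ == '__main__':
    import shutil

    # Start from an empty directory so repeated runs give the same output
    shutil.rmtree('/tmp/checkpoints', ignore_errors=True)

    processor = StreamProcessor('/tmp/checkpoints')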

    # Simulate stream processing
    messages = [
        "hello world",
        "hello kafka",
        "stream processing",
        "checkpoint test",
        "hello checkpoint"
    ]

    print("\n=== Processing Stream ===")
    for i, msg in enumerate(messages):
        print(f"Message {i}: {msg}")
        processor.process_message(msg)

        # Simulate checkpoint every 2 messages
        if (i + 1) % 2 == 0:
            processor.checkpoint()

    # Query state
    print("\n=== Query State ===")
    print(f"Count of 'hello': {processor.get_word_count('hello')}")
    print(f"Count of 'checkpoint': {processor.get_word_count('checkpoint')}")

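    # Simulate crash and recovery. On a fresh run the last checkpoint is taken
    # at offset 4, so message 4 ("hello checkpoint") is missing from the
    # restored state and would be replayed from the source in a real system.
    print("\n=== Simulating Crash and Recovery ===")
    del processor  # "crash"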

    processor_recovered = StreamProcessor('/tmp/checkpoints')
    print(f"Count of 'hello' after recovery: "
          f"{processor_recovered.get_word_count('hello')}")
    print(f"Current offset after recovery: "
          f"{processor_recovered.current_offset}")

    # Cleanup
    processor_recovered.checkpoint_manager.cleanup_old_checkpoints(keep_last=2)

Barrier Alignment Implementation

# Reuses CheckpointManager from the previous example
from dataclasses import dataclass
from typing import Union

@dataclass
class Message:
    """Regular stream message"""
    data: str
    offset: int

@dataclass
class Barrier:
    """Checkpoint barrier"""
    checkpoint_id: int

class BarrierBuffer:
    """
    Barrier alignment for operators with multiple inputs: once an input has
    delivered barrier n, its later (post-barrier) messages are held back until
    barrier n has arrived on every input. Inputs still waiting for the barrier
    keep flowing, so the snapshot contains exactly the pre-barrier data.
    """
    def __init__(self, num_inputs: int):
        self.num_inputs = num_inputs
        self.pending_checkpoint = None
        self.received_barriers = set()
        self.buffered_messages = {i: [] for i in range(num_inputs)}

    def process(self, message: Union[Message, Barrier], input_id: int):
        """
        Process a message or barrier from one input stream.
        Returns: (messages_to_process, should_checkpoint)
        If should_checkpoint is True, the caller must snapshot state BEFORE
        processing the returned messages (they arrived after the barrier).
        """
        if isinstance(message, Barrier):
            if self.pending_checkpoint is None:
                # First barrier of a new checkpoint: start aligning
                self.pending_checkpoint = message.checkpoint_id
            elif message.checkpoint_id != self.pending_checkpoint:
                raise ValueError(f"Got barrier {message.checkpoint_id} while "
                                 f"aligning checkpoint {self.pending_checkpoint}")
            self.received_barriers.add(input_id)

            if len(self.received_barriers) < self.num_inputs:
                return [], False  # still waiting for barriers on other inputs

            # All barriers received: release held-back messages, input by input
            released = []
            for i in range(self.num_inputs):
                released.extend(self.buffered_messages[i])
                self.buffered_messages[i] = []
            self.pending_checkpoint = None
            self.received_barriers = set()
            return released, True

        # Regular message
        if input_id in self.received_barriers:
            # This input is already past the barrier: hold the message back
            self.buffered_messages[input_id].append(message)
            return [], False

        # No alignment in progress, or barrier not yet seen on this input
        return [message], False

class CheckpointedOperator:
    """
    Stream operator with checkpoint support
    """
    def __init__(self, num_inputs: int = 1):
        self.state = {}
        self.barrier_buffer = BarrierBuffer(num_inputs)
        self.checkpoint_manager = CheckpointManager('/tmp/operator-checkpoints')

    def on_message(self, message: Union[Message, Barrier], input_id: int = 0):
        """Handle incoming message or barrier"""
        messages, should_checkpoint = \
            self.barrier_buffer.process(message, input_id)

        # Snapshot first: released messages arrived after the barrier and
        # must not be included in this checkpoint
        if should_checkpoint:
            self._checkpoint()

        for msg in messages:
            self._process_data(msg.data)

    def _process_data(self, data: str):
        """Application logic: count occurrences of each payload"""
        self.state[data] = self.state.get(data, 0) + 1

    def _checkpoint(self):
        """Save state to checkpoint"""
        print(f"→ Operator checkpointing: state size = {len(self.state)}")
        # Offset 0 is a placeholder; a real operator would also record
        # its position in every input
        self.checkpoint_manager.create_checkpoint(self.state, 0)

# Example: Two-input operator (join)
if __name__ == '__main__':
    operator = CheckpointedOperator(num_inputs=2)

    # Stream 1 messages, then its barrier
    operator.on_message(Message("hello", 0), input_id=0)
    operator.on_message(Message("world", 1), input_id=0)
    operator.on_message(Barrier(checkpoint_id=1), input_id=0)

    # Post-barrier message on stream 1: held back during alignment
    operator.on_message(Message("late", 2), input_id=0)

    # Stream 2 messages (delayed barrier): processed immediately
    operator.on_message(Message("kafka", 0), input_id=1)
    operator.on_message(Message("flink", 1), input_id=1)

    # This completes the checkpoint: the snapshot holds hello, world, kafka,
    # and flink; "late" is processed only afterwards
    operator.on_message(Barrier(checkpoint_id=1), input_id=1)

    print(f"Final state: {operator.state}")

Used In Systems:

  • Apache Flink: Distributed checkpointing with barriers
  • Spark Structured Streaming: Micro-batch checkpointing
  • Kafka Streams: State stores with changelog

Explained In Detail:

  • Stream Processing Deep Dive - Checkpointing implementation details

Quick Self-Check

  • Can explain checkpointing in 60 seconds?
  • Understand how checkpoint barriers work?
  • Know the trade-offs between checkpoint frequencies?
  • Can explain how checkpointing enables exactly-once semantics?
  • Understand incremental vs full checkpointing?
  • Can design a checkpoint strategy for given requirements?

Production signal

Why this concept matters

Interview: 60% of streaming interviews
Production: Flink, Spark, Kafka Streams
Performance: Fast recovery
Scale: Exactly-once semantics