https://aleics.me/posts/feed.xml

Building distributed systems with Viewstamped Replication

2025-09-05

You can find the latest state of vsr in the repository.

The Viewstamped Replication (VSR) algorithm has gained popularity in recent years due to its simplicity and efficiency. The wonderful Tigerbeetle database has shown how well it handles large volumes of data, heavy traffic and unexpected errors when building high-performance distributed systems. This piqued my curiosity, so I decided to implement it in Rust. The implementation is based solely on the Viewstamped Replication Revisited paper, with certain parts inspired by Tigerbeetle.

VSR

VSR is a replication protocol that lets a group of replicas agree on a single sequence of operations, so that the group behaves like one consistent, highly available service. Time is divided into views. In each view, one replica acts as the primary and decides the order of operations, while the remaining replicas act as backups. Each replica keeps track of the current view number, which only increases when a view change selects a new primary. Replicas process a protocol message only if its view number matches their own: a message from an older view is dropped, and a message from a newer view tells the replica that it has fallen behind and needs to catch up.

The VSR algorithm can be divided into three protocols, each responsible for handling a different situation so that our highly distributed system stays available and consistent.

  • Normal Operation: responsible for handling requests, whenever the primary replica is available. All the replicas are in the same view and are available for handling requests.
  • View Changes: responsible for masking failures of the primary replica and electing a new primary replica.
  • Recovery: responsible for recovering a replica after a crash.
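To make the roles in these protocols a bit more concrete, here is a minimal sketch of two rules from the Viewstamped Replication Revisited paper: the primary is picked round-robin from the view number, and an operation needs f + 1 out of 2f + 1 replicas before it counts as committed. The function names `primary_for_view` and `quorum` are made up for this illustration and are not taken from the repository.

```rust
/// Index of the primary for a given view: replicas take turns round-robin,
/// so every view change moves the primary role to the next replica.
fn primary_for_view(view: u64, replica_count: u64) -> u64 {
    view % replica_count
}

/// Number of replicas (the primary included) that must know about an
/// operation before it is committed: f + 1 out of 2f + 1 replicas.
fn quorum(replica_count: u64) -> u64 {
    replica_count / 2 + 1
}
```

With three replicas, views 0, 1, 2 and 3 are led by replicas 0, 1, 2 and 0 again, and the quorum is two replicas, so the cluster keeps working while one replica is down.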

This blog post focuses on the design of the implementation, not on the details of the algorithm. I found the Viewstamped Replication Revisited paper very easy to follow, so I'd encourage you to read it if you want to dive deeper into the details.

Replica

State Machine

Each replica is implemented as a state machine, where each incoming message is processed using an event loop. The event loop is responsible for handling incoming messages and updating the state of the replica. In essence, a simplified version of the replica looks as follows:

struct Replica {
    /// The current view from the perspective of the replica
    view: u8,
    /// Replica identifier
    replica: u8,
    /// Total number of replicas
    total: u8,
    /// Current replica state to understand availability
    status: Status,
    /// Bus to communicate with other replicas
    bus: Bus,
}

enum Status {
    Normal,
    ViewChange,
    Recovery,
}

impl Replica {
    fn listen(&mut self) -> Result<(), ReplicaError> {
        loop {
            self.tick()?;
        }
    }
    
    fn tick(&mut self) -> Result<(), ReplicaError> {
        for message in self.bus.tick()? {
            match self.handle_message(message) {
                Ok(output) => self.handle_output(output),
                Err(error) => self.handle_error(error),
            }
        }
        Ok(())
    }
    
    fn handle_message(
        &mut self,
        message: Message
    ) -> Result<HandleOutput, ReplicaError> {
        match message {
            Message::Request(request) => self.handle_request(request),
            Message::Prepare(prepare) => self.handle_prepare(prepare),
            // Other messages
        }
    }

    fn handle_request(
        &mut self,
        message: Request
    ) -> Result<HandleOutput, ReplicaError> {
        // Handle a request message from the client or another replica
    }
    
    fn handle_prepare(
        &mut self,
        prepare: Prepare
    ) -> Result<HandleOutput, ReplicaError> {
        // Handle a prepare message from the primary replica
    }
}

As you can see in the implementation, an infinite loop is used to listen for incoming messages. Messages might be sent by other replicas or the client. Then, each message handler mutates the state of the replica and generates a possible output (e.g. send a message to another replica), which needs to be executed by the event loop.
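The split between "handler mutates state" and "event loop executes the output" can be illustrated with a small, self-contained sketch. Everything here is hypothetical and heavily reduced (the `Output` enum, the two-variant `Message`, the `op` counter); it is not the repository's code, only one way such a handler could look. The point is that the handler performs no I/O at all, which keeps it easy to test.

```rust
/// Messages a replica can receive (a hypothetical, reduced set).
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Request { payload: String },
    Prepare { view: u64, op: u64, payload: String },
}

/// Side effect requested by a handler; the event loop executes it later.
#[derive(Debug, PartialEq)]
enum Output {
    Send { to: u8, message: Message },
}

struct Replica {
    id: u8,
    total: u8,
    view: u64,
    /// Number of the latest operation this replica knows about.
    op: u64,
}

impl Replica {
    fn is_primary(&self) -> bool {
        self.view % u64::from(self.total) == u64::from(self.id)
    }

    /// State transition only: mutates the replica and returns the messages
    /// that should be sent, without touching any socket.
    fn handle_message(&mut self, message: Message) -> Vec<Output> {
        match message {
            // The primary assigns the next op number and asks every
            // backup to prepare the operation.
            Message::Request { payload } if self.is_primary() => {
                self.op += 1;
                (0..self.total)
                    .filter(|to| *to != self.id)
                    .map(|to| Output::Send {
                        to,
                        message: Message::Prepare {
                            view: self.view,
                            op: self.op,
                            payload: payload.clone(),
                        },
                    })
                    .collect()
            }
            // Backups ignore client requests in this sketch.
            Message::Request { .. } => Vec::new(),
            // A backup accepts a prepare from its own view that directly
            // follows its log (a real backup would also reply to the primary).
            Message::Prepare { view, op, .. } if view == self.view && op == self.op + 1 => {
                self.op = op;
                Vec::new()
            }
            // Anything else (other view, gap in the log) is dropped here.
            Message::Prepare { .. } => Vec::new(),
        }
    }
}
```

Because the handler only returns values, a test can feed it messages and compare the outputs without opening a single connection.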

If you're interested in all the details of the implementation, feel free to check out the source code.

Bus

The code snippet above introduces the concept of a "bus". A bus is responsible for queuing incoming and outgoing messages. It ensures that messages are processed in the correct order and that the replica can handle multiple requests concurrently. A bus has a connection pool to manage the connections to the other replicas as well as to the clients.

The communication between replicas is done using TCP sockets. The connections must be non-blocking to avoid blocking the replica's event loop.

struct Bus {
    connections: ConnectionPool,
    replicas: Vec<u8>,
    address: SocketAddr,
    io: Io,
}

impl Bus {
    fn tick(&mut self) -> Result<Vec<Message>, IOError> {
        // Make sure connection to other replicas is established
        self.connect_other_replicas()?;
        self.run_for_ns(TICK_TIMEOUT_NS)
    }

    fn connect_other_replicas(&mut self) -> Result<(), IOError> {
        for replica in self.replicas.iter() {
            self.io.connect(self.address, *replica)?;
        }
        Ok(())
    }
    
    fn run_for_ns(&mut self, ns: u64) -> Result<Vec<Message>, IOError> {
        let mut messages = Vec::new();
        // Read any IO events and process them
        for completion in self.io.run(ns)? {
            match completion {
                Completion::Accept => {
                    let connection = self.io.accept()?;
                    self.connections.add(connection);
                }
                Completion::Recv { connection_id } => {
                    // Read message from the connection IO
                    let connection = self.connections.get(connection_id)?;
                    let message = self.io.read(connection.socket)?;
                    messages.push(message);
                },
                Completion::Write { connection_id } => {
                    // Write message from output buffer to the connection IO
                    let connection = self.connections.get(connection_id)?;
                    while let Some(mut bytes) = connection.outgoing_buffer.next() {
                        self.io.write(connection.socket, bytes)?;
                    }
                }
            }
        }
        Ok(messages)
    }
}

This is a simplified version, but in essence, whenever the replica ticks the bus, the bus processes any pending incoming and outgoing messages. It makes sure that the connections are established and healthy, and executes any I/O that is ready. Messages are not written to the socket directly: they are first placed in an intermediate buffer, so that sending is non-blocking as well:

impl Bus {
    fn send(&mut self, message: Message, replica: u8) -> Result<(), IOError> {
        let connection = self.connections.get(replica)?;
        connection.buffer(message)?;
        
        // Notify the connection socket that the bus is
        // ready to send a message.
        self.io.send(connection.socket, connection.id)?;
        
        Ok(())
    }
}
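To illustrate why the intermediate buffer keeps sending non-blocking, here is a minimal sketch of an outgoing queue that is drained only when the socket is ready. `OutgoingBuffer`, `push` and `flush` are hypothetical names chosen for this example, and the sketch works against any `std::io::Write` rather than the bus's real connection type.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Write};

/// Queue of encoded messages waiting to be written to a socket.
#[derive(Default)]
struct OutgoingBuffer {
    queue: VecDeque<Vec<u8>>,
}

impl OutgoingBuffer {
    /// Queue a message. This never touches the socket, so it cannot block.
    fn push(&mut self, bytes: Vec<u8>) {
        if !bytes.is_empty() {
            self.queue.push_back(bytes);
        }
    }

    /// Write as much as the socket accepts right now.
    /// Returns `Ok(true)` once the queue is empty and `Ok(false)` if the
    /// socket would block, in which case the remaining bytes stay queued
    /// until the next writable event.
    fn flush<W: Write>(&mut self, socket: &mut W) -> io::Result<bool> {
        while let Some(front) = self.queue.front_mut() {
            match socket.write(front) {
                // The socket accepted nothing: treat it as an error
                Ok(0) => return Err(ErrorKind::WriteZero.into()),
                // Whole message written: move on to the next one
                Ok(n) if n == front.len() => {
                    self.queue.pop_front();
                }
                // Partial write: keep only the unsent tail
                Ok(n) => {
                    front.drain(..n);
                }
                Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(false),
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(true)
    }
}
```

In this design the caller of `push` returns immediately, and `flush` would be called from the event loop whenever a writable event arrives for the connection.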

Non-blocking I/O

Non-blocking I/O is a technique that allows a program to perform multiple operations concurrently without blocking the execution of other tasks. This topic could probably have its own dedicated post, as there are plenty of details we could cover. For now, however, I'll focus on showing my solution using mio.

mio is a low-level library for non-blocking I/O. It provides a thin, portable API over the operating system's event queues for working with sockets and managing events. It allows you to register sockets with an event loop and receive notifications when events occur, such as a socket becoming readable or writable. mio is the backbone of the Tokio runtime, which makes it a well-tested and reliable choice for our implementation. Ideally, we would use io_uring to handle IO events asynchronously in a very efficient manner. However, mio does not support io_uring yet (See here). We could implement native io_uring support for our asynchronous IO in a future iteration.

pub trait IO {
    fn open_tcp(&self, addr: SocketAddr) -> Result<TcpListener, IOError>;

    fn connect(
        &mut self,
        addr: SocketAddr,
        connection_id: usize,
    ) -> Result<Option<TcpStream>, IOError>;

    fn accept(
        &mut self,
        socket: &TcpListener,
        connection_id: usize,
    ) -> Result<Vec<TcpStream>, IOError>;

    fn close(&self, socket: &mut TcpStream) -> Result<(), IOError>;

    fn recv(
        &self,
        socket: &mut TcpStream,
        buffer: &mut BytesMut,
    ) -> Result<bool, IOError>;

    fn send(
        &self,
        socket: &mut TcpStream,
        connection_id: usize,
    ) -> Result<(), IOError>;

    fn write(
        &self,
        socket: &mut TcpStream,
        bytes: &Bytes,
    ) -> Result<Option<usize>, IOError>;

    fn run(&mut self, timeout: Duration) -> Result<Vec<Completion>, IOError>;
}

We can then implement all our IO requirements using mio. The main idea of mio is to register sockets with a poller and wait for events. When registering, so-called "interests" specify which events we want to be notified about.

Registering TCP sockets with mio is very simple:

fn open_tcp(&self, addr: SocketAddr) -> Result<TcpListener, IOError> {
    let mut listener = TcpListener::bind(addr)?;
    // We care only for "READABLE" events for our incoming connection.
    // We use `SERVER` constant to indicate that this is a server socket.
    self.poll.registry().register(&mut listener, SERVER, Interest::READABLE)?;
    Ok(listener)
}

fn connect(
    &mut self,
    addr: SocketAddr,
    connection_id: usize,
) -> Result<Option<TcpStream>, IOError> {
    match TcpStream::connect(addr) {
        Ok(mut stream) => {
            // We care only for "WRITABLE" events for our outgoing connection
            self.poll.registry().register(
                &mut stream,
                Token(connection_id),
                Interest::WRITABLE,
            )?;

            Ok(Some(stream))
        }
        // In case the connection would block, we'd return None,
        // and try again later.
        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e)?,
    }
}

Then, our IO event loop polls for new events and turns each one into a completion, depending on its kind (e.g. read, write, accept, close).

fn run(&mut self, timeout: Duration) -> Result<Vec<Completion>, IOError> {
    self.poll.poll(&mut self.events, Some(timeout))?;

    let mut completions = Vec::with_capacity(self.events.len());

    for event in &self.events {
        match event.token() {
            SERVER => {
                completions.push(Completion::Accept);
            }
            token => {
                if event.is_readable() {
                    completions.push(Completion::Recv {
                        connection_id: token.0,
                    });
                }
                if event.is_writable() {
                    completions.push(Completion::Write {
                        connection_id: token.0,
                    });
                }
            }
        }
    
    }
    Ok(completions)
}

I'm omitting the implementation for each IO action, but they can be found in the repository.

Other aspects

Building a sophisticated solution for distributed systems takes a lot of effort and time. It requires a deep understanding, and there are many more aspects to consider. I'll mention a few that I've encountered while working on this project.

Checksum

Checksums are used to verify the integrity of the data being transmitted between the replicas. They are calculated on the sender side and verified on the receiver side. If the checksums don't match, the data is considered corrupted, and the replica should discard the message and ask for a retransmission.

Chained checksums extend plain checksums: each message also carries the checksum of the message that precedes it. Verifying the latest message therefore also verifies its place in the sequence, which is useful when replicas have to agree on an ordered series of messages.
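As a rough illustration, a receiver-side checksum check and one possible way of chaining checksums (each message storing the checksum of its predecessor) could look like the sketch below. FNV-1a is used only as a small, dependency-free stand-in for a real checksum, and `Message`, `parent`, `is_intact` and `verify_chain` are hypothetical names for this example, not the repository's API.

```rust
/// FNV-1a (64-bit), used here only as a stand-in for a real checksum.
fn checksum(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

#[derive(Debug, Clone)]
struct Message {
    /// Checksum of the previous message in the sequence (0 for the first).
    parent: u64,
    /// Checksum over `parent` and `body`, computed by the sender.
    checksum: u64,
    body: Vec<u8>,
}

impl Message {
    fn new(parent: u64, body: Vec<u8>) -> Self {
        let checksum = Self::compute(parent, &body);
        Self { parent, checksum, body }
    }

    fn compute(parent: u64, body: &[u8]) -> u64 {
        let mut input = parent.to_le_bytes().to_vec();
        input.extend_from_slice(body);
        checksum(&input)
    }

    /// Receiver-side check: recompute the checksum and compare it with
    /// the one the sender attached.
    fn is_intact(&self) -> bool {
        self.checksum == Self::compute(self.parent, &self.body)
    }
}

/// Verify that every message is intact and points at its predecessor.
fn verify_chain(messages: &[Message]) -> bool {
    let mut expected_parent = 0;
    for message in messages {
        if !message.is_intact() || message.parent != expected_parent {
            return false;
        }
        expected_parent = message.checksum;
    }
    true
}
```

With this layout, flipping a byte in any body, dropping a message, or swapping two messages makes `verify_chain` return false.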

Clock

Clock synchronization is a crucial aspect of distributed systems. It ensures that all replicas have a consistent view of time, which is essential for maintaining consistency and ordering of events. I've learned of this concept while exploring Tigerbeetle's implementation.

We've hinted at the concept of a clock when introducing the "ticking" functions. The idea is that each replica has an internal clock, which is nothing more than a counter that is incremented whenever a "system tick" happens. Thus, things like timeouts, timestamps, etc. are based on internal clocks and not on the actual system time. This unlocks the possibility to slow down and speed up time as we please.
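A tick-based clock like the one described here can be sketched in a few lines. The names `Clock` and `Timeout` and their methods are assumptions made for this example; the actual implementation may be structured differently.

```rust
/// Logical clock: time only moves forward when the event loop ticks.
#[derive(Default)]
struct Clock {
    ticks: u64,
}

impl Clock {
    fn tick(&mut self) {
        self.ticks += 1;
    }

    fn now(&self) -> u64 {
        self.ticks
    }
}

/// A timeout measured in ticks instead of wall-clock time.
struct Timeout {
    started_at: u64,
    after: u64,
}

impl Timeout {
    fn start(clock: &Clock, after: u64) -> Self {
        Self { started_at: clock.now(), after }
    }

    /// True once at least `after` ticks have passed since the last start.
    fn fired(&self, clock: &Clock) -> bool {
        clock.now() - self.started_at >= self.after
    }

    /// Restart the timeout, e.g. after hearing from the primary.
    fn reset(&mut self, clock: &Clock) {
        self.started_at = clock.now();
    }
}
```

Since nothing here reads the system time, a test can call `tick` in a tight loop to fast-forward through timeouts, or stop ticking to freeze time entirely.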

Simulation testing

Last, but not least, I've explored the concept of simulation testing. Simulation testing is a technique for testing software systems by simulating their behavior in a controlled environment. This is useful when it's difficult or impossible to test the system in the real world. Distributed systems are particularly challenging to test in the real world, since they involve complex interactions between multiple components. Simulation testing allows us to exercise individual components in isolation, making it easier to identify and fix bugs.

Deterministic simulation testing is hard to achieve because it requires all of our logic to be deterministic. Thus, things like async code, random generators, time, and other sources of nondeterminism need to be controlled and always reproducible. So, our design had a very hard requirement that we omitted until now: determinism.

Our simulation setup bases all its randomness on a single seed, which is generated for each test run. Whenever a test fails due to an error, the only thing we need to reproduce the error is the seed. In case you're interested, matklad has shared great insights on this topic (link).
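The reason a single seed is enough can be shown with a tiny, dependency-free sketch. It uses a SplitMix64 generator purely as a stand-in for a real seeded generator, and `SeededRng` and `fault_schedule` are hypothetical names for this example.

```rust
/// Small deterministic generator (SplitMix64), a stand-in for a real
/// seeded RNG: the same seed always produces the same sequence.
struct SeededRng {
    state: u64,
}

impl SeededRng {
    fn new(seed: u64) -> Self {
        Self { state: seed }
    }

    fn next_u64(&mut self) -> u64 {
        self.state = self.state.wrapping_add(0x9E3779B97F4A7C15);
        let mut z = self.state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
        z ^ (z >> 31)
    }

    /// Returns true with a probability of roughly `prob`.
    fn flip_coin(&mut self, prob: f64) -> bool {
        // Map the top 53 bits to a float in [0, 1)
        let unit = (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64;
        unit < prob
    }
}

/// The fault decisions of a whole run, derived from nothing but the seed.
fn fault_schedule(seed: u64, steps: usize, prob: f64) -> Vec<bool> {
    let mut rng = SeededRng::new(seed);
    (0..steps).map(|_| rng.flip_coin(prob)).collect()
}
```

Two runs with the same seed make exactly the same fault decisions in the same order, so logging the seed of a failing run is enough to replay it.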

We can, for instance, simulate I/O errors based on the test run seed.

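use std::{cell::RefCell, rc::Rc};

use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;

#[derive(Clone)]
pub struct Env {
    seed: u64,
    // Shared, so that every clone of `Env` draws from the same random stream
    rng: Rc<RefCell<ChaCha8Rng>>,
}

impl Env {
    fn new(seed: u64) -> Self {
        let rng = Rc::new(RefCell::new(ChaCha8Rng::seed_from_u64(seed)));
        Self { seed, rng }
    }

    fn flip_coin(&self, prob: f64) -> bool {
        let mut rng = self.rng.borrow_mut();
        rng.random_bool(prob)
    }
}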

struct FaultyIO {
    env: Env,
}

impl IO for FaultyIO {
    // Simulate a faulty TCP connection with a 20% chance of failure
    fn open_tcp(&self, addr: SocketAddr) -> Result<FaultyTcpListener, IOError> {
        if self.env.flip_coin(0.2) {
            return Err(io_error());
        }
        
        Ok(FaultyTcpListener::new(self.env.clone()))
    }
}

And implement a simulation testing runner that initializes a cluster of replicas and runs the simulation:

struct SimulationRunner {
    env: Env,
    replicas: Vec<Replica>,
}

impl SimulationRunner {
    fn init(seed: u64, options: ClusterOptions) -> Self {
        // Initialize replicas by injecting a faulty IO
    }

    fn tick_replicas(&mut self) -> Result<(), ReplicaError> {
        // Tick every replica once
    }

    // `ClientError` is a placeholder name for the client-side error type
    fn tick_clients(&mut self) -> Result<(), ClientError> {
        // Tick every client once
    }
}

fn main() {
    // `args` holds the parsed command-line arguments (parsing omitted)
    let options = ClusterOptions {
        replica_count: 3,
        client_count: 1,
    };
    let mut runner = SimulationRunner::init(args.seed, options);

    loop {
        if let Err(err) = runner.tick_replicas() {
            tracing::error!("[main] Error in replica: {}", err);
        }

        if let Err(err) = runner.tick_clients() {
            tracing::error!("[main] Error in client: {}", err);
        }
    }
}

Conclusions

In this blog post, we've explored how to implement a replication technique for building distributed systems with Viewstamped Replication. We've seen which aspects and parts play a role when building such systems, and how to build confidence in our solution by using simulation testing.