Building distributed systems with Viewstamped Replication
You can find the latest state of vsr in the repository.
The Viewstamped Replication (VSR) algorithm has gained popularity in recent years due to its simplicity and efficiency. The wonderful Tigerbeetle database has shown how well it copes with large volumes of data, heavy traffic and unexpected errors, and how well it lends itself to building high-performance distributed systems. This piqued my curiosity, so I decided to implement it in Rust. The implementation is based solely on the Viewstamped Replication Revisited paper, with certain parts inspired by Tigerbeetle.
VSR
The VSR algorithm is a consensus algorithm that allows a group of replicas to agree on a sequence of operations. It is based on the idea of viewstamping, which is a way to order operations in a distributed system. Each replica maintains a view number, which is incremented whenever a new view is started, and within a view the primary assigns every incoming operation a monotonically increasing op-number. A replica only processes messages whose view number matches its own: messages from an older view are rejected, while messages from a newer view indicate that the replica has fallen behind and needs to catch up.
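As a rough sketch of that check (the names here are mine, purely for illustration):

```rust
// Sketch of how a replica might classify an incoming protocol message
// based on the view number it carries; names are illustrative.
enum Action {
    Process,       // same view: handle the message normally
    Reject,        // older view: the sender is behind, ignore the message
    StateTransfer, // newer view: we are behind and need to catch up first
}

fn classify(replica_view: u64, message_view: u64) -> Action {
    if message_view == replica_view {
        Action::Process
    } else if message_view < replica_view {
        Action::Reject
    } else {
        Action::StateTransfer
    }
}
```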
The VSR algorithm can be divided into three sub-protocols, each responsible for handling a different aspect of keeping our distributed system available and consistent.
- Normal Operation: responsible for handling requests whenever the primary replica is available. All replicas are in the same view and can serve requests.
- View Changes: responsible for masking failures of the primary replica and electing a new primary replica.
- Recovery: responsible for recovering a replica after a crash.
This blog post focuses on the design of the implementation, not the details of the algorithm. I found the Viewstamped Replication Revisited paper very easy to follow, so I'd encourage you to read it if you want to dive deeper into the details.
Replica
State Machine
Each replica is implemented as a state machine, where each incoming message is processed using an event loop. The event loop is responsible for handling incoming messages and updating the state of the replica. In essence, a simplified version of the replica looks as follows:
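(The sketch below is illustrative rather than a copy of the actual code; the `Message`, `Bus` and `Replica` definitions are placeholders for the real ones in the repository.)

```rust
use std::collections::VecDeque;

// Placeholder message and bus types; the real ones live in the repository.
enum Message {
    Request,
    Prepare,
    PrepareOk,
    Commit,
}

struct Bus {
    incoming: VecDeque<Message>,
    outgoing: VecDeque<Message>,
}

impl Bus {
    fn tick(&mut self) { /* establish connections, perform pending I/O */ }
    fn receive(&mut self) -> Option<Message> { self.incoming.pop_front() }
    fn send(&mut self, message: Message) { self.outgoing.push_back(message) }
}

struct Replica {
    view: u64,
    op_number: u64,
    bus: Bus,
}

impl Replica {
    fn run(&mut self) {
        // The event loop: runs forever, processing one message at a time.
        loop {
            // Let the bus make progress on connections and I/O.
            self.bus.tick();

            // Handle every message that is ready, mutating the replica's
            // state and queuing any resulting outgoing messages.
            while let Some(message) = self.bus.receive() {
                if let Some(reply) = self.on_message(message) {
                    self.bus.send(reply);
                }
            }
        }
    }

    fn on_message(&mut self, message: Message) -> Option<Message> {
        match message {
            Message::Request => { /* primary: assign op-number, broadcast Prepare */ None }
            Message::Prepare => { /* backup: append to log, reply PrepareOk */ None }
            Message::PrepareOk => { /* primary: count quorum, commit */ None }
            Message::Commit => { /* backup: apply committed operations */ None }
        }
    }
}
```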
As you can see, an infinite loop is used to listen for incoming messages, which might be sent by other replicas or by the client. Each message handler mutates the state of the replica and may produce an output (e.g. a message to be sent to another replica), which is then executed by the event loop.
In case you're interested to learn all the details of the implementation, feel free to check out the source code.
Bus
The code snippet above introduces the concept of a "bus". A bus is responsible for queuing incoming and outgoing messages. It ensures that messages are processed in the correct order and that the replica can handle multiple requests concurrently. A bus has a connection pool to manage the different connections to other replicas, as well as to the client.
The communication between replicas is done using TCP sockets. The connections must be non-blocking to avoid blocking the replica's event loop.
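A minimal sketch of the bus's responsibilities, again with placeholder types rather than the actual ones from the repository:

```rust
use std::collections::VecDeque;

// Placeholder types standing in for the real connection and message types.
struct Connection;
struct Message;

struct Bus {
    connections: Vec<Connection>,
    incoming: VecDeque<Message>,
    outgoing: VecDeque<Message>,
}

impl Bus {
    // Called by the replica on every system tick.
    fn tick(&mut self) {
        // 1. Make sure connections to the other replicas and the client are
        //    established and healthy, reconnecting if necessary.
        // 2. Perform any immediately available non-blocking reads, queuing
        //    complete messages onto `incoming`.
        // 3. Flush as much of `outgoing` as the sockets accept without blocking.
    }

    fn receive(&mut self) -> Option<Message> {
        self.incoming.pop_front()
    }

    fn send(&mut self, message: Message) {
        self.outgoing.push_back(message);
    }
}
```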
This is a simplified version, but in essence, whenever the bus is ticked by the replica, it processes any incoming and outgoing messages. The bus makes sure that the connections are established and healthy, and executes any immediate I/O. Writing messages to the bus goes through an intermediate buffer, making sure that writing is non-blocking as well:
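One way such buffered writes might look, assuming the stream has already been put into non-blocking mode (the `Connection` fields here are illustrative):

```rust
use std::io::{self, Write};
use std::net::TcpStream;

// Sketch of buffered, non-blocking writes; field names are illustrative.
struct Connection {
    stream: TcpStream, // assumed to be in non-blocking mode
    // Bytes queued for this connection but not yet written to the socket.
    write_buffer: Vec<u8>,
}

impl Connection {
    // "Sending" a message only appends its bytes to the buffer; no syscall
    // happens here, so the caller never blocks.
    fn queue(&mut self, bytes: &[u8]) {
        self.write_buffer.extend_from_slice(bytes);
    }

    // Called when the socket is writable: write as much as possible without
    // blocking and keep the rest for the next writable event.
    fn flush(&mut self) -> io::Result<()> {
        while !self.write_buffer.is_empty() {
            match self.stream.write(&self.write_buffer) {
                Ok(written) => {
                    self.write_buffer.drain(..written);
                }
                // The socket is full; try again on the next writable event.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}
```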
Non-blocking I/O
Non-blocking I/O is a technique that allows a program to perform multiple operations concurrently without blocking the execution of other tasks. This topic could probably have its own dedicated post, as there are plenty of details we could cover. For now, however, I'll focus on showing my solution using mio.
mio is a low-level library for non-blocking I/O that provides a portable API for working with sockets and managing events. It allows you to register sockets with an event loop and receive notifications when events occur, such as data being available to read or write. mio is the backbone of the Tokio runtime, which makes it a well-tested and reliable choice for our implementation. Ideally, we would use io_uring to handle I/O events asynchronously in a very efficient manner, but mio does not support io_uring yet (see here). We could add native io_uring support for our asynchronous I/O in a future iteration.
We can then implement all our I/O requirements using mio. The main idea behind mio is registering sockets with a poller and waiting for events. Registration uses so-called "interests" to specify which events we want to be notified about.
Registering TCP sockets with mio is very simple:
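For example, registering a listener could look like this (the address and token are arbitrary, chosen for illustration):

```rust
use mio::net::TcpListener;
use mio::{Interest, Poll, Token};

// Arbitrary token identifying the listener in the event loop.
const LISTENER: Token = Token(0);

fn main() -> std::io::Result<()> {
    let mut poll = Poll::new()?;

    let addr = "127.0.0.1:3000".parse().unwrap();
    let mut listener = TcpListener::bind(addr)?;

    // Register the listener with the poller. We are only interested in
    // readability, i.e. new incoming connections being ready to accept.
    poll.registry()
        .register(&mut listener, LISTENER, Interest::READABLE)?;

    Ok(())
}
```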
Then, our I/O event loop polls for new events, and each event is processed differently depending on its kind (e.g. read, write, accept, close).
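A sketch of such a loop, with the actual I/O actions left as placeholders:

```rust
use mio::{Events, Poll, Token};
use std::time::Duration;

const LISTENER: Token = Token(0);

fn poll_events(poll: &mut Poll) -> std::io::Result<()> {
    let mut events = Events::with_capacity(128);

    // Poll with a timeout so the replica's tick keeps running even when
    // there is no I/O activity.
    poll.poll(&mut events, Some(Duration::from_millis(10)))?;

    for event in events.iter() {
        match event.token() {
            // Readable event on the listener: accept new connections.
            LISTENER => { /* accept() */ }
            // Any other token maps to an established connection.
            _token => {
                if event.is_readable() { /* read the connection */ }
                if event.is_writable() { /* flush its write buffer */ }
            }
        }
    }

    Ok(())
}
```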
I'm omitting the implementation of each I/O action, but they can all be found in the repository.
Other aspects
Building a sophisticated solution for distributed systems takes a lot of effort and time. It requires a deep understanding, and there are many more aspects to consider. I'll mention a few of them that I've encountered while working on this project.
Checksum
Checksums are used to verify the integrity of the data being transmitted between the replicas. They are calculated on the sender side and verified on the receiver side. If the checksums don't match, the data is considered corrupted, and the replica should discard the message and ask for a retransmission.
Chained checksums are an extension of plain checksums that allows multiple messages to be verified through a single checksum. This is useful in scenarios where multiple messages are sent in a single transmission, as in a distributed system.
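As a rough sketch of the idea, using CRC32 purely for illustration (the actual implementation may well use a different checksum function):

```rust
// Illustrative sketch of chained checksums using CRC32 via the crc32fast
// crate; the real implementation may use a different hash.
fn checksum(previous: u32, body: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    // Chain: include the previous message's checksum so that verifying the
    // latest checksum also covers the whole sequence of messages before it.
    hasher.update(&previous.to_le_bytes());
    hasher.update(body);
    hasher.finalize()
}

fn verify_chain(messages: &[(Vec<u8>, u32)]) -> bool {
    let mut previous = 0u32;
    for (body, expected) in messages {
        let actual = checksum(previous, body);
        if actual != *expected {
            // Corruption detected: the receiver should discard the message
            // and ask for a retransmission.
            return false;
        }
        previous = actual;
    }
    true
}
```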
Clock
Clock synchronization is a crucial aspect of distributed systems. It ensures that all replicas have a consistent view of time, which is essential for maintaining consistency and ordering of events. I learned about this concept while exploring Tigerbeetle's implementation.
We've already hinted at the concept of a clock when introducing the "ticking" functions. The idea is that each replica has an internal clock, which is nothing more than a counter that is incremented whenever a "system tick" happens. Things like timeouts and timestamps are therefore based on this internal clock rather than the actual system time, which unlocks the possibility of slowing down or speeding up time as we please.
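A small sketch of what a tick-based clock and timeout might look like (the names are illustrative, not the actual ones from the repository):

```rust
// Sketch of a tick-based clock and timeout; names are illustrative.
struct Clock {
    ticks: u64,
}

impl Clock {
    // Called once per "system tick", however fast the caller decides to tick.
    fn tick(&mut self) {
        self.ticks += 1;
    }
}

struct Timeout {
    after: u64,   // fire after this many ticks
    started: u64, // tick at which the timeout was (re)started
}

impl Timeout {
    fn fired(&self, clock: &Clock) -> bool {
        clock.ticks - self.started >= self.after
    }

    fn reset(&mut self, clock: &Clock) {
        self.started = clock.ticks;
    }
}
```

Because nothing here touches the system clock, a test or simulation can drive `tick()` as fast or as slowly as it likes.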
Simulation testing
Last, but not least, I've explored the concept of simulation testing. Simulation testing is a technique used to test software systems by simulating their behavior in a controlled environment. It is useful in scenarios where it's difficult or impossible to test the system in the real world. Distributed systems are particularly challenging to test in the real world, since they involve complex interactions between multiple components. Simulation testing allows us to test individual components in isolation, making it easier to identify and fix bugs.
Deterministic simulation testing is hard to achieve because it requires that all of our logic is deterministic. Things like async code, random number generators, time, and other sources of non-determinism need to be controlled and always reproducible. So our design had a very hard requirement that we've omitted until now: determinism.
Our simulation setup bases all its randomness on a single seed, which is generated for each test run. Whenever a test fails due to an error, the only thing we need to reproduce the error is the seed. In case you're interested, matklad has shared great insights on this topic (link).
We can, for instance, simulate I/O errors based on the test run seed.
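A sketch of what seed-driven fault injection might look like, using the rand crate (the FaultyIo type and the drop probability are made up for illustration):

```rust
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

// Sketch of seed-driven fault injection; names and probabilities are
// illustrative, not the actual implementation.
struct FaultyIo {
    rng: StdRng,
    drop_probability: f64,
}

impl FaultyIo {
    fn new(seed: u64) -> Self {
        Self {
            // All randomness derives from the seed, so a failing run can be
            // replayed exactly by reusing the same seed.
            rng: StdRng::seed_from_u64(seed),
            drop_probability: 0.01,
        }
    }

    // Decide, deterministically from the seed, whether this write "fails".
    fn should_drop(&mut self) -> bool {
        self.rng.gen_bool(self.drop_probability)
    }
}
```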
We can then implement a simulation testing runner that initializes a cluster of replicas and runs the simulation:
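Again as a hedged sketch, with a placeholder Replica standing in for the real one:

```rust
use rand::rngs::StdRng;
use rand::SeedableRng;

// Placeholder replica with just enough surface for the sketch; the real
// type in the repository has the full VSR state machine behind it.
struct Replica {
    id: usize,
}

impl Replica {
    fn new(id: usize) -> Self {
        Self { id }
    }

    fn tick(&mut self, _rng: &mut StdRng) {
        // In the real implementation this would advance the internal clock,
        // process bus messages, fire timeouts, and inject seeded faults.
    }
}

fn simulate(seed: u64, replica_count: usize, ticks: u64) {
    // All randomness in the run flows from this single seeded RNG, so any
    // failure can be reproduced by re-running with the same seed.
    let mut rng = StdRng::seed_from_u64(seed);
    let mut replicas: Vec<Replica> = (0..replica_count).map(Replica::new).collect();

    for _ in 0..ticks {
        for replica in replicas.iter_mut() {
            replica.tick(&mut rng);
        }
    }

    // After the run, assert cluster invariants (e.g. all committed logs agree).
}
```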
Conclusions
In this blog post, we've explored how to implement a replication technique for building distributed systems with Viewstamped Replication. We've seen which aspects and components play a role when building such systems, and how to build confidence in our solution by using simulation testing.