2
0
Fork 0
mirror of https://github.com/Vonng/ddia.git synced 2026-06-21 00:47:05 +08:00

adjust anchor for ch5 - ch10

This commit is contained in:
Feng Ruohang 2025-08-09 21:01:34 +08:00
parent 860cc17b5d
commit 99b4b00502
5 changed files with 510 additions and 449 deletions

View file

@ -62,7 +62,9 @@ systems and similar infrastructure, you will need to go much deeper into the the
chance of your systems being robust. As usual, the literature references in this chapter provide
some initial pointers.
# Linearizability
## Linearizability
If you want a replicated database to be as simple as possible to use, you should make it behave as
if it wasnt replicated at all. Then users dont have to worry about replication lag, conflicts, and
@ -83,7 +85,6 @@ guarantee*. To clarify this idea, lets look at an example of a system that is
{{< figure src="/fig/ddia_1001.png" id="fig_consistency_linearizability_0" caption="Figure 10-1. If this database were linearizable, then either Alice's read would return 1 instead of 0, or Bob's read would return 0 instead of 1." class="w-full my-4" >}}
[Figure 10-1](/en/ch10#fig_consistency_linearizability_0) shows an example of a nonlinearizable sports website [^4].
Aaliyah and Bryce are sitting in the same room, both checking their phones to see the outcome of a
game their favorite team is playing. Just after the final score is announced, Aaliyah refreshes the
@ -98,7 +99,7 @@ his query) *after* he heard Aaliyah exclaim the final score, and therefore he ex
result to be at least as recent as Aaliyahs. The fact that his query returned a stale result is a
violation of linearizability.
## What Makes a System Linearizable?
### What Makes a System Linearizable?
In order to understand linearizability better, lets look at some more examples.
[Figure 10-2](/en/ch10#fig_consistency_linearizability_1) shows three clients concurrently reading and writing the same
@ -219,7 +220,10 @@ in [“Problems with Replication Lag”](/en/ch6#sec_replication_lag) are exampl
guarantees all these weaker properties, and more. In this chapter we will focus on linearizability,
which is the strongest consistency model in common use.
# Linearizability Versus Serializability
--------
> [!TIP] LINEARIZABILITY VERSUS SERIALIZABILITY
Linearizability is easily confused with serializability (see [“Serializability”](/en/ch8#sec_transactions_serializability)),
as both words seem to mean something like “can be arranged in a sequential order.” However, they are
@ -245,8 +249,7 @@ Linearizability
(*Sequential consistency* is something else again [^8], but we wont discuss it here.)
A database may provide both serializability and linearizability, and this combination is known as
*strict serializability* or *strong one-copy serializability* (*strong-1SR*)
[^11] [^12].
*strict serializability* or *strong one-copy serializability* (*strong-1SR*) [^11] [^12].
Single-node databases are typically linearizable. With distributed databases using optimistic
methods like serializable snapshot isolation (see [“Serializable Snapshot Isolation (SSI)”](/en/ch8#sec_transactions_ssi)) the situation is more
complicated: for example, CockroachDB provides serializability, and some recency guarantees on
@ -257,14 +260,16 @@ It is also possible to combine a weaker isolation level with linearizability, or
consistency model with serializability; in fact, consistency model and isolation level can be chosen
largely independently from each other [^15] [^16].
## Relying on Linearizability
--------
### Relying on Linearizability
In what circumstances is linearizability useful? Viewing the final score of a sporting match is
perhaps a frivolous example: a result that is outdated by a few seconds is unlikely to cause any
real harm in this situation. However, there a few areas in which linearizability is an important
requirement for making a system work correctly.
### Locking and leader election
#### Locking and leader election
A system that uses single-leader replication needs to ensure that there is indeed only one leader,
not several (split brain). One way of electing a leader is to use a lease: every node that starts up
@ -280,10 +285,14 @@ election correctly (see for example the fencing issue in [“Distributed Locks a
libraries like Apache Curator help by providing higher-level recipes on top of ZooKeeper. However, a
linearizable storage service is the basic foundation for these coordination tasks.
> [!NOTE]> Strictly speaking, ZooKeeper provides linearizable writes, but reads may be stale, since there is no
> guarantee that they are served from the current leader
> [^18].
> etcd since version 3 provides linearizable reads by default.
--------
> [!NOTE]
> Strictly speaking, ZooKeeper provides linearizable writes, but reads may be stale, since there is no
> guarantee that they are served from the current leader [^18]. etcd since version 3 provides linearizable reads by default.
--------
Distributed locking is also used at a much more granular level in some distributed databases, such as
Oracle Real Application Clusters (RAC) [^19].
@ -292,7 +301,7 @@ to the same disk storage system. Since these linearizable locks are on the criti
transaction execution, RAC deployments usually have a dedicated cluster interconnect network for
communication between database nodes.
### Constraints and uniqueness guarantees
#### Constraints and uniqueness guarantees
Uniqueness constraints are common in databases: for example, a username or email address must
uniquely identify one user, and in a file storage service there cannot be two files with the same
@ -320,7 +329,7 @@ However, a hard uniqueness constraint, such as the one you typically find in rel
requires linearizability. Other kinds of constraints, such as foreign key or attribute constraints,
can be implemented without linearizability [^20].
### Cross-channel timing dependencies
#### Cross-channel timing dependencies
Notice a detail in [Figure 10-1](/en/ch10#fig_consistency_linearizability_0): if Aaliyah hadnt exclaimed the score,
Bryce wouldnt have known that the result of his query was stale. He would have just refreshed the
@ -367,7 +376,8 @@ understand. If you control the additional communication channel (like in the cas
queue, but not in the case of Aaliyah and Bryce), you can use alternative approaches similar to what
we discussed in [“Reading Your Own Writes”](/en/ch6#sec_replication_ryw), at the cost of additional complexity.
## Implementing Linearizable Systems
### Implementing Linearizable Systems
Now that weve looked at a few examples in which linearizability is useful, lets think about how we
might implement a system that offers linearizable semantics.
@ -375,11 +385,9 @@ might implement a system that offers linearizable semantics.
Since linearizability essentially means “behave as though there is only a single copy of the data,
and all operations on it are atomic,” the simplest answer would be to really only use a single copy
of the data. However, that approach would not be able to tolerate faults: if the node holding that
one copy failed, the data would be lost, or at least inaccessible until the node was brought up
again.
one copy failed, the data would be lost, or at least inaccessible until the node was brought up again.
Lets revisit the replication methods from [Chapter 6](/en/ch6#ch_replication), and compare whether they can be made
linearizable:
Lets revisit the replication methods from [Chapter 6](/en/ch6#ch_replication), and compare whether they can be made linearizable:
Single-leader replication (potentially linearizable)
: In a system with single-leader replication, the leader has the primary copy of the data that is
@ -399,10 +407,7 @@ Single-leader replication (potentially linearizable)
Consensus algorithms (likely linearizable)
: Some consensus algorithms are essentially single-leader replication with automatic leader election
and failover. They are carefully designed to prevent split brain, allowing them to implement
linearizable storage safely. ZooKeeper uses the Zab consensus algorithm
[^22]
and etcd uses Raft
[^23], for example.
linearizable storage safely. ZooKeeper uses the Zab consensus algorithm [^22] and etcd uses Raft [^23], for example.
However, just because a system uses consensus does not guarantee that all operations on it are
linearizable: if it allows reads on a node without checking that it is still the leader, the
results of the read may be stale if a new leader has just been elected.
@ -424,7 +429,7 @@ Leaderless replication (probably not linearizable)
consistent with actual event ordering due to clock skew (see [“Relying on Synchronized Clocks”](/en/ch9#sec_distributed_clocks_relying)).
Even with quorums, nonlinearizable behavior is possible, as demonstrated in the next section.
### Linearizability and quorums
#### Linearizability and quorums
Intuitively, it seems as though quorum reads and writes should be linearizable in a
Dynamo-style model. However, when we have variable network delays, it is possible to have race
@ -459,7 +464,7 @@ linearizable compare-and-set operation cannot, because it requires a consensus a
In summary, it is safest to assume that a leaderless system with Dynamo-style replication does not
provide linearizability, even with quorum reads and writes.
## The Cost of Linearizability
### The Cost of Linearizability
As some replication methods can provide linearizability and others cannot, it is interesting to
explore the pros and cons of linearizability in more depth.
@ -495,7 +500,7 @@ If clients can connect directly to the leader region, this is not a problem, sin
application continues to work normally there. But clients that can only reach a follower region
will experience an outage until the network link is repaired.
### The CAP theorem
#### The CAP theorem
This issue is not just a consequence of single-leader and multi-leader replication: any linearizable
database has this problem, no matter how it is implemented. The issue also isnt specific to
@ -524,19 +529,17 @@ implementing large-scale web services [^36].
CAP deserves credit for this culture shift—it helped trigger the NoSQL movement, a burst of new
database technologies around the mid-2000s.
# The Unhelpful CAP Theorem
> [!TIP] THE UNHELPFUL CAP THEOREM
CAP is sometimes presented as *Consistency, Availability, Partition tolerance: pick 2 out of 3*.
Unfortunately, putting it this way is misleading [^32] because network partitions are a kind of
fault, so they arent something about which you have a choice: they will happen whether you like it
or not.
fault, so they arent something about which you have a choice: they will happen whether you like it or not.
At times when the network is working correctly, a system can provide both consistency
(linearizability) and total availability. When a network fault occurs, you have to choose between
either linearizability or total availability. Thus, a better way of phrasing CAP would be
*either Consistent or Available when Partitioned* [^37].
A more reliable network needs to make this choice less often, but at some point the choice is
inevitable.
A more reliable network needs to make this choice less often, but at some point the choice is inevitable.
The CP/AP classification scheme has several further flaws [^4]. *Consistency* is formalized as
linearizability (the theorem doesnt say anything about weaker consistency models), and the
@ -565,7 +568,7 @@ However, this definition inherits several problems with CAP, such as the counter
There are many more interesting impossibility results in distributed systems [^43], and CAP has now been
superseded by more precise results [^44] [^45], so it is of mostly historical interest today.
### Linearizability and network delays
#### Linearizability and network delays
Although linearizability is a useful guarantee, surprisingly few systems are actually linearizable
in practice. For example, even RAM on a modern multi-core CPU is not linearizable [^46]:
@ -598,7 +601,8 @@ exist, but weaker consistency models can be much faster, so this trade-off is im
latency-sensitive systems. In [Link to Come] we will discuss some approaches for avoiding
linearizability without sacrificing correctness.
# ID Generators and Logical Clocks
## ID Generators and Logical Clocks
In many applications you need to assign some sort of unique ID to database records when they are
created, which gives you a primary key by which you can refer to those records. In single-node
@ -666,14 +670,8 @@ Wall-clock timestamp made unique
putting a timestamp from that clock in the most significant bits, and filling the remaining bits
with extra information that ensures the ID is unique even if the timestamp is not—for example, a
shard number and a per-shard incrementing sequence number, or a long random value. This approach
is used in Version 7 UUIDs
[^50],
Twitters Snowflake [^51],
ULIDs [^52],
Hazelcasts Flake ID generator, MongoDB ObjectIDs, and many similar schemes
[^50].
You can implement these ID generators in application code or within a database
[^53].
is used in Version 7 UUIDs [^50], Twitters Snowflake [^51], ULIDs [^52], Hazelcasts Flake ID generator,
MongoDB ObjectIDs, and many similar schemes [^50]. You can implement these ID generators in application code or within a database [^53].
All these schemes generate IDs that are unique (at least with high enough probability that
collisions are vanishingly rare), but they have much weaker ordering guarantees for IDs than the
@ -691,7 +689,7 @@ using atomic clocks or GPS receivers. But it would also be nice to be able to ge
unique and correctly ordered without relying on special hardware. Thats what *logical clocks* are
about.
## Logical Clocks
### Logical Clocks
In [“Unreliable Clocks”](/en/ch9#sec_distributed_clocks) we discussed time-of-day clocks and monotonic clocks. Both of these
are *physical clocks*: they measure the passing of seconds (or milliseconds, microseconds, etc.).
@ -713,7 +711,7 @@ The requirements for a logical clock are typically:
A single-node ID generator meets these requirements, but the distributed ID generators we just
discussed do not meet the causal ordering requirement.
### Lamport timestamps
#### Lamport timestamps
Fortunately, there is a simple method for generating logical timestamps that *is* consistent with
causality, and which you can use as a distributed ID generator. It is called a *Lamport clock*,
@ -733,8 +731,7 @@ each timestamp is made unique.
Every time a node generates a timestamp, it increments its counter value and uses the new value.
Moreover, every time a node sees a timestamp from another node, if the counter value in that
timestamp is greater than its local counter value, it increases its local counter to match the value
in the timestamp.
timestamp is greater than its local counter value, it increases its local counter to match the value in the timestamp.
In [Figure 10-9](/en/ch10#fig_consistency_lamport_ts), Aaliyah had not yet seen Calebs message when posting her own,
and vice versa. Assuming both users start with an initial counter value of 0, both therefore
@ -749,7 +746,7 @@ two timestamps have the same counter, we compare their node IDs instead, using t
lexicographic string comparison. Thus, the timestamp order in this example is
(1, “Aaliyah”) < (1, “Caleb”) < (2, “Bryce”).
### Hybrid logical clocks
#### Hybrid logical clocks
Lamport timestamps are good at capturing the order in which things happened, but they have some
limitations:
@ -779,7 +776,7 @@ conventional time-of-day clock, with the added property that its ordering is con
happens-before relation. It doesnt depend on any special hardware, and requires only roughly
synchronized clocks. Hybrid logical clocks are used by CockroachDB, for example.
### Lamport/hybrid logical clocks vs. vector clocks
#### Lamport/hybrid logical clocks vs. vector clocks
In [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_transactions_snapshot_impl) we discussed how snapshot isolation is often implemented:
essentially, by giving each transaction a transaction ID, and allowing each transaction to see
@ -799,7 +796,7 @@ algorithm, such as a *vector clock*. The downside is that the timestamps from a
much bigger—potentially one integer for every node in the system. See [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent)
for more details on detecting concurrency.
## Linearizable ID Generators
### Linearizable ID Generators
Although Lamport clocks and hybrid logical clocks provide useful ordering guarantees, that ordering
is still weaker than the linearizable single-node ID generator we talked about previously. Recall
@ -839,7 +836,7 @@ the example, thats not so easy.
The simplest solution in this case would be to use a linearizable ID generator, which would ensure
that the photo upload is assigned a greater ID than the account permissions change.
### Implementing a linearizable ID generator
#### Implementing a linearizable ID generator
The simplest way of ensuring that ID assignment is linearizable is by actually using a single node
for this purpose. That node only needs to atomically increment a counter and return its value when
@ -874,7 +871,7 @@ assignment without any communication: even requests in different regions will be
without waiting for cross-region requests. The downside is that you need hardware and software
support for clocks to be tightly synchronized and compute the necessary uncertainty interval.
### Enforcing constraints using logical clocks
#### Enforcing constraints using logical clocks
In [“Constraints and uniqueness guarantees”](/en/ch10#sec_consistency_uniqueness) we saw that a linearizable compare-and-set operation can be used
to implement locks, uniqueness constraints, and similar constructs in a distributed system. This
@ -897,15 +894,16 @@ the kind of fault-tolerant system that we need.
To implement locks, leases, and similar constructs in a fault-tolerant way, we need something
stronger than logical clocks or ID generators: we need consensus.
# Consensus
## Consensus
In this chapter we have seen several examples of things that are easy when you have only a single
node, but which get a lot harder if you want fault tolerance:
* A database can be linearizable if you have only a single leader, and you make all reads and writes
on that leader. But how do you fail over if that leader fails, while avoiding split brain? How do
you ensure that a node that believes itself to be the leader hasnt actually been voted out in the
meantime?
you ensure that a node that believes itself to be the leader hasnt actually been voted out in the meantime?
* A linearizable ID generator on a single node is just a counter with an atomic fetch-and-add
instruction, but what if it crashes?
* An atomic compare-and-set (CAS) operation is useful for many things, such as deciding who gets a
@ -933,7 +931,9 @@ Such *Byzantine fault tolerant* (BFT) consensus algorithms are used in blockchai
However, as explained in [“Byzantine Faults”](/en/ch9#sec_distributed_byzantine), BFT algorithms are beyond the scope of this
book.
# The Impossibility of Consensus
--------
> [!TIP] THE IMPOSSIBILITY OF CONSENSUS
You may have heard about the FLP result [^72]—named after the
authors Fischer, Lynch, and Paterson—which proves that there is no algorithm that is always able to
@ -951,7 +951,9 @@ Even just allowing the algorithm to use random numbers is sufficient to get arou
Thus, although the FLP result about the impossibility of consensus is of great theoretical
importance, distributed systems can usually achieve consensus in practice.
## The Many Faces of Consensus
--------
### The Many Faces of Consensus
Consensus can be expressed in several different ways:
@ -966,10 +968,9 @@ Consensus can be expressed in several different ways:
We will explore all of these shortly. In fact, these problems are all equivalent to each other: if
you have an algorithm that solves one of these problems, you can convert it into a solution for any
of the others. This is quite a profound and perhaps surprising insight! And thats why we can lump
all of these things together under “consensus”, even though they look quite different on the
surface.
all of these things together under “consensus”, even though they look quite different on the surface.
### Single-value consensus
#### Single-value consensus
The standard formulation of consensus involves getting multiple nodes to agree on a single value.
For example:
@ -1039,7 +1040,7 @@ there is a severe network problem [^75].
Thus, a large-scale outage can stop the system from being able to process requests, but it cannot
corrupt the consensus system by causing it to make inconsistent decisions.
### Compare-and-set as consensus
#### Compare-and-set as consensus
A compare-and-set (CAS) operation checks whether the current value of some object equals some
expected value; if yes, it atomically updates the object to some new value; if no, it leaves the
@ -1056,8 +1057,7 @@ values in the CAS invocation, and then set the object to whatever value was deci
consensus. Any CAS invocations whose new value was not decided return an error. CAS invocations with
different expected values use separate runs of the consensus protocol.
This shows that CAS and consensus are equivalent to each other
[^28] [^73].
This shows that CAS and consensus are equivalent to each other [^28] [^73].
Again, both are straightforward on a single node, but challenging to make fault-tolerant. As an
example of CAS in a distributed setting, we saw conditional write operations for object stores in
[“Databases backed by object storage”](/en/ch6#sec_replication_object_storage), which allow a write to happen only if an object with the same
@ -1068,7 +1068,7 @@ tells us that consensus cannot be solved by a deterministic algorithm in the asy
model [^72], but we saw in [“Linearizability and quorums”](/en/ch10#sec_consistency_quorum_linearizable) that a linearizable register can be implemented using quorum
reads/writes in this model [^24] [^25] [^26]. From this it follows that a linearizable register cannot solve consensus.
### Shared logs as consensus
#### Shared logs as consensus
We have seen several examples of logs, such as replication logs, transaction logs, and write-ahead
logs. A log stores a sequence of *log entries*, and anyone who reads it sees the same entries in the
@ -1101,10 +1101,14 @@ Validity
: If a node reads a log entry containing some value, then some node previously requested for that
value to be added to the log.
--------
> [!NOTE]
> A shared log is formally known as a *total order broadcast*, *atomic broadcast*, or *total order multicast* protocol [^26] [^76] [^77]
> Its the same thing described in different words: requesting a value to be added to the log is then called “broadcasting” it, and reading a log entry is called “delivering” it.
--------
If you have an implementation of a shared log, it is easy to solve the consensus problem: every node
that wants to propose a value requests for it to be added to the log, and whichever value is read
back in the first log entry is the value that is decided. Since all nodes read log entries in the
@ -1128,7 +1132,7 @@ replication without failover does not meet the liveness requirements, since it s
messages if the leader crashes. As usual, the challenge is in performing failover safely and
automatically.
### Fetch-and-add as consensus
#### Fetch-and-add as consensus
The linearizable ID generator we saw in [“Linearizable ID Generators”](/en/ch10#sec_consistency_linearizable_id) comes close to solving
consensus, but it falls slightly short. We can implement such an ID generator using a fetch-and-add
@ -1163,7 +1167,7 @@ can say that fetch-and-add has a *consensus number* of two [^28].
In contrast, CAS and shared logs solve consensus for any number of nodes that may propose values, so
they have a consensus number of ∞ (infinity).
### Atomic commitment as consensus
#### Atomic commitment as consensus
In [“Distributed Transactions”](/en/ch8#sec_transactions_distributed) we saw the *atomic commitment* problem, which is to ensure that
the databases or shards involved in a distributed transaction all either commit or abort a
@ -1198,8 +1202,7 @@ non-triviality property ensures the algorithm cant simply always abort (but i
any of the communication among the nodes times out). The other three properties are basically the
same as for consensus.
If you have a solution for consensus, there are multiple ways you could solve atomic commitment
[^78] [^79].
If you have a solution for consensus, there are multiple ways you could solve atomic commitment [^78] [^79].
One works like this: when you want to commit the transaction, every node sends its vote to commit or
abort to every other node. Nodes that receive a vote to commit from itself and every other node
propose “commit” using the consensus algorithm; nodes that receive a vote to abort, or which
@ -1209,8 +1212,7 @@ consensus algorithm decided, it commits or aborts accordingly.
In this algorithm, “commit” will only be proposed if all nodes voted to commit. If any node voted to
abort, all proposals in the consensus algorithm will be “abort”. It could happen that some nodes
propose “abort” while others propose “commit” if all nodes voted to commit but some communication
timed out; in this case it doesnt matter whether the nodes commit or abort, as long as they all do
the same.
timed out; in this case it doesnt matter whether the nodes commit or abort, as long as they all do the same.
If you have a fault-tolerant atomic commitment protocol, you can also solve consensus. Every node
that wants to propose a value starts a transaction on a quorum of nodes, and at each node it
@ -1221,7 +1223,7 @@ consensus; if atomic commit aborts, the proposing node retries with a new transa
This shows that atomic commit and consensus are also equivalent to each other.
## Consensus in Practice
### Consensus in Practice
We have seen that single-value consensus, CAS, shared logs, and atomic commitment are all equivalent
to each other: you can convert a solution to one of them into a solution to any of the others. That
@ -1233,20 +1235,20 @@ Raft, Viewstamped Replication, and Zab provide shared logs right out of the box.
single-value consensus, but in practice most systems using Paxos actually use the extension called
Multi-Paxos, which also provides a shared log.
### Using shared logs
#### Using shared logs
A shared log is a good fit for database replication: if every log entry represents a write to the
database, and every replica processes the same writes in the same order using deterministic logic,
then the replicas will all end up in a consistent state. This idea is known as *state machine
replication* [^80],
then the replicas will all end up in a consistent state. This idea is known as *state machine replication* [^80],
and it is the principle behind event sourcing, which we saw in [“Event Sourcing and CQRS”](/en/ch3#sec_datamodels_events). Shared
logs are also useful for stream processing, as we shall see in [Link to Come].
Similarly, a shared log can be used to implement serializable transactions: as discussed in
[“Actual Serial Execution”](/en/ch8#sec_transactions_serial), if every log entry represents a deterministic transaction to be
executed as a stored procedure, and if every node executes those transactions in the same order,
then the transactions will be serializable
[^81] [^82].
then the transactions will be serializable [^81] [^82].
---------
> [!NOTE]
> Sharded databases with a strong consistency model often maintain a separate log per shard, which
@ -1254,6 +1256,8 @@ then the transactions will be serializable
> references) they can offer across shards. Serializable transactions across shards are possible, but
> require additional coordination [^83].
--------
A shared log is also powerful because it can easily be adapted to other forms of consensus:
* We saw previously how to use it to implement single-value consensus and CAS: simply decide the
@ -1266,7 +1270,7 @@ A shared log is also powerful because it can easily be adapted to other forms of
can be used to generate fencing tokens (see [“Fencing off zombies and delayed requests”](/en/ch9#sec_distributed_fencing_tokens)); for example, in
ZooKeeper, this sequence number is called `zxid` [^18].
### From single-leader replication to consensus
#### From single-leader replication to consensus
We saw previously that single-value consensus is easy if you have a single “dictator” node that
makes the decision, and likewise a shared log is easy if a single leader is the only node that is
@ -1314,7 +1318,7 @@ different protocols. In consensus algorithms, any node can start an election and
quorum of nodes to respond; in 2PC, only the coordinator can request votes, and it requires a “yes”
vote from *every* participant before it can commit.
### Subtleties of consensus
#### Subtleties of consensus
This basic structure is common to all of Raft, Multi-Paxos, Zab, and Viewstamped Replication: a vote
by a quorum of nodes elects a leader, and then another quorum vote is required for every entry that
@ -1330,7 +1334,10 @@ least as up-to-date as a majority of its followers [^69].
In contrast, Paxos allows any node to become the new leader, but requires it to bring its log
up-to-date with other nodes before it can start appending new entries of its own.
# Consistency vs. Availability in Leader Election
--------
> [!TIP] CONSISTENCY VS. AVAILABILITY IN LEADER ELECTION
If you want the consensus algorithm to strictly guarantee the properties laid out in
[“Shared logs as consensus”](/en/ch10#sec_consistency_shared_logs), its essential that the new leader is up-to-date with any confirmed
@ -1349,10 +1356,11 @@ availability, but you are on thin ice, since the theory of consensus no longer a
will work fine as long as there are no faults, the problems discussed in [Chapter 9](/en/ch9#ch_distributed) can
easily cause a lot of data loss or corruption.
--------
Another subtlety is in how the algorithms deal with log entries that had been proposed by the old
leader before it failed, but for which the vote on appending to the log had not yet completed. You
can find discussions of these details in the references for this chapter
[^23] [^69] [^86].
can find discussions of these details in the references for this chapter [^23] [^69] [^86].
For databases that use a consensus algorithm for replication, not only do writes need to be turned
into log entries and replicated to a quorum. If you want to guarantee linearizable reads, they also
@ -1366,7 +1374,7 @@ configuration. Consensus algorithms have been extended with *reconfiguration* fe
this possible. This is especially useful when adding new regions to a system, or when migrating from
one location to another (by first adding the new nodes, and then removing the old nodes).
### Pros and cons of consensus
#### Pros and cons of consensus
Although they are complex and subtle, consensus algorithms are a huge breakthrough for distributed
systems. Consensus is essentially “single-leader replication done right”, with automatic failover on
@ -1406,7 +1414,9 @@ only real alternative is to use a weaker consistency model instead, such as thos
leaderless or multi-leader replication as discussed in [Chapter 6](/en/ch6#ch_replication). These approaches
generally dont offer linearizability, but for applications that dont need it that is fine.
## Coordination Services
### Coordination Services
Consensus algorithms are useful in any distributed database that wants to offer linearizable
operations, and many modern distributed databases use consensus algorithms for replication. But one
@ -1453,7 +1463,9 @@ Failure detection and change notifications do not require consensus, but they ar
distributed coordination alongside the atomic operations and fencing support that do require
consensus.
# Managing configuration with coordination services
--------
> [!TIP] Managing configuration with coordination services
Applications and infrastructure often have configuration parameters such as timeouts, thread pool
sizes, and so on. Coordination services are sometimes used to store such configuration data,
@ -1466,7 +1478,9 @@ convenient to use a coordination service and rely on its notification feature if
running the coordination service anyway. Alternatively, a process could periodically poll for
configuration updates from a file or URL, which avoids the need for a specialized service.
### Allocating work to nodes
--------
#### Allocating work to nodes
A coordination service is useful if you have several instances of a process or service, and one
of them needs to be chosen as leader or primary. If the leader fails, one of the other nodes should
@ -1499,7 +1513,7 @@ intended for storing data that may change thousands of times per second. For tha
use a conventional database; alternatively, tools like Apache BookKeeper [^90] [^91]
can be used to replicate fast-changing internal state of a service.
### Service discovery
#### Service discovery
ZooKeeper, etcd, and Consul are also often used for *service discovery*—that is, to find out which
IP address you need to connect to in order to reach a particular service (see

View file

@ -41,7 +41,9 @@ concepts such as *eventual consistency* still cause confusion. In [“Problems w
get more precise about eventual consistency and discuss things like the *read-your-writes* and
*monotonic reads* guarantees.
# Backups and replication
--------
> [!TIP] BACKUPS AND REPLICATION
You might be wondering whether you still need backups if you have replication. The answer is yes,
because they have different purposes: replicas quickly reflect writes from one node on other nodes,
@ -59,15 +61,16 @@ the current state. If you have a large amount of data, it can be cheaper to keep
data in an object store that is optimized for infrequently-accessed data, and to store only the
current state of the database in primary storage.
# Single-Leader Replication
--------
## Single-Leader Replication
Each node that stores a copy of the database is called a *replica*. With multiple replicas, a
question inevitably arises: how do we ensure that all the data ends up on all the replicas?
Every write to the database needs to be processed by every replica; otherwise, the replicas would no
longer contain the same data. The most common solution is called *leader-based replication*,
*primary-backup*, or *active/passive*. It works as follows (see
[Figure 6-1](/en/ch6#fig_replication_leader_follower)):
*primary-backup*, or *active/passive*. It works as follows (see [Figure 6-1](/en/ch6#fig_replication_leader_follower)):
1. One of the replicas is designated the *leader* (also known as *primary* or *source* [^2]).
When clients want to write to the database, they must send their requests to the leader, which
@ -96,11 +99,15 @@ Many consensus algorithms such as Raft, which is used for replication in Cockroa
etcd, and RabbitMQ quorum queues (among others), are also based on a single leader, and automatically
elect a new leader if the old one fails (we will discuss consensus in more detail in [Chapter 10](/en/ch10#ch_consistency)).
--------
> [!NOTE]
> In older documents you may see the term *masterslave replication*. It means the same as
> leader-based replication, but the term should be avoided as it is widely considered offensive [^8].
## Synchronous Versus Asynchronous Replication
--------
### Synchronous Versus Asynchronous Replication
An important detail of a replicated system is whether the replication happens *synchronously* or
*asynchronously*. (In relational databases, this is often a configurable option; other systems are
@ -158,7 +165,7 @@ Weakening durability may sound like a bad trade-off, but asynchronous replicatio
widely used, especially if there are many followers or if they are geographically distributed [^9].
We will return to this issue in [“Problems with Replication Lag”](/en/ch6#sec_replication_lag).
## Setting Up New Followers
### Setting Up New Followers
From time to time, you need to set up new followers—perhaps to increase the number of replicas,
or to replace failed nodes. How do you ensure that the new follower has an accurate copy of the
@ -195,7 +202,9 @@ recovery. You can also perform steps 1 and 2 of setting up a new follower by dow
from the object store. For example, WAL-G does this for PostgreSQL, MySQL, and SQL Server, and
Litestream does the equivalent for SQLite.
# Databases backed by object storage
--------
> [!TIP] DATABASES BACKED BY OBJECT STORAGE
Object storage can be used for more than archiving data. Many databases are beginning to use object
stores such as Amazon Web Services S3, Google Cloud Storage, and Azure Blob Storage to serve data
@ -228,9 +237,7 @@ Different systems deal with these trade-offs in various ways. Some introduce a *
architecture that places less frequently accessed data on object storage while new or frequently
accessed data is kept on faster storage devices such as SSDs, NVMe, or even in memory. Other systems
use object storage as their primary storage tier, but use a separate low-latency storage system such
as Amazons EBS or Neons Safekeepers
[^12])
to store their WAL. Recently, some systems have gone even farther by adopting a
as Amazons EBS or Neons Safekeepers [^12]) to store their WAL. Recently, some systems have gone even farther by adopting a
*zero-disk architecture* (ZDA). ZDA-based systems persist all data to object storage and use disks
and memory strictly for caching. This allows nodes to have no persistent state, which dramatically
simplifies operations. WarpStream, Confluent Freight, Bufs Bufstream, and Redpanda Serverless are
@ -238,7 +245,9 @@ all Kafka-compatible systems built using a zero-disk architecture. Nearly every
warehouse also adopts such an architecture, as does Turbopuffer (a vector search engine), and
SlateDB (a cloud-native LSM storage engine).
## Handling Node Outages
--------
### Handling Node Outages
Any node in the system can go down, perhaps unexpectedly due to a fault, but just as likely due to
planned maintenance (for example, rebooting a machine to install a kernel security patch). Being
@ -248,7 +257,7 @@ the impact of a node outage as small as possible.
How do you achieve high availability with leader-based replication?
### Follower failure: Catch-up recovery
#### Follower failure: Catch-up recovery
On its local disk, each follower keeps a log of the data changes it has received from the leader. If
a follower crashes and is restarted, or if the network between the leader and the follower is
@ -261,8 +270,7 @@ receiving a stream of data changes as before.
Although follower recovery is conceptually simple, it can be challenging in terms of performance: if
the database has a high write throughput or if the follower has been offline for a long time, there
might be a lot of writes to catch up on. There will be high load on both the recovering follower and
the leader (which needs to send the backlog of writes to the follower) while this catch-up is
ongoing.
the leader (which needs to send the backlog of writes to the follower) while this catch-up is ongoing.
The leader can delete its log of writes once all followers have confirmed that they have processed
it, but if a follower is unavailable for a long time, the leader faces a choice: either it retains
@ -271,7 +279,7 @@ leader), or it deletes the log that the unavailable follower has not yet acknowl
the follower wont be able to recover from the log, and will have to be restored from a backup when
it comes back).
### Leader failure: Failover
#### Leader failure: Failover
Handling a failure of the leader is trickier: one of the followers needs to be promoted to be the
new leader, clients need to be reconfigured to send their writes to the new leader, and the other
@ -331,11 +339,15 @@ Failover is fraught with things that can go wrong:
is already struggling with high load or network problems, an unnecessary failover is likely to
make the situation worse, not better.
--------
> [!NOTE]
> Guarding against split brain by limiting or shutting down old leaders is known as *fencing* or, more
> emphatically, *Shoot The Other Node In The Head* (STONITH). We will discuss fencing in more detail
> in [“Distributed Locks and Leases”](/en/ch9#sec_distributed_lock_fencing).
--------
There are no easy solutions to these problems. For this reason, some operations teams prefer to
perform failovers manually, even if the software supports automatic failover.
@ -350,12 +362,12 @@ These issues—node failures; unreliable networks; and trade-offs around replica
durability, availability, and latency—are in fact fundamental problems in distributed systems.
In [Chapter 9](/en/ch9#ch_distributed) and [Chapter 10](/en/ch10#ch_consistency) we will discuss them in greater depth.
## Implementation of Replication Logs
### Implementation of Replication Logs
How does leader-based replication work under the hood? Several different replication methods are
used in practice, so lets look at each one briefly.
### Statement-based replication
#### Statement-based replication
In the simplest case, the leader logs every write request (*statement*) that it executes and sends
that statement log to its followers. For a relational database, this means that every `INSERT`,
@ -389,7 +401,7 @@ there is any nondeterminism in a statement. VoltDB uses statement-based replicat
safe by requiring transactions to be deterministic [^16]. However, determinism can be hard to guarantee
in practice, so many databases prefer other replication methods.
### Write-ahead log (WAL) shipping
#### Write-ahead log (WAL) shipping
In [Chapter 4](/en/ch4#ch_storage) we saw that a write-ahead log is needed to make B-tree storage engines robust:
every modification is first written to the WAL so that the tree can be restored to a consistent
@ -412,7 +424,7 @@ performing a failover to make one of the upgraded nodes the new leader. If the r
does not allow this version mismatch, as is often the case with WAL shipping, such upgrades require
downtime.
### Logical (row-based) log replication
#### Logical (row-based) log replication
An alternative is to use different log formats for replication and for the storage engine, which
allows the replication log to be decoupled from the storage engine internals. This kind of
@ -444,12 +456,12 @@ to send the contents of a database to an external system, such as a data warehou
analysis, or for building custom indexes and caches [^21].
This technique is called *change data capture*, and we will return to it in [Link to Come].
# Problems with Replication Lag
## Problems with Replication Lag
Being able to tolerate node failures is just one reason for wanting replication. As mentioned
in [“Distributed versus Single-Node Systems”](/en/ch1#sec_introduction_distributed), other reasons are scalability (processing more
requests than a single machine can handle) and latency (placing replicas geographically closer to
users).
requests than a single machine can handle) and latency (placing replicas geographically closer to users).
Leader-based replication requires all writes to go through a single node, but read-only queries can
go to any replica. For workloads that consist of mostly reads and only a small percentage of writes
@ -471,14 +483,14 @@ just a temporary state—if you stop writing to the database and wait a while, t
eventually catch up and become consistent with the leader. For that reason, this effect is known
as *eventual consistency* [^22].
--------
> [!NOTE]
> The term *eventual consistency* was coined by Douglas Terry et al.
> [^23],
> popularized by Werner Vogels
> [^24],
> The term *eventual consistency* was coined by Douglas Terry et al. [^23], popularized by Werner Vogels [^24],
> and became the battle cry of many NoSQL projects. However, not only NoSQL databases are eventually
> consistent: followers in an asynchronously replicated relational database have the same
> characteristics.
> consistent: followers in an asynchronously replicated relational database have the same characteristics.
--------
The term “eventually” is deliberately vague: in general, there is no limit to how far a replica can
fall behind. In normal operation, the delay between a write happening on the leader and being
@ -490,7 +502,7 @@ When the lag is so large, the inconsistencies it introduces are not just a theor
real problem for applications. In this section we will highlight three examples of problems that are
likely to occur when there is replication lag. Well also outline some approaches to solving them.
## Reading Your Own Writes
### Reading Your Own Writes
Many applications let the user submit some data and then view what they have submitted. This might
be a record in a customer database, or a comment on a discussion thread, or something else of that sort.
@ -505,8 +517,7 @@ submitted was lost, so they will be understandably unhappy.
{{< figure src="/fig/ddia_0603.png" id="fig_replication_read_your_writes" caption="Figure 6-3. A user makes a write, followed by a read from a stale replica. To prevent this anomaly, we need read-after-write consistency." class="w-full my-4" >}}
In this situation, we need *read-after-write consistency*, also known as *read-your-writes consistency*
[^23].
In this situation, we need *read-after-write consistency*, also known as *read-your-writes consistency* [^23].
This is a guarantee that if the user reloads the page, they will always see any updates they
submitted themselves. It makes no promises about other users: other users updates may not be
visible until some later time. However, it reassures the user that their own input has been saved
@ -526,15 +537,13 @@ are various possible techniques. To mention a few:
effective, as most things would have to be read from the leader (negating the benefit of read
scaling). In that case, other criteria may be used to decide whether to read from the leader. For
example, you could track the time of the last update and, for one minute after the last update, make all
reads from the leader
[^25].
reads from the leader [^25].
You could also monitor the replication lag on followers and prevent queries on any follower that
is more than one minute behind the leader.
* The client can remember the timestamp of its most recent write—then the system can ensure that the
replica serving any reads for that user reflects updates at least until that timestamp. If a
replica is not sufficiently up to date, either the read can be handled by another replica or the
query can wait until the replica has caught up
[^26].
query can wait until the replica has caught up [^26].
The timestamp could be a *logical timestamp* (something that indicates ordering of writes, such as
the log sequence number) or the actual system clock (in which case clock synchronization becomes
critical; see [“Unreliable Clocks”](/en/ch9#sec_distributed_clocks)).
@ -558,7 +567,9 @@ In this case, there are some additional issues to consider:
the devices network routes may be completely different.) If your approach requires reading from the
leader, you may first need to route requests from all of a users devices to the same region.
# Regions and Availability Zones
--------
> ![TIP] Regions and Availability Zones
We use the term *region* to refer to one or more datacenters in a single geographic location. Cloud
providers locate multiple datacenters in the same geographic region. Each datacenter is referred to
@ -576,7 +587,9 @@ increased cloud networking bills. We will discuss these tradeoffs more in
[“Multi-leader replication topologies”](/en/ch6#sec_replication_topologies). For now, just know that when we say region, we mean a collection of
zones/datacenters in a single geographic location.
## Monotonic Reads
--------
### Monotonic Reads
Our second example of an anomaly that can occur when reading from asynchronous followers is that its
possible for a user to see things *moving backward in time*.
@ -605,7 +618,7 @@ the same replica (different users can read from different replicas). For example
chosen based on a hash of the user ID, rather than randomly. However, if that replica fails, the
users queries will need to be rerouted to another replica.
## Consistent Prefix Reads
### Consistent Prefix Reads
Our third example of replication lag anomalies concerns violation of causality. Imagine the
following short dialog between Mr. Poons and Mrs. Cake:
@ -630,15 +643,13 @@ Mr. Poons
: How far into the future can you see, Mrs. Cake?
To the observer it looks as though Mrs. Cake is answering the question before Mr. Poons has even asked
it. Such psychic powers are impressive, but very confusing
[^27].
it. Such psychic powers are impressive, but very confusing [^27].
{{< figure src="/fig/ddia_0605.png" id="fig_replication_consistent_prefix" caption="Figure 6-5. If some shards are replicated slower than others, an observer may see the answer before they see the question." class="w-full my-4" >}}
Preventing this kind of anomaly requires another type of guarantee: *consistent prefix reads*
[^22]. This guarantee says that if a sequence of
writes happens in a certain order, then anyone reading those writes will see them appear in the same
order.
Preventing this kind of anomaly requires another type of guarantee: *consistent prefix reads* [^22].
This guarantee says that if a sequence of writes happens in a certain order,
then anyone reading those writes will see them appear in the same order.
This is a particular problem in sharded (partitioned) databases, which we will discuss in
[Chapter 7](/en/ch7#ch_sharding). If the database always applies writes in the same order, reads always see a
@ -651,7 +662,7 @@ the same shard—but in some applications that cannot be done efficiently. There
that explicitly keep track of causal dependencies, a topic that we will return to in
[“The “happens-before” relation and concurrency”](/en/ch6#sec_replication_happens_before).
## Solutions for Replication Lag
### Solutions for Replication Lag
When working with an eventually consistent system, it is worth thinking about how the application
behaves if the replication lag increases to several minutes or even hours. If the answer is “no
@ -683,7 +694,9 @@ consistency guarantees: they can offer stronger resilience in the face of networ
have lower overheads compared to transactional systems. We will explore such approaches in the rest
of this chapter.
# Multi-Leader Replication
## Multi-Leader Replication
So far in this chapter we have only considered replication architectures using a single leader.
Although that is a common approach, there are interesting alternatives.
@ -710,7 +723,7 @@ as equivalent to single-leader replication. The rest of this section focusses on
multi-leader replication, in which any leader can process writes even when its connection to the
other leaders is interrupted.
## Geographically Distributed Operation
### Geographically Distributed Operation
It rarely makes sense to use a multi-leader setup within a single region, because the benefits
rarely outweigh the added complexity. However, there are some situations in which this configuration
@ -730,8 +743,7 @@ other regions.
{{< figure src="/fig/ddia_0606.png" id="fig_replication_multi_dc" caption="Figure 6-6. Multi-leader replication across multiple regions." class="w-full my-4" >}}
Lets compare how the single-leader and multi-leader configurations fare in a multi-region
deployment:
Lets compare how the single-leader and multi-leader configurations fare in a multi-region deployment:
Performance
: In a single-leader configuration, every write must go over the internet to the region with the
@ -756,8 +768,7 @@ Tolerance of network problems
over that link and wait for the response before it can complete.
A multi-leader configuration with asynchronous replication can tolerate network problems better:
during a temporary network interruption, each regions leader can continue independently processing
writes.
during a temporary network interruption, each regions leader can continue independently processing writes.
Consistency
: A single-leader system can provide strong consistency guarantees, such as serializable
@ -768,25 +779,21 @@ Consistency
account, registering a particular username), but which violate the constraint when taken together
with another write on another leader.
This is simply a fundamental limitation of distributed systems
[^28].
This is simply a fundamental limitation of distributed systems [^28].
If you need to enforce such constraints, youre therefore better off with a single-leader system.
However, as we will see in [“Dealing with Conflicting Writes”](/en/ch6#sec_replication_write_conflicts), multi-leader systems can still
achieve consistency properties that are useful in a wide range of apps that dont need such
constraints.
achieve consistency properties that are useful in a wide range of apps that dont need such constraints.
Multi-leader replication is less common than single-leader replication, but it is still supported by
many databases, including MySQL, Oracle, SQL Server, and YugabyteDB. In some cases it is an external
add-on feature, for example in Redis Enterprise, EDB Postgres Distributed, and pglogical
[^29].
add-on feature, for example in Redis Enterprise, EDB Postgres Distributed, and pglogical [^29].
As multi-leader replication is a somewhat retrofitted feature in many databases, there are often
subtle configuration pitfalls and surprising interactions with other database features. For example,
autoincrementing keys, triggers, and integrity constraints can be problematic. For this reason,
multi-leader replication is often considered dangerous territory that should be avoided if possible
[^30].
multi-leader replication is often considered dangerous territory that should be avoided if possible [^30].
### Multi-leader replication topologies
#### Multi-leader replication topologies
A *replication topology* describes the communication paths along which writes are propagated from
one node to another. If you have two leaders, like in [Figure 6-9](/en/ch6#fig_replication_write_conflict), there is
@ -796,27 +803,30 @@ more than two leaders, various different topologies are possible. Some examples
{{< figure src="/fig/ddia_0607.png" id="fig_replication_topologies" caption="Figure 6-7. Three example topologies in which multi-leader replication can be set up." class="w-full my-4" >}}
The most general topology is *all-to-all*, shown in
[Figure 6-7](/en/ch6#fig_replication_topologies)(c),
The most general topology is *all-to-all*, shown in [Figure 6-7](/en/ch6#fig_replication_topologies)(c),
in which every leader sends its writes to every other leader. However, more restricted topologies
are also used: for example a *circular topology* in which each node receives writes from one node
and forwards those writes (plus any writes of its own) to one other node. Another popular topology
has the shape of a *star*: one designated root node forwards writes to all of the other nodes. The
star topology can be generalized to a tree.
--------
> [!NOTE]
> Dont confuse a star-shaped network topology with a *star schema* (see
> [“Stars and Snowflakes: Schemas for Analytics”](/en/ch3#sec_datamodels_analytics)), which describes the structure of a data model.
--------
In circular and star topologies, a write may need to pass through several nodes before it reaches
all replicas. Therefore, nodes need to forward data changes they receive from other nodes. To
prevent infinite replication loops, each node is given a unique identifier, and in the replication
log, each write is tagged with the identifiers of all the nodes it has passed through
[^31].
log, each write is tagged with the identifiers of all the nodes it has passed through [^31].
When a node receives a data change that is tagged with its own identifier, that data change is
ignored, because the node knows that it has already been processed.
### Problems with different topologies
#### Problems with different topologies
A problem with circular and star topologies is that if just one node fails, it can interrupt the
flow of replication messages between other nodes, leaving them unable to communicate until the
@ -850,7 +860,7 @@ issues like the one in [Figure 6-8](/en/ch6#fig_replication_causality). If you
is worth being aware of these issues, carefully reading the documentation, and thoroughly testing
your database to ensure that it really does provide the guarantees you believe it to have.
## Sync Engines and Local-First Software
### Sync Engines and Local-First Software
Another situation in which multi-leader replication is appropriate is if you have an application
that needs to continue to work while it is disconnected from the internet.
@ -870,7 +880,7 @@ From an architectural point of view, this setup is very similar to multi-leader
regions, taken to the extreme: each device is a “region,” and the network connection between them is
extremely unreliable.
### Real-time collaboration, offline-first, and local-first apps
#### Real-time collaboration, offline-first, and local-first apps
Moreover, many modern web apps offer *real-time collaboration* features, such as Google Docs and
Sheets for text documents and spreadsheets, Figma for graphics, and Linear for project management.
@ -904,7 +914,7 @@ service providers are available [^40].
For example, Git is a local-first collaboration system (albeit one that doesnt support real-time
collaboration) since you can sync via GitHub, GitLab, or any other repository hosting service.
### Pros and cons of sync engines
#### Pros and cons of sync engines
The dominant way of building web apps today is to keep very little persistent state on the client,
and to rely on making requests to a server whenever a new piece of data needs to be displayed or
@ -947,7 +957,8 @@ development jargon the equivalent of a sync engine is called *netcode*. The tech
netcode are quite specific to the requirements of games [^44], and dont directly
carry over to other types of software, so we wont consider them further in this book.
## Dealing with Conflicting Writes
### Dealing with Conflicting Writes
The biggest problem with multi-leader replication—both in a geo-distributed server-side database and
a local-first sync engine on end user devices—is that concurrent writes on different leaders can
@ -972,7 +983,7 @@ In [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent) we wi
whether two writes are concurrent. For now we will assume that we can detect conflicts, and we want
to figure out the best way of resolving them.
### Conflict avoidance
#### Conflict avoidance
One strategy for conflicts is to avoid them occurring in the first place. For example, if the
application can ensure that all writes for a particular record go through the same leader, then
@ -999,7 +1010,8 @@ so that one leader only generates odd numbers and the other only generates even
you can be sure that the two leaders wont concurrently assign the same ID to different records.
We will discuss other ID assignment schemes in [“ID Generators and Logical Clocks”](/en/ch10#sec_consistency_logical).
### Last write wins (discarding concurrent writes)
#### Last write wins (discarding concurrent writes)
If conflicts cant be avoided, the simplest way of resolving them is to attach a timestamp to each
write, and to always use the value with the greatest timestamp. For example, in
@ -1031,7 +1043,7 @@ that is ahead of the others, and you try to overwrite a value written by that no
be ignored as it may have a lower timestamp, even though it clearly occurred later. This problem can
be solved by using a *logical clock*, which we will discuss in [“ID Generators and Logical Clocks”](/en/ch10#sec_consistency_logical).
### Manual conflict resolution
#### Manual conflict resolution
If randomly discarding some of your writes is not desirable, the next option is to resolve the
conflict manually. You may be familiar with manual conflict resolution from Git and other version
@ -1063,9 +1075,7 @@ suffers from a number of problems:
keeping all the shopping cart items that appeared in any of the siblings (i.e., taking the set
union of the carts). This meant that if the customer had removed an item from their cart in one
sibling, but another sibling still contained that old item, the removed item would unexpectedly
reappear in the customers cart
[^45].
[Figure 6-10](/en/ch6#fig_replication_amazon_anomaly) shows an example where Device 1 removes Book from the shopping
reappear in the customers cart [^45]. [Figure 6-10](/en/ch6#fig_replication_amazon_anomaly) shows an example where Device 1 removes Book from the shopping
cart and concurrently Device 2 removes DVD, but after merging the conflict both items reappear.
* If multiple nodes observe the conflict and concurrently resolve it, the conflict resolution
process can itself introduce a new conflict. Those resolutions could even be inconsistent: for
@ -1076,7 +1086,7 @@ suffers from a number of problems:
{{< figure src="/fig/ddia_0610.png" id="fig_replication_amazon_anomaly" caption="Figure 6-10. Example of Amazon's shopping cart anomaly: if conflicts on a shopping cart are merged by taking the union, deleted items may reappear." class="w-full my-4" >}}
### Automatic conflict resolution
#### Automatic conflict resolution
For many applications, the best way of handling conflicts is to use an algorithm that automatically
merges concurrent writes into a consistent state. Automatic conflict resolution ensures that all
@ -1110,23 +1120,19 @@ Nevertheless, automatic conflict resolution is sufficient to build many useful a
start from the requirement of wanting to build a collaborative offline-first or local-first app,
then conflict resolution is inevitable, and automating it is often the best approach.
## CRDTs and Operational Transformation
### CRDTs and Operational Transformation
Two families of algorithms are commonly used to implement automatic conflict resolution:
*Conflict-free replicated datatypes* (CRDTs)
[^46] and *Operational Transformation* (OT)
[^47].
*Conflict-free replicated datatypes* (CRDTs) [^46] and *Operational Transformation* (OT) [^47].
They have different design philosophies and performance characteristics, but both are able to
perform automatic merges for all the aforementioned types of data.
[Figure 6-11](/en/ch6#fig_replication_ot_crdt) shows an example of how OT and a CRDT merge concurrent updates to a
text. Assume you have two replicas that both start off with the text “ice”. One replica prepends the
letter “n” to make “nice”, while concurrently the other replica appends an exclamation mark to make
“ice!”.
letter “n” to make “nice”, while concurrently the other replica appends an exclamation mark to make “ice!”.
{{< figure src="/fig/ddia_0611.png" id="fig_replication_ot_crdt" caption="Figure 6-11. How two concurrent insertions into a string are merged by OT and a CRDT respectively." class="w-full my-4" >}}
The merged result “nice!” is achieved differently by both types of algorithms:
OT
@ -1155,7 +1161,7 @@ OT is most often used for real-time collaborative editing of text, e.g. in Googl
distributed databases such as Redis Enterprise, Riak, and Azure Cosmos DB [^49].
Sync engines for JSON data can be implemented both with CRDTs (e.g., Automerge or Yjs) and with OT (e.g., ShareDB).
### What is a conflict?
#### What is a conflict?
Some kinds of conflict are obvious. In the example in [Figure 6-9](/en/ch6#fig_replication_write_conflict), two writes
concurrently modified the same field in the same record, setting it to two different values. There
@ -1174,7 +1180,8 @@ good understanding of this problem. We will see some more examples of conflicts
[Chapter 8](/en/ch8#ch_transactions), and in [Link to Come] we will discuss scalable approaches for detecting and
resolving conflicts in a replicated system.
# Leaderless Replication
## Leaderless Replication
The replication approaches we have discussed so far in this chapter—single-leader and
multi-leader replication—are based on the idea that a client sends a write request to one node
@ -1189,17 +1196,21 @@ a fashionable architecture for databases after Amazon used it for its in-house *
2007 [^45]. Riak, Cassandra, and ScyllaDB are open source datastores with leaderless replication models inspired
by Dynamo, so this kind of database is also known as *Dynamo-style*.
--------
> [!NOTE]
> The original *Dynamo* system was only described in a paper [^45], but never released outside of Amazon.
> The similarly-named *DynamoDB* is a more recent cloud database from AWS, but it has a completely different architecture:
> it uses single-leader replication based on the Multi-Paxos consensus algorithm [^5].
--------
In some leaderless implementations, the client directly sends its writes to several replicas, while
in others, a coordinator node does this on behalf of the client. However, unlike a leader database,
that coordinator does not enforce a particular ordering of writes. As we shall see, this difference in design has
profound consequences for the way the database is used.
## Writing to the Database When a Node Is Down
### Writing to the Database When a Node Is Down
Imagine you have a database with three replicas, and one of the replicas is currently
unavailable—perhaps it is being rebooted to install a system update. In a single-leader
@ -1231,7 +1242,7 @@ needs to be tagged with a version number or timestamp, similarly to what we saw
one with the greatest timestamp (even if that value was only returned by one replica, and several
other replicas returned older values). See [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent) for more details.
### Catching up on missed writes
#### Catching up on missed writes
The replication system should ensure that eventually all the data is copied to every replica. After
an unavailable node comes back online, how does it catch up on the writes that it missed? Several
@ -1257,7 +1268,7 @@ Anti-entropy
replication log in leader-based replication, this *anti-entropy process* does not copy writes in
any particular order, and there may be a significant delay before data is copied.
### Quorums for reading and writing
#### Quorums for reading and writing
In the example of [Figure 6-12](/en/ch6#fig_replication_quorum_node_outage), we considered the write to be successful
even though it was only processed on two out of three replicas. What if only one out of three
@ -1270,12 +1281,10 @@ respond, reads can nevertheless continue returning an up-to-date value.
More generally, if there are *n* replicas, every write must be confirmed by *w* nodes to be
considered successful, and we must query at least *r* nodes for each read. (In our example,
*n* = 3, *w* = 2, *r* = 2.) As long as *w* + *r* >
*n*, we expect to get an up-to-date value when reading, because at least one of the *r* nodes were
reading from must be up to date. Reads and writes that obey these *r* and *w* values are called
*quorum* reads and writes [^50].
You can think of *r* and *w* as the minimum number of votes required for the read or write to be
valid.
*n* = 3, *w* = 2, *r* = 2.) As long as *w* + *r* > *n*,
we expect to get an up-to-date value when reading, because at least one of the *r* nodes were
reading from must be up to date. Reads and writes that obey these *r* and *w* values are called *quorum* reads and writes [^50].
You can think of *r* and *w* as the minimum number of votes required for the read or write to be valid.
In Dynamo-style databases, the parameters *n*, *w*, and *r* are typically configurable. A common
choice is to make *n* an odd number (typically 3 or 5) and to set *w* = *r* =
@ -1284,11 +1293,15 @@ For example, a workload with few writes and many reads may benefit from setting
*r* = 1. This makes reads faster, but has the disadvantage that just one failed node causes all
database writes to fail.
--------
> [!NOTE]
> There may be more than *n* nodes in the cluster, but any given value is stored only on *n*
> nodes. This allows the dataset to be sharded, supporting datasets that are larger than you can fit
> on one node. We will return to sharding in [Chapter 7](/en/ch7#ch_sharding).
--------
The quorum condition, *w* + *r* > *n*, allows the system to tolerate unavailable nodes
as follows:
@ -1299,8 +1312,8 @@ as follows:
* With *n* = 5, *w* = 3, *r* = 3 we can tolerate two unavailable nodes.
This case is illustrated in [Figure 6-13](/en/ch6#fig_replication_quorum_overlap).
Normally, reads and writes are always sent to all *n* replicas in parallel. The parameters *w* and
*r* determine how many nodes we wait for—i.e., how many of the *n* nodes need to report success
Normally, reads and writes are always sent to all *n* replicas in parallel. The parameters *w* and *r*
determine how many nodes we wait for—i.e., how many of the *n* nodes need to report success
before we consider the read or write to be successful.
{{< figure src="/fig/ddia_0613.png" id="fig_replication_quorum_overlap" caption="Figure 6-13. If *w* + *r* > *n*, at least one of the *r* replicas you read from must have seen the most recent successful write." class="w-full my-4" >}}
@ -1312,7 +1325,7 @@ error executing the operation (cant write because the disk is full), due to a
between the client and the node, or for any number of other reasons. We only care whether the node
returned a successful response and dont need to distinguish between different kinds of fault.
## Limitations of Quorum Consistency
### Limitations of Quorum Consistency
If you have *n* replicas, and you choose *w* and *r* such that *w* + *r* > *n*, you can
generally expect every read to return the most recent value written for a key. This is the case because the
@ -1324,8 +1337,7 @@ Often, *r* and *w* are chosen to be a majority (more than *n*/2) of nodes, becau
*w* + *r* > *n* while still tolerating up to *n*/2 (rounded down) node failures. But quorums are
not necessarily majorities—it only matters that the sets of nodes used by the read and write
operations overlap in at least one node. Other quorum assignments are possible, which allows some
flexibility in the design of distributed algorithms
[^51].
flexibility in the design of distributed algorithms [^51].
You may also set *w* and *r* to smaller numbers, so that *w* + *r**n* (i.e.,
the quorum condition is not satisfied). In this case, reads and writes will still be sent to *n*
@ -1369,7 +1381,7 @@ it is not so simple. Dynamo-style databases are generally optimized for use case
eventual consistency. The parameters *w* and *r* allow you to adjust the probability of stale values
being read [^53], but its wise to not take them as absolute guarantees.
### Monitoring staleness
#### Monitoring staleness
From an operational perspective, its important to monitor whether your databases are
returning up-to-date results. Even if your application can tolerate stale reads, you need to be
@ -1388,7 +1400,8 @@ handoff can be one measure of system health, but its difficult to interpret u
Eventual consistency is a deliberately vague guarantee, but for operability its important to be
able to quantify “eventual.”
## Single-Leader vs. Leaderless Replication Performance
### Single-Leader vs. Leaderless Replication Performance
A replication system based on a single leader can provide strong consistency guarantees that are
difficult or impossible to achieve in a leaderless system. However, as we have seen in
@ -1427,8 +1440,7 @@ That said, leaderless systems can have performance problems as well:
* Even though the system doesnt need to perform failover, one replica does need to detect when
another replica is unavailable so that it can store hints about writes that the unavailable
replica missed. When the unavailable replica comes back, the handoff process needs to send it
those hints. This puts additional load on the replicas at a time when the system is already under
strain [^54].
those hints. This puts additional load on the replicas at a time when the system is already under strain [^54].
* The more replicas you have, the bigger the size of your quorums, and the more responses you have
to wait for before a request can complete. Even if you wait only for the fastest *r* or *w*
replicas to respond, and even if you make the requests in parallel, a bigger *r* or *w* increases
@ -1448,7 +1460,7 @@ be co-located with the client. However, since a write on one leader is propagate
the others, reads can be arbitrarily out-of-date. Quorum reads and writes provide a compromise: good
fault tolerance while also having a high likelihood of reading up-to-date data.
### Multi-region operation
#### Multi-region operation
We previously discussed cross-region replication as a use case for multi-leader replication (see
[“Multi-Leader Replication”](/en/ch6#sec_replication_multi_leader)). Leaderless replication is also suitable for
@ -1467,7 +1479,8 @@ describes the number of replicas within one region. Cross-region replication bet
database clusters happens asynchronously in the background, in a style that is similar to
multi-leader replication.
## Detecting Concurrent Writes
### Detecting Concurrent Writes
Like with multi-leader replication, leaderless databases allow concurrent writes to the same key,
resulting in conflicts that need to be resolved. Such conflicts may occur as the writes happen, but
@ -1477,8 +1490,7 @@ The problem is that events may arrive in a different order at different nodes, d
network delays and partial failures. For example, [Figure 6-14](/en/ch6#fig_replication_concurrency) shows two clients,
A and B, simultaneously writing to a key *X* in a three-node datastore:
* Node 1 receives the write from A, but never receives the write from B due to a transient
outage.
* Node 1 receives the write from A, but never receives the write from B due to a transient outage.
* Node 2 first receives the write from A, then the write from B.
* Node 3 first receives the write from B, then the write from A.
@ -1501,7 +1513,7 @@ you whether two values are actually conflicting (i.e., they were written concurr
were written one after another). If you want to resolve conflicts explicitly, the system needs to
take more care to detect concurrent writes.
### The “happens-before” relation and concurrency
#### The “happens-before” relation and concurrency
How do we decide whether two operations are concurrent or not? To develop an intuition, lets look
at some examples:
@ -1517,8 +1529,7 @@ at some examples:
An operation A *happens before* another operation B if B knows about A, or depends on A, or builds
upon A in some way. Whether one operation happens before another operation is the key to defining
what concurrency means. In fact, we can simply say that two operations are *concurrent* if neither
happens before the other (i.e., neither knows about the other)
[^57].
happens before the other (i.e., neither knows about the other) [^57].
Thus, whenever you have two operations A and B, there are three possibilities: either A happened
before B, or B happened before A, or A and B are concurrent. What we need is an algorithm to tell us
@ -1526,7 +1537,9 @@ whether two operations are concurrent or not. If one operation happened before a
operation should overwrite the earlier operation, but if the operations are concurrent, we have a
conflict that needs to be resolved.
# Concurrency, Time, and Relativity
--------
> ![TIP] Concurrency, Time, and Relativity
It may seem that two operations should be called concurrent if they occur “at the same time”—but
in fact, it is not important whether they literally overlap in time. Because of problems with clocks
@ -1546,7 +1559,9 @@ principle have allowed one operation to affect the other. For example, if the ne
interrupted at the time, two operations can occur some time apart and still be concurrent, because
the network problems prevented one operation from being able to know about the other.
### Capturing the happens-before relationship
--------
#### Capturing the happens-before relationship
Lets look at an algorithm that determines whether two operations are concurrent, or whether one
happened before another. To keep things simple, lets start with a database that has only one
@ -1619,7 +1634,7 @@ write is based on. If you make a write without including a version number, it is
other writes, so it will not overwrite anything—it will just be returned as one of the values
on subsequent reads.
### Version vectors
#### Version vectors
The example in [Figure 6-15](/en/ch6#fig_replication_causality_single) used only a single replica. How does the
algorithm change when there are multiple replicas, but no leader?
@ -1646,12 +1661,16 @@ The version vector also ensures that it is safe to read from one replica and sub
to another replica. Doing so may result in siblings being created, but no data is lost as long as
siblings are merged correctly.
# Version vectors and vector clocks
--------
> [!TIP] VERSION VECTORS AND VECTOR CLOCKS
A *version vector* is sometimes also called a *vector clock*, even though they are not quite the
same. The difference is subtle—please see the references for details [^60] [^63] [^64]. In brief, when
comparing the state of replicas, version vectors are the right data structure to use.
--------
## Summary
In this chapter we looked at the issue of replication. Replication can serve several purposes:

View file

@ -12,8 +12,7 @@ breadcrumbs: false
A distributed database typically distributes data across nodes in two ways:
1. Having a copy of the same data on multiple nodes: this is *replication*, which we discussed in
[Chapter 6](/en/ch6#ch_replication).
1. Having a copy of the same data on multiple nodes: this is *replication*, which we discussed in [Chapter 6](/en/ch6#ch_replication).
2. If we dont want every node to store all the data, we can split up a large amount of data into
smaller *shards* or *partitions*, and store different shards on different nodes. Well discuss
sharding in this chapter.
@ -38,7 +37,9 @@ Everything we discussed in [Chapter 6](/en/ch6#ch_replication) about replicatio
replication of shards. Since the choice of sharding scheme is mostly independent of the choice of
replication scheme, we will ignore replication in this chapter for the sake of simplicity.
# Sharding and Partitioning
--------
> [!TIP] SHARDING AND PARTITIONING
What we call a *shard* in this chapter has many different names depending on which software youre
using: its called a *partition* in Kafka, a *range* in CockroachDB, a *region* in HBase and TiDB, a
@ -61,7 +62,9 @@ Available Replicated Data*—reportedly a 1980s database, details of which are l
By the way, partitioning has nothing to do with *network partitions* (netsplits), a type of fault in
the network between nodes. We will discuss such faults in [Chapter 9](/en/ch9#ch_distributed).
# Pros and Cons of Sharding
--------
## Pros and Cons of Sharding
The primary reason for sharding a database is *scalability*: its a solution if the volume of data
or the write throughput has become too great for a single node to handle, as it allows you to spread
@ -105,7 +108,7 @@ access* (NUMA) architecture in which some banks of memory are closer to one CPU
For example, Redis, VoltDB, and FoundationDB use one process per core, and rely on sharding to
spread load across CPU cores in the same machine [^6].
## Sharding for Multitenancy
### Sharding for Multitenancy
Software as a Service (SaaS) products and cloud services are often *multitenant*, where each tenant
is a customer. Multiple users may have logins on the same tenant, but each tenant has a
@ -166,7 +169,9 @@ The main challenges around using sharding for multitenancy are:
* If you ever need to support features that connect data across multiple tenants, these become
harder to implement if you need to join data across multiple shards.
# Sharding of Key-Value Data
## Sharding of Key-Value Data
Say you have a large amount of data, and you want to shard it. How do you decide which records to
store on which nodes?
@ -181,8 +186,7 @@ If the sharding is unfair, so that some shards have more data or queries than ot
*skewed*. The presence of skew makes sharding much less effective. In an extreme case, all the load
could end up on one shard, so 9 out of 10 nodes are idle and your bottleneck is the single busy
node. A shard with disproportionately high load is called a *hot shard* or *hot spot*. If theres
one key with a particularly high load (e.g., a celebrity in a social network), we call it a *hot
key*.
one key with a particularly high load (e.g., a celebrity in a social network), we call it a *hot key*.
Therefore we need an algorithm that takes as input the partition key of a record, and tells us which
shard that record is in. In a key-value store the partition key is usually the key, or the first
@ -190,7 +194,8 @@ part of the key. In a relational model the partition key might be some column of
necessarily its primary key). That algorithm needs to be amenable to rebalancing in order to relieve
hot spots.
## Sharding by Key Range
### Sharding by Key Range
One way of sharding is to assign a contiguous range of partition keys (from some minimum to some
maximum) to each shard, like the volumes of a paper encyclopedia, as illustrated in
@ -233,7 +238,7 @@ active at the same time, the write load will end up more evenly spread across th
downside is that when you want to fetch the values of multiple sensors within a time range, you now
need to perform a separate range query for each sensor.
### Rebalancing key-range sharded data
#### Rebalancing key-range sharded data
When you first set up your database, there are no key ranges to split into shards. Some databases,
such as HBase and MongoDB, allow you to configure an initial set of shards on an empty database,
@ -251,20 +256,18 @@ With databases that manage shard boundaries automatically, a shard split is typi
* the shard reaching a configured size (for example, on HBase, the default is 10 GB), or
* in some systems, the write throughput being persistently above some threshold. Thus, a hot shard
may be split even if it is not storing a lot of data, so that its write load can be distributed
more uniformly.
may be split even if it is not storing a lot of data, so that its write load can be distributed more uniformly.
An advantage of key-range sharding is that the number of shards adapts to the data volume. If there
is only a small amount of data, a small number of shards is sufficient, so overheads are small; if
there is a huge amount of data, the size of each individual shard is limited to a configurable
maximum [^15].
there is a huge amount of data, the size of each individual shard is limited to a configurable maximum [^15].
A downside of this approach is that splitting a shard is an expensive operation, since it requires
all of its data to be rewritten into new files, similarly to a compaction in a log-structured
storage engine. A shard that needs splitting is often also one that is under high load, and the cost
of splitting can exacerbate that load, risking it becoming overloaded.
## Sharding by Hash of Key
### Sharding by Hash of Key
Key-range sharding is useful if you want records with nearby (but different) partition keys to be
grouped into the same shard; for example, this might be the case with timestamps. If you dont care
@ -273,9 +276,8 @@ application), a common approach is to first hash the partition key before mappin
A good hash function takes skewed data and makes it uniformly distributed. Say you have a 32-bit
hash function that takes a string. Whenever you give it a new string, it returns a seemingly random
number between 0 and 232  1. Even if the input strings are very similar, their
hashes are evenly distributed across that range of numbers (but the same input always produces the
same output).
number between 0 and 232  1. Even if the input strings are very similar, their hashes are evenly
distributed across that range of numbers (but the same input always produces the same output).
For sharding purposes, the hash function need not be cryptographically strong: for example, MongoDB
uses MD5, whereas Cassandra and ScyllaDB use Murmur3. Many programming languages have simple hash
@ -283,7 +285,7 @@ functions built in (as they are used for hash tables), but they may not be suita
for example, in Javas `Object.hashCode()` and Rubys `Object#hash`, the same key may have a
different hash value in different processes, making them unsuitable for sharding [^16].
### Hash modulo number of nodes
#### Hash modulo number of nodes
Once you have hashed the key, how do you choose which shard to store it in? Maybe your first thought
is to take the hash value *modulo* the number of nodes in the system (using the `%` operator in many
@ -303,7 +305,7 @@ The *mod N* function is easy to compute, but it leads to very inefficient rebala
is a lot of unnecessary movement of records from one node to another. We need an approach that
doesnt move data around more than necessary.
### Fixed number of shards
#### Fixed number of shards
One simple but widely-used solution is to create many more shards than there are nodes, and to
assign several shards to each node. For example, a database running on a cluster of 10 nodes may be
@ -313,8 +315,7 @@ which shard is stored on which node.
Now, if a node is added to the cluster, the system can reassign some of the shards from existing
nodes to the new node until they are fairly distributed once again. This process is illustrated in
[Figure 7-4](/en/ch7#fig_sharding_rebalance_fixed). If a node is removed from the cluster, the same happens in
reverse.
[Figure 7-4](/en/ch7#fig_sharding_rebalance_fixed). If a node is removed from the cluster, the same happens in reverse.
{{< figure src="/fig/ddia_0704.png" id="fig_sharding_rebalance_fixed" caption="Figure 7-4. Adding a new node to a database cluster with multiple shards per node." class="w-full my-4" >}}
@ -349,7 +350,7 @@ expensive. But if shards are too small, they incur too much overhead. The best p
achieved when the size of shards is “just right,” neither too big nor too small, which can be hard
to achieve if the number of shards is fixed but the dataset size varies.
### Sharding by hash range
#### Sharding by hash range
If the required number of shards cant be predicted in advance, its better to use a scheme in which
the number of shards can adapt easily to the workload. The aforementioned key-range sharding scheme
@ -375,7 +376,9 @@ two or more columns, and the partition key is only the first of these columns, y
efficient range queries over the second and later columns: as long as all records in the range query
have the same partition key, they will be in the same shard.
# Partitioning and Range Queries in Data Warehouses
--------
> [!TIPS] PARTITIONING AND RANGE QUERIES IN DATA WAREHOUSES
Data warehouses such as BigQuery, Snowflake, and Delta Lake support a similar indexing approach,
though the terminology differs. In BigQuery, for example, the partition key determines which
@ -385,6 +388,8 @@ cluster keys for a table. Delta Lake supports both manual and automatic partitio
supports cluster keys. Clustering data not only improves range scan performance, but can
improve compression and filtering performance as well.
--------
Hash-range sharding is used in YugabyteDB and DynamoDB [^17], and is an option in MongoDB.
Cassandra and ScyllaDB use a variant of this approach that is illustrated in
[Figure 7-6](/en/ch7#fig_sharding_cassandra): the space of hash values is split into a number of ranges proportional
@ -402,7 +407,7 @@ transfers parts of two of its ranges to node 3, and node 2 transfers part of one
node 3. This has the effect of giving the new node an approximately fair share of the dataset,
without transferring more data than necessary from one node to another.
### Consistent hashing
#### Consistent hashing
A *consistent hashing* algorithm is a hash function that maps keys to a specified number of shards
in a way that satisfies two properties:
@ -422,7 +427,7 @@ sub-ranges; on the other hand, with rendezvous and jump consistent hashes, the n
individual keys that were previously scattered across all of the other nodes. Which one is
preferable depends on the application.
## Skewed Workloads and Relieving Hot Spots
### Skewed Workloads and Relieving Hot Spots
Consistent hashing ensures that keys are uniformly distributed across nodes, but that doesnt mean
that the actual load is uniformly distributed. If the workload is highly skewed—that is, the amount
@ -461,7 +466,7 @@ Some systems (especially cloud services designed for large scale) have automated
dealing with hot shards; for example, Amazon calls it *heat management* [^28] or *adaptive capacity* [^17].
The details of how these systems work go beyond the scope of this book.
## Operations: Automatic or Manual Rebalancing
### Operations: Automatic or Manual Rebalancing
There is one important question with regard to rebalancing that we have glossed over: does the
splitting of shards and rebalancing happen automatically or manually?
@ -469,8 +474,7 @@ splitting of shards and rebalancing happen automatically or manually?
Some systems automatically decide when to split shards and when to move them from one node to
another, without any human interaction, while others leave sharding to be explicitly configured by
an administrator. There is also a middle ground: for example, Couchbase and Riak generate a
suggested shard assignment automatically, but require an administrator to commit it before it takes
effect.
suggested shard assignment automatically, but require an administrator to commit it before it takes effect.
Fully automated rebalancing can be convenient, because there is less operational work to do for
normal maintenance, and such systems can even auto-scale to adapt to changes in workload. Cloud
@ -488,13 +492,14 @@ Such automation can be dangerous in combination with automatic failure detection
one node is overloaded and is temporarily slow to respond to requests. The other nodes conclude that
the overloaded node is dead, and automatically rebalance the cluster to move load away from it. This
puts additional load on other nodes and the network, making the situation worse. There is a risk of
causing a cascading failure where other nodes become overloaded and are also falsely suspected of
being down.
causing a cascading failure where other nodes become overloaded and are also falsely suspected of being down.
For that reason, it can be a good thing to have a human in the loop for rebalancing. Its slower
than a fully automatic process, but it can help prevent operational surprises.
# Request Routing
## Request Routing
We have discussed how to shard a dataset across multiple nodes, and how to rebalance those shards as
nodes are added or removed. Now lets move on to the question: if you want to read or write a
@ -508,8 +513,8 @@ balancer can send a request to any of the instances. With sharded databases, a r
only be handled by a node that is a replica for the shard containing that key.
This means that request routing has to be aware of the assignment from keys to shards, and from
shards to nodes. On a high level, there are a few different approaches to this problem (illustrated
in [Figure 7-7](/en/ch7#fig_sharding_routing)):
shards to nodes. On a high level, there are a few different approaches to this problem
(illustrated in [Figure 7-7](/en/ch7#fig_sharding_routing)):
1. Allow clients to contact any node (e.g., via a round-robin load balancer). If that node
coincidentally owns the shard to which the request applies, it can handle the request directly;
@ -568,7 +573,7 @@ typically have a very different kind of query execution: rather than executing i
query typically needs to aggregate and join data from many different shards in parallel. We will
discuss techniques for such parallel query execution in [Link to Come].
# Sharding and Secondary Indexes
## Sharding and Secondary Indexes
The sharding schemes we have discussed so far rely on the client knowing the partition key for any
record it wants to access. This is most easily done in a key-value data model, where the partition
@ -587,7 +592,7 @@ search engines such as Solr and Elasticsearch. The problem with secondary indexe
map neatly to shards. There are two main approaches to sharding a database with secondary indexes:
local and global indexes.
## Local Secondary Indexes
### Local Secondary Indexes
For example, imagine you are operating a website for selling used cars (illustrated in
[Figure 7-9](/en/ch7#fig_sharding_local_secondary)). Each listing has a unique ID, and you use that ID as partition
@ -602,7 +607,7 @@ automatically adds its ID to the list of IDs for the index entry `color:red`. As
{{< figure src="/fig/ddia_0709.png" id="fig_sharding_local_secondary" caption="Figure 7-9. Local secondary indexes: each shard indexes only the records within its own shard." class="w-full my-4" >}}
###### Warning
> [!WARN] WARNING
If your database only supports a key-value model, you might be tempted to implement a secondary
index yourself by creating a mapping from values to IDs in application code. If you go down this
@ -610,6 +615,8 @@ route, you need to take great care to ensure your indexes remain consistent with
data. Race conditions and intermittent write failures (where some changes were saved but others
werent) can very easily cause the data to go out of sync—see [“The need for multi-object transactions”](/en/ch8#sec_transactions_need).
--------
In this indexing approach, each shard is completely separate: each shard maintains its own secondary
indexes, covering only the records in that shard. It doesnt care what data is stored in other
shards. Whenever you write to the database—to add, remove, or update a records—you only need to
@ -635,7 +642,7 @@ process every query anyway.
Nevertheless, local secondary indexes are widely used [^31]: for example, MongoDB, Riak, Cassandra [^32], Elasticsearch [^33],
SolrCloud, and VoltDB [^34] all use local secondary indexes.
## Global Secondary Indexes
### Global Secondary Indexes
Rather than each shard having its own, local secondary index, we can construct a *global index* that
covers data in all shards. However, we cant just store that index on one node, since it would
@ -645,16 +652,13 @@ but it can be sharded differently from the primary key index.
[Figure 7-10](/en/ch7#fig_sharding_global_secondary) illustrates what this could look like: the IDs of red cars from
all shards appear under `color:red` in the index, but the index is sharded so that colors starting
with the letters *a* to *r* appear in shard 0 and colors starting with *s* to *z* appear in shard 1.
The index on the make of car is partitioned similarly (with the shard boundary being between *f* and
*h*).
The index on the make of car is partitioned similarly (with the shard boundary being between *f* and *h*).
{{< figure src="/fig/ddia_0710.png" id="fig_sharding_global_secondary" caption="Figure 7-10. A global secondary index reflects data from all shards, and is itself sharded by the indexed value." class="w-full my-4" >}}
This kind of index is also called *term-partitioned*
[^30]:
This kind of index is also called *term-partitioned* [^30]:
recall from [“Full-Text Search”](/en/ch4#sec_storage_full_text) that in full-text search, a *term* is a keyword in a text that
you can search for. Here we generalise it to mean any value that you can search for in the secondary
index.
you can search for. Here we generalise it to mean any value that you can search for in the secondary index.
The global index uses the term as partition key, so that when youre looking for a particular term
or value, you can figure out which shard you need to query. As before, a shard can contain a
@ -684,6 +688,7 @@ indexes, so reads from a global index may be stale (similarly to replication lag
Nevertheless, global indexes are useful if read throughput is higher than write throughput, and if
the postings lists are not too long.
## Summary
In this chapter we explored different ways of sharding a large dataset into smaller subsets.
@ -692,8 +697,7 @@ is no longer feasible.
The goal of sharding is to spread the data and query load evenly across multiple machines, avoiding
hot spots (nodes with disproportionately high load). This requires choosing a sharding scheme that
is appropriate to your data, and rebalancing the shards when nodes are added to or removed from the
cluster.
is appropriate to your data, and rebalancing the shards when nodes are added to or removed from the cluster.
We discussed two main approaches to sharding:

View file

@ -63,11 +63,10 @@ Concurrency control is relevant for both single-node and distributed databases.
chapter, in [“Distributed Transactions”](/en/ch8#sec_transactions_distributed), we will examine the *two-phase commit* protocol and
the challenge of achieving atomicity in a distributed transaction.
# What Exactly Is a Transaction?
## What Exactly Is a Transaction?
Almost all relational databases today, and some nonrelational databases, support transactions. Most
of them follow the style that was introduced in 1975 by IBM System R, the first SQL database
[^2] [^3] [^4].
of them follow the style that was introduced in 1975 by IBM System R, the first SQL database [^2] [^3] [^4].
Although some implementation details have changed, the general idea has remained virtually the same
for 50 years: the transaction support in MySQL, PostgreSQL, Oracle, SQL Server, etc., is uncannily
similar to that of System R.
@ -92,7 +91,7 @@ technical design choice, transactions have advantages and limitations. In order
trade-offs, lets go into the details of the guarantees that transactions can provide—both in normal
operation and in various extreme (but realistic) circumstances.
## The Meaning of ACID
## #The Meaning of ACID
The safety guarantees provided by transactions are often described by the well-known acronym *ACID*,
which stands for *Atomicity*, *Consistency*, *Isolation*, and *Durability*. It was coined in 1983 by
@ -112,7 +111,7 @@ BASE is “not ACID”; i.e., it can mean almost anything you want.)
Lets dig into the definitions of atomicity, consistency, isolation, and durability, as this will let
us refine our idea of transactions.
### Atomicity
#### Atomicity
In general, *atomic* refers to something that cannot be broken down into smaller parts. The word
means similar but subtly different things in different branches of computing. For example, in
@ -141,7 +140,7 @@ The ability to abort a transaction on error and have all writes from that transa
the defining feature of ACID atomicity. Perhaps *abortability* would have been a better term than
*atomicity*, but we will stick with *atomicity* since thats the usual word.
### Consistency
#### Consistency
The word *consistency* is terribly overloaded:
@ -181,7 +180,7 @@ invariants, but you havent declared those invariants, the database cant st
in ACID often depends on how the application uses the database, and its not a property of the
database alone.
### Isolation
### #Isolation
Most databases are accessed by several clients at the same time. That is no problem if they are
reading and writing different parts of the database, but if they are accessing the same database
@ -211,7 +210,7 @@ is a weaker guarantee than serializability [^10] [^14]).
This means that some kinds of race conditions can still occur. We will explore snapshot isolation
and other forms of isolation in [“Weak Isolation Levels”](/en/ch8#sec_transactions_isolation_levels).
### Durability
#### Durability
The purpose of a database system is to provide a safe place where data can be stored without fear of
losing it. *Durability* is the promise that once a transaction has committed successfully, any data it
@ -231,7 +230,9 @@ as discussed in [“Reliability and Fault Tolerance”](/en/ch2#sec_introduction
hard disks and all your backups are destroyed at the same time, theres obviously nothing your
database can do to save you.
# Replication and Durability
--------
> [!TIP] REPLICATION AND DURABILITY
Historically, durability meant writing to an archive tape. Then it was understood as writing to a disk
or SSD. More recently, it has been adapted to mean replication. Which implementation is better?
@ -269,7 +270,9 @@ risk-reduction techniques, including writing to disk, replicating to remote mach
backups—and they can and should be used together. As always, its wise to take any theoretical
“guarantees” with a healthy grain of salt.
## Single-Object and Multi-Object Operations
--------
### Single-Object and Multi-Object Operations
To recap, in ACID, atomicity and isolation describe what the database should do if a client makes
several writes within the same transaction:
@ -329,7 +332,7 @@ operation that updates several keys in one operation), that doesnt necessaril
transaction semantics: the command may succeed for some keys and fail for others, leaving the
database in a partially updated state.
### Single-object writes
#### Single-object writes
Atomicity and isolation also apply when a single object is being changed. For example, imagine you
are writing a 20 KB JSON document to a database:
@ -353,11 +356,15 @@ Similarly popular is a *conditional write* operation, which allows a write to ha
has not been concurrently changed by someone else (see [“Conditional writes (compare-and-set)”](/en/ch8#sec_transactions_compare_and_set)),
similarly to a compare-and-set or compare-and-swap (CAS) operation in shared-memory concurrency.
--------
> [!NOTE]
> Strictly speaking, the term *atomic increment* uses the word *atomic* in the sense of multi-threaded
> programming. In the context of ACID, it should actually be called an *isolated* or *serializable*
> increment, but thats not the usual term.
--------
These single-object operations are useful, as they can prevent lost updates when several clients try
to write to the same object concurrently (see [“Preventing Lost Updates”](/en/ch8#sec_transactions_lost_update)). However, they are
not transactions in the usual sense of the word. For example, the “lightweight transactions” feature
@ -365,7 +372,7 @@ of Cassandra and ScyllaDB, and Aerospikes “strong consistency” mode offer
[“Linearizability”](/en/ch10#sec_consistency_linearizability)) reads and conditional writes on a single object, but no
guarantees across multiple objects.
### The need for multi-object transactions
#### The need for multi-object transactions
Do we need multi-object transactions at all? Would it be possible to implement any application with
only a key-value data model and single-object operations?
@ -396,7 +403,7 @@ much more complicated without atomicity, and the lack of isolation can cause con
We will discuss those in [“Weak Isolation Levels”](/en/ch8#sec_transactions_isolation_levels), and explore alternative approaches
in [Link to Come].
### Handling errors and aborts
#### Handling errors and aborts
A key feature of a transaction is that it can be aborted and safely retried if an error occurred.
ACID databases are based on this philosophy: if the database is in danger of violating its guarantee
@ -437,7 +444,9 @@ isnt perfect:
in [“Two-Phase Commit (2PC)”](/en/ch8#sec_transactions_2pc)).
* If the client process crashes while retrying, any data it was trying to write to the database is lost.
# Weak Isolation Levels
## Weak Isolation Levels
If two transactions dont access the same data, or if both are read-only, they can safely be run in
parallel, because neither depends on the other. Concurrency issues (race conditions) only come into
@ -470,11 +479,15 @@ financial data!”—but that misses the point. Even many popular relational dat
are usually considered “ACID”) use weak isolation, so they wouldnt necessarily have prevented these
bugs from occurring.
--------
> [!NOTE]
> Incidentally, much of the banking system relies on text files that are exchanged via secure FTP [^35].
> In this context, having an audit trail and some human-level fraud prevention measures is actually
> more important than ACID properties.
--------
Those examples also highlight an important point: even if concurrency issues are rare in normal
operation, you have to consider the possibility that an attacker deliberately sends a burst of
highly concurrent requests to your API in an attempt to deliberately exploit concurrency bugs [^30]. Therefore, in order to build
@ -488,7 +501,7 @@ serializability in detail (see [“Serializability”](/en/ch8#sec_transactions_
levels will be informal, using examples. If you want rigorous definitions and analyses of their
properties, you can find them in the academic literature [^36] [^37] [^38] [^39].
## Read Committed
### Read Committed
The most basic level of transaction isolation is *read committed*. It makes two guarantees:
@ -498,7 +511,7 @@ The most basic level of transaction isolation is *read committed*. It makes two
Some databases support an even weaker isolation level called *read uncommitted*. It prevents dirty
writes, but does not prevent dirty reads. Lets discuss these two guarantees in more detail.
### No dirty reads
#### No dirty reads
Imagine a transaction has written some data to the database, but the transaction has not yet committed or aborted.
Can another transaction see that uncommitted data? If yes, that is called a
@ -506,13 +519,11 @@ Can another transaction see that uncommitted data? If yes, that is called a
Transactions running at the read committed isolation level must prevent dirty reads. This means that
any writes by a transaction only become visible to others when that transaction commits (and then
all of its writes become visible at once). This is illustrated in
[Figure 8-4](/en/ch8#fig_transactions_read_committed), where user 1 has set *x* = 3, but user 2s *get x* still
all of its writes become visible at once). This is illustrated in [Figure 8-4](/en/ch8#fig_transactions_read_committed), where user 1 has set *x* = 3, but user 2s *get x* still
returns the old value, 2, while user 1 has not yet committed.
{{< figure src="/fig/ddia_0804.png" id="fig_transactions_read_committed" caption="Figure 8-4. No dirty reads: user 2 sees the new value for x only after user 1's transaction has committed." class="w-full my-4" >}}
There are a few reasons why its useful to prevent dirty reads:
* If a transaction needs to update several rows, a dirty read means that another transaction may
@ -526,7 +537,7 @@ There are a few reasons why its useful to prevent dirty reads:
transaction that read uncommitted data would also need to be aborted, leading to a problem called
*cascading aborts*.
### No dirty writes
#### No dirty writes
What happens if two transactions concurrently try to update the same row in a database? We dont
know in which order the writes will happen, but we normally assume that the later write overwrites
@ -555,7 +566,7 @@ By preventing dirty writes, this isolation level avoids some kinds of concurrenc
{{< figure src="/fig/ddia_0805.png" id="fig_transactions_dirty_writes" caption="Figure 8-5. With dirty writes, conflicting writes from different transactions can be mixed up." class="w-full my-4" >}}
### Implementing read committed
#### Implementing read committed
Read committed is a very popular isolation level. It is the default setting in Oracle Database,
PostgreSQL, SQL Server, and many other databases [^10].
@ -584,15 +595,14 @@ different part of the application, due to waiting for locks.
Nevertheless, locks are used to prevent dirty reads in some databases, such as IBM
Db2 and Microsoft SQL Server in the `read_committed_snapshot=off` setting [^29].
A more commonly used approach to preventing dirty reads is the one illustrated in
[Figure 8-4](/en/ch8#fig_transactions_read_committed): for every
A more commonly used approach to preventing dirty reads is the one illustrated in [Figure 8-4](/en/ch8#fig_transactions_read_committed): for every
row that is written, the database remembers both the old committed value and the new value
set by the transaction that currently holds the write lock. While the transaction is ongoing, any
other transactions that read the row are simply given the old value. Only when the new value is
committed do transactions switch over to reading the new value (see
[“Multi-version concurrency control (MVCC)”](/en/ch8#sec_transactions_snapshot_impl) for more detail).
## Snapshot Isolation and Repeatable Read
### Snapshot Isolation and Repeatable Read
If you look superficially at read committed isolation, you could be forgiven for thinking that it
does everything that a transaction needs to do: it allows aborts (required for atomicity), it
@ -616,15 +626,18 @@ now appears as though she only has a total of $900 in her accounts—it seems th
vanished into thin air.
This anomaly is called *read skew*, and it is an example of a *nonrepeatable read*:
if Aaliyah were to read the balance of
account 1 again at the end of the transaction, she would see a different value ($600) than she saw
if Aaliyah were to read the balance of account 1 again at the end of the transaction, she would see a different value ($600) than she saw
in her previous query. Read skew is considered acceptable under read committed isolation: the
account balances that Aaliyah saw were indeed committed at the time when she read them.
--------
> [!NOTE]
> The term *skew* is unfortunately overloaded: we previously used it in the sense of an *unbalanced
> workload with hot spots* (see [“Skewed Workloads and Relieving Hot Spots”](/en/ch7#sec_sharding_skew)), whereas here it means *timing anomaly*.
--------
In Aaliyahs case, this is not a lasting problem, because she will most likely see consistent account
balances if she reloads the online banking website a few seconds later. However, some situations
cannot tolerate such temporary inconsistency:
@ -659,7 +672,7 @@ one system to the next [^29] [^40] [^41].
Some databases, such as Oracle, TiDB, and Aurora DSQL, even choose snapshot isolation as their
highest isolation level.
### Multi-version concurrency control (MVCC)
#### Multi-version concurrency control (MVCC)
Like read committed isolation, implementations of snapshot isolation typically use write locks to
prevent dirty writes (see [“Implementing read committed”](/en/ch8#sec_transactions_read_committed_impl)), which means that a transaction
@ -707,7 +720,7 @@ All of the versions of a row are stored within the same database heap (see
or not. The versions of the same row form a linked list, going either from newest version to oldest
version or the other way round, so that queries can internally iterate over all versions of a row [^45] [^46].
### Visibility rules for observing a consistent snapshot
#### Visibility rules for observing a consistent snapshot
When a transaction reads from the database, transaction IDs are used to decide which row versions it
can see and which are invisible. By carefully defining visibility rules, the database can present a
@ -743,7 +756,7 @@ that (from other transactions point of view) have long been overwritten or de
updating values in place but instead inserting a new version every time a value is changed, the
database can provide a consistent snapshot while incurring only a small overhead.
### Indexes and snapshot isolation
#### Indexes and snapshot isolation
How do indexes work in a multi-version database? The most common approach is that each index entry
points at one of the versions of a row that matches the entry (either the oldest or the newest
@ -754,9 +767,8 @@ are no longer visible to any transaction, the corresponding index entries can al
Many implementation details affect the performance of multi-version concurrency control [^45] [^46].
For example, PostgreSQL has optimizations for avoiding index updates if different versions of the
same row can fit on the same page [^40].
Some other databases avoid storing full copies of modified rows, and only store differences between
versions to save space.
same row can fit on the same page [^40]. Some other databases avoid storing full copies of modified rows,
and only store differences between versions to save space.
Another approach is used in CouchDB, Datomic, and LMDB. Although they also use B-trees (see
[“B-Trees”](/en/ch4#sec_storage_b_trees)), they use an *immutable* (copy-on-write) variant that does not overwrite
@ -771,7 +783,7 @@ was created. There is no need to filter out rows based on transaction IDs becaus
writes cannot modify an existing B-tree; they can only create new tree roots. This approach also
requires a background process for compaction and garbage collection.
### Snapshot isolation, repeatable read, and naming confusion
#### Snapshot isolation, repeatable read, and naming confusion
MVCC is a commonly used implementation technique for databases, and often it is used to implement
snapshot isolation. However, different databases sometimes use different terms to refer to the same
@ -796,7 +808,7 @@ formal definition. And to top it off, IBM Db2 uses “repeatable read” to refe
As a result, nobody really knows what repeatable read means.
## Preventing Lost Updates
### Preventing Lost Updates
The read committed and snapshot isolation levels weve discussed so far have been primarily about the guarantees
of what a read-only transaction can see in the presence of concurrent writes. We have mostly ignored
@ -822,7 +834,7 @@ pattern occurs in various different scenarios:
Because this is such a common problem, a variety of solutions have been developed [^48].
### Atomic write operations
#### Atomic write operations
Many databases provide atomic update operations, which remove the need to implement
read-modify-write cycles in application code. They are usually the best solution if your code can be
@ -849,7 +861,7 @@ that performs unsafe read-modify-write cycles instead of using atomic operations
database [^49] [^50] [^51].
This can be a source of subtle bugs that are difficult to find by testing.
### Explicit locking
#### Explicit locking
Another option for preventing lost updates, if the databases built-in atomic operations dont
provide the necessary functionality, is for the application to explicitly lock objects that are
@ -869,8 +881,8 @@ players from concurrently moving the same piece, as illustrated in [Example 8-1
BEGIN TRANSACTION;
SELECT * FROM figures
WHERE name = 'robot' AND game_id = 222
FOR UPDATE; ❶
WHERE name = 'robot' AND game_id = 222
FOR UPDATE; ❶
-- Check whether move is valid, then update the position
-- of the piece that was returned by the previous SELECT.
@ -889,7 +901,7 @@ are waiting for each other to release their locks. Many databases automatically
and abort one of the involved transactions so that the system can make progress. You can handle this
situation at the application level by retrying the aborted transaction.
### Automatically detecting lost updates
#### Automatically detecting lost updates
Atomic operations and locks are ways of preventing lost updates by forcing the read-modify-write
cycles to happen sequentially. An alternative is to allow them to execute in parallel and, if the
@ -909,7 +921,7 @@ special database features—you may forget to use a lock or an atomic operation
a bug, but lost update detection happens automatically and is thus less error-prone. However, you
also have to retry aborted transactions at the application level.
### Conditional writes (compare-and-set)
#### Conditional writes (compare-and-set)
In databases that dont provide transactions, you sometimes find a *conditional write* operation
that can prevent lost updates by allowing an update to happen only if the value has not changed
@ -925,7 +937,7 @@ user started editing it:
```sql
-- This may or may not be safe, depending on the database implementation
UPDATE wiki_pages SET content = 'new content'
WHERE id = 1234 AND content = 'old content';
WHERE id = 1234 AND content = 'old content';
```
If the content has changed and no longer matches `'old content'`, this update will have no effect,
@ -940,7 +952,7 @@ implementations of MVCC have an exception to the visibility rules for this scena
written by other transactions are visible to the evaluation of the `WHERE` clause of `UPDATE` and
`DELETE` queries, even though those writes are not otherwise visible in the snapshot.
### Conflict resolution and replication
#### Conflict resolution and replication
In replicated databases (see [Chapter 6](/en/ch6#ch_replication)), preventing lost updates takes on another
dimension: since they have copies of the data on multiple nodes, and the data can potentially be
@ -968,7 +980,7 @@ On the other hand, the *last write wins* (LWW) conflict resolution method is pro
as discussed in [“Last write wins (discarding concurrent writes)”](/en/ch6#sec_replication_lww).
Unfortunately, LWW is the default in many replicated databases.
## Write Skew and Phantoms
### Write Skew and Phantoms
In the previous sections we saw *dirty writes* and *lost updates*, two kinds of race conditions that
can occur when different transactions concurrently try to write to the same objects. In order to
@ -995,10 +1007,9 @@ In each transaction, your application first checks that two or more doctors are
if yes, it assumes its safe for one doctor to go off call. Since the database is using snapshot
isolation, both checks return `2`, so both transactions proceed to the next stage. Aaliyah updates her
own record to take herself off call, and Bryce updates his own record likewise. Both transactions
commit, and now no doctor is on call. Your requirement of having at least one doctor on call has
been violated.
commit, and now no doctor is on call. Your requirement of having at least one doctor on call has been violated.
### Characterizing write skew
#### Characterizing write skew
This anomaly is called *write skew* [^36]. It
is neither a dirty write nor a lost update, because the two transactions are updating two different
@ -1035,51 +1046,50 @@ options are more restricted:
BEGIN TRANSACTION;
SELECT * FROM doctors
WHERE on_call = true
AND shift_id = 1234 FOR UPDATE; ❶
WHERE on_call = true
AND shift_id = 1234 FOR UPDATE; ❶
UPDATE doctors
SET on_call = false
WHERE name = 'Aaliyah'
AND shift_id = 1234;
SET on_call = false
WHERE name = 'Aaliyah'
AND shift_id = 1234;
COMMIT;
```
❶: As before, `FOR UPDATE` tells the database to lock all rows returned by this query.
### More examples of write skew
#### More examples of write skew
Write skew may seem like an esoteric issue at first, but once youre aware of it, you may notice
more situations in which it can occur. Here are some more examples:
Meeting room booking system
: Say you want to enforce that there cannot be two bookings for the same meeting room at the same time [^55].
When someone wants to make a booking, you first check for any conflicting bookings (i.e.,
bookings for the same room with an overlapping time range), and if none are found, you create the
meeting (see [Example 8-2](/en/ch8#fig_transactions_meeting_rooms)).
When someone wants to make a booking, you first check for any conflicting bookings (i.e.,
bookings for the same room with an overlapping time range), and if none are found, you create the
meeting (see [Example 8-2](/en/ch8#fig_transactions_meeting_rooms)).
{{< figure id="fig_transactions_meeting_rooms" title="Example 8-2. A meeting room booking system tries to avoid double-booking (not safe under snapshot isolation)" class="w-full my-4" >}}
```sql
BEGIN TRANSACTION;
-- Check for any existing bookings that overlap with the period of noon-1pm
SELECT COUNT(*) FROM bookings
WHERE room_id = 123 AND
end_time > '2025-01-01 12:00' AND start_time < '2025-01-01 13:00';
-- If the previous query returned zero:
INSERT INTO bookings (room_id, start_time, end_time, user_id)
VALUES (123, '2025-01-01 12:00', '2025-01-01 13:00', 666);
COMMIT;
```
##### Example 8-2. A meeting room booking system tries to avoid double-booking (not safe under snapshot isolation)
```sql
BEGIN TRANSACTION;
-- Check for any existing bookings that overlap with the period of noon-1pm
SELECT COUNT(*) FROM bookings
WHERE room_id = 123 AND
end_time > '2025-01-01 12:00' AND start_time < '2025-01-01 13:00';
-- If the previous query returned zero:
INSERT INTO bookings
(room_id, start_time, end_time, user_id)
VALUES (123, '2025-01-01 12:00', '2025-01-01 13:00', 666);
COMMIT;
```
Unfortunately, snapshot isolation does not prevent another user from concurrently inserting a conflicting
meeting. In order to guarantee you wont get scheduling conflicts, you once again need serializable
isolation.
Unfortunately, snapshot isolation does not prevent another user from concurrently inserting a conflicting
meeting. In order to guarantee you wont get scheduling conflicts, you once again need serializable
isolation.
Multiplayer game
: In [Example 8-1](/en/ch8#fig_transactions_select_for_update), we used a lock to prevent lost updates (that is, making
@ -1103,7 +1113,7 @@ Preventing double-spending
With write skew, it could happen that two spending items are inserted concurrently that together
cause the balance to go negative, but that neither transaction notices the other.
### Phantoms causing write skew
#### Phantoms causing write skew
All of these examples follow a similar pattern:
@ -1139,7 +1149,7 @@ Snapshot isolation avoids phantoms in read-only queries, but in read-write trans
examples we discussed, phantoms can lead to particularly tricky cases of write skew. The SQL
generated by ORMs is also prone to write skew [^50] [^51].
### Materializing conflicts
#### Materializing conflicts
If the problem of phantoms is that there is no object to which we can attach the locks, perhaps we
can artificially introduce a lock object into the database?
@ -1162,7 +1172,9 @@ mechanism leak into the application data model. For those reasons, materializing
considered a last resort if no alternative is possible. A serializable isolation level is much
preferable in most cases.
# Serializability
## Serializability
In this chapter we have seen several examples of transactions that are prone to race conditions.
Some race conditions are prevented by the read committed and snapshot isolation levels, but
@ -1195,12 +1207,11 @@ serializability, and how they perform. Most databases that provide serializabili
three techniques, which we will explore in the rest of this chapter:
* Literally executing transactions in a serial order (see [“Actual Serial Execution”](/en/ch8#sec_transactions_serial))
* Two-phase locking (see [“Two-Phase Locking (2PL)”](/en/ch8#sec_transactions_2pl)), which for several decades was the only viable
option
* Two-phase locking (see [“Two-Phase Locking (2PL)”](/en/ch8#sec_transactions_2pl)), which for several decades was the only viable option
* Optimistic concurrency control techniques such as serializable snapshot isolation (see
[“Serializable Snapshot Isolation (SSI)”](/en/ch8#sec_transactions_ssi))
## Actual Serial Execution
### Actual Serial Execution
The simplest way of avoiding concurrency problems is to remove the concurrency entirely: to
execute only one transaction at a time, in serial order, on a single thread. By doing so, we completely
@ -1230,7 +1241,7 @@ supports concurrency, because it can avoid the coordination overhead of locking.
throughput is limited to that of a single CPU core. In order to make the most of that single thread,
transactions need to be structured differently from their traditional form.
### Encapsulating transactions in stored procedures
#### Encapsulating transactions in stored procedures
In the early days of databases, the intention was that a database transaction could encompass an
entire flow of user activity. For example, booking an airline ticket is a multi-stage process
@ -1270,8 +1281,7 @@ stored procedure can execute very quickly, without waiting for any network or di
{{< figure src="/fig/ddia_0809.png" id="fig_transactions_stored_proc" caption="Figure 8-9. The difference between an interactive transaction and a stored procedure (using the example transaction of [Figure 8-8](/en/ch8#fig_transactions_write_skew))." class="w-full my-4" >}}
### Pros and cons of stored procedures
#### Pros and cons of stored procedures
Stored procedures have existed for some time in relational databases, and they have been part of the
SQL standard (SQL/PSM) since 1999. They have gained a somewhat bad reputation, for various reasons:
@ -1312,7 +1322,7 @@ so through special deterministic APIs (see [“Durable Execution and Workflows
deterministic operations). This approach is called *state machine replication*, and we will return
to it in [Chapter 10](/en/ch10#ch_consistency).
### Sharding
#### Sharding
Executing all transactions serially makes concurrency control much simpler, but limits the
transaction throughput of the database to the speed of a single CPU core on a single machine.
@ -1341,13 +1351,12 @@ application. Simple key-value data can often be sharded very easily, but data wi
secondary indexes is likely to require a lot of cross-shard coordination (see
[“Sharding and Secondary Indexes”](/en/ch7#sec_sharding_secondary_indexes)).
### Summary of serial execution
#### Summary of serial execution
Serial execution of transactions has become a viable way of achieving serializable isolation within
certain constraints:
* Every transaction must be small and fast, because it takes only one slow transaction to stall all
transaction processing.
* Every transaction must be small and fast, because it takes only one slow transaction to stall all transaction processing.
* It is most appropriate in situations where the active dataset can fit in memory. Rarely accessed
data could potentially be moved to disk, but if it needed to be accessed in a single-threaded
transaction, the system would get very slow.
@ -1355,19 +1364,24 @@ certain constraints:
to be sharded without requiring cross-shard coordination.
* Cross-shard transactions are possible, but their throughput is hard to scale.
## Two-Phase Locking (2PL)
### Two-Phase Locking (2PL)
For around 30 years, there was only one widely used algorithm for serializability in databases:
*two-phase locking* (2PL), sometimes called *strong strict two-phase locking* (SS2PL) to distinguish
it from other variants of 2PL.
# 2PL is not 2PC
--------
> [!TIP] 2PL IS NOT 2PC
Two-phase *locking* (2PL) and two-phase *commit* (2PC) are two very different things. 2PL provides
serializable isolation, whereas 2PC provides atomic commit in a distributed database (see
[“Two-Phase Commit (2PC)”](/en/ch8#sec_transactions_2pc)). To avoid confusion, its best to think of them as entirely separate
concepts and to ignore the unfortunate similarity in the names.
--------
We saw previously that locks are often used to prevent dirty writes (see
[“No dirty writes”](/en/ch8#sec_transactions_dirty_write)): if two transactions concurrently try to write to the same object,
the lock ensures that the second writer must wait until the first one has finished its transaction
@ -1390,7 +1404,7 @@ readers* (see [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_trans
snapshot isolation and two-phase locking. On the other hand, because 2PL provides serializability,
it protects against all the race conditions discussed earlier, including lost updates and write skew.
### Implementation of two-phase locking
#### Implementation of two-phase locking
2PL is used by the serializable isolation level in MySQL (InnoDB) and SQL Server, and the
repeatable read isolation level in Db2 [^29].
@ -1417,7 +1431,7 @@ transaction B to release its lock, and vice versa. This situation is called *dea
automatically detects deadlocks between transactions and aborts one of them so that the others can
make progress. The aborted transaction needs to be retried by the application.
### Performance of two-phase locking
#### Performance of two-phase locking
The big downside of two-phase locking, and the reason why it hasnt been used by everybody since the
1970s, is performance: transaction throughput and response times of queries are significantly worse
@ -1446,7 +1460,7 @@ transaction). This can be an additional performance problem: when a transaction
deadlock and is retried, it needs to do its work all over again. If deadlocks are frequent, this can
mean significant wasted effort.
### Predicate locks
#### Predicate locks
In the preceding description of locks, we glossed over a subtle but important detail. In
[“Phantoms causing write skew”](/en/ch8#sec_transactions_phantom) we discussed the problem of *phantoms*—that is, one transaction
@ -1485,7 +1499,7 @@ database, but which might be added in the future (phantoms). If two-phase lockin
the database prevents all forms of write skew and other race conditions, and so its isolation
becomes serializable.
### Index-range locks
#### Index-range locks
Unfortunately, predicate locks do not perform well: if there are many locks by active transactions,
checking for matching locks becomes time-consuming. For that reason, most databases with 2PL
@ -1499,8 +1513,7 @@ room 123) between noon and 1 p.m. This is safe because any write that matches th
will definitely also match the approximations.
In the room bookings database you would probably have an index on the `room_id` column, and/or
indexes on `start_time` and `end_time` (otherwise the preceding query would be very slow on a large
database):
indexes on `start_time` and `end_time` (otherwise the preceding query would be very slow on a large database):
* Say your index is on `room_id`, and the database uses this index to find existing bookings for
room 123. Now the database can simply attach a shared lock to this index entry, indicating that a
@ -1523,7 +1536,7 @@ If there is no suitable index where a range lock can be attached, the database c
shared lock on the entire table. This will not be good for performance, since it will stop all
other transactions writing to the table, but its a safe fallback position.
## Serializable Snapshot Isolation (SSI)
### Serializable Snapshot Isolation (SSI)
This chapter has painted a bleak picture of concurrency control in databases. On the one hand, we
have implementations of serializability that dont perform well (two-phase locking) or dont scale
@ -1539,7 +1552,7 @@ Today SSI and similar algorithms are used in single-node databases (the serializ
in PostgreSQL [^54], SQL Servers In-Memory OLTP/Hekaton [^66], and HyPer [^67]), distributed databases (CockroachDB [^5] and
FoundationDB [^8]), and embedded storage engines such as BadgerDB.
### Pessimistic versus optimistic concurrency control
#### Pessimistic versus optimistic concurrency control
Two-phase locking is a so-called *pessimistic* concurrency control mechanism: it is based on the
principle that if anything might possibly go wrong (as indicated by a lock held by another
@ -1558,8 +1571,7 @@ transaction wants to commit, the database checks whether anything bad happened (
isolation was violated); if so, the transaction is aborted and has to be retried. Only transactions
that executed serializably are allowed to commit.
Optimistic concurrency control is an old idea [^68],
and its advantages and disadvantages have been debated for a long time [^69].
Optimistic concurrency control is an old idea [^68], and its advantages and disadvantages have been debated for a long time [^69].
It performs badly if there is high contention (many transactions trying to access the same objects),
as this leads to a high proportion of transactions needing to abort. If the system is already close
to its maximum throughput, the additional transaction load from retried transactions can make
@ -1577,14 +1589,13 @@ are made from a consistent snapshot of the database (see [“Snapshot Isolation
On top of snapshot isolation, SSI adds an algorithm for detecting serialization conflicts among
reads and writes, and determining which transactions to abort.
### Decisions based on an outdated premise
#### Decisions based on an outdated premise
When we previously discussed write skew in snapshot isolation (see [“Write Skew and Phantoms”](/en/ch8#sec_transactions_write_skew)),
we observed a recurring pattern: a transaction reads some data from the database, examines the
result of the query, and decides to take some action (write to the database) based on the result
that it saw. However, under snapshot isolation, the result from the original query may no longer be
up-to-date by the time the transaction commits, because the data may have been modified in the
meantime.
up-to-date by the time the transaction commits, because the data may have been modified in the meantime.
Put another way, the transaction is taking an action based on a *premise* (a fact that was true at
the beginning of the transaction, e.g., “There are currently two doctors on call”). Later, when the
@ -1603,7 +1614,7 @@ How does the database know if a query result might have changed? There are two c
* Detecting reads of a stale MVCC object version (uncommitted write occurred before the read)
* Detecting writes that affect prior reads (the write occurs after the read)
### Detecting stale MVCC reads
#### Detecting stale MVCC reads
Recall that snapshot isolation is usually implemented by multi-version concurrency control (MVCC;
see [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_transactions_snapshot_impl)). When a transaction reads from a consistent snapshot in an
@ -1634,7 +1645,7 @@ abort or may still be uncommitted at the time when transaction 43 is committed,
turn out not to have been stale after all. By avoiding unnecessary aborts, SSI preserves snapshot
isolations support for long-running reads from a consistent snapshot.
### Detecting writes that affect prior reads
#### Detecting writes that affect prior reads
The second case to consider is when another transaction modifies data after it has been read. This
case is illustrated in [Figure 8-11](/en/ch8#fig_transactions_detect_index_range).
@ -1665,7 +1676,7 @@ transaction 43s write affected 42, 43 hasnt yet committed, so the write ha
However, when transaction 43 wants to commit, the conflicting write from 42 has already been
committed, so 43 must abort.
### Performance of serializable snapshot isolation
#### Performance of serializable snapshot isolation
As always, many engineering details affect how well an algorithm works in practice. For example, one
trade-off is the granularity at which transactions reads and writes are tracked. If the database
@ -1702,7 +1713,7 @@ SSI requires that read-write transactions be fairly short (long-running read-onl
okay). However, SSI is less sensitive to slow transactions than two-phase locking or serial
execution.
# Distributed Transactions
## Distributed Transactions
The last few sections have focused on concurrency control for isolation, the I in ACID. The
algorithms we have seen apply to both single-node and distributed databases: although there are
@ -1759,10 +1770,9 @@ was later aborted, user 2s transaction would have to be reverted as well, sin
data that was retroactively declared not to have existed.
A better approach is to ensure that the nodes involved in a transaction either all commit or all
abort, and to prevent a mixture of the two. Ensuring this is known as the *atomic commitment*
problem.
abort, and to prevent a mixture of the two. Ensuring this is known as the *atomic commitment* problem.
## Two-Phase Commit (2PC)
### Two-Phase Commit (2PC)
Two-phase commit is an algorithm for achieving atomic transaction commit across multiple nodes. It
is a classic algorithm in distributed databases [^13] [^71] [^72]. 2PC is used
@ -1800,7 +1810,7 @@ the answer “I do” from both. After receiving both acknowledgments, the minis
couple husband and wife: the transaction is committed, and the happy fact is broadcast to all
attendees. If either bride or groom does not say “yes,” the ceremony is aborted [^76].
### A system of promises
#### A system of promises
From this short description it might not be clear why two-phase commit ensures atomicity, while
one-phase commit across several nodes does not. Surely the prepare and commit requests can just
@ -1851,7 +1861,7 @@ married or not by querying the minister for the status of your global transactio
wait for the ministers next retry of the commit request (since the retries will have continued
throughout your period of unconsciousness).
### Coordinator failure
#### Coordinator failure
We have discussed what happens if one of the participants or the network fails during 2PC: if any of
the prepare requests fails or times out, the coordinator aborts the transaction; if any of the
@ -1886,7 +1896,7 @@ all in-doubt transactions by reading its transaction log. Any transactions that
record in the coordinators log are aborted. Thus, the commit point of 2PC comes down to a regular
single-node atomic commit on the coordinator.
### Three-phase commit
#### Three-phase commit
Two-phase commit is called a *blocking* atomic commit protocol due to the fact that 2PC can become
stuck waiting for the coordinator to recover. It is possible to make an atomic commit protocol
@ -1901,7 +1911,7 @@ cannot guarantee atomicity.
A better solution in practice is to replace the single-node coordinator with a fault-tolerant
consensus protocol. We will see how to do this in [Chapter 10](/en/ch10#ch_consistency).
## Distributed Transactions Across Different Systems
### Distributed Transactions Across Different Systems
Distributed transactions and two-phase commit have a mixed reputation. On the one hand, they are
seen as providing an important safety guarantee that would be hard to achieve otherwise; on the
@ -1936,7 +1946,7 @@ use any protocol and apply optimizations specific to that particular technology.
database-internal distributed transactions can often work quite well. On the other hand,
transactions spanning heterogeneous technologies are a lot more challenging.
### Exactly-once message processing
#### Exactly-once message processing
Heterogeneous distributed transactions allow diverse systems to be integrated in powerful ways. For
example, a message from a message queue can be acknowledged as processed if and only if the database
@ -1961,11 +1971,10 @@ safely be retried as if nothing had happened.
We will return to the topic of exactly-once semantics later in this chapter. Lets look first at the
atomic commit protocol that allows such heterogeneous distributed transactions.
### XA transactions
#### XA transactions
*X/Open XA* (short for *eXtended Architecture*) is a standard for implementing two-phase commit
across heterogeneous technologies [^73].
It was introduced in 1991 and has been widely
across heterogeneous technologies [^73]. It was introduced in 1991 and has been widely
implemented: XA is supported by many traditional relational databases (including PostgreSQL, MySQL,
Db2, SQL Server, and Oracle) and message brokers (including ActiveMQ, HornetQ, MSMQ, and IBM MQ).
@ -1996,7 +2005,7 @@ transaction. Only then can the coordinator use the database drivers XA callba
participants to commit or abort, as appropriate. The database server cannot contact the coordinator
directly, since all communication must go via its client library.
### Holding locks while in doubt
#### Holding locks while in doubt
Why do we care so much about a transaction being stuck in doubt? Cant the rest of the system just
get on with its work, and ignore the in-doubt transaction that will be cleaned up eventually?
@ -2019,7 +2028,7 @@ cannot simply continue with their business—if they want to access that same da
blocked. This can cause large parts of your application to become unavailable until the in-doubt
transaction is resolved.
### Recovering from coordinator failure
#### Recovering from coordinator failure
In theory, if the coordinator crashes and is restarted, it should cleanly recover its state from the
log and resolve any in-doubt transactions. However, in practice, *orphaned* in-doubt transactions do occur [^83] [^84] — that is,
@ -2046,7 +2055,7 @@ decision from the coordinator [^73]. To be clear,
violates the system of promises in two-phase commit. Thus, heuristic decisions are intended only for
getting out of catastrophic situations, and not for regular use.
### Problems with XA transactions
#### Problems with XA transactions
A single-node coordinator is a single point of failure for the entire system, and making it part of
the application server is also problematic because the coordinators logs on its local disk become a
@ -2077,7 +2086,7 @@ However, keeping several heterogeneous data systems consistent with each other i
important problem, so we need to find a different solution to it. This can be done, as we will see
in the next section and in [Link to Come].
## Database-internal Distributed Transactions
### Database-internal Distributed Transactions
As explained previously, there is a big difference between distributed transactions that span
multiple heterogeneous storage technologies, and those that are internal to a system—i.e., where all
@ -2109,7 +2118,7 @@ The isolation levels offered for distributed transactions depend on the system,
isolation and serializable snapshot isolation are both possible across shards. The details of how
this works can be found in the papers referenced at the end of this chapter.
### Exactly-once message processing revisited
#### Exactly-once message processing revisited
We saw in [“Exactly-once message processing”](/en/ch8#sec_transactions_exactly_once) that an important use case for distributed transactions
is to ensure that some operation takes effect exactly once, even if a crash occurs while it is being
@ -2148,14 +2157,15 @@ Thus, achieving exactly-once processing only requires transactions within the da
across database and message broker is not necessary for this use case. Recording the message ID in
the database makes the message processing *idempotent*, so that message processing can be safely
retried without duplicating its side-effects. A similar approach is used in stream processing
frameworks such as Kafka Streams to achieve exactly-once semantics, as we shall see in
[Link to Come].
frameworks such as Kafka Streams to achieve exactly-once semantics, as we shall see in [Link to Come].
However, internal distributed transactions within the database are still useful for the scalability
of patterns such as these: for example, they would allow the message IDs to be stored on one shard
and the main data updated by the message processing to be stored on other shards, and to ensure
atomicity of the transaction commit across those shards.
## Summary
Transactions are an abstraction layer that allows an application to pretend that certain concurrency

View file

@ -34,7 +34,7 @@ explore how to think about the state of a distributed system and how to reason a
have happened ([“Knowledge, Truth, and Lies”](/en/ch9#sec_distributed_truth)). Later, in [Chapter 10](/en/ch10#ch_consistency), we will look at some
examples of how we can achieve fault tolerance in the face of those faults.
# Faults and Partial Failures
## Faults and Partial Failures
When you are writing a program on a single computer, it normally behaves in a fairly predictable
way: either it works or it doesnt. Buggy software may give the appearance that the computer is
@ -69,7 +69,7 @@ anecdote [^3]:
> pickup truck into a DCs HVAC [heating, ventilation, and air conditioning] system. And Im not even
> an ops guy.
>
> Coda Hale
> —— Coda Hale
In a distributed system, there may well be some parts of the system that are broken in some
unpredictable way, even though other parts of the system are working fine. This is known as a
@ -77,8 +77,7 @@ unpredictable way, even though other parts of the system are working fine. This
anything involving multiple nodes and the network, it may sometimes work and sometimes unpredictably
fail. As we shall see, you may not even *know* whether something succeeded or not!
This nondeterminism and possibility of partial failures is what makes distributed systems hard to
work with [^4].
This nondeterminism and possibility of partial failures is what makes distributed systems hard to work with [^4].
On the other hand, if a distributed system can tolerate partial failures, that opens up powerful
possibilities: for example, it allows you to perform a rolling upgrade, rebooting one node at a time
to install software updates while the system as a whole continues working uninterrupted all the
@ -90,7 +89,7 @@ supposed to tolerate. It is important to consider a wide range of possible fault
unlikely ones—and to artificially create such situations in your testing environment to see what
happens. In distributed systems, suspicion, pessimism, and paranoia pay off.
# Unreliable Networks
## Unreliable Networks
As discussed in [“Shared-Memory, Shared-Disk, and Shared-Nothing Architecture”](/en/ch2#sec_introduction_shared_nothing), the distributed systems we focus on
in this book are mostly *shared-nothing systems*: i.e., a bunch of machines connected by a network.
@ -130,18 +129,22 @@ the response is not going to arrive. However, when a timeout occurs, you still d
the remote node got your request or not (and if the request is still queued somewhere, it may still
be delivered to the recipient, even if the sender has given up on it).
## The Limitations of TCP
### The Limitations of TCP
Network packets have a maximum size (generally a few kilobytes), but many applications need to send
messages (requests, responses) that are too big to fit in one packet. These applications most often
use TCP, the Transmission Control Protocol, to establish a *connection* that breaks up large data
streams into individual packets, and puts them back together again on the receiving side.
--------
> [!NOTE]
> Most of what we say about TCP applies also to its more recent alternative QUIC, as well as the
> Stream Control Transmission Protocol (SCTP) used in WebRTC, the BitTorrent uTP protocol, and
> other transport protocols. For a comparison to UDP, see [“TCP Versus UDP”](/en/ch9#sidebar_distributed_tcp_udp).
--------
TCP is often described as providing “reliable” delivery, in the sense that it detects and
retransmits dropped packets, it detects reordered packets and puts them back in the correct order,
and it detects packet corruption using a simple checksum. It also figures out how fast it can send
@ -177,7 +180,7 @@ use it to send multiple requests and responses. This is usually done by first se
indicates the length of the following message in bytes, followed by the actual message. HTTP and
many RPC protocols (see [“Dataflow Through Services: REST and RPC”](/en/ch5#sec_encoding_dataflow_rpc)) work like this.
## Network Faults in Practice
### Network Faults in Practice
We have been building computer networks for decades—one might hope that by now we would have figured
out how to make them reliable. Unfortunately, we have not yet succeeded. There are some systematic
@ -197,25 +200,27 @@ even in controlled environments like a datacenter operated by one company [^8]:
* Across different cloud regions, round-trip times of up to several *minutes* have been observed at
high percentiles [^18].
Even within a single datacenter, packet delay of more than a minute can occur during a network
topology reconfiguration, triggered by a problem during a software upgrade for a switch
[^19].
topology reconfiguration, triggered by a problem during a software upgrade for a switch [^19].
Thus, we have to assume that messages might be delayed arbitrarily.
* Sometimes communications are partially interrupted, depending on who youre talking to: for
example, A and B can communicate, B and C can communicate, but A and C cannot [^20] [^21].
Other surprising faults include a network interface that sometimes drops all inbound packets but
sends outbound packets successfully [^22]:
just because a network link works in one direction doesnt guarantee its also working in the
opposite direction.
just because a network link works in one direction doesnt guarantee its also working in the opposite direction.
* Even a brief network interruption can have repercussions that last for much longer than the
original issue [^8] [^20] [^23].
# Network partitions
--------
> [!TIP] NETWORK PARTITIONS
When one part of the network is cut off from the rest due to a network fault, that is sometimes
called a *network partition* or *netsplit*, but it is not fundamentally different from other kinds
of network interruption. Network partitions are not related to sharding of a storage system, which
is sometimes also called *partitioning* (see [Chapter 7](/en/ch7#ch_sharding)).
--------
Even if network faults are rare in your environment, the fact that faults *can* occur means that
your software needs to be able to handle them. Whenever any communication happens over a network, it
may fail—there is no way around it.
@ -233,7 +238,7 @@ and ensure that the system can recover from them.
It may make sense to deliberately trigger network problems and test the systems response (this is
known as *fault injection*; see [“Fault injection”](/en/ch9#sec_fault_injection)).
## Detecting Faults
### Detecting Faults
Many systems need to automatically detect faulty nodes. For example:
@ -266,7 +271,7 @@ gone wrong, you may get an error response at some level of the stack, but in gen
assume that you will get no response at all. You can retry a few times, wait for a timeout to
elapse, and eventually declare the node dead if you dont hear back within the timeout.
## Timeouts and Unbounded Delays
### Timeouts and Unbounded Delays
If a timeout is the only sure way of detecting a fault, then how long should the timeout be? There
is unfortunately no simple answer.
@ -304,7 +309,7 @@ cannot guarantee that they can handle requests within some maximum time (see
be fast most of the time: if your timeout is low, it only takes a transient spike in round-trip
times to throw the system off-balance.
### Network congestion and queueing
#### Network congestion and queueing
When driving a car, travel times on road networks often vary most due to traffic congestion.
Similarly, the variability of packet delays on computer networks is most often due to queueing [^27]:
@ -327,12 +332,13 @@ Similarly, the variability of packet delays on computer networks is most often d
{{< figure src="/fig/ddia_0902.png" id="fig_distributed_switch_queueing" caption="Figure 9-2. If several machines send network traffic to the same destination, its switch queue can fill up. Here, ports 1, 2, and 4 are all trying to send packets to port 3." class="w-full my-4" >}}
Moreover, when TCP detects and automatically retransmits a lost packet, although the application
does not see the packet loss directly, it does see the resulting delay (waiting for the timeout to
expire, and then waiting for the retransmitted packet to be acknowledged).
# TCP Versus UDP
--------
> [!TIP] TCP VERSUS UDP
Some latency-sensitive applications, such as videoconferencing and Voice over IP (VoIP), use UDP
rather than TCP. Its a trade-off between reliability and variability of delays: as UDP does not
@ -346,6 +352,8 @@ application must instead fill the missing packets time slot with silence (cau
interruption in the sound) and move on in the stream. The retry happens at the human layer instead.
(“Could you repeat that please? The sound just cut out for a moment.”)
--------
All of these factors contribute to the variability of network delays. Queueing delays have an
especially wide range when a system is close to its maximum capacity: a system with plenty of spare
capacity can easily drain queues, whereas in a highly utilized system, long queues can build up very
@ -369,7 +377,7 @@ observed response time distribution. The Phi Accrual failure detector [^32],
which is used for example in Akka and Cassandra [^33]
is one way of doing this. TCP retransmission timeouts also work similarly [^5].
## Synchronous Versus Asynchronous Networks
### Synchronous Versus Asynchronous Networks
Distributed systems would be a lot simpler if we could rely on the network to deliver packets with
some fixed maximum delay, and not to drop packets. Why cant we solve this at the hardware level
@ -394,7 +402,7 @@ suffer from queueing, because the 16 bits of space for the call have already bee
next hop of the network. And because there is no queueing, the maximum end-to-end latency of the
network is fixed. We call this a *bounded delay*.
### Can we not simply make network delays predictable?
#### Can we not simply make network delays predictable?
Note that a circuit in a telephone network is very different from a TCP connection: a circuit is a
fixed amount of reserved bandwidth which nobody else can use while the circuit is established,
@ -433,7 +441,9 @@ Loss, and Scalable Throughput (L4S) attempt to mitigate some of the queuing and
problems both at the client and router level. Linuxs traffic controller (TC) also allows
applications to reprioritize packets for QoS purposes.
# Latency and Resource Utilization
--------
> [!TIP] LATENCY AND RESOURCE UTILIZATION
More generally, you can think of variable delays as a consequence of dynamic resource partitioning.
@ -460,25 +470,25 @@ platforms run several virtual machines from different customers on the same phys
Latency guarantees are achievable in certain environments, if resources are statically partitioned
(e.g., dedicated hardware and exclusive bandwidth allocations). However, it comes at the cost of
reduced utilization—in other words, it is more expensive. On the other hand, multitenancy with
dynamic resource partitioning provides better utilization, so it is cheaper, but it has the downside
of variable delays.
dynamic resource partitioning provides better utilization, so it is cheaper, but it has the downside of variable delays.
Variable delays in networks are not a law of nature, but simply the result of a cost/benefit
trade-off.
Variable delays in networks are not a law of nature, but simply the result of a cost/benefit trade-off.
However, such quality of service is currently not enabled in multitenant datacenters and public
clouds, or when communicating via the internet.
--------
However, such quality of service is currently not enabled in multitenant datacenters and public clouds, or when communicating via the internet.
Currently deployed technology does not allow us to make any guarantees about delays or reliability
of the network: we have to assume that network congestion, queueing, and unbounded delays will
happen. Consequently, theres no “correct” value for timeouts—they need to be determined
experimentally.
happen. Consequently, theres no “correct” value for timeouts—they need to be determined experimentally.
Peering agreements between internet service providers and the establishment of routes through the
Border Gateway Protocol (BGP), bear closer resemblance to circuit switching than IP itself. At this
level, it is possible to buy dedicated bandwidth. However, internet routing operates at the level of
networks, not individual connections between hosts, and at a much longer timescale.
# Unreliable Clocks
## Unreliable Clocks
Clocks and time are important. Applications depend on clocks in various ways to answer questions
like the following:
@ -509,13 +519,13 @@ synchronize clocks to some degree: the most commonly used mechanism is the Netwo
allows the computer clock to be adjusted according to the time reported by a group of servers [^39].
The servers in turn get their time from a more accurate time source, such as a GPS receiver.
## Monotonic Versus Time-of-Day Clocks
### Monotonic Versus Time-of-Day Clocks
Modern computers have at least two different kinds of clocks: a *time-of-day clock* and a *monotonic
clock*. Although they both measure time, it is important to distinguish the two, since they serve
different purposes.
### Time-of-day clocks
#### Time-of-day clocks
A time-of-day clock does what you intuitively expect of a clock: it returns the current date and
time according to some calendar (also known as *wall-clock time*). For example,
@ -539,11 +549,10 @@ Time-of-day clocks have also historically had quite a coarse-grained resolution,
in steps of 10 ms on older Windows systems [^41].
On recent systems, this is less of a problem.
### Monotonic clocks
#### Monotonic clocks
A monotonic clock is suitable for measuring a duration (time interval), such as a timeout or a
services response time: `clock_gettime(CLOCK_MONOTONIC)` or `clock_gettime(CLOCK_BOOTTIME)` on
Linux [^42]
services response time: `clock_gettime(CLOCK_MONOTONIC)` or `clock_gettime(CLOCK_BOOTTIME)` on Linux [^42]
and `System.nanoTime()` in Java are monotonic clocks, for example. The name comes from the fact that
they are guaranteed to always move forward (whereas a time-of-day clock may jump back in time).
@ -571,7 +580,7 @@ In a distributed system, using a monotonic clock for measuring elapsed time (e.g
usually fine, because it doesnt assume any synchronization between different nodes clocks and is
not sensitive to slight inaccuracies of measurement.
## Clock Synchronization and Accuracy
### Clock Synchronization and Accuracy
Monotonic clocks dont need synchronization, but time-of-day clocks need to be set according to an
NTP server or other external time source in order to be useful. Unfortunately, our methods for
@ -631,7 +640,7 @@ Some cloud providers have begun offering high-accuracy clock synchronization for
However, clock synchronization still requires a lot of care. If your NTP daemon is misconfigured, or
a firewall is blocking NTP traffic, the clock error due to drift can quickly become large.
## Relying on Synchronized Clocks
### Relying on Synchronized Clocks
The problem with clocks is that while they seem simple and easy to use, they have a surprising
number of pitfalls: a day may not have exactly 86,400 seconds, time-of-day clocks may move backward
@ -655,7 +664,7 @@ monitor the clock offsets between all the machines. Any node whose clock drifts
others should be declared dead and removed from the cluster. Such monitoring ensures that you notice
the broken clocks before they can cause too much damage.
### Timestamps for ordering events
#### Timestamps for ordering events
Lets consider one particular situation in which it is tempting, but dangerous, to rely on clocks:
ordering of events across multiple nodes [^64].
@ -718,8 +727,7 @@ arrived before it was sent, which is impossible.
Could NTP synchronization be made accurate enough that such incorrect orderings cannot occur?
Probably not, because NTPs synchronization accuracy is itself limited by the network round-trip
time, in addition to other sources of error such as quartz drift. To guarantee a correct ordering,
you would need the clock error to be significantly lower than the network delay, which is not
possible.
you would need the clock error to be significantly lower than the network delay, which is not possible.
So-called *logical clocks* [^66], which are based on incrementing counters rather than an oscillating quartz crystal, are a safer
alternative for ordering events (see [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent)). Logical clocks do not measure
@ -728,7 +736,7 @@ event happened before or after another). In contrast, time-of-day and monotonic
measure actual elapsed time, are also known as *physical clocks*. Well look at logical clocks in
more detail in [“ID Generators and Logical Clocks”](/en/ch10#sec_consistency_logical).
### Clock readings with a confidence interval
#### Clock readings with a confidence interval
You may be able to read a machines time-of-day clock with microsecond or even nanosecond
resolution. But even if you can get such a fine-grained measurement, that doesnt mean the value is
@ -740,10 +748,8 @@ possible accuracy is probably to the tens of milliseconds, and the error may eas
Thus, it doesnt make sense to think of a clock reading as a point in time—it is more like a
range of times, within a confidence interval: for example, a system may be 95% confident that the
time now is between 10.3 and 10.5 seconds past the minute, but it doesnt know any more precisely
than that [^67].
If we only know the time +/ 100 ms, the microsecond digits in the timestamp are
essentially meaningless.
time now is between 10.3 and 10.5 seconds past the minute, but it doesnt know any more precisely than that [^67].
If we only know the time +/ 100 ms, the microsecond digits in the timestamp are essentially meaningless.
The uncertainty bound can be calculated based on your time source. If you have a GPS receiver or
atomic clock directly attached to your computer, the expected error range is determined by
@ -763,7 +769,7 @@ timestamp. Based on its uncertainty calculations, the clock knows that the actua
somewhere within that interval. The width of the interval depends, among other things, on how long
it has been since the local quartz clock was last synchronized with a more accurate clock source.
### Synchronized clocks for global snapshots
#### Synchronized clocks for global snapshots
In [“Snapshot Isolation and Repeatable Read”](/en/ch8#sec_transactions_snapshot_isolation) we discussed *multi-version concurrency control* (MVCC),
which is a very useful feature in databases that need to support both small, fast read-write
@ -807,7 +813,7 @@ have a confidence interval, and the accurate clock sources only help keep that i
systems are beginning to adopt similar approaches: for example, YugabyteDB can leverage ClockBound
when running on AWS [^70], and several other systems now also rely on clock synchronization to various degrees [^71] [^72].
## Process Pauses
### Process Pauses
Lets consider another example of dangerous clock use in a distributed system. Say you have a
database with a single leader per shard. Only the leader is allowed to accept writes. How does a
@ -824,16 +830,16 @@ You can imagine the request-handling loop looking something like this:
```js
while (true) {
request = getIncomingRequest();
request = getIncomingRequest();
// Ensure that the lease always has at least 10 seconds remaining
if (lease.expiryTimeMillis - System.currentTimeMillis() < 10000) {
lease = lease.renew();
}
// Ensure that the lease always has at least 10 seconds remaining
if (lease.expiryTimeMillis - System.currentTimeMillis() < 10000) {
lease = lease.renew();
}
if (lease.isValid()) {
process(request);
}
if (lease.isValid()) {
process(request);
}
}
```
@ -917,7 +923,7 @@ keeps moving and may even declare the paused node dead because its not respon
the paused node may continue running, without even noticing that it was asleep until it checks its
clock sometime later.
### Response time guarantees
#### Response time guarantees
In many programming languages and operating systems, threads and processes may pause for an
unbounded amount of time, as discussed. Those reasons for pausing *can* be eliminated if you try
@ -929,12 +935,16 @@ must respond quickly and predictably to their sensor inputs. In these systems, t
*deadline* by which the software must respond; if it doesnt meet the deadline, that may cause a
failure of the entire system. These are so-called *hard real-time* systems.
--------
> [!NOTE]
> In embedded systems, *real-time* means that a system is carefully designed and tested to meet
> specified timing guarantees in all circumstances. This meaning is in contrast to the more vague use of the
> term *real-time* on the web, where it describes servers pushing data to clients and stream
> processing without hard response time constraints (see [Link to Come]).
--------
For example, if your cars onboard sensors detect that you are currently experiencing a crash, you
wouldnt want the release of the airbag to be delayed due to an inopportune GC pause in the airbag
release system.
@ -958,7 +968,7 @@ For most server-side data processing systems, real-time guarantees are simply no
appropriate. Consequently, these systems must suffer the pauses and clock instability that come from
operating in a non-real-time environment.
### Limiting the impact of garbage collection
#### Limiting the impact of garbage collection
Garbage collection used to be one of the biggest reasons for process pauses [^79],
but fortunately GC algorithms have improved a lot: a properly tuned collector will now usually pause
@ -990,7 +1000,9 @@ planned restart, like in a rolling upgrade (see [Chapter 5](/en/ch5#ch_encoding
These measures cannot fully prevent garbage collection pauses, but they can usefully reduce their
impact on the application.
# Knowledge, Truth, and Lies
## Knowledge, Truth, and Lies
So far in this chapter we have explored the ways in which distributed systems are different from
programs running on a single computer: there is no shared memory, only message passing via an
@ -1005,8 +1017,7 @@ exchanging messages with it. If a remote node doesnt respond, there is no way
it is in, because problems in the network cannot reliably be distinguished from problems at a node.
Discussions of these systems border on the philosophical: What do we know to be true or false in our
system? How sure can we be of that knowledge, if the mechanisms for perception and measurement are
unreliable [^83]?
system? How sure can we be of that knowledge, if the mechanisms for perception and measurement are unreliable [^83]?
Should software systems obey the laws that we expect of the physical world, such as cause and effect?
Fortunately, we dont need to go as far as figuring out the meaning of life. In a distributed
@ -1022,7 +1033,7 @@ we can make and the guarantees we may want to provide. In [Chapter 10](/en/ch10
look at some examples of distributed algorithms that provide particular guarantees under particular
assumptions.
## The Majority Rules
### The Majority Rules
Imagine a network with an asymmetric fault: a node is able to receive all messages sent to it, but
any outgoing messages from that node are dropped or delayed [^22]. Even though that node is working
@ -1064,7 +1075,7 @@ tolerated). However, it is still safe, because there can only be only one majori
system—there cannot be two majorities with conflicting decisions at the same time. We will discuss
the use of quorums in more detail when we get to *consensus algorithms* in [Chapter 10](/en/ch10#ch_consistency).
## Distributed Locks and Leases
### Distributed Locks and Leases
Locks and leases in distributed application are prone to be misused, and a common source of bugs [^84].
Lets look at one particular case of how they can go wrong.
@ -1087,8 +1098,7 @@ wasted computational resources, which is not a big deal. But in the first two ca
could be lost or corrupted data, which is much more serious.
For example, [Figure 9-4](/en/ch9#fig_distributed_lease_pause) shows a data corruption bug due to an incorrect
implementation of locking. (The bug is not theoretical: HBase used to have this problem
[^85] [^86].)
implementation of locking. (The bug is not theoretical: HBase used to have this problem [^85] [^86].)
Say you want to ensure that a file in a storage service can only be
accessed by one client at a time, because if multiple clients tried to write to it, the file would
become corrupted. You try to implement this by requiring a client to obtain a lease from a lock
@ -1115,7 +1125,7 @@ to [Figure 9-4](/en/ch9#fig_distributed_lease_pause).
{{< figure src="/fig/ddia_0905.png" id="fig_distributed_lease_delay" caption="Figure 9-5. A message from a former leaseholder might be delayed for a long time, and arrive after another node has taken over the lease." class="w-full my-4" >}}
### Fencing off zombies and delayed requests
#### Fencing off zombies and delayed requests
The term *zombie* is sometimes used to describe a former leaseholder who has not yet found out that
it lost the lease, and who is still acting as if it was the current leaseholder. Since we cannot
@ -1141,12 +1151,16 @@ token*, which is a number that increases every time a lock is granted (e.g., inc
service). We can then require that every time a client sends a write request to the storage service,
it must include its current fencing token.
--------
> [!NOTE]
> There are several alternative names for fencing tokens. In Chubby, Googles lock service, they are
> called *sequencers* [^88], and in Kafka they are called *epoch numbers*.
> In consensus algorithms, which we will discuss in [Chapter 10](/en/ch10#ch_consistency), the *ballot number* (Paxos) or
> *term number* (Raft) serves a similar purpose.
--------
In [Figure 9-6](/en/ch9#fig_distributed_fencing), client 1 acquires the lease with a token of 33, but then
it goes into a long pause and the lease expires. Client 2 acquires the lease with a token of 34 (the
number always increases) and then sends its write request to the storage service, including the
@ -1168,12 +1182,10 @@ read it, similarly to an atomic compare-and-set (CAS) operation. For example, ob
services support such a check: Amazon S3 calls it *conditional writes*, Azure Blob Storage calls it
*conditional headers*, and Google Cloud Storage calls it *request preconditions*.
### Fencing with multiple replicas
#### Fencing with multiple replicas
If your clients need to write only to one storage service that supports such conditional writes, the
lock service is somewhat redundant
[^91] [^92],
since the lease assignment could have been implemented directly based on that storage service [^93].
lock service is somewhat redundant [^91] [^92], since the lease assignment could have been implemented directly based on that storage service [^93].
However, once you have a fencing token you can also use it with multiple services or replicas, and
ensure that the old leaseholder is fenced off on all of those services.
@ -1202,7 +1214,7 @@ As you can see from these examples, it is not safe to assume that there is only
lease at any one time. Fortunately, with a bit of care you can use fencing tokens to prevent zombies
and delayed requests from doing any damage.
## Byzantine Faults
### Byzantine Faults
Fencing tokens can detect and block a node that is *inadvertently* acting in error (e.g., because it
hasnt yet found out that its lease has expired). However, if the node deliberately wanted to
@ -1219,7 +1231,7 @@ arbitrary faulty or corrupted responses)—for example, it might cast multiple c
the same election. Such behavior is known as a *Byzantine fault*, and the problem of reaching
consensus in this untrusting environment is known as the *Byzantine Generals Problem* [^94].
# The Byzantine Generals Problem
> [!TIP] THE BYZANTINE GENERALS PROBLEM
The Byzantine Generals Problem is a generalization of the so-called *Two Generals Problem* [^95],
which imagines a situation in which two army generals need to agree on a battle plan. As they
@ -1240,6 +1252,8 @@ before computers [^96].
Lamport wanted to choose a nationality that would not offend any readers, and he was advised that
calling it *The Albanian Generals Problem* was not such a good idea [^97].
--------
A system is *Byzantine fault-tolerant* if it continues to operate correctly even if some of the
nodes are malfunctioning and not obeying the protocol, or if malicious attackers are interfering
with the network. This concern is relevant in certain specific circumstances. For example:
@ -1285,7 +1299,7 @@ an attacker can compromise one node, they can probably compromise all of them, b
probably running the same software. Thus, traditional mechanisms (authentication, access control,
encryption, firewalls, and so on) continue to be the main protection against attackers.
### Weak forms of lying
#### Weak forms of lying
Although we assume that nodes are generally honest, it can be worth adding mechanisms to software
that guard against weak forms of “lying”—for example, invalid messages due to hardware issues,
@ -1308,7 +1322,7 @@ pragmatic steps toward better reliability. For example:
incorrect time is detected as an outlier and is excluded from synchronization [^39]. The use of multiple servers makes NTP
more robust than if it only uses a single server.
## System Model and Reality
### System Model and Reality
Many algorithms have been designed to solve distributed systems problems—for example, we will
examine solutions for the consensus problem in [Chapter 10](/en/ch10#ch_consistency). In order to be useful, these
@ -1377,7 +1391,7 @@ For modeling real systems, the partially synchronous model with crash-recovery f
the most useful model. It allows for unbounded network delay, process pauses, and slow nodes. But
how do distributed algorithms cope with that model?
### Defining the correctness of an algorithm
#### Defining the correctness of an algorithm
To define what it means for an algorithm to be *correct*, we can describe its *properties*. For
example, the output of a sorting algorithm has the property that for any two distinct elements of
@ -1403,7 +1417,7 @@ that we assume may occur in that system model. However, if all nodes crash, or a
suddenly become infinitely long, then no algorithm will be able to get anything done. How can we
still make useful guarantees even in a system model that allows complete failures?
### Safety and liveness
#### Safety and liveness
To clarify the situation, it is worth distinguishing between two different kinds of properties:
*safety* and *liveness* properties. In the example just given, *uniqueness* and *monotonic sequence* are
@ -1438,7 +1452,7 @@ network eventually recovers from an outage. The definition of the partially sync
requires that eventually the system returns to a synchronous state—that is, any period of network
interruption lasts only for a finite duration and is then repaired.
### Mapping system models to the real world
#### Mapping system models to the real world
Safety and liveness properties and system models are very useful for reasoning about the correctness
of a distributed algorithm. However, when implementing an algorithm in practice, the messy facts of
@ -1469,7 +1483,7 @@ They are incredibly helpful for distilling down the complexity of real systems t
of faults that we can reason about, so that we can understand the problem and try to solve it
systematically.
## Formal Methods and Randomized Testing
### Formal Methods and Randomized Testing
How do we know that an algorithm satisfies the required properties? Due to concurrency, partial
failures, and network delays there are a huge number of potential states. We need to guarantee
@ -1490,7 +1504,7 @@ testing (DST) use randomization to test a system in a wide range of situations.
Amazon Web Services have successfully used a combination of these techniques on many of their
products [^120] [^121].
### Model checking and specification languages
#### Model checking and specification languages
*Model checkers* are tools that help verify that an algorithm or system behaves as expected. An algorithm
specification is written in a purpose-built language such as TLA+, Gallina, or FizzBee. These
@ -1518,7 +1532,7 @@ state space, but it risks that your specification and your implementation go out
It is possible to check whether the model and the real implementation have equivalent behavior, but
this requires instrumentation in the real implementation [^127].
### Fault injection
#### Fault injection
Many bugs are triggered when machine and network failures occur. Fault injection is an effective
(and sometimes scary) technique that verifies whether a systems implementation works as expected things
@ -1546,7 +1560,7 @@ simplify the process. Such frameworks come with integrations for various operati
pre-built fault injectors [^129].
Jepsen has been remarkably effective at finding critical bugs in many widely-used systems [^130] [^131].
### Deterministic simulation testing
#### Deterministic simulation testing
Deterministic simulation testing (DST) has also become a popular complement to model-checking and
fault injection. It uses a similar state space exploration process as a model checker, but it tests