From 99b4b0050249e061f39b144aab1f90f80cb1fb03 Mon Sep 17 00:00:00 2001 From: Feng Ruohang Date: Sat, 9 Aug 2025 21:01:34 +0800 Subject: [PATCH] adjust anchor for ch5 - ch10 --- content/en/ch10.md | 182 ++++++++++++++++--------------- content/en/ch6.md | 247 ++++++++++++++++++++++-------------------- content/en/ch7.md | 96 +++++++++-------- content/en/ch8.md | 262 +++++++++++++++++++++++---------------------- content/en/ch9.md | 172 +++++++++++++++-------------- 5 files changed, 510 insertions(+), 449 deletions(-) diff --git a/content/en/ch10.md b/content/en/ch10.md index 9f6aa1d..1e9a2af 100644 --- a/content/en/ch10.md +++ b/content/en/ch10.md @@ -62,7 +62,9 @@ systems and similar infrastructure, you will need to go much deeper into the the chance of your systems being robust. As usual, the literature references in this chapter provide some initial pointers. -# Linearizability + + +## Linearizability If you want a replicated database to be as simple as possible to use, you should make it behave as if it wasn’t replicated at all. Then users don’t have to worry about replication lag, conflicts, and @@ -83,7 +85,6 @@ guarantee*. To clarify this idea, let’s look at an example of a system that is {{< figure src="/fig/ddia_1001.png" id="fig_consistency_linearizability_0" caption="Figure 10-1. If this database were linearizable, then either Alice's read would return 1 instead of 0, or Bob's read would return 0 instead of 1." class="w-full my-4" >}} - [Figure 10-1](/en/ch10#fig_consistency_linearizability_0) shows an example of a nonlinearizable sports website [^4]. Aaliyah and Bryce are sitting in the same room, both checking their phones to see the outcome of a game their favorite team is playing. Just after the final score is announced, Aaliyah refreshes the @@ -98,7 +99,7 @@ his query) *after* he heard Aaliyah exclaim the final score, and therefore he ex result to be at least as recent as Aaliyah’s. The fact that his query returned a stale result is a violation of linearizability. -## What Makes a System Linearizable? +### What Makes a System Linearizable? In order to understand linearizability better, let’s look at some more examples. [Figure 10-2](/en/ch10#fig_consistency_linearizability_1) shows three clients concurrently reading and writing the same @@ -219,7 +220,10 @@ in [“Problems with Replication Lag”](/en/ch6#sec_replication_lag) are exampl guarantees all these weaker properties, and more. In this chapter we will focus on linearizability, which is the strongest consistency model in common use. -# Linearizability Versus Serializability + +-------- + +> [!TIP] LINEARIZABILITY VERSUS SERIALIZABILITY Linearizability is easily confused with serializability (see [“Serializability”](/en/ch8#sec_transactions_serializability)), as both words seem to mean something like “can be arranged in a sequential order.” However, they are @@ -245,8 +249,7 @@ Linearizability (*Sequential consistency* is something else again [^8], but we won’t discuss it here.) A database may provide both serializability and linearizability, and this combination is known as -*strict serializability* or *strong one-copy serializability* (*strong-1SR*) -[^11] [^12]. +*strict serializability* or *strong one-copy serializability* (*strong-1SR*) [^11] [^12]. Single-node databases are typically linearizable. With distributed databases using optimistic methods like serializable snapshot isolation (see [“Serializable Snapshot Isolation (SSI)”](/en/ch8#sec_transactions_ssi)) the situation is more complicated: for example, CockroachDB provides serializability, and some recency guarantees on @@ -257,14 +260,16 @@ It is also possible to combine a weaker isolation level with linearizability, or consistency model with serializability; in fact, consistency model and isolation level can be chosen largely independently from each other [^15] [^16]. -## Relying on Linearizability +-------- + +### Relying on Linearizability In what circumstances is linearizability useful? Viewing the final score of a sporting match is perhaps a frivolous example: a result that is outdated by a few seconds is unlikely to cause any real harm in this situation. However, there a few areas in which linearizability is an important requirement for making a system work correctly. -### Locking and leader election +#### Locking and leader election A system that uses single-leader replication needs to ensure that there is indeed only one leader, not several (split brain). One way of electing a leader is to use a lease: every node that starts up @@ -280,10 +285,14 @@ election correctly (see for example the fencing issue in [“Distributed Locks a libraries like Apache Curator help by providing higher-level recipes on top of ZooKeeper. However, a linearizable storage service is the basic foundation for these coordination tasks. -> [!NOTE]> Strictly speaking, ZooKeeper provides linearizable writes, but reads may be stale, since there is no -> guarantee that they are served from the current leader -> [^18]. -> etcd since version 3 provides linearizable reads by default. +-------- + +> [!NOTE] +> Strictly speaking, ZooKeeper provides linearizable writes, but reads may be stale, since there is no +> guarantee that they are served from the current leader [^18]. etcd since version 3 provides linearizable reads by default. + +-------- + Distributed locking is also used at a much more granular level in some distributed databases, such as Oracle Real Application Clusters (RAC) [^19]. @@ -292,7 +301,7 @@ to the same disk storage system. Since these linearizable locks are on the criti transaction execution, RAC deployments usually have a dedicated cluster interconnect network for communication between database nodes. -### Constraints and uniqueness guarantees +#### Constraints and uniqueness guarantees Uniqueness constraints are common in databases: for example, a username or email address must uniquely identify one user, and in a file storage service there cannot be two files with the same @@ -320,7 +329,7 @@ However, a hard uniqueness constraint, such as the one you typically find in rel requires linearizability. Other kinds of constraints, such as foreign key or attribute constraints, can be implemented without linearizability [^20]. -### Cross-channel timing dependencies +#### Cross-channel timing dependencies Notice a detail in [Figure 10-1](/en/ch10#fig_consistency_linearizability_0): if Aaliyah hadn’t exclaimed the score, Bryce wouldn’t have known that the result of his query was stale. He would have just refreshed the @@ -367,7 +376,8 @@ understand. If you control the additional communication channel (like in the cas queue, but not in the case of Aaliyah and Bryce), you can use alternative approaches similar to what we discussed in [“Reading Your Own Writes”](/en/ch6#sec_replication_ryw), at the cost of additional complexity. -## Implementing Linearizable Systems + +### Implementing Linearizable Systems Now that we’ve looked at a few examples in which linearizability is useful, let’s think about how we might implement a system that offers linearizable semantics. @@ -375,11 +385,9 @@ might implement a system that offers linearizable semantics. Since linearizability essentially means “behave as though there is only a single copy of the data, and all operations on it are atomic,” the simplest answer would be to really only use a single copy of the data. However, that approach would not be able to tolerate faults: if the node holding that -one copy failed, the data would be lost, or at least inaccessible until the node was brought up -again. +one copy failed, the data would be lost, or at least inaccessible until the node was brought up again. -Let’s revisit the replication methods from [Chapter 6](/en/ch6#ch_replication), and compare whether they can be made -linearizable: +Let’s revisit the replication methods from [Chapter 6](/en/ch6#ch_replication), and compare whether they can be made linearizable: Single-leader replication (potentially linearizable) : In a system with single-leader replication, the leader has the primary copy of the data that is @@ -399,10 +407,7 @@ Single-leader replication (potentially linearizable) Consensus algorithms (likely linearizable) : Some consensus algorithms are essentially single-leader replication with automatic leader election and failover. They are carefully designed to prevent split brain, allowing them to implement - linearizable storage safely. ZooKeeper uses the Zab consensus algorithm - [^22] - and etcd uses Raft - [^23], for example. + linearizable storage safely. ZooKeeper uses the Zab consensus algorithm [^22] and etcd uses Raft [^23], for example. However, just because a system uses consensus does not guarantee that all operations on it are linearizable: if it allows reads on a node without checking that it is still the leader, the results of the read may be stale if a new leader has just been elected. @@ -424,7 +429,7 @@ Leaderless replication (probably not linearizable) consistent with actual event ordering due to clock skew (see [“Relying on Synchronized Clocks”](/en/ch9#sec_distributed_clocks_relying)). Even with quorums, nonlinearizable behavior is possible, as demonstrated in the next section. -### Linearizability and quorums +#### Linearizability and quorums Intuitively, it seems as though quorum reads and writes should be linearizable in a Dynamo-style model. However, when we have variable network delays, it is possible to have race @@ -459,7 +464,7 @@ linearizable compare-and-set operation cannot, because it requires a consensus a In summary, it is safest to assume that a leaderless system with Dynamo-style replication does not provide linearizability, even with quorum reads and writes. -## The Cost of Linearizability +### The Cost of Linearizability As some replication methods can provide linearizability and others cannot, it is interesting to explore the pros and cons of linearizability in more depth. @@ -495,7 +500,7 @@ If clients can connect directly to the leader region, this is not a problem, sin application continues to work normally there. But clients that can only reach a follower region will experience an outage until the network link is repaired. -### The CAP theorem +#### The CAP theorem This issue is not just a consequence of single-leader and multi-leader replication: any linearizable database has this problem, no matter how it is implemented. The issue also isn’t specific to @@ -524,19 +529,17 @@ implementing large-scale web services [^36]. CAP deserves credit for this culture shift—it helped trigger the NoSQL movement, a burst of new database technologies around the mid-2000s. -# The Unhelpful CAP Theorem +> [!TIP] THE UNHELPFUL CAP THEOREM CAP is sometimes presented as *Consistency, Availability, Partition tolerance: pick 2 out of 3*. Unfortunately, putting it this way is misleading [^32] because network partitions are a kind of -fault, so they aren’t something about which you have a choice: they will happen whether you like it -or not. +fault, so they aren’t something about which you have a choice: they will happen whether you like it or not. At times when the network is working correctly, a system can provide both consistency (linearizability) and total availability. When a network fault occurs, you have to choose between either linearizability or total availability. Thus, a better way of phrasing CAP would be *either Consistent or Available when Partitioned* [^37]. -A more reliable network needs to make this choice less often, but at some point the choice is -inevitable. +A more reliable network needs to make this choice less often, but at some point the choice is inevitable. The CP/AP classification scheme has several further flaws [^4]. *Consistency* is formalized as linearizability (the theorem doesn’t say anything about weaker consistency models), and the @@ -565,7 +568,7 @@ However, this definition inherits several problems with CAP, such as the counter There are many more interesting impossibility results in distributed systems [^43], and CAP has now been superseded by more precise results [^44] [^45], so it is of mostly historical interest today. -### Linearizability and network delays +#### Linearizability and network delays Although linearizability is a useful guarantee, surprisingly few systems are actually linearizable in practice. For example, even RAM on a modern multi-core CPU is not linearizable [^46]: @@ -598,7 +601,8 @@ exist, but weaker consistency models can be much faster, so this trade-off is im latency-sensitive systems. In [Link to Come] we will discuss some approaches for avoiding linearizability without sacrificing correctness. -# ID Generators and Logical Clocks + +## ID Generators and Logical Clocks In many applications you need to assign some sort of unique ID to database records when they are created, which gives you a primary key by which you can refer to those records. In single-node @@ -666,14 +670,8 @@ Wall-clock timestamp made unique putting a timestamp from that clock in the most significant bits, and filling the remaining bits with extra information that ensures the ID is unique even if the timestamp is not—for example, a shard number and a per-shard incrementing sequence number, or a long random value. This approach - is used in Version 7 UUIDs - [^50], - Twitter’s Snowflake [^51], - ULIDs [^52], - Hazelcast’s Flake ID generator, MongoDB ObjectIDs, and many similar schemes - [^50]. - You can implement these ID generators in application code or within a database - [^53]. + is used in Version 7 UUIDs [^50], Twitter’s Snowflake [^51], ULIDs [^52], Hazelcast’s Flake ID generator, + MongoDB ObjectIDs, and many similar schemes [^50]. You can implement these ID generators in application code or within a database [^53]. All these schemes generate IDs that are unique (at least with high enough probability that collisions are vanishingly rare), but they have much weaker ordering guarantees for IDs than the @@ -691,7 +689,7 @@ using atomic clocks or GPS receivers. But it would also be nice to be able to ge unique and correctly ordered without relying on special hardware. That’s what *logical clocks* are about. -## Logical Clocks +### Logical Clocks In [“Unreliable Clocks”](/en/ch9#sec_distributed_clocks) we discussed time-of-day clocks and monotonic clocks. Both of these are *physical clocks*: they measure the passing of seconds (or milliseconds, microseconds, etc.). @@ -713,7 +711,7 @@ The requirements for a logical clock are typically: A single-node ID generator meets these requirements, but the distributed ID generators we just discussed do not meet the causal ordering requirement. -### Lamport timestamps +#### Lamport timestamps Fortunately, there is a simple method for generating logical timestamps that *is* consistent with causality, and which you can use as a distributed ID generator. It is called a *Lamport clock*, @@ -733,8 +731,7 @@ each timestamp is made unique. Every time a node generates a timestamp, it increments its counter value and uses the new value. Moreover, every time a node sees a timestamp from another node, if the counter value in that -timestamp is greater than its local counter value, it increases its local counter to match the value -in the timestamp. +timestamp is greater than its local counter value, it increases its local counter to match the value in the timestamp. In [Figure 10-9](/en/ch10#fig_consistency_lamport_ts), Aaliyah had not yet seen Caleb’s message when posting her own, and vice versa. Assuming both users start with an initial counter value of 0, both therefore @@ -749,7 +746,7 @@ two timestamps have the same counter, we compare their node IDs instead, using t lexicographic string comparison. Thus, the timestamp order in this example is (1, “Aaliyah”) < (1, “Caleb”) < (2, “Bryce”). -### Hybrid logical clocks +#### Hybrid logical clocks Lamport timestamps are good at capturing the order in which things happened, but they have some limitations: @@ -779,7 +776,7 @@ conventional time-of-day clock, with the added property that its ordering is con happens-before relation. It doesn’t depend on any special hardware, and requires only roughly synchronized clocks. Hybrid logical clocks are used by CockroachDB, for example. -### Lamport/hybrid logical clocks vs. vector clocks +#### Lamport/hybrid logical clocks vs. vector clocks In [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_transactions_snapshot_impl) we discussed how snapshot isolation is often implemented: essentially, by giving each transaction a transaction ID, and allowing each transaction to see @@ -799,7 +796,7 @@ algorithm, such as a *vector clock*. The downside is that the timestamps from a much bigger—potentially one integer for every node in the system. See [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent) for more details on detecting concurrency. -## Linearizable ID Generators +### Linearizable ID Generators Although Lamport clocks and hybrid logical clocks provide useful ordering guarantees, that ordering is still weaker than the linearizable single-node ID generator we talked about previously. Recall @@ -839,7 +836,7 @@ the example, that’s not so easy. The simplest solution in this case would be to use a linearizable ID generator, which would ensure that the photo upload is assigned a greater ID than the account permissions change. -### Implementing a linearizable ID generator +#### Implementing a linearizable ID generator The simplest way of ensuring that ID assignment is linearizable is by actually using a single node for this purpose. That node only needs to atomically increment a counter and return its value when @@ -874,7 +871,7 @@ assignment without any communication: even requests in different regions will be without waiting for cross-region requests. The downside is that you need hardware and software support for clocks to be tightly synchronized and compute the necessary uncertainty interval. -### Enforcing constraints using logical clocks +#### Enforcing constraints using logical clocks In [“Constraints and uniqueness guarantees”](/en/ch10#sec_consistency_uniqueness) we saw that a linearizable compare-and-set operation can be used to implement locks, uniqueness constraints, and similar constructs in a distributed system. This @@ -897,15 +894,16 @@ the kind of fault-tolerant system that we need. To implement locks, leases, and similar constructs in a fault-tolerant way, we need something stronger than logical clocks or ID generators: we need consensus. -# Consensus + + +## Consensus In this chapter we have seen several examples of things that are easy when you have only a single node, but which get a lot harder if you want fault tolerance: * A database can be linearizable if you have only a single leader, and you make all reads and writes on that leader. But how do you fail over if that leader fails, while avoiding split brain? How do - you ensure that a node that believes itself to be the leader hasn’t actually been voted out in the - meantime? + you ensure that a node that believes itself to be the leader hasn’t actually been voted out in the meantime? * A linearizable ID generator on a single node is just a counter with an atomic fetch-and-add instruction, but what if it crashes? * An atomic compare-and-set (CAS) operation is useful for many things, such as deciding who gets a @@ -933,7 +931,9 @@ Such *Byzantine fault tolerant* (BFT) consensus algorithms are used in blockchai However, as explained in [“Byzantine Faults”](/en/ch9#sec_distributed_byzantine), BFT algorithms are beyond the scope of this book. -# The Impossibility of Consensus +-------- + +> [!TIP] THE IMPOSSIBILITY OF CONSENSUS You may have heard about the FLP result [^72]—named after the authors Fischer, Lynch, and Paterson—which proves that there is no algorithm that is always able to @@ -951,7 +951,9 @@ Even just allowing the algorithm to use random numbers is sufficient to get arou Thus, although the FLP result about the impossibility of consensus is of great theoretical importance, distributed systems can usually achieve consensus in practice. -## The Many Faces of Consensus +-------- + +### The Many Faces of Consensus Consensus can be expressed in several different ways: @@ -966,10 +968,9 @@ Consensus can be expressed in several different ways: We will explore all of these shortly. In fact, these problems are all equivalent to each other: if you have an algorithm that solves one of these problems, you can convert it into a solution for any of the others. This is quite a profound and perhaps surprising insight! And that’s why we can lump -all of these things together under “consensus”, even though they look quite different on the -surface. +all of these things together under “consensus”, even though they look quite different on the surface. -### Single-value consensus +#### Single-value consensus The standard formulation of consensus involves getting multiple nodes to agree on a single value. For example: @@ -1039,7 +1040,7 @@ there is a severe network problem [^75]. Thus, a large-scale outage can stop the system from being able to process requests, but it cannot corrupt the consensus system by causing it to make inconsistent decisions. -### Compare-and-set as consensus +#### Compare-and-set as consensus A compare-and-set (CAS) operation checks whether the current value of some object equals some expected value; if yes, it atomically updates the object to some new value; if no, it leaves the @@ -1056,8 +1057,7 @@ values in the CAS invocation, and then set the object to whatever value was deci consensus. Any CAS invocations whose new value was not decided return an error. CAS invocations with different expected values use separate runs of the consensus protocol. -This shows that CAS and consensus are equivalent to each other -[^28] [^73]. +This shows that CAS and consensus are equivalent to each other [^28] [^73]. Again, both are straightforward on a single node, but challenging to make fault-tolerant. As an example of CAS in a distributed setting, we saw conditional write operations for object stores in [“Databases backed by object storage”](/en/ch6#sec_replication_object_storage), which allow a write to happen only if an object with the same @@ -1068,7 +1068,7 @@ tells us that consensus cannot be solved by a deterministic algorithm in the asy model [^72], but we saw in [“Linearizability and quorums”](/en/ch10#sec_consistency_quorum_linearizable) that a linearizable register can be implemented using quorum reads/writes in this model [^24] [^25] [^26]. From this it follows that a linearizable register cannot solve consensus. -### Shared logs as consensus +#### Shared logs as consensus We have seen several examples of logs, such as replication logs, transaction logs, and write-ahead logs. A log stores a sequence of *log entries*, and anyone who reads it sees the same entries in the @@ -1101,10 +1101,14 @@ Validity : If a node reads a log entry containing some value, then some node previously requested for that value to be added to the log. +-------- + > [!NOTE] > A shared log is formally known as a *total order broadcast*, *atomic broadcast*, or *total order multicast* protocol [^26] [^76] [^77] > It’s the same thing described in different words: requesting a value to be added to the log is then called “broadcasting” it, and reading a log entry is called “delivering” it. +-------- + If you have an implementation of a shared log, it is easy to solve the consensus problem: every node that wants to propose a value requests for it to be added to the log, and whichever value is read back in the first log entry is the value that is decided. Since all nodes read log entries in the @@ -1128,7 +1132,7 @@ replication without failover does not meet the liveness requirements, since it s messages if the leader crashes. As usual, the challenge is in performing failover safely and automatically. -### Fetch-and-add as consensus +#### Fetch-and-add as consensus The linearizable ID generator we saw in [“Linearizable ID Generators”](/en/ch10#sec_consistency_linearizable_id) comes close to solving consensus, but it falls slightly short. We can implement such an ID generator using a fetch-and-add @@ -1163,7 +1167,7 @@ can say that fetch-and-add has a *consensus number* of two [^28]. In contrast, CAS and shared logs solve consensus for any number of nodes that may propose values, so they have a consensus number of ∞ (infinity). -### Atomic commitment as consensus +#### Atomic commitment as consensus In [“Distributed Transactions”](/en/ch8#sec_transactions_distributed) we saw the *atomic commitment* problem, which is to ensure that the databases or shards involved in a distributed transaction all either commit or abort a @@ -1198,8 +1202,7 @@ non-triviality property ensures the algorithm can’t simply always abort (but i any of the communication among the nodes times out). The other three properties are basically the same as for consensus. -If you have a solution for consensus, there are multiple ways you could solve atomic commitment -[^78] [^79]. +If you have a solution for consensus, there are multiple ways you could solve atomic commitment [^78] [^79]. One works like this: when you want to commit the transaction, every node sends its vote to commit or abort to every other node. Nodes that receive a vote to commit from itself and every other node propose “commit” using the consensus algorithm; nodes that receive a vote to abort, or which @@ -1209,8 +1212,7 @@ consensus algorithm decided, it commits or aborts accordingly. In this algorithm, “commit” will only be proposed if all nodes voted to commit. If any node voted to abort, all proposals in the consensus algorithm will be “abort”. It could happen that some nodes propose “abort” while others propose “commit” if all nodes voted to commit but some communication -timed out; in this case it doesn’t matter whether the nodes commit or abort, as long as they all do -the same. +timed out; in this case it doesn’t matter whether the nodes commit or abort, as long as they all do the same. If you have a fault-tolerant atomic commitment protocol, you can also solve consensus. Every node that wants to propose a value starts a transaction on a quorum of nodes, and at each node it @@ -1221,7 +1223,7 @@ consensus; if atomic commit aborts, the proposing node retries with a new transa This shows that atomic commit and consensus are also equivalent to each other. -## Consensus in Practice +### Consensus in Practice We have seen that single-value consensus, CAS, shared logs, and atomic commitment are all equivalent to each other: you can convert a solution to one of them into a solution to any of the others. That @@ -1233,20 +1235,20 @@ Raft, Viewstamped Replication, and Zab provide shared logs right out of the box. single-value consensus, but in practice most systems using Paxos actually use the extension called Multi-Paxos, which also provides a shared log. -### Using shared logs +#### Using shared logs A shared log is a good fit for database replication: if every log entry represents a write to the database, and every replica processes the same writes in the same order using deterministic logic, -then the replicas will all end up in a consistent state. This idea is known as *state machine -replication* [^80], +then the replicas will all end up in a consistent state. This idea is known as *state machine replication* [^80], and it is the principle behind event sourcing, which we saw in [“Event Sourcing and CQRS”](/en/ch3#sec_datamodels_events). Shared logs are also useful for stream processing, as we shall see in [Link to Come]. Similarly, a shared log can be used to implement serializable transactions: as discussed in [“Actual Serial Execution”](/en/ch8#sec_transactions_serial), if every log entry represents a deterministic transaction to be executed as a stored procedure, and if every node executes those transactions in the same order, -then the transactions will be serializable -[^81] [^82]. +then the transactions will be serializable [^81] [^82]. + +--------- > [!NOTE] > Sharded databases with a strong consistency model often maintain a separate log per shard, which @@ -1254,6 +1256,8 @@ then the transactions will be serializable > references) they can offer across shards. Serializable transactions across shards are possible, but > require additional coordination [^83]. +-------- + A shared log is also powerful because it can easily be adapted to other forms of consensus: * We saw previously how to use it to implement single-value consensus and CAS: simply decide the @@ -1266,7 +1270,7 @@ A shared log is also powerful because it can easily be adapted to other forms of can be used to generate fencing tokens (see [“Fencing off zombies and delayed requests”](/en/ch9#sec_distributed_fencing_tokens)); for example, in ZooKeeper, this sequence number is called `zxid` [^18]. -### From single-leader replication to consensus +#### From single-leader replication to consensus We saw previously that single-value consensus is easy if you have a single “dictator” node that makes the decision, and likewise a shared log is easy if a single leader is the only node that is @@ -1314,7 +1318,7 @@ different protocols. In consensus algorithms, any node can start an election and quorum of nodes to respond; in 2PC, only the coordinator can request votes, and it requires a “yes” vote from *every* participant before it can commit. -### Subtleties of consensus +#### Subtleties of consensus This basic structure is common to all of Raft, Multi-Paxos, Zab, and Viewstamped Replication: a vote by a quorum of nodes elects a leader, and then another quorum vote is required for every entry that @@ -1330,7 +1334,10 @@ least as up-to-date as a majority of its followers [^69]. In contrast, Paxos allows any node to become the new leader, but requires it to bring its log up-to-date with other nodes before it can start appending new entries of its own. -# Consistency vs. Availability in Leader Election + +-------- + +> [!TIP] CONSISTENCY VS. AVAILABILITY IN LEADER ELECTION If you want the consensus algorithm to strictly guarantee the properties laid out in [“Shared logs as consensus”](/en/ch10#sec_consistency_shared_logs), it’s essential that the new leader is up-to-date with any confirmed @@ -1349,10 +1356,11 @@ availability, but you are on thin ice, since the theory of consensus no longer a will work fine as long as there are no faults, the problems discussed in [Chapter 9](/en/ch9#ch_distributed) can easily cause a lot of data loss or corruption. +-------- + Another subtlety is in how the algorithms deal with log entries that had been proposed by the old leader before it failed, but for which the vote on appending to the log had not yet completed. You -can find discussions of these details in the references for this chapter -[^23] [^69] [^86]. +can find discussions of these details in the references for this chapter [^23] [^69] [^86]. For databases that use a consensus algorithm for replication, not only do writes need to be turned into log entries and replicated to a quorum. If you want to guarantee linearizable reads, they also @@ -1366,7 +1374,7 @@ configuration. Consensus algorithms have been extended with *reconfiguration* fe this possible. This is especially useful when adding new regions to a system, or when migrating from one location to another (by first adding the new nodes, and then removing the old nodes). -### Pros and cons of consensus +#### Pros and cons of consensus Although they are complex and subtle, consensus algorithms are a huge breakthrough for distributed systems. Consensus is essentially “single-leader replication done right”, with automatic failover on @@ -1406,7 +1414,9 @@ only real alternative is to use a weaker consistency model instead, such as thos leaderless or multi-leader replication as discussed in [Chapter 6](/en/ch6#ch_replication). These approaches generally don’t offer linearizability, but for applications that don’t need it that is fine. -## Coordination Services + + +### Coordination Services Consensus algorithms are useful in any distributed database that wants to offer linearizable operations, and many modern distributed databases use consensus algorithms for replication. But one @@ -1453,7 +1463,9 @@ Failure detection and change notifications do not require consensus, but they ar distributed coordination alongside the atomic operations and fencing support that do require consensus. -# Managing configuration with coordination services +-------- + +> [!TIP] Managing configuration with coordination services Applications and infrastructure often have configuration parameters such as timeouts, thread pool sizes, and so on. Coordination services are sometimes used to store such configuration data, @@ -1466,7 +1478,9 @@ convenient to use a coordination service and rely on its notification feature if running the coordination service anyway. Alternatively, a process could periodically poll for configuration updates from a file or URL, which avoids the need for a specialized service. -### Allocating work to nodes +-------- + +#### Allocating work to nodes A coordination service is useful if you have several instances of a process or service, and one of them needs to be chosen as leader or primary. If the leader fails, one of the other nodes should @@ -1499,7 +1513,7 @@ intended for storing data that may change thousands of times per second. For tha use a conventional database; alternatively, tools like Apache BookKeeper [^90] [^91] can be used to replicate fast-changing internal state of a service. -### Service discovery +#### Service discovery ZooKeeper, etcd, and Consul are also often used for *service discovery*—that is, to find out which IP address you need to connect to in order to reach a particular service (see diff --git a/content/en/ch6.md b/content/en/ch6.md index b1a7d3e..ec50d95 100644 --- a/content/en/ch6.md +++ b/content/en/ch6.md @@ -41,7 +41,9 @@ concepts such as *eventual consistency* still cause confusion. In [“Problems w get more precise about eventual consistency and discuss things like the *read-your-writes* and *monotonic reads* guarantees. -# Backups and replication +-------- + +> [!TIP] BACKUPS AND REPLICATION You might be wondering whether you still need backups if you have replication. The answer is yes, because they have different purposes: replicas quickly reflect writes from one node on other nodes, @@ -59,15 +61,16 @@ the current state. If you have a large amount of data, it can be cheaper to keep data in an object store that is optimized for infrequently-accessed data, and to store only the current state of the database in primary storage. -# Single-Leader Replication +-------- + +## Single-Leader Replication Each node that stores a copy of the database is called a *replica*. With multiple replicas, a question inevitably arises: how do we ensure that all the data ends up on all the replicas? Every write to the database needs to be processed by every replica; otherwise, the replicas would no longer contain the same data. The most common solution is called *leader-based replication*, -*primary-backup*, or *active/passive*. It works as follows (see -[Figure 6-1](/en/ch6#fig_replication_leader_follower)): +*primary-backup*, or *active/passive*. It works as follows (see [Figure 6-1](/en/ch6#fig_replication_leader_follower)): 1. One of the replicas is designated the *leader* (also known as *primary* or *source* [^2]). When clients want to write to the database, they must send their requests to the leader, which @@ -96,11 +99,15 @@ Many consensus algorithms such as Raft, which is used for replication in Cockroa etcd, and RabbitMQ quorum queues (among others), are also based on a single leader, and automatically elect a new leader if the old one fails (we will discuss consensus in more detail in [Chapter 10](/en/ch10#ch_consistency)). +-------- + > [!NOTE] > In older documents you may see the term *master–slave replication*. It means the same as > leader-based replication, but the term should be avoided as it is widely considered offensive [^8]. -## Synchronous Versus Asynchronous Replication +-------- + +### Synchronous Versus Asynchronous Replication An important detail of a replicated system is whether the replication happens *synchronously* or *asynchronously*. (In relational databases, this is often a configurable option; other systems are @@ -158,7 +165,7 @@ Weakening durability may sound like a bad trade-off, but asynchronous replicatio widely used, especially if there are many followers or if they are geographically distributed [^9]. We will return to this issue in [“Problems with Replication Lag”](/en/ch6#sec_replication_lag). -## Setting Up New Followers +### Setting Up New Followers From time to time, you need to set up new followers—perhaps to increase the number of replicas, or to replace failed nodes. How do you ensure that the new follower has an accurate copy of the @@ -195,7 +202,9 @@ recovery. You can also perform steps 1 and 2 of setting up a new follower by dow from the object store. For example, WAL-G does this for PostgreSQL, MySQL, and SQL Server, and Litestream does the equivalent for SQLite. -# Databases backed by object storage +-------- + +> [!TIP] DATABASES BACKED BY OBJECT STORAGE Object storage can be used for more than archiving data. Many databases are beginning to use object stores such as Amazon Web Services S3, Google Cloud Storage, and Azure Blob Storage to serve data @@ -228,9 +237,7 @@ Different systems deal with these trade-offs in various ways. Some introduce a * architecture that places less frequently accessed data on object storage while new or frequently accessed data is kept on faster storage devices such as SSDs, NVMe, or even in memory. Other systems use object storage as their primary storage tier, but use a separate low-latency storage system such -as Amazon’s EBS or Neon’s Safekeepers -[^12]) -to store their WAL. Recently, some systems have gone even farther by adopting a +as Amazon’s EBS or Neon’s Safekeepers [^12]) to store their WAL. Recently, some systems have gone even farther by adopting a *zero-disk architecture* (ZDA). ZDA-based systems persist all data to object storage and use disks and memory strictly for caching. This allows nodes to have no persistent state, which dramatically simplifies operations. WarpStream, Confluent Freight, Buf’s Bufstream, and Redpanda Serverless are @@ -238,7 +245,9 @@ all Kafka-compatible systems built using a zero-disk architecture. Nearly every warehouse also adopts such an architecture, as does Turbopuffer (a vector search engine), and SlateDB (a cloud-native LSM storage engine). -## Handling Node Outages +-------- + +### Handling Node Outages Any node in the system can go down, perhaps unexpectedly due to a fault, but just as likely due to planned maintenance (for example, rebooting a machine to install a kernel security patch). Being @@ -248,7 +257,7 @@ the impact of a node outage as small as possible. How do you achieve high availability with leader-based replication? -### Follower failure: Catch-up recovery +#### Follower failure: Catch-up recovery On its local disk, each follower keeps a log of the data changes it has received from the leader. If a follower crashes and is restarted, or if the network between the leader and the follower is @@ -261,8 +270,7 @@ receiving a stream of data changes as before. Although follower recovery is conceptually simple, it can be challenging in terms of performance: if the database has a high write throughput or if the follower has been offline for a long time, there might be a lot of writes to catch up on. There will be high load on both the recovering follower and -the leader (which needs to send the backlog of writes to the follower) while this catch-up is -ongoing. +the leader (which needs to send the backlog of writes to the follower) while this catch-up is ongoing. The leader can delete its log of writes once all followers have confirmed that they have processed it, but if a follower is unavailable for a long time, the leader faces a choice: either it retains @@ -271,7 +279,7 @@ leader), or it deletes the log that the unavailable follower has not yet acknowl the follower won’t be able to recover from the log, and will have to be restored from a backup when it comes back). -### Leader failure: Failover +#### Leader failure: Failover Handling a failure of the leader is trickier: one of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other @@ -331,11 +339,15 @@ Failover is fraught with things that can go wrong: is already struggling with high load or network problems, an unnecessary failover is likely to make the situation worse, not better. +-------- + > [!NOTE] > Guarding against split brain by limiting or shutting down old leaders is known as *fencing* or, more > emphatically, *Shoot The Other Node In The Head* (STONITH). We will discuss fencing in more detail > in [“Distributed Locks and Leases”](/en/ch9#sec_distributed_lock_fencing). +-------- + There are no easy solutions to these problems. For this reason, some operations teams prefer to perform failovers manually, even if the software supports automatic failover. @@ -350,12 +362,12 @@ These issues—node failures; unreliable networks; and trade-offs around replica durability, availability, and latency—are in fact fundamental problems in distributed systems. In [Chapter 9](/en/ch9#ch_distributed) and [Chapter 10](/en/ch10#ch_consistency) we will discuss them in greater depth. -## Implementation of Replication Logs +### Implementation of Replication Logs How does leader-based replication work under the hood? Several different replication methods are used in practice, so let’s look at each one briefly. -### Statement-based replication +#### Statement-based replication In the simplest case, the leader logs every write request (*statement*) that it executes and sends that statement log to its followers. For a relational database, this means that every `INSERT`, @@ -389,7 +401,7 @@ there is any nondeterminism in a statement. VoltDB uses statement-based replicat safe by requiring transactions to be deterministic [^16]. However, determinism can be hard to guarantee in practice, so many databases prefer other replication methods. -### Write-ahead log (WAL) shipping +#### Write-ahead log (WAL) shipping In [Chapter 4](/en/ch4#ch_storage) we saw that a write-ahead log is needed to make B-tree storage engines robust: every modification is first written to the WAL so that the tree can be restored to a consistent @@ -412,7 +424,7 @@ performing a failover to make one of the upgraded nodes the new leader. If the r does not allow this version mismatch, as is often the case with WAL shipping, such upgrades require downtime. -### Logical (row-based) log replication +#### Logical (row-based) log replication An alternative is to use different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals. This kind of @@ -444,12 +456,12 @@ to send the contents of a database to an external system, such as a data warehou analysis, or for building custom indexes and caches [^21]. This technique is called *change data capture*, and we will return to it in [Link to Come]. -# Problems with Replication Lag + +## Problems with Replication Lag Being able to tolerate node failures is just one reason for wanting replication. As mentioned in [“Distributed versus Single-Node Systems”](/en/ch1#sec_introduction_distributed), other reasons are scalability (processing more -requests than a single machine can handle) and latency (placing replicas geographically closer to -users). +requests than a single machine can handle) and latency (placing replicas geographically closer to users). Leader-based replication requires all writes to go through a single node, but read-only queries can go to any replica. For workloads that consist of mostly reads and only a small percentage of writes @@ -471,14 +483,14 @@ just a temporary state—if you stop writing to the database and wait a while, t eventually catch up and become consistent with the leader. For that reason, this effect is known as *eventual consistency* [^22]. +-------- + > [!NOTE] -> The term *eventual consistency* was coined by Douglas Terry et al. -> [^23], -> popularized by Werner Vogels -> [^24], +> The term *eventual consistency* was coined by Douglas Terry et al. [^23], popularized by Werner Vogels [^24], > and became the battle cry of many NoSQL projects. However, not only NoSQL databases are eventually -> consistent: followers in an asynchronously replicated relational database have the same -> characteristics. +> consistent: followers in an asynchronously replicated relational database have the same characteristics. + +-------- The term “eventually” is deliberately vague: in general, there is no limit to how far a replica can fall behind. In normal operation, the delay between a write happening on the leader and being @@ -490,7 +502,7 @@ When the lag is so large, the inconsistencies it introduces are not just a theor real problem for applications. In this section we will highlight three examples of problems that are likely to occur when there is replication lag. We’ll also outline some approaches to solving them. -## Reading Your Own Writes +### Reading Your Own Writes Many applications let the user submit some data and then view what they have submitted. This might be a record in a customer database, or a comment on a discussion thread, or something else of that sort. @@ -505,8 +517,7 @@ submitted was lost, so they will be understandably unhappy. {{< figure src="/fig/ddia_0603.png" id="fig_replication_read_your_writes" caption="Figure 6-3. A user makes a write, followed by a read from a stale replica. To prevent this anomaly, we need read-after-write consistency." class="w-full my-4" >}} -In this situation, we need *read-after-write consistency*, also known as *read-your-writes consistency* -[^23]. +In this situation, we need *read-after-write consistency*, also known as *read-your-writes consistency* [^23]. This is a guarantee that if the user reloads the page, they will always see any updates they submitted themselves. It makes no promises about other users: other users’ updates may not be visible until some later time. However, it reassures the user that their own input has been saved @@ -526,15 +537,13 @@ are various possible techniques. To mention a few: effective, as most things would have to be read from the leader (negating the benefit of read scaling). In that case, other criteria may be used to decide whether to read from the leader. For example, you could track the time of the last update and, for one minute after the last update, make all - reads from the leader - [^25]. + reads from the leader [^25]. You could also monitor the replication lag on followers and prevent queries on any follower that is more than one minute behind the leader. * The client can remember the timestamp of its most recent write—then the system can ensure that the replica serving any reads for that user reflects updates at least until that timestamp. If a replica is not sufficiently up to date, either the read can be handled by another replica or the - query can wait until the replica has caught up - [^26]. + query can wait until the replica has caught up [^26]. The timestamp could be a *logical timestamp* (something that indicates ordering of writes, such as the log sequence number) or the actual system clock (in which case clock synchronization becomes critical; see [“Unreliable Clocks”](/en/ch9#sec_distributed_clocks)). @@ -558,7 +567,9 @@ In this case, there are some additional issues to consider: the devices’ network routes may be completely different.) If your approach requires reading from the leader, you may first need to route requests from all of a user’s devices to the same region. -# Regions and Availability Zones +-------- + +> ![TIP] Regions and Availability Zones We use the term *region* to refer to one or more datacenters in a single geographic location. Cloud providers locate multiple datacenters in the same geographic region. Each datacenter is referred to @@ -576,7 +587,9 @@ increased cloud networking bills. We will discuss these tradeoffs more in [“Multi-leader replication topologies”](/en/ch6#sec_replication_topologies). For now, just know that when we say region, we mean a collection of zones/datacenters in a single geographic location. -## Monotonic Reads +-------- + +### Monotonic Reads Our second example of an anomaly that can occur when reading from asynchronous followers is that it’s possible for a user to see things *moving backward in time*. @@ -605,7 +618,7 @@ the same replica (different users can read from different replicas). For example chosen based on a hash of the user ID, rather than randomly. However, if that replica fails, the user’s queries will need to be rerouted to another replica. -## Consistent Prefix Reads +### Consistent Prefix Reads Our third example of replication lag anomalies concerns violation of causality. Imagine the following short dialog between Mr. Poons and Mrs. Cake: @@ -630,15 +643,13 @@ Mr. Poons : How far into the future can you see, Mrs. Cake? To the observer it looks as though Mrs. Cake is answering the question before Mr. Poons has even asked -it. Such psychic powers are impressive, but very confusing -[^27]. +it. Such psychic powers are impressive, but very confusing [^27]. {{< figure src="/fig/ddia_0605.png" id="fig_replication_consistent_prefix" caption="Figure 6-5. If some shards are replicated slower than others, an observer may see the answer before they see the question." class="w-full my-4" >}} -Preventing this kind of anomaly requires another type of guarantee: *consistent prefix reads* -[^22]. This guarantee says that if a sequence of -writes happens in a certain order, then anyone reading those writes will see them appear in the same -order. +Preventing this kind of anomaly requires another type of guarantee: *consistent prefix reads* [^22]. +This guarantee says that if a sequence of writes happens in a certain order, +then anyone reading those writes will see them appear in the same order. This is a particular problem in sharded (partitioned) databases, which we will discuss in [Chapter 7](/en/ch7#ch_sharding). If the database always applies writes in the same order, reads always see a @@ -651,7 +662,7 @@ the same shard—but in some applications that cannot be done efficiently. There that explicitly keep track of causal dependencies, a topic that we will return to in [“The “happens-before” relation and concurrency”](/en/ch6#sec_replication_happens_before). -## Solutions for Replication Lag +### Solutions for Replication Lag When working with an eventually consistent system, it is worth thinking about how the application behaves if the replication lag increases to several minutes or even hours. If the answer is “no @@ -683,7 +694,9 @@ consistency guarantees: they can offer stronger resilience in the face of networ have lower overheads compared to transactional systems. We will explore such approaches in the rest of this chapter. -# Multi-Leader Replication + + +## Multi-Leader Replication So far in this chapter we have only considered replication architectures using a single leader. Although that is a common approach, there are interesting alternatives. @@ -710,7 +723,7 @@ as equivalent to single-leader replication. The rest of this section focusses on multi-leader replication, in which any leader can process writes even when its connection to the other leaders is interrupted. -## Geographically Distributed Operation +### Geographically Distributed Operation It rarely makes sense to use a multi-leader setup within a single region, because the benefits rarely outweigh the added complexity. However, there are some situations in which this configuration @@ -730,8 +743,7 @@ other regions. {{< figure src="/fig/ddia_0606.png" id="fig_replication_multi_dc" caption="Figure 6-6. Multi-leader replication across multiple regions." class="w-full my-4" >}} -Let’s compare how the single-leader and multi-leader configurations fare in a multi-region -deployment: +Let’s compare how the single-leader and multi-leader configurations fare in a multi-region deployment: Performance : In a single-leader configuration, every write must go over the internet to the region with the @@ -756,8 +768,7 @@ Tolerance of network problems over that link and wait for the response before it can complete. A multi-leader configuration with asynchronous replication can tolerate network problems better: - during a temporary network interruption, each region’s leader can continue independently processing - writes. + during a temporary network interruption, each region’s leader can continue independently processing writes. Consistency : A single-leader system can provide strong consistency guarantees, such as serializable @@ -768,25 +779,21 @@ Consistency account, registering a particular username), but which violate the constraint when taken together with another write on another leader. - This is simply a fundamental limitation of distributed systems - [^28]. + This is simply a fundamental limitation of distributed systems [^28]. If you need to enforce such constraints, you’re therefore better off with a single-leader system. However, as we will see in [“Dealing with Conflicting Writes”](/en/ch6#sec_replication_write_conflicts), multi-leader systems can still - achieve consistency properties that are useful in a wide range of apps that don’t need such - constraints. + achieve consistency properties that are useful in a wide range of apps that don’t need such constraints. Multi-leader replication is less common than single-leader replication, but it is still supported by many databases, including MySQL, Oracle, SQL Server, and YugabyteDB. In some cases it is an external -add-on feature, for example in Redis Enterprise, EDB Postgres Distributed, and pglogical -[^29]. +add-on feature, for example in Redis Enterprise, EDB Postgres Distributed, and pglogical [^29]. As multi-leader replication is a somewhat retrofitted feature in many databases, there are often subtle configuration pitfalls and surprising interactions with other database features. For example, autoincrementing keys, triggers, and integrity constraints can be problematic. For this reason, -multi-leader replication is often considered dangerous territory that should be avoided if possible -[^30]. +multi-leader replication is often considered dangerous territory that should be avoided if possible [^30]. -### Multi-leader replication topologies +#### Multi-leader replication topologies A *replication topology* describes the communication paths along which writes are propagated from one node to another. If you have two leaders, like in [Figure 6-9](/en/ch6#fig_replication_write_conflict), there is @@ -796,27 +803,30 @@ more than two leaders, various different topologies are possible. Some examples {{< figure src="/fig/ddia_0607.png" id="fig_replication_topologies" caption="Figure 6-7. Three example topologies in which multi-leader replication can be set up." class="w-full my-4" >}} -The most general topology is *all-to-all*, shown in -[Figure 6-7](/en/ch6#fig_replication_topologies)(c), +The most general topology is *all-to-all*, shown in [Figure 6-7](/en/ch6#fig_replication_topologies)(c), in which every leader sends its writes to every other leader. However, more restricted topologies are also used: for example a *circular topology* in which each node receives writes from one node and forwards those writes (plus any writes of its own) to one other node. Another popular topology has the shape of a *star*: one designated root node forwards writes to all of the other nodes. The star topology can be generalized to a tree. + +-------- + > [!NOTE] > Don’t confuse a star-shaped network topology with a *star schema* (see > [“Stars and Snowflakes: Schemas for Analytics”](/en/ch3#sec_datamodels_analytics)), which describes the structure of a data model. +-------- + In circular and star topologies, a write may need to pass through several nodes before it reaches all replicas. Therefore, nodes need to forward data changes they receive from other nodes. To prevent infinite replication loops, each node is given a unique identifier, and in the replication -log, each write is tagged with the identifiers of all the nodes it has passed through -[^31]. +log, each write is tagged with the identifiers of all the nodes it has passed through [^31]. When a node receives a data change that is tagged with its own identifier, that data change is ignored, because the node knows that it has already been processed. -### Problems with different topologies +#### Problems with different topologies A problem with circular and star topologies is that if just one node fails, it can interrupt the flow of replication messages between other nodes, leaving them unable to communicate until the @@ -850,7 +860,7 @@ issues like the one in [Figure 6-8](/en/ch6#fig_replication_causality). If you is worth being aware of these issues, carefully reading the documentation, and thoroughly testing your database to ensure that it really does provide the guarantees you believe it to have. -## Sync Engines and Local-First Software +### Sync Engines and Local-First Software Another situation in which multi-leader replication is appropriate is if you have an application that needs to continue to work while it is disconnected from the internet. @@ -870,7 +880,7 @@ From an architectural point of view, this setup is very similar to multi-leader regions, taken to the extreme: each device is a “region,” and the network connection between them is extremely unreliable. -### Real-time collaboration, offline-first, and local-first apps +#### Real-time collaboration, offline-first, and local-first apps Moreover, many modern web apps offer *real-time collaboration* features, such as Google Docs and Sheets for text documents and spreadsheets, Figma for graphics, and Linear for project management. @@ -904,7 +914,7 @@ service providers are available [^40]. For example, Git is a local-first collaboration system (albeit one that doesn’t support real-time collaboration) since you can sync via GitHub, GitLab, or any other repository hosting service. -### Pros and cons of sync engines +#### Pros and cons of sync engines The dominant way of building web apps today is to keep very little persistent state on the client, and to rely on making requests to a server whenever a new piece of data needs to be displayed or @@ -947,7 +957,8 @@ development jargon the equivalent of a sync engine is called *netcode*. The tech netcode are quite specific to the requirements of games [^44], and don’t directly carry over to other types of software, so we won’t consider them further in this book. -## Dealing with Conflicting Writes + +### Dealing with Conflicting Writes The biggest problem with multi-leader replication—both in a geo-distributed server-side database and a local-first sync engine on end user devices—is that concurrent writes on different leaders can @@ -972,7 +983,7 @@ In [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent) we wi whether two writes are concurrent. For now we will assume that we can detect conflicts, and we want to figure out the best way of resolving them. -### Conflict avoidance +#### Conflict avoidance One strategy for conflicts is to avoid them occurring in the first place. For example, if the application can ensure that all writes for a particular record go through the same leader, then @@ -999,7 +1010,8 @@ so that one leader only generates odd numbers and the other only generates even you can be sure that the two leaders won’t concurrently assign the same ID to different records. We will discuss other ID assignment schemes in [“ID Generators and Logical Clocks”](/en/ch10#sec_consistency_logical). -### Last write wins (discarding concurrent writes) + +#### Last write wins (discarding concurrent writes) If conflicts can’t be avoided, the simplest way of resolving them is to attach a timestamp to each write, and to always use the value with the greatest timestamp. For example, in @@ -1031,7 +1043,7 @@ that is ahead of the others, and you try to overwrite a value written by that no be ignored as it may have a lower timestamp, even though it clearly occurred later. This problem can be solved by using a *logical clock*, which we will discuss in [“ID Generators and Logical Clocks”](/en/ch10#sec_consistency_logical). -### Manual conflict resolution +#### Manual conflict resolution If randomly discarding some of your writes is not desirable, the next option is to resolve the conflict manually. You may be familiar with manual conflict resolution from Git and other version @@ -1063,9 +1075,7 @@ suffers from a number of problems: keeping all the shopping cart items that appeared in any of the siblings (i.e., taking the set union of the carts). This meant that if the customer had removed an item from their cart in one sibling, but another sibling still contained that old item, the removed item would unexpectedly - reappear in the customer’s cart - [^45]. - [Figure 6-10](/en/ch6#fig_replication_amazon_anomaly) shows an example where Device 1 removes Book from the shopping + reappear in the customer’s cart [^45]. [Figure 6-10](/en/ch6#fig_replication_amazon_anomaly) shows an example where Device 1 removes Book from the shopping cart and concurrently Device 2 removes DVD, but after merging the conflict both items reappear. * If multiple nodes observe the conflict and concurrently resolve it, the conflict resolution process can itself introduce a new conflict. Those resolutions could even be inconsistent: for @@ -1076,7 +1086,7 @@ suffers from a number of problems: {{< figure src="/fig/ddia_0610.png" id="fig_replication_amazon_anomaly" caption="Figure 6-10. Example of Amazon's shopping cart anomaly: if conflicts on a shopping cart are merged by taking the union, deleted items may reappear." class="w-full my-4" >}} -### Automatic conflict resolution +#### Automatic conflict resolution For many applications, the best way of handling conflicts is to use an algorithm that automatically merges concurrent writes into a consistent state. Automatic conflict resolution ensures that all @@ -1110,23 +1120,19 @@ Nevertheless, automatic conflict resolution is sufficient to build many useful a start from the requirement of wanting to build a collaborative offline-first or local-first app, then conflict resolution is inevitable, and automating it is often the best approach. -## CRDTs and Operational Transformation +### CRDTs and Operational Transformation Two families of algorithms are commonly used to implement automatic conflict resolution: -*Conflict-free replicated datatypes* (CRDTs) -[^46] and *Operational Transformation* (OT) -[^47]. +*Conflict-free replicated datatypes* (CRDTs) [^46] and *Operational Transformation* (OT) [^47]. They have different design philosophies and performance characteristics, but both are able to perform automatic merges for all the aforementioned types of data. [Figure 6-11](/en/ch6#fig_replication_ot_crdt) shows an example of how OT and a CRDT merge concurrent updates to a text. Assume you have two replicas that both start off with the text “ice”. One replica prepends the -letter “n” to make “nice”, while concurrently the other replica appends an exclamation mark to make -“ice!”. +letter “n” to make “nice”, while concurrently the other replica appends an exclamation mark to make “ice!”. {{< figure src="/fig/ddia_0611.png" id="fig_replication_ot_crdt" caption="Figure 6-11. How two concurrent insertions into a string are merged by OT and a CRDT respectively." class="w-full my-4" >}} - The merged result “nice!” is achieved differently by both types of algorithms: OT @@ -1155,7 +1161,7 @@ OT is most often used for real-time collaborative editing of text, e.g. in Googl distributed databases such as Redis Enterprise, Riak, and Azure Cosmos DB [^49]. Sync engines for JSON data can be implemented both with CRDTs (e.g., Automerge or Yjs) and with OT (e.g., ShareDB). -### What is a conflict? +#### What is a conflict? Some kinds of conflict are obvious. In the example in [Figure 6-9](/en/ch6#fig_replication_write_conflict), two writes concurrently modified the same field in the same record, setting it to two different values. There @@ -1174,7 +1180,8 @@ good understanding of this problem. We will see some more examples of conflicts [Chapter 8](/en/ch8#ch_transactions), and in [Link to Come] we will discuss scalable approaches for detecting and resolving conflicts in a replicated system. -# Leaderless Replication + +## Leaderless Replication The replication approaches we have discussed so far in this chapter—single-leader and multi-leader replication—are based on the idea that a client sends a write request to one node @@ -1189,17 +1196,21 @@ a fashionable architecture for databases after Amazon used it for its in-house * 2007 [^45]. Riak, Cassandra, and ScyllaDB are open source datastores with leaderless replication models inspired by Dynamo, so this kind of database is also known as *Dynamo-style*. +-------- + > [!NOTE] > The original *Dynamo* system was only described in a paper [^45], but never released outside of Amazon. > The similarly-named *DynamoDB* is a more recent cloud database from AWS, but it has a completely different architecture: > it uses single-leader replication based on the Multi-Paxos consensus algorithm [^5]. +-------- + In some leaderless implementations, the client directly sends its writes to several replicas, while in others, a coordinator node does this on behalf of the client. However, unlike a leader database, that coordinator does not enforce a particular ordering of writes. As we shall see, this difference in design has profound consequences for the way the database is used. -## Writing to the Database When a Node Is Down +### Writing to the Database When a Node Is Down Imagine you have a database with three replicas, and one of the replicas is currently unavailable—​perhaps it is being rebooted to install a system update. In a single-leader @@ -1231,7 +1242,7 @@ needs to be tagged with a version number or timestamp, similarly to what we saw one with the greatest timestamp (even if that value was only returned by one replica, and several other replicas returned older values). See [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent) for more details. -### Catching up on missed writes +#### Catching up on missed writes The replication system should ensure that eventually all the data is copied to every replica. After an unavailable node comes back online, how does it catch up on the writes that it missed? Several @@ -1257,7 +1268,7 @@ Anti-entropy replication log in leader-based replication, this *anti-entropy process* does not copy writes in any particular order, and there may be a significant delay before data is copied. -### Quorums for reading and writing +#### Quorums for reading and writing In the example of [Figure 6-12](/en/ch6#fig_replication_quorum_node_outage), we considered the write to be successful even though it was only processed on two out of three replicas. What if only one out of three @@ -1270,12 +1281,10 @@ respond, reads can nevertheless continue returning an up-to-date value. More generally, if there are *n* replicas, every write must be confirmed by *w* nodes to be considered successful, and we must query at least *r* nodes for each read. (In our example, -*n* = 3, *w* = 2, *r* = 2.) As long as *w* + *r* > -*n*, we expect to get an up-to-date value when reading, because at least one of the *r* nodes we’re -reading from must be up to date. Reads and writes that obey these *r* and *w* values are called -*quorum* reads and writes [^50]. -You can think of *r* and *w* as the minimum number of votes required for the read or write to be -valid. +*n* = 3, *w* = 2, *r* = 2.) As long as *w* + *r* > *n*, +we expect to get an up-to-date value when reading, because at least one of the *r* nodes we’re +reading from must be up to date. Reads and writes that obey these *r* and *w* values are called *quorum* reads and writes [^50]. +You can think of *r* and *w* as the minimum number of votes required for the read or write to be valid. In Dynamo-style databases, the parameters *n*, *w*, and *r* are typically configurable. A common choice is to make *n* an odd number (typically 3 or 5) and to set *w* = *r* = @@ -1284,11 +1293,15 @@ For example, a workload with few writes and many reads may benefit from setting *r* = 1. This makes reads faster, but has the disadvantage that just one failed node causes all database writes to fail. +-------- + > [!NOTE] > There may be more than *n* nodes in the cluster, but any given value is stored only on *n* > nodes. This allows the dataset to be sharded, supporting datasets that are larger than you can fit > on one node. We will return to sharding in [Chapter 7](/en/ch7#ch_sharding). +-------- + The quorum condition, *w* + *r* > *n*, allows the system to tolerate unavailable nodes as follows: @@ -1299,8 +1312,8 @@ as follows: * With *n* = 5, *w* = 3, *r* = 3 we can tolerate two unavailable nodes. This case is illustrated in [Figure 6-13](/en/ch6#fig_replication_quorum_overlap). -Normally, reads and writes are always sent to all *n* replicas in parallel. The parameters *w* and -*r* determine how many nodes we wait for—i.e., how many of the *n* nodes need to report success +Normally, reads and writes are always sent to all *n* replicas in parallel. The parameters *w* and *r* +determine how many nodes we wait for—i.e., how many of the *n* nodes need to report success before we consider the read or write to be successful. {{< figure src="/fig/ddia_0613.png" id="fig_replication_quorum_overlap" caption="Figure 6-13. If *w* + *r* > *n*, at least one of the *r* replicas you read from must have seen the most recent successful write." class="w-full my-4" >}} @@ -1312,7 +1325,7 @@ error executing the operation (can’t write because the disk is full), due to a between the client and the node, or for any number of other reasons. We only care whether the node returned a successful response and don’t need to distinguish between different kinds of fault. -## Limitations of Quorum Consistency +### Limitations of Quorum Consistency If you have *n* replicas, and you choose *w* and *r* such that *w* + *r* > *n*, you can generally expect every read to return the most recent value written for a key. This is the case because the @@ -1324,8 +1337,7 @@ Often, *r* and *w* are chosen to be a majority (more than *n*/2) of nodes, becau *w* + *r* > *n* while still tolerating up to *n*/2 (rounded down) node failures. But quorums are not necessarily majorities—it only matters that the sets of nodes used by the read and write operations overlap in at least one node. Other quorum assignments are possible, which allows some -flexibility in the design of distributed algorithms -[^51]. +flexibility in the design of distributed algorithms [^51]. You may also set *w* and *r* to smaller numbers, so that *w* + *r* ≤ *n* (i.e., the quorum condition is not satisfied). In this case, reads and writes will still be sent to *n* @@ -1369,7 +1381,7 @@ it is not so simple. Dynamo-style databases are generally optimized for use case eventual consistency. The parameters *w* and *r* allow you to adjust the probability of stale values being read [^53], but it’s wise to not take them as absolute guarantees. -### Monitoring staleness +#### Monitoring staleness From an operational perspective, it’s important to monitor whether your databases are returning up-to-date results. Even if your application can tolerate stale reads, you need to be @@ -1388,7 +1400,8 @@ handoff can be one measure of system health, but it’s difficult to interpret u Eventual consistency is a deliberately vague guarantee, but for operability it’s important to be able to quantify “eventual.” -## Single-Leader vs. Leaderless Replication Performance + +### Single-Leader vs. Leaderless Replication Performance A replication system based on a single leader can provide strong consistency guarantees that are difficult or impossible to achieve in a leaderless system. However, as we have seen in @@ -1427,8 +1440,7 @@ That said, leaderless systems can have performance problems as well: * Even though the system doesn’t need to perform failover, one replica does need to detect when another replica is unavailable so that it can store hints about writes that the unavailable replica missed. When the unavailable replica comes back, the handoff process needs to send it - those hints. This puts additional load on the replicas at a time when the system is already under - strain [^54]. + those hints. This puts additional load on the replicas at a time when the system is already under strain [^54]. * The more replicas you have, the bigger the size of your quorums, and the more responses you have to wait for before a request can complete. Even if you wait only for the fastest *r* or *w* replicas to respond, and even if you make the requests in parallel, a bigger *r* or *w* increases @@ -1448,7 +1460,7 @@ be co-located with the client. However, since a write on one leader is propagate the others, reads can be arbitrarily out-of-date. Quorum reads and writes provide a compromise: good fault tolerance while also having a high likelihood of reading up-to-date data. -### Multi-region operation +#### Multi-region operation We previously discussed cross-region replication as a use case for multi-leader replication (see [“Multi-Leader Replication”](/en/ch6#sec_replication_multi_leader)). Leaderless replication is also suitable for @@ -1467,7 +1479,8 @@ describes the number of replicas within one region. Cross-region replication bet database clusters happens asynchronously in the background, in a style that is similar to multi-leader replication. -## Detecting Concurrent Writes + +### Detecting Concurrent Writes Like with multi-leader replication, leaderless databases allow concurrent writes to the same key, resulting in conflicts that need to be resolved. Such conflicts may occur as the writes happen, but @@ -1477,8 +1490,7 @@ The problem is that events may arrive in a different order at different nodes, d network delays and partial failures. For example, [Figure 6-14](/en/ch6#fig_replication_concurrency) shows two clients, A and B, simultaneously writing to a key *X* in a three-node datastore: -* Node 1 receives the write from A, but never receives the write from B due to a transient - outage. +* Node 1 receives the write from A, but never receives the write from B due to a transient outage. * Node 2 first receives the write from A, then the write from B. * Node 3 first receives the write from B, then the write from A. @@ -1501,7 +1513,7 @@ you whether two values are actually conflicting (i.e., they were written concurr were written one after another). If you want to resolve conflicts explicitly, the system needs to take more care to detect concurrent writes. -### The “happens-before” relation and concurrency +#### The “happens-before” relation and concurrency How do we decide whether two operations are concurrent or not? To develop an intuition, let’s look at some examples: @@ -1517,8 +1529,7 @@ at some examples: An operation A *happens before* another operation B if B knows about A, or depends on A, or builds upon A in some way. Whether one operation happens before another operation is the key to defining what concurrency means. In fact, we can simply say that two operations are *concurrent* if neither -happens before the other (i.e., neither knows about the other) -[^57]. +happens before the other (i.e., neither knows about the other) [^57]. Thus, whenever you have two operations A and B, there are three possibilities: either A happened before B, or B happened before A, or A and B are concurrent. What we need is an algorithm to tell us @@ -1526,7 +1537,9 @@ whether two operations are concurrent or not. If one operation happened before a operation should overwrite the earlier operation, but if the operations are concurrent, we have a conflict that needs to be resolved. -# Concurrency, Time, and Relativity +-------- + +> ![TIP] Concurrency, Time, and Relativity It may seem that two operations should be called concurrent if they occur “at the same time”—but in fact, it is not important whether they literally overlap in time. Because of problems with clocks @@ -1546,7 +1559,9 @@ principle have allowed one operation to affect the other. For example, if the ne interrupted at the time, two operations can occur some time apart and still be concurrent, because the network problems prevented one operation from being able to know about the other. -### Capturing the happens-before relationship +-------- + +#### Capturing the happens-before relationship Let’s look at an algorithm that determines whether two operations are concurrent, or whether one happened before another. To keep things simple, let’s start with a database that has only one @@ -1619,7 +1634,7 @@ write is based on. If you make a write without including a version number, it is other writes, so it will not overwrite anything—it will just be returned as one of the values on subsequent reads. -### Version vectors +#### Version vectors The example in [Figure 6-15](/en/ch6#fig_replication_causality_single) used only a single replica. How does the algorithm change when there are multiple replicas, but no leader? @@ -1646,12 +1661,16 @@ The version vector also ensures that it is safe to read from one replica and sub to another replica. Doing so may result in siblings being created, but no data is lost as long as siblings are merged correctly. -# Version vectors and vector clocks +-------- + +> [!TIP] VERSION VECTORS AND VECTOR CLOCKS A *version vector* is sometimes also called a *vector clock*, even though they are not quite the same. The difference is subtle—please see the references for details [^60] [^63] [^64]. In brief, when comparing the state of replicas, version vectors are the right data structure to use. +-------- + ## Summary In this chapter we looked at the issue of replication. Replication can serve several purposes: diff --git a/content/en/ch7.md b/content/en/ch7.md index d1ea3e4..2b437ba 100644 --- a/content/en/ch7.md +++ b/content/en/ch7.md @@ -12,8 +12,7 @@ breadcrumbs: false A distributed database typically distributes data across nodes in two ways: -1. Having a copy of the same data on multiple nodes: this is *replication*, which we discussed in - [Chapter 6](/en/ch6#ch_replication). +1. Having a copy of the same data on multiple nodes: this is *replication*, which we discussed in [Chapter 6](/en/ch6#ch_replication). 2. If we don’t want every node to store all the data, we can split up a large amount of data into smaller *shards* or *partitions*, and store different shards on different nodes. We’ll discuss sharding in this chapter. @@ -38,7 +37,9 @@ Everything we discussed in [Chapter 6](/en/ch6#ch_replication) about replicatio replication of shards. Since the choice of sharding scheme is mostly independent of the choice of replication scheme, we will ignore replication in this chapter for the sake of simplicity. -# Sharding and Partitioning +-------- + +> [!TIP] SHARDING AND PARTITIONING What we call a *shard* in this chapter has many different names depending on which software you’re using: it’s called a *partition* in Kafka, a *range* in CockroachDB, a *region* in HBase and TiDB, a @@ -61,7 +62,9 @@ Available Replicated Data*—reportedly a 1980s database, details of which are l By the way, partitioning has nothing to do with *network partitions* (netsplits), a type of fault in the network between nodes. We will discuss such faults in [Chapter 9](/en/ch9#ch_distributed). -# Pros and Cons of Sharding +-------- + +## Pros and Cons of Sharding The primary reason for sharding a database is *scalability*: it’s a solution if the volume of data or the write throughput has become too great for a single node to handle, as it allows you to spread @@ -105,7 +108,7 @@ access* (NUMA) architecture in which some banks of memory are closer to one CPU For example, Redis, VoltDB, and FoundationDB use one process per core, and rely on sharding to spread load across CPU cores in the same machine [^6]. -## Sharding for Multitenancy +### Sharding for Multitenancy Software as a Service (SaaS) products and cloud services are often *multitenant*, where each tenant is a customer. Multiple users may have logins on the same tenant, but each tenant has a @@ -166,7 +169,9 @@ The main challenges around using sharding for multitenancy are: * If you ever need to support features that connect data across multiple tenants, these become harder to implement if you need to join data across multiple shards. -# Sharding of Key-Value Data + + +## Sharding of Key-Value Data Say you have a large amount of data, and you want to shard it. How do you decide which records to store on which nodes? @@ -181,8 +186,7 @@ If the sharding is unfair, so that some shards have more data or queries than ot *skewed*. The presence of skew makes sharding much less effective. In an extreme case, all the load could end up on one shard, so 9 out of 10 nodes are idle and your bottleneck is the single busy node. A shard with disproportionately high load is called a *hot shard* or *hot spot*. If there’s -one key with a particularly high load (e.g., a celebrity in a social network), we call it a *hot -key*. +one key with a particularly high load (e.g., a celebrity in a social network), we call it a *hot key*. Therefore we need an algorithm that takes as input the partition key of a record, and tells us which shard that record is in. In a key-value store the partition key is usually the key, or the first @@ -190,7 +194,8 @@ part of the key. In a relational model the partition key might be some column of necessarily its primary key). That algorithm needs to be amenable to rebalancing in order to relieve hot spots. -## Sharding by Key Range + +### Sharding by Key Range One way of sharding is to assign a contiguous range of partition keys (from some minimum to some maximum) to each shard, like the volumes of a paper encyclopedia, as illustrated in @@ -233,7 +238,7 @@ active at the same time, the write load will end up more evenly spread across th downside is that when you want to fetch the values of multiple sensors within a time range, you now need to perform a separate range query for each sensor. -### Rebalancing key-range sharded data +#### Rebalancing key-range sharded data When you first set up your database, there are no key ranges to split into shards. Some databases, such as HBase and MongoDB, allow you to configure an initial set of shards on an empty database, @@ -251,20 +256,18 @@ With databases that manage shard boundaries automatically, a shard split is typi * the shard reaching a configured size (for example, on HBase, the default is 10 GB), or * in some systems, the write throughput being persistently above some threshold. Thus, a hot shard - may be split even if it is not storing a lot of data, so that its write load can be distributed - more uniformly. + may be split even if it is not storing a lot of data, so that its write load can be distributed more uniformly. An advantage of key-range sharding is that the number of shards adapts to the data volume. If there is only a small amount of data, a small number of shards is sufficient, so overheads are small; if -there is a huge amount of data, the size of each individual shard is limited to a configurable -maximum [^15]. +there is a huge amount of data, the size of each individual shard is limited to a configurable maximum [^15]. A downside of this approach is that splitting a shard is an expensive operation, since it requires all of its data to be rewritten into new files, similarly to a compaction in a log-structured storage engine. A shard that needs splitting is often also one that is under high load, and the cost of splitting can exacerbate that load, risking it becoming overloaded. -## Sharding by Hash of Key +### Sharding by Hash of Key Key-range sharding is useful if you want records with nearby (but different) partition keys to be grouped into the same shard; for example, this might be the case with timestamps. If you don’t care @@ -273,9 +276,8 @@ application), a common approach is to first hash the partition key before mappin A good hash function takes skewed data and makes it uniformly distributed. Say you have a 32-bit hash function that takes a string. Whenever you give it a new string, it returns a seemingly random -number between 0 and 232 − 1. Even if the input strings are very similar, their -hashes are evenly distributed across that range of numbers (but the same input always produces the -same output). +number between 0 and 232 − 1. Even if the input strings are very similar, their hashes are evenly +distributed across that range of numbers (but the same input always produces the same output). For sharding purposes, the hash function need not be cryptographically strong: for example, MongoDB uses MD5, whereas Cassandra and ScyllaDB use Murmur3. Many programming languages have simple hash @@ -283,7 +285,7 @@ functions built in (as they are used for hash tables), but they may not be suita for example, in Java’s `Object.hashCode()` and Ruby’s `Object#hash`, the same key may have a different hash value in different processes, making them unsuitable for sharding [^16]. -### Hash modulo number of nodes +#### Hash modulo number of nodes Once you have hashed the key, how do you choose which shard to store it in? Maybe your first thought is to take the hash value *modulo* the number of nodes in the system (using the `%` operator in many @@ -303,7 +305,7 @@ The *mod N* function is easy to compute, but it leads to very inefficient rebala is a lot of unnecessary movement of records from one node to another. We need an approach that doesn’t move data around more than necessary. -### Fixed number of shards +#### Fixed number of shards One simple but widely-used solution is to create many more shards than there are nodes, and to assign several shards to each node. For example, a database running on a cluster of 10 nodes may be @@ -313,8 +315,7 @@ which shard is stored on which node. Now, if a node is added to the cluster, the system can reassign some of the shards from existing nodes to the new node until they are fairly distributed once again. This process is illustrated in -[Figure 7-4](/en/ch7#fig_sharding_rebalance_fixed). If a node is removed from the cluster, the same happens in -reverse. +[Figure 7-4](/en/ch7#fig_sharding_rebalance_fixed). If a node is removed from the cluster, the same happens in reverse. {{< figure src="/fig/ddia_0704.png" id="fig_sharding_rebalance_fixed" caption="Figure 7-4. Adding a new node to a database cluster with multiple shards per node." class="w-full my-4" >}} @@ -349,7 +350,7 @@ expensive. But if shards are too small, they incur too much overhead. The best p achieved when the size of shards is “just right,” neither too big nor too small, which can be hard to achieve if the number of shards is fixed but the dataset size varies. -### Sharding by hash range +#### Sharding by hash range If the required number of shards can’t be predicted in advance, it’s better to use a scheme in which the number of shards can adapt easily to the workload. The aforementioned key-range sharding scheme @@ -375,7 +376,9 @@ two or more columns, and the partition key is only the first of these columns, y efficient range queries over the second and later columns: as long as all records in the range query have the same partition key, they will be in the same shard. -# Partitioning and Range Queries in Data Warehouses +-------- + +> [!TIPS] PARTITIONING AND RANGE QUERIES IN DATA WAREHOUSES Data warehouses such as BigQuery, Snowflake, and Delta Lake support a similar indexing approach, though the terminology differs. In BigQuery, for example, the partition key determines which @@ -385,6 +388,8 @@ cluster keys for a table. Delta Lake supports both manual and automatic partitio supports cluster keys. Clustering data not only improves range scan performance, but can improve compression and filtering performance as well. +-------- + Hash-range sharding is used in YugabyteDB and DynamoDB [^17], and is an option in MongoDB. Cassandra and ScyllaDB use a variant of this approach that is illustrated in [Figure 7-6](/en/ch7#fig_sharding_cassandra): the space of hash values is split into a number of ranges proportional @@ -402,7 +407,7 @@ transfers parts of two of its ranges to node 3, and node 2 transfers part of one node 3. This has the effect of giving the new node an approximately fair share of the dataset, without transferring more data than necessary from one node to another. -### Consistent hashing +#### Consistent hashing A *consistent hashing* algorithm is a hash function that maps keys to a specified number of shards in a way that satisfies two properties: @@ -422,7 +427,7 @@ sub-ranges; on the other hand, with rendezvous and jump consistent hashes, the n individual keys that were previously scattered across all of the other nodes. Which one is preferable depends on the application. -## Skewed Workloads and Relieving Hot Spots +### Skewed Workloads and Relieving Hot Spots Consistent hashing ensures that keys are uniformly distributed across nodes, but that doesn’t mean that the actual load is uniformly distributed. If the workload is highly skewed—that is, the amount @@ -461,7 +466,7 @@ Some systems (especially cloud services designed for large scale) have automated dealing with hot shards; for example, Amazon calls it *heat management* [^28] or *adaptive capacity* [^17]. The details of how these systems work go beyond the scope of this book. -## Operations: Automatic or Manual Rebalancing +### Operations: Automatic or Manual Rebalancing There is one important question with regard to rebalancing that we have glossed over: does the splitting of shards and rebalancing happen automatically or manually? @@ -469,8 +474,7 @@ splitting of shards and rebalancing happen automatically or manually? Some systems automatically decide when to split shards and when to move them from one node to another, without any human interaction, while others leave sharding to be explicitly configured by an administrator. There is also a middle ground: for example, Couchbase and Riak generate a -suggested shard assignment automatically, but require an administrator to commit it before it takes -effect. +suggested shard assignment automatically, but require an administrator to commit it before it takes effect. Fully automated rebalancing can be convenient, because there is less operational work to do for normal maintenance, and such systems can even auto-scale to adapt to changes in workload. Cloud @@ -488,13 +492,14 @@ Such automation can be dangerous in combination with automatic failure detection one node is overloaded and is temporarily slow to respond to requests. The other nodes conclude that the overloaded node is dead, and automatically rebalance the cluster to move load away from it. This puts additional load on other nodes and the network, making the situation worse. There is a risk of -causing a cascading failure where other nodes become overloaded and are also falsely suspected of -being down. +causing a cascading failure where other nodes become overloaded and are also falsely suspected of being down. For that reason, it can be a good thing to have a human in the loop for rebalancing. It’s slower than a fully automatic process, but it can help prevent operational surprises. -# Request Routing + + +## Request Routing We have discussed how to shard a dataset across multiple nodes, and how to rebalance those shards as nodes are added or removed. Now let’s move on to the question: if you want to read or write a @@ -508,8 +513,8 @@ balancer can send a request to any of the instances. With sharded databases, a r only be handled by a node that is a replica for the shard containing that key. This means that request routing has to be aware of the assignment from keys to shards, and from -shards to nodes. On a high level, there are a few different approaches to this problem (illustrated -in [Figure 7-7](/en/ch7#fig_sharding_routing)): +shards to nodes. On a high level, there are a few different approaches to this problem +(illustrated in [Figure 7-7](/en/ch7#fig_sharding_routing)): 1. Allow clients to contact any node (e.g., via a round-robin load balancer). If that node coincidentally owns the shard to which the request applies, it can handle the request directly; @@ -568,7 +573,7 @@ typically have a very different kind of query execution: rather than executing i query typically needs to aggregate and join data from many different shards in parallel. We will discuss techniques for such parallel query execution in [Link to Come]. -# Sharding and Secondary Indexes +## Sharding and Secondary Indexes The sharding schemes we have discussed so far rely on the client knowing the partition key for any record it wants to access. This is most easily done in a key-value data model, where the partition @@ -587,7 +592,7 @@ search engines such as Solr and Elasticsearch. The problem with secondary indexe map neatly to shards. There are two main approaches to sharding a database with secondary indexes: local and global indexes. -## Local Secondary Indexes +### Local Secondary Indexes For example, imagine you are operating a website for selling used cars (illustrated in [Figure 7-9](/en/ch7#fig_sharding_local_secondary)). Each listing has a unique ID, and you use that ID as partition @@ -602,7 +607,7 @@ automatically adds its ID to the list of IDs for the index entry `color:red`. As {{< figure src="/fig/ddia_0709.png" id="fig_sharding_local_secondary" caption="Figure 7-9. Local secondary indexes: each shard indexes only the records within its own shard." class="w-full my-4" >}} -###### Warning +> [!WARN] WARNING If your database only supports a key-value model, you might be tempted to implement a secondary index yourself by creating a mapping from values to IDs in application code. If you go down this @@ -610,6 +615,8 @@ route, you need to take great care to ensure your indexes remain consistent with data. Race conditions and intermittent write failures (where some changes were saved but others weren’t) can very easily cause the data to go out of sync—see [“The need for multi-object transactions”](/en/ch8#sec_transactions_need). +-------- + In this indexing approach, each shard is completely separate: each shard maintains its own secondary indexes, covering only the records in that shard. It doesn’t care what data is stored in other shards. Whenever you write to the database—to add, remove, or update a records—you only need to @@ -635,7 +642,7 @@ process every query anyway. Nevertheless, local secondary indexes are widely used [^31]: for example, MongoDB, Riak, Cassandra [^32], Elasticsearch [^33], SolrCloud, and VoltDB [^34] all use local secondary indexes. -## Global Secondary Indexes +### Global Secondary Indexes Rather than each shard having its own, local secondary index, we can construct a *global index* that covers data in all shards. However, we can’t just store that index on one node, since it would @@ -645,16 +652,13 @@ but it can be sharded differently from the primary key index. [Figure 7-10](/en/ch7#fig_sharding_global_secondary) illustrates what this could look like: the IDs of red cars from all shards appear under `color:red` in the index, but the index is sharded so that colors starting with the letters *a* to *r* appear in shard 0 and colors starting with *s* to *z* appear in shard 1. -The index on the make of car is partitioned similarly (with the shard boundary being between *f* and -*h*). +The index on the make of car is partitioned similarly (with the shard boundary being between *f* and *h*). {{< figure src="/fig/ddia_0710.png" id="fig_sharding_global_secondary" caption="Figure 7-10. A global secondary index reflects data from all shards, and is itself sharded by the indexed value." class="w-full my-4" >}} -This kind of index is also called *term-partitioned* -[^30]: +This kind of index is also called *term-partitioned* [^30]: recall from [“Full-Text Search”](/en/ch4#sec_storage_full_text) that in full-text search, a *term* is a keyword in a text that -you can search for. Here we generalise it to mean any value that you can search for in the secondary -index. +you can search for. Here we generalise it to mean any value that you can search for in the secondary index. The global index uses the term as partition key, so that when you’re looking for a particular term or value, you can figure out which shard you need to query. As before, a shard can contain a @@ -684,6 +688,7 @@ indexes, so reads from a global index may be stale (similarly to replication lag Nevertheless, global indexes are useful if read throughput is higher than write throughput, and if the postings lists are not too long. + ## Summary In this chapter we explored different ways of sharding a large dataset into smaller subsets. @@ -692,8 +697,7 @@ is no longer feasible. The goal of sharding is to spread the data and query load evenly across multiple machines, avoiding hot spots (nodes with disproportionately high load). This requires choosing a sharding scheme that -is appropriate to your data, and rebalancing the shards when nodes are added to or removed from the -cluster. +is appropriate to your data, and rebalancing the shards when nodes are added to or removed from the cluster. We discussed two main approaches to sharding: diff --git a/content/en/ch8.md b/content/en/ch8.md index be4ce1c..28272ef 100644 --- a/content/en/ch8.md +++ b/content/en/ch8.md @@ -63,11 +63,10 @@ Concurrency control is relevant for both single-node and distributed databases. chapter, in [“Distributed Transactions”](/en/ch8#sec_transactions_distributed), we will examine the *two-phase commit* protocol and the challenge of achieving atomicity in a distributed transaction. -# What Exactly Is a Transaction? +## What Exactly Is a Transaction? Almost all relational databases today, and some nonrelational databases, support transactions. Most -of them follow the style that was introduced in 1975 by IBM System R, the first SQL database -[^2] [^3] [^4]. +of them follow the style that was introduced in 1975 by IBM System R, the first SQL database [^2] [^3] [^4]. Although some implementation details have changed, the general idea has remained virtually the same for 50 years: the transaction support in MySQL, PostgreSQL, Oracle, SQL Server, etc., is uncannily similar to that of System R. @@ -92,7 +91,7 @@ technical design choice, transactions have advantages and limitations. In order trade-offs, let’s go into the details of the guarantees that transactions can provide—both in normal operation and in various extreme (but realistic) circumstances. -## The Meaning of ACID +## #The Meaning of ACID The safety guarantees provided by transactions are often described by the well-known acronym *ACID*, which stands for *Atomicity*, *Consistency*, *Isolation*, and *Durability*. It was coined in 1983 by @@ -112,7 +111,7 @@ BASE is “not ACID”; i.e., it can mean almost anything you want.) Let’s dig into the definitions of atomicity, consistency, isolation, and durability, as this will let us refine our idea of transactions. -### Atomicity +#### Atomicity In general, *atomic* refers to something that cannot be broken down into smaller parts. The word means similar but subtly different things in different branches of computing. For example, in @@ -141,7 +140,7 @@ The ability to abort a transaction on error and have all writes from that transa the defining feature of ACID atomicity. Perhaps *abortability* would have been a better term than *atomicity*, but we will stick with *atomicity* since that’s the usual word. -### Consistency +#### Consistency The word *consistency* is terribly overloaded: @@ -181,7 +180,7 @@ invariants, but you haven’t declared those invariants, the database can’t st in ACID often depends on how the application uses the database, and it’s not a property of the database alone. -### Isolation +### #Isolation Most databases are accessed by several clients at the same time. That is no problem if they are reading and writing different parts of the database, but if they are accessing the same database @@ -211,7 +210,7 @@ is a weaker guarantee than serializability [^10] [^14]). This means that some kinds of race conditions can still occur. We will explore snapshot isolation and other forms of isolation in [“Weak Isolation Levels”](/en/ch8#sec_transactions_isolation_levels). -### Durability +#### Durability The purpose of a database system is to provide a safe place where data can be stored without fear of losing it. *Durability* is the promise that once a transaction has committed successfully, any data it @@ -231,7 +230,9 @@ as discussed in [“Reliability and Fault Tolerance”](/en/ch2#sec_introduction hard disks and all your backups are destroyed at the same time, there’s obviously nothing your database can do to save you. -# Replication and Durability +-------- + +> [!TIP] REPLICATION AND DURABILITY Historically, durability meant writing to an archive tape. Then it was understood as writing to a disk or SSD. More recently, it has been adapted to mean replication. Which implementation is better? @@ -269,7 +270,9 @@ risk-reduction techniques, including writing to disk, replicating to remote mach backups—​and they can and should be used together. As always, it’s wise to take any theoretical “guarantees” with a healthy grain of salt. -## Single-Object and Multi-Object Operations +-------- + +### Single-Object and Multi-Object Operations To recap, in ACID, atomicity and isolation describe what the database should do if a client makes several writes within the same transaction: @@ -329,7 +332,7 @@ operation that updates several keys in one operation), that doesn’t necessaril transaction semantics: the command may succeed for some keys and fail for others, leaving the database in a partially updated state. -### Single-object writes +#### Single-object writes Atomicity and isolation also apply when a single object is being changed. For example, imagine you are writing a 20 KB JSON document to a database: @@ -353,11 +356,15 @@ Similarly popular is a *conditional write* operation, which allows a write to ha has not been concurrently changed by someone else (see [“Conditional writes (compare-and-set)”](/en/ch8#sec_transactions_compare_and_set)), similarly to a compare-and-set or compare-and-swap (CAS) operation in shared-memory concurrency. +-------- + > [!NOTE] > Strictly speaking, the term *atomic increment* uses the word *atomic* in the sense of multi-threaded > programming. In the context of ACID, it should actually be called an *isolated* or *serializable* > increment, but that’s not the usual term. +-------- + These single-object operations are useful, as they can prevent lost updates when several clients try to write to the same object concurrently (see [“Preventing Lost Updates”](/en/ch8#sec_transactions_lost_update)). However, they are not transactions in the usual sense of the word. For example, the “lightweight transactions” feature @@ -365,7 +372,7 @@ of Cassandra and ScyllaDB, and Aerospike’s “strong consistency” mode offer [“Linearizability”](/en/ch10#sec_consistency_linearizability)) reads and conditional writes on a single object, but no guarantees across multiple objects. -### The need for multi-object transactions +#### The need for multi-object transactions Do we need multi-object transactions at all? Would it be possible to implement any application with only a key-value data model and single-object operations? @@ -396,7 +403,7 @@ much more complicated without atomicity, and the lack of isolation can cause con We will discuss those in [“Weak Isolation Levels”](/en/ch8#sec_transactions_isolation_levels), and explore alternative approaches in [Link to Come]. -### Handling errors and aborts +#### Handling errors and aborts A key feature of a transaction is that it can be aborted and safely retried if an error occurred. ACID databases are based on this philosophy: if the database is in danger of violating its guarantee @@ -437,7 +444,9 @@ isn’t perfect: in [“Two-Phase Commit (2PC)”](/en/ch8#sec_transactions_2pc)). * If the client process crashes while retrying, any data it was trying to write to the database is lost. -# Weak Isolation Levels + + +## Weak Isolation Levels If two transactions don’t access the same data, or if both are read-only, they can safely be run in parallel, because neither depends on the other. Concurrency issues (race conditions) only come into @@ -470,11 +479,15 @@ financial data!”—but that misses the point. Even many popular relational dat are usually considered “ACID”) use weak isolation, so they wouldn’t necessarily have prevented these bugs from occurring. +-------- + > [!NOTE] > Incidentally, much of the banking system relies on text files that are exchanged via secure FTP [^35]. > In this context, having an audit trail and some human-level fraud prevention measures is actually > more important than ACID properties. +-------- + Those examples also highlight an important point: even if concurrency issues are rare in normal operation, you have to consider the possibility that an attacker deliberately sends a burst of highly concurrent requests to your API in an attempt to deliberately exploit concurrency bugs [^30]. Therefore, in order to build @@ -488,7 +501,7 @@ serializability in detail (see [“Serializability”](/en/ch8#sec_transactions_ levels will be informal, using examples. If you want rigorous definitions and analyses of their properties, you can find them in the academic literature [^36] [^37] [^38] [^39]. -## Read Committed +### Read Committed The most basic level of transaction isolation is *read committed*. It makes two guarantees: @@ -498,7 +511,7 @@ The most basic level of transaction isolation is *read committed*. It makes two Some databases support an even weaker isolation level called *read uncommitted*. It prevents dirty writes, but does not prevent dirty reads. Let’s discuss these two guarantees in more detail. -### No dirty reads +#### No dirty reads Imagine a transaction has written some data to the database, but the transaction has not yet committed or aborted. Can another transaction see that uncommitted data? If yes, that is called a @@ -506,13 +519,11 @@ Can another transaction see that uncommitted data? If yes, that is called a Transactions running at the read committed isolation level must prevent dirty reads. This means that any writes by a transaction only become visible to others when that transaction commits (and then -all of its writes become visible at once). This is illustrated in -[Figure 8-4](/en/ch8#fig_transactions_read_committed), where user 1 has set *x* = 3, but user 2’s *get x* still +all of its writes become visible at once). This is illustrated in [Figure 8-4](/en/ch8#fig_transactions_read_committed), where user 1 has set *x* = 3, but user 2’s *get x* still returns the old value, 2, while user 1 has not yet committed. {{< figure src="/fig/ddia_0804.png" id="fig_transactions_read_committed" caption="Figure 8-4. No dirty reads: user 2 sees the new value for x only after user 1's transaction has committed." class="w-full my-4" >}} - There are a few reasons why it’s useful to prevent dirty reads: * If a transaction needs to update several rows, a dirty read means that another transaction may @@ -526,7 +537,7 @@ There are a few reasons why it’s useful to prevent dirty reads: transaction that read uncommitted data would also need to be aborted, leading to a problem called *cascading aborts*. -### No dirty writes +#### No dirty writes What happens if two transactions concurrently try to update the same row in a database? We don’t know in which order the writes will happen, but we normally assume that the later write overwrites @@ -555,7 +566,7 @@ By preventing dirty writes, this isolation level avoids some kinds of concurrenc {{< figure src="/fig/ddia_0805.png" id="fig_transactions_dirty_writes" caption="Figure 8-5. With dirty writes, conflicting writes from different transactions can be mixed up." class="w-full my-4" >}} -### Implementing read committed +#### Implementing read committed Read committed is a very popular isolation level. It is the default setting in Oracle Database, PostgreSQL, SQL Server, and many other databases [^10]. @@ -584,15 +595,14 @@ different part of the application, due to waiting for locks. Nevertheless, locks are used to prevent dirty reads in some databases, such as IBM Db2 and Microsoft SQL Server in the `read_committed_snapshot=off` setting [^29]. -A more commonly used approach to preventing dirty reads is the one illustrated in -[Figure 8-4](/en/ch8#fig_transactions_read_committed): for every +A more commonly used approach to preventing dirty reads is the one illustrated in [Figure 8-4](/en/ch8#fig_transactions_read_committed): for every row that is written, the database remembers both the old committed value and the new value set by the transaction that currently holds the write lock. While the transaction is ongoing, any other transactions that read the row are simply given the old value. Only when the new value is committed do transactions switch over to reading the new value (see [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_transactions_snapshot_impl) for more detail). -## Snapshot Isolation and Repeatable Read +### Snapshot Isolation and Repeatable Read If you look superficially at read committed isolation, you could be forgiven for thinking that it does everything that a transaction needs to do: it allows aborts (required for atomicity), it @@ -616,15 +626,18 @@ now appears as though she only has a total of $900 in her accounts—it seems th vanished into thin air. This anomaly is called *read skew*, and it is an example of a *nonrepeatable read*: -if Aaliyah were to read the balance of -account 1 again at the end of the transaction, she would see a different value ($600) than she saw +if Aaliyah were to read the balance of account 1 again at the end of the transaction, she would see a different value ($600) than she saw in her previous query. Read skew is considered acceptable under read committed isolation: the account balances that Aaliyah saw were indeed committed at the time when she read them. +-------- + > [!NOTE] > The term *skew* is unfortunately overloaded: we previously used it in the sense of an *unbalanced > workload with hot spots* (see [“Skewed Workloads and Relieving Hot Spots”](/en/ch7#sec_sharding_skew)), whereas here it means *timing anomaly*. +-------- + In Aaliyah’s case, this is not a lasting problem, because she will most likely see consistent account balances if she reloads the online banking website a few seconds later. However, some situations cannot tolerate such temporary inconsistency: @@ -659,7 +672,7 @@ one system to the next [^29] [^40] [^41]. Some databases, such as Oracle, TiDB, and Aurora DSQL, even choose snapshot isolation as their highest isolation level. -### Multi-version concurrency control (MVCC) +#### Multi-version concurrency control (MVCC) Like read committed isolation, implementations of snapshot isolation typically use write locks to prevent dirty writes (see [“Implementing read committed”](/en/ch8#sec_transactions_read_committed_impl)), which means that a transaction @@ -707,7 +720,7 @@ All of the versions of a row are stored within the same database heap (see or not. The versions of the same row form a linked list, going either from newest version to oldest version or the other way round, so that queries can internally iterate over all versions of a row [^45] [^46]. -### Visibility rules for observing a consistent snapshot +#### Visibility rules for observing a consistent snapshot When a transaction reads from the database, transaction IDs are used to decide which row versions it can see and which are invisible. By carefully defining visibility rules, the database can present a @@ -743,7 +756,7 @@ that (from other transactions’ point of view) have long been overwritten or de updating values in place but instead inserting a new version every time a value is changed, the database can provide a consistent snapshot while incurring only a small overhead. -### Indexes and snapshot isolation +#### Indexes and snapshot isolation How do indexes work in a multi-version database? The most common approach is that each index entry points at one of the versions of a row that matches the entry (either the oldest or the newest @@ -754,9 +767,8 @@ are no longer visible to any transaction, the corresponding index entries can al Many implementation details affect the performance of multi-version concurrency control [^45] [^46]. For example, PostgreSQL has optimizations for avoiding index updates if different versions of the -same row can fit on the same page [^40]. -Some other databases avoid storing full copies of modified rows, and only store differences between -versions to save space. +same row can fit on the same page [^40]. Some other databases avoid storing full copies of modified rows, +and only store differences between versions to save space. Another approach is used in CouchDB, Datomic, and LMDB. Although they also use B-trees (see [“B-Trees”](/en/ch4#sec_storage_b_trees)), they use an *immutable* (copy-on-write) variant that does not overwrite @@ -771,7 +783,7 @@ was created. There is no need to filter out rows based on transaction IDs becaus writes cannot modify an existing B-tree; they can only create new tree roots. This approach also requires a background process for compaction and garbage collection. -### Snapshot isolation, repeatable read, and naming confusion +#### Snapshot isolation, repeatable read, and naming confusion MVCC is a commonly used implementation technique for databases, and often it is used to implement snapshot isolation. However, different databases sometimes use different terms to refer to the same @@ -796,7 +808,7 @@ formal definition. And to top it off, IBM Db2 uses “repeatable read” to refe As a result, nobody really knows what repeatable read means. -## Preventing Lost Updates +### Preventing Lost Updates The read committed and snapshot isolation levels we’ve discussed so far have been primarily about the guarantees of what a read-only transaction can see in the presence of concurrent writes. We have mostly ignored @@ -822,7 +834,7 @@ pattern occurs in various different scenarios: Because this is such a common problem, a variety of solutions have been developed [^48]. -### Atomic write operations +#### Atomic write operations Many databases provide atomic update operations, which remove the need to implement read-modify-write cycles in application code. They are usually the best solution if your code can be @@ -849,7 +861,7 @@ that performs unsafe read-modify-write cycles instead of using atomic operations database [^49] [^50] [^51]. This can be a source of subtle bugs that are difficult to find by testing. -### Explicit locking +#### Explicit locking Another option for preventing lost updates, if the database’s built-in atomic operations don’t provide the necessary functionality, is for the application to explicitly lock objects that are @@ -869,8 +881,8 @@ players from concurrently moving the same piece, as illustrated in [Example 8-1 BEGIN TRANSACTION; SELECT * FROM figures - WHERE name = 'robot' AND game_id = 222 - FOR UPDATE; ❶ + WHERE name = 'robot' AND game_id = 222 + FOR UPDATE; ❶ -- Check whether move is valid, then update the position -- of the piece that was returned by the previous SELECT. @@ -889,7 +901,7 @@ are waiting for each other to release their locks. Many databases automatically and abort one of the involved transactions so that the system can make progress. You can handle this situation at the application level by retrying the aborted transaction. -### Automatically detecting lost updates +#### Automatically detecting lost updates Atomic operations and locks are ways of preventing lost updates by forcing the read-modify-write cycles to happen sequentially. An alternative is to allow them to execute in parallel and, if the @@ -909,7 +921,7 @@ special database features—you may forget to use a lock or an atomic operation a bug, but lost update detection happens automatically and is thus less error-prone. However, you also have to retry aborted transactions at the application level. -### Conditional writes (compare-and-set) +#### Conditional writes (compare-and-set) In databases that don’t provide transactions, you sometimes find a *conditional write* operation that can prevent lost updates by allowing an update to happen only if the value has not changed @@ -925,7 +937,7 @@ user started editing it: ```sql -- This may or may not be safe, depending on the database implementation UPDATE wiki_pages SET content = 'new content' - WHERE id = 1234 AND content = 'old content'; + WHERE id = 1234 AND content = 'old content'; ``` If the content has changed and no longer matches `'old content'`, this update will have no effect, @@ -940,7 +952,7 @@ implementations of MVCC have an exception to the visibility rules for this scena written by other transactions are visible to the evaluation of the `WHERE` clause of `UPDATE` and `DELETE` queries, even though those writes are not otherwise visible in the snapshot. -### Conflict resolution and replication +#### Conflict resolution and replication In replicated databases (see [Chapter 6](/en/ch6#ch_replication)), preventing lost updates takes on another dimension: since they have copies of the data on multiple nodes, and the data can potentially be @@ -968,7 +980,7 @@ On the other hand, the *last write wins* (LWW) conflict resolution method is pro as discussed in [“Last write wins (discarding concurrent writes)”](/en/ch6#sec_replication_lww). Unfortunately, LWW is the default in many replicated databases. -## Write Skew and Phantoms +### Write Skew and Phantoms In the previous sections we saw *dirty writes* and *lost updates*, two kinds of race conditions that can occur when different transactions concurrently try to write to the same objects. In order to @@ -995,10 +1007,9 @@ In each transaction, your application first checks that two or more doctors are if yes, it assumes it’s safe for one doctor to go off call. Since the database is using snapshot isolation, both checks return `2`, so both transactions proceed to the next stage. Aaliyah updates her own record to take herself off call, and Bryce updates his own record likewise. Both transactions -commit, and now no doctor is on call. Your requirement of having at least one doctor on call has -been violated. +commit, and now no doctor is on call. Your requirement of having at least one doctor on call has been violated. -### Characterizing write skew +#### Characterizing write skew This anomaly is called *write skew* [^36]. It is neither a dirty write nor a lost update, because the two transactions are updating two different @@ -1035,51 +1046,50 @@ options are more restricted: BEGIN TRANSACTION; SELECT * FROM doctors - WHERE on_call = true - AND shift_id = 1234 FOR UPDATE; ❶ + WHERE on_call = true + AND shift_id = 1234 FOR UPDATE; ❶ UPDATE doctors - SET on_call = false - WHERE name = 'Aaliyah' - AND shift_id = 1234; + SET on_call = false + WHERE name = 'Aaliyah' + AND shift_id = 1234; COMMIT; ``` ❶: As before, `FOR UPDATE` tells the database to lock all rows returned by this query. -### More examples of write skew +#### More examples of write skew Write skew may seem like an esoteric issue at first, but once you’re aware of it, you may notice more situations in which it can occur. Here are some more examples: Meeting room booking system : Say you want to enforce that there cannot be two bookings for the same meeting room at the same time [^55]. - When someone wants to make a booking, you first check for any conflicting bookings (i.e., - bookings for the same room with an overlapping time range), and if none are found, you create the - meeting (see [Example 8-2](/en/ch8#fig_transactions_meeting_rooms)). + When someone wants to make a booking, you first check for any conflicting bookings (i.e., + bookings for the same room with an overlapping time range), and if none are found, you create the + meeting (see [Example 8-2](/en/ch8#fig_transactions_meeting_rooms)). + + {{< figure id="fig_transactions_meeting_rooms" title="Example 8-2. A meeting room booking system tries to avoid double-booking (not safe under snapshot isolation)" class="w-full my-4" >}} + + ```sql + BEGIN TRANSACTION; + + -- Check for any existing bookings that overlap with the period of noon-1pm + SELECT COUNT(*) FROM bookings + WHERE room_id = 123 AND + end_time > '2025-01-01 12:00' AND start_time < '2025-01-01 13:00'; + + -- If the previous query returned zero: + INSERT INTO bookings (room_id, start_time, end_time, user_id) + VALUES (123, '2025-01-01 12:00', '2025-01-01 13:00', 666); + + COMMIT; + ``` - ##### Example 8-2. A meeting room booking system tries to avoid double-booking (not safe under snapshot isolation) - - ```sql - BEGIN TRANSACTION; - - -- Check for any existing bookings that overlap with the period of noon-1pm - SELECT COUNT(*) FROM bookings - WHERE room_id = 123 AND - end_time > '2025-01-01 12:00' AND start_time < '2025-01-01 13:00'; - - -- If the previous query returned zero: - INSERT INTO bookings - (room_id, start_time, end_time, user_id) - VALUES (123, '2025-01-01 12:00', '2025-01-01 13:00', 666); - - COMMIT; - ``` - - Unfortunately, snapshot isolation does not prevent another user from concurrently inserting a conflicting - meeting. In order to guarantee you won’t get scheduling conflicts, you once again need serializable - isolation. + Unfortunately, snapshot isolation does not prevent another user from concurrently inserting a conflicting + meeting. In order to guarantee you won’t get scheduling conflicts, you once again need serializable + isolation. Multiplayer game : In [Example 8-1](/en/ch8#fig_transactions_select_for_update), we used a lock to prevent lost updates (that is, making @@ -1103,7 +1113,7 @@ Preventing double-spending With write skew, it could happen that two spending items are inserted concurrently that together cause the balance to go negative, but that neither transaction notices the other. -### Phantoms causing write skew +#### Phantoms causing write skew All of these examples follow a similar pattern: @@ -1139,7 +1149,7 @@ Snapshot isolation avoids phantoms in read-only queries, but in read-write trans examples we discussed, phantoms can lead to particularly tricky cases of write skew. The SQL generated by ORMs is also prone to write skew [^50] [^51]. -### Materializing conflicts +#### Materializing conflicts If the problem of phantoms is that there is no object to which we can attach the locks, perhaps we can artificially introduce a lock object into the database? @@ -1162,7 +1172,9 @@ mechanism leak into the application data model. For those reasons, materializing considered a last resort if no alternative is possible. A serializable isolation level is much preferable in most cases. -# Serializability + + +## Serializability In this chapter we have seen several examples of transactions that are prone to race conditions. Some race conditions are prevented by the read committed and snapshot isolation levels, but @@ -1195,12 +1207,11 @@ serializability, and how they perform. Most databases that provide serializabili three techniques, which we will explore in the rest of this chapter: * Literally executing transactions in a serial order (see [“Actual Serial Execution”](/en/ch8#sec_transactions_serial)) -* Two-phase locking (see [“Two-Phase Locking (2PL)”](/en/ch8#sec_transactions_2pl)), which for several decades was the only viable - option +* Two-phase locking (see [“Two-Phase Locking (2PL)”](/en/ch8#sec_transactions_2pl)), which for several decades was the only viable option * Optimistic concurrency control techniques such as serializable snapshot isolation (see [“Serializable Snapshot Isolation (SSI)”](/en/ch8#sec_transactions_ssi)) -## Actual Serial Execution +### Actual Serial Execution The simplest way of avoiding concurrency problems is to remove the concurrency entirely: to execute only one transaction at a time, in serial order, on a single thread. By doing so, we completely @@ -1230,7 +1241,7 @@ supports concurrency, because it can avoid the coordination overhead of locking. throughput is limited to that of a single CPU core. In order to make the most of that single thread, transactions need to be structured differently from their traditional form. -### Encapsulating transactions in stored procedures +#### Encapsulating transactions in stored procedures In the early days of databases, the intention was that a database transaction could encompass an entire flow of user activity. For example, booking an airline ticket is a multi-stage process @@ -1270,8 +1281,7 @@ stored procedure can execute very quickly, without waiting for any network or di {{< figure src="/fig/ddia_0809.png" id="fig_transactions_stored_proc" caption="Figure 8-9. The difference between an interactive transaction and a stored procedure (using the example transaction of [Figure 8-8](/en/ch8#fig_transactions_write_skew))." class="w-full my-4" >}} - -### Pros and cons of stored procedures +#### Pros and cons of stored procedures Stored procedures have existed for some time in relational databases, and they have been part of the SQL standard (SQL/PSM) since 1999. They have gained a somewhat bad reputation, for various reasons: @@ -1312,7 +1322,7 @@ so through special deterministic APIs (see [“Durable Execution and Workflows deterministic operations). This approach is called *state machine replication*, and we will return to it in [Chapter 10](/en/ch10#ch_consistency). -### Sharding +#### Sharding Executing all transactions serially makes concurrency control much simpler, but limits the transaction throughput of the database to the speed of a single CPU core on a single machine. @@ -1341,13 +1351,12 @@ application. Simple key-value data can often be sharded very easily, but data wi secondary indexes is likely to require a lot of cross-shard coordination (see [“Sharding and Secondary Indexes”](/en/ch7#sec_sharding_secondary_indexes)). -### Summary of serial execution +#### Summary of serial execution Serial execution of transactions has become a viable way of achieving serializable isolation within certain constraints: -* Every transaction must be small and fast, because it takes only one slow transaction to stall all - transaction processing. +* Every transaction must be small and fast, because it takes only one slow transaction to stall all transaction processing. * It is most appropriate in situations where the active dataset can fit in memory. Rarely accessed data could potentially be moved to disk, but if it needed to be accessed in a single-threaded transaction, the system would get very slow. @@ -1355,19 +1364,24 @@ certain constraints: to be sharded without requiring cross-shard coordination. * Cross-shard transactions are possible, but their throughput is hard to scale. -## Two-Phase Locking (2PL) +### Two-Phase Locking (2PL) For around 30 years, there was only one widely used algorithm for serializability in databases: *two-phase locking* (2PL), sometimes called *strong strict two-phase locking* (SS2PL) to distinguish it from other variants of 2PL. -# 2PL is not 2PC + +-------- + +> [!TIP] 2PL IS NOT 2PC Two-phase *locking* (2PL) and two-phase *commit* (2PC) are two very different things. 2PL provides serializable isolation, whereas 2PC provides atomic commit in a distributed database (see [“Two-Phase Commit (2PC)”](/en/ch8#sec_transactions_2pc)). To avoid confusion, it’s best to think of them as entirely separate concepts and to ignore the unfortunate similarity in the names. +-------- + We saw previously that locks are often used to prevent dirty writes (see [“No dirty writes”](/en/ch8#sec_transactions_dirty_write)): if two transactions concurrently try to write to the same object, the lock ensures that the second writer must wait until the first one has finished its transaction @@ -1390,7 +1404,7 @@ readers* (see [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_trans snapshot isolation and two-phase locking. On the other hand, because 2PL provides serializability, it protects against all the race conditions discussed earlier, including lost updates and write skew. -### Implementation of two-phase locking +#### Implementation of two-phase locking 2PL is used by the serializable isolation level in MySQL (InnoDB) and SQL Server, and the repeatable read isolation level in Db2 [^29]. @@ -1417,7 +1431,7 @@ transaction B to release its lock, and vice versa. This situation is called *dea automatically detects deadlocks between transactions and aborts one of them so that the others can make progress. The aborted transaction needs to be retried by the application. -### Performance of two-phase locking +#### Performance of two-phase locking The big downside of two-phase locking, and the reason why it hasn’t been used by everybody since the 1970s, is performance: transaction throughput and response times of queries are significantly worse @@ -1446,7 +1460,7 @@ transaction). This can be an additional performance problem: when a transaction deadlock and is retried, it needs to do its work all over again. If deadlocks are frequent, this can mean significant wasted effort. -### Predicate locks +#### Predicate locks In the preceding description of locks, we glossed over a subtle but important detail. In [“Phantoms causing write skew”](/en/ch8#sec_transactions_phantom) we discussed the problem of *phantoms*—that is, one transaction @@ -1485,7 +1499,7 @@ database, but which might be added in the future (phantoms). If two-phase lockin the database prevents all forms of write skew and other race conditions, and so its isolation becomes serializable. -### Index-range locks +#### Index-range locks Unfortunately, predicate locks do not perform well: if there are many locks by active transactions, checking for matching locks becomes time-consuming. For that reason, most databases with 2PL @@ -1499,8 +1513,7 @@ room 123) between noon and 1 p.m. This is safe because any write that matches th will definitely also match the approximations. In the room bookings database you would probably have an index on the `room_id` column, and/or -indexes on `start_time` and `end_time` (otherwise the preceding query would be very slow on a large -database): +indexes on `start_time` and `end_time` (otherwise the preceding query would be very slow on a large database): * Say your index is on `room_id`, and the database uses this index to find existing bookings for room 123. Now the database can simply attach a shared lock to this index entry, indicating that a @@ -1523,7 +1536,7 @@ If there is no suitable index where a range lock can be attached, the database c shared lock on the entire table. This will not be good for performance, since it will stop all other transactions writing to the table, but it’s a safe fallback position. -## Serializable Snapshot Isolation (SSI) +### Serializable Snapshot Isolation (SSI) This chapter has painted a bleak picture of concurrency control in databases. On the one hand, we have implementations of serializability that don’t perform well (two-phase locking) or don’t scale @@ -1539,7 +1552,7 @@ Today SSI and similar algorithms are used in single-node databases (the serializ in PostgreSQL [^54], SQL Server’s In-Memory OLTP/Hekaton [^66], and HyPer [^67]), distributed databases (CockroachDB [^5] and FoundationDB [^8]), and embedded storage engines such as BadgerDB. -### Pessimistic versus optimistic concurrency control +#### Pessimistic versus optimistic concurrency control Two-phase locking is a so-called *pessimistic* concurrency control mechanism: it is based on the principle that if anything might possibly go wrong (as indicated by a lock held by another @@ -1558,8 +1571,7 @@ transaction wants to commit, the database checks whether anything bad happened ( isolation was violated); if so, the transaction is aborted and has to be retried. Only transactions that executed serializably are allowed to commit. -Optimistic concurrency control is an old idea [^68], -and its advantages and disadvantages have been debated for a long time [^69]. +Optimistic concurrency control is an old idea [^68], and its advantages and disadvantages have been debated for a long time [^69]. It performs badly if there is high contention (many transactions trying to access the same objects), as this leads to a high proportion of transactions needing to abort. If the system is already close to its maximum throughput, the additional transaction load from retried transactions can make @@ -1577,14 +1589,13 @@ are made from a consistent snapshot of the database (see [“Snapshot Isolation On top of snapshot isolation, SSI adds an algorithm for detecting serialization conflicts among reads and writes, and determining which transactions to abort. -### Decisions based on an outdated premise +#### Decisions based on an outdated premise When we previously discussed write skew in snapshot isolation (see [“Write Skew and Phantoms”](/en/ch8#sec_transactions_write_skew)), we observed a recurring pattern: a transaction reads some data from the database, examines the result of the query, and decides to take some action (write to the database) based on the result that it saw. However, under snapshot isolation, the result from the original query may no longer be -up-to-date by the time the transaction commits, because the data may have been modified in the -meantime. +up-to-date by the time the transaction commits, because the data may have been modified in the meantime. Put another way, the transaction is taking an action based on a *premise* (a fact that was true at the beginning of the transaction, e.g., “There are currently two doctors on call”). Later, when the @@ -1603,7 +1614,7 @@ How does the database know if a query result might have changed? There are two c * Detecting reads of a stale MVCC object version (uncommitted write occurred before the read) * Detecting writes that affect prior reads (the write occurs after the read) -### Detecting stale MVCC reads +#### Detecting stale MVCC reads Recall that snapshot isolation is usually implemented by multi-version concurrency control (MVCC; see [“Multi-version concurrency control (MVCC)”](/en/ch8#sec_transactions_snapshot_impl)). When a transaction reads from a consistent snapshot in an @@ -1634,7 +1645,7 @@ abort or may still be uncommitted at the time when transaction 43 is committed, turn out not to have been stale after all. By avoiding unnecessary aborts, SSI preserves snapshot isolation’s support for long-running reads from a consistent snapshot. -### Detecting writes that affect prior reads +#### Detecting writes that affect prior reads The second case to consider is when another transaction modifies data after it has been read. This case is illustrated in [Figure 8-11](/en/ch8#fig_transactions_detect_index_range). @@ -1665,7 +1676,7 @@ transaction 43’s write affected 42, 43 hasn’t yet committed, so the write ha However, when transaction 43 wants to commit, the conflicting write from 42 has already been committed, so 43 must abort. -### Performance of serializable snapshot isolation +#### Performance of serializable snapshot isolation As always, many engineering details affect how well an algorithm works in practice. For example, one trade-off is the granularity at which transactions’ reads and writes are tracked. If the database @@ -1702,7 +1713,7 @@ SSI requires that read-write transactions be fairly short (long-running read-onl okay). However, SSI is less sensitive to slow transactions than two-phase locking or serial execution. -# Distributed Transactions +## Distributed Transactions The last few sections have focused on concurrency control for isolation, the I in ACID. The algorithms we have seen apply to both single-node and distributed databases: although there are @@ -1759,10 +1770,9 @@ was later aborted, user 2’s transaction would have to be reverted as well, sin data that was retroactively declared not to have existed. A better approach is to ensure that the nodes involved in a transaction either all commit or all -abort, and to prevent a mixture of the two. Ensuring this is known as the *atomic commitment* -problem. +abort, and to prevent a mixture of the two. Ensuring this is known as the *atomic commitment* problem. -## Two-Phase Commit (2PC) +### Two-Phase Commit (2PC) Two-phase commit is an algorithm for achieving atomic transaction commit across multiple nodes. It is a classic algorithm in distributed databases [^13] [^71] [^72]. 2PC is used @@ -1800,7 +1810,7 @@ the answer “I do” from both. After receiving both acknowledgments, the minis couple husband and wife: the transaction is committed, and the happy fact is broadcast to all attendees. If either bride or groom does not say “yes,” the ceremony is aborted [^76]. -### A system of promises +#### A system of promises From this short description it might not be clear why two-phase commit ensures atomicity, while one-phase commit across several nodes does not. Surely the prepare and commit requests can just @@ -1851,7 +1861,7 @@ married or not by querying the minister for the status of your global transactio wait for the minister’s next retry of the commit request (since the retries will have continued throughout your period of unconsciousness). -### Coordinator failure +#### Coordinator failure We have discussed what happens if one of the participants or the network fails during 2PC: if any of the prepare requests fails or times out, the coordinator aborts the transaction; if any of the @@ -1886,7 +1896,7 @@ all in-doubt transactions by reading its transaction log. Any transactions that record in the coordinator’s log are aborted. Thus, the commit point of 2PC comes down to a regular single-node atomic commit on the coordinator. -### Three-phase commit +#### Three-phase commit Two-phase commit is called a *blocking* atomic commit protocol due to the fact that 2PC can become stuck waiting for the coordinator to recover. It is possible to make an atomic commit protocol @@ -1901,7 +1911,7 @@ cannot guarantee atomicity. A better solution in practice is to replace the single-node coordinator with a fault-tolerant consensus protocol. We will see how to do this in [Chapter 10](/en/ch10#ch_consistency). -## Distributed Transactions Across Different Systems +### Distributed Transactions Across Different Systems Distributed transactions and two-phase commit have a mixed reputation. On the one hand, they are seen as providing an important safety guarantee that would be hard to achieve otherwise; on the @@ -1936,7 +1946,7 @@ use any protocol and apply optimizations specific to that particular technology. database-internal distributed transactions can often work quite well. On the other hand, transactions spanning heterogeneous technologies are a lot more challenging. -### Exactly-once message processing +#### Exactly-once message processing Heterogeneous distributed transactions allow diverse systems to be integrated in powerful ways. For example, a message from a message queue can be acknowledged as processed if and only if the database @@ -1961,11 +1971,10 @@ safely be retried as if nothing had happened. We will return to the topic of exactly-once semantics later in this chapter. Let’s look first at the atomic commit protocol that allows such heterogeneous distributed transactions. -### XA transactions +#### XA transactions *X/Open XA* (short for *eXtended Architecture*) is a standard for implementing two-phase commit -across heterogeneous technologies [^73]. -It was introduced in 1991 and has been widely +across heterogeneous technologies [^73]. It was introduced in 1991 and has been widely implemented: XA is supported by many traditional relational databases (including PostgreSQL, MySQL, Db2, SQL Server, and Oracle) and message brokers (including ActiveMQ, HornetQ, MSMQ, and IBM MQ). @@ -1996,7 +2005,7 @@ transaction. Only then can the coordinator use the database driver’s XA callba participants to commit or abort, as appropriate. The database server cannot contact the coordinator directly, since all communication must go via its client library. -### Holding locks while in doubt +#### Holding locks while in doubt Why do we care so much about a transaction being stuck in doubt? Can’t the rest of the system just get on with its work, and ignore the in-doubt transaction that will be cleaned up eventually? @@ -2019,7 +2028,7 @@ cannot simply continue with their business—if they want to access that same da blocked. This can cause large parts of your application to become unavailable until the in-doubt transaction is resolved. -### Recovering from coordinator failure +#### Recovering from coordinator failure In theory, if the coordinator crashes and is restarted, it should cleanly recover its state from the log and resolve any in-doubt transactions. However, in practice, *orphaned* in-doubt transactions do occur [^83] [^84] — that is, @@ -2046,7 +2055,7 @@ decision from the coordinator [^73]. To be clear, violates the system of promises in two-phase commit. Thus, heuristic decisions are intended only for getting out of catastrophic situations, and not for regular use. -### Problems with XA transactions +#### Problems with XA transactions A single-node coordinator is a single point of failure for the entire system, and making it part of the application server is also problematic because the coordinator’s logs on its local disk become a @@ -2077,7 +2086,7 @@ However, keeping several heterogeneous data systems consistent with each other i important problem, so we need to find a different solution to it. This can be done, as we will see in the next section and in [Link to Come]. -## Database-internal Distributed Transactions +### Database-internal Distributed Transactions As explained previously, there is a big difference between distributed transactions that span multiple heterogeneous storage technologies, and those that are internal to a system—i.e., where all @@ -2109,7 +2118,7 @@ The isolation levels offered for distributed transactions depend on the system, isolation and serializable snapshot isolation are both possible across shards. The details of how this works can be found in the papers referenced at the end of this chapter. -### Exactly-once message processing revisited +#### Exactly-once message processing revisited We saw in [“Exactly-once message processing”](/en/ch8#sec_transactions_exactly_once) that an important use case for distributed transactions is to ensure that some operation takes effect exactly once, even if a crash occurs while it is being @@ -2148,14 +2157,15 @@ Thus, achieving exactly-once processing only requires transactions within the da across database and message broker is not necessary for this use case. Recording the message ID in the database makes the message processing *idempotent*, so that message processing can be safely retried without duplicating its side-effects. A similar approach is used in stream processing -frameworks such as Kafka Streams to achieve exactly-once semantics, as we shall see in -[Link to Come]. +frameworks such as Kafka Streams to achieve exactly-once semantics, as we shall see in [Link to Come]. However, internal distributed transactions within the database are still useful for the scalability of patterns such as these: for example, they would allow the message IDs to be stored on one shard and the main data updated by the message processing to be stored on other shards, and to ensure atomicity of the transaction commit across those shards. + + ## Summary Transactions are an abstraction layer that allows an application to pretend that certain concurrency diff --git a/content/en/ch9.md b/content/en/ch9.md index 3242b01..e28080e 100644 --- a/content/en/ch9.md +++ b/content/en/ch9.md @@ -34,7 +34,7 @@ explore how to think about the state of a distributed system and how to reason a have happened ([“Knowledge, Truth, and Lies”](/en/ch9#sec_distributed_truth)). Later, in [Chapter 10](/en/ch10#ch_consistency), we will look at some examples of how we can achieve fault tolerance in the face of those faults. -# Faults and Partial Failures +## Faults and Partial Failures When you are writing a program on a single computer, it normally behaves in a fairly predictable way: either it works or it doesn’t. Buggy software may give the appearance that the computer is @@ -69,7 +69,7 @@ anecdote [^3]: > pickup truck into a DC’s HVAC [heating, ventilation, and air conditioning] system. And I’m not even > an ops guy. > -> Coda Hale +> —— Coda Hale In a distributed system, there may well be some parts of the system that are broken in some unpredictable way, even though other parts of the system are working fine. This is known as a @@ -77,8 +77,7 @@ unpredictable way, even though other parts of the system are working fine. This anything involving multiple nodes and the network, it may sometimes work and sometimes unpredictably fail. As we shall see, you may not even *know* whether something succeeded or not! -This nondeterminism and possibility of partial failures is what makes distributed systems hard to -work with [^4]. +This nondeterminism and possibility of partial failures is what makes distributed systems hard to work with [^4]. On the other hand, if a distributed system can tolerate partial failures, that opens up powerful possibilities: for example, it allows you to perform a rolling upgrade, rebooting one node at a time to install software updates while the system as a whole continues working uninterrupted all the @@ -90,7 +89,7 @@ supposed to tolerate. It is important to consider a wide range of possible fault unlikely ones—and to artificially create such situations in your testing environment to see what happens. In distributed systems, suspicion, pessimism, and paranoia pay off. -# Unreliable Networks +## Unreliable Networks As discussed in [“Shared-Memory, Shared-Disk, and Shared-Nothing Architecture”](/en/ch2#sec_introduction_shared_nothing), the distributed systems we focus on in this book are mostly *shared-nothing systems*: i.e., a bunch of machines connected by a network. @@ -130,18 +129,22 @@ the response is not going to arrive. However, when a timeout occurs, you still d the remote node got your request or not (and if the request is still queued somewhere, it may still be delivered to the recipient, even if the sender has given up on it). -## The Limitations of TCP +### The Limitations of TCP Network packets have a maximum size (generally a few kilobytes), but many applications need to send messages (requests, responses) that are too big to fit in one packet. These applications most often use TCP, the Transmission Control Protocol, to establish a *connection* that breaks up large data streams into individual packets, and puts them back together again on the receiving side. +-------- + > [!NOTE] > Most of what we say about TCP applies also to its more recent alternative QUIC, as well as the > Stream Control Transmission Protocol (SCTP) used in WebRTC, the BitTorrent uTP protocol, and > other transport protocols. For a comparison to UDP, see [“TCP Versus UDP”](/en/ch9#sidebar_distributed_tcp_udp). +-------- + TCP is often described as providing “reliable” delivery, in the sense that it detects and retransmits dropped packets, it detects reordered packets and puts them back in the correct order, and it detects packet corruption using a simple checksum. It also figures out how fast it can send @@ -177,7 +180,7 @@ use it to send multiple requests and responses. This is usually done by first se indicates the length of the following message in bytes, followed by the actual message. HTTP and many RPC protocols (see [“Dataflow Through Services: REST and RPC”](/en/ch5#sec_encoding_dataflow_rpc)) work like this. -## Network Faults in Practice +### Network Faults in Practice We have been building computer networks for decades—one might hope that by now we would have figured out how to make them reliable. Unfortunately, we have not yet succeeded. There are some systematic @@ -197,25 +200,27 @@ even in controlled environments like a datacenter operated by one company [^8]: * Across different cloud regions, round-trip times of up to several *minutes* have been observed at high percentiles [^18]. Even within a single datacenter, packet delay of more than a minute can occur during a network - topology reconfiguration, triggered by a problem during a software upgrade for a switch - [^19]. + topology reconfiguration, triggered by a problem during a software upgrade for a switch [^19]. Thus, we have to assume that messages might be delayed arbitrarily. * Sometimes communications are partially interrupted, depending on who you’re talking to: for example, A and B can communicate, B and C can communicate, but A and C cannot [^20] [^21]. Other surprising faults include a network interface that sometimes drops all inbound packets but sends outbound packets successfully [^22]: - just because a network link works in one direction doesn’t guarantee it’s also working in the - opposite direction. + just because a network link works in one direction doesn’t guarantee it’s also working in the opposite direction. * Even a brief network interruption can have repercussions that last for much longer than the original issue [^8] [^20] [^23]. -# Network partitions +-------- + +> [!TIP] NETWORK PARTITIONS When one part of the network is cut off from the rest due to a network fault, that is sometimes called a *network partition* or *netsplit*, but it is not fundamentally different from other kinds of network interruption. Network partitions are not related to sharding of a storage system, which is sometimes also called *partitioning* (see [Chapter 7](/en/ch7#ch_sharding)). +-------- + Even if network faults are rare in your environment, the fact that faults *can* occur means that your software needs to be able to handle them. Whenever any communication happens over a network, it may fail—there is no way around it. @@ -233,7 +238,7 @@ and ensure that the system can recover from them. It may make sense to deliberately trigger network problems and test the system’s response (this is known as *fault injection*; see [“Fault injection”](/en/ch9#sec_fault_injection)). -## Detecting Faults +### Detecting Faults Many systems need to automatically detect faulty nodes. For example: @@ -266,7 +271,7 @@ gone wrong, you may get an error response at some level of the stack, but in gen assume that you will get no response at all. You can retry a few times, wait for a timeout to elapse, and eventually declare the node dead if you don’t hear back within the timeout. -## Timeouts and Unbounded Delays +### Timeouts and Unbounded Delays If a timeout is the only sure way of detecting a fault, then how long should the timeout be? There is unfortunately no simple answer. @@ -304,7 +309,7 @@ cannot guarantee that they can handle requests within some maximum time (see be fast most of the time: if your timeout is low, it only takes a transient spike in round-trip times to throw the system off-balance. -### Network congestion and queueing +#### Network congestion and queueing When driving a car, travel times on road networks often vary most due to traffic congestion. Similarly, the variability of packet delays on computer networks is most often due to queueing [^27]: @@ -327,12 +332,13 @@ Similarly, the variability of packet delays on computer networks is most often d {{< figure src="/fig/ddia_0902.png" id="fig_distributed_switch_queueing" caption="Figure 9-2. If several machines send network traffic to the same destination, its switch queue can fill up. Here, ports 1, 2, and 4 are all trying to send packets to port 3." class="w-full my-4" >}} - Moreover, when TCP detects and automatically retransmits a lost packet, although the application does not see the packet loss directly, it does see the resulting delay (waiting for the timeout to expire, and then waiting for the retransmitted packet to be acknowledged). -# TCP Versus UDP +-------- + +> [!TIP] TCP VERSUS UDP Some latency-sensitive applications, such as videoconferencing and Voice over IP (VoIP), use UDP rather than TCP. It’s a trade-off between reliability and variability of delays: as UDP does not @@ -346,6 +352,8 @@ application must instead fill the missing packet’s time slot with silence (cau interruption in the sound) and move on in the stream. The retry happens at the human layer instead. (“Could you repeat that please? The sound just cut out for a moment.”) +-------- + All of these factors contribute to the variability of network delays. Queueing delays have an especially wide range when a system is close to its maximum capacity: a system with plenty of spare capacity can easily drain queues, whereas in a highly utilized system, long queues can build up very @@ -369,7 +377,7 @@ observed response time distribution. The Phi Accrual failure detector [^32], which is used for example in Akka and Cassandra [^33] is one way of doing this. TCP retransmission timeouts also work similarly [^5]. -## Synchronous Versus Asynchronous Networks +### Synchronous Versus Asynchronous Networks Distributed systems would be a lot simpler if we could rely on the network to deliver packets with some fixed maximum delay, and not to drop packets. Why can’t we solve this at the hardware level @@ -394,7 +402,7 @@ suffer from queueing, because the 16 bits of space for the call have already bee next hop of the network. And because there is no queueing, the maximum end-to-end latency of the network is fixed. We call this a *bounded delay*. -### Can we not simply make network delays predictable? +#### Can we not simply make network delays predictable? Note that a circuit in a telephone network is very different from a TCP connection: a circuit is a fixed amount of reserved bandwidth which nobody else can use while the circuit is established, @@ -433,7 +441,9 @@ Loss, and Scalable Throughput (L4S) attempt to mitigate some of the queuing and problems both at the client and router level. Linux’s traffic controller (TC) also allows applications to reprioritize packets for QoS purposes. -# Latency and Resource Utilization +-------- + +> [!TIP] LATENCY AND RESOURCE UTILIZATION More generally, you can think of variable delays as a consequence of dynamic resource partitioning. @@ -460,25 +470,25 @@ platforms run several virtual machines from different customers on the same phys Latency guarantees are achievable in certain environments, if resources are statically partitioned (e.g., dedicated hardware and exclusive bandwidth allocations). However, it comes at the cost of reduced utilization—in other words, it is more expensive. On the other hand, multitenancy with -dynamic resource partitioning provides better utilization, so it is cheaper, but it has the downside -of variable delays. +dynamic resource partitioning provides better utilization, so it is cheaper, but it has the downside of variable delays. -Variable delays in networks are not a law of nature, but simply the result of a cost/benefit -trade-off. +Variable delays in networks are not a law of nature, but simply the result of a cost/benefit trade-off. -However, such quality of service is currently not enabled in multitenant datacenters and public -clouds, or when communicating via the internet. +-------- + +However, such quality of service is currently not enabled in multitenant datacenters and public clouds, or when communicating via the internet. Currently deployed technology does not allow us to make any guarantees about delays or reliability of the network: we have to assume that network congestion, queueing, and unbounded delays will -happen. Consequently, there’s no “correct” value for timeouts—they need to be determined -experimentally. +happen. Consequently, there’s no “correct” value for timeouts—they need to be determined experimentally. Peering agreements between internet service providers and the establishment of routes through the Border Gateway Protocol (BGP), bear closer resemblance to circuit switching than IP itself. At this level, it is possible to buy dedicated bandwidth. However, internet routing operates at the level of networks, not individual connections between hosts, and at a much longer timescale. -# Unreliable Clocks + + +## Unreliable Clocks Clocks and time are important. Applications depend on clocks in various ways to answer questions like the following: @@ -509,13 +519,13 @@ synchronize clocks to some degree: the most commonly used mechanism is the Netwo allows the computer clock to be adjusted according to the time reported by a group of servers [^39]. The servers in turn get their time from a more accurate time source, such as a GPS receiver. -## Monotonic Versus Time-of-Day Clocks +### Monotonic Versus Time-of-Day Clocks Modern computers have at least two different kinds of clocks: a *time-of-day clock* and a *monotonic clock*. Although they both measure time, it is important to distinguish the two, since they serve different purposes. -### Time-of-day clocks +#### Time-of-day clocks A time-of-day clock does what you intuitively expect of a clock: it returns the current date and time according to some calendar (also known as *wall-clock time*). For example, @@ -539,11 +549,10 @@ Time-of-day clocks have also historically had quite a coarse-grained resolution, in steps of 10 ms on older Windows systems [^41]. On recent systems, this is less of a problem. -### Monotonic clocks +#### Monotonic clocks A monotonic clock is suitable for measuring a duration (time interval), such as a timeout or a -service’s response time: `clock_gettime(CLOCK_MONOTONIC)` or `clock_gettime(CLOCK_BOOTTIME)` on -Linux [^42] +service’s response time: `clock_gettime(CLOCK_MONOTONIC)` or `clock_gettime(CLOCK_BOOTTIME)` on Linux [^42] and `System.nanoTime()` in Java are monotonic clocks, for example. The name comes from the fact that they are guaranteed to always move forward (whereas a time-of-day clock may jump back in time). @@ -571,7 +580,7 @@ In a distributed system, using a monotonic clock for measuring elapsed time (e.g usually fine, because it doesn’t assume any synchronization between different nodes’ clocks and is not sensitive to slight inaccuracies of measurement. -## Clock Synchronization and Accuracy +### Clock Synchronization and Accuracy Monotonic clocks don’t need synchronization, but time-of-day clocks need to be set according to an NTP server or other external time source in order to be useful. Unfortunately, our methods for @@ -631,7 +640,7 @@ Some cloud providers have begun offering high-accuracy clock synchronization for However, clock synchronization still requires a lot of care. If your NTP daemon is misconfigured, or a firewall is blocking NTP traffic, the clock error due to drift can quickly become large. -## Relying on Synchronized Clocks +### Relying on Synchronized Clocks The problem with clocks is that while they seem simple and easy to use, they have a surprising number of pitfalls: a day may not have exactly 86,400 seconds, time-of-day clocks may move backward @@ -655,7 +664,7 @@ monitor the clock offsets between all the machines. Any node whose clock drifts others should be declared dead and removed from the cluster. Such monitoring ensures that you notice the broken clocks before they can cause too much damage. -### Timestamps for ordering events +#### Timestamps for ordering events Let’s consider one particular situation in which it is tempting, but dangerous, to rely on clocks: ordering of events across multiple nodes [^64]. @@ -718,8 +727,7 @@ arrived before it was sent, which is impossible. Could NTP synchronization be made accurate enough that such incorrect orderings cannot occur? Probably not, because NTP’s synchronization accuracy is itself limited by the network round-trip time, in addition to other sources of error such as quartz drift. To guarantee a correct ordering, -you would need the clock error to be significantly lower than the network delay, which is not -possible. +you would need the clock error to be significantly lower than the network delay, which is not possible. So-called *logical clocks* [^66], which are based on incrementing counters rather than an oscillating quartz crystal, are a safer alternative for ordering events (see [“Detecting Concurrent Writes”](/en/ch6#sec_replication_concurrent)). Logical clocks do not measure @@ -728,7 +736,7 @@ event happened before or after another). In contrast, time-of-day and monotonic measure actual elapsed time, are also known as *physical clocks*. We’ll look at logical clocks in more detail in [“ID Generators and Logical Clocks”](/en/ch10#sec_consistency_logical). -### Clock readings with a confidence interval +#### Clock readings with a confidence interval You may be able to read a machine’s time-of-day clock with microsecond or even nanosecond resolution. But even if you can get such a fine-grained measurement, that doesn’t mean the value is @@ -740,10 +748,8 @@ possible accuracy is probably to the tens of milliseconds, and the error may eas Thus, it doesn’t make sense to think of a clock reading as a point in time—it is more like a range of times, within a confidence interval: for example, a system may be 95% confident that the -time now is between 10.3 and 10.5 seconds past the minute, but it doesn’t know any more precisely -than that [^67]. -If we only know the time +/– 100 ms, the microsecond digits in the timestamp are -essentially meaningless. +time now is between 10.3 and 10.5 seconds past the minute, but it doesn’t know any more precisely than that [^67]. +If we only know the time +/– 100 ms, the microsecond digits in the timestamp are essentially meaningless. The uncertainty bound can be calculated based on your time source. If you have a GPS receiver or atomic clock directly attached to your computer, the expected error range is determined by @@ -763,7 +769,7 @@ timestamp. Based on its uncertainty calculations, the clock knows that the actua somewhere within that interval. The width of the interval depends, among other things, on how long it has been since the local quartz clock was last synchronized with a more accurate clock source. -### Synchronized clocks for global snapshots +#### Synchronized clocks for global snapshots In [“Snapshot Isolation and Repeatable Read”](/en/ch8#sec_transactions_snapshot_isolation) we discussed *multi-version concurrency control* (MVCC), which is a very useful feature in databases that need to support both small, fast read-write @@ -807,7 +813,7 @@ have a confidence interval, and the accurate clock sources only help keep that i systems are beginning to adopt similar approaches: for example, YugabyteDB can leverage ClockBound when running on AWS [^70], and several other systems now also rely on clock synchronization to various degrees [^71] [^72]. -## Process Pauses +### Process Pauses Let’s consider another example of dangerous clock use in a distributed system. Say you have a database with a single leader per shard. Only the leader is allowed to accept writes. How does a @@ -824,16 +830,16 @@ You can imagine the request-handling loop looking something like this: ```js while (true) { - request = getIncomingRequest(); + request = getIncomingRequest(); - // Ensure that the lease always has at least 10 seconds remaining - if (lease.expiryTimeMillis - System.currentTimeMillis() < 10000) { - lease = lease.renew(); - } + // Ensure that the lease always has at least 10 seconds remaining + if (lease.expiryTimeMillis - System.currentTimeMillis() < 10000) { + lease = lease.renew(); + } - if (lease.isValid()) { - process(request); - } + if (lease.isValid()) { + process(request); + } } ``` @@ -917,7 +923,7 @@ keeps moving and may even declare the paused node dead because it’s not respon the paused node may continue running, without even noticing that it was asleep until it checks its clock sometime later. -### Response time guarantees +#### Response time guarantees In many programming languages and operating systems, threads and processes may pause for an unbounded amount of time, as discussed. Those reasons for pausing *can* be eliminated if you try @@ -929,12 +935,16 @@ must respond quickly and predictably to their sensor inputs. In these systems, t *deadline* by which the software must respond; if it doesn’t meet the deadline, that may cause a failure of the entire system. These are so-called *hard real-time* systems. +-------- + > [!NOTE] > In embedded systems, *real-time* means that a system is carefully designed and tested to meet > specified timing guarantees in all circumstances. This meaning is in contrast to the more vague use of the > term *real-time* on the web, where it describes servers pushing data to clients and stream > processing without hard response time constraints (see [Link to Come]). +-------- + For example, if your car’s onboard sensors detect that you are currently experiencing a crash, you wouldn’t want the release of the airbag to be delayed due to an inopportune GC pause in the airbag release system. @@ -958,7 +968,7 @@ For most server-side data processing systems, real-time guarantees are simply no appropriate. Consequently, these systems must suffer the pauses and clock instability that come from operating in a non-real-time environment. -### Limiting the impact of garbage collection +#### Limiting the impact of garbage collection Garbage collection used to be one of the biggest reasons for process pauses [^79], but fortunately GC algorithms have improved a lot: a properly tuned collector will now usually pause @@ -990,7 +1000,9 @@ planned restart, like in a rolling upgrade (see [Chapter 5](/en/ch5#ch_encoding These measures cannot fully prevent garbage collection pauses, but they can usefully reduce their impact on the application. -# Knowledge, Truth, and Lies + + +## Knowledge, Truth, and Lies So far in this chapter we have explored the ways in which distributed systems are different from programs running on a single computer: there is no shared memory, only message passing via an @@ -1005,8 +1017,7 @@ exchanging messages with it. If a remote node doesn’t respond, there is no way it is in, because problems in the network cannot reliably be distinguished from problems at a node. Discussions of these systems border on the philosophical: What do we know to be true or false in our -system? How sure can we be of that knowledge, if the mechanisms for perception and measurement are -unreliable [^83]? +system? How sure can we be of that knowledge, if the mechanisms for perception and measurement are unreliable [^83]? Should software systems obey the laws that we expect of the physical world, such as cause and effect? Fortunately, we don’t need to go as far as figuring out the meaning of life. In a distributed @@ -1022,7 +1033,7 @@ we can make and the guarantees we may want to provide. In [Chapter 10](/en/ch10 look at some examples of distributed algorithms that provide particular guarantees under particular assumptions. -## The Majority Rules +### The Majority Rules Imagine a network with an asymmetric fault: a node is able to receive all messages sent to it, but any outgoing messages from that node are dropped or delayed [^22]. Even though that node is working @@ -1064,7 +1075,7 @@ tolerated). However, it is still safe, because there can only be only one majori system—there cannot be two majorities with conflicting decisions at the same time. We will discuss the use of quorums in more detail when we get to *consensus algorithms* in [Chapter 10](/en/ch10#ch_consistency). -## Distributed Locks and Leases +### Distributed Locks and Leases Locks and leases in distributed application are prone to be misused, and a common source of bugs [^84]. Let’s look at one particular case of how they can go wrong. @@ -1087,8 +1098,7 @@ wasted computational resources, which is not a big deal. But in the first two ca could be lost or corrupted data, which is much more serious. For example, [Figure 9-4](/en/ch9#fig_distributed_lease_pause) shows a data corruption bug due to an incorrect -implementation of locking. (The bug is not theoretical: HBase used to have this problem -[^85] [^86].) +implementation of locking. (The bug is not theoretical: HBase used to have this problem [^85] [^86].) Say you want to ensure that a file in a storage service can only be accessed by one client at a time, because if multiple clients tried to write to it, the file would become corrupted. You try to implement this by requiring a client to obtain a lease from a lock @@ -1115,7 +1125,7 @@ to [Figure 9-4](/en/ch9#fig_distributed_lease_pause). {{< figure src="/fig/ddia_0905.png" id="fig_distributed_lease_delay" caption="Figure 9-5. A message from a former leaseholder might be delayed for a long time, and arrive after another node has taken over the lease." class="w-full my-4" >}} -### Fencing off zombies and delayed requests +#### Fencing off zombies and delayed requests The term *zombie* is sometimes used to describe a former leaseholder who has not yet found out that it lost the lease, and who is still acting as if it was the current leaseholder. Since we cannot @@ -1141,12 +1151,16 @@ token*, which is a number that increases every time a lock is granted (e.g., inc service). We can then require that every time a client sends a write request to the storage service, it must include its current fencing token. +-------- + > [!NOTE] > There are several alternative names for fencing tokens. In Chubby, Google’s lock service, they are > called *sequencers* [^88], and in Kafka they are called *epoch numbers*. > In consensus algorithms, which we will discuss in [Chapter 10](/en/ch10#ch_consistency), the *ballot number* (Paxos) or > *term number* (Raft) serves a similar purpose. +-------- + In [Figure 9-6](/en/ch9#fig_distributed_fencing), client 1 acquires the lease with a token of 33, but then it goes into a long pause and the lease expires. Client 2 acquires the lease with a token of 34 (the number always increases) and then sends its write request to the storage service, including the @@ -1168,12 +1182,10 @@ read it, similarly to an atomic compare-and-set (CAS) operation. For example, ob services support such a check: Amazon S3 calls it *conditional writes*, Azure Blob Storage calls it *conditional headers*, and Google Cloud Storage calls it *request preconditions*. -### Fencing with multiple replicas +#### Fencing with multiple replicas If your clients need to write only to one storage service that supports such conditional writes, the -lock service is somewhat redundant -[^91] [^92], -since the lease assignment could have been implemented directly based on that storage service [^93]. +lock service is somewhat redundant [^91] [^92], since the lease assignment could have been implemented directly based on that storage service [^93]. However, once you have a fencing token you can also use it with multiple services or replicas, and ensure that the old leaseholder is fenced off on all of those services. @@ -1202,7 +1214,7 @@ As you can see from these examples, it is not safe to assume that there is only lease at any one time. Fortunately, with a bit of care you can use fencing tokens to prevent zombies and delayed requests from doing any damage. -## Byzantine Faults +### Byzantine Faults Fencing tokens can detect and block a node that is *inadvertently* acting in error (e.g., because it hasn’t yet found out that its lease has expired). However, if the node deliberately wanted to @@ -1219,7 +1231,7 @@ arbitrary faulty or corrupted responses)—for example, it might cast multiple c the same election. Such behavior is known as a *Byzantine fault*, and the problem of reaching consensus in this untrusting environment is known as the *Byzantine Generals Problem* [^94]. -# The Byzantine Generals Problem +> [!TIP] THE BYZANTINE GENERALS PROBLEM The Byzantine Generals Problem is a generalization of the so-called *Two Generals Problem* [^95], which imagines a situation in which two army generals need to agree on a battle plan. As they @@ -1240,6 +1252,8 @@ before computers [^96]. Lamport wanted to choose a nationality that would not offend any readers, and he was advised that calling it *The Albanian Generals Problem* was not such a good idea [^97]. +-------- + A system is *Byzantine fault-tolerant* if it continues to operate correctly even if some of the nodes are malfunctioning and not obeying the protocol, or if malicious attackers are interfering with the network. This concern is relevant in certain specific circumstances. For example: @@ -1285,7 +1299,7 @@ an attacker can compromise one node, they can probably compromise all of them, b probably running the same software. Thus, traditional mechanisms (authentication, access control, encryption, firewalls, and so on) continue to be the main protection against attackers. -### Weak forms of lying +#### Weak forms of lying Although we assume that nodes are generally honest, it can be worth adding mechanisms to software that guard against weak forms of “lying”—for example, invalid messages due to hardware issues, @@ -1308,7 +1322,7 @@ pragmatic steps toward better reliability. For example: incorrect time is detected as an outlier and is excluded from synchronization [^39]. The use of multiple servers makes NTP more robust than if it only uses a single server. -## System Model and Reality +### System Model and Reality Many algorithms have been designed to solve distributed systems problems—for example, we will examine solutions for the consensus problem in [Chapter 10](/en/ch10#ch_consistency). In order to be useful, these @@ -1377,7 +1391,7 @@ For modeling real systems, the partially synchronous model with crash-recovery f the most useful model. It allows for unbounded network delay, process pauses, and slow nodes. But how do distributed algorithms cope with that model? -### Defining the correctness of an algorithm +#### Defining the correctness of an algorithm To define what it means for an algorithm to be *correct*, we can describe its *properties*. For example, the output of a sorting algorithm has the property that for any two distinct elements of @@ -1403,7 +1417,7 @@ that we assume may occur in that system model. However, if all nodes crash, or a suddenly become infinitely long, then no algorithm will be able to get anything done. How can we still make useful guarantees even in a system model that allows complete failures? -### Safety and liveness +#### Safety and liveness To clarify the situation, it is worth distinguishing between two different kinds of properties: *safety* and *liveness* properties. In the example just given, *uniqueness* and *monotonic sequence* are @@ -1438,7 +1452,7 @@ network eventually recovers from an outage. The definition of the partially sync requires that eventually the system returns to a synchronous state—that is, any period of network interruption lasts only for a finite duration and is then repaired. -### Mapping system models to the real world +#### Mapping system models to the real world Safety and liveness properties and system models are very useful for reasoning about the correctness of a distributed algorithm. However, when implementing an algorithm in practice, the messy facts of @@ -1469,7 +1483,7 @@ They are incredibly helpful for distilling down the complexity of real systems t of faults that we can reason about, so that we can understand the problem and try to solve it systematically. -## Formal Methods and Randomized Testing +### Formal Methods and Randomized Testing How do we know that an algorithm satisfies the required properties? Due to concurrency, partial failures, and network delays there are a huge number of potential states. We need to guarantee @@ -1490,7 +1504,7 @@ testing (DST) use randomization to test a system in a wide range of situations. Amazon Web Services have successfully used a combination of these techniques on many of their products [^120] [^121]. -### Model checking and specification languages +#### Model checking and specification languages *Model checkers* are tools that help verify that an algorithm or system behaves as expected. An algorithm specification is written in a purpose-built language such as TLA+, Gallina, or FizzBee. These @@ -1518,7 +1532,7 @@ state space, but it risks that your specification and your implementation go out It is possible to check whether the model and the real implementation have equivalent behavior, but this requires instrumentation in the real implementation [^127]. -### Fault injection +#### Fault injection Many bugs are triggered when machine and network failures occur. Fault injection is an effective (and sometimes scary) technique that verifies whether a system’s implementation works as expected things @@ -1546,7 +1560,7 @@ simplify the process. Such frameworks come with integrations for various operati pre-built fault injectors [^129]. Jepsen has been remarkably effective at finding critical bugs in many widely-used systems [^130] [^131]. -### Deterministic simulation testing +#### Deterministic simulation testing Deterministic simulation testing (DST) has also become a popular complement to model-checking and fault injection. It uses a similar state space exploration process as a model checker, but it tests