---
title: "13. A Philosophy of Streaming Systems"
weight: 313
breadcrumbs: false
---
<a id="ch_philosophy"></a>
![](/map/ch12.png)
> *If a thing be ordained to another as to its end, its last end cannot consist in the preservation
> of its being. Hence a captain does not intend as a last end, the preservation of the ship
> entrusted to him, since a ship is ordained to something else as its end, viz. to navigation.*
>
> *(Often quoted as: If the highest aim of a captain was to preserve his ship, he would keep it in
> port forever.)*
>
> St. Thomas Aquinas, *Summa Theologica* (1265--1274)

> [!TIP] A NOTE FOR EARLY RELEASE READERS
> With Early Release ebooks, you get books in their earliest form---the author's raw and unedited
> content as they write---so you can take advantage of these technologies long before the official
> release of these titles.
>
> This will be the 13th chapter of the final book. The GitHub repo for this book is
> [https://github.com/ept/ddia2-feedback](https://github.com/ept/ddia2-feedback).
>
> If you'd like to be actively involved in reviewing and commenting on this draft, please reach out on GitHub.

In [Chapter 2](/en/ch2#ch_nonfunctional) we discussed the goal of creating applications and systems
that are *reliable*, *scalable*, and *maintainable*. These themes have run through all of the
chapters: for example, we discussed many fault-tolerance algorithms that help improve reliability,
sharding to improve scalability, and mechanisms for evolution and abstraction that improve
maintainability.
In this chapter we will bring all of these ideas together, and build on the streaming/event-driven
architecture ideas from [Chapter 12](/en/ch12#ch_stream) in particular to develop a philosophy of
application development that meets those goals. This chapter is more opinionated than previous
chapters, presenting a deep-dive into one particular philosophy rather than comparing multiple
approaches.
## Data Integration {#sec_future_integration}
A recurring theme in this book has been that for any given problem, there are several solutions, all
of which have different pros, cons, and trade-offs. For example, when discussing storage engines in
[Chapter 4](/en/ch4#ch_storage), we saw log-structured storage, B-trees, and column-oriented
storage. When discussing replication in [Chapter 6](/en/ch6#ch_replication), we saw single-leader,
multi-leader, and leaderless approaches.
If you have a problem such as "I want to store some data and look it up again later," there is no
one right solution, but many different approaches that are each appropriate in different
circumstances. A software implementation typically has to pick one particular approach. It's hard
enough to get one code path robust and performing well---trying to do everything in one piece of
software almost guarantees that the implementation will be poor.
Thus, the most appropriate choice of software tool also depends on the circumstances. Every piece of
software, even a so-called "general-purpose" database, is designed for a particular usage pattern.
Faced with this profusion of alternatives, the first challenge is then to figure out the mapping
between the software products and the circumstances in which they are a good fit. Vendors are
understandably reluctant to tell you about the kinds of workloads for which their software is poorly
suited, but hopefully the previous chapters have equipped you with some questions to ask in order to
read between the lines and better understand the trade-offs.
However, even if you perfectly understand the mapping between tools and circumstances for their use,
there is another challenge: in complex applications, data is often used in several different ways.
There is unlikely to be one piece of software that is suitable for *all* the different circumstances
in which the data is used, so you inevitably end up having to cobble together several different
pieces of software in order to provide your application's functionality.
### Combining Specialized Tools by Deriving Data {#id442}
For example, it is common to need to integrate an OLTP database with a full-text search index in
order to handle queries for arbitrary keywords. Although some databases (such as PostgreSQL) include
a full-text indexing feature, which can be sufficient for simple applications [^1], more
sophisticated search facilities require specialist information retrieval tools. Conversely, search
indexes are generally not very suitable as a durable system of record, and so many applications need
to combine two different tools in order to satisfy all of the requirements.
We touched on the issue of integrating data systems in ["Keeping Systems in
Sync"](/en/ch12#sec_stream_sync). As the number of different representations of the data increases,
the integration problem becomes harder. Besides the database and the search index, perhaps you need
to keep copies of the data in analytics systems (data warehouses, or batch and stream processing
systems); maintain caches or denormalized versions of objects that were derived from the original
data; pass the data through machine learning, classification, ranking, or recommendation systems; or
send notifications based on changes to the data.
#### Reasoning about dataflows {#id443}
When copies of the same data need to be maintained in several storage systems in order to satisfy
different access patterns, you need to be very clear about the inputs and outputs: where is data
written first, and which representations are derived from which sources? How do you get data into
all the right places, in the right formats?
For example, you might arrange for data to first be written to a system of record database,
capturing the changes made to that database (see ["Change Data Capture"](/en/ch12#sec_stream_cdc))
and then applying the changes to the search index in the same order. If change data capture (CDC) is
the only way of updating the index, you can be confident that the index is entirely derived from the
system of record, and therefore consistent with it (barring bugs in the software). Writing to the
database is the only way of supplying new input into this system.
Allowing the application to directly write to both the search index and the database introduces the
problem shown in [Figure 12-4](/en/ch12#fig_stream_write_order), in which two clients concurrently
send conflicting writes, and the two storage systems process them in a different order. In this
case, neither the database nor the search index is "in charge" of determining the order of writes,
and so they may make contradictory decisions and become permanently inconsistent with each other.
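The divergence described above can be reproduced in a few lines. This is a minimal sketch, assuming both stores use simple last-write-wins semantics; the `apply_writes` helper and the key `"x"` are hypothetical names used only for illustration.

```python
def apply_writes(writes):
    """Apply (key, value) writes in the given order to a last-write-wins store."""
    store = {}
    for key, value in writes:
        store[key] = value
    return store


# Two clients concurrently set the same key to different values.
write_a = ("x", "A")
write_b = ("x", "B")

# Both stores receive both writes, but the network delivers them in a
# different order to each store.
database = apply_writes([write_a, write_b])
search_index = apply_writes([write_b, write_a])

print(database)      # final value decided by the database's arrival order
print(search_index)  # final value decided by the index's arrival order
```

Neither store has lost a write, yet they now disagree, and nothing in either system will ever reconcile them.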
If it is possible for you to funnel all user input through a single system that decides on an
ordering for all writes, it becomes much easier to derive other representations of the data by
processing the writes in the same order. This is an application of the state machine replication
approach that we saw in ["Consensus in Practice"](/en/ch10#sec_consistency_total_order). Whether you
use change data capture or an event sourcing log is less important than simply the principle of
deciding on a total order.
Updating a derived data system based on an event log can often be made deterministic and idempotent
(see ["Idempotence"](/en/ch12#sec_stream_idempotence)), making it quite easy to recover from faults.
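As an illustration, a derived view can be made idempotent by remembering the offset of the last log entry it applied. This is a sketch under stated assumptions: the `DerivedIndex` class, the event tuple layout, and the use of list positions as log offsets are all hypothetical, and a real system would need to persist the offset atomically with the data.

```python
from dataclasses import dataclass, field


@dataclass
class DerivedIndex:
    """A derived key-value view that remembers the last log offset it applied."""
    data: dict = field(default_factory=dict)
    last_offset: int = -1

    def apply(self, offset, event):
        # Skip events that were already applied, so that redelivery after a
        # crash or retry leaves the state unchanged.
        if offset <= self.last_offset:
            return
        op, key, value = event
        if op == "put":
            self.data[key] = value
        elif op == "delete":
            self.data.pop(key, None)
        self.last_offset = offset


log = [
    ("put", "doc1", "hello"),
    ("put", "doc2", "world"),
    ("delete", "doc1", None),
]

index = DerivedIndex()
for offset, event in enumerate(log):
    index.apply(offset, event)

# Simulate a retry that redelivers the whole log: the state does not change.
for offset, event in enumerate(log):
    index.apply(offset, event)
```

Because the update function is deterministic and ignores duplicates, recovering from a fault amounts to replaying the log from any offset at or before the last one applied.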
#### Derived data versus distributed transactions {#sec_future_derived_vs_transactions}
The classic approach for keeping different data systems consistent with each other involves
distributed transactions, as discussed in ["Two-Phase Commit (2PC)"](/en/ch8#sec_transactions_2pc).
How does the approach of using derived data systems fare in comparison to distributed transactions?
At an abstract level, they achieve a similar goal by different means. Distributed transactions
decide on an ordering of writes by using locks for mutual exclusion, while CDC and event sourcing
use a log for ordering. Distributed transactions use atomic commit to ensure that changes take
effect exactly once, while log-based systems are often based on deterministic retry and idempotence.
The biggest difference is that transaction systems usually guarantee that after a value is written,
you can immediately read the up-to-date value (see ["Reading Your Own
Writes"](/en/ch6#sec_replication_ryw)). On the other hand, derived data systems are often updated
asynchronously, and so they do not by default guarantee that reads are up-to-date.
Within limited environments that are willing to pay the cost of distributed transactions, they have
been used successfully. However, XA has poor fault tolerance and performance characteristics (see
["Distributed Transactions Across Different Systems"](/en/ch8#sec_transactions_xa)), which severely
limit its usefulness. It might be possible to create a better protocol for distributed transactions,
but getting such a protocol widely adopted and integrated with existing tools would be challenging,
and is unlikely to happen soon.
In the absence of widespread support for a good distributed transaction protocol, log-based derived
data is the most promising approach for integrating different data systems. However, guarantees such
as reading your own writes are useful, and it is not productive to tell everyone "eventual
consistency is inevitable---suck it up and learn to deal with it" (at least not without good
guidance on *how* to deal with it).
Later in this chapter we will discuss some approaches for implementing stronger guarantees on top of
asynchronously derived systems, and work toward a middle ground between distributed transactions and
asynchronous log-based systems.
#### The limits of total ordering {#id335}
With systems that are small enough, constructing a totally ordered event log is entirely feasible
(as demonstrated by the popularity of databases with single-leader replication, which construct
precisely such a log). However, as systems are scaled toward bigger and more complex workloads,
limitations begin to emerge:
- In most cases, constructing a totally ordered log requires all events to pass through a *single
leader node* that decides on the ordering. If the throughput of events is greater than a single
machine can handle, you need to shard the log across multiple machines. The order of events in two
different shards is then ambiguous.
- If the servers are spread across multiple *geographically distributed* regions, for example in
order to tolerate an entire datacenter going offline, you typically have a separate leader in each
datacenter, because network delays make synchronous cross-datacenter coordination inefficient.
This implies an undefined ordering of events that originate in two different datacenters.
- When applications are deployed as *microservices*, a common design choice is to deploy each
service and its durable state as an independent unit, with no durable state shared between
services. When two events originate in different services, there is no defined order for those
events.
- Some applications maintain client-side state that is updated immediately on user input (without
waiting for confirmation from a server), and even continue to work offline. With such
applications, clients and servers are very likely to see events in different orders.
In formal terms, deciding on a total order of events is known as *total order broadcast*, which is
equivalent to consensus (see ["The Many Faces of Consensus"](/en/ch10#sec_consistency_faces)). Most
consensus algorithms are designed for situations in which the throughput of a single node is
sufficient to process the entire stream of events, and these algorithms do not provide a mechanism
for multiple nodes to share the work of ordering the events.
#### Ordering events to capture causality {#sec_future_capture_causality}
In cases where there is no causal link between events, the lack of a total order is not a big
problem, since concurrent events can be ordered arbitrarily. Some other cases are easy to handle:
for example, when there are multiple updates of the same object, they can be totally ordered by
routing all updates for a particular object ID to the same log shard. However, causal dependencies
sometimes arise in more subtle ways.
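Routing by object ID can be sketched as follows. The shard count, the `shard_for` and `append` helpers, and the ID format are assumptions made for illustration; the sketch uses a stable hash from `hashlib` because Python's built-in `hash()` for strings varies between processes.

```python
import hashlib

NUM_SHARDS = 4  # hypothetical shard count


def shard_for(object_id: str) -> int:
    """Map an object ID to a log shard using a hash that is stable across processes."""
    digest = hashlib.sha256(object_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


shards = [[] for _ in range(NUM_SHARDS)]


def append(object_id, update):
    """Append an update to the shard that owns this object ID."""
    shards[shard_for(object_id)].append((object_id, update))


for i in range(3):
    append("user:42", f"update-{i}")
    append("user:7", f"update-{i}")
```

All updates to `user:42` land in one shard in the order they were appended, so they are totally ordered relative to each other. Updates to different objects may land in different shards, and their relative order remains undefined.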
For example, consider a social networking service, and two users who were in a relationship but have
just broken up. One of the users removes the other as a friend, and then sends a message to their
remaining friends complaining about their ex-partner. The user's intention is that their ex-partner
should not see the rude message, since the message was sent after the friend status was revoked.
However, in a system that stores friendship status in one place and messages in another place, that
ordering dependency between the *unfriend* event and the *message-send* event may be lost. If the
causal dependency is not captured, a service that sends notifications about new messages may process
the *message-send* event before the *unfriend* event, and thus incorrectly send a notification to
the ex-partner.
In this example, the notifications are effectively a join between the messages and the friend list,
making it related to the timing issues of joins that we discussed previously (see ["Time-dependence
of joins"](/en/ch12#sec_stream_join_time)). Unfortunately, there does not seem to be a simple answer
to this problem [^2], [^3]. Starting points include:
- Logical timestamps can provide total ordering without coordination (see ["ID Generators and
Logical Clocks"](/en/ch10#sec_consistency_logical)), so they may help in cases where total order
broadcast is not feasible. However, they still require recipients to handle events that are
delivered out of order, and they require additional metadata to be passed around.
- If you can log an event to record the state of the system that the user saw before making a
decision, and give that event a unique identifier, then any later events can reference that event
identifier in order to record the causal dependency [^4].
- Conflict resolution algorithms (see ["Automatic conflict
resolution"](/en/ch6#sec_replication_automatic_resolution)) help with processing events that are
delivered in an unexpected order. They are useful for maintaining state, but they do not help if
actions have external side effects (such as sending a notification to a user).
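The second starting point above, referencing an earlier event's identifier, might look like the following sketch. The event layout (`id`, `type`, `depends_on`) and the `deliver_in_causal_order` function are hypothetical; the sketch also assumes each event has at most one dependency, and it holds an event back indefinitely if its dependency never arrives.

```python
def deliver_in_causal_order(events):
    """Yield events so that each one follows the event it depends on.

    Each event is a dict with a unique "id" and an optional "depends_on".
    Events whose dependency has not been seen yet are held back.
    """
    seen = set()
    waiting = {}  # dependency id -> events blocked on that dependency
    for event in events:
        dep = event.get("depends_on")
        if dep is not None and dep not in seen:
            waiting.setdefault(dep, []).append(event)
            continue
        ready = [event]
        while ready:
            current = ready.pop(0)
            seen.add(current["id"])
            yield current
            # Release any events that were waiting for the one just delivered.
            ready.extend(waiting.pop(current["id"], []))


unfriend = {"id": "e1", "type": "unfriend"}
message = {"id": "e2", "type": "message-send", "depends_on": "e1"}

# The notification service happens to receive the message first.
ordered = [e["type"] for e in deliver_in_causal_order([message, unfriend])]
print(ordered)
```

The notification service now processes the *unfriend* event before the *message-send* event regardless of arrival order, at the cost of carrying the extra `depends_on` metadata on every dependent event.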
Perhaps patterns for application development will emerge in the future that allow causal
dependencies to be captured efficiently, and derived state to be maintained correctly, without
forcing all events to go through the bottleneck of total order broadcast.
### Batch and Stream Processing {#sec_future_batch_streaming}
The goal of data integration is to make sure that data ends up in the right form in all the right
places. Doing so requires consuming inputs, transforming, joining, filtering, aggregating, training
models, evaluating, and eventually writing to the appropriate outputs. Batch and stream processors
are the tools for achieving this goal. The outputs of batch and stream processes are derived
datasets such as search indexes, materialized views, recommendations to show to users, aggregate
metrics, and so on.
As we saw in [Chapter 11](/en/ch11#ch_batch) and [Chapter 12](/en/ch12#ch_stream), batch and stream
processing have a lot of principles in common, and the main difference is that stream
processors operate on unbounded datasets whereas batch process inputs are of a known, finite size.
#### Maintaining derived state {#id446}
Batch processing has a strong functional flavor (even if the code is not written in a
functional programming language): it encourages deterministic, pure functions whose output depends
only on the input and which have no side effects other than the explicit outputs, treating inputs as
immutable and outputs as append-only. Stream processing is similar, but it extends operators to
allow managed, fault-tolerant state.
The principle of deterministic functions with well-defined inputs and outputs is not only good for
fault tolerance, but also simplifies reasoning about the dataflows in an organization
[^5]. No matter whether the derived data is a search index, a statistical model, or a
cache, it is helpful to think in terms of data pipelines that derive one thing from another, pushing
state changes in one system through functional application code and applying the effects to derived
systems.
In principle, derived data systems could be maintained synchronously, just like a relational
database updates secondary indexes synchronously within the same transaction as writes to the table
being indexed. However, asynchrony is what makes systems based on event logs robust: it allows a
fault in one part of the system to be contained locally, whereas distributed transactions abort if
any one participant fails, so they tend to amplify failures by spreading them to the rest of the
system.
We saw in ["Sharding and Secondary Indexes"](/en/ch7#sec_sharding_secondary_indexes) that secondary
indexes often cross shard boundaries. A sharded system with secondary indexes either needs to send
writes to multiple shards (if the index is term-partitioned) or send reads to all shards (if the
index is document-partitioned). Such cross-shard communication is also most reliable and scalable if
the index is maintained asynchronously [^6].
#### Reprocessing data for application evolution {#sec_future_reprocessing}
When maintaining derived data, batch and stream processing are both useful. Stream processing allows
changes in the input to be reflected in derived views with low delay, whereas batch processing
allows large amounts of accumulated historical data to be reprocessed in order to derive new views
onto an existing dataset.
In particular, reprocessing existing data provides a good mechanism for maintaining a system,
evolving it to support new features and changed requirements. Without reprocessing, schema evolution
is limited to simple changes like adding a new optional field to a record, or adding a new type of
record. On the other hand, with reprocessing it is possible to restructure a dataset into a
completely different model in order to better serve new requirements.
> [!TIP] SCHEMA MIGRATIONS ON RAILWAYS
> Large-scale "schema migrations" occur in noncomputer systems as well. For example, in the early days
> of railway building in 19th-century England there were various competing standards for the gauge
> (the distance between the two rails). Trains built for one gauge couldn't run on tracks of another
> gauge, which restricted the possible interconnections in the train network [^7].
>
> After a single standard gauge was finally decided upon in 1846, tracks with other gauges had to be
> converted---but how do you do this without shutting down the train line for months or years? The
> solution is to first convert the track to *dual gauge* or *mixed gauge* by adding a third rail. This
> conversion can be done gradually, and when it is done, trains of both gauges can run on the line,
> using two of the three rails. Eventually, once all trains have been converted to the standard gauge,
> the rail providing the nonstandard gauge can be removed.
>
> "Reprocessing" the existing tracks in this way, and allowing the old and new versions to exist side
> by side, makes it possible to change the gauge gradually over the course of years. Nevertheless, it
> is an expensive undertaking, which is why nonstandard gauges still exist today. For example, the
> BART system in the San Francisco Bay Area uses a different gauge from the majority of the US.

Derived views allow *gradual* evolution. If you want to restructure a dataset, you do not need to
perform the migration as a sudden switch. Instead, you can maintain the old schema and the new
schema side by side as two independently derived views onto the same underlying data. You can then
start shifting a small number of users to the new view in order to test its performance and find any
bugs, while most users continue to be routed to the old view. Gradually, you can increase the
proportion of users accessing the new view, and eventually you can drop the old view [^8],
[^9].
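One way to shift a growing proportion of users to the new view is to assign each user to a stable bucket and compare it against a rollout percentage. This is a minimal sketch; the `bucket` and `choose_view` functions and the user ID format are assumptions, not part of any particular tool.

```python
import hashlib


def bucket(user_id: str) -> int:
    """Assign a user to a stable bucket in the range 0..99."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % 100


def choose_view(user_id: str, new_view_percent: int) -> str:
    """Route a user to the new view if their bucket falls below the rollout percentage."""
    return "new" if bucket(user_id) < new_view_percent else "old"


# Raising the percentage only ever moves users from "old" to "new".
for percent in (0, 10, 50, 100):
    routed = sum(choose_view(f"user-{i}", percent) == "new" for i in range(1000))
    print(percent, routed)
```

Because a user's bucket never changes, a user who has seen the new view keeps seeing it as the percentage rises, and setting the percentage back to 0 returns everyone to the old view.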
The beauty of such a gradual migration is that every stage of the process is easily reversible if
something goes wrong: you always have a working system to go back to. By reducing the risk of
irreversible damage, you can be more confident about going ahead, and thus move faster to improve
your system [^10].
#### Unifying batch and stream processing {#id338}
An early proposal for unifying batch and stream processing was the *lambda architecture*
[^11], which had a number of problems [^12] and has fallen out of use. More
recent systems allow batch computations (reprocessing historical data) and stream computations
(processing events as they arrive) to be implemented in the same system [^13], an approach
that is sometimes known as the *kappa architecture* [^12].
Unifying batch and stream processing in one system requires the following features:
- The ability to replay historical events through the same processing engine that handles the stream
of recent events. For example, log-based message brokers have the ability to replay messages, and
some stream processors can read input from a distributed filesystem or object storage.
- Exactly-once semantics for stream processors---that is, ensuring that the output is the same as if
no faults had occurred, even if faults did in fact occur. As with batch processing, this
requires discarding the partial output of any failed tasks.
- Tools for windowing by event time, not by processing time, since processing time is meaningless
when reprocessing historical events. For example, Apache Beam provides an API for expressing such
computations, which can then be run using Apache Flink or Google Cloud Dataflow.
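To make the last point concrete, the sketch below groups events into tumbling windows using only the timestamp embedded in each event. The 60-second window size and the `(event_time, payload)` tuple layout are assumptions, and the sketch ignores watermarks and late-arriving events, which a real stream processor has to handle.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # hypothetical tumbling window size


def count_by_event_time(events):
    """Count events per tumbling window, keyed by the window's start time.

    Each event is an (event_time_seconds, payload) pair. The window is chosen
    from the event's own timestamp, never from the time of processing.
    """
    counts = defaultdict(int)
    for event_time, _payload in events:
        window_start = event_time - (event_time % WINDOW_SECONDS)
        counts[window_start] += 1
    return dict(counts)


# Events arrive out of order, as they might during a replay.
events = [(5, "a"), (130, "b"), (59, "c"), (61, "d")]
counts = count_by_event_time(events)
print(counts)
```

Since the result depends only on the timestamps inside the events, replaying the same history next week, or in a different arrival order, produces the same windows.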
## Unbundling Databases {#sec_future_unbundling}
At the most abstract level, databases, batch/stream processors, and operating systems all perform the
same functions: they store some data, and they allow you to process and query that data
[^14], [^15]. A database stores data in records of some data model (rows in tables,
documents, vertices in a graph, etc.) while an operating system's filesystem stores data in
files---but at their core, both are "information management" systems [^16]. As we saw in
[Chapter 11](/en/ch11#ch_batch), batch processors are like a distributed version of Unix.
Of course, there are many practical differences. For example, many filesystems do not cope very well
with a directory containing 10 million small files, whereas a database containing 10 million small
records is completely normal and unremarkable. Nevertheless, the similarities and differences
between operating systems and databases are worth exploring.
Unix and relational databases have approached the information management problem with very different
philosophies. Unix viewed its purpose as presenting programmers with a logical but fairly low-level
hardware abstraction, whereas relational databases wanted to give application programmers a
high-level abstraction that would hide the complexities of data structures on disk, concurrency,
crash recovery, and so on. Unix developed pipes and files that are just sequences of bytes, whereas
databases developed SQL and transactions.
Which approach is better? Of course, it depends on what you want. Unix is "simpler" in the sense that
it is a fairly thin wrapper around hardware resources; relational databases are "simpler" in the
sense that a short declarative query can draw on a lot of powerful infrastructure (query
optimization, indexes, join methods, concurrency control, replication, etc.) without the author of
the query needing to understand the implementation details.
The tension between these philosophies has lasted for decades (both Unix and the relational model
emerged in the early 1970s) and still isn't resolved. For example, the NoSQL movement could be
interpreted as wanting to apply a Unix-esque approach of low-level abstractions to the domain of
distributed OLTP data storage.
This section attempts to reconcile the two philosophies, in the hope that we can combine the best of
both worlds.
### Composing Data Storage Technologies {#id447}
Over the course of this book we have discussed various features provided by databases and how they
work, including:
- Secondary indexes, which allow you to efficiently search for records based on the value of a
field;
- Materialized views, which are a kind of precomputed cache of query results;
- Replication logs, which keep copies of the data on other nodes up to date; and
- Full-text search indexes, which allow keyword search in text and which are built into some
relational databases [^1].
In Chapters [11](/en/ch11#ch_batch) and [12](/en/ch12#ch_stream), similar themes emerged. We talked
about building full-text search indexes, about materialized view maintenance, and about replicating
changes from a database to derived data systems using change data capture.
It seems that there are parallels between the features that are built into databases and the derived
data systems that people are building with batch and stream processors.
#### Creating an index {#id340}
Think about what happens when you run `CREATE INDEX` to create a new index in a relational database.
The database has to scan over a consistent snapshot of a table, pick out all of the field values
being indexed, sort them, and write out the index. Then it must process the backlog of writes that
have been made since the consistent snapshot was taken (assuming the table was not locked while
creating the index, so writes could continue). Once that is done, the database must continue to keep
the index up to date whenever a transaction writes to the table.
This process is remarkably similar to setting up a new follower replica (see ["Setting Up New
Followers"](/en/ch6#sec_replication_new_replica)), and also very similar to bootstrapping change
data capture in a streaming system (see ["Initial snapshot"](/en/ch12#sec_stream_cdc_snapshot)).
Whenever you run `CREATE INDEX`, the database essentially reprocesses the existing dataset and
derives the index as a new view onto the existing data. The existing data may be a snapshot of the
state rather than a log of all changes that ever happened, but the two are closely related.
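The two phases of index creation, scanning a snapshot and then catching up on the backlog, can be sketched as follows. This is an illustration only: the `build_index` and `apply_write` helpers and the row layout are hypothetical, and the sketch uses an unsorted dictionary where a real database would write a sorted structure.

```python
def build_index(snapshot, field):
    """Scan a snapshot and build a secondary index: field value -> set of primary keys."""
    index = {}
    for pk, row in snapshot.items():
        index.setdefault(row[field], set()).add(pk)
    return index


def apply_write(index, field, pk, old_row, new_row):
    """Keep the index current for one write (insert, update, or delete)."""
    if old_row is not None:
        keys = index.get(old_row[field], set())
        keys.discard(pk)
        if not keys:
            index.pop(old_row[field], None)
    if new_row is not None:
        index.setdefault(new_row[field], set()).add(pk)


# Consistent snapshot of the table at the moment index creation started.
snapshot = {1: {"city": "Oslo"}, 2: {"city": "Lima"}}

# Writes made while the snapshot was being scanned: (pk, old_row, new_row).
backlog = [
    (2, {"city": "Lima"}, {"city": "Oslo"}),  # update
    (3, None, {"city": "Lima"}),              # insert
    (1, {"city": "Oslo"}, None),              # delete
]

index = build_index(snapshot, "city")
for pk, old_row, new_row in backlog:
    apply_write(index, "city", pk, old_row, new_row)
```

Once the backlog is drained, the same `apply_write` step keeps running for every subsequent write, which is the same shape as a stream processor consuming a change log.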
#### The meta-database of everything {#id341}
In this light, the dataflow across an entire organization starts looking like one huge database
[^5]. Whenever a batch, stream, or ETL process transports data from one place and form to
another place and form, it is acting like the database subsystem that keeps indexes or materialized
views up to date.
Viewed like this, batch and stream processors are like elaborate implementations of triggers, stored
procedures, and materialized view maintenance algorithms. The derived data systems they maintain are
like different index types. For example, a relational database may support B-tree indexes, hash
indexes, spatial indexes, and other types of indexes. In the emerging architecture of derived data
systems, instead of implementing those facilities as features of a single integrated database
product, they are provided by various different pieces of software, running on different machines,
administered by different teams.
Where will these developments take us in the future? If we start from the premise that there is no
single data model or storage format that is suitable for all access patterns, there are two avenues
by which different storage and processing tools can nevertheless be composed into a cohesive system:
Federated databases: unifying reads
: It is possible to provide a unified query interface to a wide variety of underlying storage
engines and processing methods---an approach known as a *federated database* or *polystore*
[^17], [^18]. For example, PostgreSQL's *foreign data wrapper* feature fits this
pattern, as do federated query engines such as Trino, Hoptimator, and Xorq. Applications that
need a specialized data model or query interface can still access the underlying storage engines
directly, while users who want to combine data from disparate places can do so easily through
the federated interface.
A federated query interface follows the relational tradition of a single integrated system with
a high-level query language and elegant semantics, but a complicated implementation.
Unbundled databases: unifying writes
: While federation addresses read-only querying across several different systems, it does not have
a good answer to synchronizing writes across those systems. We said that within a single
database, creating a consistent index is a built-in feature. When we compose several storage
systems, we similarly need to ensure that all data changes end up in all the right places, even
in the face of faults. Making it easier to reliably plug together storage systems (e.g., through
change data capture and event logs) is like *unbundling* a database's index-maintenance features
in a way that can synchronize writes across disparate technologies [^5], [^19].
The unbundled approach follows the Unix tradition of small tools that do one thing well
[^20], that communicate through a uniform low-level API (pipes), and that can be
composed using a higher-level language (the shell) [^14].
#### Making unbundling work {#sec_future_unbundling_favor}
Federation and unbundling are two sides of the same coin: composing a reliable, scalable, and
maintainable system out of diverse components. Federated read-only querying requires mapping one
data model into another, which takes some thought but is ultimately quite a manageable problem.
Keeping the writes to several storage systems in sync is the harder engineering problem, and so we
will focus on it here.
The traditional approach to synchronizing writes requires distributed transactions across
heterogeneous storage systems [^17], which are problematic, as discussed previously.
Transactions within a single storage or stream processing system are feasible, but when data crosses
the boundary between different technologies, an asynchronous event log with idempotent writes is a
much more robust and practicable approach.
For example, distributed transactions are used within some stream processors to achieve exactly-once
semantics, and this can work quite well. However, when a transaction would need to involve systems
written by different groups of people (e.g., when data is written from a stream processor to a
distributed key-value store or search index), the lack of a standardized transaction protocol makes
integration much harder. An ordered log of events with idempotent consumers is a much simpler
abstraction, and thus much more feasible to implement across heterogeneous systems [^5].
The big advantage of log-based integration is *loose coupling* between the various components, which
manifests itself in two ways:
1. At a system level, asynchronous event streams make the system as a whole more robust to outages
or performance degradation of individual components. If a consumer runs slow or fails, the event
log can buffer messages, allowing the producer and any other consumers to continue running
unaffected. The faulty consumer can catch up when it is fixed, so it doesn't miss any data, and
the fault is contained. By contrast, the synchronous interaction of distributed transactions
tends to escalate local faults into large-scale failures.
2. At a human level, unbundling data systems allows different software components and services to
be developed, improved, and maintained independently from each other by different teams.
Specialization allows each team to focus on doing one thing well, with well-defined interfaces
to other teams' systems. Event logs provide an interface that is powerful enough to capture
fairly strong consistency properties (due to durability and ordering of events), but also
general enough to be applicable to almost any kind of data.
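
The fault containment described in the first point can be sketched with a toy in-memory log in which every consumer tracks its own read position. The `EventLog` class and the consumer names are hypothetical and exist only for this illustration.

```python
class EventLog:
    """Toy append-only log; each consumer tracks its own read position."""

    def __init__(self):
        self.events = []
        self.offsets = {}  # consumer name -> next offset to read

    def append(self, event):
        self.events.append(event)

    def poll(self, consumer):
        # Return everything the consumer has not yet seen and advance its offset.
        start = self.offsets.get(consumer, 0)
        self.offsets[consumer] = len(self.events)
        return self.events[start:]


log = EventLog()
for i in range(3):
    log.append(f"event-{i}")

fast_first = log.poll("search-indexer")  # healthy consumer reads everything

# "cache-updater" is down and does not poll; the producer keeps appending.
for i in range(3, 5):
    log.append(f"event-{i}")

fast_second = log.poll("search-indexer")   # unaffected by the other consumer
slow_catch_up = log.poll("cache-updater")  # recovers and reads the whole backlog
```

Neither the producer nor the healthy consumer ever waits for the failed one, which is the property that distinguishes this style from a synchronous distributed transaction.
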
#### Unbundled versus integrated systems {#id448}
If unbundling does indeed become the way of the future, it will not replace databases in their
current form---they will still be needed as much as ever. Databases are still required for
maintaining state in stream processors, and in order to serve queries for the output of batch and
stream processors. Specialized query engines will continue to be important for particular workloads:
for example, query engines in data warehouses are optimized for exploratory analytic queries and
handle this kind of workload very well.
The complexity of running several different pieces of infrastructure can be a problem: each piece of
software has a learning curve, configuration issues, and operational quirks, and so it is worth
deploying as few moving parts as possible. A single integrated software product may also be able to
achieve better and more predictable performance on the kinds of workloads for which it is designed,
compared to a system consisting of several tools that you have composed with application code
[^21]. Building for scale that you don't need is wasted effort and may lock you into an
inflexible design. In effect, it is a form of premature optimization.
The goal of unbundling is not to compete with individual databases on performance for particular
workloads; the goal is to allow you to combine several different databases in order to achieve good
performance for a much wider range of workloads than is possible with a single piece of software.
It's about breadth, not depth.
Thus, if there is a single technology that does everything you need, you're most likely best off
simply using that product rather than trying to reimplement it yourself from lower-level components.
The advantages of unbundling and composition only come into the picture when there is no single
piece of software that satisfies all your requirements.
The tools for composing data systems are getting better: Debezium can extract change streams from
many databases, Kafka's protocol is becoming a de-facto standard for event streams, and incremental
view maintenance engines (see ["Incremental View Maintenance"](/en/ch12#sec_stream_ivm)) make it
possible to precompute and update caches of complex queries.
### Designing Applications Around Dataflow {#sec_future_dataflow}
The general idea of updating derived data when its underlying data changes is nothing new. For
example, spreadsheets have powerful dataflow programming capabilities [^22]: you can put a
formula in one cell (for example, the sum of cells in another column), and whenever any input to the
formula changes, the result of the formula is automatically recalculated. This is exactly what we
want at a data system level: when a record in a database changes, we want any index for that record
to be automatically updated, and any cached views or aggregations that depend on the record to be
automatically refreshed. You should not have to worry about the technical details of how this
refresh happens, but be able to simply trust that it works correctly.
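
To make the spreadsheet analogy concrete, here is a minimal single-process sketch. The `Cell` and `Formula` classes are invented for this example; a data system would additionally need durability, fault tolerance, and distribution, none of which are modeled here.

```python
class Cell:
    """An input value that notifies dependent formulas when it changes."""

    def __init__(self, value):
        self.value = value
        self.dependents = []

    def set(self, value):
        self.value = value
        for formula in self.dependents:
            formula.recalculate()


class Formula:
    """A derived value that is recomputed whenever one of its inputs changes."""

    def __init__(self, func, inputs):
        self.func = func
        self.inputs = inputs
        for cell in inputs:
            cell.dependents.append(self)
        self.recalculate()

    def recalculate(self):
        self.value = self.func(*(cell.value for cell in self.inputs))


a1, a2 = Cell(10), Cell(32)
total = Formula(lambda x, y: x + y, [a1, a2])

before = total.value  # derived from the initial inputs
a1.set(100)           # changing an input refreshes the formula automatically
after = total.value
```

The code that sets `a1` never mentions `total`: the dependency is declared once, and the refresh follows from it.
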
Thus, most data systems still have something to learn from the features that VisiCalc already had in
1979 [^23]. The difference from spreadsheets is that today's data systems need to be
fault-tolerant, scalable, and store data durably. They also need to be able to integrate disparate
technologies written by different groups of people over time, and reuse existing libraries and
services: it is unrealistic to expect all software to be developed using one particular language,
framework, or tool.
In this section we will expand on these ideas and explore some ways of building applications around
the ideas of unbundled databases and dataflow.
#### Application code as a derivation function {#sec_future_dataflow_derivation}
When one dataset is derived from another, it goes through some kind of transformation function. For
example:
- A secondary index is a kind of derived dataset with a straightforward transformation function: for
each row or document in the base table, it picks out the values in the columns or fields being
indexed, and sorts by those values (assuming an SSTable or B-tree index, which are sorted by key).
- A full-text search index is created by applying various natural language processing functions such
as language detection, word segmentation, stemming or lemmatization, spelling correction, and
synonym identification, followed by building a data structure for efficient lookups (such as an
inverted index).
- In a machine learning system, we can consider the model as being derived from the training data by
applying various feature extraction and statistical analysis functions. When the model is applied
to new input data, the output of the model is derived from the input and the model (and hence,
indirectly, from the training data).
- A cache often contains an aggregation of data in the form in which it is going to be displayed in
a user interface (UI). Populating the cache thus requires knowledge of what fields are referenced
in the UI; changes in the UI may require updating the definition of how the cache is populated and
rebuilding the cache.
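
The first of these examples, the secondary index, is simple enough to write out as a function. The sketch below assumes a base table represented as a dictionary from primary key to row, and uses a sorted list as a stand-in for a B-tree or SSTable; the table contents and function names are illustrative only.

```python
from bisect import bisect_left


def build_secondary_index(table, column):
    """Derive sorted (indexed value, primary key) entries from the base table."""
    return sorted((row[column], pk) for pk, row in table.items())


def lookup(index, value):
    """Return the primary keys of all rows whose indexed column equals value."""
    # The 1-tuple (value,) sorts before every (value, pk) entry.
    pos = bisect_left(index, (value,))
    keys = []
    while pos < len(index) and index[pos][0] == value:
        keys.append(index[pos][1])
        pos += 1
    return keys


table = {
    1: {"name": "Alice", "city": "Berlin"},
    2: {"name": "Bob", "city": "Paris"},
    3: {"name": "Carol", "city": "Berlin"},
}
city_index = build_secondary_index(table, "city")
in_berlin = lookup(city_index, "Berlin")
in_rome = lookup(city_index, "Rome")
```

This version rebuilds the index from scratch. A database maintains it incrementally as rows change, but the derivation function is the same.
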
The derivation function for a secondary index is so commonly required that it is built into many
databases as a core feature, and you can invoke it by merely saying `CREATE INDEX`. For full-text
indexing, basic linguistic features for common languages may be built into a database, but the more
sophisticated features often require domain-specific tuning. In machine learning, feature
engineering is notoriously application-specific, and often has to incorporate detailed knowledge
about the user interaction and deployment of an application [^24].
When the function that creates a derived dataset is not a standard cookie-cutter function like
creating a secondary index, custom code is required to handle the application-specific aspects. And
this custom code is where many databases struggle. Although relational databases commonly support
triggers, stored procedures, and user-defined functions, which can be used to execute application
code within the database, they have been somewhat of an afterthought in database design.
#### Separation of application code and state {#id344}
In theory, databases could be deployment environments for arbitrary application code, like an
operating system. However, in practice they have turned out to be poorly suited for this purpose.
They do not fit well with the requirements of modern application development, such as dependency and
package management, version control, rolling upgrades, evolvability, monitoring, metrics, calls to
network services, and integration with external systems.
On the other hand, deployment and cluster management tools such as Kubernetes, Docker, Mesos, YARN,
and others are designed specifically for the purpose of running application code. By focusing on
doing one thing well, they are able to do it much better than a database that provides execution of
user-defined functions as one of its many features.
Most web applications today are deployed as stateless services, in which any user request can be
routed to any application server, and the server forgets everything about the request once it has
sent the response. This style of deployment is convenient, as servers can be added or removed at
will, but the state has to go somewhere: typically, a database. The trend has been to keep stateless
application logic separate from state management (databases): not putting application logic in the
database and not putting persistent state in the application [^25]. As people in the
functional programming community like to joke, "We believe in the separation of Church and state"
[^26].
> [!NOTE]
> Explaining a joke usually ruins it, but here is an explanation anyway so that nobody feels left out.
> *Church* is a reference to the mathematician Alonzo Church, who created the lambda calculus, an
> early form of computation that is the basis for most functional programming languages. The lambda
> calculus has no mutable state (i.e., no variables that can be overwritten), so one could say that
> mutable state is separate from Church's work.
In this typical web application model, the database acts as a kind of mutable shared variable that
can be accessed synchronously over the network. The application can read and update the variable,
and the database takes care of making it durable, providing some concurrency control and fault
tolerance.
However, in most programming languages you cannot subscribe to changes in a mutable variable---you
can only read it periodically. Unlike in a spreadsheet, readers of the variable don't get notified
if the value of the variable changes. (You can implement such notifications in your own code---this
is known as the *observer pattern*---but most languages do not have this pattern as a built-in
feature.)
Databases have inherited this passive approach to mutable data: if you want to find out whether the
content of the database has changed, often your only option is to poll (i.e., to repeat your query
periodically). Subscribing to changes is only just beginning to emerge as a feature.
#### Dataflow: Interplay between state changes and application code {#id450}
Thinking about applications in terms of dataflow implies renegotiating the relationship between
application code and state management. Instead of treating a database as a passive variable that is
manipulated by the application, we think much more about the interplay and collaboration between
state, state changes, and code that processes them. Application code responds to state changes in
one place by triggering state changes in another place.
We have already seen this idea in change data capture, in the actor model, in triggers, and in
incremental view maintenance. Unbundling the database means taking this idea and applying it to the
creation of derived datasets outside of the primary database: caches, full-text search indexes,
machine learning, or analytics systems. We can use stream processing and messaging systems for this
purpose.
Maintaining derived data requires the following properties, which log-based message brokers can
provide:
- When maintaining derived data, the order of state changes is often important (if several views are
derived from an event log, they need to process the events in the same order so that they remain
consistent with each other).
- Fault tolerance is essential: losing just a single message causes the derived dataset to go
permanently out of sync with its data source. Both message delivery and derived state updates must
be reliable.
Stable message ordering and fault-tolerant message processing are quite stringent demands, but they
are much less expensive and more operationally robust than distributed transactions. Modern stream
processors can provide these ordering and reliability guarantees at scale, and they allow
application code to be run as stream operators.
This application code can do the arbitrary processing that built-in derivation functions in
databases generally don't provide. Like Unix tools chained by pipes, stream operators can be
composed to build large systems around dataflow. Each operator takes streams of state changes as
input, and produces other streams of state changes as output.
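
A loose, single-process analogy for this composition can be written with Python generators, where each function consumes one stream and yields another. The operators and the input data below are made up for illustration, and the sketch ignores the ordering and fault-tolerance guarantees that a real stream processor provides.

```python
def parse(lines):
    """Operator 1: turn raw text lines into structured events."""
    for line in lines:
        user, amount = line.split(",")
        yield {"user": user, "amount": int(amount)}


def only_large(events, threshold):
    """Operator 2: drop events below a threshold."""
    for event in events:
        if event["amount"] >= threshold:
            yield event


def running_total(events):
    """Operator 3: stateful aggregation that emits a state change per input event."""
    totals = {}
    for event in events:
        user = event["user"]
        totals[user] = totals.get(user, 0) + event["amount"]
        yield (user, totals[user])


raw = ["alice,120", "bob,5", "alice,80", "bob,200"]

# Operators are chained much like `parse | only_large | running_total` in a shell.
output = list(running_total(only_large(parse(raw), threshold=50)))
```
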
#### Stream processors and services {#id345}
The currently dominant style of application development involves breaking down functionality into a
set of *services* that communicate via synchronous network requests such as REST APIs. The advantage
of such a service-oriented architecture over a single monolithic application is primarily
organizational scalability through loose coupling: different teams can work on different services,
which reduces coordination effort between teams (as long as the services can be deployed and updated
independently).
Composing stream operators into dataflow systems has many characteristics in common with the
microservices approach [^27], [^28]. However, the underlying communication mechanism
is very different: one-directional, asynchronous message streams rather than synchronous
request/response interactions.
Besides the advantages listed in ["Event-Driven Architectures"](/en/ch5#sec_encoding_dataflow_msg),
such as better fault tolerance, dataflow systems can also achieve better performance than
traditional REST APIs or RPC. For example, say a customer is purchasing an item that is priced in
one currency but paid for in another currency. In order to perform the currency conversion, you need
to know the current exchange rate. This operation could be implemented in two ways [^27],
[^29]:
1. In the microservices approach, the code that processes the purchase would probably query an
exchange-rate service or database in order to obtain the current rate for a particular currency.
2. In the dataflow approach, the code that processes purchases would subscribe to a stream of
exchange rate updates ahead of time, and record the current rate in a local database whenever it
changes. When it comes to processing the purchase, it only needs to query the local database.
The second approach has replaced a synchronous network request to another service with a query to a
local database (which may be on the same machine, even in the same process). In the microservices
approach, you could avoid the synchronous network request by caching the exchange rate locally in
the service that processes the purchase. However, in order to keep that cache fresh, you would need
to periodically poll for updated exchange rates, or subscribe to a stream of changes---which is
exactly what happens in the dataflow approach.
Not only is the dataflow approach faster, but it is also more robust to the failure of another
service. The fastest and most reliable network request is no network request at all! Instead of RPC,
we now have a stream join between purchase events and exchange rate update events.
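
The dataflow variant of the currency example might look roughly like the following sketch. The `PurchaseProcessor` class, the event layout, and the exchange rates are all assumptions made for illustration; a dictionary stands in for the local database.

```python
from decimal import Decimal


class PurchaseProcessor:
    """Joins purchase events with the latest exchange rate held in local state."""

    def __init__(self):
        self.rates = {}  # (price currency, payment currency) -> latest rate

    def on_rate_update(self, price_ccy, pay_ccy, rate):
        # Keep the local copy of the rate table up to date.
        self.rates[(price_ccy, pay_ccy)] = Decimal(rate)

    def on_purchase(self, price, price_ccy, pay_ccy):
        # No network request: the rate is read from local state.
        rate = self.rates[(price_ccy, pay_ccy)]
        return (Decimal(price) * rate).quantize(Decimal("0.01"))


processor = PurchaseProcessor()
charges = []

# Two interleaved input streams; the rates are made-up illustrative values.
events = [
    ("rate", "EUR", "USD", "1.10"),
    ("purchase", "20.00", "EUR", "USD"),
    ("rate", "EUR", "USD", "1.20"),
    ("purchase", "20.00", "EUR", "USD"),
]
for kind, *args in events:
    if kind == "rate":
        processor.on_rate_update(*args)
    else:
        charges.append(processor.on_purchase(*args))
```

The two identical purchases are charged different amounts because a rate update arrived between them, which is exactly the time dependence discussed next.
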
The join is time-dependent: if the purchase events are reprocessed at a later point in time, the
exchange rate will have changed. If you want to reconstruct the original output, you will need to
obtain the historical exchange rate at the original time of purchase. No matter whether you query a
service or subscribe to a stream of exchange rate updates, you will need to handle this time
dependence (see ["Time-dependence of joins"](/en/ch12#sec_stream_join_time)).
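
One way to handle this time dependence is to keep the history of rates rather than only the latest value, and to look up the rate that was in effect at the time of the purchase. The sketch below assumes that rate updates arrive in timestamp order and uses integer timestamps; the `RateHistory` class and the values are hypothetical.

```python
from bisect import bisect_right
from decimal import Decimal


class RateHistory:
    """Keeps every rate update so that past purchases can be reprocessed."""

    def __init__(self):
        self.timestamps = []  # assumed to be appended in increasing order
        self.rates = []

    def record(self, timestamp, rate):
        self.timestamps.append(timestamp)
        self.rates.append(Decimal(rate))

    def rate_at(self, timestamp):
        # Find the most recent update at or before the given time.
        pos = bisect_right(self.timestamps, timestamp)
        if pos == 0:
            raise LookupError("no exchange rate known at that time")
        return self.rates[pos - 1]


history = RateHistory()
history.record(100, "1.10")
history.record(200, "1.20")

rate_for_old_purchase = history.rate_at(150)  # purchase made between the updates
rate_for_new_purchase = history.rate_at(250)  # purchase made after both updates
```

With such a history, reprocessing a purchase event yields the same result no matter when the reprocessing happens.
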
Subscribing to a stream of changes, rather than querying the current state when needed, brings us
closer to a spreadsheet-like model of computation: when some piece of data changes, any derived data
that depends on it can swiftly be updated. There are still many open questions, for example around
issues like time-dependent joins, but building applications around dataflow ideas is a very
promising direction to explore.
### Observing Derived State {#sec_future_observing}
At an abstract level, the dataflow systems discussed in the last section give you a process for
creating derived datasets (such as search indexes, materialized views, and predictive models) and
keeping them up to date. Let's call that process the *write path*: whenever some piece of
information is written to the system, it may go through multiple stages of batch and stream
processing, and eventually every derived dataset is updated to incorporate the data that was
written. [Figure 13-1](/en/ch13#fig_future_write_read_paths) shows an example of updating a search
index.
{{< figure src="/fig/ddia_1301.png" id="fig_future_write_read_paths" caption="Figure 13-1. In a search index, writes (document updates) meet reads (queries)." class="w-full my-4" >}}
But why do you create the derived dataset in the first place? Most likely because you want to query
it again at a later time. This is the *read path*: when serving a user request you read from the
derived dataset, perhaps perform some more processing on the results, and construct the response to
the user.
Taken together, the write path and the read path encompass the whole journey of the data, from the
point where it is collected to the point where it is consumed (probably by another human). The write
path is the portion of the journey that is precomputed---i.e., that is done eagerly as soon as the
data comes in, regardless of whether anyone has asked to see it. The read path is the portion of the
journey that only happens when someone asks for it. If you are familiar with functional programming
languages, you might notice that the write path is similar to eager evaluation, and the read path is
similar to lazy evaluation.
The derived dataset is the place where the write path and the read path meet, as illustrated in
[Figure 13-1](/en/ch13#fig_future_write_read_paths). It represents a trade-off between the amount of
work that needs to be done at write time and the amount that needs to be done at read time.
#### Materialized views and caching {#id451}
A full-text search index is a good example: the write path updates the index, and the read path
searches the index for keywords. Both reads and writes need to do some work. Writes need to update
the index entries for all terms that appear in the document. Reads need to search for each of the
words in the query, and apply Boolean logic to find documents that contain *all* of the words in the
query (an `AND` operator), or *any* synonym of each of the words (an `OR` operator).
If you didn't have an index, a search query would have to scan over all documents (like `grep`),
which would get very expensive if you had a large number of documents. No index means less work on
the write path (no index to update), but a lot more work on the read path.
On the other hand, you could imagine precomputing the search results for all possible queries. In
that case, you would have less work to do on the read path: no Boolean logic, just find the results
for your query and return them. However, the write path would be a lot more expensive: the set of
possible search queries that could be asked is infinite (or at least exponential in the number of
terms in the corpus), so precomputing the results for every one of them is infeasible.
Another option would be to precompute the search results for only a fixed set of the most common
queries, so that they can be served quickly without having to go to the index. The uncommon queries
can still be served from the index. This would generally be called a *cache* of common queries,
although we could also call it a materialized view, as it would need to be updated when new
documents appear that should be included in the results of one of the common queries.
From this example we can see that an index is not the only possible boundary between the write path
and the read path. Caching of common search results is possible, and `grep`-like scanning without
the index is also possible on a small number of documents. Viewed like this, the role of caches,
indexes, and materialized views is simple: they shift the boundary between the read path and the
write path. They allow us to do more work on the write path, by precomputing results, in order to
save effort on the read path.
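
The three positions of the boundary can be compared side by side in a small sketch. The documents, the whitespace tokenization, and the choice of cached query are simplifications invented for this example.

```python
from collections import defaultdict

documents = {
    1: "streams of events",
    2: "events and state",
    3: "state machines",
}


def scan_search(docs, words):
    """No index: nothing is precomputed, all work happens on the read path."""
    return sorted(
        doc_id for doc_id, text in docs.items()
        if all(word in text.split() for word in words)
    )


def build_index(docs):
    """Write path: precompute an inverted index from term to document IDs."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for term in text.split():
            index[term].add(doc_id)
    return index


def index_search(index, words):
    """Read path with an index: intersect the posting sets (an AND query)."""
    postings = [index.get(word, set()) for word in words]
    return sorted(set.intersection(*postings)) if postings else []


index = build_index(documents)

# Shifting the boundary further: precompute the full result of a common query.
common_query = ("events", "state")
cache = {common_query: index_search(index, common_query)}

by_scan = scan_search(documents, common_query)
by_index = index_search(index, common_query)
by_cache = cache[common_query]
```

All three return the same result and differ only in how much work was done before the query arrived. The cache entry would have to be refreshed whenever a new matching document is written.
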
Shifting the boundary between work done on the write path and the read path was in fact the topic of
the social networking example in ["Case Study: Social Network Home
Timelines"](/en/ch2#sec_introduction_twitter). In that example, we also saw how the boundary between
write path and read path might be drawn differently for celebrities compared to ordinary users.
After 500 pages we have come full circle!
#### Stateful, offline-capable clients {#id347}
The idea of a boundary between write and read paths is interesting because we can discuss shifting
that boundary and explore what that shift means in practical terms. Let's look at the idea in a
different context.
In the past, web browsers were stateless clients that could only do useful things when you had an
internet connection (just about the only thing you could do offline was to scroll up and down in a
page that you had previously loaded while online). However, single-page JavaScript web apps now have
a lot of stateful capabilities, including client-side user interface interaction and persistent
local storage in the web browser. Mobile apps can similarly store a lot of state on the device and
don't require a round-trip to the server for most user interactions.
In ["Sync Engines and Local-First Software"](/en/ch6#sec_replication_offline_clients) we saw how
persistent local state enables a class of applications in which users can work offline, without an
internet connection, and sync with remote servers in the background when a network connection is
available [^30]. Since mobile devices sometimes have slow and unreliable cellular internet
connections, it's a big advantage for users if their user interface does not have to wait for
synchronous network requests, and if apps mostly work offline.
When we move away from the assumption of stateless clients talking to a central database and toward
state that is maintained on end-user devices, a world of new opportunities opens up. In particular,
we can think of the on-device state as a *cache of state on the server*. The pixels on the screen
are a materialized view onto model objects in the client app; the model objects are a local replica
of state in a remote datacenter [^31].
#### Pushing state changes to clients {#id348}
In a typical web page, if you load the page in a web browser and the data subsequently changes on
the server, the browser does not find out about the change until you reload the page. The browser
only reads the data at one point in time, assuming that it is static---it does not subscribe to
updates from the server. Thus, the state in the browser is a stale cache that is not updated unless
you explicitly poll for changes. (HTTP-based feed subscription protocols like RSS are really just a
basic form of polling.)
More recent protocols have moved beyond the basic request/response pattern of HTTP: server-sent
events (the EventSource API) and WebSockets provide communication channels by which a web browser
can keep an open TCP connection to a server, and the server can actively push messages to the
browser as long as it remains connected. This provides an opportunity for the server to actively
inform the end-user client about any changes to the state it has stored locally, reducing the
staleness of the client-side state.
In terms of our model of write path and read path, actively pushing state changes all the way to
client devices means extending the write path all the way to the end user. When a client is first
initialized, it would still need to use a read path to get its initial state, but thereafter it
could rely on a stream of state changes sent by the server. The ideas we discussed around stream
processing and messaging are not restricted to running only in a datacenter: we can take the ideas
further, and extend them all the way to end-user devices [^32].
The devices will be offline some of the time, and unable to receive any notifications of state
changes from the server during that time. But we already solved that problem: in ["Consumer
offsets"](/en/ch12#sec_stream_log_offsets) we discussed how a consumer of a log-based message broker
can reconnect after failing or becoming disconnected, and ensure that it doesn't miss any messages
that arrived while it was disconnected. The same technique works for individual users, where each
device is a small subscriber to a small stream of events.
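
The combination of an initial read followed by a stream of changes, resumed from an offset after a disconnection, can be sketched as follows. The `Server` and `Client` classes are hypothetical in-process stand-ins; no networking or real protocol is modeled.

```python
class Server:
    """Holds the current state and an ordered log of the changes that produced it."""

    def __init__(self):
        self.state = {}
        self.changes = []

    def write(self, key, value):
        self.state[key] = value
        self.changes.append((key, value))

    def snapshot(self):
        # Read path: the current state plus the log offset it corresponds to.
        return dict(self.state), len(self.changes)

    def changes_since(self, offset):
        # Extended write path: the changes a client has not yet seen.
        return self.changes[offset:]


class Client:
    """Keeps a local replica and remembers how far it has read in the log."""

    def __init__(self, server):
        self.server = server
        self.state, self.offset = server.snapshot()

    def sync(self):
        for key, value in self.server.changes_since(self.offset):
            self.state[key] = value
            self.offset += 1


server = Server()
server.write("a", 1)

client = Client(server)  # initial state is fetched via the read path

# The device goes offline; the server keeps accepting writes.
server.write("b", 2)
server.write("a", 3)
stale_state = dict(client.state)

client.sync()  # back online: resume from the stored offset
```
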
#### End-to-end event streams {#id349}
Tools for developing stateful clients and user interfaces, such as React and Elm [^33],
already have the ability to update the rendered user interface in response to changes in the
underlying state. It would be very natural to extend this programming model to also allow a server
to push state-change events into this client-side event pipeline.
Thus, state changes could flow through an end-to-end write path: from the interaction on one device
that triggers a state change, via event logs and through several derived data systems and stream
processors, all the way to the user interface of a person observing the state on another device.
These state changes could be propagated with fairly low delay---say, under one second end to end.
Some applications, such as instant messaging and online games, already have such a "real-time"
architecture (in the sense of interactions with low delay, not in the sense of response time
guarantees). But why don't we build all applications this way?
The challenge is that the assumption of stateless clients and request/response interactions is very
deeply ingrained in our databases, libraries, frameworks, and protocols. Many datastores support
read and write operations where a request returns one response, but far fewer provide an ability to
subscribe to changes---i.e., a request that returns a stream of responses over time.
In order to extend the write path all the way to the end user, we would need to fundamentally
rethink the way we build many of these systems: moving away from request/response interaction and
toward publish/subscribe dataflow [^31]. This would require effort, but it would have the
advantage of making user interfaces more responsive and providing better offline support.
#### Reads are events too {#sec_future_read_events}
We have seen that when a stream processor writes derived data to a store (database, cache, or
index), and that store is queried, the store acts as the boundary between the write path and the
read path. The store allows random-access read queries to the data that would otherwise require
scanning the whole event log.
In many cases, the data storage is separate from the streaming system. But recall that stream
processors also need to maintain state to perform aggregations and joins. This state is normally
hidden inside the stream processor, but some frameworks allow it to also be queried by outside
clients [^34], turning the stream processor itself into a kind of simple database.
Let's take that idea further. As discussed so far, the writes to the store go through an event log,
while reads are transient network requests that go directly to the nodes that store the data being
queried. This is a reasonable design, but not the only possible one. It is also possible to
represent read requests as streams of events, and send both the read events and the write events
through a stream processor; the processor responds to read events by emitting the result of the read
to an output stream [^35].
When both the writes and the reads are represented as events, and routed to the same stream operator
in order to be handled, we are in fact performing a stream-table join between the stream of read
queries and the database. The read event needs to be sent to the database shard holding the data,
just like batch and stream processors need to copartition inputs on the same key when joining.
This correspondence between serving requests and performing joins is quite fundamental
[^36]. A one-off read request passes through the join operator, which then immediately
forgets the request; a subscribe request is a persistent join with past and future events on the
other side of the join.
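
The difference between a one-off read and a subscription can be sketched with a single operator that receives writes, reads, and subscribe requests on one input stream. The `ShardOperator` class and the event format are assumptions made for this illustration and are not taken from any particular framework.

```python
class ShardOperator:
    """Handles write, read, and subscribe events for the keys of one shard."""

    def __init__(self):
        self.table = {}
        self.subscribers = {}  # key -> clients with a standing subscription

    def process(self, event):
        """Consume one input event and return a list of (client, key, value) outputs."""
        key = event["key"]
        if event["type"] == "write":
            self.table[key] = event["value"]
            # Future side of a persistent join: notify every subscriber.
            return [(client, key, event["value"])
                    for client in self.subscribers.get(key, [])]
        if event["type"] == "read":
            # One-off read: answer from the table, then forget the request.
            return [(event["client"], key, self.table.get(key))]
        if event["type"] == "subscribe":
            # Past side of a persistent join: emit the current value now.
            self.subscribers.setdefault(key, []).append(event["client"])
            return [(event["client"], key, self.table.get(key))]
        raise ValueError(f"unknown event type: {event['type']}")


operator = ShardOperator()
input_stream = [
    {"type": "write", "key": "stock:42", "value": 5},
    {"type": "read", "key": "stock:42", "client": "c1"},
    {"type": "subscribe", "key": "stock:42", "client": "c2"},
    {"type": "write", "key": "stock:42", "value": 4},
    {"type": "read", "key": "stock:42", "client": "c1"},
]
output_stream = [out for event in input_stream for out in operator.process(event)]
```

Client `c1` has to ask twice to learn about the change, whereas `c2` receives the new value as soon as it is written.
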
Recording a log of read events potentially also has benefits with regard to tracking causal
dependencies and data provenance across a system: it would allow you to reconstruct what the user
saw before they made a particular decision. For example, in an online shop, it is likely that the
predicted shipping date and the inventory status shown to a customer affect whether they choose to
buy an item [^4]. To analyze this connection, you need to record the result of the user's
query of the shipping and inventory status.
Writing read requests to durable storage thus enables better tracking of causal dependencies, but it
incurs additional storage and I/O cost. Optimizing such systems to reduce the overhead is still an
open research problem [^2]. But if you already log read requests for operational purposes,
as a side effect of request processing, it is not such a great change to make the log the source of
the requests instead.
#### Multi-shard data processing {#sec_future_unbundled_multi_shard}
For queries that only touch a single shard, the effort of sending queries through a stream and
collecting a stream of responses is perhaps overkill. However, this idea opens the possibility of
distributed execution of complex queries that need to combine data from several shards, taking
advantage of the infrastructure for message routing, sharding, and joining that is already provided
by stream processors.
Storm's distributed RPC feature supports this usage pattern. For example, it has been used to
compute the number of people who have seen a URL on a social network---i.e., the union of the
follower sets of everyone who has posted that URL [^37]. As the set of users is sharded,
this computation requires combining results from many shards.
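
The shape of this computation can be sketched in plain Python. This is not Storm's API: the shard layout, the hash function, and all of the names and data below are assumptions chosen for illustration, and the shards are simply dictionaries in one process.

```python
import zlib
from collections import defaultdict

NUM_SHARDS = 4


def shard_of(user):
    """Deterministically assign a user to a shard (illustrative hash choice)."""
    return zlib.crc32(user.encode("utf-8")) % NUM_SHARDS


# Each shard stores the follower sets of the users assigned to it.
follower_shards = [dict() for _ in range(NUM_SHARDS)]


def add_followers(user, followers):
    follower_shards[shard_of(user)][user] = set(followers)


def reach(posters):
    """Count the distinct followers of everyone who posted a URL."""
    # Scatter: route each poster to the shard that holds their follower set.
    by_shard = defaultdict(list)
    for user in posters:
        by_shard[shard_of(user)].append(user)

    # Each shard computes a partial union over the posters it is responsible for.
    partials = []
    for shard_id, users in by_shard.items():
        shard = follower_shards[shard_id]
        partials.append(set().union(*(shard.get(user, set()) for user in users)))

    # Gather: merge the partial results and count the distinct users.
    return len(set().union(*partials))


add_followers("alice", ["bob", "carol"])
add_followers("dave", ["carol", "erin"])
add_followers("frank", ["bob"])

url_reach = reach(["alice", "dave"])  # bob, carol, and erin; carol counted once
```

In a distributed setting, the scatter and gather steps would be message routing between shards, which is the part that stream processing infrastructure already provides.
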
Another example of this pattern occurs in fraud prevention: in order to assess the risk of whether a
particular purchase event is fraudulent, you can examine the reputation scores of the user's IP
address, email address, billing address, shipping address, and so on. Each of these reputation
databases is itself sharded, and so collecting the scores for a particular purchase event requires a
sequence of joins with differently sharded datasets [^38].
The internal query execution graphs of data warehouse query engines have similar characteristics. If
you need to perform this kind of multi-shard join, it is probably simpler to use a database that
provides this feature than to implement it using a stream processor. However, treating queries as
streams provides an option for implementing large-scale applications that run against the limits of
conventional off-the-shelf solutions.
## Aiming for Correctness {#sec_future_correctness}
With stateless services that only read data, it is not a big deal if something goes wrong: you can
fix the bug and restart the service, and everything returns to normal. Stateful systems such as
databases are not so simple: they are designed to remember things forever (more or less), so if
something goes wrong, the effects also potentially last forever---which means they require more
careful thought [^39].
We want to build applications that are reliable and *correct* (i.e., programs whose semantics are
well defined and understood, even in the face of various faults). For approximately four decades,
the transaction properties of atomicity, isolation, and durability have been the tools of choice for
building correct applications. However, those foundations are weaker than they seem: witness, for
example, the confusion surrounding weak isolation levels (see ["Weak Isolation
Levels"](/en/ch8#sec_transactions_isolation_levels)).
In some areas, transactions have been abandoned entirely and replaced with models that offer better
performance and scalability, but much messier semantics. *Consistency* is often talked about, but
poorly defined. Some people assert that we should "embrace weak consistency" for the sake of better
availability, while lacking a clear idea of what that actually means in practice.
For a topic that is so important, our understanding and our engineering methods are surprisingly
flaky. For example, it is very difficult to determine whether it is safe to run a particular
application at a particular transaction isolation level or replication configuration [^40],
[^41]. Often simple solutions appear to work correctly when concurrency is low and there are
no faults, but turn out to have many subtle bugs in more demanding circumstances.
For example, Kyle Kingsbury's Jepsen experiments [^42] have highlighted the stark
discrepancies between some products' claimed safety guarantees and their actual behavior in the
presence of network problems and crashes. Even if infrastructure products like databases were free
from problems, application code would still need to correctly use the features they provide, which
is error-prone if the configuration is hard to understand (which is the case with weak isolation
levels, quorum configurations, and so on).
If your application can tolerate occasionally corrupting or losing data in unpredictable ways, life
is a lot simpler, and you might be able to get away with simply crossing your fingers and hoping for
the best. On the other hand, if you need stronger assurances of correctness, then serializability
and atomic commit are established approaches, but they come at a cost: they typically only work in a
single datacenter (ruling out geographically distributed architectures), and they limit the scale
and fault-tolerance properties you can achieve.
While the traditional transaction approach is not going away, it is not the last word in making
applications correct and resilient to faults. In this section we will explore some ways of thinking
about correctness in the context of dataflow architectures.
### The End-to-End Argument for Databases {#sec_future_end_to_end}
Just because an application uses a data system that provides comparatively strong safety properties,
such as serializable transactions, that does not mean the application is guaranteed to be free from
data loss or corruption. For example, if an application has a bug that causes it to write incorrect
data, or delete data from a database, serializable transactions aren't going to save you. This is an
argument in favor of immutable and append-only data, because it is easier to recover from such
mistakes if you remove the ability of faulty code to destroy good data.
Although immutability is useful, it is not a cure-all by itself. Let's look at a more subtle example
of data corruption that can occur.
#### Exactly-once execution of an operation {#id353}
In ["Fault Tolerance"](/en/ch12#sec_stream_fault_tolerance) we encountered *exactly-once* (or
*effectively-once*) semantics. If something goes wrong while processing a message, you can either
give up (drop the message---i.e., incur data loss) or try again. If you try again, there is the risk
that it actually succeeded the first time, but you just didn't find out about the success, and so
the message ends up being processed twice.
Processing twice is a form of data corruption: it is undesirable to charge a customer twice for the
same service (billing them too much) or increment a counter twice (overstating some metric). In this
context, *exactly-once* means arranging the computation such that the final effect is the same as if
no faults had occurred, even if the operation actually was retried due to some fault. We previously
discussed a few approaches for achieving this goal.
One of the most effective approaches is to make the operation *idempotent*; that is, to ensure that
it has the same effect, no matter whether it is executed once or multiple times. However, taking an
operation that is not naturally idempotent and making it idempotent requires some effort and care:
you may need to maintain some additional metadata (such as the set of operation IDs that have
updated a value), and ensure fencing when failing over from one node to another (see ["Distributed
Locks and Leases"](/en/ch9#sec_distributed_lock_fencing)).
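
As a minimal sketch of the metadata approach, assume a counter that remembers the IDs of the operations it has already applied. The class and method names here are hypothetical, and a real implementation would also need to persist the value and the ID set atomically and to fence off stale nodes, neither of which is shown.

```python
class IdempotentCounter:
    """Counter whose increments are safe to retry (illustrative only)."""

    def __init__(self):
        self.value = 0
        self.applied_ops = set()  # IDs of operations that have already taken effect

    def increment(self, op_id, amount=1):
        """Apply the increment at most once per op_id; return True if it took effect."""
        if op_id in self.applied_ops:
            return False  # duplicate delivery of an operation we already applied
        self.value += amount
        self.applied_ops.add(op_id)
        return True


counter = IdempotentCounter()
counter.increment("op-1")
counter.increment("op-1")  # retry of the same operation: ignored
counter.increment("op-2")
```

After the retry the counter has been incremented twice, not three times, because the second delivery of `op-1` is recognized and dropped.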
#### Duplicate suppression {#id354}
The same pattern of needing to suppress duplicates occurs in many other places besides stream
processing. For example, TCP uses sequence numbers on packets to put them in the correct order at
the recipient, and to determine whether any packets were lost or duplicated on the network. Any lost
packets are retransmitted and any duplicates are removed by the TCP stack before it hands the data
to an application.
However, this duplicate suppression only works within the context of a single TCP connection.
Imagine the TCP connection is a client's connection to a database, and it is currently executing the
transaction in [Example 13-1](/en/ch13#fig_future_non_idempotent). In many databases, a transaction
is tied to a client connection (if the client sends several queries, the database knows that they
belong to the same transaction because they are sent on the same TCP connection). If the client
suffers a network interruption and connection timeout after sending the `COMMIT`, but before hearing
back from the database server, it does not know whether the transaction has been committed or
aborted ([Figure 9-1](/en/ch9#fig_distributed_network)).
<a id="fig_future_non_idempotent"></a>
##### Example 13-1. A nonidempotent transfer of money from one account to another
``` sql
BEGIN TRANSACTION;
UPDATE accounts SET balance = balance + 11.00 WHERE account_id = 1234;
UPDATE accounts SET balance = balance - 11.00 WHERE account_id = 4321;
COMMIT;
```
The client can reconnect to the database and retry the transaction, but now it is outside of the
scope of TCP duplicate suppression. Since the transaction in
[Example 13-1](/en/ch13#fig_future_non_idempotent) is not idempotent, it could happen that \$22 is
transferred instead of the desired \$11. Thus, even though
[Example 13-1](/en/ch13#fig_future_non_idempotent) is a standard example for transaction atomicity,
it is actually not correct, and real banks do not work like this [^3].
Two-phase commit protocols (see ["Two-Phase Commit (2PC)"](/en/ch8#sec_transactions_2pc)) break the
1:1 mapping between a TCP connection and a transaction, since they must allow a transaction
coordinator to reconnect to a database after a network fault, and tell it whether to commit or abort
an in-doubt transaction. Is this sufficient to ensure that the transaction will only be executed
once? Unfortunately not.
Even if we can suppress duplicate transactions between the database client and server, we still need
to worry about the network between the end-user device and the application server. For example, if
the end-user client is a web browser, it probably uses an HTTP POST request to submit an instruction
to the server. Perhaps the user is on a weak cellular data connection, and they succeed in sending
the POST, but the signal becomes too weak before they are able to receive the response from the
server.
In this case, the user will probably be shown an error message, and they may retry manually. Web
browsers warn, "Are you sure you want to submit this form again?"---and the user says yes, because
they wanted the operation to happen. (The Post/Redirect/Get pattern [^43] avoids this
warning message in normal operation, but it doesn't help if the POST request times out.) From the
web server's point of view the retry is a separate request, and from the database's point of view it
is a separate transaction. The usual deduplication mechanisms don't help.
#### Uniquely identifying requests {#id355}
To make the request idempotent through several hops of network communication, it is not sufficient
to rely just on a transaction mechanism provided by a database---you need to consider the
*end-to-end* flow of the request.
For example, you could generate a unique identifier for a request (such as a UUID) and include it as
a hidden form field in the client application, or calculate a hash of all the relevant form fields
to derive the request ID [^3]. If the web browser submits the POST request twice, the two
requests will have the same request ID. You can then pass that request ID all the way through to the
database and check that you only ever execute one request with a given ID, as shown in
[Example 13-2](/en/ch13#fig_future_request_id).
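
Both ways of obtaining a request ID can be sketched in a few lines. The function names and the form fields below are assumptions made for illustration. Note one consequence of the hashing variant: two intentionally identical submissions produce the same ID, so it only suits cases where that is the desired behavior.

```python
import hashlib
import json
import uuid


def new_request_id() -> str:
    """Random ID, generated once when the form is rendered and reused on resubmission."""
    return str(uuid.uuid4())


def request_id_from_fields(fields: dict) -> str:
    """Deterministic ID derived from the relevant form fields."""
    # Serialize with sorted keys so that field order does not change the hash.
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


form = {"from_account": "4321", "to_account": "1234", "amount": "11.00"}
first_submit = request_id_from_fields(form)
# The same fields arriving in a different order still yield the same ID.
resubmit = request_id_from_fields(dict(reversed(list(form.items()))))
```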
<a id="fig_future_request_id"></a>
##### Example 13-2. Suppressing duplicate requests using a unique ID
``` sql
ALTER TABLE requests ADD UNIQUE (request_id);
BEGIN TRANSACTION;
INSERT INTO requests
(request_id, from_account, to_account, amount)
VALUES('0286FDB8-D7E1-423F-B40B-792B3608036C', 4321, 1234, 11.00);
UPDATE accounts SET balance = balance + 11.00 WHERE account_id = 1234;
UPDATE accounts SET balance = balance - 11.00 WHERE account_id = 4321;
COMMIT;
```
[Example 13-2](/en/ch13#fig_future_request_id) relies on a uniqueness constraint on the `request_id`
column. If a transaction attempts to insert an ID that already exists, the `INSERT` fails and the
transaction is aborted, preventing it from taking effect twice. Relational databases can generally
maintain a uniqueness constraint correctly, even at weak isolation levels (whereas an
application-level check-then-insert may fail under nonserializable isolation, as discussed in
["Write Skew and Phantoms"](/en/ch8#sec_transactions_write_skew)).
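
To see the effect of a client retry, here is a small sketch that runs the statements of Example 13-2 against an in-memory SQLite database from Python. The table definitions, the opening balances, and the use of integer cents are assumptions added to make the example self-contained; the `transfer` helper is hypothetical.

```python
import sqlite3


def transfer(conn, request_id, from_account, to_account, amount_cents):
    """Return True if the transfer was applied, False if request_id was seen before."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.execute(
                "INSERT INTO requests (request_id, from_account, to_account, amount) "
                "VALUES (?, ?, ?, ?)",
                (request_id, from_account, to_account, amount_cents),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE account_id = ?",
                (amount_cents, to_account),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE account_id = ?",
                (amount_cents, from_account),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # uniqueness constraint rejected the duplicate request


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (account_id INTEGER PRIMARY KEY, balance INTEGER)")
conn.execute(
    "CREATE TABLE requests (request_id TEXT UNIQUE, from_account INTEGER, "
    "to_account INTEGER, amount INTEGER)"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1234, 0), (4321, 10000)])
conn.commit()

req = "0286FDB8-D7E1-423F-B40B-792B3608036C"
first = transfer(conn, req, 4321, 1234, 1100)
retry = transfer(conn, req, 4321, 1234, 1100)  # same request ID: rejected
balances = dict(conn.execute("SELECT account_id, balance FROM accounts"))
```

The retry is rejected by the constraint, so the balances reflect a single transfer of \$11 even though the client submitted the request twice.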
Besides suppressing duplicate requests, the `requests` table in
[Example 13-2](/en/ch13#fig_future_request_id) acts as a kind of event log, which can be useful for
event sourcing or change data capture. The updates to the account balances don't actually have to
happen in the same transaction as the insertion of the event, since they are redundant and could be
derived from the request event in a downstream consumer---as long as the event is processed exactly
once, which can again be enforced using the request ID.
#### The end-to-end argument {#sec_future_e2e_argument}
This scenario of suppressing duplicate transactions is just one example of a more general principle
called the *end-to-end argument*, which was articulated by Saltzer, Reed, and Clark in 1984
[^44]:
> The function in question can completely and correctly be implemented only with the knowledge and
> help of the application standing at the endpoints of the communication system. Therefore,
> providing that questioned function as a feature of the communication system itself is not
> possible. (Sometimes an incomplete version of the function provided by the communication system
> may be useful as a performance enhancement.)
In our example, the *function in question* was duplicate suppression. We saw that TCP suppresses
duplicate packets at the TCP connection level, and some stream processors provide so-called
exactly-once semantics at the message processing level, but that is not enough to prevent a user
from submitting a duplicate request if the first one times out. By themselves, TCP, database
transactions, and stream processors cannot entirely rule out these duplicates. Solving the problem
requires an end-to-end solution: a transaction identifier that is passed all the way from the
end-user client to the database.
The end-to-end argument also applies to checking the integrity of data: checksums built into
Ethernet, TCP, and TLS can detect corruption of packets in the network, but they cannot detect
corruption due to bugs in the software at the sending and receiving ends of the network connection,
or corruption on the disks where the data is stored. If you want to catch all possible sources of
data corruption, you also need end-to-end checksums.
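
A minimal sketch of an end-to-end checksum, assuming the producer attaches a SHA-256 digest to each record and the final consumer verifies it after the record has passed through every network hop and storage layer. The record layout and function names are hypothetical.

```python
import hashlib


def seal(payload: bytes) -> dict:
    """Producer side: attach a digest computed over the original payload."""
    return {"payload": payload, "sha256": hashlib.sha256(payload).hexdigest()}


def verify(record: dict) -> bool:
    """Consumer side: recompute the digest and compare it with the stored one."""
    return hashlib.sha256(record["payload"]).hexdigest() == record["sha256"]


record = seal(b"transfer 11.00 from 4321 to 1234")
intact = verify(record)

# Simulate corruption somewhere between the two endpoints (software bug, bad disk).
corrupted = dict(record, payload=b"transfer 91.00 from 4321 to 1234")
detected = not verify(corrupted)
```

A plain hash like this detects accidental corruption only; it does not protect against an attacker who can replace both the payload and the digest.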
A similar argument applies with encryption [^44]: the password on your home WiFi network
protects against people snooping your WiFi traffic, but not against attackers elsewhere on the
internet; TLS/SSL between your client and the server protects against network attackers, but not
against compromises of the server. Only end-to-end encryption and authentication can protect against
all of these things.
Although the low-level features (TCP duplicate suppression, Ethernet checksums, WiFi encryption)
cannot provide the desired end-to-end features by themselves, they are still useful, since they
reduce the probability of problems at the higher levels. For example, HTTP requests would often get
mangled if we didn't have TCP putting the packets back in the right order. We just need to remember
that the low-level reliability features are not by themselves sufficient to ensure end-to-end
correctness.
#### Applying end-to-end thinking in data systems {#id357}
This brings us back to the original thesis: just because an application uses a data system that
provides comparatively strong safety properties, such as serializable transactions, that does not
mean the application is guaranteed to be free from data loss or corruption. The application itself
needs to take end-to-end measures, such as duplicate suppression, as well.
That is a shame, because fault-tolerance mechanisms are hard to get right. Low-level reliability
mechanisms, such as those in TCP, work quite well, and so the remaining higher-level faults occur
fairly rarely. It would be really nice to wrap up the remaining high-level fault-tolerance machinery
in an abstraction so that application code needn't worry about it---but it seems that we have not
yet found the right abstraction.
Transactions have long been seen as a useful abstraction. As discussed in
[Chapter 8](/en/ch8#ch_transactions), they take a wide range of possible issues (concurrent writes,
constraint violations, crashes, network interruptions, disk failures) and collapse them down to two
possible outcomes: commit or abort. That is a huge simplification of the programming model, but it
is not enough.
Transactions are expensive, especially when they involve heterogeneous storage technologies (see
["Distributed Transactions Across Different Systems"](/en/ch8#sec_transactions_xa)). When we refuse
to use distributed transactions because they are too expensive, we end up having to reimplement
fault-tolerance mechanisms in application code. As numerous examples throughout this book have
shown, reasoning about concurrency and partial failure is difficult and counterintuitive, and so
most application-level mechanisms do not work correctly. The consequence is lost or corrupted data.
For these reasons, it is worth exploring fault-tolerance abstractions that make it easy to provide
application-specific end-to-end correctness properties, but also maintain good performance and good
operational characteristics in a large-scale distributed environment.
### Enforcing Constraints {#sec_future_constraints}
Let's think about correctness in the context of the ideas around unbundling databases. We saw that
end-to-end duplicate suppression can be achieved with a request ID that is passed all the way from
the client to the database that records the write. What about other kinds of constraints?
In particular, let's focus on uniqueness constraints---such as the one we relied on in
[Example 13-2](/en/ch13#fig_future_request_id). In ["Constraints and uniqueness
guarantees"](/en/ch10#sec_consistency_uniqueness) we saw several other examples of application
features that need to enforce uniqueness: a username or email address must uniquely identify a user,
a file storage service cannot have more than one file with the same name, and two people cannot book
the same seat on a flight or in a theater.
Other kinds of constraints are very similar: for example, ensuring that an account balance never
goes negative, that you don't sell more items than you have in stock in the warehouse, or that a
meeting room does not have overlapping bookings. Techniques that enforce uniqueness can often be
used for these kinds of constraints as well.
#### Uniqueness constraints require consensus {#id452}
In [Chapter 10](/en/ch10#ch_consistency) we saw that in a distributed setting, enforcing a
uniqueness constraint requires consensus: if there are several concurrent requests with the same
value, the system somehow needs to decide which one of the conflicting operations is accepted, and
reject the others as violations of the constraint.
The most common way of achieving this consensus is to make a single node the leader, and put it in
charge of making all the decisions. That works fine as long as you don't mind funneling all requests
through a single node (even if the client is on the other side of the world), and as long as that
node doesn't fail. Consensus algorithms like Raft tackle the problem of safely electing a new leader
if the current leader has failed (or is believed to have failed due to a network problem), and
preventing split brain.
Uniqueness checking can be scaled out by sharding based on the value that needs to be unique. For
example, if you need to ensure uniqueness by request ID, as in
[Example 13-2](/en/ch13#fig_future_request_id), you can ensure all requests with the same request ID
are routed to the same shard. If you need usernames to be unique, you can shard by hash of username.
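
For illustration, a routing function along these lines sends every request for the same value to the same shard. The function name and the choice of SHA-256 are assumptions. A stable hash is used because Python's built-in `hash()` for strings is randomized per process and so would not route consistently across machines.

```python
import hashlib


def shard_for(key: str, num_shards: int) -> int:
    """Map a key (username, request ID, ...) to a shard number in [0, num_shards)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Two concurrent attempts to claim the same username land on the same shard,
# so a single node sees both and can decide between them.
shard_a = shard_for("alice", 8)
shard_b = shard_for("alice", 8)
```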
However, asynchronous multi-leader replication is ruled out, because it could happen that different
leaders concurrently accept conflicting writes, and thus the values are no longer unique. If you
want to be able to immediately reject any writes that would violate the constraint, synchronous
coordination is unavoidable [^45].
#### Uniqueness in log-based messaging {#sec_future_uniqueness_log}
A shared log ensures that all consumers see messages in the same order---a guarantee that is
formally known as *total order broadcast* and is equivalent to consensus (see ["The Many Faces of
Consensus"](/en/ch10#sec_consistency_faces)). In the unbundled database approach with log-based
messaging, we can use a very similar approach to enforce uniqueness constraints.
A stream processor consumes all the messages in a log shard sequentially on a single thread. Thus,
if the log is sharded based on the value that needs to be unique, a stream processor can
unambiguously and deterministically decide which one of several conflicting operations came first in
the log. For example, in the case of several users trying to claim the same username
[^46]:
1. Every request for a username is encoded as a message, and appended to a shard determined by the
hash of the username.
2. A stream processor sequentially reads the requests in the log, using a local database to keep
track of which usernames are taken. For every request for a username that is available, it
records the name as taken and emits a success message to an output stream. For every request for
a username that is already taken, it emits a rejection message to an output stream.
3. The client that requested the username watches the output stream and waits for a success or
rejection message corresponding to its request.
This algorithm is the same as the construction for achieving consensus using a shared log, which we
saw in [Chapter 10](/en/ch10#ch_consistency). It scales easily to a large request throughput by
increasing the number of shards, as each shard can be processed independently.
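
The three steps above can be sketched for a single shard as follows. The message fields and the function name are hypothetical, and the Python list stands in for a durable log; the dictionary stands in for the stream processor's local database.

```python
def process_username_shard(log):
    """Consume one shard's log sequentially and return the output stream."""
    taken = {}   # local state: username -> ID of the request that claimed it
    output = []
    for request in log:  # log order decides which of several conflicting requests wins
        name, request_id = request["username"], request["request_id"]
        if name not in taken:
            taken[name] = request_id
            output.append({"request_id": request_id, "status": "success"})
        else:
            output.append({"request_id": request_id, "status": "rejected"})
    return output


log = [
    {"request_id": "r1", "username": "alice"},
    {"request_id": "r2", "username": "alice"},  # conflicts with r1, which came first
    {"request_id": "r3", "username": "bob"},
]
results = {m["request_id"]: m["status"] for m in process_username_shard(log)}
```

Each client then looks up its own request ID in the output stream to learn whether its claim succeeded.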
The approach works not only for uniqueness constraints, but also for many other kinds of
constraints. Its fundamental principle is that any writes that may conflict are routed to the same
shard and processed sequentially. The definition of a conflict may depend on the application, but
the stream processor can use arbitrary logic to validate a request.
#### Multi-shard request processing {#id360}
Ensuring that an operation is executed atomically, while satisfying constraints, becomes more
interesting when several shards are involved. In [Example 13-2](/en/ch13#fig_future_request_id),
there are potentially three shards: the one containing the request ID, the one containing the payee
account, and the one containing the payer account. There is no reason why those three things should
be in the same shard, since they are all independent from each other.
In the traditional approach to databases, executing this transaction would require an atomic commit
across all three shards, which essentially forces it into a total order with respect to all other
transactions on any of those shards. Since there is now cross-shard coordination, different shards
can no longer be processed independently, so throughput is likely to suffer.
However, equivalent correctness can be achieved without cross-shard transactions using sharded logs
and stream processors. [Figure 13-2](/en/ch13#fig_future_multi_shard) shows an example of a payment
transaction that needs to check whether there is sufficient money in the source account, and if so,
atomically transfers some amount to a destination account while deducting fees. It works as follows
[^47]:
{{< figure src="/fig/ddia_1302.png" id="fig_future_multi_shard" caption="Figure 13-2. Checking whether a source account has enough money, and atomically transferring money to a destination account and a fees account, using event logs and stream processors." class="w-full my-4" >}}
1. The request to transfer money from the source account to the destination account is given a
unique request ID by the user's client, and appended to a log shard based on the source account
ID.
2. A stream processor reads the log of requests and maintains a database containing the state of
the source account and the IDs of requests it has already processed. The contents of this
database are entirely derived from the log. When the stream processor encounters a request with
an ID that it has not seen before, it checks in its local database whether the source account
has enough money to perform the transfer.
If yes, it updates its local database to reserve the payment amount on the source account, and
emits events to several other logs: an outgoing payment event to the log shard for the source
account (its own input log), an incoming payment event to the log shard for the destination
account, and an incoming payment event to the log shard for the fees account. The original
request ID is included in those emitted events.
3. Eventually the outgoing payment event is delivered back to the source account processor
(possibly after having received unrelated events in the meantime). The stream processor
recognizes based on the request ID that this is a payment it previously reserved, and it now
executes the payment, again updating its local state of the source account. It ignores
duplicates based on request ID.
4. The log shards for the destination and fees accounts are consumed by independent stream
processing tasks. When they receive an incoming payment event, they update their local state to
reflect the payment, and they deduplicate events based on request ID.
[Figure 13-2](/en/ch13#fig_future_multi_shard) shows the three accounts as being in three separate
shards, but they could just as well be in the same shard; it doesn't matter. All we need is that
the events for any given account are processed strictly in log order with at-least-once semantics,
and that the stream processors are deterministic.
For example, consider what happens if the source account processor crashes while processing a
payment request. The output messages may or may not have been emitted before the crash occurred.
When it recovers from the crash, it will process the same request again (due to at-least-once
semantics), and it will make the same decision on whether to allow the payment (since it's
deterministic). It will therefore emit the same output messages with the same request ID to the
log shards for the source, destination, and fees accounts. If they are duplicates, the downstream
consumers will ignore them based on the request ID.
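
The following sketch simulates that crash scenario under simplifying assumptions: the reserve and execute steps are collapsed into one, amounts are integer cents, the fee is charged to the source account on top of the transfer amount, and all names are hypothetical. It shows the two properties the argument relies on: reprocessing the same log yields the same output events, and downstream consumers drop duplicates by request ID.

```python
def source_processor(log, opening_balance):
    """Deterministically derive output events from the source account's request log."""
    balance = opening_balance
    seen = set()
    emitted = []  # (target log shard, event) pairs
    for req in log:
        rid = req["request_id"]
        if rid in seen:
            continue  # duplicate request: already decided
        seen.add(rid)
        total = req["amount"] + req["fee"]
        if balance >= total:
            balance -= total
            emitted.append(("destination", {"request_id": rid, "amount": req["amount"]}))
            emitted.append(("fees", {"request_id": rid, "amount": req["fee"]}))
        else:
            emitted.append(("source", {"request_id": rid, "type": "declined"}))
    return emitted


class AccountProcessor:
    """Downstream consumer for one account; ignores events it has already applied."""

    def __init__(self):
        self.balance = 0
        self.seen = set()

    def handle(self, event):
        if event["request_id"] in self.seen:
            return
        self.seen.add(event["request_id"])
        self.balance += event["amount"]


requests = [{"request_id": "req-1", "amount": 1100, "fee": 25}]
before_crash = source_processor(requests, opening_balance=5000)
after_recovery = source_processor(requests, opening_balance=5000)  # same log, replayed

processors = {"destination": AccountProcessor(), "fees": AccountProcessor()}
for target, event in before_crash + after_recovery:  # every event is delivered twice
    if target in processors:
        processors[target].handle(event)
```

Although each event reaches the downstream processors twice, the destination account is credited once and the fee is collected once.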
Atomicity in this system comes not from any transactions, but from the fact that writing the initial
request event to the source account log is an atomic action. Once that one event is in the log, all the
downstream events will eventually be written as well---possibly after stream processors have
recovered from crashes, and possibly with duplicates, but they will appear eventually.
With exactly-once semantics this example becomes easier to implement, since it ensures that the
stream processor's local state is consistent with the set of messages it has processed. Thus, if it
crashes and re-processes some messages, its local state is also reset to what it was before those
messages were processed.
If the user in [Figure 13-2](/en/ch13#fig_future_multi_shard) wants to find out whether their
transfer was approved or not, they can subscribe to the source account log shard and wait for the
outgoing payment event. In order to explicitly notify the user if the balance is insufficient, the
stream processor can emit a "declined payment" event to that log shard.
By breaking down the multi-shard transaction into several differently sharded stages and using the
end-to-end request ID, we have achieved the same correctness property (every request is applied
exactly once to both the payer and payee accounts), even in the presence of faults, and without
using an atomic commit protocol.
### Timeliness and Integrity {#sec_future_integrity}
A convenient property of many transactional systems is that as soon as one transaction commits, its
writes are immediately visible to other transactions. This property is formalized as *strict
serializability* (see ["Linearizability Versus
Serializability"](/en/ch10#sidebar_consistency_serializability)).
This is not the case when unbundling an operation across multiple stages of stream processors:
consumers of a log are asynchronous by design, so a sender does not wait until its message has been
processed by consumers. However, it is possible for a client to wait for a message to appear on an
output stream, like the user waiting for an outgoing payment or payment declined event in
[Figure 13-2](/en/ch13#fig_future_multi_shard), which depends on whether there was enough money in
the source account.
In this example, the correctness of the source account balance check does not depend on whether the
user making the request waits for the outcome. The waiting only has the purpose of synchronously
informing the user whether or not the payment succeeded, but this notification is decoupled from the
effects of processing the request.
More generally, the term *consistency* conflates two different requirements that are worth
considering separately:
Timeliness
: Timeliness means ensuring that users observe the system in an up-to-date state. We saw
previously that if a user reads from a stale copy of the data, they may observe it in an
inconsistent state (see ["Problems with Replication Lag"](/en/ch6#sec_replication_lag)).
However, that inconsistency is temporary, and will eventually be resolved simply by waiting and
trying again.
The CAP theorem uses consistency in the sense of linearizability, which is a strong way of
achieving timeliness. Weaker timeliness properties like *read-after-write* consistency can also
be useful.
Integrity
: Integrity means absence of corruption; i.e., no data loss, and no contradictory or false data.
In particular, if some derived dataset is maintained as a view onto some underlying data, the
derivation must be correct. For example, a database index must correctly reflect the contents of
the database---an index in which some records are missing is not very useful.
If integrity is violated, the inconsistency is permanent: waiting and trying again is not going
to fix database corruption in most cases. Instead, explicit checking and repair is needed. In
the context of ACID transactions, "consistency" is usually understood as some kind of
application-specific notion of integrity. Atomicity and durability are important tools for
preserving integrity.
In slogan form: violations of timeliness are "eventual consistency," whereas violations of integrity
are "perpetual inconsistency."
In most applications, integrity is much more important than timeliness. Violations of timeliness can
be annoying and confusing, but violations of integrity can be catastrophic.
For example, on your credit card statement, it is not surprising if a transaction that you made
within the last 24 hours does not yet appear---it is normal that these systems have a certain lag.
We know that banks reconcile and settle transactions asynchronously, and timeliness is not very
important here [^3]. However, it would be very bad if the statement balance was not equal
to the sum of the transactions plus the previous statement balance (an error in the sums), or if a
transaction was charged to you but not paid to the merchant (disappearing money). Such problems
would be violations of the integrity of the system.
#### Correctness of dataflow systems {#id453}
ACID transactions usually provide both timeliness (e.g., linearizability) and integrity (e.g.,
atomic commit) guarantees. Thus, if you approach application correctness from the point of view of
ACID transactions, the distinction between timeliness and integrity is fairly inconsequential.
On the other hand, an interesting property of the event-based dataflow systems that we have
discussed in this chapter is that they decouple timeliness and integrity. When processing event
streams asynchronously, there is no guarantee of timeliness, unless you explicitly build consumers
that wait for a message to arrive before returning. For example, a user could request a payment and
then read the state of their account before the stream processor has executed the request; the user
will not see the payment they just requested.
However, integrity is in fact central to streaming systems. *Exactly-once* or *effectively-once*
semantics is a mechanism for preserving integrity. If an event is lost, or if an event takes effect
twice, the integrity of a data system could be violated. Thus, fault-tolerant message delivery and
duplicate suppression (e.g., idempotent operations) are important for maintaining the integrity of a
data system in the face of faults.
As we saw in the last section, reliable stream processing systems can preserve integrity without
requiring distributed transactions and an atomic commit protocol, which means they can potentially
achieve comparable correctness with much better performance and operational robustness. We achieved
this integrity through a combination of mechanisms:
- Representing the content of the write operation as a single message, which can easily be written
atomically---an approach that fits very well with event sourcing
- Deriving all other state updates from that single message using deterministic derivation
functions, similarly to stored procedures
- Passing a client-generated request ID through all these levels of processing, enabling end-to-end
duplicate suppression and idempotence
- Making messages immutable and allowing derived data to be reprocessed from time to time, which
makes it easier to recover from bugs
#### Loosely interpreted constraints {#id362}
As discussed previously, enforcing a uniqueness constraint requires consensus, typically implemented
by funneling all events in a particular shard through a single node. This limitation is unavoidable
if we want the traditional form of uniqueness constraint, and stream processing cannot get around
it.
However, another thing to realize is that in many real applications there is actually a business
requirement to allow violations of what you might think of as hard constraints:
- If customers order more items than you have in your warehouse, you can order in more stock,
apologize to customers for the delay, and offer them a discount. This is actually the same as what
you'd have to do if, say, a forklift truck ran over some of the items in your warehouse, leaving
you with fewer items in stock than you thought you had [^3]. Thus, the apology workflow
already needs to be part of your business processes anyway in order to deal with forklift
incidents, and a hard constraint on the number of items in stock might be unnecessary.
- Similarly, many airlines overbook airplanes in the expectation that some passengers will miss
their flight, and many hotels overbook rooms, expecting that some guests will cancel. In these
cases, the constraint of "one person per seat" is deliberately violated for business reasons, and
compensation processes (refunds, upgrades, providing a complimentary room at a neighboring hotel)
are put in place to handle situations in which demand exceeds supply. Even if there was no
overbooking, apology and compensation processes would be needed in order to deal with flights
being cancelled due to bad weather or staff on strike---recovering from such issues is just a
normal part of business [^3].
- If someone withdraws more money than they have in their account, the bank can charge them an
overdraft fee and ask them to pay back what they owe. By limiting the total withdrawals per day,
the risk to the bank is bounded.
- In systems that integrate data between different organizations, inconsistencies will inevitably
arise, and correction mechanisms are necessary to handle them. As noted in ["Batch Use
Cases"](/en/ch11#sec_batch_output), settlement of payments between banks is an example of this.
In many business contexts, it is therefore acceptable to temporarily violate a constraint and fix it
up later by apologizing. This kind of change to correct a mistake is called a *compensating
transaction* [^48], [^49]. The cost of the apology (in terms of money or reputation)
varies, but it is often quite low: you can't unsend an email, but you can send a follow-up email
with a correction. If you accidentally charge a credit card twice, you can refund one of the
charges, and the cost to you is just the processing fees and perhaps a customer complaint. Once
money has been paid out of an ATM, you can't directly get it back, although in principle you can
send debt collectors to recover the money if the account was overdrawn and the customer won't pay it
back.
Whether the cost of the apology is acceptable is a business decision. If it is acceptable, the
traditional model of checking all constraints before even writing the data is unnecessarily
restrictive. It may well be reasonable to go ahead with a write optimistically, and to check the
constraint after the fact. You can still ensure that the validation occurs before doing things that
would be expensive to recover from, but that doesn't imply you must do the validation before you
even write the data.
These applications *do* require integrity: you would not want to lose a reservation, or have money
disappear due to mismatched credits and debits. But they *don't* require timeliness on the
enforcement of the constraint: if you have sold more items than you have in the warehouse, you can
patch up the problem after the fact by apologizing. Doing so is similar to the conflict resolution
approaches we discussed in ["Dealing with Conflicting
Writes"](/en/ch6#sec_replication_write_conflicts).
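As a rough sketch of this optimistic approach, the following example accepts orders into a log without checking stock, and validates them afterwards. The event types `OrderPlaced` and `OrderBackordered` and the function `check_stock` are hypothetical names chosen for illustration; the policy of partially fulfilling an order before backordering the rest is also just an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    item: str
    quantity: int


@dataclass(frozen=True)
class OrderBackordered:
    """Compensating event: triggers the apology workflow for this order."""
    order_id: str
    shortfall: int


def check_stock(orders: list[OrderPlaced], stock: dict[str, int]) -> list[OrderBackordered]:
    """Validate already-accepted orders after the fact, in log order."""
    remaining = dict(stock)  # copy, so the caller's stock levels are not mutated
    compensations = []
    for order in orders:
        available = remaining.get(order.item, 0)
        if order.quantity > available:
            # Constraint violated: record what is missing instead of rejecting the write.
            compensations.append(OrderBackordered(order.order_id, order.quantity - available))
            remaining[order.item] = 0
        else:
            remaining[order.item] = available - order.quantity
    return compensations


# These orders were written to the log without checking stock first.
log = [
    OrderPlaced("o1", "widget", 3),
    OrderPlaced("o2", "widget", 4),
    OrderPlaced("o3", "widget", 1),
]
compensations = check_stock(log, {"widget": 5})
```

No order is lost, which preserves integrity: every order in the log is either fulfilled or has a corresponding compensating event. Only the enforcement of the stock constraint is delayed.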
#### Coordination-avoiding data systems {#id454}
We have now made two interesting observations:
1. Dataflow systems can maintain integrity guarantees on derived data without atomic commit,
linearizability, or synchronous cross-shard coordination.
2. Although strict uniqueness constraints require timeliness and coordination, many applications
are actually fine with loose constraints that may be temporarily violated and fixed up later, as
long as integrity is preserved throughout.
Taken together, these observations mean that dataflow systems can provide the data management
services for many applications without requiring coordination, while still giving strong integrity
guarantees. Such *coordination-avoiding* data systems have a lot of appeal: they can achieve better
performance and fault tolerance than systems that need to perform synchronous coordination
[^45].
For example, such a system could operate distributed across multiple datacenters in a multi-leader
configuration, asynchronously replicating between regions. Any one datacenter can continue operating
independently from the others, because no synchronous cross-region coordination is required. Such a
system would have weak timeliness guarantees---it could not be linearizable without introducing
coordination---but it can still have strong integrity guarantees.
In this context, serializable transactions are still useful as part of maintaining derived state,
but they can be run at a small scope where they work well [^6]. Heterogeneous distributed
transactions such as XA transactions are not required. Synchronous coordination can still be
introduced in places where it is needed (for example, to enforce strict constraints before an
operation from which recovery is not possible), but there is no need for everything to pay the cost
of coordination if only a small part of an application needs it [^32].
Another way of looking at coordination and constraints: they reduce the number of apologies you have
to make due to inconsistencies, but potentially also reduce the performance and availability of your
system, and thus potentially increase the number of apologies you have to make due to outages. You
cannot reduce the number of apologies to zero, but you can aim to find the best trade-off for your
needs---the sweet spot where there are neither too many inconsistencies nor too many availability
problems.
### Trust, but Verify {#sec_future_verification}
All of our discussion of correctness, integrity, and fault tolerance has been under the assumption
that certain things might go wrong, but other things won't. We call these assumptions our *system
model* (see ["System Model and Reality"](/en/ch9#sec_distributed_system_model)): for example, we
should assume that processes can crash, machines can suddenly lose power, and the network can
arbitrarily delay or drop messages. But we might also assume that data written to disk is not lost
after `fsync`, that data in memory is not corrupted, and that the multiplication instruction of our
CPU always returns the correct result.
These assumptions are quite reasonable, as they are true most of the time, and it would be difficult
to get anything done if we had to constantly worry about our computers making mistakes.
Traditionally, system models take a binary approach toward faults: we assume that some things can
happen, and other things can never happen. In reality, it is more a question of probabilities: some
things are more likely, other things less likely. The question is whether violations of our
assumptions happen often enough that we may encounter them in practice.
We have seen that data can become corrupted in memory (see ["Hardware and Software
Faults"](/en/ch2#sec_introduction_hardware_faults)), on disk (see ["Replication and
Durability"](/en/ch8#sidebar_transactions_durability)), and on the network (see ["Weak forms of
lying"](/en/ch9#sec_distributed_weak_lying)). Maybe this is something we should be paying more
attention to? If you are operating at large enough scale, even very unlikely things do happen.
#### Maintaining integrity in the face of software bugs {#id455}
Besides such hardware issues, there is always the risk of software bugs, which would not be caught
by lower-level network, memory, or filesystem checksums. Even widely used database software has
bugs: for example, past versions of MySQL have failed to correctly maintain uniqueness constraints
[^50] and PostgreSQL's serializable isolation level has exhibited write skew anomalies in
the past [^51], even though MySQL and PostgreSQL are robust and well-regarded databases
that have been battle-tested by many people for many years. In less mature software, the situation
is likely to be much worse.
Despite considerable efforts in careful design, testing, and review, bugs still creep in. Although
they are rare, and they eventually get found and fixed, there is still a period during which such
bugs can corrupt data.
When it comes to application code, we have to assume many more bugs, since most applications don't
receive anywhere near the amount of review and testing that database code does. Many applications
don't even correctly use the features that databases offer for preserving integrity, such as foreign
key or uniqueness constraints [^25].
Consistency in the sense of ACID is based on the idea that the database starts off in a consistent
state, and a transaction transforms it from one consistent state to another consistent state. Thus,
we expect the database to always be in a consistent state. However, this notion only makes sense if
you assume that the transaction is free from bugs. If the application uses the database incorrectly
in some way, for example using a weak isolation level unsafely, the integrity of the database cannot
be guaranteed.
#### Don't just blindly trust what they promise {#id364}
Since neither hardware nor software always lives up to the ideal that we would like, it seems that
data corruption is inevitable sooner or later. Thus, we should at least have a way of
finding out if data has been corrupted so that we can fix it and try to track down the source of the
error. Checking the integrity of data is known as *auditing*.
As discussed in ["Advantages of immutable events"](/en/ch12#sec_stream_immutability_pros), auditing
is not just for financial applications. However, auditability is very important in finance precisely
because everyone knows that mistakes happen, and we all recognize the need to be able to detect and
fix problems.
Mature systems similarly tend to consider the possibility of unlikely things going wrong, and manage
that risk. For example, large-scale storage systems such as HDFS and Amazon S3 do not fully trust
disks: they run background processes that continually read back files, compare them to other
replicas, and move files from one disk to another, in order to mitigate the risk of silent
corruption [^52], [^53].
If you want to be sure that your data is still there, you have to actually read it and check. Most
of the time it will still be there, but if it isn't, you really want to find out sooner rather than
later. By the same argument, it is important to try restoring from your backups from time to
time---otherwise you may only find out that your backup is broken when it is too late and you have
already lost data. Don't just blindly trust that it is all working.
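The idea of reading data back and checking it can be sketched as follows. This is a toy model, not a description of how HDFS or S3 implement their background checks: the `scrub` function, the in-memory dictionaries standing in for disks, and the use of SHA-256 as the checksum are all assumptions made for illustration.

```python
import hashlib


def checksum(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def scrub(replicas: dict[str, dict[str, bytes]], expected: dict[str, str]) -> list[tuple[str, str]]:
    """Read back every object from every replica and report mismatches.

    Returns (replica_name, object_key) pairs that are missing or corrupted.
    """
    problems = []
    for replica_name, objects in sorted(replicas.items()):
        for key, digest in sorted(expected.items()):
            data = objects.get(key)
            if data is None or checksum(data) != digest:
                problems.append((replica_name, key))
    return problems


original = {"a.txt": b"hello", "b.txt": b"world"}
# Checksums recorded at write time, stored separately from the data.
expected = {key: checksum(data) for key, data in original.items()}

replicas = {
    "disk1": dict(original),
    "disk2": {"a.txt": b"hellp", "b.txt": b"world"},  # one byte silently changed
    "disk3": {"a.txt": b"hello"},                     # b.txt has gone missing
}
problems = scrub(replicas, expected)
```

A real system would run such a check continually in the background and repair any damaged copy from a replica that still matches its checksum.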
Systems like HDFS and S3 still have to assume that disks work correctly most of the time---which is
a reasonable assumption, but not the same as assuming that they *always* work correctly. However,
not many systems currently have this kind of "trust, but verify" approach of continually auditing
themselves. Many assume that correctness guarantees are absolute and make no provision for the
possibility of rare data corruption. In the future we may see more *self-validating* or
*self-auditing* systems that continually check their own integrity, rather than relying on blind
trust [^54].
#### Designing for auditability {#id365}
If a transaction mutates several objects in a database, it is difficult to tell after the fact what
that transaction means. Even if you capture the transaction logs, the insertions, updates, and
deletions in various tables do not necessarily give a clear picture of *why* those mutations were
performed. The invocation of the application logic that decided on those mutations is transient and
cannot be reproduced.
By contrast, event-based systems can provide better auditability. In the event sourcing approach,
user input to the system is represented as a single immutable event, and any resulting state updates
are derived from that event. The derivation can be made deterministic and repeatable, so that
running the same log of events through the same version of the derivation code will result in the
same state updates.
Being explicit about dataflow makes the *provenance* of data much clearer, which makes integrity
checking much more feasible. For the event log, we can use hashes to check that the event storage
has not been corrupted. For any derived state, we can rerun the batch and stream processors that
derived it from the event log in order to check whether we get the same result, or even run a
redundant derivation in parallel.
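Both kinds of check can be sketched in a few lines. The event format, the `chain_hashes` function, and the `derive_balances` function are hypothetical and chosen only for illustration; chaining each hash to the previous one is one possible way of hashing a log, not the only one.

```python
import hashlib
import json


def chain_hashes(events: list[dict]) -> list[str]:
    """Hash each event together with the previous hash, forming a chain."""
    hashes = []
    prev = "0" * 64  # fixed starting value for the first link
    for event in events:
        payload = json.dumps(event, sort_keys=True).encode("utf-8")
        prev = hashlib.sha256(prev.encode("ascii") + payload).hexdigest()
        hashes.append(prev)
    return hashes


def derive_balances(events: list[dict]) -> dict[str, int]:
    """Deterministic derivation of state from the event log."""
    balances: dict[str, int] = {}
    for event in events:
        balances[event["account"]] = balances.get(event["account"], 0) + event["delta"]
    return balances


events = [
    {"account": "alice", "delta": 100},
    {"account": "bob", "delta": 50},
    {"account": "alice", "delta": -30},
]
recorded_head = chain_hashes(events)[-1]  # stored when the log was written
stored_view = derive_balances(events)     # the derived state being served

# Audit 1: the log still matches the hash that was recorded earlier.
log_intact = chain_hashes(events)[-1] == recorded_head
# Audit 2: rerunning the derivation reproduces the stored derived state.
view_matches = derive_balances(events) == stored_view

# Changing any event changes the hash at the head of the chain.
tampered = [dict(e) for e in events]
tampered[1]["delta"] = 5000
tamper_detected = chain_hashes(tampered)[-1] != recorded_head
```

Because each hash depends on all earlier events, comparing only the final hash is enough to detect a change anywhere in the log. The second audit works only because the derivation is deterministic.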
A deterministic and well-defined dataflow also makes it easier to debug and trace the execution of a
system in order to determine why it did something [^4], [^55]. If something
unexpected occurred, it is valuable to have the diagnostic capability to reproduce the exact
circumstances that led to the unexpected event---a kind of time-travel debugging capability.
#### The end-to-end argument again {#id456}
If we cannot fully trust that every individual component of the system will be free from
corruption---that every piece of hardware is fault-free and that every piece of software is
bug-free---then we must at least periodically check the integrity of our data. If we don't check, we
won't find out about corruption until it is too late and it has caused some downstream damage, at
which point it will be much harder and more expensive to track down the problem.
Checking the integrity of data systems is best done in an end-to-end fashion: the more systems we
can include in an integrity check, the fewer opportunities there are for corruption to go unnoticed
at some stage of the process. If we can check that an entire derived data pipeline is correct end to
end, then any disks, networks, services, and algorithms along the path are implicitly included in
the check.
Having continuous end-to-end integrity checks gives you increased confidence about the correctness
of your systems, which in turn allows you to move faster [^56]. Like automated testing,
auditing increases the chances that bugs will be found quickly, and thus reduces the risk that a
change to the system or a new storage technology will cause damage. If you are not afraid of making
changes, you can much better evolve an application to meet changing requirements.
#### Tools for auditable data systems {#id366}
At present, not many data systems make auditability a top-level concern. Some applications implement
their own audit mechanisms, for example by logging all changes to a separate audit table, but
guaranteeing the integrity of the audit log and the database state is still difficult. A transaction
log can be made tamper-proof by periodically signing it with a hardware security module, but that
does not guarantee that the right transactions went into the log in the first place.
Blockchains such as Bitcoin or Ethereum are shared append-only logs with cryptographic consistency
checks; the transactions they store are events, and smart contracts are basically stream processors.
The consensus protocols they use ensure that all nodes agree on the same sequence of events. The
difference from the consensus protocols of [Chapter 10](/en/ch10#ch_consistency) is that blockchains
are Byzantine fault tolerant: because the replicas continually check each other's integrity, they
still work even if some of the participating nodes have corrupted data.
For most applications, the overhead of blockchains is too high to be useful. However, some of their
cryptographic tools can also be used in a more lightweight context. For example, *Merkle trees*
[^57] are trees of hashes that can be used to efficiently prove that a record appears in
some dataset (and a few other things). *Certificate transparency* uses cryptographically verified
append-only logs and Merkle trees to check the validity of TLS/SSL certificates [^58],
[^59]; it avoids needing a consensus protocol by having a single leader per log.
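To make the inclusion proof concrete, here is a minimal Merkle tree sketch. The function names and record values are hypothetical, and duplicating the last node at odd-sized levels is a simplification chosen for brevity; it should not be read as the tree construction used by certificate transparency or by any particular blockchain.

```python
import hashlib


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def pad(level: list[bytes]) -> list[bytes]:
    """Duplicate the last node if the level has an odd number of nodes."""
    return level + [level[-1]] if len(level) % 2 == 1 else level


def build_levels(records: list[bytes]) -> list[list[bytes]]:
    """Return every level of the tree, from the leaf hashes up to the root."""
    level = [h(b"\x00" + r) for r in records]  # 0x00 prefix marks a leaf
    levels = [level]
    while len(level) > 1:
        padded = pad(level)
        level = [h(b"\x01" + padded[i] + padded[i + 1])  # 0x01 marks an inner node
                 for i in range(0, len(padded), 2)]
        levels.append(level)
    return levels


def inclusion_proof(levels: list[list[bytes]], index: int) -> list[tuple[bytes, bool]]:
    """Collect the sibling hash at each level, noting whether it sits on the left."""
    proof = []
    for level in levels[:-1]:  # the root level has no sibling
        sibling = index ^ 1
        proof.append((pad(level)[sibling], sibling < index))
        index //= 2
    return proof


def verify(record: bytes, proof: list[tuple[bytes, bool]], root: bytes) -> bool:
    """Recompute the path from the record up to the root using only the proof."""
    node = h(b"\x00" + record)
    for sibling, sibling_is_left in proof:
        node = h(b"\x01" + sibling + node) if sibling_is_left else h(b"\x01" + node + sibling)
    return node == root


records = [b"cert-a", b"cert-b", b"cert-c", b"cert-d", b"cert-e"]
levels = build_levels(records)
root = levels[-1][0]
proof = inclusion_proof(levels, 4)  # prove that records[4] is in the dataset
```

A verifier that knows only the root hash can call `verify(records[4], proof, root)` without seeing the other records. The proof contains one hash per level of the tree, so its size grows logarithmically with the number of records.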
Integrity-checking and auditing algorithms, like those of certificate transparency and distributed
ledgers, might become more widely used in data systems in general in the future. Some work will be
needed to make them as scalable as systems without cryptographic auditing, and to keep the
performance penalty as low as possible, but they are nevertheless interesting.
## Summary {#id367}
In this chapter we discussed new approaches to designing data systems based on ideas from stream
processing. We started with the observation that there is no one single tool that can efficiently
serve all possible use cases, and so applications necessarily need to compose several different
pieces of software to accomplish their goals. We discussed how to solve this *data integration*
problem by using batch processing and event streams to let data changes flow between different
systems.
In this approach, certain systems are designated as systems of record, and other data is derived
from them through transformations. In this way we can maintain indexes, materialized views, machine
learning models, statistical summaries, and more. By making these derivations and transformations
asynchronous and loosely coupled, a problem in one area is prevented from spreading to unrelated
parts of the system, increasing the robustness and fault tolerance of the system as a whole.
Expressing dataflows as transformations from one dataset to another also helps evolve applications:
if you want to change one of the processing steps, for example to change the structure of an index
or cache, you can just rerun the new transformation code on the whole input dataset in order to
rederive the output. Similarly, if something goes wrong, you can fix the code and reprocess the data
in order to recover.
These processes are quite similar to what databases already do internally, so we recast the idea of
dataflow applications as *unbundling* the components of a database, and building an application by
composing these loosely coupled components.
Derived state can be updated by observing changes in the underlying data. Moreover, the derived
state itself can further be observed by downstream consumers. We can even take this dataflow all the
way through to the end-user device that is displaying the data, and thus build user interfaces that
dynamically update to reflect data changes and continue to work offline.
Next, we discussed how to ensure that all of this processing remains correct in the presence of
faults. We saw that strong integrity guarantees can be implemented scalably with asynchronous event
processing, by using end-to-end request identifiers to make operations idempotent and by checking
constraints asynchronously. Clients can either wait until the check has passed, or go ahead without
waiting but risk having to apologize about a constraint violation. This approach is much more
scalable and robust than the traditional approach of using distributed transactions, and fits with
how many business processes work in practice.
By structuring applications around dataflow and checking constraints asynchronously, we can avoid
most coordination and create systems that maintain integrity but still perform well, even in
geographically distributed scenarios and in the presence of faults. We then talked a little about
using audits to verify the integrity of data and detect corruption, and observed that the techniques
used by blockchains resemble those of event-based systems.
##### Footnotes
### References {#references}
[^1]: Rachid Belaid. [Postgres Full-Text Search is Good Enough!](https://rachbelaid.com/postgres-full-text-search-is-good-enough/) *rachbelaid.com*, July 2015. Archived at [perma.cc/ZVP9-YDCB](https://perma.cc/ZVP9-YDCB)
[^2]: Philippe Ajoux, Nathan Bronson, Sanjeev Kumar, Wyatt Lloyd, and Kaushik Veeraraghavan. [Challenges to Adopting Stronger Consistency at Scale](https://www.usenix.org/system/files/conference/hotos15/hotos15-paper-ajoux.pdf). At *15th USENIX Workshop on Hot Topics in Operating Systems* (HotOS), May 2015.
[^3]: Pat Helland and Dave Campbell. [Building on Quicksand](https://arxiv.org/pdf/0909.1788). At *4th Biennial Conference on Innovative Data Systems Research* (CIDR), January 2009.
[^4]: Jessica Kerr. [Provenance and Causality in Distributed Systems](https://jessitron.com/2016/09/25/provenance-and-causality-in-distributed-systems/). *jessitron.com*, September 2016. Archived at [perma.cc/DTD2-F8ZM](https://perma.cc/DTD2-F8ZM)
[^5]: Jay Kreps. [The Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying). *engineering.linkedin.com*, December 2013. Archived at [perma.cc/2JHR-FR64](https://perma.cc/2JHR-FR64)
[^6]: Pat Helland. [Life Beyond Distributed Transactions: An Apostate's Opinion](https://www.cidrdb.org/cidr2007/papers/cidr07p15.pdf). At *3rd Biennial Conference on Innovative Data Systems Research* (CIDR), January 2007.
[^7]: Lionel A. Smith. [The Broad Gauge Story](https://lionels.orpheusweb.co.uk/RailSteam/GWRBroadG/BGHist.html). *Journal of the Monmouthshire Railway Society*, Summer 1985. Archived at [perma.cc/DDK9-JA6X](https://perma.cc/DDK9-JA6X)
[^8]: Jacqueline Xu. [Online Migrations at Scale](https://stripe.com/blog/online-migrations). *stripe.com*, February 2017. Archived at [perma.cc/ZQY2-EAU2](https://perma.cc/ZQY2-EAU2)
[^9]: Flavio Santos and Robert Stephenson. [Changing the Wheels on a Moving Bus --- Spotify's Event Delivery Migration](https://engineering.atspotify.com/2021/10/changing-the-wheels-on-a-moving-bus-spotify-event-delivery-migration). *engineering.atspotify.com*, October 2021. Archived at [perma.cc/5C4V-G8EV](https://perma.cc/5C4V-G8EV)
[^10]: Molly Bartlett Dishman and Martin Fowler. [Agile Architecture](https://www.youtube.com/watch?v=VjKYO6DP3fo&list=PL055Epbe6d5aFJdvWNtTeg_UEHZEHdInE). At *O'Reilly Software Architecture Conference*, March 2015.
[^11]: Nathan Marz and James Warren. [*Big Data: Principles and Best Practices of Scalable Real-Time Data Systems*](https://www.manning.com/books/big-data). Manning, 2015. ISBN: 978-1-617-29034-3
[^12]: Jay Kreps. [Questioning the Lambda Architecture](https://www.oreilly.com/ideas/questioning-the-lambda-architecture). *oreilly.com*, July 2014. Archived at [perma.cc/PGH6-XUCH](https://perma.cc/PGH6-XUCH)
[^13]: Raul Castro Fernandez, Peter Pietzuch, Jay Kreps, Neha Narkhede, Jun Rao, Joel Koshy, Dong Lin, Chris Riccomini, and Guozhang Wang. [Liquid: Unifying Nearline and Offline Big Data Integration](https://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf). At *7th Biennial Conference on Innovative Data Systems Research* (CIDR), January 2015.
[^14]: Dennis M. Ritchie and Ken Thompson. [The UNIX Time-Sharing System](https://web.eecs.utk.edu/~qcao1/cs560/papers/paper-unix.pdf). *Communications of the ACM*, volume 17, issue 7, pages 365--375, July 1974. [doi:10.1145/361011.361061](https://doi.org/10.1145/361011.361061)
[^15]: Wes McKinney. [The Road to Composable Data Systems: Thoughts on the Last 15 Years and the Future](https://wesmckinney.com/blog/looking-back-15-years/). *wesmckinney.com*, September 2023. Archived at [perma.cc/J9SJ-886N](https://perma.cc/J9SJ-886N)
[^16]: Eric A. Brewer and Joseph M. Hellerstein. [CS262a: Advanced Topics in Computer Systems](https://people.eecs.berkeley.edu/~brewer/cs262/systemr.html). Lecture notes, University of California, Berkeley, *cs.berkeley.edu*, August 2011. Archived at [perma.cc/TE79-LGWU](https://perma.cc/TE79-LGWU)
[^17]: Michael Stonebraker. [The Case for Polystores](https://wp.sigmod.org/?p=1629). *wp.sigmod.org*, July 2015. Archived at [perma.cc/G7J2-KR45](https://perma.cc/G7J2-KR45)
[^18]: Jennie Duggan, Aaron J. Elmore, Michael Stonebraker, Magda Balazinska, Bill Howe, Jeremy Kepner, Sam Madden, David Maier, Tim Mattson, and Stan Zdonik. [The BigDAWG Polystore System](https://sigmod.org/publications/sigmodRecord/1506/pdfs/04_vision_Duggan.pdf). *ACM SIGMOD Record*, volume 44, issue 2, pages 11--16, June 2015. [doi:10.1145/2814710.2814713](https://doi.org/10.1145/2814710.2814713)
[^19]: David B. Lomet, Alan Fekete, Gerhard Weikum, and Mike Zwilling. [Unbundling Transaction Services in the Cloud](https://arxiv.org/pdf/0909.1768). At *4th Biennial Conference on Innovative Data Systems Research* (CIDR), January 2009.
[^20]: Martin Kleppmann and Jay Kreps. [Kafka, Samza and the Unix Philosophy of Distributed Data](https://martin.kleppmann.com/papers/kafka-debull15.pdf). *IEEE Data Engineering Bulletin*, volume 38, issue 4, pages 4--14, December 2015.
[^21]: John Hugg. [Winning Now and in the Future: Where Volt Active Data Shines](https://www.voltactivedata.com/blog/2016/03/winning-now-future-voltdb-shines/). *voltactivedata.com*, March 2016. Archived at [perma.cc/44MP-3MWM](https://perma.cc/44MP-3MWM)
[^22]: Felienne Hermans. [Spreadsheets Are Code](https://vimeo.com/145492419). At *Code Mesh*, November 2015.
[^23]: Dan Bricklin and Bob Frankston. [VisiCalc: Information from Its Creators](http://danbricklin.com/visicalc.htm). *danbricklin.com*. Archived at [archive.org](https://web.archive.org/web/20250905040530/http://danbricklin.com/visicalc.htm)
[^24]: D. Sculley, Gary Holt, Daniel Golovin, Eugene Davydov, Todd Phillips, Dietmar Ebner, Vinay Chaudhary, and Michael Young. [Machine Learning: The High-Interest Credit Card of Technical Debt](https://research.google.com/pubs/archive/43146.pdf). At *NIPS Workshop on Software Engineering for Machine Learning* (SE4ML), December 2014. Archived at [perma.cc/M3MD-U7WL](https://perma.cc/M3MD-U7WL)
[^25]: Peter Bailis, Alan Fekete, Michael J. Franklin, Ali Ghodsi, Joseph M. Hellerstein, and Ion Stoica. [Feral Concurrency Control: An Empirical Investigation of Modern Application Integrity](http://www.bailis.org/papers/feral-sigmod2015.pdf). At *ACM International Conference on Management of Data* (SIGMOD), June 2015. [doi:10.1145/2723372.2737784](https://doi.org/10.1145/2723372.2737784)
[^26]: Guy Steele. [Re: Need for Macros (Was Re: Icon)](https://people.csail.mit.edu/gregs/ll1-discuss-archive-html/msg01134.html). email to *ll1-discuss* mailing list, *people.csail.mit.edu*, December 2001. Archived at [perma.cc/K9X8-CJ65](https://perma.cc/K9X8-CJ65)
[^27]: Ben Stopford. [Microservices in a Streaming World](https://www.infoq.com/presentations/microservices-streaming). At *QCon London*, March 2016.
[^28]: Adam Bellemare. [*Building Event-Driven Microservices, 2nd Edition*](https://learning.oreilly.com/library/view/building-event-driven-microservices/9798341622180/). O'Reilly Media, 2025.
[^29]: Christian Posta. [Why Microservices Should Be Event Driven: Autonomy vs Authority](https://blog.christianposta.com/microservices/why-microservices-should-be-event-driven-autonomy-vs-authority/). *blog.christianposta.com*, May 2016. Archived at [perma.cc/E6N9-3X92](https://perma.cc/E6N9-3X92)
[^30]: Alex Feyerke. [Designing Offline-First Web Apps](https://alistapart.com/article/offline-first/). *alistapart.com*, December 2013. Archived at [perma.cc/WH7R-S2DS](https://perma.cc/WH7R-S2DS)
[^31]: Martin Kleppmann. [Turning the Database Inside-out with Apache Samza](https://martin.kleppmann.com/2015/03/04/turning-the-database-inside-out.html). At *Strange Loop*, September 2014. Archived at [perma.cc/U6E8-A9MT](https://perma.cc/U6E8-A9MT)
[^32]: Sebastian Burckhardt, Daan Leijen, Jonathan Protzenko, and Manuel Fähndrich. [Global Sequence Protocol: A Robust Abstraction for Replicated Shared State](https://drops.dagstuhl.de/entities/document/10.4230/LIPIcs.ECOOP.2015.568). At *29th European Conference on Object-Oriented Programming* (ECOOP), July 2015. [doi:10.4230/LIPIcs.ECOOP.2015.568](https://doi.org/10.4230/LIPIcs.ECOOP.2015.568)
[^33]: Evan Czaplicki and Stephen Chong. [Asynchronous Functional Reactive Programming for GUIs](https://people.seas.harvard.edu/~chong/pubs/pldi13-elm.pdf). At *34th ACM SIGPLAN Conference on Programming Language Design and Implementation* (PLDI), June 2013. [doi:10.1145/2491956.2462161](https://doi.org/10.1145/2491956.2462161)
[^34]: Eno Thereska, Damian Guy, Michael Noll, and Neha Narkhede. [Unifying Stream Processing and Interactive Queries in Apache Kafka](https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/). *confluent.io*, October 2016. Archived at [perma.cc/W8JG-EAZF](https://perma.cc/W8JG-EAZF)
[^35]: Frank McSherry. [Dataflow as Database](https://github.com/frankmcsherry/blog/blob/master/posts/2016-07-17.md). *github.com*, July 2016. Archived at [perma.cc/384D-DUFH](https://perma.cc/384D-DUFH)
[^36]: Peter Alvaro. [I See What You Mean](https://www.youtube.com/watch?v=R2Aa4PivG0g). At *Strange Loop*, September 2015.
[^37]: Nathan Marz. [Trident: A High-Level Abstraction for Realtime Computation](https://blog.x.com/engineering/en_us/a/2012/trident-a-high-level-abstraction-for-realtime-computation). *blog.x.com*, August 2012. Archived at [archive.org](https://web.archive.org/web/20250515030808/https://blog.x.com/engineering/en_us/a/2012/trident-a-high-level-abstraction-for-realtime-computation)
[^38]: Edi Bice. [Low Latency Web Scale Fraud Prevention with Apache Samza, Kafka and Friends](https://www.slideshare.net/slideshow/extremely-low-latency-web-scale-fraud-prevention-with-apache-samza-kafka-and-friends/57068078). At *Merchant Risk Council MRC Vegas Conference*, March 2016. Archived at [perma.cc/T3H5-QN3R](https://perma.cc/T3H5-QN3R)
[^39]: Charity Majors. [The Accidental DBA](https://charity.wtf/2016/10/02/the-accidental-dba/). *charity.wtf*, October 2016. Archived at [perma.cc/6ANP-ARB6](https://perma.cc/6ANP-ARB6)
[^40]: Arthur J. Bernstein, Philip M. Lewis, and Shiyong Lu. [Semantic Conditions for Correctness at Different Isolation Levels](https://dsf.berkeley.edu/cs286/papers/isolation-icde2000.pdf). At *16th International Conference on Data Engineering* (ICDE), February 2000. [doi:10.1109/ICDE.2000.839387](https://doi.org/10.1109/ICDE.2000.839387)
[^41]: Sudhir Jorwekar, Alan Fekete, Krithi Ramamritham, and S. Sudarshan. [Automating the Detection of Snapshot Isolation Anomalies](https://www.vldb.org/conf/2007/papers/industrial/p1263-jorwekar.pdf). At *33rd International Conference on Very Large Data Bases* (VLDB), September 2007.
[^42]: Kyle Kingsbury. [Jepsen: Distributed Systems Safety Research](https://jepsen.io/). *jepsen.io*.
[^43]: Michael Jouravlev. [Redirect After Post](https://www.theserverside.com/news/1365146/Redirect-After-Post). *theserverside.com*, August 2004. Archived at [archive.org](https://web.archive.org/web/20250904205736/https://www.theserverside.com/news/1365146/Redirect-After-Post)
[^44]: Jerome H. Saltzer, David P. Reed, and David D. Clark. [End-to-End Arguments in System Design](https://groups.csail.mit.edu/ana/Publications/PubPDFs/End-to-End%20Arguments%20in%20System%20Design.pdf). *ACM Transactions on Computer Systems*, volume 2, issue 4, pages 277--288, November 1984. [doi:10.1145/357401.357402](https://doi.org/10.1145/357401.357402)
[^45]: Peter Bailis, Alan Fekete, Michael J. Franklin, Ali Ghodsi, Joseph M. Hellerstein, and Ion Stoica. [Coordination Avoidance in Database Systems](https://arxiv.org/abs/1402.2237). *Proceedings of the VLDB Endowment*, volume 8, issue 3, pages 185--196, November 2014. [doi:10.14778/2735508.2735509](https://doi.org/10.14778/2735508.2735509)
[^46]: Alex Yarmula. [Strong Consistency in Manhattan](https://blog.x.com/engineering/en_us/a/2016/strong-consistency-in-manhattan). *blog.x.com*, March 2016. Archived at [archive.org](https://web.archive.org/web/20250713175819/https://blog.x.com/engineering/en_us/a/2016/strong-consistency-in-manhattan)
[^47]: Martin Kleppmann, Alastair R. Beresford, and Boerge Svingen. [Online Event Processing: Achieving consistency where distributed transactions have failed](https://martin.kleppmann.com/papers/olep-cacm.pdf). *Communications of the ACM*, volume 62, issue 5, pages 43--49, May 2019. [doi:10.1145/3312527](https://doi.org/10.1145/3312527)
[^48]: Jim Gray. [The Transaction Concept: Virtues and Limitations](https://jimgray.azurewebsites.net/papers/thetransactionconcept.pdf). At *7th International Conference on Very Large Data Bases* (VLDB), September 1981. Archived at [perma.cc/8VPT-N5H6](https://perma.cc/8VPT-N5H6)
[^49]: Hector Garcia-Molina and Kenneth Salem. [Sagas](https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf). At *ACM International Conference on Management of Data* (SIGMOD), May 1987. [doi:10.1145/38713.38742](https://doi.org/10.1145/38713.38742)
[^50]: Annamalai Gurusami and Daniel Price. [Bug #73170: Duplicates in Unique Secondary Index Because of Fix of Bug#68021](https://bugs.mysql.com/bug.php?id=73170). *bugs.mysql.com*, July 2014. Archived at [perma.cc/P6BV-W7JJ](https://perma.cc/P6BV-W7JJ)
[^51]: Gary Fredericks. [Postgres Serializability Bug](https://github.com/gfredericks/pg-serializability-bug). *github.com*, September 2015. Archived at [perma.cc/N8UP-2822](https://perma.cc/N8UP-2822)
[^52]: Xiao Chen. [HDFS DataNode Scanners and Disk Checker Explained](https://www.cloudera.com/blog/technical/hdfs-datanode-scanners-and-disk-checker-explained.html). *blog.cloudera.com*, December 2016. Archived at [perma.cc/6S36-X98L](https://perma.cc/6S36-X98L)
[^53]: Daniel Persson. [How does Ceph scrubbing work?](https://www.youtube.com/watch?v=M9QGMoc3GU8) *youtube.com*, March 2022.
[^54]: Jay Kreps. [Getting Real About Distributed System Reliability](https://blog.empathybox.com/post/19574936361/getting-real-about-distributed-system-reliability). *blog.empathybox.com*, March 2012. Archived at [perma.cc/9B5Q-AEBW](https://perma.cc/9B5Q-AEBW)
[^55]: Martin Fowler. [The LMAX Architecture](https://martinfowler.com/articles/lmax.html). *martinfowler.com*, July 2011. Archived at [perma.cc/5AV4-N6RJ](https://perma.cc/5AV4-N6RJ)
[^56]: Sam Stokes. [Move Fast with Confidence](https://five-eights.com/2016/07/11/move-fast-with-confidence/). *five-eights.com*, July 2016. Archived at [perma.cc/J8C6-DHXB](https://perma.cc/J8C6-DHXB)
[^57]: Ralph C. Merkle. [A Digital Signature Based on a Conventional Encryption Function](https://people.eecs.berkeley.edu/~raluca/cs261-f15/readings/merkle.pdf). At *CRYPTO '87*, August 1987. [doi:10.1007/3-540-48184-2_32](https://doi.org/10.1007/3-540-48184-2_32)
[^58]: Ben Laurie. [Certificate Transparency](https://queue.acm.org/detail.cfm?id=2668154). *ACM Queue*, volume 12, issue 8, pages 10--19, August 2014. [doi:10.1145/2668152.2668154](https://doi.org/10.1145/2668152.2668154)
[^59]: Mark D. Ryan. [Enhanced Certificate Transparency and End-to-End Encrypted Mail](https://www.ndss-symposium.org/wp-content/uploads/2017/09/12_2_1.pdf). At *Network and Distributed System Security Symposium* (NDSS), February 2014. [doi:10.14722/ndss.2014.23379](https://doi.org/10.14722/ndss.2014.23379)