Test In chapter 1 somewhere he talks about Twitter and the fan-out, and how in the end they have one model for normal users and one for celebrities.
book-designing-data-intensive-applications#distinction-between-system-of-record-and-derived1 2 3"Not all systems make a clear distinction between systems of record and derived data in their architecture, but it's a very helpful distinction to make, because it clarifies the dataflow through your system: it makes explicit which parts of the system have which inputs and which outputs, and how they depend on each other." book-designing-data-intensive-applications#distinction-between-system-of-record-and-derived1 2 3
book-designing-data-intensive-applications#sessionization1"Another common use for grouping is collating all the activity events for a particular user session, in order to find out the sequence of actions that the user took, a process called sessionization" book-designing-data-intensive-applications#sessionization1
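A minimal sketch of sessionization, assuming each activity event is a (session_id, timestamp, action) tuple. The event shape and the function name are made up for illustration, not taken from the book.

```python
from collections import defaultdict

def sessionize(events):
    """Group (session_id, timestamp, action) tuples by session, ordered by time."""
    sessions = defaultdict(list)
    for session_id, timestamp, action in events:
        sessions[session_id].append((timestamp, action))
    # Sorting each group by timestamp recovers the sequence of actions.
    return {sid: [action for _, action in sorted(evts)]
            for sid, evts in sessions.items()}

events = [
    ("s1", 3, "checkout"),
    ("s2", 1, "view_home"),
    ("s1", 1, "view_item"),
    ("s1", 2, "add_to_cart"),
]
print(sessionize(events))
```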
book-designing-data-intensive-applications#minimizing-itreversabilityThis principle of minimizing irreversibility is beneficial for Agile software development book-designing-data-intensive-applications#minimizing-itreversability
book-designing-data-intensive-applications#process-it-later1To put it bluntly, Hadoop opened up the possibility of indiscriminately dumping data into HDFS, and only later figuring out how to process it further book-designing-data-intensive-applications#process-it-later1
book-designing-data-intensive-applications#no-single-ideal-view1There may not even be one ideal data model, but rather different views onto the data that are suitable for different purposes. book-designing-data-intensive-applications#no-single-ideal-view1
book-designing-data-intensive-applications#designing-for-frequent-faults1The MapReduce approach is more appropriate for larger jobs: jobs that process so much data and run for such a long time that they are likely to experience at least one task failure along the way. In that case, rerunning the entire job due to a single task failure would be wasteful. Even if recovery at the granularity of an individual task introduces overheads that make fault-free processing slower, it can still be a reasonable trade-off if the rate of task failures is high enough. book-designing-data-intensive-applications#designing-for-frequent-faults1
book-designing-data-intensive-applications#design-makes-less-sense-if-more-reliable1In an environment where tasks are not so often terminated, the design decisions of MapReduce make less sense. book-designing-data-intensive-applications#design-makes-less-sense-if-more-reliable1
book-designing-data-intensive-applications#framework-fixes-failures1Thanks to the framework, your code in a batch processing job does not need to worry about implementing fault-tolerance mechanisms: the framework can guarantee that the final output of a job is the same as if no faults had occurred, even though in reality various tasks perhaps had to be retried. book-designing-data-intensive-applications#framework-fixes-failures1
book-designing-data-intensive-applications#more-info-in-a-log1Immutable events also capture more information than just the current state. For example, on a shopping website, a customer may add an item to their cart and then remove it again. Although the second event cancels out the first event from the point of view of order fulfillment, it may be useful to know for analytics purposes that the customer was considering a particular item but then decided against it. book-designing-data-intensive-applications#more-info-in-a-log1
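A small sketch of the cart example, assuming events are (kind, item) pairs in append order. The names are hypothetical. The current state is a fold over the log, while the log itself still answers the analytics question.

```python
def cart_state(events):
    """Fold an append-only event log into the current cart contents."""
    cart = set()
    for kind, item in events:
        if kind == "add":
            cart.add(item)
        elif kind == "remove":
            cart.discard(item)
    return cart

def considered_but_dropped(events):
    """Items that were added at some point but are not in the final cart."""
    ever_added = {item for kind, item in events if kind == "add"}
    return ever_added - cart_state(events)

log = [("add", "book"), ("add", "lamp"), ("remove", "lamp")]
print(cart_state(log))              # what order fulfillment cares about
print(considered_but_dropped(log))  # only recoverable from the event log
```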
book-designing-data-intensive-applications#materialized-views-examples1We saw in "Databases and Streams" that a stream of changes to a database can be used to keep derived data systems, such as caches, search indexes, and data warehouses, up to date with a source database. We can regard these examples as specific cases of maintaining materialized views book-designing-data-intensive-applications#materialized-views-examples1
book-designing-data-intensive-applications#partitioned-stream-event-ordering-is-ambiguous1 2 3 4The order of events in two different partitions is then ambiguous. book-designing-data-intensive-applications#partitioned-stream-event-ordering-is-ambiguous1 2 3 4
book-designing-data-intensive-applications#record-state-of-what-was-seen1If you can log an event to record the state of the system that the user saw before making a decision, and give that event a unique identifier, then any later events can reference that event identifier in order to record the causal dependency [4]. We will return to this idea in "Reads are events too". book-designing-data-intensive-applications#record-state-of-what-was-seen1
book-designing-data-intensive-applications#one-type-of-processing-on-the-other1In principle, one type of processing can be emulated on top of the other, although the performance characteristics vary: for example, microbatching may perform poorly on hopping or sliding windows book-designing-data-intensive-applications#one-type-of-processing-on-the-other1
Location: 79 However, it's also important to choose the right tool for the job, and different technologies each have their own strengths and weaknesses. As we shall see, relational databases are important but not the final word on dealing with data.
book-designing-data-intensive-applications#single-node-distributed-online-offline1 2 3 4Location: 93 This book uses less ambiguous terms, such as single-node versus distributed systems, or online/interactive versus offline/batch processing systems. book-designing-data-intensive-applications#single-node-distributed-online-offline1 2 3 4
Location: 112 we start with a batch processing approach to derived data,
Location: 118 and it includes pointers to the original literature throughout the text.
book-designing-data-intensive-applications#database-like-durabilityLocation: 204 there are message queues with database-like durability guarantees (Apache Kafka). book-designing-data-intensive-applications#database-like-durability
book-designing-data-intensive-applications#special-purpose-data-systemLocation: 216 Now you have essentially created a new, special-purpose data system from smaller, general-purpose components. Your composite data system may provide certain guarantees: e.g., that the cache will be correctly invalidated or updated on writes so that outside clients see consistent results. You are now not only an application developer, but also a data system designer. book-designing-data-intensive-applications#special-purpose-data-system
Location: 253 It is impossible to reduce the probability of a fault to zero; therefore it is usually best to design fault-tolerance mechanisms that prevent faults from causing failures.
Location: 255 building reliable systems from unreliable parts.
Location: 257 Counterintuitively, in such fault-tolerant systems, it can make sense to increase the rate of faults by triggering them deliberately — for example, by randomly killing individual processes without warning. Many critical bugs are actually due to poor error handling [3]; by deliberately inducing faults, you ensure that the fault-tolerance machinery is continually exercised and tested, which can increase your confidence that faults will be handled correctly when they occur naturally.
Location: 288 Moreover, in some cloud platforms such as Amazon Web Services (AWS) it is fairly common for virtual machine instances to become unavailable without warning [7], as the platforms are designed to prioritize flexibility and elasticity over single-machine reliability.
book-designing-data-intensive-applications#constantly-check-itself1Location: 319 If a system is expected to provide some guarantee (for example, in a message queue, that the number of incoming messages equals the number of outgoing messages), it can constantly check itself while it is running and raise an alert if a discrepancy is found [12]. book-designing-data-intensive-applications#constantly-check-itself1
Location: 326 For example, one study of large internet services found that configuration errors by operators were the leading cause of outages, whereas hardware faults (servers or network) played a role in only 10–25% of outages [13].
Location: 368 Load can be described with a few numbers which we call load parameters
Location: 430 We therefore need to think of response time not as a single number, but as a distribution of values that you can measure.
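To make the "distribution, not a single number" point concrete, here is a sketch that computes percentiles with the nearest-rank method. The sample values are invented, and nearest-rank is just one of several percentile definitions.

```python
import math
import statistics

def percentile(samples, p):
    """Nearest-rank percentile: smallest value with at least p% of samples at or below it."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]

# Hypothetical response times in milliseconds, with one slow outlier.
times_ms = [30, 32, 35, 31, 29, 33, 30, 34, 31, 900]

print("mean:", statistics.mean(times_ms))
print("p50: ", percentile(times_ms, 50))
print("p99: ", percentile(times_ms, 99))
```

With these numbers the mean is pulled far above what a typical request sees, while the median stays close to it and the high percentile exposes the outlier.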
book-designing-data-intensive-applications#head-of-line-blockingLocation: 479 it only takes a small number of slow requests to hold up the processing of subsequent requests — an effect sometimes known as head-of-line blocking book-designing-data-intensive-applications#head-of-line-blocking
Location: 481 Due to this effect, it is important to measure response times on the client side.
Location: 491 Even if only a small percentage of backend calls are slow, the chance of getting a slow call increases if an end-user request requires multiple backend calls, and so a higher proportion of end-user requests end up being slow (an effect known as tail latency amplification [24]).
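The arithmetic behind this effect, as a sketch that assumes backend calls are slow independently of each other (a simplification; real slowness is often correlated):

```python
def p_slow_request(p_slow_call, n_calls):
    """Probability that at least one of n independent backend calls is slow."""
    return 1 - (1 - p_slow_call) ** n_calls

for n in (1, 10, 100):
    print(n, round(p_slow_request(0.01, n), 3))
```

Under that assumption, if 1% of backend calls are slow, a request that fans out to 100 backends has roughly a 63% chance of hitting at least one slow call.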
Location: 538 An architecture that scales well for a particular application is built around assumptions of which operations will be common and which will be rare — the load parameters.
Location: 555 Operability: Make it easy for operations teams to keep the system running smoothly.
book-designing-data-intensive-applications#simplicityLocation: 556 Simplicity: Make it easy for new engineers to understand the system, by removing as much complexity as possible from the system. (Note this is not the same as simplicity of the user interface.) book-designing-data-intensive-applications#simplicity
book-designing-data-intensive-applications#evolvability-changes-in-futureLocation: 559 Evolvability: Make it easy for engineers to make changes to the system in the future, adapting it for unanticipated use cases as requirements change. Also known as extensibility, modifiability, or plasticity. book-designing-data-intensive-applications#evolvability-changes-in-future
Location: 581 easy-to-understand operational model ("If I do X, Y will happen")
Location: 611 keep our eyes open for good abstractions that allow us to extract parts of a large system into well-defined, reusable components.
Location: 621 In this book, we search for ways of increasing agility on the level of a larger data system, perhaps consisting of several different applications or services with different characteristics.
book-designing-data-intensive-applications#evolvability1 2Location: 625 since this is such an important idea, we will use a different word to refer to agility on a data system level: evolvability book-designing-data-intensive-applications#evolvability1 2
Location: 885 Whether you store an ID or a text string is a question of duplication. When you use an ID, the information that is meaningful to humans (such as the word Philanthropy) is stored in only one place, and everything that refers to it uses an ID (which only has meaning within the database). When you store the text directly, you are duplicating the human-meaningful information in every record that uses it. The advantage of using an ID is that because it has no meaning to humans, it never needs to change: the ID can remain the same, even if the information it identifies changes. Anything that is meaningful to humans may need to change sometime in the future, and if that information is duplicated, all the redundant copies need to be updated. That incurs write overheads, and risks inconsistencies (where some copies of the information are updated but others aren't). Removing such duplication is the key idea behind normalization in databases.
book-designing-data-intensive-applications#join-free-document-model1Location: 907 Moreover, even if the initial version of an application fits well in a join-free document model, data has a tendency of becoming more interconnected as features are added to applications. book-designing-data-intensive-applications#join-free-document-model1
book-designing-data-intensive-applications#schema-on-read-or-write1 2Location: 1,026 A more accurate term is schema-on-read (the structure of the data is implicit, and only interpreted when the data is read), in contrast with schema-on-write (the traditional approach of relational databases, where the schema is explicit and the database ensures all written data conforms to it) book-designing-data-intensive-applications#schema-on-read-or-write1 2
Location: 1,089 For example, Google's Spanner database offers the same locality properties in a relational data model, by allowing the schema to declare that a table's rows should be interleaved (nested) within a parent table [27].
Location: 1,148 A declarative query language is attractive because it is typically more concise and easier to work with than an imperative API.
Location: 1,395 head_vertex integer REFERENCES vertices (vertex_id),
Location: 1,414 Graphs are good for evolvability: as you add features to your application, a graph can easily be extended to accommodate changes in your application's data structures.
book-designing-data-intensive-applications#triple-store-modelLocation: 1,553 The triple-store model is mostly equivalent to the property graph model, using different words to describe the same ideas. book-designing-data-intensive-applications#triple-store-model
Location: 1,685 writing a triple as (subject, predicate, object),
book-designing-data-intensive-applications#five-tuples-in-datomicLocation: 1,792 Technically, Datomic uses 5-tuples rather than triples; the two additional fields are metadata for versioning. book-designing-data-intensive-applications#five-tuples-in-datomic
book-designing-data-intensive-applications#compaction1Location: 1,992 Compaction means throwing away duplicate keys in the log, and keeping only the most recent update for each key. book-designing-data-intensive-applications#compaction1
Location: 1,998 Segments are never modified after they have been written, so the merged segment is written to a new file.
book-designing-data-intensive-applications#tombstone1Location: 2,010 you have to append a special deletion record to the data file (sometimes called a tombstone). book-designing-data-intensive-applications#tombstone1
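A sketch of compaction with tombstones, assuming the log is a list of (key, value) records in append order. TOMBSTONE and compact are hypothetical names. Dropping the tombstone itself is only safe when every older segment for that key is part of the same merge, which this sketch assumes.

```python
TOMBSTONE = object()  # hypothetical marker standing in for a deletion record

def compact(log):
    """Keep only the most recent value per key; drop keys whose latest record is a tombstone."""
    latest = {}
    for key, value in log:  # append order, oldest first, so later records win
        latest[key] = value
    # The input is left untouched; the result plays the role of the new merged segment.
    return {k: v for k, v in latest.items() if v is not TOMBSTONE}

log = [("mew", 1078), ("purr", 2103), ("mew", 1079), ("purr", TOMBSTONE), ("yawn", 511)]
print(compact(log))
```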
Location: 2,049 Sorted String Table, or SSTable
Location: 2,050 SSTables have several big advantages over log segments with hash indexes:
Location: 2,067 You still need an in-memory index to tell you the offsets for some of the keys, but it can be sparse: one key for every few kilobytes of segment file is sufficient, because a few kilobytes can be scanned very quickly.
Location: 2,071 Each entry of the sparse in-memory index then points at the start of a compressed block.
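A sketch of a lookup through a sparse index, using a sorted Python list as a stand-in for an SSTable file and list positions as stand-ins for byte offsets. Indexing every third key is an arbitrary choice for the example.

```python
import bisect

# A sorted run of (key, value) pairs, standing in for one SSTable segment.
segment = [("apple", 1), ("banana", 2), ("cherry", 3),
           ("grape", 4), ("kiwi", 5), ("mango", 6)]

# Sparse index: only every third key, with its position in the segment.
sparse_offsets = list(range(0, len(segment), 3))
sparse_keys = [segment[i][0] for i in sparse_offsets]

def lookup(key):
    # Find the last indexed key that is <= the requested key.
    i = bisect.bisect_right(sparse_keys, key) - 1
    if i < 0:
        return None  # key sorts before the first key in the segment
    start = sparse_offsets[i]
    end = sparse_offsets[i + 1] if i + 1 < len(sparse_offsets) else len(segment)
    # Scan only the short run between two indexed keys.
    for k, v in segment[start:end]:
        if k == key:
            return v
    return None

print(lookup("cherry"), lookup("mango"), lookup("fig"))
```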
Location: 2,080 This in-memory tree is sometimes called a memtable.
Location: 2,083 In order to serve a read request, first try to find the key in the memtable, then in the most recent on-disk segment, then in the next-older segment, etc.
Location: 2,084 From time to time, run a merging and compaction process in the background to combine segment files and to discard overwritten or deleted values.
Location: 2,087 In order to avoid that problem, we can keep a separate log on disk to which every write is immediately appended, just like in the previous section. That log is not in sorted order, but that doesn't matter, because its only purpose is to restore the memtable after a crash.
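The read path, background flush, and crash-recovery log from the last few highlights can be sketched together. Everything here lives in memory: plain dicts stand in for the sorted tree and the on-disk segments, and a list stands in for the log file. TinyLSM is a made-up name.

```python
class TinyLSM:
    def __init__(self):
        self.memtable = {}   # stands in for a sorted in-memory tree
        self.segments = []   # stands in for on-disk segments, newest last
        self.wal = []        # stands in for the unsorted append-only log on disk

    def put(self, key, value):
        self.wal.append((key, value))  # log first, so a crash cannot lose the write
        self.memtable[key] = value

    def flush(self):
        # Write the memtable out as a sorted segment and start a fresh one.
        self.segments.append(dict(sorted(self.memtable.items())))
        self.memtable = {}
        self.wal = []  # the log is only needed until the memtable is on disk

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for segment in reversed(self.segments):  # newest segment first
            if key in segment:
                return segment[key]
        return None

    def recover(self):
        # After a crash the memtable is gone; replay the log to rebuild it.
        self.memtable = {}
        for key, value in self.wal:
            self.memtable[key] = value

db = TinyLSM()
db.put("a", 1)
db.flush()
db.put("a", 2)
db.put("b", 3)
db.memtable = {}   # simulate losing the in-memory state in a crash
db.recover()
print(db.get("a"), db.get("b"))
```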
Location: 2,101 inspired by Google's Bigtable paper [9].
Location: 2,103 Originally this indexing structure was described by Patrick O'Neil et al. under the name Log-Structured Merge-Tree (or LSM-Tree)
Location: 2,122 In order to optimize this kind of access, storage engines often use additional Bloom filters
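The access being optimized here is a lookup for a key that does not exist, which otherwise has to check the memtable and every segment. A minimal Bloom filter sketch follows. The bit count, the number of hashes, and the use of SHA-256 are arbitrary choices for illustration, not what any particular storage engine does.

```python
import hashlib

class BloomFilter:
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive several bit positions by salting the key with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        # False means definitely absent; True means possibly present.
        return all((self.bits >> pos) & 1 for pos in self._positions(key))

bf = BloomFilter()
bf.add("handbag")
print(bf.might_contain("handbag"))
```

A negative answer lets the engine skip the segment entirely; a positive answer still requires the real lookup, because false positives are possible.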
book-designing-data-intensive-applications#sstable-compaction-strategies1Location: 2,129 There are also different strategies to determine the order and timing of how SSTables are compacted and merged. The most common options are size-tiered and leveled compaction. LevelDB and RocksDB use leveled compaction (hence the name of LevelDB), HBase uses size-tiered, and Cassandra supports both [16]. In size-tiered compaction, newer and smaller SSTables are successively merged into older and larger SSTables. In leveled compaction, the key range is split up into smaller SSTables and older data is moved into separate "levels," which allows the compaction to proceed more incrementally and use less disk space. book-designing-data-intensive-applications#sstable-compaction-strategies1
Location: 2,167 The number of references to child pages in one page of the B-tree is called the branching factor
Location: 2,196 When the database comes back up after a crash, this log is used to restore the B-tree back to a consistent state
Location: 2,233 Reads are typically slower on LSM-trees because they have to check several different data structures and SSTables at different stages of compaction.
Location: 2,243 This effect — one write to the database resulting in multiple writes to the disk over the course of the database's lifetime — is known as write amplification
Location: 2,266 the response time of queries to log-structured storage engines can sometimes be quite high, and B-trees can be more predictable [28].
Location: 2,273 Typically, SSTable-based storage engines do not throttle the rate of incoming writes, even if compaction cannot keep up, so you need explicit monitoring to detect this situation [29
Location: 2,294 It is also very common to have secondary indexes
Location: 2,304 In the latter case, the place where rows are stored is known as a heap file
Location: 2,313 In some situations, the extra hop from the index to the heap file is too much of a performance penalty for reads, so it can be desirable to store the indexed row directly within an index. This is known as a clustered index
Location: 2,319 A compromise between a clustered index (storing all row data within the index) and a nonclustered index (storing only references to the data within the index) is known as a covering index or index with included columns, which stores some of a table's columns within the index [33]. This allows some queries to be answered by using the index alone (in which case, the index is said to cover the query)
Location: 2,330 column index is called a concatenated index, which simply combines several fields into one key by appending one column to another (the index definition specifies in which order the fields are concatenated).
Location: 2,393 Despite writing to disk, it's still an in-memory database, because the disk is merely used as an append-only log for durability, and reads are served entirely from memory.
Location: 2,409 Rather, they can be faster because they can avoid the overheads of encoding in-memory data structures in a form that can be written to disk [44
Location: 2,438 Because these applications are interactive, the access pattern became known as online transaction processing (OLTP).
Location: 2,448 it has been called online analytic processing (OLAP)
Location: 2,509 More recently, a plethora of open source SQL-on-Hadoop projects have emerged; they are young but aiming to compete with commercial data warehouse systems.
Location: 2,520 Many data warehouses are used in a fairly formulaic style, known as a star schema (also known as dimensional modeling [55]).
Location: 2,523 At the center of the schema is a so-called fact table (in this example, it is called fact_sales). Each row of the fact table represents an event that occurred at a particular time (here, each row represents a customer's purchase of a product).
Location: 2,528 Usually, facts are captured as individual events, because this allows maximum flexibility of analysis later. However, this means that the fact table can become extremely large. A big enterprise like Apple, Walmart, or eBay may have tens of petabytes of transaction history in its data warehouse, most of which is in fact tables [56].
Location: 2,533 As each row in the fact table represents an event, the dimensions represent the who, what, where, when, how, and why of the event.
Location: 2,538 Even date and time are often represented using dimension tables, because this allows additional information about dates (such as public holidays) to be encoded, allowing queries to differentiate between sales on holidays and non-holidays.
Location: 2,545 Snowflake schemas are more normalized than star schemas, but star schemas are often preferred because they are simpler for analysts to work with
Location: 2,583 If each column is stored in a separate file, a query only needs to read and parse those columns that are used in that query, which can save a lot of work.
Location: 2,603 One technique that is particularly effective in data warehouses is bitmap encoding
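A sketch of bitmap encoding for a single column, using lists of 0/1 instead of packed bits to keep it readable. The column values are invented.

```python
def bitmap_encode(column):
    """Return one bitmap (list of 0/1, one entry per row) for each distinct value."""
    bitmaps = {}
    for row, value in enumerate(column):
        bitmaps.setdefault(value, [0] * len(column))[row] = 1
    return bitmaps

product_sk = [69, 69, 74, 31, 31, 69]
bitmaps = bitmap_encode(product_sk)

# WHERE product_sk IN (31, 74) becomes a bitwise OR of two bitmaps.
matches = [a | b for a, b in zip(bitmaps[31], bitmaps[74])]
print(matches)
```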
Location: 2,632 Developers of analytical databases also worry about efficiently using the bandwidth from main memory into the CPU cache, avoiding branch mispredictions and bubbles in the CPU instruction processing pipeline, and making use of single-instruction-multi-data (SIMD) instructions in modern CPUs [59, 60].
Location: 2,641 Operators, such as the bitwise AND and OR described previously, can be designed to operate on such chunks of compressed column data directly. This technique is known as vectorized processing [58, 49].
Location: 2,685 LSM-trees. All writes first go to an in-memory store, where they are added to a sorted structure and prepared for writing to disk. It doesn't matter whether the in-memory store is row-oriented or column-oriented. When enough writes have accumulated, they are merged with the column files on disk and written to new files in bulk.
Location: 2,692 Data Cubes
Location: 2,698 materialized aggregates
Location: 2,704 a virtual view is just a shortcut for writing queries.
Location: 2,710 A common special case of a materialized view is known as a data cube or OLAP cube [64]. It is a grid of aggregates grouped by different dimensions.
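A two-dimensional data cube can be sketched as a dict keyed by (date, product), with None meaning "summed over this dimension". The fact rows and the choice of SUM as the aggregate are assumptions for the example.

```python
from collections import defaultdict

facts = [  # (date, product, net_price), hypothetical fact-table rows
    ("2024-01-01", "apple", 3.0),
    ("2024-01-01", "pear", 2.0),
    ("2024-01-02", "apple", 4.0),
]

cube = defaultdict(float)
for date, product, price in facts:
    cube[(date, product)] += price  # one cell of the grid
    cube[(date, None)] += price     # total per date, summed over products
    cube[(None, product)] += price  # total per product, summed over dates
    cube[(None, None)] += price     # grand total

print(cube[(None, "apple")], cube[("2024-01-01", None)], cube[(None, None)])
```

Queries that match one of the precomputed groupings become a single lookup, at the cost of not being able to answer questions about attributes that are not cube dimensions.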
Location: 2,924 the idea of evolvability: we should aim to build systems that make it easy to adapt to change
Location: 3,100 there are no field names (userName, favoriteNumber, interests). Instead, the encoded data contains field tags, which are numbers (1, 2, and 3). Those are the numbers that appear in the schema definition.
Location: 3,111 bit packing
Location: 3,121 We said previously that schemas inevitably need to change over time. We call this schema evolution
Location: 3,133 Therefore, to maintain backward compatibility, every field you add after the initial deployment of the schema must be optional or have a default value.
Location: 3,195 writer's schema
Location: 3,197 reader's schema
Location: 3,230 how does the reader know the writer's schema with which a particular piece of data was encoded? We can't just include the entire schema with every record, because the schema would likely be much bigger than the encoded data, making all the space savings from the binary encoding futile.
Location: 3,280 If you have an object container file (which embeds the writer's schema), you can simply open it using the Avro library and look at the data in the same way as you could look at a JSON file. The file is self-describing since it includes all the necessary metadata.
Location: 3,341 However, there is an additional snag. Say you add a field to a record schema, and the newer code writes a value for that new field to the database. Subsequently, an older version of the code (which doesn't yet know about the new field) reads the record, updates it, and writes it back. In this situation, the desirable behavior is usually for the old code to keep the new field intact, even though
Location: 3,355 data outlives code
Location: 3,362 Schema evolution thus allows the entire database to appear as if it was encoded with a single schema, even though the underlying storage may contain records encoded with various historical versions of the schema.
Location: 3,429 REST has been gaining popularity compared to SOAP, at least in the context of cross-organizational service integration
Location: 3,510 However, a RESTful API has other significant advantages: it is good for experimentation and debugging (you can simply make requests to it using a web browser or the command-line tool curl, without any code generation or software installation), it is supported by all mainstream programming languages and platforms, and there is a vast ecosystem of tools available (servers, caches, load balancers, proxies, firewalls, monitoring, debugging tools, testing tools, etc.). For these reasons, REST seems to be the predominant style for public APIs. The main focus of RPC frameworks is on requests between services owned by the same organization, typically within the same datacenter.
Location: 3,522 simplifying assumption
Location: 3,573 However, a consumer may itself publish messages to another topic
Location: 3,577 a consumer republishes messages to another topic, you may need to be careful to preserve unknown fields, to prevent the issue described previously in the context of databases
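A sketch of what "preserve unknown fields" can look like when records are decoded into plain dicts. KNOWN_FIELDS and old_code_update are hypothetical names, and a real encoding library may handle this differently or automatically.

```python
KNOWN_FIELDS = {"user_name", "favorite_number"}  # what the old code's model knows about

def old_code_update(record, **changes):
    """Update known fields while carrying unknown fields through untouched."""
    unknown = {k: v for k, v in record.items() if k not in KNOWN_FIELDS}
    known = {k: v for k, v in record.items() if k in KNOWN_FIELDS}
    known.update(changes)
    # Writing back known + unknown keeps fields added by newer code intact.
    return {**known, **unknown}

# Record written by newer code, which added photo_url.
record = {"user_name": "Martin", "favorite_number": 1337, "photo_url": "x"}
print(old_code_update(record, favorite_number=42))
```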
Location: 3,591 Location transparency works better in the actor model than in RPC, because the actor model already assumes that messages may be lost,
Location: 3,635 message brokers
Location: 3,647 union types.
Location: 3,803 By contrast, shared-nothing architectures [3] (sometimes called horizontal scaling or scaling out
book-designing-data-intensive-applications#partitioning-and-sharding1Location: 3,825 partitions so that different partitions can be assigned to different nodes (also known as sharding). book-designing-data-intensive-applications#partitioning-and-sharding1
Location: 3,837 In a large machine, although any CPU can access any part of memory, some banks of memory are closer to one CPU than to others (this is called nonuniform memory access, or NUMA [1
Location: 3,865 All of the difficulty in replication lies in handling changes to replicated data,
Location: 3,866 single-leader, multi-leader, and leaderless replication.
Location: 3,877 read-your-writes and monotonic reads guarantees.
Location: 3,885 leader-based replication (also known as active/passive or master–slave replication)
Location: 4,023 violate clients' durability expectations.
Location: 4,035 it could happen that two nodes both believe that they are the leader. This situation is called split brain
Location: 4,113 change data capture
Location: 4,148 In this read-scaling architecture, you can increase the capacity for serving read-only requests simply by adding more followers.
Location: 4,177 In this situation, we need read-after-write consistency, also known as read-your-writes consistency [24
Location: 4,190 The client can remember the timestamp of its most recent write — then the system can ensure that the replica serving any reads for that user reflects updates at least until that timestamp. If a replica is not sufficiently up to date, either the read can be handled by another replica or the query can wait until the replica has caught up. The timestamp could be a logical timestamp (something that indicates ordering of writes, such as the log sequence number) or the actual system clock (in which case clock synchronization becomes critical; see "Unreliable Clocks").
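A sketch of the timestamp technique with a logical timestamp (a log sequence number). Replica, apply, and read are made-up names, and the "wait until caught up" option is reduced to raising an error.

```python
class Replica:
    def __init__(self):
        self.applied_lsn = 0  # logical timestamp of the last applied write
        self.data = {}

    def apply(self, lsn, key, value):
        self.data[key] = value
        self.applied_lsn = lsn

def read(replicas, key, min_lsn):
    """Serve the read from the first replica that has caught up to min_lsn."""
    for replica in replicas:
        if replica.applied_lsn >= min_lsn:
            return replica.data.get(key)
    raise RuntimeError("no replica is sufficiently up to date; retry or wait")

leader, follower = Replica(), Replica()
leader.apply(1, "name", "old")
follower.apply(1, "name", "old")
leader.apply(2, "name", "new")  # the follower has not replicated this yet

last_write_lsn = 2  # remembered by the client after its own write
print(read([follower, leader], "name", last_write_lsn))
```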
Location: 4,260 This is why transactions exist: they are a way for a database to provide stronger guarantees so that the application can be simpler.
Location: 4,396 Thus, the database must resolve the conflict in a convergent way, which means that all replicas must arrive at the same final value when all changes have been replicated.
Location: 4,397 convergent conflict resolution:
Location: 4,401 last write wins (LWW).
Location: 4,434 Conflict-free replicated datatypes (CRDTs)
Location: 4,443 Operational transformation [42] is the conflict resolution algorithm behind collaborative editing applications such as Etherpad [30] and Google Docs [31].
book-designing-data-intensive-applications#dynamo-styleLocation: 4,526 Riak, Cassandra, and Voldemort are open source datastores with leaderless replication models inspired by Dynamo, so this kind of database is also known as Dynamo-style. book-designing-data-intensive-applications#dynamo-style
Location: 4,554 Anti-entropy process: In addition, some datastores have a background process
book-designing-data-intensive-applications#not-rolled-back-on-successful-replicasLocation: 4,620 and overall succeeded on fewer than w replicas, it is not rolled back on the replicas where it succeeded. book-designing-data-intensive-applications#not-rolled-back-on-successful-replicas
Location: 4,647 Eventual consistency is a deliberately vague guarantee, but for operability it's important to be able to quantify "eventual."
book-designing-data-intensive-applications#individual-nodes-can-be-slowLocation: 4,656 They can also tolerate individual nodes going slow, because requests don't have to wait for all n nodes to respond book-designing-data-intensive-applications#individual-nodes-can-be-slow
Location: 4,668 the designated n 'home' nodes for a value.
book-designing-data-intensive-applications#sloppy-quorum-not-traditional-quorumLocation: 4,675 Thus, a sloppy quorum actually isn't a quorum at all in the traditional sense. It's only an assurance of durability, namely that the data is stored on w nodes somewhere. book-designing-data-intensive-applications#sloppy-quorum-not-traditional-quorum
Location: 4,679 and in Cassandra and Voldemort they are disabled by default
book-designing-data-intensive-applications#concurrent-writes-order-undefinedLocation: 4,728 we say the writes are concurrent, so their order is undefined. book-designing-data-intensive-applications#concurrent-writes-order-undefined
book-designing-data-intensive-applications#cassandra-only-last-write-winsLocation: 4,732 last write wins (LWW), is the only supported conflict resolution method in Cassandra book-designing-data-intensive-applications#cassandra-only-last-write-wins
book-designing-data-intensive-applications#cassandra-use-uuid-as-a-keyLocation: 4,740 For example, a recommended way of using Cassandra is to use a UUID as the key, thus giving each write operation a unique key [53]. book-designing-data-intensive-applications#cassandra-use-uuid-as-a-key
book-designing-data-intensive-applications#concurrent-physical-timeLocation: 4,765 we simply call two operations concurrent if they are both unaware of each other, regardless of the physical time at which they occurred. book-designing-data-intensive-applications#concurrent-physical-time
book-designing-data-intensive-applications#siblingsLocation: 4,821 if several operations happen concurrently, clients have to clean up afterward by merging the concurrently written values. Riak calls these concurrent values siblings. book-designing-data-intensive-applications#siblings
Location: 4,837 Riak's datatype support uses a family of data structures called CRDTs [38, 39, 55] that can automatically merge siblings in sensible ways, including preserving deletions.
Location: 4,848 The collection of version numbers from all the replicas is called a version vector
Location: 4,857 (Riak encodes the version vector as a string that it calls causal context
Location: 4,857 The version vector allows the database to distinguish between overwrites and concurrent writes.
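How a version vector separates overwrites from concurrent writes, as a sketch that represents each vector as a {replica_id: counter} dict. The representation and return strings are assumptions for illustration.

```python
def compare(a, b):
    """Compare two version vectors given as {replica_id: counter} dicts."""
    replicas = set(a) | set(b)
    a_ge = all(a.get(r, 0) >= b.get(r, 0) for r in replicas)
    b_ge = all(b.get(r, 0) >= a.get(r, 0) for r in replicas)
    if a_ge and b_ge:
        return "equal"
    if a_ge:
        return "a overwrites b"  # a has seen everything b has seen
    if b_ge:
        return "b overwrites a"
    return "concurrent"          # neither has seen the other: keep both as siblings

print(compare({"r1": 2, "r2": 1}, {"r1": 1, "r2": 1}))
print(compare({"r1": 2}, {"r2": 1}))
```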
vv leaderless replication vv Location: 4,889 Leaderless replication: Clients send each write to several nodes, and read from several nodes in parallel in order to detect and correct nodes with stale data.
book-designing-data-intensive-applications#monotonic-readsLocation: 4,900 Monotonic reads: After users have seen the data at one point in time, they shouldn't later see the data from some earlier point in time. book-designing-data-intensive-applications#monotonic-reads
book-designing-data-intensive-applications#consistent-prefix-readsLocation: 4,902 Consistent prefix reads: Users should see the data in a state that makes causal sense: for example, seeing a question and its reply in the correct order. book-designing-data-intensive-applications#consistent-prefix-reads
Location: 4,993 AppJet, Inc.: "Etherpad and EasySync Technical Manual," github.com, March 26, 2011.
Location: 4,995 John Day-Richter: "What's Different About the New Google Docs: Making Collaboration Fast," googledrive.blogspot.com, 23 September 2010.
Location: 5,045 Jonathan Ellis: "Why Cassandra Doesn't Need Vector Clocks,"
Location: 5,085 Different partitions can be placed on different nodes in a shared-nothing cluster
Location: 5,095 observe how the indexing of data interacts with partitioning.
book-designing-data-intensive-applications#rebalancingLocation: 5,095 We'll then talk about rebalancing, which is necessary if you want to add or remove nodes in your cluster. book-designing-data-intensive-applications#rebalancing
book-designing-data-intensive-applications#skew-makes-partitioning-less-effectiveLocation: 5,114 If the partitioning is unfair, so that some partitions have more data or queries than others, we call it skewed. The presence of skew makes partitioning much less effective. book-designing-data-intensive-applications#skew-makes-partitioning-less-effective
book-designing-data-intensive-applications#hot-spotLocation: 5,116 A partition with disproportionately high load is called a hot spot. book-designing-data-intensive-applications#hot-spot
book-designing-data-intensive-applications#separate-range-query-for-each-sensorLocation: 5,154 Now, when you want to fetch the values of multiple sensors within a time range, you need to perform a separate range query for each sensor name. book-designing-data-intensive-applications#separate-range-query-for-each-sensor
book-designing-data-intensive-applications#consistent-hashing-hash-partitioningLocation: 5,184 Because this is so confusing, it's best to avoid the term consistent hashing and just call it hash partitioning instead. book-designing-data-intensive-applications#consistent-hashing-hash-partitioning
book-designing-data-intensive-applications#key-range-partitioningLocation: 5,186 Unfortunately however, by using the hash of the key for partitioning we lose a nice property of key-range partitioning: the ability to do efficient range queries. book-designing-data-intensive-applications#key-range-partitioning
Location: 5,201 A table in Cassandra can be declared with a compound primary key consisting of several columns. Only the first part of that key is hashed to determine the partition, but the other columns are used as a concatenated index for sorting the data in Cassandra's SSTables.
Location: 5,203 A query therefore cannot search for a range of values within the first column of a compound key, but if it specifies a fixed value for the first column, it can perform an efficient range scan over the other columns of the key.
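Note to self: a rough sketch of the compound-key idea, assuming a hypothetical table keyed by (user_id, timestamp). The first column is hashed to pick the partition; rows within it stay sorted by the second column, so a range scan works only once the first column is fixed. `NUM_PARTITIONS`, `partition_for`, and `Table` are illustrative names, not Cassandra's API.

```python
import bisect
import hashlib

NUM_PARTITIONS = 4  # assumed value for the sketch


def partition_for(partition_key: str) -> int:
    """Hash only the first part of the compound key."""
    digest = hashlib.md5(partition_key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


class Table:
    def __init__(self):
        # One dict per partition: partition key -> rows sorted by timestamp.
        self.partitions = [dict() for _ in range(NUM_PARTITIONS)]

    def insert(self, user_id: str, timestamp: int, value: str) -> None:
        rows = self.partitions[partition_for(user_id)].setdefault(user_id, [])
        bisect.insort(rows, (timestamp, value))

    def range_scan(self, user_id: str, start: int, end: int):
        """Rows for one user with start <= timestamp < end."""
        rows = self.partitions[partition_for(user_id)].get(user_id, [])
        lo = bisect.bisect_left(rows, (start,))
        hi = bisect.bisect_left(rows, (end,))
        return rows[lo:hi]
```

A range over user_id itself would have to visit every partition, because hashing destroys the ordering of the first column.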
book-designing-data-intensive-applications#problem-with-secondary-indexes-and-partitionsLocation: 5,242 The problem with secondary indexes is that they don't map neatly to partitions. book-designing-data-intensive-applications#problem-with-secondary-indexes-and-partitions
book-designing-data-intensive-applications#two-types-of-partitioned-indexes1 2Location: 5,243 There are two main approaches to partitioning a database with secondary indexes: document-based partitioning and term-based partitioning. book-designing-data-intensive-applications#two-types-of-partitioned-indexes1 2
Location: 5,251 (in a document database these would be fields; in a relational database they would be columns).
book-designing-data-intensive-applications#document-partitioned-index-aka-local-index1 2 3Location: 5,259 For that reason, a document-partitioned index is also known as a local index as opposed to a global index book-designing-data-intensive-applications#document-partitioned-index-aka-local-index1 2 3
book-designing-data-intensive-applications#scatter-gather-makes-read-queries-expensive1 2Location: 5,270 This approach to querying a partitioned database is sometimes known as scatter/gather, and it can make read queries on secondary indexes quite expensive. book-designing-data-intensive-applications#scatter-gather-makes-read-queries-expensive1 2
book-designing-data-intensive-applications#scatter-gather-tail-latency-amplification1 2Location: 5,271 Even if you query the partitions in parallel, scatter/gather is prone to tail latency amplification book-designing-data-intensive-applications#scatter-gather-tail-latency-amplification1 2
book-designing-data-intensive-applications#document-partitioned-secondary-indexes1Location: 5,272 MongoDB, Riak [15], Cassandra [16], Elasticsearch [17], SolrCloud [18], and VoltDB [19] all use document-partitioned secondary indexes. book-designing-data-intensive-applications#document-partitioned-secondary-indexes1
book-designing-data-intensive-applications#global-index-covers-data-in-all-partitions1Location: 5,285 Rather than each partition having its own secondary index (a local index), we can construct a global index that covers data in all partitions. book-designing-data-intensive-applications#global-index-covers-data-in-all-partitions1
book-designing-data-intensive-applications#global-index-partitioning1Location: 5,287 A global index must also be partitioned, but it can be partitioned differently from the primary key index. book-designing-data-intensive-applications#global-index-partitioning1
book-designing-data-intensive-applications#term-partitioned-index1Location: 5,291 We call this kind of index term-partitioned index, because the term we're looking for determines the partition of the index. book-designing-data-intensive-applications#term-partitioned-index1
book-designing-data-intensive-applications#term-definitionLocation: 5,293 The name term comes from full-text indexes (a particular kind of secondary index), where the terms are all the words that occur in a document. book-designing-data-intensive-applications#term-definition
book-designing-data-intensive-applications#partitioning-types-range-scans-vs-distribution-of-load1Location: 5,294 Partitioning by the term itself can be useful for range scans (e.g., on a numeric property, such as the asking price of the car), whereas partitioning on a hash of the term gives a more even distribution of load. book-designing-data-intensive-applications#partitioning-types-range-scans-vs-distribution-of-load1
book-designing-data-intensive-applications#global-index-async-updates1 2Location: 5,302 In practice, updates to global secondary indexes (global index) are often asynchronous (that is, if you read the index shortly after a write, the change you just made may not yet be reflected in the index). book-designing-data-intensive-applications#global-index-async-updates1 2
Location: 5,319 All of these changes call for data and requests to be moved from one node to another. The process of moving load from one node in the cluster to another is called rebalancing
Location: 5,336 The problem with the mod N approach is that if the number of nodes N changes, most of the keys will need to be moved from one node to another.
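Note to self: a quick check of how bad mod N is. The sketch uses consecutive integers as stand-ins for key hashes (my assumption) and counts how many land on a different node when the cluster grows from `old_n` to `new_n` nodes.

```python
def moved_fraction(num_keys: int, old_n: int, new_n: int) -> float:
    """Fraction of keys whose node changes when N goes from old_n to new_n."""
    moved = sum(1 for h in range(num_keys) if h % old_n != h % new_n)
    return moved / num_keys
```

Going from 10 to 11 nodes, only hashes with the same remainder under both moduli stay put, so roughly 10 out of every 11 keys have to move.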
Location: 5,344 Fortunately, there is a fairly simple solution: create many more partitions than there are nodes, and assign several partitions to each node.
Location: 5,346 Now, if a node is added to the cluster, the new node can steal a few partitions from every existing node until partitions are fairly distributed once again.
Location: 5,348 Only entire partitions are moved between nodes. The number of partitions does not change, nor does the assignment of keys to partitions.
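Note to self: a sketch of the fixed-number-of-partitions scheme, under my own assumptions about the data layout (a dict from partition number to node name). When a node joins, it takes whole partitions from the most loaded nodes until it holds a fair share; the key-to-partition mapping is never touched.

```python
def rebalance(assignment: dict[int, str], new_node: str) -> dict[int, str]:
    """Return a new partition -> node mapping after new_node joins."""
    result = dict(assignment)
    old_nodes = sorted(set(assignment.values()))
    nodes = old_nodes + [new_node]
    target = len(assignment) // len(nodes)  # fair share for the new node
    owned = {n: [p for p, o in sorted(result.items()) if o == n] for n in nodes}
    while len(owned[new_node]) < target:
        # Steal one whole partition from whichever old node holds the most.
        donor = max(old_nodes, key=lambda n: len(owned[n]))
        partition = owned[donor].pop()
        owned[new_node].append(partition)
        result[partition] = new_node
    return result
```

With 12 partitions on 3 nodes, adding a fourth node moves just 3 partitions, in contrast to the mod N approach where most keys would move.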
Location: 5,377 For that reason, key range–partitioned databases such as HBase and RethinkDB create partitions dynamically.
Location: 5,392 To mitigate this issue, HBase and MongoDB allow an initial set of partitions to be configured on an empty database (this is called pre-splitting).
Location: 5,413 Cassandra 3.0 introduced an alternative rebalancing algorithm that avoids unfair splits
Location: 5,439 Request Routing: We have now partitioned our dataset across multiple nodes running on multiple machines. But there remains an open question: when a client wants to make a request, how does it know which node to connect to?
Location: 5,450 Allow clients to contact any node (e.g., via a round-robin load balancer).
Location: 5,487 These are not as fast-changing as the assignment of partitions to nodes, so it is often sufficient to use DNS for this purpose.
Location: 5,493 However, massively parallel processing (MPP) relational database products, often used for analytics, are much more sophisticated in the types of queries they support.
Location: 5,514 When partitioning by hash, it is common to create a fixed number of partitions in advance, to assign several partitions to each node, and to move entire partitions from one node to another when nodes are added or removed.
Location: 5,516 Hybrid approaches are also possible, for example with a compound key: using one part of the key to identify the partition and another part for the sort order.
Location: 5,565 Jonathan Ellis: "Facebook's Cassandra Paper, Annotated and Compared to Apache Cassandra 2.0," datastax.com, September 12, 2013.
Location: 5,594 Brandon Williams: "Virtual Nodes in Cassandra 1.2," datastax.com, December 4, 2012.
Location: 5,631 Transactions are not a law of nature; they were created with a purpose, namely to simplify the programming model for applications accessing a database.
Location: 5,632 By using transactions, the application is free to ignore certain potential error scenarios and concurrency issues, because the database takes care of them instead (we call these safety guarantees).
Location: 5,639 in the area of concurrency control, discussing various kinds of race conditions that can occur and how databases implement isolation levels such as read committed, snapshot isolation, and serializability.
Location: 5,654 many of this new generation of databases abandoned transactions entirely, or redefined the word to describe a much weaker set of guarantees than had previously been understood
Location: 5,676 the only sensible definition of BASE is "not ACID"; i.e., it can mean almost anything you want.)
Location: 5,681 In general, atomic refers to something that cannot be broken down into smaller parts.
Location: 5,683 in multi-threaded programming, if one thread executes an atomic operation, that means there is no way that another thread could see the half-finished result of the operation.
Location: 5,685 in the context of ACID, atomicity is not about concurrency.
Location: 5,695 Perhaps abortability would have been a better term than atomicity, but we will stick with atomicity since that's the usual word.
Location: 5,704 In the CAP theorem (see Chapter 9), the word consistency is used to mean linearizability (see "Linearizability").
Location: 5,705 In the context of ACID, consistency refers to an application-specific notion of the database being in a "good state."
Location: 5,730 The classic database textbooks formalize isolation as serializability, which means that each transaction can pretend that it is the only transaction running on the entire database.
Location: 5,737 In Oracle there is an isolation level called "serializable," but it actually implements something called snapshot isolation, which is a weaker guarantee than serializability
Location: 5,745 Durability is the promise that once a transaction has committed successfully, any data it has written will not be forgotten, even if there is a hardware fault or the database crashes.
Location: 5,748 In a replicated database, durability may mean that the data has been successfully copied to some number of nodes.
Location: 5,820 many nonrelational databases don't have such a way of grouping operations together.
Location: 5,836 Similarly popular is a compare-and-set operation, which allows a write to happen only if the value has not been concurrently changed by someone else
Location: 5,838 These single-object operations are useful, as they can prevent lost updates when several clients try to write to the same object concurrently
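Note to self: a minimal in-memory sketch of compare-and-set, assuming a hypothetical `Register` class guarded by a lock. A real database would do the comparison server-side; the point here is only the retry loop around a failed CAS.

```python
import threading


class Register:
    """A single object supporting read and compare-and-set."""

    def __init__(self, value: int):
        self._value = value
        self._lock = threading.Lock()

    def read(self) -> int:
        with self._lock:
            return self._value

    def compare_and_set(self, expected: int, new: int) -> bool:
        # The write happens only if nobody changed the value since we read it.
        with self._lock:
            if self._value != expected:
                return False
            self._value = new
            return True


def increment(reg: Register) -> None:
    """Read-modify-write that retries instead of losing an update."""
    while True:
        old = reg.read()
        if reg.compare_and_set(old, old + 1):
            return
```

If two clients read the same old value, only one CAS succeeds; the other rereads and tries again, so no increment is silently dropped.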
Location: 5,848 Many distributed datastores have abandoned multi-object transactions because they are difficult to implement across partitions, and they can get in the way in some scenarios where very high availability or performance is required.
Location: 5,861 However, document databases lacking join functionality also encourage denormalization
Location: 5,863 Transactions are very useful in this situation to prevent denormalized data from going out of sync.
book-designing-data-intensive-applications#application-level-deduplicationLocation: 5,887 additional application-level deduplication mechanism in place. book-designing-data-intensive-applications#application-level-deduplication
vv retry if transient but not permanent vv Location: 5,890 It is only worth retrying after transient errors (for example due to deadlock, isolation violation, temporary network interruptions, and failover); after a permanent error (e.g., constraint violation) a retry would be pointless.
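Note to self: a sketch of the retry rule, assuming two hypothetical exception classes that stand in for whatever errors a real driver raises. Only transient errors are retried, with exponential backoff and a cap on attempts; permanent errors propagate immediately.

```python
import time


class TransientError(Exception):
    """Hypothetical: deadlock, isolation violation, network blip, failover."""


class PermanentError(Exception):
    """Hypothetical: constraint violation and similar."""


def run_with_retries(txn, max_attempts: int = 3, base_delay: float = 0.01):
    """Call txn(); retry on TransientError, give up after max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return txn()
        except TransientError:
            if attempt == max_attempts:
                raise
            # Back off exponentially so retries do not add to an overload.
            time.sleep(base_delay * 2 ** (attempt - 1))
```

A retried transaction may still run twice if the first commit succeeded but its acknowledgment was lost, which is why the deduplication note above matters.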
book-designing-data-intensive-applications#two-phase-commitLocation: 5,893 If you want to make sure that several different systems either commit or abort together, two-phase commit book-designing-data-intensive-applications#two-phase-commit
book-designing-data-intensive-applications#read-committed1Location: 5,936 The most basic level of transaction isolation is read committed book-designing-data-intensive-applications#read-committed1
book-designing-data-intensive-applications#dirty-readsLocation: 5,938 When reading from the database, you will only see data that has been committed (no dirty reads). book-designing-data-intensive-applications#dirty-reads
book-designing-data-intensive-applications#dirty-writesLocation: 5,939 When writing to the database, you will only overwrite data that has been committed (no dirty writes). book-designing-data-intensive-applications#dirty-writes
Location: 5,955 Reasoning about the consequences quickly becomes mind-bending.
book-designing-data-intensive-applications#read-committed-must-prevent-dirty-writesLocation: 5,962 Transactions running at the read committed isolation level must prevent dirty writes, usually by delaying the second write until the first write's transaction has committed or aborted. book-designing-data-intensive-applications#read-committed-must-prevent-dirty-writes
Location: 6,016 This anomaly is called a nonrepeatable read or read skew: if Alice were to read the balance of account 1 again at the end of the transaction, she would see a different value ($600) than she saw in her previous query.
Location: 6,018 Read skew is considered acceptable under read committed isolation.
Location: 6,035 Snapshot isolation [28] is the most common solution to this problem. The idea is that each transaction reads from a consistent snapshot of the database — that is, the transaction sees all the data that was committed in the database at the start of the transaction.
Location: 6,062 storage engines that support snapshot isolation typically use MVCC for their read committed isolation level as well.
Location: 6,108 Another approach is used in CouchDB, Datomic, and LMDB. Although they also use B-trees (see "B-Trees"), they use an append-only/copy-on-write variant that does not overwrite pages of the tree when they are updated, but instead creates a new copy of each modified page.
Location: 6,150 The lost update problem can occur if an application reads some value from the database, modifies it, and writes back the modified value (a read-modify-write cycle). If two transactions do this concurrently, one of the modifications can be lost, because the second write does not include the first modification.
Location: 6,153 (We sometimes say that the later write clobbers the earlier write.)
Location: 6,161 Many databases provide atomic update operations, which remove the need to implement read-modify-write cycles in application code.
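Note to self: a small example of an atomic update using Python's built-in sqlite3 module. The table and column names are made up for illustration. The increment happens inside a single UPDATE statement, so the application never holds a stale copy of the value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counters (key TEXT PRIMARY KEY, value INTEGER NOT NULL)")
conn.execute("INSERT INTO counters VALUES ('likes', 0)")
conn.commit()


def increment(conn: sqlite3.Connection, key: str) -> None:
    """Atomic increment: no read-modify-write cycle in application code."""
    with conn:  # commits on success, rolls back on error
        conn.execute("UPDATE counters SET value = value + 1 WHERE key = ?", (key,))
```

The unsafe alternative would be a SELECT, an addition in Python, and an UPDATE with the computed value, which is exactly the read-modify-write cycle that loses updates.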
Location: 6,173 Atomic operations are usually implemented by taking an exclusive lock on the object when it is read so that no other transaction can read it until the update has been applied. This technique is sometimes known as cursor stability [36, 37].
Location: 6,176 Another option is to simply force all atomic operations to be executed on a single thread.
Location: 6,178 Unfortunately, object-relational mapping frameworks make it easy to accidentally write code that performs unsafe read-modify-write cycles instead of using atomic operations provided by the database
Location: 6,207 Atomic operations and locks are ways of preventing lost updates by forcing the read-modify-write cycles to happen sequentially.
Location: 6,216 Lost update detection is a great feature, because it doesn't require application code to use any special database features — you may forget to use a lock or an atomic operation and thus introduce a bug, but lost update detection happens automatically and is thus less error-prone.
Location: 6,237 In replicated databases (see Chapter 5), preventing lost updates takes on another dimension: since they have copies of the data on multiple nodes, and the data can potentially be modified concurrently on different nodes, some additional steps need to be taken to prevent lost updates.
Location: 6,244 a common approach in such replicated databases is to allow concurrent writes to create several conflicting versions of a value (also known as siblings), and to use application code or special data structures to resolve and merge these versions after the fact.
Location: 6,248 Atomic operations can work well in a replicated context, especially if they are commutative (i.e., you can apply them in a different order on different replicas, and still get the same result).
Location: 6,249 For example, incrementing a counter or adding an element to a set are commutative operations.
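Note to self: a quick demonstration that counter increments and set additions commute. The operation format (a tuple of kind and argument) is my own convention for the sketch. Every ordering of the same operations produces the same final state.

```python
import itertools


def apply_ops(ops):
    """Apply a sequence of ("incr", n) and ("add", item) operations."""
    counter, members = 0, set()
    for kind, arg in ops:
        if kind == "incr":
            counter += arg
        elif kind == "add":
            members.add(arg)
    return counter, frozenset(members)


ops = [("incr", 1), ("add", "x"), ("incr", 5), ("add", "y")]
# Each replica may see the operations in a different order.
results = {apply_ops(order) for order in itertools.permutations(ops)}
```

Here `results` contains a single state, so replicas that apply the operations in different orders still converge. An operation like "set the value to 7" would not pass this check.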
Location: 6,253 On the other hand, the last write wins (LWW) conflict resolution method is prone to lost updates,
Location: 6,284 This anomaly is called write skew [28]. It is neither a dirty write nor a lost update, because the two transactions are updating two different objects
Location: 6,288 You can think of write skew as a generalization of the lost update problem. Write skew can occur if two transactions read the same objects, and then update some of those objects (different transactions may update different objects).
Location: 6,298 Automatically preventing write skew requires true serializable isolation
Location: 6,304 If you can't use a serializable isolation level, the second-best option in this case is probably to explicitly lock the rows that the transaction depends on.
Location: 6,339 we used a lock to prevent lost updates (that is, making sure that two players can't move the same figure at the same time). However, the lock doesn't prevent players from moving two different figures to the same position on the board or potentially making some other move that violates the rules of the game.
Location: 6,350 With write skew, it could happen that two spending items are inserted concurrently that together cause the balance to go negative, but that neither transaction notices the other.
Location: 6,368 This effect, where a write in one transaction changes the result of a search query in another transaction, is called a phantom
Location: 6,369 Snapshot isolation avoids phantoms in read-only queries, but in read-write transactions like the examples we discussed, phantoms can lead to particularly tricky cases of write skew.
Location: 6,376 the problem of phantoms is that there is no object to which we can attach the locks,
Location: 6,383 This approach is called materializing conflicts, because it takes a phantom and turns it into a lock conflict on a concrete set of rows that exist in the database
Location: 6,386 materializing conflicts should be considered a last resort if no alternative is possible. A serializable isolation level is much preferable in most cases.
Location: 6,407 Serializable isolation is usually regarded as the strongest isolation level. It guarantees that even though transactions may execute in parallel, the end result is the same as if they had executed one at a time, serially, without any concurrency.
Location: 6,425 Even though this seems like an obvious idea, database designers only fairly recently — around 2007 — decided that a single-threaded loop for executing transactions was feasible
Location: 6,434 Database designers realized that OLTP transactions are usually short and only make a small number of reads and writes
Location: 6,439 The approach of executing transactions serially is implemented in VoltDB/H-Store, Redis, and Datomic
Location: 6,461 systems with single-threaded serial transaction processing don't allow interactive multi-statement transactions.
Location: 6,498 If you can find a way of partitioning your dataset so that each transaction only needs to read and write data within a single partition, then each partition can have its own transaction processing thread running independently from the others.
Location: 6,507 Simple key-value data can often be partitioned very easily, but data with multiple secondary indexes is likely to require a lot of cross-partition coordination
Location: 6,514 Write throughput must be low enough to be handled on a single CPU core, or else transactions need to be partitioned without requiring cross-partition coordination.
Location: 6,538 Snapshot isolation has the mantra readers never block writers, and writers never block readers (see "Implementing snapshot isolation"), which captures this key difference between snapshot isolation and two-phase locking.
Location: 6,551 If a transaction wants to read an object, it must first acquire the lock in shared mode.
Location: 6,553 If a transaction wants to write to an object, it must first acquire the lock in exclusive mode.
Location: 6,555 If a transaction first reads and then writes an object, it may upgrade its shared lock to an exclusive lock.
Location: 6,557 This is where the name "two-phase" comes from: the first phase (while the transaction is executing) is when the locks are acquired, and the second phase (at the end of the transaction) is when all the locks are released.
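Note to self: a sketch of the shared/exclusive lock rules above, assuming a hypothetical non-blocking `LockTable` where a False return means "the transaction would have to wait". A real 2PL implementation would block and detect deadlocks; this only shows which requests conflict and that all locks are released together at the end.

```python
class LockTable:
    def __init__(self):
        self.shared = {}     # object -> set of transaction ids holding it shared
        self.exclusive = {}  # object -> transaction id holding it exclusively

    def acquire_shared(self, txn: str, obj: str) -> bool:
        owner = self.exclusive.get(obj)
        if owner is not None and owner != txn:
            return False  # a writer holds the lock: the reader must wait
        self.shared.setdefault(obj, set()).add(txn)
        return True

    def acquire_exclusive(self, txn: str, obj: str) -> bool:
        owner = self.exclusive.get(obj)
        if owner is not None and owner != txn:
            return False  # another writer holds the lock
        if self.shared.get(obj, set()) - {txn}:
            return False  # other readers hold the lock
        self.exclusive[obj] = txn  # also covers upgrading our own shared lock
        return True

    def release_all(self, txn: str) -> None:
        """Second phase: release every lock at commit or abort."""
        for holders in self.shared.values():
            holders.discard(txn)
        for obj in [o for o, t in self.exclusive.items() if t == txn]:
            del self.exclusive[obj]
```

Unlike snapshot isolation, readers and writers block each other here, which is where the throughput cost of 2PL comes from.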
Location: 6,567 transaction throughput and response times of queries are significantly worse under two-phase locking than under weak isolation.
Location: 6,590 Conceptually, we need a predicate lock [3]. It works similarly to the shared/exclusive lock described earlier, but rather than belonging to a particular object (e.g., one row in a table), it belongs to all objects that match some search condition,
Location: 6,600 a predicate lock applies even to objects that do not yet exist in the database, but which might be added in the future (phantoms).
Location: 6,601 If two-phase locking includes predicate locks, the database prevents all forms of write skew and other race conditions, and so its isolation becomes serializable.
Location: 6,608 For that reason, most databases with 2PL actually implement index-range locking (also known as next-key locking).
Location: 6,611 It's safe to simplify a predicate by making it match a greater set of objects.
Location: 6,613 This is safe, because any write that matches the original predicate will definitely also match the approximations.
Location: 6,640 an algorithm called serializable snapshot isolation (SSI) is very promising. It provides full serializability, but has only a small performance penalty compared to snapshot isolation.
Location: 6,654 Two-phase locking is a so-called pessimistic concurrency control mechanism: it is based on the principle that if anything might possibly go wrong (as indicated by a lock held by another transaction), it's better to wait until the situation is safe again before doing anything.
Location: 6,690 there may be a causal dependency between the queries and the writes in the transaction. In order to provide serializable isolation, the database must detect situations in which a transaction may have acted on an outdated premise and abort the transaction in that case.
Location: 6,698 snapshot isolation is usually implemented by multi-version concurrency control
Location: 6,711 By avoiding unnecessary aborts, SSI preserves snapshot isolation's support for long-running reads from a consistent snapshot.
Location: 6,756 transaction that reads and writes data over a long period of time is likely to run into conflicts and abort, so SSI requires that read-write transactions be fairly short
Location: 6,773 For example, denormalized data can easily go out of sync with the source data. Without transactions, it becomes very difficult to reason about the effects that complex interacting accesses can have on the database.
Location: 6,776 various examples of race conditions:
Location: 6,777 Dirty reads: One client reads another client's writes before they have been committed. The read committed isolation level and stronger levels prevent dirty reads.
Location: 6,779 Dirty writes: One client overwrites data that another client has written, but not yet committed. Almost all transaction implementations prevent dirty writes.
Location: 6,780 Read skew (nonrepeatable reads): A client sees different parts of the database at different points in time. This issue is most commonly prevented with snapshot isolation, which allows a transaction to read from a consistent snapshot at one point in time. It is usually implemented with multi-version concurrency control (MVCC).
Location: 6,785 Lost updates: Two clients concurrently perform a read-modify-write cycle. One overwrites the other's write without incorporating its changes, so data is lost.
Location: 6,788 Write skew: A transaction reads something, makes a decision based on the value it saw, and writes the decision to the database. However, by the time the write is made, the premise of the decision is no longer true. Only serializable isolation prevents this anomaly.
Location: 6,791 Phantom reads: A transaction reads objects that match some search condition. Another client makes a write that affects the results of that search. Snapshot isolation prevents straightforward phantom reads, but phantoms in the context of write skew require special treatment, such as index-range locks.
Location: 6,795 Only serializable isolation protects against all of these issues.
Location: 7,229 TCP performs flow control (also known as congestion avoidance or backpressure), in which a node limits its own rate of sending in order to avoid overloading a network link or the receiving node
Location: 7,255 highly variable if someone near you (a noisy neighbor) is using a lot of resources
Location: 7,263 systems can continually measure response times and their variability (jitter), and automatically adjust timeouts according to the observed response time distribution. This can be done with a Phi Accrual failure detector [30], which is used for example in Akka and Cassandra
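Note to self: a deliberately simple sketch of a timeout that adapts to observed response times. This is not the Phi Accrual failure detector; it swaps in a plain "mean plus k standard deviations" rule to show the idea. The parameters `k` and `floor` are arbitrary assumptions.

```python
import statistics


def adaptive_timeout(samples: list[float], k: float = 4.0, floor: float = 0.05) -> float:
    """Timeout in seconds derived from recent response times."""
    mean = statistics.fmean(samples)
    jitter = statistics.pstdev(samples)  # variability of the response times
    # More jitter pushes the timeout up; the floor avoids absurdly small values.
    return max(floor, mean + k * jitter)
```

Two workloads with the same mean response time get different timeouts if one of them is more variable, which is the property a fixed timeout cannot provide.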
Location: 7,298 Why do datacenter networks and the internet use packet switching? The answer is that they are optimized for bursty traffic
Location: 7,311 With careful use of quality of service (QoS, prioritization and scheduling of packets) and admission control (rate-limiting senders), it is possible to emulate circuit switching on packet networks, or provide statistically bounded delay
Location: 7,336 there's no "correct" value for timeouts — they need to be determined experimentally.
Location: 7,423 The best way of handling leap seconds may be to make NTP servers "lie," by performing the leap second adjustment gradually over the course of a day (this is known as smearing) [47, 48], although actual NTP server behavior varies in practice [49].
Location: 7,441 Such accuracy can be achieved using GPS receivers, the Precision Time Protocol (PTP) [52], and careful deployment and monitoring.
Location: 7,461 Any node whose clock drifts too far from the others should be declared dead and removed from the cluster. Such monitoring ensures that you notice the broken clocks before they can cause too much damage.
Location: 7,491 Additional causality tracking mechanisms, such as version vectors, are needed in order to prevent violations of causality
Location: 7,504 logical clocks [56, 57], which are based on incrementing counters rather than an oscillating quartz crystal, are a safer alternative for ordering events
Location: 7,508 In contrast, time-of-day and monotonic clocks, which measure actual elapsed time, are also known as physical clocks
Location: 7,511 Clock readings have a confidence interval
Location: 7,530 An interesting exception is Google's TrueTime API in Spanner [41], which explicitly reports the confidence interval on the local clock.
Location: 7,583 One option is for the leader to obtain a lease from the other nodes, which is similar to a lock with a timeout
Location: 7,641 To avoid this problem, paging is often disabled on server machines (if you would rather kill a process to free up memory than risk thrashing).
Location: 7,648 When writing multi-threaded code on a single machine, we have fairly good tools for making it thread-safe: mutexes, semaphores, atomic counters, lock-free data structures, blocking queues, and so on.
Location: 7,661 In these systems, there is a specified deadline by which the software must respond; if it doesn't meet the deadline, that may cause a failure of the entire system. These are so-called hard real-time systems.
Location: 7,682 An emerging idea is to treat GC pauses like brief planned outages of a node, and to let other nodes handle requests from clients while one node is collecting its garbage.
Location: 7,703 A node in the network cannot know anything for sure — it can only make guesses based on the messages it receives (or doesn't receive) via the network.
Location: 7,705 If a remote node doesn't respond, there is no way of knowing what state it is in, because problems in the network cannot reliably be distinguished from problems at a node.
Location: 7,775 every time the lock server grants a lock or lease, it also returns a fencing token, which is a number that increases every time a lock is granted
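Note to self: a sketch of fencing tokens with two hypothetical classes, `LockServer` and `FencedStorage`. The storage service remembers the highest token it has seen and rejects writes that carry an older one, so a client whose lease expired during a pause cannot clobber newer data.

```python
class LockServer:
    def __init__(self):
        self._last_token = 0

    def grant(self) -> int:
        """Grant the lock and return a token that increases on every grant."""
        self._last_token += 1
        return self._last_token


class FencedStorage:
    def __init__(self):
        self.highest_token = 0
        self.data = {}

    def write(self, token: int, key: str, value: str) -> None:
        if token < self.highest_token:
            # A newer lock holder has already written: reject the stale client.
            raise PermissionError(f"stale fencing token {token}")
        self.highest_token = token
        self.data[key] = value
```

The check lives in the storage service, not the client, because the paused client has no way of knowing that its lease has expired.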
Location: 7,804 the problem of reaching consensus in this untrusting environment is known as the Byzantine Generals Problem
Location: 7,846 Most Byzantine fault-tolerant algorithms require a supermajority of more than two-thirds of the nodes to be functioning correctly (i.e., if you have four nodes, at most one may malfunction).
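Note to self: the two-thirds rule as arithmetic. If more than two-thirds of n nodes must be correct, the number of faulty nodes f must satisfy 3f < n. The function name is my own.

```python
def max_byzantine_faults(n: int) -> int:
    """Largest f such that 3 * f < n, i.e. more than 2/3 of nodes are correct."""
    return (n - 1) // 3
```

This agrees with the example in the quote: four nodes tolerate one faulty node, and going from four to six nodes does not buy any extra tolerance.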
Location: 7,869 NTP clients can be configured with multiple server addresses. When synchronizing, the client contacts all of them, estimates their errors, and checks that a majority of servers agree on some time range. As long as most of the servers are okay, a misconfigured NTP server that is reporting an incorrect time is detected as an outlier and is excluded from synchronization
Location: 7,883 With regard to timing assumptions, three system models are in common use:
Location: 7,885 The synchronous model assumes bounded network delay, bounded process pauses, and bounded clock error.
Location: 7,890 Partial synchrony means that a system behaves like a synchronous system most of the time, but it sometimes exceeds the bounds for network delay, process pauses, and clock drift
Location: 7,899 Crash-stop faults
Location: 7,901 Crash-recovery faults
Location: 7,906 For modeling real systems, the partially synchronous model with crash-recovery faults is generally the most useful model.
Location: 7,927 To clarify the situation, it is worth distinguishing between two different kinds of properties: safety and liveness properties. In the example just given, uniqueness and monotonic sequence are safety properties, but availability is a liveness property.
Location: 7,930 eventual consistency is a liveness property
Location: 7,932 Safety is often informally defined as nothing bad happens, and liveness as something good eventually happens.
Location: 7,935 If a safety property is violated, we can point at a particular point in time at which it was broken
Location: 7,940 For distributed algorithms, it is common to require that safety properties always hold, in all possible situations of a system model [88]. That is, even if all nodes crash, or the entire network fails, the algorithm must nevertheless ensure that it does not return a wrong result (i.e., that the safety properties remain satisfied).
Location: 7,944 The definition of the partially synchronous model requires that eventually the system returns to a synchronous state — that is, any period of network interruption lasts only for a finite duration and is then repaired.
Location: 7,951 For example, algorithms in the crash-recovery model generally assume that data in stable storage survives crashes.
Location: 7,965 That is not to say that theoretical, abstract system models are worthless — quite the opposite. They are incredibly helpful for distilling down the complexity of real systems to a manageable set of faults that we can reason about, so that we can understand the problem and try to solve it systematically.
Location: 7,991 Such a node that is "limping" but not dead can be even more difficult to deal with than a cleanly failed node.
Location: 8,000 If you can avoid opening Pandora's box and simply keep things on a single machine, it is generally worth doing so.
Location: 8,276 nobody else is concurrently accessing the database (isolation),
Location: 8,286 If two nodes both believe that they are the leader, that situation is called split brain, and it often leads to data loss.
Location: 8,291 The limits of what is and isn't possible have been explored in depth, both in theoretical proofs and in practical implementations.
Location: 8,304 A better name for eventual consistency may be convergence, as we expect all replicas to eventually converge to the same value
Location: 8,309 Eventual consistency is hard for application developers because it is so different from the behavior of variables in a normal single-threaded program.
Location: 8,324 distributed consistency is mostly about coordinating the state of replicas in the face of delays and faults.
Location: 8,325 We will start by looking at one of the strongest consistency models in common use, linearizability, and examine its pros and cons.
Location: 8,336 This is the idea behind linearizability [6] (also known as atomic consistency [7], strong consistency, immediate consistency, or external consistency [
Location: 8,340 The exact definition of linearizability is quite subtle, and we will explore it in the rest of this section. But the basic idea is to make a system appear as if there were only one copy of the data, and all operations on it are atomic.
Location: 8,362 three clients concurrently reading and writing the same key x in a linearizable database. In the distributed systems literature, x is called a register — in practice, it could be one key in a key-value store, one row in a relational database, or one document in a document database,
Location: 8,421 An atomic compare-and-set (cas) operation can be used to check the value hasn't been concurrently changed by another client:
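Note (single-process sketch only): a lock-protected register can illustrate the semantics of compare-and-set. A real linearizable store would need replication and consensus behind it; the `Register` class is hypothetical.

```python
import threading


class Register:
    """One in-memory register with an atomic compare-and-set."""

    def __init__(self, value=None):
        self._value = value
        self._lock = threading.Lock()

    def read(self):
        with self._lock:
            return self._value

    def cas(self, expected, new):
        """Set the value to `new` only if it currently equals `expected`."""
        with self._lock:
            if self._value != expected:
                return False  # someone else changed it concurrently
            self._value = new
            return True


x = Register(0)
first = x.cas(0, 1)   # succeeds: the value was still 0
second = x.cas(0, 2)  # fails: the value is now 1, not 0
print(first, second, x.read())
```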
Location: 8,443 unless you take additional measures such as materializing conflicts
Location: 8,446 A database may provide both serializability and linearizability, and this combination is known as strict serializability or strong one-copy serializability
Location: 8,464 No matter how this lock is implemented, it must be linearizable: all nodes must agree which node owns the lock; otherwise it is useless.
Location: 8,471 They use consensus algorithms to implement linearizable operations in a fault-tolerant way
Location: 8,477 a linearizable storage service is the basic foundation for these coordination tasks.
Location: 8,497 a hard uniqueness constraint, such as the one you typically find in relational databases, requires linearizability.
Location: 8,617 If the network between datacenters is interrupted in a single-leader setup, clients connected to follower datacenters cannot contact the leader, so they cannot make any writes to the database, nor any linearizable reads. They can still make reads from the follower, but they might be stale (nonlinearizable).
Location: 8,643 CAP encouraged database engineers to explore a wider design space of distributed shared-nothing systems, which were more suitable for implementing large-scale web services
Location: 8,675 Although linearizability is a useful guarantee, surprisingly few systems are actually linearizable in practice. For example, even RAM on a modern multi-core CPU is not linearizable [43]: if a thread running on one CPU core writes to a memory address, and a thread on another CPU core reads the same address shortly afterward, it is not guaranteed to read the value written by the first thread (unless a memory barrier or fence [44] is used).
Location: 8,688 The reason for dropping linearizability is performance, not fault tolerance.
Location: 8,693 if you want linearizability, the response time of read and write requests is at least proportional to the uncertainty of delays in the network.
Location: 8,709 the main purpose of the leader in single-leader replication is to determine the order of writes in the replication log — that is, the order in which followers apply those writes.
Location: 8,716 there are deep connections between ordering, linearizability, and consensus.
Location: 8,727 We say that there is a causal dependency between the question and the answer.
Location: 8,737 we said that a transaction reads from a consistent snapshot. But what does "consistent" mean in this context? It means consistent with causality: if the snapshot contains an answer, it must also contain the question being answered
Location: 8,743 Read skew (non-repeatable reads, as illustrated in Figure 7-6) means reading data in a state that violates causality.
Location: 8,747 Serializable snapshot isolation (see "Serializable Snapshot Isolation (SSI)") detects write skew by tracking the causal dependencies between transactions.
Location: 8,755 If a system obeys the ordering imposed by causality, we say that it is causally consistent.
Location: 8,756 snapshot isolation provides causal consistency:
Location: 8,758 The causal order is not a total order
Location: 8,764 We say they are incomparable, and therefore mathematical sets are partially ordered: in some cases one set is greater than another (if one set contains all the elements of another), but in other cases they are incomparable.
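Note (small illustration): Python's built-in sets use `<=` for the subset relation, so they make the partial order easy to see. The helper name `comparable` is my own.

```python
def comparable(s: set, t: set) -> bool:
    """True if one set contains all the elements of the other."""
    return s <= t or t <= s


a, b, c = {1, 2}, {1, 2, 3}, {2, 4}
print(comparable(a, b))  # a is a subset of b
print(comparable(a, c))  # neither contains the other: incomparable
```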
Location: 8,765 The difference between a total order and a partial order is reflected in different database consistency models:
Location: 8,773 This means that causality defines a partial order, not a total order: some operations are ordered with respect to each other, but some are incomparable.
Location: 8,785 Linearizability is stronger than causal consistency
Location: 8,788 linearizability implies causality: any system that is linearizable will preserve causality correctly
Location: 8,797 causal consistency is the strongest possible consistency model that does not slow down due to network delays, and remains available in the face of network failures
Location: 8,799 In many cases, systems that appear to require linearizability in fact only really require causal consistency, which can be implemented more efficiently.
Location: 8,821 Causal consistency goes further: it needs to track causal dependencies across the entire database, not just for a single key. Version vectors can be generalized to do this
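Note (a minimal sketch, assuming a version vector is a dict from node ID to counter, with missing entries treated as 0): comparing two vectors tells you whether one state causally precedes the other or whether they are concurrent.

```python
def compare(v1: dict, v2: dict) -> str:
    """Return 'equal', 'before', 'after' or 'concurrent' for v1 relative to v2."""
    nodes = set(v1) | set(v2)
    le = all(v1.get(n, 0) <= v2.get(n, 0) for n in nodes)
    ge = all(v1.get(n, 0) >= v2.get(n, 0) for n in nodes)
    if le and ge:
        return "equal"
    if le:
        return "before"      # v1 happened before v2
    if ge:
        return "after"       # v2 happened before v1
    return "concurrent"      # incomparable: neither saw the other


print(compare({"a": 1}, {"a": 1, "b": 1}))
print(compare({"a": 2}, {"b": 1}))
```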
book-designing-data-intensive-applications#total-order-broadcast1Location: 8,988 total order broadcast is asynchronous: messages are guaranteed to be delivered reliably in a fixed order, but there is no guarantee about when a message will be delivered (so one recipient may lag behind the others) book-designing-data-intensive-applications#total-order-broadcast1
book-designing-data-intensive-applications#linearizability1Location: 8,989 linearizability is a recency guarantee: a read is guaranteed to see the latest value written. book-designing-data-intensive-applications#linearizability1
book-designing-data-intensive-applications#can-build-linearizability-on-total-order-broadcast1 2Location: 8,990 if you have total order broadcast, you can build linearizable storage on top of it book-designing-data-intensive-applications#can-build-linearizability-on-total-order-broadcast1 2
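Note (toy sketch): as I recall the chapter, a unique-username claim can be built on the log by appending a claim, reading the log up to your own message, and checking whether the first claim for that name is yours. Here an in-memory list stands in for a real total order broadcast; all names are hypothetical.

```python
class TotalOrderLog:
    """Stand-in for total order broadcast: every reader sees the same order."""

    def __init__(self):
        self.entries = []

    def append(self, message):
        self.entries.append(message)
        return len(self.entries) - 1  # position of the message in the log


def claim_username(log, client_id, username):
    """Linearizable 'set if absent' built on the ordered log."""
    position = log.append((username, client_id))
    # Read everything up to and including our own message.
    for name, owner in log.entries[: position + 1]:
        if name == username:
            return owner == client_id  # the first claim in log order wins
    return False


log = TotalOrderLog()
print(claim_username(log, "client-1", "alice"))
print(claim_username(log, "client-2", "alice"))
```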
Location: 10,071 Systems of record A system of
Location: 10,082 Denormalized values, indexes, and materialized views also fall into this category.
Location: 10,087 You can derive several different datasets from a single source, enabling you to look at the data from different "points of view."
Location: 10,088 Not all systems make a clear distinction between systems of record and derived data in their architecture, but it's a very helpful distinction to make, because it clarifies the dataflow through your system: it makes explicit which parts of the system have which inputs and which outputs, and how they depend on each other.
Location: 10,142 Although the importance of MapReduce is now declining
Location: 10,252 In this example, the working set of the job (the amount of memory to which the job needs random access) depends only on the number of distinct URLs:
Location: 10,260 The sort utility in GNU Coreutils (Linux) automatically handles larger-than-memory datasets by spilling to disk, and automatically parallelizes sorting across multiple CPU cores [9].
Location: 10,390 or an erasure coding scheme such as Reed–Solomon codes, which allows lost data to be recovered with lower storage overhead than full replication
Location: 10,405 Read a set of input files, and break it up into records
Location: 10,444 This principle is known as putting the computation near the data
Location: 10,513 In many datasets it is common for one record to have an association with another record: a foreign key in a relational model, a document reference in a document model, or an edge in a graph model.
Location: 10,574 This algorithm is known as a sort-merge join, since mapper output is sorted by key, and the reducers then merge together the sorted lists of records from both sides of the join.
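Note (in-memory sketch of the reducer side, not real MapReduce): both inputs are sorted by key, merged, and grouped. The tag 0/1 is my own device to make the user record sort ahead of that user's events; the sample data is invented.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def sort_merge_join(users, events):
    """users: (user_id, birth_year) pairs; events: (user_id, url) pairs.

    Yields (url, birth_year) for every event whose user is known.
    """
    left = sorted((uid, 0, year) for uid, year in users)   # tag 0 sorts first
    right = sorted((uid, 1, url) for uid, url in events)   # tag 1 sorts after
    merged = heapq.merge(left, right)                      # merge two sorted lists
    for _, group in groupby(merged, key=itemgetter(0)):
        year = None
        for _, tag, value in group:
            if tag == 0:
                year = value            # the user record arrives first
            elif year is not None:
                yield (value, year)     # an event joined with its user


users = [(104, 1989), (173, 1979)]
events = [(173, "/z"), (104, "/x"), (104, "/y"), (999, "/q")]
print(list(sort_merge_join(users, events)))
```

Events for user 999 are dropped because no user record exists, which makes this an inner join.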
Location: 10,602 Another common use for grouping is collating all the activity events for a particular user session, in order to find out the sequence of actions that the user took — a process called sessionization
Location: 10,616 Such disproportionately active database records are known as linchpin objects [38] or hot keys.
Location: 10,618 Collecting all activity related to a celebrity (e.g., replies to something they posted) in a single reducer can lead to significant skew (also known as hot spots) — that is, one reducer that must process significantly more records than the others
Location: 10,725 The output of a batch process is often not a report, but some other kind of structure.
Location: 10,742 in order to rank search results by relevance, correct misspellings, resolve synonyms, and so on — but the principle holds.
Location: 10,781 A much better solution is to build a brand-new database inside the batch job and write it as files to the job's output directory in the distributed filesystem,
Location: 10,801 encourages experimentation by being very explicit about dataflow: a program reads its input and writes its output. In the process, the input is left unchanged, any previous output is completely replaced with the new output, and there are no other side effects.
Location: 10,808 Or, even simpler, you can keep the old output in a different directory and simply switch back to it.
Location: 10,811 (The idea of being able to recover from buggy code has been called human fault tolerance [50].)
Location: 10,814 This principle of minimizing irreversibility is beneficial for Agile software development
Location: 10,830 On Hadoop, some of those low-value syntactic conversions are eliminated by using more structured file formats:
Location: 10,861 To put it bluntly, Hadoop opened up the possibility of indiscriminately dumping data into HDFS, and only later figuring out how to process it further
Location: 10,865 However, in practice, it appears that simply making data available quickly — even if it is in a quirky, difficult-to-use, raw format — is often more valuable than trying to decide on the ideal data model up front
book-designing-data-intensive-applications#data-lake-speeds-up-data-collection1Location: 10,871 The careful schema design required by an MPP database slows down that centralized data collection; collecting data in its raw form, and worrying about schema design later, allows the data collection to be speeded up (a concept sometimes known as a data lake or enterprise data hub [55]). book-designing-data-intensive-applications#data-lake-speeds-up-data-collection1
Location: 10,878 There may not even be one ideal data model, but rather different views onto the data that are suitable for different purposes. Simply dumping data in its raw form allows for several such transformations. This approach has been dubbed the sushi principle: "raw data is better" [57].
Location: 10,885 Data modeling still happens, but it is in a separate step, decoupled from the data collection.
Location: 10,908 Crucially, those various processing models can all be run on a single shared-use cluster of machines, all accessing the same files on the distributed filesystem.
Location: 10,919 Designing for frequent faults
Location: 10,929 The MapReduce approach is more appropriate for larger jobs: jobs that process so much data and run for such a long time that they are likely to experience at least one task failure along the way. In that case, rerunning the entire job due to a single task failure would be wasteful. Even if recovery at the granularity of an individual task introduces overheads that make fault-free processing slower, it can still be a reasonable trade-off if the rate of task failures is high enough.
Location: 10,940 This architecture allows non-production (low-priority) computing resources to be overcommitted, because the system knows that it can reclaim the resources if necessary.
Location: 10,946 At Google, a MapReduce task that runs for an hour has an approximately 5% risk of being terminated to make space for a higher-priority process. This rate is more than an order of magnitude higher than the rate of failures due to hardware issues, machine reboot, or other reasons
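Note (back-of-the-envelope sketch): if each one-hour task independently has a 5% chance of being terminated, the chance that a job sees at least one termination grows quickly with the number of tasks. Independence is my simplifying assumption, not a claim from the book.

```python
def p_any_task_terminated(tasks: int, p_task: float = 0.05) -> float:
    """Probability that at least one of `tasks` independent tasks is terminated."""
    return 1 - (1 - p_task) ** tasks


for n in (1, 10, 100):
    print(n, "tasks:", round(p_any_task_terminated(n), 3))
```

This is why recovering at the granularity of a single task pays off for large jobs.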
Location: 10,954 general priority preemption is not supported in YARN, Mesos, or Kubernetes at the time of writing
Location: 10,956 In an environment where tasks are not so often terminated, the design decisions of MapReduce make less sense.
Location: 10,986 Publishing data to a well-known location in the distributed filesystem allows loose coupling so that jobs don't need to know who is producing their input or consuming their output (see "Separation of logic and wiring").
Location: 10,993 The process of writing out this intermediate state to files is called materialization
book-designing-data-intensive-applications#skew1Location: 11,000 Skew or varying load on different machines means that a job often has a few straggler tasks that take much longer to complete than the others. Having to wait until all of the preceding job's tasks have completed slows down the execution of the workflow as a whole. book-designing-data-intensive-applications#skew1
Location: 11,005 Storing intermediate state in a distributed filesystem means those files are replicated across several nodes, which is often overkill for such temporary data.
Location: 11,018 they handle an entire workflow as one job,
book-designing-data-intensive-applications#dataflow-engines1Location: 11,020 Since they explicitly model the flow of data through several processing stages, these systems are known as dataflow engines book-designing-data-intensive-applications#dataflow-engines1
Location: 11,062 Spark uses the resilient distributed dataset (RDD) abstraction for tracking the ancestry of data
Location: 11,074 Such causes of nondeterminism need to be removed in order to reliably recover from faults, for example by generating pseudorandom numbers using a fixed seed.
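Note (minimal sketch): a private generator with a fixed seed makes a "random" sampling operator reproducible when the task is rerun. The function name and the seed value are arbitrary choices of mine.

```python
import random


def sample_records(records, k, seed=42):
    """Pick k records pseudorandomly; the same seed gives the same sample."""
    rng = random.Random(seed)  # private generator, unaffected by global state
    return rng.sample(records, k)


records = list(range(100))
first_run = sample_records(records, 5)
rerun = sample_records(records, 5)  # e.g. the task is retried after a fault
print(first_run == rerun)
```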
Location: 11,083 A sorting operation inevitably needs to consume its entire input before it can produce any output, because it's possible that the very last input record is the one with the lowest key and thus needs to be the very first output record. Any operator that requires sorting will thus need to accumulate state, at least temporarily. But many other parts of a workflow can be executed in a pipelined manner.
Location: 11,110 Dataflow engines like Spark,
Location: 11,125 it will always read the entire input dataset and produce a completely new output dataset, even if only a small part of the graph has changed compared to the last iteration.
Location: 11,127 The Pregel processing model: As an optimization for batch processing graphs, the bulk synchronous parallel (BSP) model of computation [70] has become popular.
Location: 11,196 Spark and Flink also include their own high-level dataflow APIs,
Location: 11,227 The freedom to easily run arbitrary code is what has long distinguished batch processing systems of MapReduce heritage from MPP databases
Location: 11,240 Hive, Spark DataFrames, and Impala also use vectorized execution
Location: 11,275 In the Unix world, the uniform interface that allows one program to be composed with another is files and pipes; in MapReduce, that interface is a distributed filesystem.
book-designing-data-intensive-applications#dataflow-engines-perform-less-materialization1 2Location: 11,286 Dataflow engines perform less materialization of intermediate state and keep more in memory, which means that they need to recompute more data if a node fails. Deterministic operators reduce the amount of data that needs to be recomputed. book-designing-data-intensive-applications#dataflow-engines-perform-less-materialization1 2
Location: 11,301 Thanks to the framework, your code in a batch processing job does not need to worry about implementing fault-tolerance mechanisms: the framework can guarantee that the final output of a job is the same as if no faults had occurred, even though in reality various tasks perhaps had to be retried.
Location: 11,306 Crucially, the input data is bounded: it has a known, fixed size (for example, it consists of a set of log files at some point in time, or a snapshot of a database's contents). Because it is bounded, a job knows when it has finished reading the entire input, and so a job eventually completes when it is done.
Location: 11,563 In this chapter we will look at event streams as a data management mechanism: the unbounded, incrementally processed counterpart to the batch data we saw in the last chapter
Location: 11,572 In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time.
Location: 11,610 What happens if the producers send messages faster than the consumers can process them? Broadly speaking, there are three options: the system can drop messages, buffer messages in a queue, or apply backpressure (also known as flow control; i.e., blocking the producer from sending more messages).
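Note (sketch using the standard library `queue` module): a bounded queue can show two of the three options. Buffering corresponds to an unbounded `queue.Queue()`. The `send` helper and the policy names are my own.

```python
import queue


def send(q, message, policy):
    """Try to hand a message to a queue under the given overload policy."""
    if policy == "drop":
        try:
            q.put_nowait(message)
            return True
        except queue.Full:
            return False              # message is dropped
    if policy == "backpressure":
        q.put(message)                # blocks the producer until there is room
        return True
    raise ValueError(f"unknown policy: {policy}")


bounded = queue.Queue(maxsize=2)
results = [send(bounded, i, "drop") for i in range(3)]
print(results)
```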
Location: 11,627 A nice property of the batch processing systems we explored in Chapter 10 is that they provide a strong reliability guarantee: failed tasks are automatically retried, and partial output from failed tasks is automatically discarded. This means the output is the same as if no failures had occurred, which helps simplify the programming model. Later in this chapter we will examine how we can provide similar guarantees in a streaming context.
Location: 11,658 message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams
Location: 11,716 message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
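Note (toy in-memory sketch, not any real broker's API): a delivered message stays in an "unacked" table until the consumer acknowledges it, and unacknowledged messages can be put back on the queue if the consumer goes away.

```python
from collections import deque


class Broker:
    def __init__(self):
        self._queue = deque()
        self._unacked = {}   # message id -> body, delivered but not yet acked
        self._next_id = 0

    def publish(self, body):
        self._queue.append((self._next_id, body))
        self._next_id += 1

    def deliver(self):
        if not self._queue:
            return None
        msg_id, body = self._queue.popleft()
        self._unacked[msg_id] = body
        return msg_id, body

    def ack(self, msg_id):
        self._unacked.pop(msg_id, None)   # now safe to forget the message

    def redeliver_unacked(self):
        """Called when a consumer is assumed to have crashed."""
        for msg_id in sorted(self._unacked, reverse=True):
            self._queue.appendleft((msg_id, self._unacked.pop(msg_id)))


broker = Broker()
broker.publish("a")
broker.publish("b")
msg_id, _ = broker.deliver()
broker.ack(msg_id)           # "a" is processed and acknowledged
broker.deliver()             # "b" is delivered, but the consumer crashes
broker.redeliver_unacked()
print(broker.deliver())      # "b" is delivered again
```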
Location: 11,720 Handling this case requires an atomic commit protocol,
Location: 11,741 A key feature of batch processes, as discussed in Chapter 10, is that you can run them repeatedly, experimenting with the processing steps, without risk of damaging the input (since the input is read-only).
Location: 11,747 Why can we not have a hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging? This is the idea behind log-based message brokers.
book-designing-data-intensive-applications#no-ordering-across-partitions1 2 3Location: 11,763 There is no ordering guarantee across different partitions. book-designing-data-intensive-applications#no-ordering-across-partitions1 2 3
Location: 11,785 The log-based approach trivially supports fan-out messaging,
Location: 11,792 The number of nodes sharing the work of consuming a topic can be at most the number of log partitions in that topic, because messages within the same partition are delivered to the same node.
Location: 11,794 If a single message is slow to process, it holds up the processing of subsequent messages in that partition (a form of head-of-line blocking; see "Describing Performance").
Location: 11,796 in situations where messages may be expensive to process and you want to parallelize processing on a message-by-message basis, and where message ordering is not so important, the JMS/AMQP style of message broker is preferable.
Location: 11,813 If the consumer had processed subsequent messages but not yet recorded their offset, those messages will be processed a second time upon restart. We will discuss ways of dealing with this issue later in the chapter.
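Note (simulation sketch): a consumer that crashes after processing a message but before recording its offset will see that message again after restart. The `consume` function and the crash parameter are invented for the demonstration.

```python
def consume(log, committed_offset, crash_before_commit_at=None):
    """Process messages from committed_offset; return (processed, new_offset)."""
    processed = []
    offset = committed_offset
    for i in range(committed_offset, len(log)):
        processed.append(log[i])          # side effect of processing happens here
        if i == crash_before_commit_at:
            return processed, offset      # crash: offset for message i is lost
        offset = i + 1                    # offset recorded after processing
    return processed, offset


log = ["m0", "m1", "m2", "m3"]
first, committed = consume(log, 0, crash_before_commit_at=2)
second, committed2 = consume(log, committed)
print(first, second)
```

In this run "m2" shows up in both lists, which is the at-least-once behavior the quote describes.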
Location: 11,821 Effectively, the log implements a bounded-size buffer that discards old messages when it gets full, also known as a circular buffer or ring buffer. However, since that buffer is on disk, it can be quite large.
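Note (in-memory analogy only): `collections.deque` with `maxlen` behaves like the ring buffer described here, discarding the oldest entries once it is full. A real log keeps its segments on disk.

```python
from collections import deque

buffer = deque(maxlen=3)   # bounded-size buffer
for message in range(5):
    buffer.append(message) # when full, the oldest message is discarded

print(list(buffer))
```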
Location: 11,842 This fact is a big operational advantage: you can experimentally consume a production log for development, testing, or debugging purposes, without having to worry much about disrupting production services.
book-designing-data-intensive-applications#derived-data-separated-from-input-data1 2Location: 11,860 This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization book-designing-data-intensive-applications#derived-data-separated-from-input-data1 2
book-designing-data-intensive-applications#derived-data-systems1Location: 11,951 We can call the log consumers derived data systems, as discussed in the introduction to Part III: the data stored in the search index and the data warehouse is just another view onto the data in the system of record. book-designing-data-intensive-applications#derived-data-systems1
Location: 11,973 Bottled Water implements CDC for PostgreSQL using an API that decodes the write-ahead log [28],
Location: 12,014 This log compaction feature is supported by Apache Kafka. As we shall see later in this chapter, it allows the message broker to be used for durable storage, not just for transient messaging.
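Note (sketch of the idea, not Kafka's implementation): compaction keeps only the most recent value for each key, so the compacted log still lets a consumer rebuild the latest state. Deletion markers are left out of this sketch.

```python
def compact(log):
    """log: list of (key, value) in append order. Keep the latest entry per key."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)          # later writes overwrite earlier ones
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors]


log = [("a", 1), ("b", 2), ("a", 3)]
print(compact(log))
```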
book-designing-data-intensive-applications#event-sourcing-expresses-intent1 2Location: 12,066 For example, storing the event "student cancelled their course enrollment" clearly expresses the intent of a single action in a neutral fashion, whereas the side effects "one entry was deleted from the enrollments table, and one cancellation reason was added to the student feedback table" embed a lot of assumptions about the way the data is later going to be used. If a new application feature is introduced — for example, "the place is offered to the next person on the waiting list" — the event sourcing approach allows that new side effect to easily be chained off the existing event. book-designing-data-intensive-applications#event-sourcing-expresses-intent1 2
Location: 12,071 Event sourcing is similar to the chronicle data model [45
book-designing-data-intensive-applications#event-sourcing-needs-full-historyLocation: 12,090 On the other hand, with event sourcing, events are modeled at a higher level: an event typically expresses the intent of a user action, not the mechanics of the state update that occurred as a result of the action. In this case, later events typically do not override prior events, and so you need the full history of events to reconstruct the final state. Log compaction is not possible in the same way. book-designing-data-intensive-applications#event-sourcing-needs-full-history
Location: 12,100 The event sourcing philosophy is careful to distinguish between events and commands [48]. When a request from a user first arrives, it is initially a command: at this point it may still fail, for example because some integrity condition is violated. The application must first validate that it can execute the command. If the validation is successful and the command is accepted, it becomes an event, which is durable and immutable.
Location: 12,108 At the point when the event is generated, it becomes a fact. Even if the customer later decides to change or cancel the reservation, the fact remains true that they formerly held a reservation for a particular seat, and the change or cancellation is a separate event that is added later.
book-designing-data-intensive-applications#serializable-transaction1 2Location: 12,111 Thus, any validation of a command needs to happen synchronously, before it becomes an event — for example, by using a serializable transaction that atomically validates the command and publishes the event. book-designing-data-intensive-applications#serializable-transaction1 2
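Note (single-process sketch): a command is validated first and only becomes an immutable event if it is accepted. Here a lock stands in for the serializable transaction; the class and event names are made up.

```python
import threading


class SeatReservations:
    def __init__(self, seats):
        self._free = set(seats)
        self.events = []                 # append-only log of accepted facts
        self._lock = threading.Lock()    # stand-in for a serializable transaction

    def handle_reserve_command(self, customer, seat):
        """Validate the command; on success, publish an event atomically."""
        with self._lock:
            if seat not in self._free:
                return None              # command rejected, no event is created
            self._free.remove(seat)
            event = {"type": "SeatReserved", "customer": customer, "seat": seat}
            self.events.append(event)
            return event


reservations = SeatReservations({"12A"})
print(reservations.handle_reserve_command("alice", "12A"))
print(reservations.handle_reserve_command("bob", "12A"))
```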
Location: 12,172 Immutable events also capture more information than just the current state. For example, on a shopping website, a customer may add an item to their cart and then remove it again. Although the second event cancels out the first event from the point of view of order fulfillment, it may be useful to know for analytics purposes that the customer was considering a particular item but then decided against it.
Location: 12,180 Moreover, by separating mutable state from the immutable event log, you can derive several different read-oriented representations from the same log of events.
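Note (small sketch using the shopping-cart example): two different read views can be derived from the same immutable event list. The event shapes and function names are my own.

```python
from collections import Counter

events = [("added", "book"), ("added", "lamp"), ("removed", "lamp")]


def cart_view(events):
    """Current cart contents, as order fulfillment would see them."""
    cart = Counter()
    for kind, item in events:
        cart[item] += 1 if kind == "added" else -1
    return {item: count for item, count in cart.items() if count > 0}


def considered_then_removed_view(events):
    """Analytics view: items a customer put in the cart and later took out."""
    return {item for kind, item in events if kind == "removed"}


print(cart_view(events))
print(considered_then_removed_view(events))
```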
Location: 12,197 you gain a lot of flexibility by separating the form in which data is written from the form it is read, and by allowing several different read views. This idea is sometimes known as command query responsibility segregation
Location: 12,207 it is entirely reasonable to denormalize data in the read-optimized views, as the translation process gives you a mechanism for keeping it consistent with the event log.
Location: 12,223 With event sourcing, you can design an event such that it is a self-contained description of a user action. The user action then requires only a single write in one place — namely appending the events to the log — which is easy to make atomic.
book-designing-data-intensive-applications#remove-nondeterminism-by-serial-order-in-partitionLocation: 12,229 The log removes the nondeterminism of concurrency by defining a serial order of events in a partition book-designing-data-intensive-applications#remove-nondeterminism-by-serial-order-in-partition
Location: 12,276 You can process one or more input streams to produce one or more output streams. Streams may go through a pipeline consisting of several such processing stages before they eventually end up at an output (option 1 or 2).
book-designing-data-intensive-applications#operator-or-jobLocation: 12,280 processing streams to produce other, derived streams. A piece of code that processes streams like this is known as an operator or a job. book-designing-data-intensive-applications#operator-or-job
Location: 12,291 Fault-tolerance mechanisms must also change: with a batch job that has been running for a few minutes, a failed task can simply be restarted from the beginning, but with a stream job that has been running for several years, restarting from the beginning after a crash may not be a viable option.
Location: 12,310 When a match is found, the engine emits a complex event (hence the name) with the details of the event pattern that was detected [67].
Location: 12,322 Distributed stream processors like Samza are also gaining SQL support for declarative queries on streams
Location: 12,358 We saw in "Databases and Streams" that a stream of changes to a database can be used to keep derived data systems, such as caches, search indexes, and data warehouses, up to date with a source database. We can regard these examples as specific cases of maintaining materialized views
Location: 12,393 Communication between actors is often ephemeral and one-to-one, whereas event logs are durable and multi-subscriber.
Location: 12,422 There are many reasons why processing may be delayed: queueing, network faults (see "Unreliable Networks"), a performance issue leading to contention in the message broker or processor, a restart of the stream consumer, or reprocessing of past events (see "Replaying old messages") while recovering from a fault or after fixing a bug in the code.
Location: 12,445 A tricky problem when defining windows in terms of event time is that you can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.
Location: 12,472 To adjust for incorrect device clocks, one approach is to log three timestamps
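Note (sketch of how I understand the three-timestamp approach): log the event time and the send time by the device clock plus the receive time by the server clock. The difference between the last two estimates the device clock offset, assuming network delay is negligible. Parameter names are mine.

```python
def estimate_event_time(device_event_ts, device_send_ts, server_receive_ts):
    """Correct a device-clock event time using the server clock."""
    clock_offset = server_receive_ts - device_send_ts  # device clock error
    return device_event_ts + clock_offset


# The device clock runs 4000 s behind; the event happened 60 s before sending.
print(estimate_event_time(1_000, 1_060, 5_060))
```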
Location: 12,489 A tumbling window has a fixed length, and every event belongs to exactly one window.
Location: 12,493 A hopping window also has a fixed length, but allows windows to overlap in order to provide some smoothing.
Location: 12,498 sliding window contains all the events that occur within some interval of each other.
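Note (sketch for the first two window types): window assignment for tumbling and hopping windows is simple modular arithmetic on the event timestamp. Sliding windows need a buffer of recent events and are not shown. Function names are my own.

```python
def tumbling_window(ts, size):
    """The single window [start, start + size) that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def hopping_windows(ts, size, hop):
    """All windows [start, start + size) containing ts; starts are multiples of hop."""
    windows = []
    start = ts - ts % hop            # latest window start at or before ts
    while start > ts - size:         # this window still covers ts
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)


print(tumbling_window(1000, 300))
print(hopping_windows(1000, 300, 60))
```

With a 300-second window hopping every 60 seconds, each event falls into five overlapping windows.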
Location: 12,532 To implement this type of join, a stream processor needs to maintain state: for example, all the events that occurred in the last hour, indexed by session ID. Whenever a search event or click event occurs, it is added to the appropriate index, and the stream processor also checks the other index to see if another event for the same session ID has already arrived. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.
Location: 12,544 This process is sometimes known as enriching the activity events with information from the database.
Location: 12,605 In data warehouses, this issue is known as a slowly changing dimension (SCD), and it is often addressed by using a unique identifier for a particular version of the joined record:
Location: 12,631 One solution is to break the stream into small blocks, and treat each block like a miniature batch process. This approach is called microbatching, and it is used in Spark Streaming
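Note (sketch of the idea only): the stream is cut into small blocks and each block is handled like a tiny batch. This version cuts by record count for simplicity, whereas a real microbatching engine would normally cut by time interval; the function name is hypothetical.

```python
from itertools import islice


def microbatches(stream, batch_size):
    """Yield successive lists of up to batch_size records from a stream."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


for batch in microbatches(range(5), 2):
    print(sum(batch))   # each block is processed like a miniature batch job
```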
Location: 12,683 When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
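Note (in-memory sketch): storing the offset of the last applied message next to the value makes a non-idempotent update (here, an increment) safe to retry. It assumes all updates for a key arrive through one partition with increasing offsets; the `Store` class is invented.

```python
class Store:
    def __init__(self):
        self.data = {}   # key -> (value, offset of the last applied message)

    def apply_increment(self, key, delta, offset):
        """Apply the update unless this message offset was already applied."""
        value, last_offset = self.data.get(key, (0, -1))
        if offset <= last_offset:
            return False                      # duplicate delivery: skip
        self.data[key] = (value + delta, offset)
        return True


store = Store()
store.apply_increment("counter", 5, offset=0)
store.apply_increment("counter", 5, offset=0)   # redelivered after a crash
print(store.data["counter"])
```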
Location: 12,734 Parallelism is achieved through partitioning,
Location: 12,747 including searching for event patterns (complex event processing),
Location: 12,748 computing windowed aggregations (stream analytics),
Location: 12,748 and keeping derived data systems up to date (materialized views).
Location: 12,761 As with batch processing, we need to discard the partial output of any failed tasks. However, since a stream process is long-running and produces output continuously, we can't simply discard all output. Instead, a finer-grained recovery mechanism can be used, based on microbatching, checkpointing, transactions, or idempotent writes.
Location: 13,056 you need to be very clear about the inputs and outputs: where is data written first, and which representations are derived from which sources? How do you get data into all the right places, in the right formats?
Location: 13,070 Whether you use change data capture or an event sourcing log is less important than simply the principle of deciding on a total order.
Location: 13,106 The order of events in two different partitions is then ambiguous.
Location: 13,119 In formal terms, deciding on a total order of events is known as total order broadcast, which is equivalent to consensus
Location: 13,129 Some other cases are easy to handle: for example, when there are multiple updates of the same object, they can be totally ordered by routing all updates for a particular object ID to the same log partition.
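To make the routing idea concrete, here is a sketch that hashes the object ID to pick a partition. `partition_for` and `PartitionedLog` are illustrative names, and the in-memory lists stand in for a real log such as a Kafka topic.

```python
import hashlib


def partition_for(object_id, num_partitions):
    """Map an object ID to a partition with a stable hash."""
    digest = hashlib.sha256(object_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


class PartitionedLog:
    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, object_id, update):
        """Append to the object's partition; return (partition, offset)."""
        p = partition_for(object_id, len(self.partitions))
        self.partitions[p].append((object_id, update))
        return p, len(self.partitions[p]) - 1
```

All updates for one object ID land in the same partition, so their offsets give a total order for that object. Updates to different objects may still be in different partitions and remain unordered relative to each other.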
Location: 13,144 Logical timestamps can provide total ordering without coordination
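One common form of logical timestamp is a Lamport timestamp, a (counter, node ID) pair. The sketch below is my illustration of that technique, not code from the book; `LamportClock` and its method names are assumptions.

```python
class LamportClock:
    """Generates (counter, node_id) timestamps that sort into a total order."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0

    def tick(self):
        """Timestamp a local event."""
        self.counter += 1
        return (self.counter, self.node_id)

    def observe(self, timestamp):
        """Advance past a timestamp seen on an incoming message."""
        self.counter = max(self.counter, timestamp[0])
```

Tuples compare by counter first and node ID second, so any two timestamps are ordered without the nodes talking to each other. The order is consistent with causality, but events still arrive at consumers in arbitrary order and need buffering before the order is known to be final.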
Location: 13,147 If you can log an event to record the state of the system that the user saw before making a decision, and give that event a unique identifier, then any later events can reference that event identifier in order to record the causal dependency [4]. We will return to this idea in "Reads are events too".
Location: 13,170 microbatches
Location: 13,172 In principle, one type of processing can be emulated on top of the other, although the performance characteristics vary: for example, microbatching may perform poorly on hopping or sliding windows
book-designing-data-intensive-applications#batch-processing-functional-flavor1Location: 13,180 Batch processing has a quite strong functional flavor (even if the code is not written in a functional programming language): it encourages deterministic, pure functions whose output depends only on the input and which have no side effects other than the explicit outputs, treating inputs as immutable and outputs as append-only. book-designing-data-intensive-applications#batch-processing-functional-flavor1
Location: 13,183 The principle of deterministic functions with well-defined inputs and outputs is not only good for fault tolerance (see "Idempotence"), but also simplifies reasoning about the dataflows in an organization
Location: 13,186 No matter whether the derived data is a search index, a statistical model, or a cache, it is helpful to think in terms of data pipelines that derive one thing from another, pushing state changes in one system through functional application code and applying the effects to derived systems.
book-designing-data-intensive-applications#distributed-transactions-issues1 2Location: 13,190 In principle, derived data systems could be maintained synchronously, just like a relational database updates secondary indexes synchronously within the same transaction as writes to the table being indexed. However, asynchrony is what makes systems based on event logs robust: it allows a fault in one part of the system to be contained locally, whereas distributed transactions abort if any one participant fails, so they tend to amplify failures by spreading them to the rest of the system (see "Limitations of distributed transactions"). book-designing-data-intensive-applications#distributed-transactions-issues1 2
Location: 13,213 On the other hand, with reprocessing it is possible to restructure a dataset into a completely different model in order to better serve new requirements.
Location: 13,227 Derived views allow gradual evolution. If you want to restructure a dataset, you do not need to perform the migration as a sudden switch. Instead, you can maintain the old schema and the new schema side by side as two independently derived views onto the same underlying data. You can then start shifting a small number of users to the new view in order to test its performance and find any bugs, while most users continue to be routed to the old view. Gradually, you can increase the proportion of users accessing the new view, and eventually you can drop the old view
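The gradual shift of users could be sketched as a stable hash bucket per user, assuming string user IDs. `use_new_view` and the 100-bucket scheme are my own illustration of one way to do the routing.

```python
import hashlib


def use_new_view(user_id, rollout_percent):
    """Return True if this user should be routed to the new derived view."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100  # stable bucket 0..99
    return bucket < rollout_percent
```

Because the bucket is fixed per user, raising `rollout_percent` only ever adds users to the new view, and setting it back to 0 routes everyone to the old view again.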
Location: 13,251 The lambda architecture was an influential idea that shaped the design of data systems for the better, particularly by popularizing the principle of deriving views onto streams of immutable events and reprocessing events when needed.
book-designing-data-intensive-applications#reasoning-about-time1Location: 13,264 This raises the problems discussed in "Reasoning About Time", such as handling stragglers and handling windows that cross boundaries between batches. book-designing-data-intensive-applications#reasoning-about-time1
Location: 13,271 More recent work has enabled the benefits of the lambda architecture to be enjoyed without its downsides, by allowing both batch computations (reprocessing historical data) and stream computations (processing events as they arrive) to be implemented in the same system [15].
Location: 13,290 For example, Apache Beam provides an API for expressing such computations, which can then be run using Apache Flink or Google Cloud Dataflow.
Location: 13,316 a short declarative query can draw on a lot of powerful infrastructure (query optimization, indexes, join methods, concurrency control, replication, etc.) without the author of the query needing to understand the implementation details.
Location: 13,347 This process is remarkably similar to setting up a new follower replica (see "Setting Up New Followers"), and also very similar to bootstrapping change data capture in a streaming system (see "Initial snapshot").
vv organizational database vv Location: 13,353 In this light, I think that the dataflow across an entire organization starts looking like one huge database [7]. Whenever a batch, stream, or ETL process transports data from one place and form to another place and form, it is acting like the database subsystem that keeps indexes or materialized views up to date. vv
Location: 13,356 Viewed like this, batch and stream processors are like elaborate implementations of triggers, stored procedures, and materialized view maintenance routines.
Location: 13,374 A federated query interface follows the relational tradition of a single integrated system with a high-level query language and elegant semantics, but a complicated implementation.
Location: 13,405 By contrast, the synchronous interaction of distributed transactions tends to escalate local faults into large-scale failures (see "Limitations of distributed transactions").
Location: 13,415 Databases are still required for maintaining state in stream processors, and in order to serve queries for the output of batch and stream processors
Location: 13,434 we don't yet have the unbundled-database equivalent of the Unix shell (i.e., a high-level language for composing storage and processing systems in a simple and declarative way).
Location: 13,440 Similarly, it would be great to be able to precompute and update caches more easily. Recall that a materialized view is essentially a precomputed cache, so you could imagine creating a cache by declaratively specifying materialized views for complex queries, including recursive queries on graphs (see "Graph-Like Data Models") and application logic. There is interesting early-stage research in this area, such as differential dataflow
book-designing-data-intensive-applications#unbundling-aka-database-inside-out1 2Location: 13,457 The approach of unbundling databases by composing specialized storage and processing systems with application code is also becoming known as the "database inside-out" approach [26], after the title of a conference talk I gave in 2014 [27]. book-designing-data-intensive-applications#unbundling-aka-database-inside-out1 2
book-designing-data-intensive-applications#unbundling-proposed-by-jay-kreps1Location: 13,475 The term unbundling in this context was proposed by Jay Kreps book-designing-data-intensive-applications#unbundling-proposed-by-jay-kreps1
book-designing-data-intensive-applications#dataflow-recalculation1Location: 13,476 Even spreadsheets have dataflow programming capabilities that are miles ahead of most mainstream programming languages [33]. In a spreadsheet, you can put a formula in one cell (for example, the sum of cells in another column), and whenever any input to the formula changes, the result of the formula is automatically recalculated. This is exactly what we want at a data system level: when a record in a database changes, we want any index for that record to be automatically updated, and any cached views or aggregations that depend on the record to be automatically refreshed. You should not have to worry about the technical details of how this refresh happens, but be able to simply trust that it works correctly. book-designing-data-intensive-applications#dataflow-recalculation1
book-designing-data-intensive-applications#unbundled-databases-and-dataflowLocation: 13,488 the ideas of unbundled databases and dataflow. book-designing-data-intensive-applications#unbundled-databases-and-dataflow
book-designing-data-intensive-applications#derivation-secondary-index1Location: 13,500 The derivation function for a secondary index is so commonly required that it is built into many databases as a core feature, and you can invoke it by merely saying CREATE INDEX book-designing-data-intensive-applications#derivation-secondary-index1
Location: 13,503 In machine learning, feature engineering is notoriously application-specific, and often has to incorporate detailed knowledge about the user interaction and deployment of an application
Location: 13,517 On the other hand, deployment and cluster management tools such as Mesos, YARN, Docker, Kubernetes, and others are designed specifically for the purpose of running application code. By focusing on doing one thing well, they are able to do it much better than a database that provides execution of user-defined functions as one of its many features.
Location: 13,519 I think it makes sense to have some parts of a system that specialize in durable data storage, and other parts that specialize in running application code. The two can interact while still remaining independent.
Location: 13,528 In this typical web application model, the database acts as a kind of mutable shared variable that can be accessed synchronously over the network.
Location: 13,535 Databases have inherited this passive approach to mutable data: if you want to find out whether the content of the database has changed, often your only option is to poll
Location: 13,544 treating the log of changes to a database as a stream of events that we can subscribe to.
book-designing-data-intensive-applications#unbundling-the-database1Location: 13,550 Unbundling the database means taking this idea and applying it to the creation of derived datasets outside of the primary database: caches, full-text search indexes, machine learning, or analytics systems. We can use stream processing and messaging systems for this purpose. book-designing-data-intensive-applications#unbundling-the-database1
Location: 13,552 maintaining derived data is not the same as asynchronous job execution, for which messaging systems are traditionally designed (see "Logs compared to traditional messaging"): When maintaining derived data, the order of state changes is often important (if several views are derived from an event log, they need to process the events in the same order so that they remain consistent with each other). As discussed in "Acknowledgments and redelivery", many message brokers do not have this property when redelivering unacknowledged messages. Dual writes are also ruled out (see "Keeping Systems in Sync"). Fault tolerance is key for derived data: losing just a single message causes the derived dataset to go permanently out of sync with its data source. Both message delivery and derived state updates must be reliable. For example, many actor systems by default maintain actor state and messages in memory, so they are lost if the machine running the actor crashes.
Location: 13,565 Like Unix tools chained by pipes, stream operators can be composed to build large systems around dataflow.
book-designing-data-intensive-applications#different-teams-work-on-different-things1Location: 13,572 The advantage of such a service-oriented architecture over a single monolithic application is primarily organizational scalability through loose coupling: different teams can work on different services, which reduces coordination effort between teams (as long as the services can be deployed and updated independently). book-designing-data-intensive-applications#different-teams-work-on-different-things1
Location: 13,578 For example, say a customer is purchasing an item that is priced in one currency but paid for in another currency. In order to perform the currency conversion, you need to know the current exchange rate. This operation could be implemented in two ways [40, 41]: In the microservices approach, the code that processes the purchase would probably query an exchange-rate service or database in order to obtain the current rate for a particular currency. In the dataflow approach, the code that processes purchases would subscribe to a stream of exchange rate updates ahead of time, and record the current rate in a local database whenever it changes. When it comes to processing the purchase, it only needs to query the local database.
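A sketch of the dataflow variant of the exchange-rate example, assuming rate updates arrive as calls to a handler. `PurchaseProcessor` and its methods are hypothetical names, and a dict stands in for the local database.

```python
from decimal import Decimal


class PurchaseProcessor:
    """Keeps a local copy of exchange rates fed by a stream of updates."""

    def __init__(self):
        self.local_rates = {}  # (from_currency, to_currency) -> Decimal rate

    def on_rate_update(self, from_currency, to_currency, rate):
        """Stream subscription handler: record the latest rate locally."""
        self.local_rates[(from_currency, to_currency)] = Decimal(rate)

    def process_purchase(self, amount, from_currency, to_currency):
        """Convert using only local state; no network call at purchase time."""
        rate = self.local_rates[(from_currency, to_currency)]
        return Decimal(amount) * rate
```

The purchase path is now a local lookup, which is faster and keeps working if the rate service is down. The cost is that the local rate can be slightly stale.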
Location: 13,593 Subscribing to a stream of changes, rather than querying the current state when needed, brings us closer to a spreadsheet-like model of computation: when some piece of data changes, any derived data that depends on it can swiftly be updated.
book-designing-data-intensive-applications#shift-read-and-write-path-boundary1 2 3 4Location: 13,634 Viewed like this, the role of caches, indexes, and materialized views is simple: they shift the boundary between the read path and the write path. book-designing-data-intensive-applications#shift-read-and-write-path-boundary1 2 3 4
book-designing-data-intensive-applications#extending-the-write-path1 2 3Location: 13,672 In terms of our model of write path and read path, actively pushing state changes all the way to client devices means extending the write path all the way to the end user. book-designing-data-intensive-applications#extending-the-write-path1 2 3
Orange highlight | Location: 13,701 If you are designing data systems, I hope that you will keep in mind the option of subscribing to changes, not just querying the current state.
Orange highlight | Location: 13,712 It is also possible to represent read requests as streams of events, and send both the read events and the write events through a stream processor; the processor responds to read events by emitting the result of the read to an output stream
Orange highlight | Location: 13,723 For example, in an online shop, it is likely that the predicted shipping date and the inventory status shown to a customer affect whether they choose to buy an item [4]. To analyze this connection, you need to record the result of the user's query of the shipping and inventory status.
Orange highlight | Location: 13,727 Optimizing such systems to reduce the overhead is still an open research problem
Orange highlight | Location: 13,762 However, those foundations are weaker than they seem: witness for example the confusion of weak isolation levels
book-designing-data-intensive-applications#serializability-and-atomic-commit1Location: 13,780 serializability and atomic commit are established approaches, but they come at a cost: they typically only work in a single datacenter (ruling out geographically distributed architectures), and they limit the scale and fault-tolerance properties you can achieve. book-designing-data-intensive-applications#serializability-and-atomic-commit1
book-designing-data-intensive-applications#immutable-and-append-only-data1 2Location: 13,789 argue in favor of immutable and append-only data, because it is easier to recover from such mistakes if you remove the ability of faulty code to destroy good data. book-designing-data-intensive-applications#immutable-and-append-only-data1 2
Orange highlight | Location: 13,798 In this context, exactly-once means arranging the computation such that the final effect is the same as if no faults had occurred, even if the operation actually was retried due to some fault.
Orange highlight | Location: 13,806 ensure fencing when failing over from one node to another (see "The leader and the lock").
Orange highlight | Location: 13,811 any duplicates are removed by the TCP stack before it hands the data to an application.
Orange highlight | Location: 13,839 Operation identifiers
Orange highlight | Location: 13,843 For example, you could generate a unique identifier for an operation (such as a UUID) and include it as a hidden form field in the client application, or calculate a hash of all the relevant form fields to derive the operation ID
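Both ways of producing an operation ID can be sketched with the standard library. The function names and the choice of SHA-256 over canonical JSON are my assumptions.

```python
import hashlib
import json
import uuid


def random_operation_id():
    """Generate a fresh ID once per form render, e.g. for a hidden field."""
    return str(uuid.uuid4())


def derived_operation_id(form_fields):
    """Derive the ID from the form content, independent of field order."""
    canonical = json.dumps(form_fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

With the derived variant, two deliberately identical submissions (say, the same transfer made twice on purpose) collapse into one operation, so the random ID is the safer choice when that matters.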
book-designing-data-intensive-applications#nonserializable-isolation1Location: 13,862 (whereas an application-level check-then-insert may fail under nonserializable isolation, as discussed in "Write Skew and Phantoms"). book-designing-data-intensive-applications#nonserializable-isolation1
Orange highlight | Location: 13,867 processed exactly once, which can again be enforced using the request ID.
Orange highlight | Location: 13,870 This scenario of suppressing duplicate transactions is just one example of a more general principle called the end-to-end argument
Orange highlight | Location: 13,879 Solving the problem requires an end-to-end solution: a transaction identifier that is passed all the way from the end-user client to the database.
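A sketch of how the database end might suppress duplicates with a uniqueness constraint on the request ID, using an in-memory SQLite database. The table layout and the `execute_once` helper are illustrative assumptions.

```python
import sqlite3


def open_db():
    """Create an in-memory database with a unique request_id column."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE requests ("
        " request_id TEXT PRIMARY KEY,"
        " from_account TEXT, to_account TEXT, amount INTEGER)"
    )
    return conn


def execute_once(conn, request_id, from_account, to_account, amount):
    """Insert the request; return False if this request_id was seen before."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO requests VALUES (?, ?, ?, ?)",
                (request_id, from_account, to_account, amount),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate: the primary key constraint rejected it
```

The client generates `request_id` once and reuses it on every retry, so a retry after a lost response is rejected by the constraint instead of being executed twice.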
Orange highlight | Location: 13,886 If you want to catch all possible sources of data corruption, you also need end-to-end checksums.
Orange highlight | Location: 13,893 We just need to remember that the low-level reliability features are not by themselves sufficient to ensure end-to-end correctness.
Orange highlight | Location: 13,929 In Chapter 9 we saw that in a distributed setting, enforcing a uniqueness constraint requires consensus:
Orange highlight | Location: 13,940 If you want to be able to immediately reject any writes that would violate the constraint, synchronous coordination is unavoidable [56].
book-designing-data-intensive-applications#same-order-total-order-broadcastLocation: 13,946 The log ensures that all consumers see messages in the same order — a guarantee that is formally known as total order broadcast book-designing-data-intensive-applications#same-order-total-order-broadcast
Orange highlight | Location: 13,951 if the log is partitioned based on the value that needs to be unique, a stream processor can unambiguously and deterministically decide which one of several conflicting operations came first.
Orange highlight | Location: 13,958 The client that requested the username watches the output stream and waits for a success or rejection message corresponding to its request.
book-designing-data-intensive-applications#writes-that-would-conflict-routed-to-same-partition1 2Location: 13,963 writes that may conflict are routed to the same partition and processed sequentially book-designing-data-intensive-applications#writes-that-would-conflict-routed-to-same-partition1 2
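The username example can be sketched as a single-threaded consumer of one log partition. The tuple layout of requests and the function name are my own, and a dict stands in for the processor's local database.

```python
def process_username_requests(partition_log):
    """Process claims sequentially; the first claim for a name wins."""
    taken = {}    # username -> user who holds it
    output = []   # output stream of (request_id, outcome) messages
    for request_id, username, user in partition_log:
        if username in taken:
            output.append((request_id, "rejected"))
        else:
            taken[username] = user
            output.append((request_id, "success"))
    return output
```

Since every request for a given username is in the same partition, log order alone decides the winner and no cross-node coordination is needed at processing time.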
Location: 13,986 To avoid the need for a distributed transaction, we first durably log the request as a single message, and then derive the credit and debit instructions from that first message.
Location: 13,993 end-to-end request ID.
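A sketch of deriving the debit and credit instructions from one logged request and applying them idempotently. The dict fields and the `AccountProcessor` class are hypothetical.

```python
def derive_instructions(request):
    """Deterministically derive both instructions from the single message."""
    rid = request["request_id"]
    return [
        {"request_id": rid, "kind": "debit",
         "account": request["from"], "amount": request["amount"]},
        {"request_id": rid, "kind": "credit",
         "account": request["to"], "amount": request["amount"]},
    ]


class AccountProcessor:
    """Applies instructions, deduplicating by (request ID, kind)."""

    def __init__(self):
        self.balances = {}
        self.applied = set()

    def apply(self, instruction):
        key = (instruction["request_id"], instruction["kind"])
        if key in self.applied:
            return False  # already applied, e.g. after a crash and replay
        sign = -1 if instruction["kind"] == "debit" else 1
        account = instruction["account"]
        self.balances[account] = (
            self.balances.get(account, 0) + sign * instruction["amount"]
        )
        self.applied.add(key)
        return True
```

If the deriving processor crashes and reprocesses the request, it emits the same two instructions again, and the request ID lets the downstream side drop the repeats.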
Location: 14,007 However, it is possible for a client to wait for a message to appear on an output stream.
Location: 14,012 More generally, I think the term consistency conflates two different requirements that are worth considering separately:
book-designing-data-intensive-applications#linearizability-strongLocation: 14,019 linearizability, which is a strong way of achieving timeliness. book-designing-data-intensive-applications#linearizability-strong
book-designing-data-intensive-applications#read-after-write-consistencyLocation: 14,019 Weaker timeliness properties like read-after-write consistency book-designing-data-intensive-applications#read-after-write-consistency
Location: 14,030 violations of timeliness are "eventual consistency," whereas violations of integrity are "perpetual inconsistency."
Location: 14,045 But integrity is in fact central to streaming systems.
Location: 14,053 Representing the content of the write operation as a single message,
Location: 14,055 Deriving all other state updates from that single message using deterministic derivation functions, similarly to stored procedures
book-designing-data-intensive-applications#client-generated-request-id1Location: 14,057 Passing a client-generated request ID through all these levels of processing, enabling end-to-end duplicate suppression and idempotence book-designing-data-intensive-applications#client-generated-request-id1
Location: 14,058 Making messages immutable and allowing derived data to be reprocessed from time to time, which makes it easier to recover from bugs (see
Location: 14,070 ask them to choose a different one. This kind of change to correct a mistake is called a compensating transaction
Location: 14,079 compensation processes
Location: 14,091 It may well be a reasonable choice to go ahead with a write optimistically, and to check the constraint after the fact. You can still ensure that the validation occurs before doing things that would be expensive to recover from, but that doesn't imply you must do the validation before you even write the data.
Location: 14,106 Such coordination-avoiding data systems have a lot of appeal: they can achieve better performance and fault tolerance than systems that need to perform synchronous coordination
Location: 14,125 All of our discussion of correctness, integrity, and fault-tolerance has been under the assumption that certain things might go wrong, but other things won't. We call these assumptions our system model (see "Mapping system models to the real world"): for example, we should assume that processes can crash, machines can suddenly lose power, and the network can arbitrarily delay or drop messages.
Location: 14,161 Many applications don't even correctly use the features that databases offer for preserving integrity, such as foreign key or uniqueness constraints
book-designing-data-intensive-applications#mitigate-risks-of-silent-corruption1Location: 14,180 large-scale storage systems such as HDFS and Amazon S3 do not fully trust disks: they run background processes that continually read back files, compare them to other replicas, and move files from one disk to another, in order to mitigate the risk of silent corruption book-designing-data-intensive-applications#mitigate-risks-of-silent-corruption1
book-designing-data-intensive-applications#continually-check-own-integrity1Location: 14,193 I hope that in the future we will see more self-validating or self-auditing systems that continually check their own integrity, rather than relying on blind trust [68] book-designing-data-intensive-applications#continually-check-own-integrity1
Location: 14,202 If a transaction mutates several objects in a database, it is difficult to tell after the fact what that transaction means.
Location: 14,211 The derivation can be made deterministic and repeatable, so that running the same log of events through the same version of the derivation code will result in the same state updates.
Location: 14,215 or even run a redundant derivation in parallel.
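A tiny sketch of a deterministic derivation, assuming events are `(kind, key, value)` tuples. The event format and function names are illustrative.

```python
from functools import reduce


def apply_event(state, event):
    """Pure function: returns a new state and never mutates the old one."""
    kind, key, value = event
    new_state = dict(state)
    if kind == "set":
        new_state[key] = value
    elif kind == "delete":
        new_state.pop(key, None)
    return new_state


def derive(log):
    """Fold the whole event log into the derived state."""
    return reduce(apply_event, log, {})
```

Because `derive` depends only on the log, running it twice, or running a redundant copy in parallel and comparing the results, is a cheap integrity check.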
Location: 14,218 If something unexpected occurred, it is valuable to have the diagnostic capability to reproduce the exact circumstances that led to the unexpected event — a kind of time-travel debugging capability.
Location: 14,234 If you are not afraid of making changes, you can much better evolve an application to meet changing requirements.
book-designing-data-intensive-applications#merkle-trees1 2Location: 14,258 Cryptographic auditing and integrity checking often relies on Merkle trees [74], which are trees of hashes that can be used to efficiently prove that a record appears in some dataset (and a few other things) book-designing-data-intensive-applications#merkle-trees1 2
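A simplified Merkle tree sketch, assuming a non-empty list of byte-string records and SHA-256. The function names are mine. Duplicating the last hash on odd levels and hashing leaves and inner nodes the same way are simplifications that production designs avoid.

```python
import hashlib


def h(data):
    return hashlib.sha256(data).digest()


def build_levels(records):
    """Return all tree levels, leaf hashes first and the root level last."""
    level = [h(r) for r in records]
    levels = [level]
    while len(level) > 1:
        if len(level) % 2 == 1:
            level = level + [level[-1]]  # pad odd levels with the last hash
            levels[-1] = level
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels


def prove(levels, index):
    """Collect the sibling hashes on the path from a leaf to the root."""
    proof = []
    for level in levels[:-1]:
        sibling = index ^ 1
        proof.append((level[sibling], sibling < index))
        index //= 2
    return proof


def verify(record, proof, root):
    """Recompute the root from the record and its proof."""
    node = h(record)
    for sibling, sibling_is_left in proof:
        node = h(sibling + node) if sibling_is_left else h(node + sibling)
    return node == root
```

The proof holds one hash per tree level, so checking that a record is in the dataset needs only a logarithmic number of hashes plus a trusted root.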
Location: 14,350 A blind belief in the supremacy of data for making decisions is not only delusional, it is positively dangerous.
book-designing-data-intensive-applications#update-derived-state-observing-changes1 2 3Location: 14,576 Derived state can be updated by observing changes in the underlying data. book-designing-data-intensive-applications#update-derived-state-observing-changes1 2 3
blog-post-visualizing-lucene-segment-merges
blog-post-impact-of-full-page-rewrites
blog-post-the-death-of-map-reduce-at-google
talk-the-sushi-principle-raw-is-better
blog-post-local-state-fundamental-primitive
blog-post-dogfooding-druid-samza-kafka
blog-post-turning-database-inside-out-with-samza
blog-post-questioning-the-lambda-architecture
blog-post-provenance-and-causality-in-dist-systems