TDT4225: Store, distribuerte datamengder
One day we'll be done with this, and we'll never have to look back unless some vague memory pops into our mind by happenstance which is actually apt to happen. So, y'know, good luck or whatever.
Curriculum fall 2015
The four lab exercises are relevant for the exam, as are the slides used in lectures this semester and the following chapters:
|Thomas W. Doeppner, Operating Systems in Depth, Wiley, 2011||6||File Systems|
|Christian Forfang: Evaluation of High Performance Key-Value Stores||2.1-2.6, 3.1-3.5, 4.1-4.4||Key-value stores|
|George Coulouris et al: Distributed Systems - Concepts and Design, 5th ed.||10||Peer-to-peer Systems|
|14||Time and Global States|
|15||Coordination and Agreement|
|Kjell Bratbergsengen: Storing and Management of Large Data Volumes, Version: 27. mar 2015||7||Use of filters|
|12||Advanced Relational Algebra|
Chapters 9 and 13 of Kjell Bratbergsengen's book were removed from the curriculum in fall 2015.
There are also four papers that are part of the curriculum.
NOTE: these tips were written based on old exams written by Kjell Bratbergsengen. They may not apply if the exam is made by someone else, or if he's not involved with the course the semester you're taking the exam.
- If information is not readily available in a problem (such as the specifications for a hard drive), make wild assumptions about it and reason based on those.
- Answer first whatever seems most obvious and simple -- even if it seems so blatantly obvious that you'd not expect to be asked about it in a fourth year course. Spend time trying to be clever once you've done that for all the questions.
- Values given in "[KMGT]B" might be either [KMGT]iB (binary, IEC definition) or [KMGT]B (decimal, metric system). It's usually not specified and you should state in your answer whatever you assume it is.
S5FS considers the read time of all sectors on the disk to be the same. It does not take the time the disk head needs to move into account.
It does not try to minimize seek time (the time the disk head needs to position itself) or rotational latency (the time before the desired sector is under the disk head). The only thing S5FS optimizes for is a minimal number of disk operations.
The first block on the disk is the boot block. It contains a program for loading the operating system into memory.
The second block is the superblock. It holds references to free lists and describes the layout of the file system.
Then comes the i-list. This is an array of inodes, which represent files. Last is the data region, which contains the disk blocks that make up file contents.
The inode contains direct pointers to the first 10 disk blocks of a file. If the file is larger than 10 disk blocks, the inode also points to what is called an indirect block. This is a disk block which contains pointers to up to 256 new data blocks. The inode may also point to a double indirect block, which contains pointers to 256 new indirect blocks. Finally, the inode may contain a pointer to a triple indirect block. In total, this allows for files up to around 16 GB. The actual maximum allowed file size however, is 2 GB, because the file size must be representable as a 32-bit word.
When opening a file, its inode is loaded into primary memory. This means that accessing the first 10 data blocks of the file will be very fast. When accessing blocks beyond this, additional indirect block lists will first need to be accessed.
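The size limits above can be checked with a little arithmetic. The sketch below assumes 1 KiB disk blocks and 4-byte block pointers (which gives the 256 pointers per indirect block mentioned above) and a signed 32-bit file size. The function names are made up for illustration.

```python
BLOCK_SIZE = 1024        # bytes per disk block (assumption)
POINTER_SIZE = 4         # bytes per block pointer (assumption)
DIRECT_POINTERS = 10     # direct pointers in the inode (from the text)
POINTERS_PER_BLOCK = BLOCK_SIZE // POINTER_SIZE   # 256


def s5fs_max_blocks():
    """Data blocks reachable via direct, single, double and triple indirection."""
    n = POINTERS_PER_BLOCK
    return DIRECT_POINTERS + n + n**2 + n**3


def indirection_levels(block_index):
    """How many indirect blocks must be read to reach data block `block_index`."""
    remaining = block_index - DIRECT_POINTERS
    if remaining < 0:
        return 0
    for level in (1, 2, 3):
        if remaining < POINTERS_PER_BLOCK**level:
            return level
        remaining -= POINTERS_PER_BLOCK**level
    raise ValueError("block index beyond triple indirection")


addressable_bytes = s5fs_max_blocks() * BLOCK_SIZE   # limit of the pointer structure
size_field_limit = 2**31 - 1                         # limit of a signed 32-bit size
```

Under these assumptions the pointer structure addresses a little over 16 GiB, while the size field caps a file at just under 2 GiB, which is why the smaller limit is the one that applies.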
Problems with S5FS
- Small block size
- Poor file-allocation strategy
- Separation of inodes and data blocks
- Vulnerable to data loss caused by crashes
The poor performance comes from lots of slow seeks. When loading a file, the system must first fetch the inode. Then, it must seek to another part of the disk to access the data blocks. For large files, when indirect blocks are needed, this takes even longer. Blocks are allocated when needed, and not placed for sequential access.
To quote Doeppner, the "performance is so terrible that one really wonders why this file system was used at all."
(Operating Systems In Depth, Thomas Doeppner)
Fast File System (FFS) attempted to solve many of the problems in S5FS.
One of the problems with S5FS was the small block size. The system used too much time seeking for blocks, and when a block was located, there was only a small transfer of data. FFS increased the block size, but this did not come without its own set of challenges. As the block size increases, so does the wasted space of half-used blocks. FFS solves this by a complex approach where each block is divided into a set of fragments. A file might occupy only parts of a block. The file system must keep track of free blocks, and partially free blocks, so that the space can be used as efficiently as possible.
To reduce seek time, FFS organizes the disk into cylinder groups. Since the disk heads for all the platters move together, data that will be accessed at the same time should be stored in the same regions across the platters of the hard drive. In addition to identifying which data should be stored together, it is also important to identify data that can safely be stored apart.
It seems intuitive that data that belongs together should be stored sequentially in the same track of a disk. In reality, this is not the case. When reading a block from disk, there is a small delay after one block is read before we are ready to read the next. By then the start of the adjacent block has already passed under the head, so we would need to wait a whole rotation before we can read it. FFS solves this by leaving a one-block gap between consecutive blocks of a file. This way, we only have to wait for the time it takes to rotate to the next block. This technique is called block interleaving.
A similar trick can be used on cylinders, calculating how many blocks will rotate on average before the read head can change position. We can then offset each track by this value, saving latency when changing to a nearby track.
In NTFS, inodes are called file records, and the inode file is called the master file table.
Journaling is essentially write-ahead logging. We use the term journaling to avoid confusion with log-structured file systems. The main principle is to write any changes that are to be committed to a log before the changes are written to disk. If a crash then happens, we can recover by reading the log at restart. Ext3 uses journaling.
Shadow paging is a different approach than journaling to ensuring that consistency is preserved in the file system. Shadow paging works by copying the parts of the file system to be changed (usually a node and its parents), and making the changes to the copy. When all nodes have been updated, the root of the file system is updated in a single disk write, and the new data is now present in the system.
One advantage of this is that the data on disk is always consistent, and all read requests can be fulfilled at any time, albeit the system might read outdated information.
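The copy-then-swap idea behind shadow paging can be sketched with an in-memory tree. This is only an illustration: it uses a plain binary search tree instead of a real file system structure, and `Node`, `updated`, `lookup` and `ShadowStore` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Node:
    key: int
    value: str
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def updated(node, key, value):
    """Return the root of a new tree; only nodes on the path to `key` are copied."""
    if node is None:
        return Node(key, value)
    if key < node.key:
        return Node(node.key, node.value, updated(node.left, key, value), node.right)
    if key > node.key:
        return Node(node.key, node.value, node.left, updated(node.right, key, value))
    return Node(key, value, node.left, node.right)


def lookup(node, key):
    while node is not None:
        if key == node.key:
            return node.value
        node = node.left if key < node.key else node.right
    return None


class ShadowStore:
    def __init__(self):
        self.root = None

    def write(self, key, value):
        new_root = updated(self.root, key, value)  # the old tree is left untouched
        self.root = new_root                       # single root swap makes it visible

    def read(self, key):
        return lookup(self.root, key)


store = ShadowStore()
store.write(5, "five")
store.write(3, "old")
store.write(8, "eight")
snapshot = store.root        # a reader that started before the next write
store.write(3, "new")
```

A reader holding `snapshot` still sees the old value for key 3, while new readers see the updated one. Subtrees that were not on the updated path are shared between the two versions rather than copied.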
The soft updates technique eliminates circular dependencies while maintaining the most recent updates in the cache. It does this by ensuring that when a cache page is to be written out, any updates that have been applied to it but must not be written out yet due to ordering constraints are undone: the cache page is restored to its state before the update was applied. While the page is in its undone state, it is locked to prevent access until the updates are redone. The update is redone when the write completes.
When in need of more than one disk to keep a file system on, there are some challenges (and opportunities) that arise. The first challenge is that the file system usually assumes that data can be addressed within the address space of a single disk. Using a logical volume manager (LVM) we provide an interface that makes several disks appear as one. An LVM can provide several services:
With mirroring, data written to disk is written in duplicate, one copy per disk. The write goes to the LVM, and is then sent to both (all) disks. Reads can go to any disk, and are usually served by the least busy disk.
With striping, instead of writing a file to a single disk, we split the file up into parts, and send the parts to different disks. This distributes a file over several disks in a "stripe" pattern.
Changing the size of each striping unit will alter the properties of the system. A small unit will result in the need for more seeks, but we can spread the file over more disks. On the other hand, a larger unit will make each read more efficient, but might reduce parallelism. Pure striping is vulnerable to disk crashes. Redundancy can be introduced using RAID.
Redundant Array of Inexpensive Disks (RAID) is a combination of several disks to achieve a combination of redundancy and performance improvement. There are different levels of RAID, which provide different combinations of these attributes.
- RAID 0
- Pure striping as described above. There is no redundancy! (More like AID, amiright?)
- RAID 1
- Pure disk mirroring as described above. There is no performance improvement.
- RAID 2
- Striping on bit level, with Error Correcting Code (ECC). Separate disks are tasked with storing the ECC-bits.
- RAID 3
- Striping on bit level, using parity-bits. The advantage of this is that we only need one redundancy disk. Although we can't directly restore using parity bits, we will know which sector is suspected to contain an error, and can use this to restore the data.
- RAID 4
- Same as RAID 3, but block interleaving is used, instead of bit-interleaving.
- RAID 5
- Uses block interleaving, but moves the check blocks to the data disks, and has them rotating. This way, the check-disk is not a bottleneck.
- RAID 6
- Same as RAID 5, but introduces additional parity-blocks. This allows it to handle two errors.
- Cascaded RAID
- Combines two RAID-architectures into one. E.g. RAID 1+0 that stripes across mirrored disks, and RAID 0+1, that has two mirrored striped disks.
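The parity used by RAID 3 to 5 can be illustrated with XOR over the blocks of a stripe. The sketch below is a simplification under stated assumptions: blocks are equal-length `bytes` objects, and the rotation in `parity_disk` is just one possible RAID 5 layout, not a standardized one.

```python
from functools import reduce


def xor_blocks(blocks):
    """Byte-wise XOR of equally sized blocks."""
    return bytes(reduce(lambda a, b: a ^ b, column) for column in zip(*blocks))


def parity(data_blocks):
    """Parity block for one stripe."""
    return xor_blocks(data_blocks)


def reconstruct(surviving_blocks, parity_block):
    """Rebuild the single missing data block of a stripe."""
    return xor_blocks(list(surviving_blocks) + [parity_block])


def parity_disk(stripe, n_disks):
    """Disk holding the parity for `stripe` when parity rotates (assumed layout)."""
    return (n_disks - 1 - stripe) % n_disks


stripe_data = [b"\x01\x02", b"\x0f\x00", b"\xff\x10"]
p = parity(stripe_data)
# Pretend the disk holding stripe_data[1] has failed.
recovered = reconstruct([stripe_data[0], stripe_data[2]], p)
```

Because XOR is its own inverse, XOR-ing the surviving blocks with the parity block yields the lost block, provided we know which disk failed. Rotating the parity position per stripe is what keeps a single disk from becoming the bottleneck.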
LevelDB is a fast key-value storage engine written at Google. It provides an ordered mapping from string keys to string values.
- Put(key, value)
The data in LevelDB is stored internally in memtables and sstables.
- Memtable
- Stored entirely in memory. At most a few MB in size.
- Sstable (Sorted String Table)
- Around 2 MB in size.
When data is to be stored in LevelDB using the put command, it is placed in the Memtable, given that there is space. When the Memtable is full, a compaction is triggered, and the contents of the Memtable are written to an Sstable on disk. The Sstables are organised in a tree structure, and older entries will be pushed further down as time passes.
When a get is issued, the DB is searched to find the matching content. First, the Memtable is checked to see if the information sought can be found in memory. If not, we move down the levels of Sstables. When we get a hit, we can safely return the information, knowing that any hits further down the levels will be older, and therefore outdated, information.
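The put and get paths described above can be sketched as a toy in-memory model. This is not LevelDB's API or implementation: `TinyLSM` is a hypothetical class, the "Sstables" are plain sorted Python lists kept newest first instead of files arranged in levels, and merging of tables is left out.

```python
import bisect


class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable = {}
        self.sstables = []                  # newest first; each is a sorted list
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        # Write the full memtable out as one immutable sorted table.
        self.sstables.insert(0, sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for table in self.sstables:         # newest to oldest
            i = bisect.bisect_left(table, (key,))
            if i < len(table) and table[i][0] == key:
                return table[i][1]          # first hit is the newest version
        return None


db = TinyLSM(memtable_limit=2)
db.put("a", "1")
db.put("b", "2")     # memtable is full: flushed to a sorted table
db.put("a", "3")     # newer version of "a", still in memory
```

The search order is what makes the first hit safe to return: the memory table and newer sorted tables are always consulted before older ones, so a stale value for `"a"` is never reached.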
LMDB is a read-optimised database.
LMDB uses copy-on-write B-trees to store data. This means that when a write command is issued, the leaf node to be updated is copied, and the copy is updated. During this time, the old data is available to be read by other processes. The nodes above the leaf node are also copied, and their pointers updated. When this process is complete, the new structure is inserted into the tree, ensuring that a crash will not leave the tree corrupted.
The goal of peer-to-peer systems is to enable the sharing of data and resources on a very large scale by eliminating any requirement for separately managed servers and their associated infrastructure.
Structured and unstructured networks
In structured peer-to-peer networks, objects are placed at nodes according to a given mapping function. Lookups are performed by consulting a routing table and forwarding the request along a route determined by calculating the distance. In unstructured peer-to-peer networks, the nodes form an ad-hoc network and lookups propagate through each node's neighbors. There is no specific routing; in the worst case every node is visited.
|Structured||Guaranteed to locate objects (assuming they exist) and can offer time and complexity bounds on this operation; relatively low message overhead||Need to maintain often complex overlay structures, which can be difficult and costly to achieve, especially in highly dynamic environments|
|Unstructured||Self-organizing and naturally resilient to node failure||Probabilistic and hence cannot offer absolute guarantees on locating objects; prone to excessive messaging overhead which can affect scalability|
Examples of unstructured implementations are early Gnutella and Freenet. Examples of structured implementations are Chord, CAN, Pastry, Tapestry and BitTorrent.
A hybrid approach implements some measures to improve performance in an unstructured network. For example, Gnutella uses these three techniques:
- Ultrapeers
- The heart of the network, heavily connected to each other. They reduce the number of hops: leaf nodes contact an ultrapeer and are routed faster
- Query Routing Protocol (QRP)
- Reduces the number of queries issued by a node. Nodes exchange information about the files they hold, so that queries are only forwarded down paths where the file probably is. A node produces such a table by hashing the words in its filenames and sends it to the ultrapeer. An ultrapeer produces its own table based on the union of all received tables. An ultrapeer will forward a query to a node if a match is found, and wait for a response before trying another path
- Avoiding reverse traversal
- A query contains the network address of the ultrapeer initiating the query, so that when a file is found it can be sent directly
Examples of hybrid implementations are Kazaa, Skype and later Gnutella.
Peer-to-peer middleware has the job of automatically placing, and locating, distributed objects. It needs global scalability, load balancing, optimization for local interactions (neighbors), accommodation of hosts turning on and off, security of data, and anonymity.
A routing overlay is a distributed algorithm responsible for locating nodes and objects. It runs on the application layer. It finds the closest resource (replica of an object or data) to a host. It needs to be able to insert and delete objects and to add and remove nodes. There are two main implementations: Distributed Hash Tables (DHT), with methods like put(), get() and remove(), and Distributed Object Location and Routing (DOLR), with methods like publish(), unpublish() and sendToObj().
Pastry is such a routing overlay. It uses 128-bit Globally Unique Identifiers (GUIDs) to identify both nodes and objects. The GUIDs are randomly distributed hashes, computed from a name or public key. Pastry routes a message to a GUID in O(log N) steps, with N nodes. It uses a 2-step routing algorithm, where step 1 alone routes correctly, but step 2 makes it efficient.
First, each node stores a leaf-set containing the GUID and IP addresses of its l (
Secondly, each node maintains a tree-structured prefix routing table with GUIDs and IP addresses for nodes throughout the entire network, with more entries for numerically close nodes. The table has as many rows as there are hexadecimal digits in the GUID (for a 128-bit GUID: 128/4 = 32, since a hexadecimal digit is 4 bits). The table has 16 columns, one for each possible value of a hexadecimal digit (0 to F). Each cell in the table is a pointer to one (of potentially many) node(s). The cells in the bottom row match the full GUID of one node. For each higher row, one more trailing digit is "removed" (made "don't care"), and the cell maps to the closest node among those matching the remaining prefix, where closest is measured by numerical GUID distance, network hops or round-trip latency. The top row fixes only the first digit of the GUID and allows for long-distance direct hops.
- On receiving the message, check if the target node's GUID is within the range of the leaf-set
- If it is, route it to the target (unless the target is yourself)
- If not in the leaf-set, look in the routing table
- Find the length of the longest common prefix the target GUID shares with the current node's GUID and call it p
- Look up the table at row p, in the column given by digit p+1 of the target GUID (the first digit that differs)
- If this cell is not null, forward the message to this node
- If this cell is null, the node has recently failed, and the message is forwarded to another known node that shares a prefix of at least length p with the target but is numerically closer to it
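The routing steps above can be sketched as a single next-hop function. This is a simplified model, not Pastry's actual implementation: GUIDs are short hexadecimal strings, the routing table is a dictionary keyed by `(row, digit)`, the wrap-around of the GUID space is ignored, and all names are hypothetical.

```python
def shared_prefix_len(a, b):
    """Number of leading hex digits two GUIDs have in common."""
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n


def distance(a, b):
    """Numerical distance between two GUIDs (no wrap-around in this sketch)."""
    return abs(int(a, 16) - int(b, 16))


def next_hop(current, target, leaf_set, routing_table):
    """Return the GUID to forward to, or `current` if the message has arrived."""
    if current == target:
        return current
    # Step 1: the target lies within the range covered by the leaf set.
    nearby = list(leaf_set) + [current]
    values = [int(g, 16) for g in nearby]
    if min(values) <= int(target, 16) <= max(values):
        return min(nearby, key=lambda g: distance(g, target))
    # Step 2: use the routing table entry for row p and the next target digit.
    p = shared_prefix_len(current, target)
    entry = routing_table.get((p, target[p]))
    if entry is not None:
        return entry
    # Fallback: any known node with an equally long prefix that is closer.
    known = list(leaf_set) + list(routing_table.values())
    better = [g for g in known
              if shared_prefix_len(g, target) >= p
              and distance(g, target) < distance(current, target)]
    return min(better, key=lambda g: distance(g, target), default=current)
```

For example, a node `"65a1"` with a routing table entry `{(0, "d"): "d13d"}` forwards a message for `"d46a"` to `"d13d"`, which shares one more digit with the target. Each hop through the routing table lengthens the shared prefix, which is where the O(log N) bound comes from.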
New nodes use a joining protocol in order to acquire their routing table and leaf-set contents and to notify other nodes of changes they must make to their tables. This is done by first generating a new GUID, then contacting a member of the network and sending a join message addressed to that GUID (its own). This is routed normally and ends up at the node with the closest GUID. This node sends the new node its leaf-set. All nodes that forward the message include parts of their routing tables to help the new node build its own. Finally, the new node sends its leaf-set and routing table to all nodes in the set or table to make them aware of its location.
Host failure or departure
A node is considered failed when its immediate neighbor can no longer communicate with it. The leaf-sets of the neighbors of the failed node need to be repaired. The node that discovers the failure looks for a node close to the failed node and requests a copy of its leaf-set. This will partly overlap with its own and contain an appropriate value to replace the failed node. Other neighbors are informed.
When there are multiple candidates for a cell in the routing table, a locality metric (IP hops or measured latency) is used to choose among them. The resulting routing is not globally optimal, but on average routes are only 30–50% longer than the optimum.
All nodes send heartbeat signals to some nodes (or a single node) in their leaf-set.
Time and global states
Time (clocks, events and process states)
The problem of determining the order of events occurring in different processes in a distributed system.
Uses timestamps to deduce the order of events.
- Clock skew
- The difference between readings of two clocks at a given time
- Clock drift
- The offset of a single clock after a period of time
There are two approaches for synchronizing physical clocks: internal and external.
With internal synchronization, the clocks of a distributed system are synchronized against each other.
The Berkeley algorithm uses a master computer that polls all other (slave) computers for their time. It then calculates the average and sends the amount of adjustment back to the slaves. If the master fails, another computer is selected as the master.
With external synchronization, an authoritative, external source of time synchronizes all participants of a distributed system.
Cristian’s method for synchronizing clocks uses a central time server which receives signals from UTC. Participants can request the time from this server and it responds with its own time. The requesting computer sets its clock to the returned time plus half the measured round-trip time, to compensate for the transmission delay. The server becomes a single point of failure, and there are no checks for faulty clocks.
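Both the Berkeley algorithm and Cristian's method come down to a small calculation, sketched below. The function names are made up, times are plain numbers in arbitrary units, and refinements such as discarding outlier clocks or compensating the polled values for message delay are left out.

```python
def berkeley_adjustments(master_time, slave_times):
    """Adjustment for the master, followed by one adjustment per slave."""
    clocks = [master_time] + list(slave_times)
    average = sum(clocks) / len(clocks)
    return [average - clock for clock in clocks]


def cristian_estimate(server_time, request_sent, reply_received):
    """Clock value to set, given the server's reply and local send/receive times."""
    round_trip = reply_received - request_sent
    return server_time + round_trip / 2
```

Sending each slave an adjustment rather than an absolute time means the correction stays valid even if the message carrying it is delayed. In Cristian's method the half round-trip correction assumes the request and the reply take roughly equally long.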
The Network Time Protocol
A protocol for synchronizing clocks on computers connected to the internet. It uses UTC. It works as a network of servers, connected in a synchronization subnet with levels called strata. Primary servers are connected directly to a time source (root, stratum 1). Secondary servers are synchronized with primary servers (stratum 2). The users' workstations are the leaves of the tree.
NTP servers synchronize in one of three modes:
- Multicast mode
- One or more servers periodically broadcast their time to other servers, which set their time accordingly, assuming a small delay. Intended for high-speed LANs. Relatively low accuracy.
- Procedure-call mode
- A server receives requests and returns its time. Intended for neighboring LANs or where multicast is not supported in hardware. Higher accuracy.
- Symmetric mode
- Two servers operating in symmetric mode exchange messages with timing information. Intended for the higher levels (lower strata). Highest accuracy.
All modes deliver messages by UDP.
Logical clocks order events based on their occurrence, using counters.
- Monotonicity
- The condition that a clock only ever advances
- Happened-before relation
- Also called causal ordering. It is defined as the order in which events are observed within a single process, together with the rule that whenever a message is exchanged between processes, the message-send happens before the message-receive
Lamport logical clocks
Captures the happened-before relation numerically.
A process's clock is incremented before each event is issued, and the process piggybacks this timestamp on every message sent. On receiving a message, the process takes the maximum of its own clock and the timestamp sent with the message, then increments it and timestamps the receive event.
We cannot conclude that an event happened-before another just from their timestamps, since the clock values are independent for each process and only synced upon message exchange. All we know is that send happens before receive, and the ordering within a single process is given by the clock.
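The update rules for Lamport clocks can be written down in a few lines. This is a minimal sketch with a hypothetical `LamportClock` class; message passing itself is left to the caller.

```python
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self):
        """Local event or message send: increment, then use the value as timestamp."""
        self.time += 1
        return self.time

    def receive(self, message_time):
        """Message receive: take the maximum, then increment."""
        self.time = max(self.time, message_time) + 1
        return self.time


p, q = LamportClock(), LamportClock()
p.tick()                    # local event at p
sent = p.tick()             # p sends a message carrying this timestamp
received = q.receive(sent)  # q receives it
unrelated = p.tick()        # later local event at p, unrelated to q's receive
```

Here `received` and `unrelated` end up with the same clock value even though neither event happened before the other, which illustrates why timestamps alone cannot be used to conclude a happened-before relation.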
Vector clocks fix the shortcomings of Lamport's logical clocks, but need storage and message payloads proportional to the number of processes.
Each process keeps a vector with timestamps for all processes, with all values initially set to 0.
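The per-process vectors described above (vector clocks) can be sketched as follows. The update rules shown are the commonly used ones and are an assumption here, since they are not spelled out in the text: increment your own entry on each event, and on receive take the element-wise maximum before incrementing. The names are hypothetical.

```python
class VectorClock:
    def __init__(self, n_processes, pid):
        self.v = [0] * n_processes
        self.pid = pid

    def tick(self):
        """Local event or send: increment own entry, return a copy as timestamp."""
        self.v[self.pid] += 1
        return list(self.v)

    def receive(self, other):
        """Receive: element-wise maximum with the message's vector, then tick."""
        self.v = [max(a, b) for a, b in zip(self.v, other)]
        return self.tick()


def happened_before(a, b):
    """True if timestamp a is less than or equal to b in every entry and differs."""
    return a != b and all(x <= y for x, y in zip(a, b))


def concurrent(a, b):
    return a != b and not happened_before(a, b) and not happened_before(b, a)


p0, p1 = VectorClock(2, 0), VectorClock(2, 1)
a = p0.tick()        # event at process 0
b = p1.tick()        # independent event at process 1
c = p1.receive(a)    # process 1 receives a message stamped with a
```

Unlike Lamport timestamps, comparing two vector timestamps tells us whether the events are causally ordered or concurrent: `a` and `b` above are concurrent, while both precede `c`.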
The problem of finding out whether a particular property is true of a distributed system as it executes.
Examples of such problems are distributed garbage collection, distributed deadlock detection, distributed termination detection and distributed debugging.
- A snapshot of the system for a point in time
- Consistent cut
- A cut in which no event violates the happened-before relation (in other words, no "effect" without a "cause")
- Consistent global state
- A consistent cut for all processes of the system
- Run
- The introduction of one event at a time, from a process's initialization to its termination
- Consistent run (linearization)
- A run such that for all events introduced, the system is in a global consistent state
- Global state predicate
- A function that maps from the set of global states of processes in the system to either True or False. Predicates can be: "is an object garbage?", "has the system terminated?", etc.
- Stable predicate
- Once the system enters a state in which the predicate is True, it remains True in all future states reachable from that state
- Safety
- The assertion that the predicate evaluates to the desired value (True or False) for all states reachable from the start state. For example, "Is the system deadlocked?" is safe if it evaluates to False for all reachable consistent global states
- Liveness
- The guarantee that something 'good' will eventually happen. For example, the guarantee that a process will eventually terminate.
We wish, in retrospect, to determine whether a condition was obeyed during execution. A central monitor gathers states from the system as a trace (not snapshot(?)). Each process sends its state to the monitor initially and when it changes (this can be based on only some variables to reduce traffic). A process only sends its state if the change may affect the predicate.
- Possibly predicate
- At least one consistent run passes through a consistent global state in which the predicate evaluates to True
- Definitely predicate
- All consistent runs pass through a consistent global state in which the predicate evaluates to True
To observe consistent global states a lattice is constructed. It starts with initialization of all processes in level 0 and moves one level down/up for each event that occurs. For two processes, the starting state would be
Evaluating possibly predicate
Evaluating definitely predicate
Coordination and agreement
For a set of processes to coordinate their actions or to agree on one or more values.
A service that processes queries about whether a particular process has failed or not is called a failure detector. There exist reliable and unreliable failure detectors. The former can verify that a process has failed, but only gives hints about whether a process is non-failing. The latter can only provide hints about both failed and non-failed processes. A reliable failure detector requires that the system is synchronous, which very few systems are. In a synchronous system there exists a maximum time for message exchange and instruction execution, so timeouts can be used to detect faults, but in an asynchronous system we can't make assumptions about time. Most systems are asynchronous. A crashed process does not repair itself in any manner. A process is correct if it does not fail during its entire lifespan, from creation to termination.
Mutual exclusion is required when processes try to access a critical section simultaneously. Below are some formally defined requirements for mutual exclusion:
- Safety
- At most one process may execute in the critical section at a time
- Liveness
- Requests to enter and exit the critical section eventually succeed
- Ordering
- If one request to enter the critical section happened-before another, then entry to the critical section is granted in that order
A process will need to lock() -> access() -> release() to use critical sections.
Below are methods that address the problem of mutual exclusion. They make the following assumptions:
- A process spends a finite, fair amount of time inside the critical section (it does not starve other processes)
- The system is asynchronous
- Processes do not fail
- Message delivery is reliable (sent messages will eventually be delivered intact)
None of the methods can withstand failures as is, but they can be modified to tolerate some faults.
Central server algorithm
This is the simplest approach. Processes send requests to a server which manages access. To gain access a process must receive a token in response from the server. The process with the token has access to the critical section and must return the token to the server when it exits the critical section. The server manages a FIFO queue of requests and grants access to the next process in the queue. The algorithm fulfills the safety and liveness requirements, but not happened-before ordering. The central server becomes a single point of failure.
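The server side of the central server algorithm is essentially a token plus a FIFO queue. The sketch below models only that bookkeeping. `LockServer` is a hypothetical name, and message passing between processes and the server is not modelled.

```python
from collections import deque


class LockServer:
    def __init__(self):
        self.holder = None       # process currently holding the token
        self.queue = deque()     # waiting processes, in arrival order

    def request(self, pid):
        """Return True if the token is granted at once, False if the request is queued."""
        if self.holder is None:
            self.holder = pid
            return True
        self.queue.append(pid)
        return False

    def release(self, pid):
        """Hand the token back; returns the next holder, or None if nobody is waiting."""
        if pid != self.holder:
            raise ValueError("only the token holder can release")
        self.holder = self.queue.popleft() if self.queue else None
        return self.holder
```

Requests are served in the order they arrive at the server, which is not necessarily the order in which they were issued. That is why the happened-before ordering requirement is not met.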
The ring-based algorithm is the simplest approach that does not require a process or server to manage access. Processes need to hold a token to access a critical section, and when the token is not needed it is forwarded in one direction to the next process. If a process needs access, it waits for the token. If it does not need access when it receives the token, it forwards it. The safety and liveness requirements are met, but not happened-before ordering (since messages can be transmitted independently of the rotation of the token). The algorithm consumes network bandwidth continuously, since the token keeps circulating when no process is in the critical section.
In Ricart and Agrawala's algorithm, a process that requires entry to a critical section multicasts a request message, and can enter only when all other processes have replied. Each process has a unique identifier and a Lamport clock. A request message consists of the ID and timestamp of the requesting process. A process is either in the state RELEASED, WANTED or HELD for the critical section. When two processes request access at the same time, the one with the lowest timestamp will gain access first. If their timestamps are equal, the process with the lowest ID will gain access first. The algorithm satisfies all requirements (safety, liveness and happened-before ordering). It is a more expensive algorithm in terms of bandwidth usage.
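The heart of the multicast-based algorithm above (due to Ricart and Agrawala) is the rule for when a process replies to a request and when it defers the reply. The sketch below shows that rule only. The `Process` class and its method names are hypothetical, and message delivery and counting of replies are left out.

```python
RELEASED, WANTED, HELD = "RELEASED", "WANTED", "HELD"


class Process:
    def __init__(self, pid):
        self.pid = pid
        self.state = RELEASED
        self.clock = 0           # Lamport clock
        self.request = None      # (timestamp, pid) of our own pending request
        self.deferred = []       # requests we will answer after leaving

    def request_entry(self):
        """Returns the request to multicast to all other processes."""
        self.state = WANTED
        self.clock += 1
        self.request = (self.clock, self.pid)
        return self.request

    def on_request(self, other):
        """Return True to reply immediately, False if the reply is deferred."""
        self.clock = max(self.clock, other[0]) + 1
        # Tuples compare by timestamp first, then by process ID.
        if self.state == HELD or (self.state == WANTED and self.request < other):
            self.deferred.append(other)
            return False
        return True

    def enter(self):
        self.state = HELD

    def release(self):
        """Leave the critical section; returns the requests to reply to now."""
        self.state = RELEASED
        replies, self.deferred = self.deferred, []
        return replies


p1, p2 = Process(1), Process(2)
r1, r2 = p1.request_entry(), p2.request_entry()   # equal timestamps
p1_replies = p1.on_request(r2)   # p1 has the lower ID, so it defers
p2_replies = p2.on_request(r1)   # p2 yields and replies at once
```

With equal timestamps the tie is broken by process ID, so `p1` collects its reply first and `p2` has to wait until `p1` releases.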
In the previous algorithm a process must receive responses from all other processes before entering a critical section. In this algorithm (Maekawa's voting algorithm) a process only needs responses from a subset of the other processes, as long as the subsets of any two processes overlap. A candidate process must collect sufficient votes to enter a critical section. The algorithm meets the requirement of safety, but it is deadlock-prone. Adaptations can be made such that it is deadlock-free and also meets the requirement of happened-before ordering. The bandwidth utilization is 2
Choosing a unique process to play a particular role.
A process is either participant or non-participant in an election. Each process has a variable called elected, which is either None or the ID of a process.
The requirements for the election algorithms are:
- Safety
- A participant process has elected = None or elected = the non-crashed process with the largest identifier
- Liveness
- All processes participate and eventually either set elected to other than None or crash
The ring-based election algorithm assumes that no failures occur and that the system is asynchronous. The goal is to select a process as coordinator.
Initially, every process is marked as a non-participant and any process can begin an election.
When starting an election, mark yourself as a participant and send your ID in an election message to your clockwise neighbour. When receiving an election message, compare its ID with your own, forward the message with the highest of the two IDs (substituting your own ID if it is the highest), and mark yourself as a participant. If you are a participant and receive a message with your own ID, you are the new coordinator. Mark yourself as non-participant and send out an elected message. When receiving an elected message, mark yourself as non-participant and set the elected variable equal to the ID in the message.
This algorithm satisfies both requirements. The network bandwidth and turnaround time are both
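The ring election described above can be simulated in a few lines. The sketch assumes a single election with one initiator, unique IDs and no failures. `ring_election` is a hypothetical helper that also counts the messages sent.

```python
def ring_election(ids, starter):
    """Simulate one election.

    ids: process IDs in clockwise order around the ring.
    starter: index of the process that starts the election.
    Returns (coordinator_id, number_of_messages).
    """
    n = len(ids)
    message_id = ids[starter]            # starter sends its own ID
    position = (starter + 1) % n
    messages = 1
    # Election phase: forward the largest ID seen until it returns to its owner.
    while ids[position] != message_id:
        if ids[position] > message_id:
            message_id = ids[position]   # substitute own, higher ID
        position = (position + 1) % n
        messages += 1
    # Elected phase: the coordinator's announcement travels once round the ring.
    messages += n
    return message_id, messages
```

How many messages are needed depends on where the initiator sits relative to the process with the highest ID. With `ids = [3, 7, 2, 5]`, starting at index 0 takes 9 messages, while starting at index 2 (just after the highest ID) takes 11.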
The bully algorithm allows processes to crash, but assumes that message delivery is reliable, that the system is synchronous, and that every process knows of all processes with a higher ID. It uses 3 types of messages: election, answer and coordinator. The algorithm starts when the current coordinator fails. A process that knows it has the highest ID can elect itself by sending a coordinator message. Other processes begin an election by sending election messages to the processes with higher IDs.
How to achieve the desired reliability and ordering properties across all members of a group
While broadcasting is sending a message to all processes, multicasting is sending a message to a group of processes.
multicast(g, m) sends the message m to all members of the group g of processes. deliver(m) delivers a message sent by multicast to the calling process (used as receive).
Basic multicast uses multicast() and send(). It guarantees that a correct process will eventually deliver the message, as long as the multicasting process does not crash. It is used as the basis for more advanced algorithms.
It fulfills the requirements:
- Integrity
- A correct process delivers a message at most once, and only if it is a member of the intended group for the message
- Validity
- If a correct process multicasts a message, then it will eventually be delivered
Recall that a correct process is one that does not fail during its entire lifespan, from creation to termination.
Reliable multicast can be based on IP multicast or on basic multicast. It adds an atomicity requirement:
- Agreement
- If a correct process delivers a message, then all other correct processes in the group will eventually deliver the message
Ordered multicast assumes that a process only belongs to one group. The goal is that all processes should see the same order of messages. It introduces the following requirements:
- FIFO ordering
- If a correct process issues multicast of message-1 and then message-2, then every correct process that delivers message-2 will deliver message-1 before message-2
- Causal ordering
- If multicast message-1 happened-before multicast message-2 (induced by messages sent between the members of the group), then any correct process that delivers message-2 will deliver message-1 before message-2
- Total ordering
- If a correct process delivers message-1 before it delivers message-2, then any other correct process that delivers message-2 will deliver message-1 before message-2
Causal ordering implies FIFO ordering, and none of the orderings assumes or implies reliability.
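The orderings can be implemented as follows:
- FIFO ordering
- Uses sequence numbers
- Causal ordering
- Ordering based on the happened-before relation
- Total ordering
- Total delivery ordering, using a globally unique ID on all multicast messages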
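FIFO ordering with sequence numbers can be sketched with a hold-back queue at each receiver. This is a minimal illustration under stated assumptions: every sender numbers its messages 0, 1, 2, ..., and `FifoReceiver` is a hypothetical class. Messages that arrive early are held back until the gap is filled.

```python
class FifoReceiver:
    def __init__(self):
        self.expected = {}    # sender -> next sequence number to deliver
        self.held = {}        # (sender, seq) -> message held back
        self.delivered = []   # messages in delivery order

    def receive(self, sender, seq, message):
        next_seq = self.expected.get(sender, 0)
        if seq < next_seq:
            return            # duplicate of an already delivered message
        self.held[(sender, seq)] = message
        # Deliver as many consecutive messages from this sender as possible.
        while (sender, next_seq) in self.held:
            self.delivered.append(self.held.pop((sender, next_seq)))
            next_seq += 1
        self.expected[sender] = next_seq


r = FifoReceiver()
r.receive("p", 1, "second")   # arrives early, held back
r.receive("p", 0, "first")    # fills the gap, both are delivered in order
r.receive("q", 0, "other")    # a different sender is ordered independently
```

Per-sender sequence numbers only order messages from the same sender. Ordering messages from different senders relative to each other needs the causal or total ordering mechanisms.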
Multicast in synchronous and asynchronous systems
Guarantees of both reliable and totally ordered delivery is possible in a synchronous system, but impossible in an asynchronous distributed system .
The problem for processes to agree on a value after one or more of the processes has proposed what that value should be
To reach consensus, every process begins in the undecided state and proposes a single value, drawn from a set of values. The processes communicate with one another, exchanging values. Each process then sets the value of a decision variable and enters the decided state, in which it may no longer change the decision variable
There are 3 related agreement problems: Byzantine generals, interactive consistency and totally ordered multicast. All assume that communication is reliable but that processes may fail, and share the following requirements:
- Termination
- Eventually each correct process sets its decision variable
- Agreement
- The decision value of all correct processes is the same: if two processes are correct and have entered the decided state, then their decision variables are the same
- Integrity (validity)
- If the correct processes all proposed the same value, then any correct process in the decided state has chosen that value
Three or more generals are to agree to attack or to retreat. One, the commander, issues the order. The others, lieutenants to the commander, must decide whether to attack or retreat. But one or more of the generals may be faulty. If the commander is faulty, he proposes attacking to one general and retreating to another. If a lieutenant is faulty, he tells one of his peers that the commander told him to attack and another that they are to retreat.
The same requirements as above for termination and agreement, and:
- If the commander is correct, then all correct processes decide on the value that the commander proposed
Consensus is possible if the total number of processes is larger than 3 times the number of faulty processes (N > 3f, for N processes of which f are faulty).
Every process proposes a single value. The goal of the algorithm is for the correct processes to agree on a vector of values (decision vector), one for each process. The same requirements for termination, and:
- The decision vector of all correct processes is the same
- If process-i is correct, then all correct processes decide on process-i's value as the i-th component of their vector
Totally ordered multicast
In systems with crash failures, consensus is equivalent to solving reliable and totally ordered multicast
Impossibility in asynchronous systems
There is no guaranteed solution in an asynchronous system to the Byzantine generals problem, to interactive consistency or to totally ordered and reliable multicast. There exist three techniques for working around the impossibility:
Use persistent storage and restart processes
Consensus using failure detectors
No failure detector in an asynchronous system that works solely by message passing can be perfect. Solution 1) Processes can agree to deem a process that has not responded for more than a bounded time to have failed, and discard any new messages from that process. This effectively turns an asynchronous system into a synchronous one. Solution 2) Use imperfect failure detectors, and reach consensus while allowing suspected processes to behave correctly instead of excluding them. Timeout values are adapted according to observed response times. This has the advantage that correct processes are not wasted by being falsely excluded.
Consensus using randomization
Introduce an element of chance in the processes’ behaviour, so that the adversary (attacker) can not effectively tamper with processes. Enables processes to reach consensus in a finite expected time.
Use of filters
Probabilistic data-structure for testing membership in sets.
Returns either probably in set or definitely not in set. That is, it may give false positives, but never false negatives.
The bloom filter is an array of m bits, all initialized to 0. It also needs k hash functions.
Inserting: When adding an element, it is run through all the k hash functions. The output of each hash function denotes a position in the m-bit array. All these positions are then set to 1.
Querying: To check if an element is in the set, run it through the hash functions, like when inserting. If any of the array positions returned by the hash functions contain a zero, the element is definitely not in the set. If all the bits are 1, then the element may be in the set (or they have all just been set to 1 by accident).
Because the bloom filter cannot permit false negatives, removing elements is not supported (that would mean changing 1-bits back to 0, which would also "remove" other elements that map to the same bit).
The big advantage of bloom filters is that you can very cheaply check whether an element is certainly not present, and thereby skip any extra work on those elements. Only the elements that may be in the set need to be properly checked. The bloom filter gives you this "definitely not" guarantee using far less space than other options, like hash tables, tries etc.
Used in systems that need to identify non-candidates, such as credit card number checks, duplicate control, finding matching keys in relational algebra etc.
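The insert and query steps described above can be sketched in a few lines. This is only an illustration: the class name `BloomFilter` is made up, and deriving the k hash functions by salting SHA-256 with an index is an assumption of this sketch, not something the book prescribes.

```python
import hashlib


class BloomFilter:
    def __init__(self, m, k):
        self.m = m              # number of bits in the array
        self.k = k              # number of hash functions
        self.bits = [0] * m     # all bits start at 0

    def _positions(self, item):
        # Assumption: simulate k hash functions by salting SHA-256 with the index i.
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, item):
        # Inserting: set every position the hash functions point at to 1.
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        # Querying: any 0 bit means "definitely not in set".
        return all(self.bits[pos] for pos in self._positions(item))
```

After `bf.add("apple")`, `bf.might_contain("apple")` is always true. For anything that was never added the answer is usually false, but it can be a false positive.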
Sorting large amounts of data
Two-phase reservoir/replacement sort (external sorting method)
This is "explained" in chapter 10 of SAMOLDV.
The replacement/reservoir sort described in the book is a kind of external tournament sort used when we don't have enough working memory to hold the entire data set that is to be sorted. It consists of two phases:
- The sort phase, see Tournament sort on wikipedia for a brief explanation and this stackoverflow question for a better explanation than what the book offers.
- The merge phase, in which the "runs" from the sort phase are merged. The book describes some hella weird merging scheme in 10.2.4 and 10.2.5.
- size of data set (?)
- track size (number of bytes on one disk track)
- block size in bytes
- number of passes
- number of disk rotations per second
- access start time
- data transfer time in milliseconds per byte or second per MB
Yeah that's right, relational algebra is up in this one as well.
But it's not exactly the same as in TDT4145.
It's outlined in 11.3 of SAMOLDV, page 352.
The "basic algebra operations" are defined in terms of what you might remember as relational algebra operations from TDT4145.
Which is weird because the book defines selection using both
- TableName = The name of the table we want to do stuff with.
- selector (s) = selection criteria for tuples/records/rows in TableName.
- reductor (r) = Which columns/attributes of the selected tuples should be "carried on"/made available in the result.
If the reductor is empty or * then all attributes are returned.
If an attribute is listed and immediately followed by a / followed by a string (Attribute/string) then that attribute/column is renamed to / made available as the string in the result.
Basic algebra operations
You can already tell it's weird because we've defined selection using the relational algebra operation selection.
I don't know what's up with that either. But it's totally unnecessary. Just check out the wikipedia article on Relational Algebra for a better explanation.
Nested-loop with filter
(A' = A stripped)
(B' = B stripped)
(A = Volume of A)
(B = Volume of B)
x = A'/WS
y = (x-1) / x
p = percent of matching B in A
Volume of result with filter on A = A + 2yA' + B + B'p + (x-1)B'*p
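To get a feel for the formula, it can be plugged into a small function. This just evaluates the expression exactly as written above. The function name and the numbers are made up, WS is taken as-is from the notes (presumably the available working storage, which is an assumption), and p is assumed to be a fraction between 0 and 1.

```python
def nested_loop_filter_volume(A, A_stripped, B, B_stripped, WS, p):
    """Evaluate A + 2yA' + B + B'p + (x-1)B'p with x = A'/WS and y = (x-1)/x."""
    x = A_stripped / WS
    y = (x - 1) / x
    return A + 2 * y * A_stripped + B + B_stripped * p + (x - 1) * B_stripped * p


# Hypothetical volumes in MB: A=100, A'=40, B=200, B'=50, WS=10, p=0.5
example = nested_loop_filter_volume(100, 40, 200, 50, 10, 0.5)
```

With these numbers x = 4 and y = 0.75, so the terms are 100 + 60 + 200 + 25 + 75 = 460.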
Testing table equality
Testing table inequality
The division is a binary operation that is written as
Order of operations / operand ordering
See Problem 6 on the 2013v Exam for an example problem of this kind.
Note that when calculating the "total I/O volume", tables are read when involved in an operation and intermediate results are written back to disk before being read again for the next operation. That's how the answer key usually describes it anyway.
For natural join, *, use a greedy approach and find the smallest given intermediate result and select that.
Repeat until there are no more intermediate results to select.
For example, if asked to find the execution order of R=A*B*C that results in the smallest I/O volume, pick whichever one of A*B, B*C or A*C (natural join is symmetric) that has the smallest intermediate result.
The next operation would be whichever one you just picked and the remaining table.
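The greedy choice for the three-table case can be sketched like this. The function name and the intermediate result sizes are hypothetical; in an exam problem the sizes would be given or estimated from the problem text.

```python
def greedy_join_order(tables, pair_sizes):
    """Pick the pair with the smallest intermediate result, then join in the rest.

    tables: list of table names.
    pair_sizes: dict mapping frozenset({a, b}) to the size of a*b.
    """
    first_pair = min(pair_sizes, key=pair_sizes.get)
    rest = [t for t in tables if t not in first_pair]
    return tuple(sorted(first_pair)), rest


# Hypothetical intermediate result sizes for R = A*B*C
sizes = {frozenset("AB"): 500, frozenset("BC"): 120, frozenset("AC"): 900}
order = greedy_join_order(["A", "B", "C"], sizes)
```

Here B*C has the smallest intermediate result, so the execution order becomes (B*C)*A.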
Total I/O volume is equal to:
Advanced relational algebra
See also chapter 12 of SAMOLDV.
"The Associations and Objects"
TAO is a geographically distributed data store built to serve Facebook's social graph.
A video presenting the article is found here: https://www.usenix.org/conference/atc13/technical-sessions/presentation/bronson
A Facebook note about TAO: https://www.facebook.com/notes/facebook-engineering/tao-the-power-of-the-graph/10151525983993920/
Nodes in Dynamo are assigned a position on a "ring".
Data items are assigned to nodes by hashing the key of the data item, and treating the hash as a position on the same ring. Each node is responsible for data items between itself and its predecessor on the ring. This ensures that when a node arrives or departs from the system, only neighboring nodes are affected.
To cope with different performance of different nodes, and mitigate causes of non-uniform distribution of data items between nodes, Dynamo uses a concept of virtual nodes. The ring contains many more virtual nodes than actual nodes, and each physical node is responsible for many virtual nodes.
Now, when a node suddenly becomes unavailable, its load can be distributed between multiple other nodes, by distributing the virtual nodes between them (and vice versa when a node connects to the system). Nodes with more capacity can manage more virtual nodes than other nodes.
To survive nodes disappearing, data needs to be stored in more than one place. One node acts as the coordinator for each item. The coordinator stores the item itself and also replicates it to the next N-1 nodes in the ring.
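The ring, the virtual nodes and the replication to the following nodes can be sketched together. This is a toy model, not Dynamo's actual code: the names `Ring`, `ring_position` and `preference_list` are made up for the example, SHA-256 is an arbitrary choice of hash, and virtual nodes are simply placed by hashing "node#i".

```python
import bisect
import hashlib


def ring_position(key):
    # Assumption: any well-distributed hash works; SHA-256 is used here.
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class Ring:
    def __init__(self, vnodes_per_node):
        # vnodes_per_node: physical node name -> number of virtual nodes it manages.
        self.points = sorted(
            (ring_position(f"{node}#{i}"), node)
            for node, count in vnodes_per_node.items()
            for i in range(count)
        )
        self.positions = [pos for pos, _ in self.points]

    def preference_list(self, key, n):
        """Coordinator first, then the following distinct physical nodes, n in total."""
        # The first virtual node at or after the key's position owns the key.
        start = bisect.bisect_left(self.positions, ring_position(key)) % len(self.points)
        nodes = []
        for offset in range(len(self.points)):
            node = self.points[(start + offset) % len(self.points)][1]
            if node not in nodes:   # skip virtual nodes of an already chosen machine
                nodes.append(node)
            if len(nodes) == n:
                break
        return nodes
```

For example, `Ring({"a": 8, "b": 8, "c": 16}).preference_list("user:42", 2)` returns the coordinator and one replica. Node c manages twice as many virtual nodes, so it should end up with roughly twice the keys.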
Used for versioning objects.
The vector clock is a list of (node, counter) pairs. Each version of every object has its own vector clock. If every counter in one version's clock is less than or equal to the corresponding counter in the other version's clock, the first version is an ancestor of the second and is no longer needed. If this holds in neither direction, there is a conflict.
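The ancestor check boils down to a counter-by-counter comparison. A minimal sketch, where clocks are represented as dicts from node name to counter and a missing node counts as 0 (both are assumptions of this example, as are the function names):

```python
def is_ancestor(a, b):
    """True if every counter in clock a is <= the matching counter in clock b."""
    return all(count <= b.get(node, 0) for node, count in a.items())


def compare(a, b):
    a_le_b, b_le_a = is_ancestor(a, b), is_ancestor(b, a)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "a is an ancestor of b"
    if b_le_a:
        return "b is an ancestor of a"
    return "conflict"
```

For instance, `{"x": 2, "y": 1}` and `{"x": 2, "z": 1}` conflict, because each clock has a counter the other one lacks. Both versions must then be kept and reconciled later.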
To provide the desired level of availability, Dynamo does not enforce strict quorum. Read and write operations are instead performed on the first N healthy nodes, which are not necessarily the next N nodes on the consistent hashing ring. When a replica that was supposed to be stored on a node that is currently down is sent to another available node, it will contain a hint as to where it was originally intended to go. The node that has received the hinted replica will periodically check whether the intended recipient is recovered, and then try to deliver the replica to its final destination.
This concept of hinted handoff ensures that read and write operations may continue even if nodes are currently failing. If a very high level of availability is required by the application using Dynamo, it may lower the required number of available nodes down to 1. That way, the write request will only fail if all nodes in the system are unavailable. This is seldom done in practice, though, since applications also require a certain level of durability.
In the most extreme scenarios, nodes which have received hinted replicas might become unavailable before managing to return them to their intended owner. To see how Dynamo mitigates this, we'll take a look at merkle trees.
A merkle tree is a hash tree where the leaves are hashes of the values of individual keys. The nodes higher in the tree are hashes of their children.
With merkle trees you can compare two branches by simply comparing the parent nodes of each branch. This helps reduce the amount of data which needs to be transferred while checking for inconsistencies among replicas.
If the hash values of two nodes are equal, they need no synchronization. If they are different, it means that the values of some of the child nodes are different. Then the hash values of the direct children can be compared. This process continues recursively until the specific leaves that differ are reached.
In Dynamo, nodes maintain a Merkle tree for each of their virtual nodes. When two nodes want to synchronize their keys, they start by exchanging the root of the Merkle tree for the range of keys that they have in common.
This lets Dynamo synchronize keys with very little data transmission. The only drawback is that as nodes join and leave, the key ranges change and merkle trees have to be recomputed often.
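The recursive comparison can be sketched as follows. This is a simplified model: it assumes both replicas hold the same number of keys in the same order, and the helper names `build` and `diff` are made up for the example.

```python
import hashlib


def h(data):
    return hashlib.sha256(data).hexdigest()


def build(values):
    """Return the tree as a list of levels, leaves first; values are byte strings."""
    level = [h(v) for v in values]
    levels = [level]
    while len(level) > 1:
        # Each parent hashes its two children; an odd node at the end is hashed alone.
        level = [h("".join(level[i:i + 2]).encode()) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels


def diff(a, b, depth=None, index=0):
    """Indices of the leaves that differ, descending only into unequal subtrees."""
    if depth is None:
        depth = len(a) - 1              # start at the root
    if a[depth][index] == b[depth][index]:
        return []                       # equal hashes: whole subtree is in sync
    if depth == 0:
        return [index]                  # reached a differing leaf
    out = []
    for child in (2 * index, 2 * index + 1):
        if child < len(a[depth - 1]):
            out += diff(a, b, depth - 1, child)
    return out
```

Comparing `build([b"1", b"2", b"3", b"4", b"5"])` with a copy where the third value is changed yields `[2]`, and the subtrees that hold the other keys are never descended into.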
The article explores the possibility of replacing all storage in a computer centre with DRAM. The argument is that this would substantially reduce response times, while not being too expensive for the major players in the business. At the time of writing, Facebook already kept 150 of 200 TB of user data (excluding images) in DRAM cache, and as such, the transition would be minor.
The biggest problem with using only DRAM to store information is that DRAM is volatile, and would lose information when power is lost to the server. RAMCloud mitigates this by buffered logging. This technique consists of keeping several copies of the information at other servers. When a write is executed towards one server, the data is stored in DRAM. It is also sent to two other servers where it is temporarily stored in DRAM. When these servers confirm that the data is stored, the write operation returns to the user.
However, it is not good enough to keep the data in the DRAM of the other machines. It would require too much RAM, and not be safe if there is a massive power outage. Therefore, the backed up data is queued up and written to disk in batch jobs. This minimises the time used to write to disk, and also makes sure that we don't need to wait for harddrives before returning to the user. If the primary copy is lost, it can be restored from one of the backups.
Restoring from backup is however not trivial. If one single machine was to restore the entire lost server, reading the disk logs of the backup would take too much time. Instead, each of several hundred RAMCloud machines is made responsible for backing up a shard of another machine's DRAM. When a restore is needed, all machines keeping a backup shard will reconstruct this shard, and send it to a machine designated to take the place of the failed server. This makes it possible to restore a server in seconds.
One might expect RAMCloud to have poor ACID-properties due to storage in volatile RAM. But a lot of the problems we face with regards to the ACID-properties when implementing a system are due to a lot of concurrent transactions. As RAMCloud has very low latency, the risk of conflict between transactions is reduced. This, the authors argue, makes sure that RAMCloud has good ACID-properties.
A video presenting the article is found here: https://www.youtube.com/watch?v=lcUvU3b5co8
A video presenting the article is found here: https://www.usenix.org/conference/osdi12/technical-sessions/presentation/corbett
Removed from curriculum fall 2015
Access methods for multi-dimensional keys
R-trees are used as spatial access methods, for example to index multi-dimensional information such as geographical coordinates.
Imagine a two-dimensional grid with things on it that are indexed in the R-tree. The terminal/leaf nodes of the R-tree can be thought of as rectangular boxes defined by the coordinates of two of their opposite corners. All things/objects/items on the grid that are found within the rectangular box of a terminal node are stored in that terminal node. But guess what? Every node is represented in this manner! It's boxes all the way down.
The rectangular boxes are also known as bounding boxes.
Quadratic split finds the pair of rectangles that are the least desirable to have in the same node, and uses these two as the initial objects in two new groups.
So within the node or block or whatever it is you are splitting, for each box/object, calculate the area of the bounding box that would encompass it and one other box/object (for all other objects).
These two objects,
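A minimal sketch of the seed-picking step of quadratic split. It uses the "wasted area" measure from Guttman's original R-tree paper (area of the combined bounding box minus the areas of the two boxes), which is a bit more specific than the description above. Rectangles are assumed to be `(x1, y1, x2, y2)` tuples, and the function names are made up.

```python
from itertools import combinations


def area(r):
    x1, y1, x2, y2 = r
    return (x2 - x1) * (y2 - y1)


def union(r, s):
    # Smallest bounding box that encompasses both rectangles.
    return (min(r[0], s[0]), min(r[1], s[1]), max(r[2], s[2]), max(r[3], s[3]))


def pick_seeds(rects):
    """Return the indices of the pair that would waste the most area if grouped."""
    def waste(pair):
        i, j = pair
        return area(union(rects[i], rects[j])) - area(rects[i]) - area(rects[j])
    return max(combinations(range(len(rects)), 2), key=waste)
```

With the boxes `(0,0,1,1)`, `(1,1,2,2)` and `(9,9,10,10)`, the first and the last are picked as seeds, since putting them in the same node would create a huge and mostly empty bounding box.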
Homogeneous data, matrices and arrays
Short for K-dimensional tree.
Storage of raster graphics and voxels
Raster graphics are pretty much just a bitmap: a 2-dimensional grid where each cell corresponds to a pixel or point in the image.
A voxel represents a value on a regular grid in 3-dimensional space. Think of it like a pixel but for 3d images. A voxel can be a color value (like a pixel) representing an arbitrary volume in the picture or model.
The ‘snapshot’ algorithm of Chandy and Lamport
Captures a consistent global state and allows us to make assertions about whether a stable predicate holds in the actual execution. The algorithm records state locally at processes. It allows messages to be in transmission for different states. Channels are what messages are transmitted over.
It assumes that:
- Channels and processes do not fail
- Channels are unidirectional with a FIFO ordering of messages
- The graph of processes and channels is strongly connected (there exists a path between any two processes)
- Any process may initiate a global snapshot at any time
- The processes may continue normal execution while the snapshot takes place
Missing the algorithm...
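A minimal sketch of the marker rules as they are commonly stated, using a toy money-transfer setup with two processes. Everything here (the `Process` class, the `deliver` helper, the balances) is made up for illustration, and the delivery order is scripted by hand rather than driven by a real network.

```python
from collections import deque

MARKER = "MARKER"


class Process:
    def __init__(self, name, balance):
        self.name, self.balance = name, balance
        self.out = {}               # destination name -> FIFO channel (deque)
        self.incoming = []          # names of processes with a channel to this one
        self.recorded_state = None  # local state captured by the snapshot
        self.channel_state = {}     # source name -> messages recorded as in transit
        self.recording = set()      # incoming channels still being recorded

    def send(self, dest, amount):
        self.balance -= amount
        self.out[dest].append(amount)

    def start_snapshot(self):
        # Marker sending rule: record own state, then send a marker on every
        # outgoing channel before any other message.
        self.recorded_state = self.balance
        self.recording = set(self.incoming)
        self.channel_state = {src: [] for src in self.incoming}
        for channel in self.out.values():
            channel.append(MARKER)

    def receive(self, src, msg):
        if msg == MARKER:
            # Marker receiving rule: the first marker triggers the local recording.
            if self.recorded_state is None:
                self.start_snapshot()
            self.recording.discard(src)  # this channel's state is now complete
        else:
            self.balance += msg
            if src in self.recording:
                self.channel_state[src].append(msg)


def deliver(src, dst):
    dst.receive(src.name, src.out[dst.name].popleft())


p, q = Process("p", 100), Process("q", 50)
p.out["q"], q.out["p"] = deque(), deque()
p.incoming, q.incoming = ["q"], ["p"]

p.send("q", 10)
q.send("p", 5)
p.start_snapshot()  # p records 90 and sends a marker after the 10
deliver(q, p)       # the 5 reaches p after p recorded: counted as in transit
deliver(p, q)       # the 10 reaches q before the marker
deliver(p, q)       # marker: q records 55 and sends its own marker
deliver(q, p)       # marker: p stops recording the channel from q
total = (p.recorded_state + q.recorded_state
         + sum(p.channel_state["q"]) + sum(q.channel_state["p"]))
```

The recorded state is p = 90, q = 55 and one message of 5 in transit from q to p. That adds up to the original 150, even though no single instant of the real execution necessarily looked exactly like this.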