Distributed Media Storage/Internship report/Prototype stage

From Meta, a Wikimedia project coordination wiki

In order to investigate the feasibility and performance characteristics of some of the possible architectures of the system, I have developed a few prototype systems. These prototypes are implemented in a high level language and are only meant to capture the key design aspects of the system to be built. They do not have all (or even most) features the final system will have, and serve as a study and playground for ideas.

A fixed time period for implementing and studying these prototypes has been allocated in the time schedule, according to the time boxing method; i.e. Work on individual aspects of these prototypes was done in a "highest priority feature first" order, and when allocated time for the prototype stage ran out, I proceeded with the next stage of the project.

Prototype 1 was explicitly designed to be the simplest architecture fullfilling the requirements of the system. This procedure was chosen to establish a base to work from, while buying time to think about and solve design aspects of the more complex and sophisticated prototypes.

Prototype 1: Central SQL index[edit]

Prototype 1's architecture consists of a collection of equal nodes, all communicating with eachother and a central SQL index. All nodes are running an HTTP server, both to serve clients and to allow object passing between the nodes. Additionally, they are running a simple multicast UDP protocol to monitor eachother's reachability.

All nodes communicate with a central SQL server. This relational database serves several purposes:

  • Keeping track of all objects (and minimal metadata) in the system
  • Keeping track of where (on which nodes) objects are located
  • Keeping track of node uptime and reachability from other nodes' viewpoints
  • Keeping track of replication status

With one central database master being the authority, no possibility for desynchronisation between partitioned sets of nodes is possible: either a specific revision of an object exists in the central index database, or it does not. In the worst case, desynchronisation between the central database and a participating node can occur, for example because a node lost its disk content. In such a situation, the node is responsible for updating the central index according to its own inventory at startup, after which the distributed replication process will start to remedy the situation.

Design decisions[edit]

Before, and partly during the implementation of the prototype, some important decisions were made with regard to its design.

No usage of object chunks[edit]

Investigation of current usage showed that the mean object size is less than 64 KiB, with the current maximum object size being around 20 MiB. In this situation, it is believed that splitting up objects into fixed size chunks is not useful, as most objects have a smaller size than common, sensible chunk size values like 64 KiB. Even the largest objects do not have a substantial size in comparison to the total storage of a single node, and should have a neglegible impact on the size distribution over all participating nodes. Therefore, splitting up objects into chunks would needlessly complicate the design, performance and actually reduce overall reliability.

Asynchronous/non-blocking communication[edit]

The prototype is using asynchronous/non blocking communication exclusively. This avoids a multithreaded design and corresponding synchronisation overhead and complexity. For more details regarding the rationale behind this decision, see the final design's decisions.

For this prototype, this has been implemented using the Python Twisted framework.

UDP node discovery protocol[edit]

A UDP protocol is used for discovery of (other) nodes, because speed of delivery is more important than reliability and in-order delivery. Use of a reliable transport protocol would defeat the purpose of this protocol used for detecting node availability: during network problems, a transport protocol like TCP would start retrying and resending the stream of data, only to give up eventually when the network problems persist. For the simple purpose of this protocol, the loss of messages and accompanied reply is useful information which should be delivered to the index component as fast as possible.


Multicast is used to make broadcast information to all nodes more efficient. Modern LANs and LAN equipment as used by Wikimedia support this relatively modern technique to send the same message to a large set of hosts in a very efficient way, saving both network bandwidth and host processing compared to multiplexing point to point messages to all participating nodes.

Transactional file class[edit]

Storing large objects on disk takes time. Because this is an operation that needs to be atomic to the rest of the system, and cannot be allowed to block, a class had to be developed that does asynchronous writes and supports commit and rollback functionality. Behind the scenes, this class simply uses a temporary file to use during object writes. After this operation is complete and the object is ready to be added to the index, the file is "committed" and atomically renamed to its final name and location. On a "rollback", the temporary file is simply discarded.

Conservative decisions during inconsistencies[edit]

In case of inconsistency between the central database and an individual node, a conservative decision is made. When an object is not present on a certain node according to the central index database, it's not retrievable, even if the node has the object in its inventory. This is according to the principle that, if an object is not in the central database, it does not exist. Likewise, if an object should be present on a node according to the database, but it's not available on the node, a "does not exist" error message is returned as well, although alternative locations can be queried in that case.


The prototype is missing some components, features or behaviour that is required for a final implementation. These deficiencies are listed here.

  • Lack of replication component
  • Error handling of HTTP client connections are not properly passed through
  • Error handling in general
  • Request uploads and proxied downloads/uploads are read into memory first before further processing instead of streaming, taking up more memory and latency than necessary
  • Corruption of objects is not detected, i.e. no checksumming
  • State of an individual node can be different to the global state in the SQL object, in which the latter takes precedence
  • Manageability is nonexistent


During the implementation and partial testing of the prototype, some observations have been made.

Twisted's SQL abstraction layer is inadequate[edit]

During the implementation of the SQL index component, it turned out that Twisted's SQL abstraction layer is inadequate for anything but simple row selects and inserts. Even for the relatively simple SQL queries used by this prototype, it was often necessary to build the SQL query string by hand, which mostly defeats the purpose of a database abstraction layer.

For the final system, another SQL abstraction layer will have to be found, or implemented.

Out of order delivery of UDP diagrams[edit]

While the fact that the NodeWeb protocol uses a nonreliable transport protocol was a design decision, it was forgotten that it's possible for individual diagrams to be queued up somewhere in the network path for a relatively long time (minutes). Because the diagrams also do not carry any order or timing information, it is possible for nodes to act on out of date information upon receiving these diagrams. This information should be added in a final implementation, so out of date packets can be dropped on reception.

Streaming data[edit]

Because occasionally objects can be much larger than the mean object size, care needs to be taken to not store entire objects in memory, but make use of 'streaming' from source to destination instead. This has been partly corrected later for some paths through the system, by making use of Twisted Web 2 which supports this. The implementation is not complete however, and the final design should better reflect this.

Performance characteristics[edit]

  • = average network latency between client and a system node
  • = average network latency between a system node and the central SQL server
  • = average network latency between system nodes
  • = the amount of system nodes
  • = the average redundancy of a single object, i.e., the average amount of copies of a single object in the system
  • = the average object size in bytes
  • = the average transfer rate between client and system nodes (bytes/sec)
  • = the average transfer rate between system nodes (bytes/sec)

Therefore, the average service time of a GET request for a local object, is

The average service time of a GET request for a remote object, is

Given a uniform distribution of objects over nodes in the system, the probability of a certain randomly requested node having a local copy of the requested object, is

The total time to complete consecutive, non parallel requests for a random distribution of objects requested from a single node, is

Prototype 2: Distributed buckets[edit]

The second prototype is loosely built around the concept of a Distributed Hashtable. The system consists of a large number of buckets (say, 65536 or more). These buckets are divided over the available nodes. Objects are stored in one or more buckets, selected by a pre- or postfix of the object identifier, which itself is a hash. Objects that need redundant storage can select a redundancy of a power of two, . This redundancy is then used to determine which buckets are used to store this object by masking the bucket identifier by (the number of bits in the bucket identifier - 2log(object redundancy)).

All nodes keep track of where all buckets are, by broadcasting the bucket index and updates to it. When a certain node fails and a grace period is passed, the relevant buckets that were on this node are marked as lost. Nodes that previously shared objects with the buckets that have been lost, will then attempt to recreate the bucket. To do this, new nodes are selected to maintain these buckets, after which replication will start.

Because all nodes know where all buckets reside, they can quickly select a node that should contain the object if it's available in the system. Given the redundancy factor, they can also select a few alternative nodes that are likely to have the requested object.

Bucket index changes[edit]

There is no central index in the system; each node maintains indexes of where all the buckets are, and of the contents of the buckets they maintain. To simplify assignment of buckets to nodes however, a central coordinator exists. This coordinator is elected using the Garcia Molina Invitation Election algorithm. This protocol supports failover, i.e. whenever the coordinator node fails, a new coordinator is elected quickly, and processing can resume.

Nodes communicate to eachother using a UDP protocol. Nodes can request the location (node) for a bucket by sending a message to the coordinator. The coordinator will then respond with a message denoting the location of the bucket, after assigning it to a node first, if it hadn't been assigned before. In addition, the coordinator assigns random unassigned buckets as a background job, and multicasts these updates to all nodes.

All bucket updates have a sequence number, which is increasing and defines a total order on the bucket assignments. Both the coordinator and the other nodes keep track of this sequence number and maintain a transaction log of the bucket updates. This is done to maintain consistency among nodes, and to support quick failover of the coordinator.

Because bucket update messages can arrive out of order, nodes maintain a buffer for these messages, and only process them once all previous consecutive updates have been received.

Coordinator failure[edit]

When the coordinator fails, a new coordinator is elected quickly by the invitation election algorithm. Nodes with a high sequence number obtain a higher priority in the election, in order to reduce the number of bucket updates (transactions) that need to be rolled back.

The following has not actually been implemented in the prototype, due to lack of time

While running the invitation algorithm, the coordinator received a list of all sequence numbers of all other nodes in the group, and has thus obtained a lower bound on the sequence number positions of the other nodes. The newly elected coordinator will start (re)broadcasting these bucket updates starting from the lower bound sequence number, to make sure that the other nodes will receive all bucket assignment changes.

Unfortunately, detailed design and experimentation with the transaction log in rollback situations was not possible due to lack of time.

Nodes joining the system[edit]

Nodes joining the system will be merged into the existing coordinator-nodes group quickly. Because the existing coordinator has a higher election priority (update sequence number) it will be reelected as the new coordinator.

The newly joined node will start receiving bucket update messages. As it cannot process these directly - because it's missing earlier bucket updates with lower sequence numbers - it will add these to the receive buffer for later processing. When the buffer is filling up and crossing a threshold, it will request a bucket index transfer from another node (random, or as indicated by the coordinator for being a good node with up to date info) over the HTTP interface.

When the transfer is complete, the node will check whether the sequence number is greather than or equal to the sequence number of the first update they have received from the coordinator. Because nodes should not serve bucket index transfers until their own index is complete, this transitively guarantees index consistency.

Network partitions[edit]

In case of one or more network partitions, a naive run of the invitation algorithm would form multiple separate groups, each with their own coordinator. These partitioned groups would function as autonomous groups, each serving clients. This would pose synchronisation problems when merging the groups later, and therefore needs to be avoided.

The following has not actually been implemented in the prototype because of lack of time

A simple way to do this is to make sure that a cluster only becomes active for serving clients once the number of active, visible nodes in the group is greater than or equal to:

As only one cluster can have a majority of nodes, this ensures that at most one cluster is active at all times.

Design decisions[edit]

Before, and partly during the implementation of the prototype, some important decisions were made with regard to its design.

Average performance is more important than total reliability[edit]

Average performance is considered to be more important than absolute reliability at all times. This means that, whenever necessary, the system might remove a small amount of recent data updates in order to maintain consistency and greatly reduce administration overhead in the general case.

For total consistency, a global synchronisation protocol like two-phase commit or three-phase commit would be needed, which is very expensive in terms of resources and effectively blocks the whole system if one node is unresponsive, something which we're trying to avoid. Therefore, average/general case performance is aimed at, at the sacrifice of possible loss of a small number of recent transactions in some rare occurances, where a node fails that has an update for an object that has not yet been replicated to any other nodes.

Central coordinator[edit]

A central coordinator is elected by all nodes to do administrative work like assigning buckets to nodes. This reduces inter-communication overhead and complexity, while maintaining robustness through failover.

Some simple decisions are made much easier by a single node acting as a coordinator than through a complex procedure of asynchronous communication between all nodes. The primary advantage of having control on all participating nodes is redundancy, but that is easily resolved by instating a quick failover protocol where any other node can quickly take over the coordinator's role because it has all needed information.

Some support for this was already available in the NodeWeb UDP protocol, which could be extended by an election protocol designed for asynchronous communication with failures.


Buckets are used as an indirection between nodes and objects, for less administrative and communicative overhead when nodes fail, and for more flexibility of distributing objects over geographically separated clusters.

Sequence numbers and transaction log[edit]

Ascending sequence numbers are used in bucket index updates to maintain a total order. These updates are stored in a simple sequential transaction log, that allows bucket updates to be reversed in case this is needed, for example in some situations where a coordinator failover is needed.

Bucket index kept in memory[edit]

The bucket indexes entirely reside in memory, for very quick access. Because the system scales well with the number of nodes, this should not limit the total object count capacity too much. Additionally, because the index is hash-based, it's easily possible to replace it by a disk-based index, should that be needed.


The prototype is missing some components, features or behaviour that is required for a final implementation. These deficiencies are listed here.

  • Buckets are assigned to nodes in an entirely random way. A final system may need something more sophisticated to attain a more balanced distribution.
  • Storage and retrieval logic and index handling are too tightly coupled with the HTTP service. These should be moved to separate components and a generic interface, applicable to both prototypes.
  • Messages are sent using serializing and unserializing. There is, however, no input sanitizing (security issues).
  • Quick coordinator restarts (within the coordinator timeout) are not detected by other nodes. This could be solved by using sequence numbers in invitation messages.
  • Bucket content indexes are not maintained at all, except for file entries on disk.
  • Data replication is not implemented.
  • Dependencies between classes, components and packages are too tightly coupled, and should be reduced/minimized.


During the implementation and partial testing of the prototype, some observations have been made.

Revision management[edit]

Lacking a central index, revision management is very hard to achieve in an efficient way. It seems impossible to make sure the latest revision of an object is present on a (replicating) node without having to contact some form of master node, which effectively defeats the purpose of trying to spread reads.

An append-only situation, as became possible later because of changed system requirements could have solved this problem.

Module node.py is an ad-hoc mess[edit]

The NodeWeb module was not properly designed with all requirements in mind, and was changed multiple times in an ad-hoc manner. Even for the simpler case in prototype 1, this should be redone for the final design.

Lessons learned[edit]

The prototype stage turned out to be a very useful learning stage. It offered a nice playground, both for the possible different theoretical models behind a final implementation, as well as for the actual program architecture and design.

Main difference is index handling[edit]

When designing and implementing the second prototype, it turned out it was fairly easy to integrate the new index handling into the existing code of the first prototype, notably the HTTP interface component, object storage and node web components. Very little code changes were needed to achieve this. Pretty much all code changes were related to the rather different index interface and index handling, and the rest was due to the slightly different optimal behaviour of locating and transferring objects as opposed to the central index situation.

The resulting lesson was that it's useful to factor out this code into separate components, especially if it's possible to find a generic index interface for both prototype models.

It should be noted however that the lack of some sophisticated features like replication also made the differences in interface smaller. A final design supporting both very different index models and corresponding control flows will be harder to achieve with a generic interface.

The more distributed index handling of the second prototype naturally required more sophisticated communication between nodes. The node web protocol of the first prototype had to be extended for that, but it clearly hadn't been designed for this, resulting in working but messful code.

Central index offers good performance and a relatively simple design[edit]

Using a SQL RDBMS as central index gives a lot of useful properties for free: ACID. Especially atomicity and consistency provide nice handles for synchronisation, which are not available without blocking interfaces in distributed systems.

The fact that the typical work load of the required system has a very low write load compared to the read load, still allows good average read performance, because index location queries can be sent to a replicated slave index. Replication lag does not pose a real problem in that case because read requests cannot invalidate the system state, and out of date information can be detected.

Distributed index offers potentially better performance at much greater complexity[edit]

Trying to eliminate every nonscalable bottleneck like the central SQL index should provide even better performance, but there is a clear tradeof with complexity and communication overhead. The second prototype eliminated the need for a remote index lookup for every read query and thereby saving a lot of communication time, but there seemed to be many pitfalls that had to be discovered and worked around to actually arrive at a fully complete and reliable system meeting the requirements. Although interesting and promising, this didn't seem like a smart route to take because of the very limited time and strict deadline of the project.

Even the very distributed design ended up with designating some relatively simple tasks (like assigning buckets to nodes) to a single coordinator, though with quick failover. There didn't seem much use in distributing even these relatively light tasks, because of the potential for conflict and desynchronisation, which would require even more communication and complex control flow to solve them. Apparently having a single controller as a bottleneck is often unavoidable and actually a sensible decision if its tasks are lightweight enough to not pose a problem even when scaling to very high levels of concurrency.