# Distributed Media Storage/Internship report/Final design

This chapter describes the final design of the Distributed Media Storage system.

For traceability, design descriptions and decisions that refer to the software requirements are marked by the requirement number, for example: (R3)

## Description

The final design is primarily based on the first prototype, i.e. a distributed system with a central, non distributed index stored in a SQL database. However, because both prototypes appeared to differ only in relatively minor interface details related to index handling, the system has been designed to be more generic in order for it to be more extendable. This means that it's easily possible to change or extend the system with different index components (for example, an index component based on a distributed hash table) later, without having to do big modifications to the overal design. (R10)

Compared to the prototype design, the final design has been extended with several components that were missing earlier. Most notable is the Replication component, that takes care of rebuilding and maintaining redundancy of objects in the presence of node failures. (R6) Other required features that were not incorporated in the prototype design spawned new components in the final design:

• Monitoring and logging (R9)
• Configuration

Furthermore, some requested features had to be integrated into many parts of the system. These include object classes, where objects in specified classes can have different redundancy storage requirements.

Not previously implemented and unfortunately also not available in the final design is the TRAVERSE operation, which delivers a (total or partial) list of objects in the system, facilitating backups. Being considered less important than the other operations and implementable using external methods and the GET operation, this requirement had to give way for more important ones due to time pressure. (R4.4) It should be relatively easy to add it to the design later, however. (R10)

### Client request methods

The following client requests are available:

• GET (R4.1)
• PUT (R4.2)
• DELETE (R4.3)

All of these operations are available in recursive and nonrecursive variants, as signaled by a HTTP request variable. 'Recursive' is the normal situation where the request can be handed over to an arbitrary other node, while in 'nonrecursive' mode, the contacted node is required to handle it entirely by itself, or fail the operation.

Additionally, the PUT request has a special replication mode that is used to replicate a certain object from one node to another. In that case, the object already exists in the system and does not need to be created in the index.

### Index handling

The Index interface to the rest of the system is defined by an abstract class Index, which specifies a minimal generic interface through abstract methods. Actual implementations of index classes, like SQLIndex and BucketIndex derive from this class, and implement the obligatory methods. Among these methods are:

• A method to retrieve the locations of a certain object
• A method to add the presence of an object on a certain node in the index
• A method to remove the presence of an object from a certain node in the index
• A method to request the index to suggest new locations for a certain object

#### SQL index

The SQL index implementation stores the entire index on a central SQL server. Communication with the SQL server occurs through an abstract SQL wrapper, hiding implementation details of specific SQL RDBMSes.

SQL index maintains a replica of the node availability index (maintained by Node Web) in a table in the SQL database, to be able to use this information in SQL queries, which can then be made more efficient. Another table stores information about each object available in the system. This info is then related to a table representing the presence of these objects on the nodes.

### Replication

Replication(R5,R6,R8) is very dependent on the index implementation used. Different index models (central/global index, distributed index, etc) have very different properties with respect to the data that can be gathered efficiently by a single node. Therefore the generic Replicator notification interface, as inherited and implemented by all derived classes for different index implementations, is very unspecific: it offers multiple ways to handle the problem.

The basic Replicator interface is notified of node discoveries and node disappearances, on which it can decide to act if it has enough information about the object presence on these nodes. It can be suggested by the controller (or, if needed, another component) to specifically replicate a specific object with priority. A periodic replicate task is started that allows the replicator to find objects with suboptimal redundancy by querying the index, and consecutively replicating them.

#### SQL index

For the SQL index based implementation, information about object redundancy is stored in a central place. A special replication table is available where individual nodes can register their intent to replicate a certain local object to a variety of other nodes, until the required redundancy factor is achieved. During the presence of this replication intent entry, no other node will try to replicate this particular object, until either the entry is removed or expires, due to failure of the replicating node.

Every node runs a periodic replication task. First, it will try to replicate objects that have been appended to its priority replication list by the Controller, for example because the relevant object has just been added to the system and is not yet present on any other nodes. Once the priority list is empty, the node will request (through the Index interface) a limited list of objects in need of replication, sorted in reverse order of their redundancy factor. This list is then shuffled in a random order for objects having the same redundancy factor, to reduce concurrency of different nodes trying to replicate the same objects.

Concurrency of replication on specific objects is limited by the fact that nodes can only (try to) replicate objects that are present locally. Objects with the least amount of redundancy are replicated first, both because the risk of loss of these objects is highest, and because the replication concurrency on this object is small. Replication concurrency is also bounded by the configured redundancy target for the specific object class. (R5)

From the returned list, the first object is taken and removed from the list, and individually replicated. The first step of replication is to request the index to add the replication intent for this object, returning the current redundancy factor for the object in the process, to avoid race conditions. If permitted because no other replication intent for this object exists, the Replicator will request a list of suggested locations (nodes), after which the object will be replicated to these nodes using replication PUT-requests, until the required redundancy target is met. Afterwards, the replication intent record is removed from the central index. This process can be run in parallel for several nodes, up to a configured local replication concurrency factor.

Because nodes can fail during the replication process, replication intent records become stale in a certain configurable time period after their creation. All nodes need to ensure that the replication process for the object is complete or stopped before this time period expires. All nodes periodically remove stale intent records from the index.

Replication is halting because each object will eventually reach the target redundancy. However, problems arise when the total available node count drops below the redundancy target, or when individual nodes run out of available resources (for example, space). When Replicator processes notice these conditions, they should trigger warnings to the monitoring system so operators can intervene, because livelock of replication processes certainly does not solve this problem. (R9)

### Inter-node communication

Inter-node communication is basically handled the same as in both prototypes: A simple UDP protocol supporting simple message types and arguments, using no flow control or message ordering guarantees. The protocol supports both unicast messages to each configured node in the system, and multicast messages sent at once to every node in the system, for information targeted at all nodes.

Complex arguments can be sent by serializing objects into strings, which can then be unserialized by the receiver. Care must be taken however that unserializing doesn't introduce rogue objects in the statespace, so from a security point of view, input checking must be used.

The basic Node Web protocol supports only a few message types for finding and monitoring other nodes. (R8) Implementations of index models that need more than this, for example a distributed hash table implementation, can register new message types and corresponding message handlers for their purposes. Subimplementations can implement simple message ordering or flow control themselves if they need it, or revert to using other protocols using TCP, for example using the HTTP interface of the system. (R10)

### Event handling model

The entire system makes use of non-blocking, asynchronous operations. Because no operation will ever block execution of the system, it can remain responsive for all concurrent requests without needing multithreading and corresponding synchronisation overhead.

Asynchronous system need to implement their own event handler, which monitors events on resources with operations that would block the system in a blocking model, and dispatches event handling routines when the state of these resources changes. For Python, multiple primitives exist to make use of asynchronous I/O, ranging from fairly low level (mapped directly on to underlying OS system calls) to highly abstract frameworks. One such framework is Python Twisted. Twisted nicely abstracts away the underlying I/O primitives, by providing high level classes for handling most common I/O operations like network communication and object persistence. Operations/methods of these classes make use of a special return datatype, Deferred, which means "requested return data is not yet available, but will be eventually". Calling methods can attach event handlers for handling the returned data once available in a callchain, retrieving the returned data and any optional arguments in the process, making extensive bookkeeping within container classes unneccessary in many simple cases.

Twisted also provides an error handling model which closely resembles the exception/catch features of modern object oriented languages (like Python). Next to event handlers, error handling routines can be attached to the Deferred datatype, which get called if an error (for example, the raise of an actual exception) occurs during the asynchronous operation. Error handlers can then choose to either handle the exception and pass back control to the regular control flow in the program, or pass it on to the next error handler, higher upstream. This allows for similar error handling strategies as used in traditional sequential exception-style programs, but in an asynchronous context.

The excellent class/module structure of the Twisted framework, with its very flexible extendability and large number of useful support modules for a variety of commonly needed components (like a full blown HTTP server component) proved to be a great choice for the implementation of the prototypes, and with the decision of using Python, became a natural choice for the final implementation as well.

## Design assumptions and decisions

### Append only

A major problem of distributed, replicating systems where the same data exists on multiple locations is updates to existing data. Care must be taken that consistency is maintained between all replicas on update. Multiple strategies to solve this update everywhere problem exist. The easiest and least flexible of them, is to simply disallow updates to existing data/objects.

While this may not seem like a feasible solution for many systems, during the design phase of this internship an independent decision was made by Wikimedia that made it more attractive. A future architecture of the images/multimedia system will start naming and storing media objects by their content hash. Part of the reason for this change is to discourage users uploading the same files (with exactly the same content) multiple times under different filenames, or on different wiki projects. The important consequence of this for the media storage system is that no multiple revisions of objects can exist. New revisions of objects that have actually changed will have a different content hash value, and consequently will be stored as a separate object, with a separate name.

This decision allows the media storage system to ignore the problem of object updates, making the design simpler and potentially more efficient. The storage system is effectively transformed into a append only database, where data is only added to the system, and never updated. This doesn't mean that objects cannot be removed, however. Support for object removals can exist, but is not required to be atomic nor efficient. A possible implementation of the final architecture is one where objects that are no longer needed and normally would've been deleted, are purged from the system in a batch job, during times of low load. It is however implemented as an atomic index-only operation in this design.

Content hashes as object identifiers also can be used as checksums, i.e. it's trivial for the system to compute the hash over its stored or served content and compare it to the object name to ascertain that the object's content has not corrupted in the process.

One problem of using hash functions is the existence of hash collisions. Infinitely different content strings give rise to the exact same hash value, meaning that no two different objects with the same content hash can exist in the system. Because hash collisions are extremely rare (unless explicitly searched for), in the order of hundreds-thousands of percentages, this was considered acceptable - the consequence being that a user cannot upload a certain colliding object to the system, without alterering it. This same problem would also be present for colliding names if hashing of arbitrary object names were used, like in the prototypes. (R2)

### Underlying network takes care of optimal routing and provides a full mesh

It is assumed that all nodes in the system are known in advance (by static configuration), and that the underlying network is able to provide a full communication mesh between them, which is also optimal in terms of latency and throughput. This means that point to point messages between nodes will automatically have the best available path without the system having to define complex routing structures itself. Broadcast messages to all nodes in the system can be implemented using multicast. (R3)

This model works well because all nodes are known and distributed densely in a few separate areas. Latency is low and bandwidth is high within such a dense area (e.g. a specific server cluster), making network communication optimal in this case. Messages between nodes are typically either point to point, specific communication between those two nodes, or broadcast messages with the same information for all nodes. In the case of point to point messages, routing through other nodes would generally only negatively affect metrics, while providing no gain for the intermediate nodes. In the case of broadcast messages, multicast routing handles these types of messages in a very efficient way.

### Front end cache

Because the system supports a standards compliant HTTP interface, it is possible to use standard caching and proxy products in front of it. It's assumed that in actual production deployment, a caching setup is used in front of the storage system, to serve the most popular objects directly out of memory (and/or disk) caches before hitting the multi-layer storage system. This could for example be implemented using standard HTTP caching proxies like Squid, as used by Wikimedia now.

Current performance statistics of Wikimedia's front-end HTTP cache cluster indicate around 80% cache hit rate for images, effectively reducing the read request rate to one fifth for the backend system.

### No in-memory content caching

The system will not attempt to do in-memory caching of object content. Because objects have to reside on disk for persistence anyway, OS disk caching is involved. Reimplenting this would achieve only marginal performance gains, at the cost of greater complexity and development time. Furthermore, the system is intended to be run on shared systems, running other components (like web servers). The operating system has a better overview of overal memory usage, access patterns and resource sharing and therefore can make better decisions than the storage system can by itself. It would also be naive to think that implementing a new caching system would provide better results than the mature and fully tuned caching strategies in the operating system kernels.

Should these assumptions turn out to be false (for example because of the overhead of context switches in this model, which is not present using application level caching), a new caching system can easily be integrated in the system later, as a layer in the ObjectStorage component.

Indexes are kept in memory for fast access.

### The problem is not very suited for a Peer 2 Peer system

I decided against using pure P2P techniques for solving the problem. Although the high scalability and storage space requirements seem excellent arguments for using peer to peer strategies, some characteristics of the situation make it less attractive.

In general, peer to peer systems are used in situations where the problem extends beyond a single point of control, trust between the participating nodes is of little importance, and the criticality, reliability and service time (latency) of the service is low. This allows the system to scale up to very high amounts of nodes, at the cost of much greater complexity than a centrally controlled distributed system.

These aspects are not applicable to the problem situation, for the following reasons:

• The system does not extend beyond the control of a single entity. Wikimedia desires to have complete control over all nodes involved, to be able to maintain and ensure quality and reliability (criticality) aspects.
• Because the entire system is controlled by a single entity, more information about the layout of the system is available than in a pure P2P system. This simplifies discovery of and communication between nodes. No complex routing mechanisms are needed, because this can be offhanded to the underlying network.
• Service time of the system is important. While it's not a real-time system, some reasonable guarantees of the average/maximum service time are desirable. In a very dynamic P2P system this is hard to achieve, but in a somewhat smaller, better controlled system where participating nodes can form a full mesh, this is possible through more direct communication.
• Trust between participating nodes needs to be high in order to make an efficient and reliable system. Achieving this in a pure P2P system requires great complexity and overhead, which is unnecessary in Wikimedia's situation.

M. Roussopoulos, M. Baker et al. discuss these characteristics in a paper, presented along with a decision tree for analyzing the suitability of P2P systems for certain problems. Walking the tree (via high budget, or alternatively low budget, high relevance, high mutual trust, high rate of change, high criticality) indeed results in a decision not to use pure P2P techniques.

This doesn't mean that P2P techniques should be disregarded completely. Where appropriate and helpful, some elements of P2P systems can be used in a centrally controlled system as well. This includes for example the idea of making distributed nodes as equal to eachother as possible, which improves scalability and reliability. The tradeof of these aspects versus complexity and overhead remains, however, and an appropriate compromise should be met.

For I/O limited and essentially event driven systems, there are two fundamentally different routes to take to prevent nonresponsiveness during blocking (I/O) operations:

• Make use of multiple processes or threads so different threads can make progress during the blocked state of other threads (multithreaded design)
• Make sure that operations never block, and make use of some efficient I/O notification scheme (asynchronous design)

One way of reducing the complexity and overhead of synchronisation is to restrict the usage of threads to certain components within the program where high concurrency is desired, for example because the amount of blocking operations is high and consequentially the efficiency of use of available resources would be low in a singlethreaded model. In practice, this often means that subparts like I/O storage systems and network communication are moved to multithreaded designs, while the rest of the programs remain singlethreaded because the need for multithreading there is low. However, when looking carefully at this design practice, it turns out that this comes down to building an asynchronous I/O model around the primitives of threaded programs: blocking operations and synchronisation. This does not make use of the advantages of threads, and potentially has greater overhead than a pure asynchronous solution.

#### Asynchronous I/O

Therefore I decided against using multithreading, and for a non-blocking, asynchronous solution instead. The operations that potentially block in the Distributed Media Storage system are:

• Network I/O operations
• Disk I/O operations
• SQL server index queries (involving both of the above underneath)

These operations can be handled through efficient event notification schemes as delivered by all modern operating systems (like select() and its more modern/efficient counterparts in recent Unix implementations). Integrating this in a large system involves creating an event handler component through which all potentially blocking operations are handled and event dispatched. As explained in the overal design description, a Python framework is available that abstracts these aspects, solving some of the problems of asynchronous models in the process, like the strict need to define and maintain a state machine with its implied bookkeeping.

Removing the need for multiple threads or even processes also has a positive impact on the memory footprint, as every thread of control needs a certain amount of memory for bookkeeping by the OS, memory which is consequentially not available for other programs or buffering/caching. The use of an asynchronous model does not fully exclude the potential use of multiple threads however. Where needed or appropriate, a contained implementation of multiple threads is possible without having any impact on the rest of the program.

Of course there are also disadvantages to using an asynchronous model over multithreading. The most important one is that a singlethreaded program by definition cannot make use of symmetric multiprocessing, i.e., multiple CPUs in the system. While this might be a problem for a system which is mostly CPU bound - a situation where a multithreaded model might actually make a lot of sense - an I/O limited system like this storage system should only require a tiny to modest amount of available CPU resources, and therefore have little gain of multiple CPUs. The fact that the system has been designed to work along with other processes/services on the same machine only strengthens this view.

An interesting discussion about various event models and their implementations is presented on Dan Kegel's C10k problem page.

### Streaming data I/O pipeline

An important observance about the storage system is that actual data of objects is always streamed between the network and disk, in either direction, or from network socket to network socket. Data is never needed to be kept in memory, and the few potentially wanted operations on the data (like checksumming/hashing) can be done in a streaming sequential, non randomly accessed way.

This leads to the desire for data moving to happen in a streaming way, where it is read from the source and concurrently written to the destination at the same time, as apposed to the somewhat simpler approach of reading all data into memory first before starting processing. Streaming improves both reduction of memory usage and latency of the operation.

When reading all data into memory first, obviously a potentially great amount of memory is needed because the entire content of objects needs to fit in memory, multiplied by the number of other requests handled concurrently by the system. Not only does this limit the maximum object size, it also reduces available memory for other processes in the system and available OS caching/buffer size. Furthermore, the rapidly fluctuating memory footprint would complicate decisions made by the OS memory subsystems. In contrast, a streaming solution only needs a small and fixed amount of buffer memory at each point in time.

Latency of the total operation time is also improved, because processing on the data can start as soon as data input starts, instead of after completion of the input read. This implies a latency of ${\displaystyle {\frac {S}{T_{r}\downarrow T_{w}}}}$ rather than ${\displaystyle {\frac {S}{T_{r}}}+{\frac {S}{T_{w}}}}$, with ${\displaystyle S}$ being the object size, ${\displaystyle T_{r}}$ and ${\displaystyle T_{w}}$ the transfer rates of the read and write operations, respectively.

Advantages of streaming already become evident when the object size is several times the size of a typical buffer used for network and/or disk read operations, about 4-64 KiB in practice. For objects that are much larger, the memory footprint advantage is rather big. For example: the largest objects currently supported by Wikimedia projects are 20 MiB in size, and this number is likely to be raised in the future. Several requests for large objects on one node would quickly reduce the amount of memory available for requests and other services on the system. The mean object size is currently around 64 KiB. For these sizes, data streaming does not make a real difference and becomes identical to the case without data streaming. Therefore streaming does not negatively affect performance or memory footprint for smaller objects, either.

Incorporating streaming data in a program design requires some care, and can be viewed as a software architectural pipeline. One visible consequence of this in the program design is that object containers and methods do not handle strings of memory buffers, but references to objects that support streaming in some way. These references are then passed from the source to the destination in the system. It's important to note that it's not the reference passing through the system that makes the difference, but the type of operations on these referenced objects: all intermediate objects need to support the pipeline or consumer/producer interface.

#### Multiplexing

Streaming data has one severe disadvantage: in case the streamed data needs to be multiplexed to several destinations sequentially, for example an operation at the destination fails, and the entire stream has to be restarted, possibly to another destination. If the source stream cannot be rewinded - for example because it was data received from the network - the source data is lost and cannot be started over.

In these cases, buffering is unavoidable. A possible solution is to use a practically unbounded buffer (for example, hard drive space), and fill it in parallel to streaming to the destination the first time. This solution maintains the latency benefits of a pure streaming setup, does not require more memory and allows the source stream to be restarted if needed. If the entire stream has completed successfully, the temporary disk buffer can be discarded.

There is an additional cost of needing hard drive space though, but that should not be a problem in this particular context. Furthermore, in some cases additional implementation optimizations can be made, for example when the first destination stream is a rewindable unbounded buffer as well. At the cost of extra complexity, the destination buffer and temporary disk based buffer could be combined to provide a source stream for consecutive destination streams.

### Language choice

The implementation programming language is of reasonable impact to the system's design. Although for most common imperative object oriented languages, the overal component / logical model does not differ much between languages, certain properties of languages are reflected in the design. Availability of external libraries, component packages and corresponding bindings for specific languages also impact it.

For the prototypes, the object oriented programming language Python was used. Being a high level, dynamic language with a large base of supporting libraries, it is a natural selection for rapid prototyping. In practice, it indeed proved to be very convenient for this task.

The final implementation has different requirements; here native speed and low level control are more important for an I/O limited system. Initially, the idea for the final implementation was to use C++, being a fast, object oriented, low level language with good industry support.

In the end another decision was made, however. The final design will be based on Python, as used for the prototypes. The primary reason for this decision is that implementation time has become a more important factor now it is apparent that the actual implementation will be outside the scope of this internship, and will have to happen in my spare time. One important observation here is that the prototype phase has actually delivered a substantial amount of code that is near-production quality and requires only relatively minor changes to make it production grade, suitable for the final system. (This doesn't hold for all prototype code, but some parts have actually entertained a good design cycle.) This effectively shortens the implementation time needed by a substantial factor.

#### External components

One factor that contributed to this decision was the observation that similarly powerful and clean implementations of both the HTTP server component and the event handler did not seem to be available on the same level as Twisted as used in the prototypes. Especially for the HTTP server component this is somewhat surprising, as it's such a common part of many web systems, and C++ is such a common, industry standard language. Some HTTP server libraries that had been found had a very different focus, and using them for this purpose would introduce a lot of unnecessary bloat. One nice C++ HTTP server component that had been found advertised with benchmark results that were quite disappointing, because it didn't seem to perform better than the system prototypes written using Python Twisted Web.

For event handling, a nice and especially high performance library libevent exists. This takes care of the actual event notification abstraction and partly event handler dispatching, but does not (intend to) provide support for deferred return values, callback chaining and error handling like Twisted does. A C++ implementation would thus have to take care of this itself, either by impacting a large amount of code spread over the system, or by providing an extensive wrapper class similar to the basic Twisted modules. Because of the substantial necessary extra time, this work should only be duplicated when absolutely necessited by other factors.

#### Performance of Python

The biggest disadvantage to using a dynamic and high level language like Python is probably its runtime speed. Being in essence an interpreted language, the system will run slower as a Python implementation than when written in low level languages like C/C++. However, for an I/O limited application like this, the difference may not be that bad. The biggest overhead in Python applications usually is in object instance creation and duplication. The system's design carefully avoids excessive use of these operations however, and especially the object path of forwarding the actual BLOB data is kept as small as possible.

Unfortunately, many benchmark results from the prototypes are not available. Some quick testing on a small node web comprised of 3 nodes showed the SQL index prototype could deliver between 200 - 250 distributed object requests per second from a single node, which is not that disappointing in the context of a dynamic language and unoptimized prototypes. Some perspective on further optimizing this exists, for example by using just-in-time compilers like Psyco, which can easily speed up the runtime speed by a small factor. Because the system is primarily designed to be scalable rather than fast, this will hopefully prove to be sufficient. Admittedly, this is not enough of a firm base to make a good decision on, and a better analysis of (the prototype's) performance would have proven very helpful if time permitted.

A nice overview of Python performance aspects is given on http://orca.mojam.com/~skip/python/fastpython.html.

Should the runtime speed of the Python implementation prove to be unsufficient over time - rather than e.g. index handling being a bottleneck, which can be solved by implementing a more distributed index component - then it will always be possible to redo the entire system or parts thereof in another lower level language like C++, still maintaining the essence of the design presented here.

## Formal base

In this section, some attempt at a semi-formal foundation for the consistency of objects over the distributed nodes will be made. This is certainly not sufficient (or even totally correct) as a proof, but is helpful to gain some insight in the statechanges of the system, and why the operations don't invalidate the global state.

The first observation to make is that all different objects (i.e., objects with different object identifiers) are completely independent of eachother. This implies that (the order of) operations on different objects can not have an influence on the consistency of objects in the system in any way.

Client operations that make changes to the state space are:

• PUT
• DELETE

GET requests do not alter the spacespace in any way, and therefore cannot invalidate it.

Apart from these client requests, the system autonomously runs a replication process, that consists of partial PUT requests. Obviously, this alters the statespace as well, and consequently can invalidate it.

Object state diagram

### Function definitions

${\displaystyle IE(x,s)}$
Object ${\displaystyle x}$ with index sequence number ${\displaystyle s}$ exists in the global index
${\displaystyle IL(x,s,l)}$
Object ${\displaystyle x}$ with sequence number ${\displaystyle s}$ is present on location ${\displaystyle n}$ according in the global index
${\displaystyle OL(x,s,n)}$
Object ${\displaystyle x}$ with sequence number ${\displaystyle s}$ is present and retrievable on node ${\displaystyle n}$
${\displaystyle S(x,s,n)}$
Active state of object ${\displaystyle x}$ with sequence number ${\displaystyle s}$ on node ${\displaystyle n}$

### Invariants

Maintain:

1. ${\displaystyle IL(x,s,n)\Rightarrow OL(x,s,n)}$
2. ${\displaystyle IL(x,s,n)\Rightarrow IE(x,s)}$
3. ${\displaystyle IE(x,s)\Rightarrow (\exists n::S(x,s,n)\in (ObjEG,ObjEL,LP))}$
4. ${\displaystyle IE(x,s)\Rightarrow \neg (\exists y,n:y\neq s:S(x,y,n)\in (ObjEG,ObjEL,LP))}$
5. Some notion of progress

Invariant (2) is maintained by the RDBMS through table constraints.

In the initial, empty state of the system, all invariants are trivially valid.

### PUT

When a node receives a PUT request, it opportunisticly assumes that the object does not yet exist within the system, and consequently starts in the Object nonexistent state in the diagram. It creates a "file transaction" (which underneath works using a temporary file with an arbitrary but guaranteed unique name), and writes all object data into the file (Object write state). If this operation fails, the file transaction will be canceled (the temporary file will be removed), and the active state will remain Object nonexistent, which is a valid global state.

In both the Object nonexistent and Object write states, the alterations to the statespace are local to one node only. Even within a node, two different transactions concerning the same object cannot influence each other, because the file transaction guarantees that both transactions are disjunct, and no information is shared between them.

On completion of the object write in the Object write state, an Index add operation will be dispatched to the Index component, which adds it to global SQL database. The RDBMS guarantees that this operation is atomic, and either completes successfully or fails totally. Through table constraints, the RDBMS also guarantees that no two objects with the same object identifier will exist at any visible point in time. In case of an object collision, the Index add operation will fail.

If the Index add operation fails, no changes in state can be observed in the global SQL index, as guaranteed by the RDBMS. On the creating node, the active state will change to Object nonexistent, which is a valid state.

On success of the Index add operation, the object has been added to the global index with a globally unique sequence number, which will not ever be reused. The RDBMS guarantees this, for example through the use of an autoincreasing integer. This sequence number is returned to the system and can be retrieved and used to ensure consistency in subsequent operations on this object. On success, the local state changes to Object exists globally, and the global index state changes to Object exists globally. None of the invariants have been invalidated, as can be verified.

From state Object exists globally, the creating node will commit the file transaction, which underneath does an atomic rename of the temporary file to its final name, overwriting any possibly existing previous file in the process. On failure of this operation, the file transaction will be rolled back (temporary file removed), and an Index remove method is triggered, changing both the local and the global index state to Object nonexistent. All invariants are still maintained.

In state Object exists locally, the object is available for retrieval by clients in principle, but needs to be added as "present" on the respective node in the index correspondingly. Therefore, an Add presence operation is started, during which an entry in the global index is added. This operation is again atomic, and either fails or completes. Failure of this operation is unusual and indicates a serious inconsistency. Therefore the entire PUT request is considered failed. The local file is removed and the global object index entry is removed as well, maintaining all invariants.

On success of the Add presence operation, the object ends up in the Local presence state, and can be found by all nodes through the index. All invariants are valid in this state.

### DELETE

A DELETE request is simply carried out by sending a request to the central SQL index, marking the existing object and its presence on nodes as deleted, in a single atomic operation. Within the SQL database, the object state changes from either Object exists globally or multi-state Object presence directly to state Object nonexistent. All invariants are maintained by this transaction.

On all nodes having this object locally present, the object state effectively changes to Object nonexistent too. If the local state is Object exists globally, then the next (commit) operation will succeed, changing the local state to Object exists locally. If the local object state is Object exists locally, then the consecutive Index add operation will fail, changing the state to Object nonexistent. From state Local presence, any subsequent operation will fail, also changing the state to Object nonexistent, maintaining all invariants.

It's important to note that whenever the "global" object state in the SQL index is Object nonexistent, then either of the three local object states Object exists globally, Object exists locally and Local presence are equivalent to Object nonexistent, because any subsequent index operation will fail, and no read queries will hit the local node for this object, as it's no longer present in the global index.

Any associated state regarding the object on the local node is obsolete in these states, including for example the corresponding object files on disk. These will later be overwritten by a new object with the same identifier if required, and can be removed in a batch operation if needed for resource/space requirements.

## Object model

### Component model

The system is separated into several components, each with a more or less specific and constrained set of tasks.

Event Handler
Responsible for administrating events (mostly socket, filedescriptor and timing events), and dispatching event handlers when they occur.
HTTP Server
A generic HTTP server component, implementing the HTTP 1.1 server protocol.
HTTP Service
Responsible for translating both the HTTP server and HTTP client interfaces to more generic interfaces within the system.
Controller
Contains the actual logic of the system, i.e. takes requests from HTTP service and dispatches commands to HTTP Service, Object Store and Index.
Object Store
Responsible for storing and retrieving object data on disk, with support for data streaming and atomic transactions.
Replication
Administrates and controls replication of objects within the system, maintaining redundancy. Gathers info from Index and dispatches commands to Controller to achieve this.
Index
Maintains an index of all objects in the system, which can be either a central or a distributed index, depending on the underlying implementation. Communicates with Node Web for information and events about available nodes in the system.
Node Web
Responsible for finding and administrating information about available nodes joining and leaving the system. Implements this using a specific UDP and Multicast protocol, which can be extended for use by other Index implementations.

Furthermore, there are a few components which are globally used as tools by most or all other components in the system:

Configuration
Reads and parses the configuration file (and/or other configuration methods), and provides this info in an easily accessible way to the rest of the system.
Monitoring
Facilitates output handling about the general state and activities of the system for monitoring purposes, e.g. logging.

## Traceability

Some software requirements deserve some extra attention with respect to traceability, beyond the markers in the design description and design decisions earlier.

#### R1: 8-bit clean storage

The system uses 8-bit clean paths for object content storage everywhere. No component parses the actual content data beyond computing a content hash (which doesn't alter the data on its path, and transfer over the network using HTTP, which supports 8-bit clean content in requests and responses. The content is written to object storage using only (transparently) 8 bit clean methods, for example binary files.

#### R2: Binary numbers for object identifiers

Objects are uniquely identified within the system by a fixed length numeric hash, which is a binary number.

#### R4.4: TRAVERSE method

Unfortunately, the TRAVERSE method software requirement has not been delivered in the final design, mostly due to deadline pressure. However, it should be relatively easy to add later. Backups, which is its primary purpose, are also possible without this method, although in a somewhat less convenient way.

#### R7: Atomic operations

The atomic operations requirement is implemented by always leaving the object state in either of two states: nonexistent, or exists globally after every operation, and through the use of isolated transactions. Matching return codes, denoting either success or failure, are delivered to the client. See also the semi-formal base.

#### R9: Monitoring

Monitoring is supported, although in a very crude way: only simple message logging is available. For a real-world production grade distributed system, this is not enough, and a more sophisticated monitoring design and interface should be added.

#### R10: Extendability

Special care has been taken while designing the system to make it extendable. Inherently different tasks have been carefully separated into components, generic and inheriteable interfaces have been used, and coupling between components in the system has been minimized.

#### R10.1: Extendability for different load balancing algorithms

All control flow with respect to load balancing has been seperated into either the Index class hierarchy, or the Controller/Replicator classes. The Index classes hide index-specific load balancing logic from the rest of the system, and can easily delegate this to separate classes/code if necessary. The Controller and Replicator classes have been designed to be replaceable with the Index component.