Lytnamo is a lightweight implementation of Amazon's Dynamo [1]. Most of its concepts come from the Amazon paper referenced at the end of this document. Lytnamo is a distributed key-value storage system. The system consists of one membership coordinator that maintains membership and assigns keys to replicas, one frontend that serves as the interface for client requests, and several backend replicas that store and replicate data.
To maintain consistency among its replicas, Lytnamo uses a consistency protocol similar to those used in quorum systems, configured by three parameters N, W, and R when the server starts. Each data item with a particular key is replicated to N backend replicas, and those N backend replicas form the preference list for that key. W is the minimum number of nodes that must participate in a successful write operation, and R is the minimum number of nodes that must participate in a successful read operation.
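The standard property behind these parameters (from quorum systems generally, not stated explicitly in this document) can be sketched in a few lines; the function name is illustrative, not Lytnamo's actual code:

```python
# Minimal sketch of the quorum invariant behind N, W, and R: when
# W + R > N, every read quorum intersects every write quorum, so a read
# must contact at least one replica that saw the latest successful write.

def quorums_overlap(n: int, w: int, r: int) -> bool:
    """True if any R-node read set must intersect any W-node write set."""
    return w + r > n
```

With the N=3, W=2, R=2 configuration shown in the API examples below, quorums overlap; W=R=1 would allow a read to miss the latest write.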
A gossip-based protocol propagates membership changes and maintains an eventually consistent view of membership. Once every second, each backend replica gossips with another randomly chosen backend replica. During the gossip, the two replicas exchange their node add/remove logs and use them to reconcile their membership views. The frontend also gossips with backend replicas, but it only receives the log from the backend and does not offer its own log in return.
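The log exchange can be sketched as follows; the data shapes mirror the /gossip API described below, but the function and variable names are illustrative assumptions, not Lytnamo's actual code:

```python
# Sketch of merging a gossip peer's add/delete log into the local
# membership view. Each replica keeps an "add" list and a "delete" list of
# node ids; a node is live only if it was added and not later deleted.

def reconcile(view, local_log, peer_log, peer_replicas):
    """Merge the peer's add/delete log into the local membership view."""
    for node_id in peer_log["add"]:
        if node_id not in local_log["add"]:
            local_log["add"].append(node_id)
            if node_id not in peer_log["delete"]:
                view[node_id] = peer_replicas.get(node_id)
    for node_id in peer_log["delete"]:
        if node_id not in local_log["delete"]:
            local_log["delete"].append(node_id)
            view.pop(node_id, None)  # forget nodes the peer saw removed
    return view
```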
The paper mentions a technique to avoid logical partitions, e.g. two nodes A and B join the ring but cannot discover each other. Based on the brief description in the paper, I made some assumptions and implemented a mechanism to achieve this. A backend replica in Lytnamo is either a seed or a regular node; a seed is known to all nodes. Every time a new replica comes up, it registers itself with the Membership Coordinator, which assigns a key to the new replica and adds the new node to the ring. The coordinator then responds to the new replica with a list of all seed nodes, so the new replica can start gossiping with the seeds, the seeds can add the new node to their membership views, and eventually all nodes learn about the new replica, which happens fairly quickly.
Backend replicas detect failures of other nodes during gossip and read/write operations. There are two types of failure: temporary and permanent. The details are discussed in the Failure Handling section below.
Every read/write request contains a key parameter in the URI. The frontend assigns the request to a backend replica by hashing the key to yield its position on the ring, and then walking the ring clockwise to find the first node with a position larger than the key's position.
Instead of always sending the request to the first node described above, the frontend randomly picks one of the nodes in the preference list, and the node that handles the request is called the replication coordinator.
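The ring walk and preference list described above can be sketched as follows, under the assumption that node positions are integer slots on a ring of fixed capacity (256 in the API examples below); this is an illustration, not the actual Lytnamo code:

```python
import bisect

# Sketch of consistent-hashing lookup: the node responsible for a key is
# the first node clockwise from the key's ring position, and the
# preference list is that node plus its N-1 successors on the ring.

def preference_list(ring, key_pos, n):
    """ring: sorted list of node positions; returns the N nodes for key_pos."""
    start = bisect.bisect_right(ring, key_pos) % len(ring)
    return [ring[(start + i) % len(ring)] for i in range(n)]
```

For example, with nodes at positions 0, 127, and 190, a key hashing to 100 lands on the node at 127, and with N=3 its preference list wraps around to include 190 and 0.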
The frontend sends a read/write request to a backend replica, which acts as the replication coordinator described above. After receiving the request, the replication coordinator checks whether it is in the preference list of the request's key. If it is not, it redirects the frontend to the correct replica. If it is, it stores the data in its data storage and starts the replication process to the other replicas in the preference list. The replication coordinator responds to the frontend based on the configuration of W and R: it responds after W replicas, including the coordinator itself, have successfully stored the data, and the remaining replication operations continue asynchronously. Similarly, for a read request, the coordinator requests and gathers data from all replicas in the preference list. If the coordinator ends up gathering multiple versions of the data, it returns all the versions it deems causally unrelated; the divergent versions can be reconciled by the client later. In addition, if W or R is set equal to N, the system becomes fully synchronous.
Lytnamo provides two write operations: add item and remove item.
Lytnamo provides eventual consistency, which allows updates to be propagated to all replicas asynchronously. In order to capture causality between different versions of the same object, Lytnamo uses vector clocks; a vector clock is effectively a list of (node, timestamp) pairs. Each object (the data for a particular key) in the data storage holds a vector clock. A replication coordinator updates the vector clock of an object by incrementing the timestamp of its own pair, then passes the object with the vector clock to the other replicas. For each write operation, the client must indicate the version it is updating. If it is updating an older version, the coordinator starts a read operation and returns the latest version, so the client can retry the update against the latest version. Read operations always return the latest version(s).
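The causality check and coordinator increment can be sketched as follows (an illustration of standard vector-clock semantics, not Lytnamo's actual code); here a clock is a dict mapping node id to timestamp:

```python
# Vector-clock helpers. Clock b is a causal descendant of clock a when
# every entry of a is <= the matching entry of b; two clocks where neither
# descends from the other are concurrent (causally unrelated versions).

def descends(a: dict, b: dict) -> bool:
    """True if clock b descends from (or equals) clock a."""
    return all(b.get(node, 0) >= ts for node, ts in a.items())

def increment(clock: dict, node: str) -> dict:
    """Coordinator bumps its own entry before replicating a write."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated
```

Two clocks for which `descends` is false in both directions correspond to the divergent versions a read can return.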
First, the new replica X registers itself with the Membership Coordinator, and the coordinator automatically calculates the key (slot) on the ring for the new replica by finding the midpoint of the largest space between existing replica pairs. For example, if the space between A and B is the largest, X will be assigned to the middle of that space.
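This slot assignment can be sketched as follows, assuming integer slots on a ring of fixed capacity; the function name is illustrative, not the coordinator's actual code:

```python
# Sketch of the coordinator's key assignment: find the largest gap
# between adjacent node keys on the ring (wrapping around at `capacity`)
# and place the new replica at its midpoint.

def assign_key(keys, capacity):
    """keys: sorted node positions on a ring of size `capacity`."""
    best_gap, best_start = -1, 0
    for i, k in enumerate(keys):
        nxt = keys[(i + 1) % len(keys)]
        gap = (nxt - k) % capacity or capacity  # wrap-around distance
        if gap > best_gap:
            best_gap, best_start = gap, k
    return (best_start + best_gap // 2) % capacity
```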
When a new replica X is added to the ring between A and B, some existing nodes (X's N successors) no longer have to handle some of their keys due to the allocation of key ranges to X; these nodes transfer the data for those keys to X and remove it from their own storage. In this particular example, B transfers data with keys in (E, F], C transfers data with keys in (F, A], and D transfers data with keys in (A, X]. When a node is removed from the system, the reallocation of keys happens in the reverse process: the predecessors of B, C, and D offer data within the corresponding key ranges, but this reallocation does not remove the data from the sender. The transfer notification is initiated by the membership coordinator when a new node registers with it.
Temporary failure is discovered during read/write operations. When a replica is temporarily unreachable, the replication coordinator adds hint metadata to the data and sends the hinted data to the next replica after the preference list (the (N+1)th node). When a replica receives hinted data, it stores it and forwards it along with its next gossip to the replica that is supposed to store it, so the hinted data reaches the correct replica once the partition of the ring heals. If that gossip succeeds, the node that previously held the hinted data removes it from its own storage.
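The hinted write can be sketched as follows; all names here (`write_with_hint`, `is_up`, the payload shape) are illustrative assumptions rather than Lytnamo's actual code:

```python
# Sketch of hinted handoff: when the intended replica is down, the
# coordinator tags the write with a hint naming the real owner and hands
# it to the next node on the ring. That stand-in holds the data and
# retries delivery during gossip, deleting its copy once delivery succeeds.

def write_with_hint(preference_list, next_node, payload, is_up):
    """Return a map of destination node -> payload for one write."""
    deliveries = {}
    for node in preference_list:
        if is_up(node):
            deliveries[node] = payload
        else:
            # annotate the payload so the stand-in knows the real owner
            deliveries[next_node] = dict(payload, hint=node)
    return deliveries
```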
Permanent failure is discovered during gossip. Lytnamo treats a permanent failure as removing a node from the ring, handled by the operation described in the Removing Storage Nodes section.
When a client receives multiple versions of an object from a read request, it can indicate the version(s) it wants to reconcile. The replication coordinator then merges the items in the object, recalculates the vector clock, and passes the reconciled version to the other replicas. For example:
Two versions of data:
[{"items":["cs682", "cs631"], "clocks": [{"node": "n1", "timestamp": 2}]}, {"items":["cs682", "cs601"], "clocks": [{"node": "n1", "timestamp": 1}, {"node": "n2", "timestamp": 1}]}]
After reconciliation by node n1:
{"items":["cs682", "cs631", "cs601"], "clocks": [{"node": "n1", "timestamp": 3}, {"node": "n2", "timestamp": 1}]}
(n1 takes the maximum of each clock entry and then increments its own clock, since it is the replication coordinator)
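The merge step in this example can be sketched as follows; this is an illustration of the described behavior, not the actual Lytnamo code:

```python
# Sketch of reconciliation: union the item lists, take the element-wise
# maximum of the vector clocks, then increment the reconciling
# coordinator's own entry.

def merge_versions(versions, coordinator):
    items, clocks = [], {}
    for v in versions:
        for item in v["items"]:
            if item not in items:
                items.append(item)           # union, preserving order
        for c in v["clocks"]:
            clocks[c["node"]] = max(clocks.get(c["node"], 0), c["timestamp"])
    clocks[coordinator] = clocks.get(coordinator, 0) + 1
    return {"items": items,
            "clocks": [{"node": n, "timestamp": t} for n, t in clocks.items()]}
```

Applied to the two versions above with n1 as coordinator, this reproduces the reconciled object shown.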
POST /register
Request body:
{ "id": "node_uuid", "host": "host_address", "port": "listening_port", "seed": false, "key": -1 }
Responses:
| Code | Description |
| --- | --- |
| 200 | Registration success: `{ "key": 190, "capacity": 256, "N": 3, "W": 2, "R": 2, "seeds": [ { "id": "seed_uuid", "host": "seed_address", "port": "seed_listening_port", "seed": true, "key": 0 }, { "id": "seed_uuid", "host": "seed_address", "port": "seed_listening_port", "seed": true, "key": 127 } ] }` |
| 400 | Unable to register node into the ring |
POST /deregister
Request body:
{ "id": "node_uuid", "host": "host_address", "port": "listening_port", "seed": false, "key": 190 }
Responses:
| Code | Description |
| --- | --- |
| 200 | Deregistration success |
GET /seeds
Responses:
| Code | Description |
| --- | --- |
| 200 | Seed list and ring configuration: `{ "capacity": 256, "N": 3, "W": 2, "R": 2, "seeds": [ { "id": "seed_uuid", "host": "seed_address", "port": "seed_listening_port", "seed": true, "key": 0 }, { "id": "seed_uuid", "host": "seed_address", "port": "seed_listening_port", "seed": true, "key": 127 } ] }` |
GET /gossip
Responses:
| Code | Description |
| --- | --- |
| 200 | Add/delete log and current replicas: `{ "add": ["node1_uuid", "node2_uuid", "node3_uuid"], "delete": ["node3_uuid"], "replicas": { "node1_uuid": { "id": "node1_uuid", "host": "node1_address", "port": "node1_listening_port", "seed": true, "key": 0 }, "node2_uuid": { "id": "node2_uuid", "host": "node2_address", "port": "node2_listening_port", "seed": true, "key": 127 } } }` |
POST /gossip
Request body:
{ "add": ["node1_uuid", "node2_uuid", "node3_uuid"], "delete": ["node3_uuid"], "replicas": { "node1_uuid": { "id": "node1_uuid", "host": "node1_address", "port": "node1_listening_port", "seed": true, "key": 0 }, "node2_uuid": { "id": "node2_uuid", "host": "node2_address", "port": "node2_listening_port", "seed": true, "key": 127 } } }
Responses:
| Code | Description |
| --- | --- |
| 200 | Add/delete log and current replicas: `{ "add": ["node1_uuid", "node2_uuid"], "delete": [], "replicas": { "node1_uuid": { "id": "node1_uuid", "host": "node1_address", "port": "node1_listening_port", "seed": true, "key": 0 }, "node2_uuid": { "id": "node2_uuid", "host": "node2_address", "port": "node2_listening_port", "seed": true, "key": 127 } } }` |
| 400 | Incorrect request body format: json |
GET /get/{hashKey}/{key}
Responses:
| Code | Description |
| --- | --- |
| 200 | Object data: `[ { "items": ["cs682","cs631"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ] }, { "items": ["cs682","cs601"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 2 } ] } ]` |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
| 400 | No data |
POST /put/{hashKey}/{key}
Request body:
{ "op": "add", "item": "cs631", "version": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 } ] }
Responses:
| Code | Description |
| --- | --- |
| 200 | Object is stored successfully |
| 302 | Version is too old, update with this version: `[ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ]` |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
GET /internal_get/{hashKey}/{key}
Responses:
| Code | Description |
| --- | --- |
| 200 | Object data: `{ "items": ["cs682","cs601"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 2 } ] }` |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
| 400 | No data |
POST /reconcile/merge/{hashKey}/{key}
Request body:
[ { "items": ["cs682","cs631"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ] }, { "items": ["cs682","cs601"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 2 } ] } ]
Responses:
| Code | Description |
| --- | --- |
| 200 | Reconciliation success |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
POST /hinted/put
Request body:
{ "id": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "hashKey": 97, "key": "brian", "op": "add", "item": "cs631", "version": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 } ], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ], "replicate": true }
Responses:
| Code | Description |
| --- | --- |
| 200 | Hinted data is stored successfully |
POST /transfer
Request body:
{ "to": "node_address:port_copy_to", "from": "node_address:port_copy_from", "range": [0,255], "remove": false }
Responses:
| Code | Description |
| --- | --- |
| 200 | Transfer success |
| 400 | Unable to transfer or incorrect request body format |
POST /receiver
Request body:
[ { "hashKey": 6, "data": [ { "key": "brian", "object": { "items": ["cs682"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 } ], "replicate": true } } ] }, { "hashKey": 97, "data": [ { "key": "a", "object": { "items": ["testing","testing","testing","testing"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 4 } ], "replicate": true } } ] } ]
Responses:
| Code | Description |
| --- | --- |
| 200 | Data restore success |
| 400 | Incorrect request body format |
GET /get/{key}
Responses:
| Code | Description |
| --- | --- |
| 200 | Object data: `[ { "items": ["cs682","cs631"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ] }, { "items": ["cs682","cs601"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 2 } ] } ]` |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
| 400 | No data |
POST /put/{key}
Request body:
{ "op": "add", "item": "cs631", "version": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 } ] }
Responses:
| Code | Description |
| --- | --- |
| 200 | Object is stored successfully |
| 302 | Version is too old, update with this version: `[ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ]` |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
| 400 | Write failed |
POST /reconcile/merge/{key}
Request body:
[ { "items": ["cs682","cs631"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 1 }, { "node": "c41eafcf-046c-41d1-835f-f6ebcc2937ac", "timestamp": 1 } ] }, { "items": ["cs682","cs601"], "clocks": [ { "node": "070568e8-3c04-46ef-b5d9-eaadf972ce41", "timestamp": 2 } ] } ]
Responses:
| Code | Description |
| --- | --- |
| 200 | Reconciliation success |
| 307 | Not responsible for this key, redirect to: `{ "address": "correct_node_address:port" }` |
| 400 | Write failed |
Start Membership Coordinator
$ java -jar Coordinator.jar -p <port> -max <max_ring_size> -n <nodes_in_preference_list> -w <min_nodes_write> -r <min_nodes_read>
Start Backend Replica
$ java -jar Backend.jar -p <port> -s <seed_or_not> -c <coordinator_address>
Start Frontend
$ java -jar Frontend.jar -p <port> -c <coordinator_address>
Write Test
$ python3 test_write.py <address> <key> <op> <item> <version>
Read Test
$ python3 test_read.py <address> <key>
Redirection Test
$ python3 test_redirect.py <address> <hashKey> <key>
Concurrent Write Test
$ java -jar ConcurrentTest.jar -t <target_address> -k <key> -d1 '<json_data_1>' -d2 '<json_data_2>'
Hinted Handoff Test
Just add a "demo" parameter in the write request body like this:
{ "op": "add", "item": "lytnamo", "version": [], "demo": true }
- Environment setup and creation of frontend/backend services with basic features: configuration through command-line arguments at startup, and put/get APIs.
- Implementing membership with the gossip-based protocol.
- Implementing consistent hashing to support adding backend services and maintaining the services in the ring.
- Developing testing framework for features above.
- Checkpoint on Tuesday 5/8/18 to complete and demonstrate the implementations above.
- Implementing Failure Detection and Handling (removing).
- Implementing Replication.
- Implementing Data Versioning.
- Developing final testing framework.
- Demonstration on Tuesday 5/15/18.
- Due Wednesday 5/16/18 5pm PDT.
- Chien-Yu (Brian) Sung
- Dr. Rollins
This project is for academic purposes only.