Sunday, November 2, 2014

Large-scale graph data processing

Apache Giraph - is an iterative graph processing system that originated as an open-source counterpart to Google's Pregel; it implements the BSP (Bulk Synchronous Parallel) model of distributed computation on top of Hadoop.

Giraph uses Hadoop for scheduling workers and loading data into memory, but then implements its own communication protocol using Hadoop RPC. Jobs run as map-only jobs on Hadoop.
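
To make the vertex-centric BSP model concrete, here is a minimal sketch of a Pregel-style computation that propagates the maximum vertex value through the graph. It assumes Giraph's BasicComputation API; names and signatures may differ slightly between Giraph versions.

import java.io.IOException;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

// Each superstep: read messages from the previous superstep, update the local
// value, send messages along edges, then vote to halt until new messages arrive.
public class MaxValueComputation extends
    BasicComputation<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {

  @Override
  public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
                      Iterable<DoubleWritable> messages) throws IOException {
    double max = vertex.getValue().get();
    for (DoubleWritable message : messages) {
      max = Math.max(max, message.get());
    }
    if (getSuperstep() == 0 || max > vertex.getValue().get()) {
      vertex.setValue(new DoubleWritable(max));
      sendMessageToAllEdges(vertex, vertex.getValue());  // delivered next superstep
    }
    vertex.voteToHalt();  // job ends when all vertices halt and no messages remain
  }
}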

Apache Hama - is a pure BSP computing engine, generally used for iterative processing such as matrix and graph computations. Jobs run as BSP jobs directly on top of HDFS.






Tuesday, October 14, 2014

Implementation Log

Read Operation

The level-two leader runs as an ordinary quorum leader that accepts the level-one leader as an observer.

In the level-one leader, we combine three objects: a leader (running as a normal leader in the level-one cluster), a quorum peer (a participant in the local quorum), and an observer. The observer is only used to connect to the level-two leader, whose IP address is read from the configuration file at startup. The observer runs in its own thread.

The problem is that the level-two leader does not accept the connection from the level-one observer. The reason is that when the observer starts up, it initializes from its quorum peer's state, which carries the epoch accepted in the level-one quorum by the leader (itself). That epoch differs from the level-two quorum's epoch, even though the observer's address is listed in the level-two configuration file.
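
The rejection happens in the epoch check during the learner handshake; the EOFException on the level-two leader is most likely just the other end of the same aborted handshake. Roughly, the relevant check in Learner.registerWithLeader() looks like this (a paraphrased sketch, not the verbatim ZooKeeper source; helper names may differ between versions):

// Inside Learner.registerWithLeader(), after receiving the leader's epoch.
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (newEpoch < self.getAcceptedEpoch()) {
    // acceptedEpoch (9, earned in the level-one quorum) is newer than the epoch
    // offered by the level-two leader (3), so the observer aborts the handshake.
    throw new IOException("Leaders epoch, " + newEpoch
            + " is less than accepted epoch, " + self.getAcceptedEpoch());
}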

Here are the error logs from both leaders:

2014-10-14 15:07:23,511 [myid:5] - INFO  [LearnerHandler-/127.0.0.1:59655:LearnerHandler@330] - Follower sid: 2 : info : org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer@29c69015
2014-10-14 15:07:23,514 [myid:5] - ERROR [LearnerHandler-/127.0.0.1:59655:LearnerHandler@633] - Unexpected exception causing shutdown while sock still open
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
at org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:83)
at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:357)
2014-10-14 15:07:23,515 [myid:5] - WARN  [LearnerHandler-/127.0.0.1:59655:LearnerHandler@646] - ******* GOODBYE /127.0.0.1:59655 ********


2014-10-14 15:07:23,505 [myid:2] - INFO  [QuorumPeer[myid=2]/0:0:0:0:0:0:0:0:2182:LevelOneLeader@35] - Level one leader created.
2014-10-14 15:07:23,505 [myid:2] - INFO  [Thread-5:LevelOneLeader$1@41] - Starting Level one leader.
2014-10-14 15:07:23,506 [myid:2] - INFO  [Thread-5:LevelOneLeader$1@43] - Observing /127.0.0.1:2225
2014-10-14 15:07:23,507 [myid:2] - INFO  [QuorumPeer[myid=2]/0:0:0:0:0:0:0:0:2182:Leader@944] - Have quorum of supporters, sids: [ 1,2 ]; starting up and setting last processed zxid: 0x900000000
2014-10-14 15:07:23,512 [myid:2] - WARN  [Thread-5:LevelOneLeader$1@55] - Exception when observing the leader
java.io.IOException: Leaders epoch, 3 is less than accepted epoch, 9
at org.apache.zookeeper.server.quorum.Learner.registerWithLeader(Learner.java:290)
at org.apache.zookeeper.server.quorum.LevelOneLeader$1.run(LevelOneLeader.java:46)




Monday, September 29, 2014

ZooKeeper source code analysis

ZooKeeper servers start up with three ports read from the configuration file: one for listening for client requests, and the other two, taken from the server URL (ip:port:port), used for inter-server communication and leader election.

When more replicas are added to a ZooKeeper cluster, read throughput increases but write performance drops, because every operation proposed by the leader must wait for more than half of the replicas to ACK. To address this, ZooKeeper introduced a new type of replica, the observer. Unlike a follower, an observer does everything except vote. Therefore, adding observers to the cluster improves read performance without hurting write throughput. However, availability still depends on the number of followers in the system. So there are three types of nodes: leader, follower and observer.


Followers and observers are two different kinds of learner; both extend the Learner class.


If ZooKeeper finds only one server configured in the system at startup, it starts the standalone version, "ZooKeeperServer". If there are multiple servers, the QuorumPeer thread starts the leader election process. A QuorumPeer has four states: looking, following, leading and observing. Leader election uses the fast leader election algorithm, implemented in "FastLeaderElection". After the election, QuorumPeer starts the server class matching its role and calls its main method:


leader -> LeaderZooKeeperServer -> lead()
follower -> FollowerZooKeeperServer -> followLeader()
observer -> ObserverZooKeeperServer -> observeLeader()
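
The role switch happens in QuorumPeer's main loop, which dispatches on the peer state. A simplified paraphrase of QuorumPeer.run() (logging, error handling and shutdown logic omitted):

// Simplified paraphrase of QuorumPeer.run(); not the verbatim source.
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        // Run leader election and record the winning vote.
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case OBSERVING:
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
        break;
    case FOLLOWING:
        setFollower(makeFollower(logFactory));
        follower.followLeader();
        break;
    case LEADING:
        setLeader(makeLeader(logFactory));
        leader.lead();
        break;
    }
}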

All types of servers share the same set of request processors, but each builds a different processor chain.
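
For example, the leader wires its chain up roughly as follows (paraphrased from LeaderZooKeeperServer.setupRequestProcessors(); constructor arguments simplified and version-dependent):

// Leader-side chain, built back to front:
// PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor
//   -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor =
        new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false);
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor) firstProcessor).start();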

The actual update operation is done in the FinalRequestProcessor.


DataTree

The client API provides create, get and set methods with a path as the access point. I was wondering how tree-structured access could be better than a hash table lookup keyed by a simple id. It turns out it isn't. In ZooKeeper, the DataTree maintains two parallel data structures: a hashtable that maps full paths to DataNodes, and a tree of DataNodes. All accesses to a path go through the hashtable. The tree is traversed only when serializing to disk.

So the client is logically given a path to manage data nodes in a hierarchy, but access to a full path is a single hashtable lookup.
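
In code, that boils down to something like the following (a simplified paraphrase of the fields in DataTree and DataNode; the real classes also carry ACLs, stat structures, watches and ephemeral-node bookkeeping):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified paraphrase of ZooKeeper's DataTree/DataNode internals.
public class DataTree {
    static class DataNode {
        byte[] data;              // payload stored at this znode
        Set<String> children;     // child names; walked when serializing the tree to disk
    }

    // Full path ("/app/locks/lock-1") -> node. Every create/get/set resolves here.
    private final ConcurrentHashMap<String, DataNode> nodes =
            new ConcurrentHashMap<String, DataNode>();

    DataNode getNode(String path) {
        return nodes.get(path);   // O(1) hashtable lookup, no tree traversal
    }
}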

Thursday, September 25, 2014

On GlusterFS

The type of a volume is specified at volume creation time; it determines how and where data is placed. The following volume types are supported in GlusterFS:
1) Distribute
    Distributes files across various bricks of the volume, similar to file-level RAID 0.
    Directories are present on all bricks of the volume.
    It uses the Davies-Meyer hash algorithm, with the 32-bit hash space divided into N ranges for N bricks. When a directory is created, a range is assigned to each brick's copy of that directory; the hash is then computed on the file name when a file is created (see the sketch below the list).

2) Stripe
    Individual files split among bricks, similar to block-level RAID 0.

3) Replication
    Copy files to multiple bricks, similar to file-level RAID 1.
    Synchronous replication of all directory and file updates.
    Transaction driven consistency.

4) Distributed Replicate
    Most preferred model of deployment currently. Reads get load balanced.

5) Striped Replicate
    Similar to RAID 10 (1+0)

6) Distributed Striped Replicate
    Limited Use Cases – Map Reduce

There are no external metadata servers.
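
The placement scheme in 1) is why no metadata server is needed: the target brick is computed from the file name. Here is a small, self-contained illustration of the idea, not GlusterFS code; the real implementation computes Davies-Meyer hashes in C and stores each brick's range in directory extended attributes.

// Illustrative sketch of hash-range placement (not GlusterFS code).
public class HashRangePlacement {
    private final int brickCount;

    public HashRangePlacement(int brickCount) {
        this.brickCount = brickCount;
    }

    // Stand-in for the Davies-Meyer hash GlusterFS actually uses.
    private long hash32(String fileName) {
        long h = 0;
        for (char c : fileName.toCharArray()) {
            h = (h * 31 + c) & 0xFFFFFFFFL;   // keep it in the unsigned 32-bit range
        }
        return h;
    }

    // The 32-bit space is split into brickCount contiguous ranges; a file goes to
    // the brick whose range contains hash(fileName).
    public int brickFor(String fileName) {
        long rangeSize = (0x100000000L + brickCount - 1) / brickCount; // ceil(2^32 / N)
        return (int) (hash32(fileName) / rangeSize);
    }

    public static void main(String[] args) {
        HashRangePlacement dht = new HashRangePlacement(4);
        System.out.println("foo.txt -> brick " + dht.brickFor("foo.txt"));
    }
}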

GlusterFS can be accessed through a FUSE mount point or through libgfapi; both sync and async interfaces are available.

A new feature in 3.5 is distributed geo-replication, which is based on change logs, with one driver per replica set on the master and no SPOF. It is asynchronous across LAN or WAN, in a master-slave (cascading) model, so data is passed only between the defined master and slave, and replication is continuous and incremental.

Tuesday, September 23, 2014

On GPFS

From the GPFS paper [2002]; there is no open-source version.

GPFS, from some perspectives, is still the fastest parallel file system implemented in a shared-disk environment over a storage area network (SAN). Since any file system node can access any portion of the shared disks, GPFS requires a locking mechanism to support fully parallel access to both file data and metadata.

GPFS uses a centralized global lock manager in conjunction with local lock managers on each file system node. The lock manager hands out lock tokens to requesting nodes.

GPFS guarantees single-node-equivalent POSIX semantics for file system operations across the cluster, meaning a read on node A will see either all or none of a concurrent write on node B (read/write atomicity). There is one exception: the access time (atime) in the metadata is only updated periodically, because concurrent reads are very common and synchronizing atime would be very expensive.

The paper claims there are two approaches to achieving the necessary synchronization:
1. Distributed locking: every FS operation acquires a read or write lock to synchronize with conflicting operations.
2. Centralized management: all conflicting operations are forwarded to a designated node, which performs the requests.

GPFS uses byte-range locking for updates to file data, and dynamically elected "metanodes" for centralized management of file metadata. The argument for using different approaches for data and metadata is this:
(1) when lock conflicts are frequent (e.g. many nodes may access different parts of a file, but all need to access the same metadata), the overhead of distributed locking may exceed the cost of forwarding requests to a central node;
(2) if different nodes operate on different pieces of file data, distributed locking allows greater parallelism.

Also, smaller lock granularity means more overhead due to frequent lock requests, whereas larger granularity may cause more frequent lock conflicts. Thus, byte-range locks are used for file data, and a lock per file is used for metadata.
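
As a toy illustration of why byte-range locking buys parallelism for file data (this is just a sketch of the conflict rule, not GPFS code): two lock requests on the same file conflict only if their byte ranges overlap and at least one of them is a write, so writers to disjoint regions on different nodes can proceed concurrently.

// Toy sketch of byte-range lock conflict detection (not GPFS code).
public class ByteRangeLock {
    final long start;      // inclusive
    final long end;        // exclusive
    final boolean write;   // true = write lock, false = read lock

    ByteRangeLock(long start, long end, boolean write) {
        this.start = start;
        this.end = end;
        this.write = write;
    }

    // Two locks conflict only if the ranges overlap and at least one is a write.
    boolean conflictsWith(ByteRangeLock other) {
        boolean overlap = this.start < other.end && other.start < this.end;
        return overlap && (this.write || other.write);
    }

    public static void main(String[] args) {
        ByteRangeLock a = new ByteRangeLock(0, 4096, true);     // node A writes block 0
        ByteRangeLock b = new ByteRangeLock(4096, 8192, true);  // node B writes block 1
        System.out.println(a.conflictsWith(b));                 // false: disjoint writes run in parallel
    }
}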

However, there could be a third approach, a middle ground: Panopticon