Saturday, November 28, 2009

Query Processing for NOSQL DB

The recent rise of NoSQL provides an alternative model in building extremely large scale storage system. Nevetheless, compare to the more mature RDBMS, NoSQL has some fundamental limitations that we need to be aware of.
  1. It calls for a more relaxed data consistency model
  2. It provides primitive querying and searching capability
There are techniques we can employ to mitigate each of these issue. Regarding the data consistency concern, I have discussed a number of design patterns in my previous blog to implement system with different strength of consistency guarantee.

Here I like to give myself a try to tackle the second issue.

So what is the problem ?

Many of the NoSQL DB today is based on the DHT (Distributed Hash Table) model, which provides a hashtable access semantics. To access or modify any object data, the client is required to supply the primary key of the object, then the DB will lookup the object using an equality match to the supplied key.

For example, if we use DHT to implement a customer DB, we can choose the customer id as the key. And then we can get/set/operate on any customer object if we know its id
  • cust_data = DHT.get(cust_id)
  • DHT.set(cust_id, modified_cust_data)
  • DHT.execute(cust_id, func(cust) {cust.add_credit(200)})
In the real world, we may want to search data based on other attributes than its primary key, we may also search attributes based on "greater/less than" relationship, or we may want to combine multiple search criteria using a boolean expression.

Using our customer DB example, we may do ...
  • Lookup customers within a zip code
  • Lookup customers whose income is above 200K
  • Lookup customer using keywords "chief executives"
Although query processing and indexing technique is pretty common in RDBMS world, it is seriously lacking in the NoSQL world because of the very nature of the "distributed architecture" underlying most of NoSQL DB.

It seems to me that the responsibility of building an indexing and query mechanism lands on the NoSQL user. Therefore, I want to explore some possible techniques to handle these.

Companion SQL-DB

A very straighforward approach is provide querying capability is to augment NoSQL with an RDBMS or TextDB (for keyword search). e.g. We add the metadata of the object into a RDBMS so we can query its metadata using standard SQL query.

Of course, this requires the RDBMS to be large enough to store the search-able attributes of each object. Since we only store the attributes required for search, rather than the whole object into the RDBMS, this turns out to be a very practical and common approach.

Scatter/Gather Local Search

Some of the NOSQL DB provides indexing and query processing mechanism within the local DB. In this case, we can have the query processor broadcast the query to every node in the DHT where a local search will be conducted with results sent back to the query processor which aggregates into a single response.

Notice that the search is happening in parallel across all nodes in the DHT.

Distributed B-Tree

B+Tree is a common indexing structure using in RDBMS. A distributed version of B+Tree can also be used in a DHT environment. The basic idea is to hash the search-able attribute to locate the root node of the B+ Tree. The "value" of the root node contains the id of its children node. So the client can then issue another DHT lookup call to find the children node. Continue this process, the client eventually navigate down to the leaf node, where the object id of the matching the search criteria is found. Then the client will issue another DHT lookup to extract the actual object.

Caution is needed when the B+Tree node is updated due to split/merge caused by object creation and deletion. This should be ideally done in an atomic fashion. This paper from Microsoft, HP and Toronto U describe a distributed transaction protocol to provide the required atomicity. Distributed transaction is an expensive operation but its uses here is justified because most of the B+ tree updates rarely involve more than a single machine.

Prefix Hash Table (distributed Trie)

Trie is an alternative data structure, where every path (from the root) contains the prefix of the key. Basically, every node in the Trie contains all the data whose key is prefixed by it. Berkeley and Intel research has a paper to describe this mechanism.

1. Lookup a key
To locate a particular key, we start with its one digit prefix and do a DHT lookup to see if we get a leaf node. If so, we search within this leaf node as we know the key must be contained inside. If it is not a leaf node, we extend the prefix with an extra digit and repeat the whole process again.
# Locate the key next to input key
def locate(key)
 leaf = locate_leaf_node(key)
 return leaf.locate(key)

# Locate leaf node containing input key
def locate_leaf_node(key)
 for (i in 1 .. key.length)
   node = DHT.lookup(key.prefix(n))
   return node if node.is_leaf?
 raise exception

2. Range Query
Perform a range query can be done by first locate the leaf node that contains the start key and then walk in the ascending order direction until we exceed the end key. Note that we can walk across a leaf node by following the leaf node chain.
def range(startkey, endkey) {
 result =
 leaf = locate_leaf_node(startkey)
 while leaf != nil
   result.append(leaf.range(startkey, endkey))
   if (leaf.largestkey < endkey)
     leaf = leaf.nextleaf
 return result
To speedup the search, we can use a parallel search mechanism. Instead of walking from the start key in a sequential manner, we can find the common prefix of the start key and end key (as we know all the result is under its subtree) and perform a parallel search of the children leaf nodes of this subtree.

3. Insert and Delete keys
To insert a new key, we first identify the leaf node that contains the inserted key. If the leaf node has available capacity (less than B keys), then simply add it there. Otherwise, we need to split the leaf node into two branches and redistribute its existing keys to the newly created child nodes.

To delete a key, we similarly identify the leaf node that contains the deleted key and then remove it there. This may cause some of my parents to have less than B + 1 keys so I may need to merge some child nodes.

Combining Multiple Search Criteria

When we have multiple criteria in the search, each criteria may use a different index that resides within a different set of machines in the DHT. Multiple criterias can be combined using boolean operators such as OR / AND. Performing OR operation is very straightforward because we just need to union the results of each individual index search that is performed separately. On the other hand, performing AND operation is trickier because we need to deal with the situation that each individual criteria may have a large number of matches but their intersection is small. The challenge is: how can we efficiently perform an intersection between two potentially very large sets ?

One naive implementation is to send all matched object ids of each criteria to a server that performs the set intersection. If each data set is large, this approach may cause a large bandwidth consumption for sending across all the potential object ids.

A number of techniques are described here in this paper

1. Bloom Filter
Instead of sending the whole set of matched object id, we can send a more compact representation called "Bloom Filter". Bloom filter is a much more compact representation that can be used for testing set membership. The output has zero false negative, but has a chance of false positive p, which is controllable.

For minimizing bandwidth, we typically pick the one with the larger set as the sending machine and perform the intersection on the receiving machine who has the smaller set.

Notice that the false positive can actually be completely eliminated by sending the matched result of Set2 back to Set1 machine, which double check the membership of set1 again. In most cases, 100% precision is not needed and a small probability of false positive is often acceptable.

2. Caching
It is possible that certain search criteria is very popular and will be issued over and over again. The corresponding bloom filter of this hot spots can be cached in the receiving machine. Since the bloom filter has a small footprint, we can cache a lot of bloom filters of popular search criterias.

3. Incremental fetch
In case if the client doesn't need to get the full set of matched results, we can stream the data back to client using a cursor mode. Basically, at the sending side, set1 is sorted and broken into smaller chunks with a bloom filter computed and attached to each chunk. At the receiving side, every element of set2 is checked for every bloom filter per chunk.

Notice that we save computation at the sending side (compute the bloom filter for the chunk rather than the whole set1) at the cost of doing more at the receiving side (since we need to repeat the checking of the whole set2 for each chunk of set1). The assumption is that client only needs a small subset of all the matched data.

An optimization we can do is to mark the range of each chunk in set1 and ask set2 to skip the objects that falls within the same range.

Thursday, November 19, 2009

Cloud Computing Patterns

I have attended a presentation by Simon Guest from Microsoft on their cloud computing architecture. Although there was no new concept or idea introduced, Simon has provided an excellent summary on the major patterns of doing cloud computing.

I have to admit that I am not familiar with Azure and this is my first time hearing a Microsoft cloud computing presentation. I felt Microsoft has explained their Azure platform in a very comprehensible way. I am quite impressed.

Simon talked about 5 patterns of Cloud computing. Let me summarize it (and mix-in a lot of my own thoughts) ...

1. Use Cloud for Scaling
The key idea is to spin up and down machine resources according to workload so the user only pay for the actual usage. There is two types of access patterns: passive listener model and active worker model.

Passive listener model uses a synchronous communication pattern where the client pushes request to the server and synchronously wait for the processing result.
In the passive listener model, machine instances are typically sit behind a load balancer. To scale the resource according to the work load, we can use a monitor service that send NULL client request and use the measured response time to spin up and down the size of the machine resources.

On the other hand, Active worker model uses an asynchronous communication patterns where the client put the request to a queue, which will be periodically polled by the server. After queuing the request, the client will do some other work and come back later to pickup the result. The client can also provide a callback address where the server can push the result into after the processing is done.
In the active worker model, the monitor can measure the number of requests sitting in the queue and use that to determine whether machine instances (at the consuming end) need to be spin up or down.

2. Use Cloud for Multi-tenancy
Multi-tenancy is more a SaaS provider (rather than an enterprise) usage scenario. The key idea is to use the same set of code / software to host the application for different customers (tenants) who may have slightly different requirement in
  • UI branding
  • Business rules for decision criteria
  • Data schema
The approach is to provide sufficient "customization" capability for their customer. The most challenging part is to determine which aspects should be opened for customization and which shouldn't. After identifying these configurable parameters, it is straightforward to define configuration metadata to capture that.

3. Use Cloud for Batch processingThis is about running things like statistics computation, report generation, machine learning, analytics ... etc. These task is done in batch mode and so it is more economical to use the "pay as you go" model. On the other hand, batch processing has very high tolerance in latency and so is a perfect candidate of running in the cloud.
Here is an example of how to run Map/Reduce framework in the cloud. Microsoft hasn't provided a Map/Reduce solution at this moment but Simon mentioned that Dryad in Microsoft research may be a future Microsoft solution. Interestingly, Simon also recommended Hadoop.

Of course, one challenge is how to move the data from the cloud in the first place. In my earlier blog, I have describe some best practices on this.

4. Use Cloud for Storage
The idea of storing data into the cloud and no need to worry about DBA tasks. Most cloud vendor provide large scale key/value store as well as RDBMS services. Their data storage services will also take care of data partitioning, replication ... etc. Building cloud storage is a big topic involving many distributed computing concepts and techniques, I have covered it in a separate blog.

5. Use Cloud for Communication
A queue (or mailbox) service provide a mechanism for different machines to communicate in an asynchronous manner via message passing.

Azure also provide a relay service in the cloud which is quite useful for machines behind different firewall to communicate. In a typical firewall setup, incoming connection is not allowed so these machine cannot directly establish a socket to each other. In order for them to communicate, each need to open an on-going socket connection to the cloud relay, which will route traffic between these connections.

I have used the same technique in a previous P2P project where user's PC behind their firewall need to communicate, and I know this relay approach works very well.

Monday, November 16, 2009

Impression on Scala

I have been hearing quite a lot of good comments about the Scala programming language. I personally use Java extensively in the past and have switched to Ruby (and some Erlang) in last 2 years. The following features that I heard about Scala really attracts me ...
  • Scala code is compiled in Java byte code and run natively in JVMs. Code written in Scala immediately enjoy the performance and robustness of Java VM technologies.
  • Easy to integrate with Java code and libraries, immediately enjoy the wide portfolio of exiting Java libraries.
  • It has good support to the Actor model, which I believe is an important programming paradigm for multi-core machine architecture.
So I decide to take a Scala tutorial from Dean Wampler today in the Qcon conference. This is a summary of my impression on Scala after the class.

First of all, Scala is a strongly typed language. However it has a type inference mechanism so you don't have to type declaration is optional. But in some place (like a method signature), type declaration is mandatory. It is not very clear to me when I have to declare a type.

Having the "val" and "var" declaration in variables is very nice because it makes immutability explicit. In Ruby, you can make an object immutable by sending it a freeze() method but Scala do this more explicitly.

But I found it confusing to have a method define in 2 different ways

class A() {
def hello {
class A() {
def hello = {
The MyFunction[+A1, -A2] is really confusing to me. I feel the typeless language is much more easy.

Removing the open and close bracket is also causing a lot of confusion to me.
class Person(givenName: String) {
var myName = givenName
def name =(anotherName: String) = {
myName = anotherName

class Person(givenName: String) {
var myName = givenName
def name =(anotherName: String) = myName = anotherName
The special "implicit" conversion method provides a mechanism to develop DSL (Domain Specific Language) in Scala but it also looks very odd to me. Basically, you need to import a SINGLE implicit conversion method that needs to take care of all possible conversions.

All the method that ends with ":" has a reverse calling order is also an odd stuff to me.

Traits provides mixins for Scala but I feel the "Module" mechanism in Ruby has done a better job.

Scala has the notion of "function" and can pass "function" as parameters. Again, I feel Ruby blocks has done a better job.

Perhaps due to JVM's limitation of supporting a dynamic language, Scala is not very strong in doing meta-programming, Scala doesn't provide the "open class" property where you can modify an existing class (add methods, change method implementation, add class ... etc.) at run time

Scala also emulate a number of Erlang features but I don't feel it is doing a very clean job. For example, it emulate the pattern matching style of Erlang programming using the case Class and unapply() method but it seems a little bit odd to me.

Erlang has 2 cool features which I couldn't find in Scala (maybe I am expecting too much)
  • The ability to run two version of class at the same time
  • Able to create and pass function objects to a remote process (kinda like a remote code loading)
Overall impression

I have to admit that my impression on Scala is not as good as before I attend the tutorial. Scala tries to put different useful programming paradigm in the JVM but I have a feeling of force-fit. Of course its close tie to JVM is still a good reason to use Scala. But from a pure programming perspective, I will prefer to use a combination of Ruby and Erlang, rather than Scala.

Sunday, November 15, 2009

NOSQL Patterns

Over the last couple years, we see an emerging data storage mechanism for storing large scale of data. These storage solution differs quite significantly with the RDBMS model and is also known as the NOSQL. Some of the key players include ...
  • GoogleBigTable, HBase, Hypertable
  • AmazonDynamo, Voldemort, Cassendra, Riak
  • Redis
  • CouchDB, MongoDB
These solutions has a number of characteristics in common
  • Key value store
  • Run on large number of commodity machines
  • Data are partitioned and replicated among these machines
  • Relax the data consistency requirement. (because the CAP theorem proves that you cannot get Consistency, Availability and Partitioning at the the same time)
The aim of this blog is to extract the underlying technologies that these solutions have in common, and get a deeper understanding on the implication to your application's design. I am not intending to compare the features of these solutions, nor to suggest which one to use.

API model

The underlying data model can be considered as a large Hashtable (key/value store).

The basic form of API access is
  • get(key) -- Extract the value given a key
  • put(key, value) -- Create or Update the value given its key
  • delete(key) -- Remove the key and its associated value
More advance form of API allows to execute user defined function in the server environment
  • execute(key, operation, parameters) -- Invoke an operation to the value (given its key) which is a special data structure (e.g. List, Set, Map .... etc).
  • mapreduce(keyList, mapFunc, reduceFunc) -- Invoke a map/reduce function across a key range.

Machines layout

The underlying infratructure is composed of large number (hundreds or thousands) of cheap, commoditized, unreliable machines connected through a network. We call each machine a physical node (PN). Each PN has the same set of software configuration but may have varying hardware capacity in terms of CPU, memory and disk storage. Within each PN, there will be a variable number of virtual node (VN) running according to the available hardware capacity of the PN.

Data partitioning (Consistent Hashing)

Since the overall hashtable is distributed across many VNs, we need a way to map each key to the corresponding VN.

One way is to use
partition = key mod (total_VNs)

The disadvantage of this scheme is when we alter the number of VNs, then the ownership of existing keys has changed dramatically, which requires full data redistribution. Most large scale store use a "consistent hashing" technique to minimize the amount of ownership changes.

In the consistent hashing scheme, the key space is finite and lie on the circumference of a ring. The virtual node id is also allocated from the same key space. For any key, its owner node is defined as the first encountered virtual node if walking clockwise from that key. If the owner node crashes, all the key it owns will be adopted by its clockwise neighbor. Therefore, key redistribution happens only within the neighbor of the crashed node, all other nodes retains the same set of keys.

Data replication

To provide high reiability from individually unreliable resource, we need to replicate the data partitions.

Replication not only improves the overall reliability of data, it also helps performance by spreading the workload across multiple replicas.

While read-only request can be dispatched to any replicas, update request is more challenging because we need to carefully co-ordinate the update which happens in these replicas.

Membership Changes

Notice that virtual nodes can join and leave the network at any time without impacting the operation of the ring.

When a new node joins the network
  1. The joining node announce its presence and its id to some well known VNs or just broadcast)
  2. All the neighbors (left and right side) will adjust the change of key ownership as well as the change of replica memberships. This is typically done synchronously.
  3. The joining node starts to bulk copy data from its neighbor in parallel asynchronously.
  4. The membership change is asynchronously propagate to the other nodes.

Notice that other nodes may not have their membership view updated yet so they may still forward the request to the old nodes. But since these old nodes (which is the neighbor of the new joined node) has been updated (in step 2), so they will forward the request to the new joined node.

On the other hand, the new joined node may still in the process of downloading the data and not ready to serve yet. We use the vector clock (described below) to determine whether the new joined node is ready to serve the request and if not, the client can contact another replica.

When an existing node leaves the network (e.g. crash)
  1. The crashed node no longer respond to gossip message so its neighbors knows about it.
  2. The neighbor will update the membership changes and copy data asynchronously

We haven't talked about how the virtual nodes is mapped into the physical nodes. Many schemes are possible with the main goal that Virtual Node replicas should not be sitting on the same physical node. One simple scheme is to assigned Virtual node to Physical node in a random manner but check to make sure that a physical node doesn't contain replicas of the same key ranges.

Notice that since machine crashes happen at the physical node level, which has many virtual nodes runs on it. So when a single Physical node crashes, the workload (of its multiple virtual node) is scattered across many physical machines. Therefore the increased workload due to physical node crashes is evenly balanced.

Client Consistency

Once we have multiple copies of the same data, we need to worry about how to synchronize them such that the client can has a consistent view of the data.

There is a number of client consistency models
  1. Strict Consistency (one copy serializability): This provides the semantics as if there is only one copy of data. Any update is observed instantaneously.
  2. Read your write consistency: The allows the client to see his own update immediately (and the client can switch server between requests), but not the updates made by other clients
  3. Session consistency: Provide the read-your-write consistency only when the client is issuing the request under the same session scope (which is usually bind to the same server)
  4. Monotonic Read Consistency: This provide the time monotonicity guarantee that the client will only see more updated version of the data in future requests.
  5. Eventual Consistency: This provides the weakness form of guarantee. The client can see an inconsistent view as the update are in progress. This model works when concurrent access of the same data is very unlikely, and the client need to wait for some time if he needs to see his previous update.

Depends on which consistency model to provide, 2 mechanisms need to be arranged ...
  • How the client request is dispatched to a replica
  • How the replicas propagate and apply the updates
There are various models how these 2 aspects can be done, with different tradeoffs.

Master Slave (or Single Master) Model

Under this model, each data partition has a single master and multiple slaves. In above model, B is the master of keyAB and C, D are the slaves. All update requests has to go to the master where update is applied and then asynchronously propagated to the slaves. Notice that there is a time window of data lost if the master crashes before it propagate its update to any slaves, so some system will wait synchronously for the update to be propagated to at least one slave.

Read requests can go to any replicas if the client can tolerate some degree of data staleness. This is where the read workload is distributed among many replicas. If the client cannot tolerate staleness for certain data, it also need to go to the master.

Note that this model doesn't mean there is one particular physical node that plays the role as the master. The granularity of "mastership" happens at the virtual node level. Each physical node has some virtual nodes acts as master of some partitions while other virtual nodes acts as slaves of other partitions. Therefore, the write workload is also distributed across different physical node, although this is due to partitioning rather than replicas

When a physical node crashes, the masters of certain partitions will be lost. Usually, the most updated slave will be nominated to become the new master.

Master Slave model works very well in general when the application has a high read/write ratio. It also works very well when the update happens evenly in the key range. So it is the predominant model of data replication.

There are 2 ways how the master propagate updates to the slave; State transfer and Operation transfer. In State transfer, the master passes its latest state to the slave, which then replace its current state with the latest state. In operation transfer, the master propagate a sequence of operations to the slave which then apply the operations in its local state.

The state transfer model is more robust against message lost because as long as a latter more updated message arrives, the replica still be able to advance to the latest state.

Even in state transfer mode, we don't want to send the full object for updating other replicas because changes typically happens within a small portion of the object. In will be a waste of network bandwidth if we send the unchanged portion of the object, so we need a mechanism to detect and send just the delta (the portion that has been changed). One common approach is break the object into chunks and compute a hash tree of the object. So the replica can just compare their hash tree to figure out which chunk of the object has been changed and only send those over.

In operation transfer mode, usually much less data need to be send over the network. However, it requires a reliable message mechanism with delivery order guarantee.

Multi-Master (or No Master) Model

If there is hot spots in certain key range, and there is intensive write request, the master slave model will be unable to spread the workload evenly. Multi-master model allows updates to happen at any replica (I think call it "No-Master" is more accurate).

If any client can issue any update to any server, how do we synchronize the states such that we can retain client consistency and also eventually every replica will get to the same state ? We describe a number of different approaches in following ...

Quorum Based 2PC

To provide "strict consistency", we can use a traditional 2PC protocol to bring all replicas to the same state at every update. Lets say there is N replicas for a data. When the data is update, there is a "prepare" phase where the coordinator ask every replica to confirm whether each of them is ready to perform the update. Each of the replica will then write the data to a log file and when success, respond to the coordinator.

After gathering all replicas responses positively, the coordinator will initiate the second "commit" phase and then ask every replicas to commit and each replica then write another log entry to confirm the update. Notice that there are some scalability issue as the coordinator need to "synchronously" wait for quite a lot of back and forth network roundtrip and disk I/O to complete.

On the other hand, if any one of the replica crashes, the update will be unsuccessful. As there are more replicas, chance of having one of them increases. Therefore, replication is hurting the availability rather than helping. This make traditional 2PC not a popular choice for high throughput transactional system.

A more efficient way is to use the quorum based 2PC (e.g. PAXOS). In this model, the coordinator only need to update W replicas (rather than all N replicas) synchronously. The coordinator still write to all the N replicas but only wait for positive acknowledgment for any W of the N to confirm. This is much more efficient from a probabilistic standpoint.

However, since no all replicas are update, we need to be careful when reading the data to make sure the read can reach at least one replica that has been previously updated successful. When reading the data, we need to read R replicas and return the one with the latest timestamp.

For "strict consistency", the important condition is to make sure the read set and the write set overlap. ie: W + R > N

As you can see, the quorum based 2PC can be considered as a general 2PC protocol where the traditional 2PC is a special case where W = N and R = 1. The general quorum-based model allow us to pick W and R according to our tradeoff decisions between read and write workload ratio.

If the user cannot afford to pick W, R large enough, ie: W + R <= N, then the client is relaxing its consistency model to a weaker one.
If the client can tolerate a more relax consistency model, we don't need to use the 2PC commit or quorum based protocol as above. Here we describe a Gossip model where updates are propagate asynchronous via gossip message exchanges and an auto-entropy protocol to apply the update such that every replica eventually get to the latest state.

Vector Clock

Vector Clock is a timestamp mechanism such that we can reason about causal relationship between updates. First of all, each replica keeps vector clock. Lets say replica i has its clock Vi. Vi[i] is the logical clock which if every replica follows certain rules to update its vector clock.
  • Whenever an internal operation happens at replica i, it will advance its clock Vi[i]
  • Whenever replica i send a message to replica j, it will first advance its clock Vi[i] and attach its vector clock Vi to the message
  • Whenever replica j receive a message from replica i, it will first advance its clock Vj[j] and then merge its clock with the clock Vm attached in the message. ie: Vj[k] = max(Vj[k], Vm[k])

A partial order relationship can be defined such that Vi > Vj iff for all k, Vi[k] >= Vj[k]. We can use these partial ordering to derive causal relationship between updates. The reasoning behind is
  • The effect of an internal operation will be seen immediately at the same node
  • After receiving a message, the receiving node knows the situation of the sending node at the time when the message is send. The situation is not only including what is happening at the sending node, but also all the other nodes that the sending node knows about.
  • In other words, Vi[i] reflects the time of the latest internal operation happens at node i. Vi[k] = 6 reflects replica i has known the situation of replica k up to its logical clock 6.
Notice that the term "situation" is used here in an abstract sense. Depends on what information is passed in the message, the situation can be different. This will affect how the vector clock will be advanced. In below, we describe the "state transfer model" and the "operation transfer model" which has different information passed in the message and the advancement of their vector clock will also be different.

Because state is always flow from the replica to the client but not the other way round, the client doesn't have an entry in the Vector clock. The vector clock contains only one entry for each replica. However, the client will also keep a vector clock from the last replica it contacts. This is important for support the client consistency model we describe above. For example, to support monotonic read, the replica will make sure the vector clock attached to the data is > the client's submitted vector clock in the request.

Gossip (State Transfer Model)

In a state transfer model, each replica maintain a vector clock as well as a state version tree where each state is neither > or < among each other (based on vector clock comparison). In other words, the state version tree contains all the conflicting updates.

At query time, the client will attach its vector clock and the replica will send back a subset of the state tree which precedes the client's vector clock (this will provide monotonic read consistency). The client will then advance its vector clock by merging all the versions. This means the client is responsible to resolve the conflict of all these versions because when the client sends the update later, its vector clock will precede all these versions.

At update, the client will send its vector clock and the replica will check whether the client state precedes any of its existing version, if so, it will throw away the client's update.

Replicas also gossip among each other in the background and try to merge their version tree together.

Gossip (Operation Transfer Model)

In an operation transfer approach, the sequence of applying the operations is very important. At the minimum causal order need to be maintained. Because of the ordering issue, each replica has to defer executing the operation until all the preceding operations has been executed. Therefore replicas save the operation request to a log file and exchange the log among each other and consolidate these operation logs to figure out the right sequence to apply the operations to their local store in an appropriate order.

"Causal order" means every replica will apply changes to the "causes" before apply changes to the "effect". "Total order" requires that every replica applies the operation in the same sequence.

In this model, each replica keeps a list of vector clock, Vi is the vector clock the replica itself and Vj is the vector clock when replica i receive replica j's gossip message. There is also a V-state that represent the vector clock of the last updated state.

When a query is submitted by the client, it will also send along its vector clock which reflect the client's view of the world. The replica will check if it has a view of the state that is later than the client's view.

When an update operation is received, the replica will buffer the update operation until it can be applied to the local state. Every submitted operation will be tag with 2 timestamp, V-client indicates the client's view when he is making the update request. V-@receive is the replica's view when it receives the submission.

This update operation request will be sitting in the queue until the replica has received all the other updates that this one depends on. This condition is reflected in the vector clock Vi when it is larger than V-client

On the background, different replicas exchange their log for the queued updates and update each other's vector clock. After the log exchange, each replica will check whether certain operation can be applied (when all the dependent operation has been received) and apply them accordingly. Notice that it is possible that multiple operations are ready for applying at the same time, the replica will sort these operation in causal order (by using the Vector clock comparison) and apply them in the right order.

The concurrent update problem at different replica can also happen. Which means there can be multiple valid sequences of operation. In order for different replica to apply concurrent update in the same order, we need a total ordering mechanism.

One approach is whoever do the update first acquire a monotonic sequence number and late comers follow the sequence. On the other hand, if the operation itself is commutative, then the order to apply the operations doesn't matter

After applying the update, the update operation cannot be immediately removed from the queue because the update may not be fully exchange to every replica yet. We continuously check the Vector clock of each replicas after log exchange and after we confirm than everyone has receive this update, then we'll remove it from the queue.

Map Reduce Execution

Notice that the distributed store architecture fits well into distributed processing as well. For example, to process a Map/Reduce operation over an input key list.

The system will push the map and reduce function to all the nodes (ie: moving the processing logic towards the data). The map function of the input keys will be distributed among the replicas of owning those input, and then forward the map output to the reduce function, where the aggregation logic will be executed.

Handling Deletes

In a multi-master replication system, we use Vector clock timestamp to determine causal order, we need to handle "delete" very carefully such that we don't lost the associated timestamp information of the deleted object, otherwise we cannot even reason the order of when to apply the delete.

Therefore, we typically handle delete as a special update by marking the object as "deleted" but still keep its metadata / timestamp information around. Around a long enough time that we are confident that every replica has marked this object deleted, then we garbage collected the deleted object to reclaim its space.

Storage Implementaton

One strategy is to use make the storage implementation pluggable. e.g. A local MySQL DB, Berkeley DB, Filesystem or even a in memory Hashtable can be used as a storage mechanism.

Another strategy is to implement the storage in a highly scalable way. Here are some techniques that I learn from CouchDB and Google BigTable.

CouchDB has a MVCC model that uses a copy-on-modified approach. Any update will cause a private copy being made which in turn cause the index also need to be modified and causing the a private copy of the index as well, all the way up to the root pointer.

Notice that the update happens in an append-only mode where the modified data is appended to the file and the old data becomes garbage. Periodic garbage collection is done to compact the data. Here is how the model is implemented in memory and disks

In Google BigTable model, the data is broken down into multiple generations and the memory is use to hold the newest generation. Any query will search the mem data as well as all the data sets on disks and merge all the return results. Fast detection of whether a generation contains a key can be done by checking a bloom filter.

When update happens, both the mem data and the commit log will be written so that if the machine crashes before the mem data flush to disk, it can be recovered from the commit log.

Thursday, November 12, 2009

Amazon Cloud Computing

Cloud computing is becoming a very hot area as it provides cost savings and time-to-market benefits to a wide spectrum of organizations.

At the consumer end, small startup companies found Cloud computing can significantly reduce their initial setup cost. Large enterprises also found Cloud computing allows them to improve resource utilization and cost effectiveness, although they also have security and control concerns. Here is a very common cloud deployment model across many large enterprises.

Traditional software companies who distributes software on CD also look into the SaaS model as a new way of doing business. However, a SaaS model typically requires the companies to build some kind of web site. But these companies may not have the expertise to build large scale web sites and operate it. Cloud computing also allows them to outsource the SaaS infrastructure.

Here we look at the leader in the cloud computing provider space. AWS from Amazon.

Amazon Web Service
Amazon is the current leading provider in the Cloud computing space. At the heart of its technology stack (which is known as the Amazon Web Services), it includes an IaaS stack, a PaaS stack and a SaaS stack.
  1. Their IaaS stack includes infrastructure resource such as virtual machine, virtual mount disks, virtual network, load balancer, VPN, Databases.
  2. Their PaaS stack provides a set of distributed computing services including queuing, data storage, metadata, parallel batch processing,
  3. Their SaaS stack provides a set of high level services such as content delivery network, payment processing services, ecommerce fulfillment services.
Since we are focusing in the Cloud Computing aspects, we will describe their IaaS and PaaS stack below but will skip their SaaS stack.

EC2 – Elastic Computing

Amazon has procured a large number of commoditized Intel boxes running virtualization software Xen. On top of Xen, Linux or Windows can be run as the guest OS . The guest operating system can have many variations with different set of software packages installed.

Each configuration is bundled as a custom machine image (called AMI). Amazon host a catalog of AMI for the users to choose from. Some AMI is free while other requires a usage charge. User can also customize their own setup by starting from a standard AMI, make their special configuration changes and then create a specific AMI that is customized for their specific needs. The AMIs are stored in Amazon’s storage subsystem S3.

Amazon also classifies their machines in terms of their processor power (no of cores, memory and disk size) and charged their usage at a different rate. These machines can be run in different network topology specified by the users. There is an “availability zone” concept which is basically a logical data center. “Availability zone” has no interdependency and is therefore very unlikely to fail at the same time. To achieve high availability, users should consider putting their EC2 instances in different availability zones.

“Security Group” is the virtual firewall of Amazon EC2 environment. EC2 instances can be grouped under “security group” which specifies which port is open to which incoming range of IP addresses. So EC2 instances that running applications at various level of security requirements can be put into appropriated security groups and managed using ACL (access control list). Somewhat very similar to what network administrator configure their firewalls.

User can start the virtual machine (called an EC2 instance) by specifying the AMI, the machine size, the security group, and its authentication key via command line or an HTTP/XML message. So it is very easy to startup the virtual machine and start running the user’s application. When the application completes, the user can also shutdown the EC2 instance via command line or HTTP/XML message. The user is only charged for the actual time when the EC2 instance is running.

One of the issue of extremely dynamic machine configuration (such as EC2) is that a lot of configuration setting is transient and does not survive across reboot. For example, the node name and IP address may have been changed, all the data stored in local files is lost. Latency and network bandwidth between machines may also have changed. Fortunately, Amazon provides a number of ways to mitigate these issues.
  • By paying some charge, user can reserve a stable IP address, called “elastic IP”, which can be attached to EC2 instance after they bootup. External facing machine is typically done this way.
  • To deal with data persistence, Amazon also provides a logical network disk, called “elastic block storage” to store the data. By paying some charges, EBS is reserved for the user and it survives across EC2 reboots. User can attach the EBS to EC2 instances after the reboot.

EBS – Elastic Block Storage

Based on RAID disks, EBS provides a persistent block storage device for data persistence where user can attach it to a running EC2 instance within the same availability zone. EBS is typically used as a file system that is mounted to EC2 instance, or as raw devices for database.

Although EBS is a network devices to the EC2 instance, benchmark from Amazon shows that it has higher performance than local disk access. Unlike S3 which is based on eventual consistent model, EBS provides strict consistency where latest updates are immediately available.

CloudWatch -- Monitoring Services

CloudWatch provides an API to extract system level metrics for each VM (e.g. CPU, network I/O and disk I/O) as well as for each load balancer services (e.g. response time, request rate). The collected metrics is modeled as a multi-dimensional data cube and therefore can be queried and aggregated (e.g. min/max/avg/sum/count) in different dimensions, such as by time, or by machine groups (by ami, by machine class, by particular machine instance id, by auto-scaling group).

This metrics is also used to drive the auto-scaling services (described below). Note that the metrics are predefined by Amazon and custom metrics (application level metrics) is not supported at this moment.

Load Balancing Services

Load balancer provides a way to group identical VMs into a pool. Amazon provides a way to create a software load balancer in a region and then attach EC2 instances (of the same region) to the it. The EC2 instances under a particular load balancer can be in different availability zone but they have to be in the same region.

Auto-Scaling Services

Auto-scaling allows the user to group a number of EC2 instances (typically behind the same load balancer) and specify a set of triggers to grow and shrink the group. Trigger defines the condition which is matching the collected metrics from the CloudWatch and match that against some threshold values. When match, the associated action can be to grow or shrink the group.

Auto-scaling allows resource capacity (number of EC2 instances) automatically adjusted to the actual workload. This way user can automatically spawn more VMs as the workload increases and shutdown the VM as the load decreases.

Relational DB Services

RDS is basically running MySQL in the EC2.

S3 – Simple Storage Service

Amazon S3 provides a HTTP/XML services to save and retrieve content. It provides a file system-like metaphor where “objects” are group under “buckets”. Based on a REST design, each object and bucket has its own URL.

With HTTP verbs (PUT, GET, DELETE, POST), user can create a bucket, list all the objects within the bucket, create object within a bucket, retrieve an object, remove an object, remove a bucket … etc.

Under S3, each object has a unique URI which serves as its key. There is no query mechanism in S3 and User has to lookup the object by its key. Each object is stored as an opaque byte array with maximum 5GB size. S3 also provides an interesting partial object retrieval mechanism by specifying the ranges of bytes in the URL.

However, partial put is not current support but it can be simulated by breaking the large object into multiple small objects and then do the assembly at the app level. Breaking down the object also help to speed up the upload and download by doing the data transfer in parallel.

Within Amazon S3, each S3 objects are replicated across 2 (or more) data center and also cache at the edge for fast retrieval.

Amazon S3 is based on an “eventual consistent” model which means it is possible that an application won’t see the change it just made. Therefore, some degree of tolerance of inconsistent view is required by the application. Application should avoid the situation of having two concurrent modifications to the same object. And application should wait for some time between updates, and also should expect all the data it reads is potentially stale for few seconds.

There is also no versioning concept in S3, but it is not hard to build one on top of S3.

SimpleDB – queriable data storage

Unlike S3 where data has to be looked up by key, SimpleDB provides a semi-structured data store with querying capability. Each object can be stored as a number of attributes where the user can search the object by the attribute name.

Similar to the concepts of “buckets “ and “objects” in S3, SimpleDB is organized as a set of “items” grouped by “domains”. However, each item can have a number of “attributes” (up to 256). Each attribute can store one or multiple values and the value must be a string (or a string array in case of multi-valued attribute). Each attribute can store up to 1K bytes, so it is not appropriate to store binary content.

SimpleDB is typically used as a metadata store in conjuction with S3 where the actual data is being stored. SimpleDB is also schema-less. Each item can define its own set of attributes and is free to add more or remove some attributes at runtime.

SimpleDB provides a query capability which is quite different from SQL. The “where” clause can only match an attribute value with a constant but not with other attributes. On the other hand, the query result only return the name of the matched items but not the attributes, which means subsequent lookup by item name is needed. Also, there is no equivalent of “order by” and the returned query result is unsorted.

Since all attribute are store as strings (even number, dates … etc). All comparison operation is done based on lexical order. Therefore, special encoding is needed for data type such as date, number to string to make sure comparison operation is done correctly.

SimpleDB is also based on an eventual consistency model like S3.

SQS – Simple Queue Service

Amazon provides a queue services for application to communicate in an asynchronous way with each other. Message (up to 256KB size) can be sent to queues. Each queue is replicated across multiple data centers.

Enterprises use HTTP protocol to send messages to a queue. “At least once” semantics is provided, which means, when the sender get back a 200 OK response, SQS guarantees that the message will be received by at least one receiver.

Receiving messages from a queue is done by polling rather than event driven calling interface. Since messages are replicated across queues asynchronously, it is possible that receivers only get some (but not all) messages sent to the queue. But the receiver keep polling the queue, he will eventually get all messages sent to the queue. On the other hand, message can be delivered out of order or delivered more than once. So the message processing logic needs to be idempotent as well as independent of message arrival order.

Once message is taken by a receiver, the message is invisible to other receivers for a period of time but it is not gone yet. The original receiver is supposed to process the message and make an explicit call to remove the message permanently from the queue. If such “removal” request is not made within the timeout period, the message will be visible in the queue again and will be picked up by subsequent receivers.

Elastic Map/Reduce

Amazon provides an easy way to run Hadoop Map/Reduce in the EC2 environment. They provide a web UI interface to start/stop a Hadoop Cluster and submit jobs to it. For a detail of how Hadoop works, see here.

Under elastic MR, both input and output data are stored into S3 rather than HDFS. This means data need to be loaded to S3 before the Hadoop processing can be started. Elastic also provides a job flow definition so user can concatenate multiple Map/Reduce job together. Elastic MR supports the program to be written in Java (jar) or any programming language (Hadoop streaming) as well as PIG and Hive.

Virtual Private Cloud

VPC is a VPN solution such that the user can extend its data center to include EC2 instances running in the Amazon cloud. Notice that this is an "elastic data center" because its size can grow and shrink when the user starts / stops EC2 instances.

User can create a VPC object which represents an isolated virtual network in the Amazon cloud environment and user can create multiple virtual subnets under a VPC. When starting the EC2 instance, the subnet id need to be specified so that the EC2 instance will be put into the subnet under the corresponding VPC.

EC2 instances under the VPC is completely isolated from the rest of Amazon's infrastructure at the network packet routing level (of course it is software-implemented isolation). Then a pair of gateway objects (VPN Gateway on the Amazon side and Customer gateway on the data center side) need to be created. Finally a connection object is created that binds these 2 gateway objects together and then attached to the VPC object.

After these steps, the two gateway will do the appropriate routing between your data center and the Amazon VPC with VPN technologies used underneath to protect the network traffic.

Things to watch out

I personally think Amazon provides a very complete set of services that is sufficient for a wide spectrum of deployment scenarios. Nevertheless, there are a number of limitations that needs to pay attention to …
  • There are no Cloud standards today. Whatever choice made for a provider will imply some degree of lock-in to a vendor specific architecture. Amazon is no exception. One way to minimize such lock-in is to introduce an insulation layer to localize all the provider-specific API.
  • Cloud providers typically run their infrastructure on low-cost commodity hardware inside some data center with network connected between them. Amazon is not making their hosting environment very transparently and so it is not very clear how much reliability one can expect from their environment. On the other hand, the SLA guarantee that Amazon is willing to provide is relatively low.
  • Multicast communication is not supported between EC2 instances. This means application has to communicate using TCP point-to-point protocol. Some cluster replication framework based on IP multicast simply doesn’t work in EC2 environment.
  • EBS currently cannot be attached to a multiple EC2 instance at the same time. This means some application (e.g. Oracle cluster) which based on having multiple machines accessing a shared disk simply won’t work in EC2 environment.

Monday, November 9, 2009

Support Vector Machine

Support vector machine is a very powerful classification technique. Its theory is based on the linear model but can also handle non-linear model very well. It is also immute to the curse of high dimensionality.

In support vector machine (SVM), inputs are numeric and output are binary. Each data sample can be consider as a m dimension point label as + or - depends on the output.

Optimal separating hyperplane
Assume there are m numeric input attributes, the key approach of SVM is to try finding a (m - 1) dimension hyperplane which can separate the points in the best way. (ie: all the +ve points on one side of the plane and all the -ve points on the other side).

There are many planes that divide the regions. But we need to find the red line which has the maximum margin.

Sometimes there may be noise or variation that not all points lies in the same side of the plane. So we modify the equation to allow for some errors in the constraints and we want to minimize the overall errors in the optimization goal.

At first glance, it seems like the classification will be O(n) where n is the size of training data. This is not the case because most of the alpha values are zero except the supporting vectors (points touching the margin band) which is a very small value.

So far, we have made an assumption that the data is "linearly separable". What if this assumption is not true ? (e.g. y = a1.x1.x1 + a2.x1.x2)

The answer is to create another attribute x3 = x1.x1 and x4 = x1.x2.
In other words, we can always make a non-linear equation becomes linear by introducing extra variables which is a non-linear combination of existing variables. Notice that adding these extra variables effectively is increasing the dimension of the data, but we maintain the linearity of the data point.

As an example, a quadratic equation y = 3x.x + 2x + 5 is a one variable, non-linear equation. But if we introduce z = x.x, then it becomes a 2 variable, linear equation with
y = 3z + 2x + 5

So by adding more attributes to increase the dimensionality of the data points, we can keep the model in linear form. So, we can solve non-linear model by transforming the current data into a higher dimension (adding extra attributes by combining existing attributes in a non-linear way). And then apply the hyperplane separation technique described above to build the model (figure out the alpha value) and use that to classify new data points.

But ! How do we decide what extra attributes should be added and how they should be composed from existing attributes, and how many of them do we need to reconstruct the linearity ?

The Kernal Trick
From examine the above algorithm, an interesting finding is that we only need to know the dot product between two data points but not individual input attribute values. In other words, we don't need to care about how to calculate the extra attributes as long as we know how to calculate the dot product of the new transform space.

The process of using SVM is the same as the other machine learning algorithms
  1. Pick a tool (such as libSVM)
  2. Prepare the input data (convert them to numeric or filter them, normalize their range)
  3. Pick a Kernel function and its parameters
  4. Run cross-validation against different combination of parameters
  5. Pick the best parameter and retrain them.
  6. Now we have the learned model, we can use this for classifying new data

Sunday, November 8, 2009

Machine Learning with Linear Model

Linear Model is a family of model-based learning approaches that assume the output y can be expressed as a linear algebraic relation with the input attributes x1, x2 ...

The input attributes x1, x2 ... is expected to be numeric and the output is expected to be numeric as well.

Here our goal is to learn the parameters of the underlying model, which is the coefficients.

Linear Regression

Here the input and output are both numeric, related through a simple linear relationship. The learning goal is to figure out the hidden weight value (ie: the W vector).

Notice that non-linear relationship is equivalent of a linear relationship at a higher dimension. e.g. if x2 = x1 * x1, then it becomes a quadratic relationship. Because of this, the polynomial regression can be done using linear regression technique.

Given a batch of training data, we want to figure out the weight vector W such that the total sum of error (which is the difference between the predicted output and the actual output) to be minimized.

Instead of using the batch processing approach, a more effective approach is to learn incrementally (update the weight vector for each input data) using a gradient descent approach.

Gradient Descent

Gradient descent is a very general technique that we can use to incrementally adjust the parameters of the linear model. The basic idea of "gradient descent" is to adjust each dimension (w0, w1, w2) of the W vector according to their contribution of the square error. Their contribution is measured by the gradient along the dimension which is the differentiation of the square error with respect to w0, w1, w2.

In the case of Linear Regression ...

Logistic Regression

Logistic Regression is used when the output y is binary and not a real number. The first part is the same as linear regression while a second step sigmod function is applied to clamp the output value between 0 and 1.

We use the exact same gradient descent approach to determine the weight vector W.

Neural Network

Inspired by how our brain works, Neural network organize many logistic regression units into layers of perceptrons (each unit has both input and outputs in binary form).

Learning in Neural network is to discover all the hidden values of w. In general, we use the same technique above to adjust the weight using gradient descent layer by layer. We start from the output layer and move towards the input layer (this technique is called backpropagation). Except the output layer, we don't exactly know the error at the hidden layer, we need to have a way to estimate the error at the hidden layers.

But notice there is a symmetry between the weight and the input, we can use the same technique how we adjust the weight to estimate the error of the hidden layer.

Thursday, November 5, 2009

What Hadoop is good at

Hadoop is getting more popular these days. Lets look at what it is good at and what not.

The Map/Reduce Programming model
Map/Reduce offers a different programming model for handling concurrency than the traditional multi-thread model.

Multi-thread programming model allows multiple processing units (with different execution logic) to access the shared set of data. To maintain data integrity, each processing units co-ordinate their access to the shared data by using Locks, Semaphores. Problem such as "race condition", "deadlocks" can easily happen but hard to debug. This makes multi-thread programming difficult to write and hard to maintain. (Java provides a concurrent library package to ease the development of multi-thread programming)

Data-driven programming model feeds data into different processing units (with same or different execution logic). Execution is triggered by arrival of data. Since processing units can only access data piped to them, data sharing between processing units is prohibited upfront. Because of this, there is no need to co-ordinate access to data.

This doesn't mean there is no co-ordination for data access. We should think of the co-ordination is done explicitly by the graph. ie: by defining how the nodes (processing units) are connected to each other via data pipes.

Map-Reduce programming model is a specialized form of data-driven programming model where the graph is defined as a "sequential" list of MapReduce jobs. Within each Map/Reduce job, execution is broken down into a "map" phase and a "reduce" phase. In the map phase, each data split is processed and one or multiple output is produced with a key attached. This key is used to route the outputs (of the Map phase) to the second "reduce" phase, where data with the same key is collected and processed in an aggregated way.

Note that in a Map/Reduce model, parallelism happens only within a Job and execution between jobs are done in a sequential manner. As different jobs may access the same set of data, knowing that jobs is executed serially eliminate the needs of coordinating data access between jobs.

Design application to run in Hadoop is a matter of breaking down the algorithm in a number of sequential jobs and then exploit data parallelism within each job. Not all algorithms can fit in to the Map Reduce model. For a more general approach to break down an algorithm into parallel, please visit here.

Characteristics of Hadoop Processing
A detail explanation of Hadoop implementation can be found here. Basically Hadoop has the following characteristics ...
  • Hadoop is "data-parallel", but "process-sequential". Within a job, parallelism happens within a map phase as well as a reduce phase. But these two phases cannot run in parallel, the reduce phase cannot be started until the map phase is fully completed.
  • All data being accessed by the map process need to be freezed (update cannot happen) until the whole job is completed. This means Hadoop processes data in chunks using a batch-oriented fashion, making it not very suitable for stream-based processing where data flows in continuously and immediate processing is needed.
  • Data communication happens via a distributed file system (HDFS). Latency is introduced as extensive network I/O is involved in moving data around (ie: Need to write 3 copies of data synchronously). This latency is not an issue for batch-oriented processing where throughput is the primary factor. But this means Hadoop is not suitable for online access where low latency is critical.
Given the above characteristics, Hadoop is NOT good at the following ...
  • Perform online data access where low latency is critical (Hadoop can be used together with HBase or NOSQL store to deliver low latency query response)
  • Perform random ad/hoc processing of a small subset of data within a large data set (Hadoop is designed to scan all data in parallel)
  • Process small data volume (for data volume less than hundred GB range, many more mature solutions exist)
  • Perform real-time, stream-based processing where data is arrived continuously and immediate processing is needed (to keep the overhead small enough, typically data need to be batched for at least 30 minutes, which you won't be able to see the current data until 30 minutes has passed)

Sunday, November 1, 2009

Principal Component Analysis

One common problem of machine learning is the "curse of high dimensionality". When there are too many attributes in the input data, many of the ML algorithms will be very inefficient or some of them will even be non-performing (e.g. in nearest neighbor computation, data points in a high-dimensional space are pretty much equal distance with each other).

It is quite possible that the attributes we selected are inter-dependent on each other. If so, we may be able to extract a smaller subset of independent attributes that may still be very useful to describe the data characteristics. In other words, we may be able to reduce the number of dimensions significantly without losing much fidelity of the data.

"Dimension Reduction" is a technique to determine how we can reduce the number of dimensions while minimizing the loss of fidelity of data characteristics. It is typically applied during the data cleansing stage before feeding into the machine learning algorithm.

"Feature Selection" is a simple techniques to select a subset of features that is more significant. A very simple "filtering" approach can be used by looking at each attribute independently and rank their significance using some measurement (e.g. info gain) and throw away those that has minimum significance. A more sophisticated "wrapper" approach is to look at different subset of features to do the evaluation. There are two common model in the "wrapper" approach, "forward selection" and "backward elimination".

In forward selection, we start with no attribute and then start to pick the attribute with the highest statistical significance, (ie: prediction improves a lot from the cross validation check). After picking the first attribute, and we start to pair it up with another unselected attribute and find the one with the most significant improvement in the cross-validation-check. We keep growing the set of attributes until we don't find significant improvements. One issue of the "forward selection" approach is that it may miss "grouped features". For example, attribute1 and attribute2 may be insignificant when they are stand-alone, but combining them will give very big improvement.

Backward elimination can be used to handle this problem. It basically goes the opposite direction, starts with the full set of attributes and start to drop those attribute that has least statistical significance (ie: prediction degrades very little from the cross validation check). The downside of "backward elimination" is that it is much more expensive to run.

A more powerful approach called "Feature Extraction" is more commonly used to extract a different set of attributes by linearly combining the existing set of attributes. Principal Component Analysis "PCA" is a very popular technique in this arena. PCA can analyze the interdependency between pair of attributes and identify those significant ones.

The intuition of PCA
The intuition is to rearrange the linear combination of existing m attributes in different way to form another set of m attributes. The new set of attributes has the characteristic that
  • Each attribute is independent of each other
  • The set of attributes is ranked according to the range of variation
Note that attributes with narrow range of variation doesn't provide much information to describe the data samples and so can be ignored with minimal lost of fidelity. So we remove that to reduce the dimensionality.

The question is : How do we recompose the m attributes to exhibit the above 2 characteristics. Lets take a deeper look into it.

Underlying theory of PCA
Assume there are N data points in the input data set and each data point is described by M attributes. We use the statistical definition for the "mean", "variance" of each attribute and "co-variance" for every pair of attributes. Co-variance is an indicator of dependencies of two attributes with zero implies independence.

In an ideal situation, we want COV-x to be a diagonal matrix, which means COV(i, j) to be zero. In other words, all pairs of attribute-i and attribute-j are independent to each other. We also want the diagonal to be ranked in descending order.

So the problem can be reduced to finding a different combination of the m attributes to form a new set of m attributes (Y = P. X) such that COV-y is a ranked diagonal matrix.

How do we determine P ?

Some Matrix theory
Here is a review of Matrice theory that will be used

Lets find the transformation matrice P

So the PCA process can be summarized in following ...
  1. Input: X, a matrice of (m * n), a set of N sample data points, each with M attributes.
  2. Compute Cov-X, a matrice of (m * m), the Covariance matrice of X
  3. Compute the m Eigenvectors and m Eigenvalues of Cov-X
  4. Order the Eigenvectors according to the Eigenvalues
  5. Now found the transformation matrice P, which is a matrice of (m * m). Note that each row vector of P corresponding to an eigenvector, which is effectively the axis of the new co-ordinate system.
  6. Truncate P to just take the top k rows. Now P' is a (k * m) matrice.
  7. Apply P' . X to all input data to result in a matrice of (k * n). This is effectively reduce each data point from m-dimension to k-dimension.

A very good paper
Some Matrix math review and step by step PCA calculation