Friday, December 27, 2013

Spark: Low latency, massively parallel processing framework

While Hadoop fits most batch processing workloads well and is the primary choice for big data processing today, it is not optimized for other types of workload, due to the following limitations:
  • Lack of iteration support
  • High latency due to persisting intermediate data onto disk
For a more detailed elaboration of Hadoop's limitations, refer to my previous post.

Nevertheless, the Map/Reduce processing paradigm is a proven mechanism for dealing with large scale data.  On the other hand, many of Hadoop's infrastructure pieces, such as HDFS and HBase, have matured over time.

In this blog post, we'll look at a different architecture called Spark, which takes the strengths of Hadoop, improves on a number of its weaknesses, and provides a more efficient batch processing framework with much lower latency (based on published benchmarks, Spark claims to be 100x faster than Hadoop when using its RAM cache, and 10x faster when running on disk).  Although it competes with Hadoop MapReduce, Spark integrates well with other parts of the Hadoop ecosystem (such as HDFS, HBase, etc.).  Spark has generated a lot of excitement in the big data community and represents a very promising parallel execution stack for big data analytics.

Berkeley Spark

Within the Spark cluster, there is a driver program where the application logic starts, and multiple workers which process data in parallel.  Although this is not mandated, data is typically collocated with the workers and partitioned across the same set of machines within the cluster.  During execution, the driver program passes code/closures to the worker machines, where the corresponding partitions of data are processed.  The data undergoes different steps of transformation while staying in the same partition as much as possible (to avoid shuffling data across machines).  At the end of the execution, actions are executed at the workers and the results are returned to the driver program.


Underlying the cluster is an important distributed data structure called an RDD (Resilient Distributed Dataset), which is a logically centralized entity but is physically partitioned across multiple machines inside the cluster based on some notion of key.  Controlling how different RDDs are co-partitioned (with the same keys) across machines can reduce inter-machine data shuffling within the cluster.  Spark provides a "partition-by" operator which creates a new RDD by redistributing the data of the original RDD across the machines in the cluster.
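As a rough illustration (a minimal PySpark sketch; the RDD contents and the choice of 8 partitions are made up), redistributing two key-value RDDs with the same partitioner keeps records sharing a key on the same machine, so a subsequent join does not need another shuffle:

from pyspark import SparkContext

sc = SparkContext("local", "copartition-demo")

user_profiles = sc.parallelize([(1, "alice"), (2, "bob")])
user_events = sc.parallelize([(1, "click"), (2, "view"), (1, "buy")])

# Redistribute both RDDs with the same number of partitions (and the default
# hash partitioner), so records with the same key land in the same partition.
profiles_part = user_profiles.partitionBy(8)
events_part = user_events.partitionBy(8)

# The join can now be computed partition-by-partition without reshuffling.
joined = profiles_part.join(events_part)
print(joined.collect())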



An RDD can optionally be cached in RAM, providing fast access.  Currently the granularity of caching is the whole RDD: either all of the RDD is cached or none of it is.  Caching is a hint but not a guarantee; Spark will try to keep the RDD in memory if sufficient memory is available in the cluster, evicting data with an LRU (Least Recently Used) algorithm.

The RDD provides an abstract data structure on which application logic can be expressed as a sequence of transformations, without worrying about the underlying distributed nature of the data.

Typically, application logic is expressed as a sequence of TRANSFORMATIONS and ACTIONS.  A "transformation" specifies the processing dependency DAG among RDDs, and an "action" specifies what the output will be (i.e. the sink node of the DAG, with no outgoing edge).  The scheduler performs a topological sort to determine the execution sequence of the DAG, tracing all the way back to the source nodes, or to nodes that represent cached RDDs.


Notice that dependencies in Spark come in two forms.  With a "narrow dependency", each partition of the parent RDD is consumed by at most one partition of the child RDD (a child partition is still allowed to have multiple parent partitions).  With a "wide dependency" (e.g. group-by-key, reduce-by-key, sort-by-key), a parent partition is split, with its elements going to different child partitions based on their keys.  Notice that RDDs with narrow dependencies preserve the key partitioning between parent and child RDD.  Therefore RDDs can be co-partitioned with the same keys (the parent key range being a subset of the child key range), such that the processing (generating the child RDD from the parent RDD) can be done within a machine with no data shuffling across the network.  On the other hand, RDDs with wide dependencies involve data shuffling.


Narrow transformations (involving no data shuffling) include the following operators:
  • Map
  • FlatMap
  • Filter
  • Sample
Wide transformations (involving data shuffling) include the following operators:
  •  SortByKey
  • ReduceByKey
  • GroupByKey
  • Cogroup
  • Join
  • Cartesian
Actions output the RDD to the external world and include the following operators:
  • Collect
  • Take(n)
  • Reduce
  • ForEach
  • Sample
  • Count
  • Save
The scheduler examines the type of dependencies and groups RDDs connected by narrow dependencies into a unit of processing called a stage.  Wide dependencies span consecutive stages within the execution and require the number of partitions of the child RDD to be explicitly specified.


A typical execution sequence is as follows ...
  1. The RDD is initially created from external data sources (e.g. HDFS, local files, etc.)
  2. The RDD undergoes a sequence of TRANSFORMATIONS (e.g. map, flatMap, filter, groupBy, join), each producing a new RDD that feeds into the next transformation.
  3. Finally, the last step is an ACTION (e.g. count, collect, save, take), which converts the last RDD into output to an external data source.
The above sequence of processing is called a lineage (the outcome of the topological sort of the DAG).  Each RDD produced within the lineage is immutable.  In fact, unless it is cached, it is used only once: to feed the next transformation to produce the next RDD and, finally, the action output.
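To make the lineage concrete, here is a minimal PySpark sketch of the three steps above (the HDFS path is just a placeholder): an RDD is created from an external source, a chain of transformations lazily builds the DAG, and an action finally triggers execution.

from pyspark import SparkContext

sc = SparkContext("local", "lineage-demo")

# 1. Create an RDD from an external data source (placeholder path).
lines = sc.textFile("hdfs:///tmp/input.txt")

# 2. A chain of transformations; each one produces a new (lazy) RDD.
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.cache()          # hint: keep this RDD in RAM if memory allows

# 3. Actions trigger execution of the whole lineage and return results.
print(counts.count())   # number of distinct words
print(counts.take(5))   # a few (word, count) pairs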

In a classical distributed system, fault resilience is achieved by replicating data across different machines together with an active monitoring system.  In case a machine crashes, there is always another copy of the data residing on a different machine from which recovery can take place.

Fault resiliency in Spark takes a different approach.  First of all, Spark is a large scale compute cluster, not a large scale data cluster.  Spark makes two assumptions about its workload:
  • The processing time is finite (although the longer it takes, the higher the cost of recovery after a fault)
  • Data persistence is the responsibility of external data sources, which keep the data stable for the duration of the processing.
Spark has made the tradeoff decision that in case of any data loss during execution, it will re-execute the previous steps to recover the lost data.  However, this doesn't mean everything done so far is discarded and we need to start from scratch.  We just need to re-execute the corresponding partitions in the parent RDD which are responsible for generating the lost partitions; in the case of narrow dependencies, this resolves to the same machine.

Notice that re-execution of a lost partition is exactly like the lazy evaluation of the DAG, which starts from the leaf node of the DAG, traces back the dependencies to determine which parent RDDs are needed, and eventually tracks all the way to the source nodes.  Recomputing a lost partition is done in a similar way, but with the partition as an extra piece of information to determine which parent RDD partitions are needed.

However, re-execution across wide dependencies can touch a lot of parent RDD partitions across multiple machines and may cause re-execution of everything.  To mitigate this, Spark persists the intermediate output of the map phase before shuffling it to the machines executing the reduce phase.  In case of a machine crash, the re-execution (on another surviving machine) just needs to trace back and fetch the intermediate data from the corresponding partition of the mapper's persisted output.  Spark also provides a checkpoint API to explicitly persist an intermediate RDD, so re-execution (after a crash) doesn't need to trace all the way back to the beginning.  In the future, Spark may perform checkpointing automatically, by figuring out a good balance between the latency of recovery and the overhead of checkpointing based on statistical results.
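A small sketch of the checkpoint API mentioned above (the checkpoint directory and the computation are placeholders):

from pyspark import SparkContext

sc = SparkContext("local", "checkpoint-demo")
sc.setCheckpointDir("/tmp/spark-checkpoints")   # placeholder location

data = sc.parallelize(range(1000000))
expensive = data.map(lambda x: x * x).filter(lambda x: x % 7 == 0)

expensive.checkpoint()      # cut the lineage here; materialized on the next action
print(expensive.count())    # the action triggers both computation and checkpointing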

Spark provides a powerful framework for building low latency, massively parallel processing for big data analytics.  It exposes an API around the RDD abstraction, with a set of transformation and action operations, for a number of popular programming languages including Scala, Java and Python.

In future posts, I'll cover other technologies in the Spark stack including real-time analytics using streaming as well as machine learning frameworks.

Thursday, December 12, 2013

Escape local optimum trap

Optimization is a very common technique in computer science and machine learning to search for the best (or good enough) solution.  Optimization itself is a big topic and involves a wide range of mathematical techniques in different scenarios.


In this post, I will focus on local search, which is a very popular technique for searching for an optimal solution through a series of greedy local moves.  The general setting of local search is as follows ...

1. Define an objective function
2. Pick an initial starting point
3. Repeat
     3.1 Find a neighborhood
     3.2 Locate a subset of neighbors that is a candidate move
     3.3 Select a candidate from the candidate set
     3.4 Move to the candidate

One requirement is that the optimal solution needs to be reachable by a sequence of moves.  Usually this requires a proof that any arbitrary state is reachable from any other arbitrary state through a sequence of moves.

Notice that there are many possible strategies for each of steps 3.1, 3.2 and 3.3.  For example, one strategy examines all members of the neighborhood, picks the best one (in terms of the objective function) and moves to it.  Another strategy randomly picks a member of the neighborhood and moves to that member if it is better than the current state.

Regardless of the strategy, the general theme is to move towards members that improve the objective function, hence the greedy nature of this algorithm.

One downside of this algorithm is that it can be trapped in a local optimum, which is the best candidate within its neighborhood but not the best candidate in a global sense.

In the following, we'll explore a couple of meta-heuristic techniques that can mitigate the local optimum trap.

Multiple rounds

We basically conduct k rounds of local search, each round arriving at a local optimum, and then pick the best one out of these k local optima.

Simulated Annealing

This strategy involves a dynamic combination of exploitation (moving to a better neighbor) and exploration (a random walk to a worse neighbor).  The algorithm works in the following way ...

1. Pick an initial starting point
2. Repeat until terminate condition
     2.1 Within neighborhood, pick a random member
     2.2 If neighbor is better than me
           move to the neighbor
         else
           With chance exp(-(myObj - neighborObj)/Temp)
               move to the worse neighbor
     2.3 Temp = alpha * Temp
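Below is a minimal Python sketch of this loop, assuming we are maximizing and that the problem supplies an objective function and a random-neighbor generator (both names are placeholders).  The initial temperature and the cooling factor alpha usually need to be tuned per problem.

import math
import random

def simulated_annealing(start, objective, random_neighbor,
                        temp=1.0, alpha=0.95, iterations=10000):
    """Greedily move to better neighbors; accept worse neighbors with a
    probability that shrinks as the temperature cools (maximization)."""
    current, current_obj = start, objective(start)
    best, best_obj = current, current_obj
    for _ in range(iterations):
        neighbor = random_neighbor(current)
        neighbor_obj = objective(neighbor)
        if neighbor_obj >= current_obj:
            current, current_obj = neighbor, neighbor_obj
        elif random.random() < math.exp(-(current_obj - neighbor_obj) / temp):
            current, current_obj = neighbor, neighbor_obj   # exploratory downhill move
        if current_obj > best_obj:
            best, best_obj = current, current_obj
        temp *= alpha                                       # cool down
    return best, best_obj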

Tabu Search

This strategy maintains a list of previously visited states (called the Tabu list) and makes sure these states will not be re-visited in subsequent exploration.  The search keeps exploring the best move while skipping previously visited nodes; this way the algorithm explores paths that haven't been visited before.  The search also remembers the best state obtained so far.

1. Initialization
     1.1 Pick an initial starting point S
     1.2 Initialize an empty Tabu list
     1.3 Set the best state to S
     1.4 Put S into the Tabu list
2. Repeat until terminate condition
     2.1 Find a neighborhood
     2.2 Locate a smaller subset that is a candidate move
     2.3 Remove elements that are already in the Tabu list
     2.4 Select the best candidate and move there
     2.5 Add the new state to the Tabu list
     2.6 If the new state is better than the best state
          2.6.1 Set the best state to this state
     2.7 If the Tabu list is too large
          2.7.1 Trim Tabu list by removing old items
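A similar Python sketch for Tabu search, again assuming maximization and problem-supplied objective and neighbors functions (state objects must be hashable/comparable for the Tabu-list membership test):

from collections import deque

def tabu_search(start, objective, neighbors, max_tabu=100, iterations=1000):
    """Always move to the best non-tabu neighbor, even if it is worse than the
    current state, while remembering the best state seen so far."""
    current = start
    best, best_obj = start, objective(start)
    tabu = deque([start], maxlen=max_tabu)   # old items fall off automatically
    for _ in range(iterations):
        candidates = [s for s in neighbors(current) if s not in tabu]
        if not candidates:
            break
        current = max(candidates, key=objective)
        tabu.append(current)
        current_obj = objective(current)
        if current_obj > best_obj:
            best, best_obj = current, current_obj
    return best, best_obj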

Saturday, November 9, 2013

Diverse recommender

This is a continuation of my previous blog on recommendation systems, which describes some basic algorithms for building them.  Those techniques evaluate each item against the user's interests independently and pick the top K items to construct the recommendation.  However, this suffers from a lack of diversity.  For example, the list may contain the same book in soft cover, hard cover, and Kindle versions.  Since a person's interests are usually diverse, a better recommendation list should contain items that cover a broad spectrum of the user's interests, even though each element by itself is not the most aligned with them.

In this post, I will discuss a recommendation algorithm that considers diversity in its list of recommendations.

Topic Space

First of all, let's define a "topic space" to which both content and users will be mapped.  Having a "topic space" is a common approach in recommendation because it reduces dimensionality, resulting in better system performance and improved generality.

The set of topics in the topic space can be extracted algorithmically using text mining techniques such as LDA, but for simplicity here we use a manual approach to define the topic space (topics should be orthogonal to each other, as highly correlated topics can distort the measures).  Let's say we have the following topics defined ...
  • Romance
  • Sports
  • Politics
  • Weather
  • Horror
  • ...

Content as Vector of topic weights

Once the topic space is defined, content authors can assign topic weights to each piece of content.  For example, a movie can be assigned genres, and a web page can be assigned topics as well.  Notice that a single piece of content can be assigned multiple topics with different weights.  In other words, each piece of content can be described as a vector of topic weights.

User as Vector of topic weights

On the other hand, a user can also be represented as a vector of topic weights, based on their interactions with content, such as viewing a movie, visiting a web page, buying a product, etc.  Such an interaction can have a positive or negative effect depending on whether the user likes or dislikes the content.  If the user likes the content, the topic weights associated with that content are increased in the user vector by multiplying them by alpha (alpha > 1).  If the user dislikes the content, the corresponding topic weights are divided by alpha.  After each update, the user vector is normalized to a unit vector.
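A small Python sketch of this update rule (the topic names, the alpha value and the small seed weight for unseen topics are illustrative choices, not part of the original description):

import math

ALPHA = 1.5   # illustrative value; must be > 1

def update_user_vector(user_vec, content_vec, liked):
    """Scale the user's weights for the content's topics up (like) or down
    (dislike) by alpha, then re-normalize to a unit vector."""
    for topic, weight in content_vec.items():
        if weight <= 0:
            continue
        current = user_vec.get(topic, 0.01)          # small seed for unseen topics
        user_vec[topic] = current * ALPHA if liked else current / ALPHA
    norm = math.sqrt(sum(v * v for v in user_vec.values()))
    return {t: v / norm for t, v in user_vec.items()}

user = {"Romance": 0.7, "Sports": 0.7}               # roughly unit length
movie = {"Romance": 0.9, "Horror": 0.3}
user = update_user_vector(user, movie, liked=True)
print(user)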

Diversifying the recommendation

We use a utility function to model the diversity of the selected documents, and then maximize that utility function.

In practice, the selected set A is not computed from the full set of documents, which is usually huge.  The full set of documents is typically indexed using some kind of inverted-index technology, with the set of topics as keywords and each topic weight c[j,k] (the weight of topic k in document j) represented as tf-idf.

The user is represented as a "query" and is sent to the inverted index as a search request.  Relevant documents (based on a cosine similarity measure w.r.t. the user vector) are returned as the candidate set D (e.g. the top 200 relevant documents).

To pick the optimal set A out of D, we use a greedy approach as follows (a sketch in code appears after the list) ...
  1. Start with an empty set A
  2. Repeat until |A| reaches the desired list size H
  • Pick the doc i from D such that adding it to A maximizes the utility function
  • Add doc i into A and remove doc i from D
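The exact utility function was given as a formula image in the original post and is not reproduced here.  Purely as an illustration, the sketch below uses a simple topic-coverage utility (the user's interest in each topic times the best weight for that topic among the selected documents) and applies the greedy loop above:

def coverage_utility(selected, user_vec):
    """Illustrative utility: for each topic, credit the user's interest times the
    best weight of that topic among the selected docs (rewards covering many topics)."""
    score = 0.0
    for topic, interest in user_vec.items():
        best = max((doc["topics"].get(topic, 0.0) for doc in selected), default=0.0)
        score += interest * best
    return score

def greedy_diversify(candidates, user_vec, H):
    """Greedily grow A with the doc that gives the largest marginal utility gain."""
    A, D = [], list(candidates)
    while D and len(A) < H:
        best_doc = max(D, key=lambda d: coverage_utility(A + [d], user_vec))
        A.append(best_doc)
        D.remove(best_doc)
    return A

docs = [{"id": 1, "topics": {"Romance": 0.9}},
        {"id": 2, "topics": {"Romance": 0.8}},
        {"id": 3, "topics": {"Sports": 0.7}}]
user = {"Romance": 0.8, "Sports": 0.6}
print([d["id"] for d in greedy_diversify(docs, user, H=2)])   # picks 1, then 3 for diversity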

Saturday, September 7, 2013

Exploration and Exploitation

"Cold Start" is a common problem happens quite frequent in recommendation system.  When a new item enters, there is no prior history that the recommendation system can use.  Since recommendation is an optimization engine which recommends item that matches the best with the user's interests.  Without prior statistics, the new item will hardly be picked up as recommendation and hence continuously lacking the necessary statistics that the recommendation system can use.

One example is movie recommendation where a movie site recommends movies to users based on their past viewing history.  When a new movie arrives the market, there isn't enough viewing statistics about the movie and therefore the new movie will not have a strong match score and won't be picked as a recommendation.  Because we never learn from those that we haven't recommended, the new movies will continuously not have any statistics and therefore will never be picked in future recommendations.

Another cold start example is online Ad serving when a new Ad enters the Ad repository.

Multilevel Granularity Prediction

One solution to the cold-start problem is to leverage existing items that are "SIMILAR" to the new item, where "similarity" is based on content attributes (e.g. actors, genres).  Notice that here we are using a coarser level of granularity (a group of similar items) for prediction, which can be less accurate than a fine-grain model that uses viewing history statistics for prediction.

In other words, we can make recommendations based on two models of different granularity.  The fine-grain model, based on instance-specific history data, is preferred because it usually has higher accuracy.  For the cold-start problem, when new items don't have history data available, we fall back to the coarse-grain model, which is based on other similar items, to predict the user's interest in the new items.

A common approach is to combine both models using weights that depend on the confidence level of the fine-grain model.  For new items, the fine-grain model has low confidence, so more weight is given to the coarse-grain model.

However, in case we don't have a coarser level of granularity, or the coarse level is too coarse and doesn't give good predictions, we have to use the fine-grain model.  But how can we build up the instance-specific history for the fine-grain model when we are not sure whether the new items are good recommendations for the user?

Optimization under Uncertainty

The core of our problem is that we need to optimize under uncertainty.  We have two approaches:
  1. Exploitation: Make the optimal choice based on current data.  Because of uncertainty (high variance), the current data may deviate from its true expected value, so we may end up picking a non-optimal choice.
  2. Exploration: Make a random choice, or choices that we haven't made before.  The goal is to gather more data points and reduce the uncertainty.  This may waste cycles that could have been spent exploiting the optimal choice.
Let's start with the simple multi-armed bandit problem.  There are multiple bandits in a casino, and each bandit has a different probability of winning.  If you knew the true underlying winning probability of each bandit, you would pick the one with the highest winning probability and keep playing it.
Unfortunately, you don't know the underlying probabilities and have only a limited number of rounds to play.  How would you choose which bandit to play to maximize the total number of rounds you win?

Our strategy should strike a good balance between exploiting and exploring.  To measure how good a strategy is, there is the concept of "regret", which compares the following two quantities:
  • The value you would obtain by following the batch-optimal strategy (after you have done batch analysis and have a clear picture of the underlying probability distribution)
  • The value you obtain by following your strategy
We'll pick a strategy that does more exploration initially, when there is a lot of uncertainty, and gradually tunes down the proportion of exploration (leaning more on exploitation) as more statistics are collected.

Epsilon-Greedy Strategy 

In the "epsilon-greedy" strategy, at every play we throw a dice between explore and exploit.
With probability p(t) = k/t (where k is a constant and t is the number of tries so far),we pick a bandit randomly with equal chance (regardless of whether the bandit has been picked in the past).  And with probability 1 - p(t), we pick the bandit that has the highest probability of win based on passed statistics.
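A minimal Python simulation of this strategy (the true win probabilities and the constant k are made up for the simulation):

import random

true_prob = [0.3, 0.5, 0.7]            # hidden; used only to simulate the casino
wins = [0, 0, 0]
plays = [0, 0, 0]
k = 5.0                                 # exploration constant

for t in range(1, 10001):
    if random.random() < min(1.0, k / t):
        choice = random.randrange(len(true_prob))          # explore uniformly
    else:
        rates = [wins[i] / plays[i] if plays[i] else 0.0 for i in range(len(true_prob))]
        choice = rates.index(max(rates))                    # exploit the best estimate
    plays[choice] += 1
    wins[choice] += 1 if random.random() < true_prob[choice] else 0

print("estimated win rates:", [round(wins[i] / max(plays[i], 1), 3) for i in range(3)])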

Epsilon-greedy has the desirable property of spending more time exploring initially and gradually reducing that portion as time passes.  However, it doesn't have a smooth transition between exploring and exploiting.  Also, while it explores, it picks each bandit uniformly without giving more weight to the unexplored bandits; and while it exploits, it doesn't consider the confidence of the probability estimates.

Upper Confidence Bound: UCB

In the more sophisticated UCB strategy, each bandit is associated with an estimated mean and a confidence interval.  In every play, we choose the bandit whose upper confidence bound (i.e. mean + standard deviation) is the largest.

Initially each bandit has a zero mean and a large confidence interval.  As time goes on, we estimate the mean p[i] of bandit i based on how many times it has won over the times we have played it.  We also adjust the confidence interval (reducing the deviation) as we play the bandit,
e.g. the standard deviation is (p(1-p)/n)^0.5.
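A small Python sketch of this version of UCB, using mean + (p(1-p)/n)^0.5 as the upper bound as described above (the win probabilities are made up; the +1/+2 smoothing of the estimate is an added assumption to keep the bound from collapsing to zero for rarely played bandits):

import math
import random

true_prob = [0.3, 0.5, 0.7]              # hidden; used only to simulate payouts
wins = [0, 0, 0]
plays = [0, 0, 0]

def upper_bound(i):
    # Smoothed estimate (the +1/+2 prior is an added assumption, not from the post)
    p = (wins[i] + 1.0) / (plays[i] + 2.0)
    n = max(plays[i], 1)
    return p + math.sqrt(p * (1 - p) / n)

for t in range(10000):
    choice = max(range(len(true_prob)), key=upper_bound)
    plays[choice] += 1
    wins[choice] += 1 if random.random() < true_prob[choice] else 0

print("plays per bandit:", plays)
print("estimated means:", [round((wins[i] + 1.0) / (plays[i] + 2.0), 3) for i in range(3)])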

Notice that the UCB model can be used in a more general online machine learning setting.  We require the machine learning model to be able to output its estimate together with a confidence measure.  As a concrete example, let's say a user is visiting our movie site and we want to recommend a movie to the user, based on a set of input features (e.g. user features, query features, etc.).

We can do a first-round selection (based on information retrieval techniques) to identify movie candidates based on relevancy (i.e. the user's viewing history or search query).  For each movie candidate, we can invoke the ML model to estimate the interest level, as well as the 68% confidence boundary (the confidence level is arbitrary and needs to be hand-tuned; 68% is roughly one standard deviation of a Gaussian distribution).  We then add the 68% confidence range as an offset to the estimate and recommend the movie that has the highest resulting value.

After the recommendation, we monitor whether the user clicks on it, views it, etc., and the response is fed back to our ML model as new training data.  The ML model operates in an online learning setting and updates itself with this new training data.  The 68% confidence range shrinks over time as more and more data is gathered.

Relationship with A/B Testing

For most web sites, we run experiments continuously to improve the user experience by trying out different layouts, or to improve user engagement by recommending different types of content, or by trying out other changes.  In general, we have an objective function that defines what we are trying to optimize, and we run different experiments through A/B testing to try out different combinations of configuration to see which one maximizes the objective function.

When the number of experiments (combinations of different configurations) is small, A/B testing is mainly exploration.  In a typical setting, we use the old user experience as the control and the new user experience as the treatment.  The goal is to test whether the treatment causes any significant improvement over the control.  A certain percentage of production users (typically 5 - 10%) is directed to the new experience, and we measure whether the user engagement level (say this is our objective function) has increased significantly in a statistical sense.  Such splitting is typically done by hashing the user id (or browser cookies), with the range into which the hash code falls determining whether the user gets the new experience.  This hashing is consistent (the same user hashes into the same bucket on subsequent requests), so the user gets the whole new user experience when visiting the web site.
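A tiny Python sketch of this kind of consistent bucketing (the experiment name and the 10% allocation are illustrative):

import hashlib

def in_treatment(user_id, experiment="new_layout_2013", percent=10):
    """Hash the user id (salted by the experiment name) into a bucket 0-99; the same
    user always lands in the same bucket, so the experience is consistent across visits."""
    digest = hashlib.md5((experiment + ":" + str(user_id)).encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100
    return bucket < percent

print(in_treatment("user-42"))    # deterministic for a given user and experiment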

When the number of experiments is large and new experiments come out dynamically and unpredictably, the traditional A/B testing model described above cannot keep track of all the control and treatment combinations.  In that case, we need to use a dynamic exploration/exploitation mechanism to find the best user experience.

Using the UCB approach, we can treat each user experience as a bandit that the A/B test framework can choose from.  Throughout the process, the A/B test framework explores/exploits among the different user experiences to optimize the objective function.  At any time, we can query the A/B testing framework to find out the latest statistics of each user experience.  This provides a much better way to look at a large number of experiment results at the same time.

Saturday, August 17, 2013

Six steps in data science

"Data science" and "Big data" has become a very hot term in last 2 years.  Since the data storage price dropped significantly, enterprises and web companies has collected huge amount of their customer's behavioral data even before figuring out how to use them.  Enterprises also start realizing that these data, if use appropriately, can turn into useful insight that can guide its business along a more successful path.

From my observation, the effort of data science can be categorized roughly into the following areas.
  • Data acquisition
  • Data visualization
  • OLAP, Report generation
  • Response automation
  • Predictive analytics
  • Decision optimization

Data Acquisition

Data acquisition is about collecting data into the system.  This is an important step before any meaningful processing or analysis can begin.  Most companies start by collecting business transaction records from their OLTP systems.  Typically there is an ETL (Extract/Transform/Load) process involved to ingest the raw data, clean it and transform it appropriately.  Finally, the data is loaded into a data warehouse where the subsequent data analytics exercises are performed.  In today's big data world, the destination has shifted from the traditional data warehouse to the Hadoop distributed file system (HDFS).

Data Visualization

Data visualization is usually the first step in analyzing the data.  It is typically done by plotting the data in different ways to get a quick sense of its shape, in order to guide the data scientist in determining what subsequent analysis should be conducted.  This is also where a more experienced data scientist can be distinguished from a less experienced one, based on how fast they can spot common patterns or anomalies in the data.  Most plotting packages work only on data that fits into a single machine.  Therefore, in the big data world, data sampling is typically conducted first to reduce the data size, and the sample is then imported into a single machine where R, SAS, or SPSS can be used to visualize it.

OLAP / Reporting

OLAP is about aggregating transaction data (e.g. revenue) along different dimensions (e.g. month, location, product), where the enterprise defines the KPIs/business metrics that measure the company's performance.  This can be done either in an ad hoc manner (OLAP) or in a predefined manner (report templates).  Report writers (e.g. Tableau, MicroStrategy) are used to produce the reports.  The data is typically stored in a regular RDBMS or in a multidimensional cube which is optimized for OLAP processing (i.e. slice, dice, rollup, drilldown).  In the big data world, Hive provides a SQL-like access mechanism and is commonly used to access data stored in HDFS.  Most popular report writers have integrated with Hive (or declared plans to integrate with it) to access big data stored in HDFS.

Response Automation

Response automation is about leveraging domain-specific knowledge to encode a set of "rules" comprising event/condition/action.  The system monitors all observed events, matches them against the conditions (which can be a boolean expression of event attributes, or a sequence of event occurrences), and triggers the appropriate actions.  In the big data world, automating such responses is typically done with a stream processing mechanism (such as Flume or Storm).  Notice that the "rules" need to be well-defined and unambiguous.

Predictive Analytics

Prediction is about estimating unknown data based on observed data through statistical/probabilistic approaches.  Depending on the data type of the output, "prediction" can be subdivided into "classification" (when the output is a category) or "regression" (when the output is a number).

Prediction is typically done by first "training" a predictive model using historical data (where all input and output values are known).  This training is an iterative process where the performance of the model is measured at the end of each iteration.  Additional input data or different model parameters are used in the next iteration.  When the predictive performance is good enough and no significant improvement is made between subsequent iterations, the process stops and the best model created during the process is used.

Once we have the predictive model, we can use it to predict information we haven't observed, whether that information is hidden or simply hasn't happened yet (i.e. predicting the future).

For a more detailed description of what is involved in performing predictive analytics, please refer to my following blog.

Decision Optimization

Decision optimization is about making the best decision after carefully evaluating the possible options against some measure of business objectives.  The business objective is defined by an "objective function", which is expressed as a mathematical formula over some "decision variables".  Through various optimization techniques, the system figures out what the decision variables should be in order to maximize (or minimize) the value of the objective function.  Optimization over discrete decision variables is typically done using exhaustive search, greedy/heuristic search, or integer programming techniques, while optimization over continuous decision variables is done using linear/quadratic programming.

From my observation, "decision optimization" is the end goal of most data science efforts, but it relies on the previous efforts.  To obtain the overall picture (not just observed data, but also hidden or future data) at the optimization phase, we need to make use of the output of prediction.  And in order to train a good predictive model, we need data acquisition to provide clean data.  All these efforts are inter-linked with each other.

Monday, July 29, 2013

OLAP operation in R

OLAP (Online Analytical Processing) is a very common way to analyze raw transaction data by aggregating it along different combinations of dimensions.  This is a well-established field in Business Intelligence / Reporting.  In this post, I will highlight the key ideas in OLAP operations and illustrate how to do them in R.

Facts and Dimensions

The core part of OLAP is the so-called "multi-dimensional data model", which contains two types of tables: the "fact" table and the "dimension" table.

A fact table contains records, each describing an instance of a transaction.  Each transaction record contains categorical attributes (which describe contextual aspects of the transaction, such as space, time, user) as well as numeric attributes (called "measures", which describe quantitative aspects of the transaction, such as number of items sold or dollar amount).

A dimension table contains records that further elaborate the contextual attributes, such as user profile data, location details, etc.

In a typical setting of Multi-dimensional model ...
  • Each fact table contains foreign keys that reference the primary keys of multiple dimension tables.  In its simplest form, this is called a STAR schema.
  • Dimension tables can contain foreign keys that reference other dimension tables.  This provides a more detailed breakdown of the contextual aspects.  This is also called a SNOWFLAKE schema.
  • Although this is not a hard rule, fact tables tend to be independent of other fact tables and usually don't contain references to each other.
  • However, different fact tables usually share the same set of dimension tables.  This is also called a GALAXY schema.
  • But it is a hard rule that a dimension table NEVER points to / references a fact table.
A simple STAR schema is shown in the following diagram.


Each dimension can also be hierarchical, so that the analysis can be done at different degrees of granularity.  For example, the time dimension can be broken down into days, weeks, months, quarters and years; similarly, the location dimension can be broken down into countries, states, cities, etc.

Here we first create a sales fact table that records each sales transaction.

# Setup the dimension tables

state_table <- 
  data.frame(key=c("CA", "NY", "WA", "ON", "QU"),
             name=c("California", "new York", "Washington", "Ontario", "Quebec"),
             country=c("USA", "USA", "USA", "Canada", "Canada"))

month_table <- 
  data.frame(key=1:12,
            desc=c("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"),
            quarter=c("Q1","Q1","Q1","Q2","Q2","Q2","Q3","Q3","Q3","Q4","Q4","Q4"))

prod_table <- 
  data.frame(key=c("Printer", "Tablet", "Laptop"),
            price=c(225, 570, 1120))

# Function to generate the Sales table
gen_sales <- function(no_of_recs) {

  # Generate transaction data randomly
  loc <- sample(state_table$key, no_of_recs, 
                replace=T, prob=c(2,2,1,1,1))
  time_month <- sample(month_table$key, no_of_recs, replace=T)
  time_year <- sample(c(2012, 2013), no_of_recs, replace=T)
  prod <- sample(prod_table$key, no_of_recs, replace=T, prob=c(1, 3, 2))
  unit <- sample(c(1,2), no_of_recs, replace=T, prob=c(10, 3))
  # Look up price by product key (row-indexing with the prod vector directly can pick the wrong rows)
  amount <- unit * prod_table$price[match(prod, prod_table$key)]

  sales <- data.frame(month=time_month,
                      year=time_year,
                      loc=loc,
                      prod=prod,
                      unit=unit,
                      amount=amount)

  # Sort the records by time order
  sales <- sales[order(sales$year, sales$month),]
  row.names(sales) <- NULL
  return(sales)
}

# Now create the sales fact table
sales_fact <- gen_sales(500)

# Look at a few records
head(sales_fact)

  month year loc   prod unit amount
1     1 2012  NY Laptop    1    225
2     1 2012  CA Laptop    2    450
3     1 2012  ON Tablet    2   2240
4     1 2012  NY Tablet    1   1120
5     1 2012  NY Tablet    2   2240
6     1 2012  CA Laptop    1    225


Multi-dimensional Cube

Now, we turn this fact table into a hypercube with multiple dimensions.  Each cell in the cube represents an aggregate value for a unique combination of each dimension.  


 
# Build up a cube
revenue_cube <- 
    tapply(sales_fact$amount, 
           sales_fact[,c("prod", "month", "year", "loc")], 
           FUN=function(x){return(sum(x))})

# Showing the cells of the cube
revenue_cube

, , year = 2012, loc = CA

         month
prod         1    2     3    4    5    6    7    8    9   10   11   12
  Laptop  1350  225   900  675  675   NA  675 1350   NA 1575  900 1350
  Printer   NA 2280    NA   NA 1140  570  570  570   NA  570 1710   NA
  Tablet  2240 4480 12320 3360 2240 4480 3360 3360 5600 2240 2240 3360

, , year = 2013, loc = CA

         month
prod         1    2    3    4    5    6    7    8    9   10   11   12
  Laptop   225  225  450  675  225  900  900  450  675  225  675 1125
  Printer   NA 1140   NA 1140  570   NA   NA  570   NA 1140 1710 1710
  Tablet  3360 3360 1120 4480 2240 1120 7840 3360 3360 1120 5600 4480

, , year = 2012, loc = NY

         month
prod         1     2    3    4    5    6    7    8    9   10   11   12
  Laptop   450   450   NA   NA  675  450  675   NA  225  225   NA  450
  Printer   NA  2280   NA 2850  570   NA   NA 1710 1140   NA  570   NA
  Tablet  3360 13440 2240 2240 2240 5600 5600 3360 4480 3360 4480 3360

, , year = 2013, loc = NY

.....

dimnames(revenue_cube)

$prod
[1] "Laptop"  "Printer" "Tablet" 

$month
 [1] "1"  "2"  "3"  "4"  "5"  "6"  "7"  "8"  "9"  "10" "11" "12"

$year
[1] "2012" "2013"

$loc
[1] "CA" "NY" "ON" "QU" "WA"


OLAP Operations

Here are some common operations of OLAP
  • Slice
  • Dice
  • Rollup
  • Drilldown
  • Pivot
"Slice" is about fixing certain dimensions to analyze the remaining dimensions.  For example, we can focus in the sales happening in "2012", "Jan", or we can focus in the sales happening in "2012", "Jan", "Tablet".

# Slice
# cube data in Jan, 2012
revenue_cube[, "1", "2012",]

         loc
prod        CA   NY   ON   QU   WA
  Laptop  1350  450   NA  225  225
  Printer   NA   NA   NA 1140   NA
  Tablet  2240 3360 5600 1120 2240
 
# Tablet sales in Jan, 2012
revenue_cube["Tablet", "1", "2012",]

  CA   NY   ON   QU   WA 
2240 3360 5600 1120 2240 


 "Dice" is about limited each dimension to a certain range of values, while keeping the number of dimensions the same in the resulting cube.  For example, we can focus in sales happening in [Jan/ Feb/Mar, Laptop/Tablet, CA/NY].

revenue_cube[c("Tablet","Laptop"), 
             c("1","2","3"), 
             ,
             c("CA","NY")]


, , year = 2012, loc = CA

        month
prod        1    2     3
  Tablet 2240 4480 12320
  Laptop 1350  225   900

, , year = 2013, loc = CA

        month
prod        1    2    3
  Tablet 3360 3360 1120
  Laptop  225  225  450

, , year = 2012, loc = NY

        month
prod        1     2    3
  Tablet 3360 13440 2240
  Laptop  450   450   NA

, , year = 2013, loc = NY

        month
prod        1    2    3
  Tablet 3360 4480 6720
  Laptop  450   NA  225


"Rollup" is about applying an aggregation function to collapse a number of dimensions.  For example, we want to focus in the annual revenue for each product and collapse the location dimension (ie: we don't care where we sold our product). 

apply(revenue_cube, c("year", "prod"),
      FUN=function(x) {return(sum(x, na.rm=TRUE))})


      prod
year   Laptop Printer Tablet
  2012  22275   31350 179200
  2013  25200   33060 166880
 


"Drilldown" is the reverse of "rollup" and applying an aggregation function to a finer level of granularity.  For example, we want to focus in the annual and monthly revenue for each product and collapse the location dimension (ie: we don't care where we sold our product).

apply(revenue_cube, c("year", "month", "prod"), 
      FUN=function(x) {return(sum(x, na.rm=TRUE))})


, , prod = Laptop

      month
year      1    2    3    4    5    6    7    8    9   10   11   12
  2012 2250 2475 1575 1575 2250 1800 1575 1800  900 2250 1350 2475
  2013 2250  900 1575 1575 2250 2475 2025 1800 2025 2250 3825 2250

, , prod = Printer

      month
year      1    2    3    4    5    6    7    8    9   10   11   12
  2012 1140 5700  570 3990 4560 2850 1140 2850 2850 1710 3420  570
  2013 1140 4560 3420 4560 2850 1140  570 3420 1140 3420 3990 2850

, , prod = Tablet

      month
year       1     2     3     4     5     6     7     8     9    10    11    12
  2012 14560 23520 17920 12320 10080 14560 13440 15680 25760 12320 11200  7840
  2013  8960 11200 10080  7840 14560 10080 29120 15680 15680  8960 12320 22400



"Pivot" is about analyzing the combination of a pair of selected dimensions.  For example, we want to analyze the revenue by year and month.  Or we want to analyze the revenue by product and location.

apply(revenue_cube, c("year", "month"), 
      FUN=function(x) {return(sum(x, na.rm=TRUE))})


      month
year       1     2     3     4     5     6     7     8     9    10    11    12
  2012 17950 31695 20065 17885 16890 19210 16155 20330 29510 16280 15970 10885
  2013 12350 16660 15075 13975 19660 13695 31715 20900 18845 14630 20135 27500
 

apply(revenue_cube, c("prod", "loc"),
      FUN=function(x) {return(sum(x, na.rm=TRUE))})


         loc
prod         CA     NY    ON    QU    WA
  Laptop  16425   9450  7650  7425  6525
  Printer 15390  19950  7980 10830 10260
  Tablet  90720 117600 45920 34720 57120



I hope this gives you a taste of the richness of the data processing model in R.

However, R does all its processing in RAM, which requires your data to be small enough to fit into the local memory of a single machine.

Saturday, February 23, 2013

Text processing (part 2): Inverted Index

This is the second part of my text processing series.  In this blog, we'll look into how text documents can be stored in a form that can be easily retrieved by a query.  I'll use the popular open source Apache Lucene index for illustration.

There are two main processing flows in the system ...
  • Document indexing: Given a document, add it to the index
  • Document retrieval: Given a query, retrieve the most relevant documents from the index.
The following diagram illustrates how this is done in Lucene.


Index Structure

Both documents and queries are represented as bags of words.  In Apache Lucene, a "Document" is the basic unit of storage and retrieval.  A "Document" contains multiple "Fields" (also called zones).  Each "Field" contains multiple "Terms" (equivalent to words).

To control how the document will be indexed across its fields, a Field can be declared in multiple ways to specify whether it should be analyzed (a pre-processing step during indexing), indexed (participating in the index) or stored (in case it needs to be returned in the query result).
  • Keyword (Not analyzed, Indexed, Stored)
  • Unindexed (Not analyzed, Not indexed, Stored)
  • Unstored (Analyzed, Indexed, Not stored)
  • Text (Analyzed, Indexed, Stored)
The inverted index is the core data structure of the storage.  It is organized in an inverted manner, from terms to the lists of documents that contain them.  The list (known as a posting list) is ordered by a global ordering (typically by document id).  To enable faster retrieval, the list is not just a single list but a hierarchy of skip lists.  For simplicity, we ignore skip lists in the subsequent discussion.

This data structure is illustrated below based on Lucene's implementation.  It is stored on disk as segment files which are brought into memory during processing.


The above diagram only shows the inverted index.  The whole index contains an additional forward index as follows.



Document indexing

A document in its raw form is extracted from a data adaptor (this can be calling a Web API to retrieve some text output, crawling a web page, or receiving an HTTP document upload).  This can be done in a batch or online manner.

When index processing starts, it parses each raw document and analyzes its text content.  The typical steps include ...
  • Tokenize the document (breakdown into words)
  • Lowercase each word (to make matching case-insensitive, but we need to be careful with names or abbreviations)
  • Remove stop words (take out high frequency words like "the", "a", but we need to be careful with phrases)
  • Stemming (normalize different form of the same word, e.g. reduce "run", "running", "ran" into "run")
  • Synonym handling.  This can be done in two ways.  Either expand the term to include its synonyms (ie: if the term is "huge", add "gigantic" and "big"), or reduce the term to a normalized synonym (ie: if the term is "gigantic" or "huge", change it to "big")
At this point, the document is composed of multiple terms: doc = [term1, term2 ...].  Optionally, terms can be further combined into n-grams.  After that we count the term frequencies of the document.  For example, in a bi-gram expansion, the document will become ...
doc1 -> {term1: 5, term2: 8, term3: 4, term1_2: 3, term2_3:1}

We may also compute a "static score" based on some measure of the quality of the document.  After that, we insert the document into the posting list (if it exists, otherwise we create a new posting list) for each term (including all n-grams); this creates the inverted index structure shown in the previous diagram.
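A toy Python sketch of building such posting lists in memory (a real engine like Lucene stores much more, e.g. positions, skip lists and segment files; this only keeps term frequencies):

from collections import Counter, defaultdict

# term -> list of (doc_id, term_frequency), kept in doc-id order
inverted_index = defaultdict(list)
doc_count = 0

def index_document(doc_id, text):
    """Tokenize, count term frequencies, and append this doc to each term's posting list."""
    global doc_count
    doc_count += 1
    terms = Counter(text.lower().split())      # tokenization + lowercasing only
    for term, tf in terms.items():
        inverted_index[term].append((doc_id, tf))

index_document(1, "spark makes big data processing fast")
index_document(2, "hadoop processing is batch oriented")
print(inverted_index["processing"])            # [(1, 1), (2, 1)]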

A boost factor can be set on the document or field.  The boost factor effectively multiplies the term frequency, which in turn affects the importance of the document or field.

A document can be added to the index in one of the following ways: inserted, modified or deleted.
Typically the document is first added to a memory buffer, which is organized as an inverted index in RAM.
  • When this is a document insertion, it goes through the normal indexing process (as I described above) to analyze the document and build an inverted list in RAM.
  • When this is a document deletion (the client request only contains the doc id), it fetches the forward index to extract the document content, then goes through the normal indexing process to analyze the document and build the inverted list.  But in this case the doc object in the inverted list is labeled as "deleted".
  • When this is a document update (the client request contains the modified document), it is handled as a deletion followed by an insertion, which means the system first fetches the old document from the forward index to build an inverted list with nodes marked "deleted", and then builds a new inverted list from the modified document.  (e.g. if doc1 = "A B" is updated to "A C", then the posting list will be {A: doc1(deleted) -> doc1, B: doc1(deleted), C: doc1}; after collapsing A, the posting list will be {A: doc1, B: doc1(deleted), C: doc1})

As more and more documents are inserted into the memory buffer, it becomes full and is flushed to a segment file on disk.  In the background, when M segment files have been accumulated, Lucene merges them into bigger segment files.  Notice that the size of the segment files at each level increases exponentially (M, M^2, M^3).  This keeps the number of segment files that need to be searched per query at O(logN) complexity, where N is the number of documents in the index.  Lucene also provides an explicit "optimize" call that merges all the segment files into one.




Here let's detail the merging process a bit.  Since the posting lists are already vertically ordered by term and horizontally ordered by doc id, merging two segment files S1, S2 basically proceeds as follows:
  • Walk the posting lists of S1 and S2 together in sorted term order.  For non-common terms (a term that appears in one of S1 or S2 but not both), write out its posting list to a new segment S3.
  • When we find a common term T, we merge the corresponding posting lists from the two segments.  Since both lists are sorted by doc id, we just walk down both posting lists and write the doc objects out to a new posting list.  When both posting lists contain the same doc (which is the case when the document has been updated or deleted), we pick the latest doc based on time order.
  • Finally, the doc frequency of each posting list (of the corresponding term) will be computed.

Document retrieval

Consider a document as a vector (with each term as a separate dimension and the corresponding value being the tf-idf value) and the query as a vector as well.  The document retrieval problem can then be defined as finding the top-k documents most similar to the query, where similarity is defined as the dot product or cosine similarity between the document vector and the query vector.

tf-idf is a normalized frequency.  TF (term frequency) represents how many times the term appears in the document (usually a compression function such as square root or logarithm is applied).  IDF is the inverse of the document frequency, which is used to discount the significance of a term that appears in many other documents.  There are many variants of TF-IDF, but generally it reflects the strength of association between the document (or query) and each term.

Given a query Q containing terms [t1, t2], here is how we fetch the corresponding documents.  A common approach is the "document at a time" approach, where we traverse the posting lists of t1 and t2 concurrently (as opposed to the "term at a time" approach, where we traverse the whole posting list of t1 before we start the posting list of t2).  The traversal process is described as follows (a rough sketch in code follows the list) ...
  • For each term t1, t2 in the query, we identify all the corresponding posting lists.
  • We walk the posting lists concurrently to return a sequence of documents (ordered by doc id).  Notice that each returned document contains at least one of the terms but may contain multiple terms.
  • We compute the dynamic score, which is the dot product of the query and document vectors.  Notice that we typically don't concern ourselves with the TF/IDF of the query (which is short, so we don't care about the frequency of each term).  Therefore we can just sum up the TF scores of the posting lists that have a matching term, after dividing by the IDF score (stored at the head of each posting list).  Lucene also supports query level boosting, where a boost factor can be attached to query terms.  The boost factor multiplies the term frequency correspondingly.
  • We also look up the static score, which is based purely on the document (not the query).  The total score is a linear combination of the static and dynamic scores.
  • Although the score used in the above calculation is based on the cosine similarity between the query and the document, we are not restricted to that.  We can plug in any similarity function that makes sense for the domain (e.g. we can use machine learning to train a model to score the similarity between a query and a document).
  • After we compute the total score, we insert the document into a heap data structure in which the top K scored documents are maintained.
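Continuing in the same toy spirit (the index layout term -> [(doc id, tf)] is an assumption, not Lucene's actual format), here is a rough sketch of TF-IDF scoring with a heap keeping the top K documents.  For simplicity it accumulates scores term-at-a-time rather than doing the true document-at-a-time merge, and it omits static scores and boosting:

import heapq
import math

def search(inverted_index, doc_count, query, k=10):
    """inverted_index: term -> list of (doc_id, tf); sums tf*idf per doc, keeps top-k."""
    scores = {}
    for term in query.lower().split():
        postings = inverted_index.get(term, [])
        if not postings:
            continue
        idf = math.log(doc_count / float(len(postings)))    # discount frequent terms
        for doc_id, tf in postings:
            scores[doc_id] = scores.get(doc_id, 0.0) + math.sqrt(tf) * idf
    return heapq.nlargest(k, scores.items(), key=lambda kv: kv[1])

index = {"big": [(1, 1)], "data": [(1, 2), (2, 1)], "processing": [(1, 1), (2, 1)]}
print(search(index, doc_count=2, query="big data processing", k=2))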
Here the whole posting list is traversed.  If the posting list is very long, the response latency will be long.  Is there a way to avoid traversing the whole list and still find the approximate top K documents?  There are a couple of strategies we can consider.
  1. Static Score Posting Order: Notice that the posting list is sorted by a global order.  This global ordering provides monotonically increasing document ids during the traversal, which is important for the "document at a time" traversal because it guarantees the same document is never visited twice.  This global ordering, however, can be quite arbitrary and doesn't have to be the document id.  So we can base the order on the static score (e.g. a quality indicator of the document), which is global.  The idea is that by traversing the posting list in decreasing order of static score, we are more likely to visit the documents with the higher total scores (static + dynamic) first.
  2. Cut frequent terms: We do not traverse the posting lists of terms with a low IDF value (i.e. the term appears in many documents and therefore its posting list tends to be long).  This way we avoid traversing long posting lists.
  3. TopR list: For each posting list, we create an extra posting list which contains the top R documents with the highest TF (term frequency) in the original list.  When we perform the search, we search this topR list instead of the original posting list.

Since we have multiple inverted indexes (in the memory buffer as well as in segment files at different levels), we need to combine their results.  If termX appears in both segmentA and segmentB, the fresher version is picked.  The fresher version is determined as follows: the segment at a lower level (smaller size) is considered fresher; if two segment files are at the same level, the one with the higher number is fresher.  The IDF value, on the other hand, is the sum of the corresponding IDFs of each posting list across the segment files (the value will be slightly off if the same document has been updated, but such discrepancy is negligible).  However, consolidating multiple segment files incurs processing overhead during document retrieval.  Lucene provides an explicit "optimize" call to merge all segment files into a single file so there is no need to look at multiple segment files during document retrieval.

Distributed Index

For a large corpus (like web documents), the index is typically distributed across multiple machines.  There are two models of distribution: term partitioning and document partitioning.


 
In document partitioning, documents are randomly spread across the different partitions where the index is built.  In term partitioning, the terms are spread across different partitions.  We'll discuss document partitioning as it is more commonly used.  Distributed indexing is provided by other technologies built on top of Lucene, such as ElasticSearch.  A typical setting is as follows ...

In this setting, machines are organized into columns and rows.  Each column represents a partition of documents, while each row represents a replica of the whole corpus.



During document indexing, a row of machines is first randomly selected and allocated for building the index.  When a new document is crawled, a column machine from the selected row is randomly picked to host the document.  The document is sent to this machine, where the index is built.  The updated index is later propagated to the other rows of replicas.

During document retrieval, a row of replica machines is first selected.  The client query is then broadcast to every column machine of the selected row.  Each machine performs the search in its local index and returns its top M elements to the query processor, which consolidates the results before sending them back to the client.  Notice that K/P < M < K, where K is the number of top documents the client expects and P is the number of columns of machines; M is a parameter that needs to be tuned.

One caveat of this distributed index is that since the posting list is split horizontally across partitions, we lose the global view of the IDF value, without which a machine is unable to calculate the TF-IDF score.  There are two ways to mitigate that ...
  1. Do nothing: here we assume the documents are evenly spread across the partitions, so the local IDF is a good approximation of the actual IDF.
  2. Extra round trip: In the first round, the query is broadcast to every column, and each column returns its local IDF.  The query processor collects all the IDF responses and computes their sum.  In the second round, it broadcasts the query along with the IDF sum to each column of machines, which compute their local scores based on the IDF sum.

Friday, February 15, 2013

Text Processing (part 1) : Entity Recognition

Entity recognition is commonly used to parse unstructured text documents and extract useful entity information (like location, person, brand) to construct a more useful structured representation.  It is one of the most common text processing steps for understanding a text document.

I am planning to write a blog series on text processing.  In this first blog of the series, I will introduce some basic algorithms for entity recognition.

Given the sentence: Ricky is living in CA USA.
Output: Ricky/SP is/NA living/NA in/NA CA/SL USA/CL

Basically we want to tag each word with an entity type, whose definition is domain specific.  In this example, we define the following tags
  • NA - Not Applicable
  • SP - Start of a person name
  • CP - Continue of a person name
  • SL - Start of a location name
  • CL - Continue of a location name

Hidden Markov Model

Let's say there is a sequence of states; let's look at a few probabilistic graphical models of such a sequence.


However, in our tagging example, we don't directly observe the tags.  Instead, we only observe the words.  In this case, we can use a hidden Markov model (i.e. HMM).


 
Now the tagging problem can be structured as follows.

Given a sequence of words, we want to predict the most likely tag sequence.

Find a tag sequence t1, t2, ... tn that maximizes the probability P(t1, t2, ... tn | w1, w2, ... wn)

Using Bayes rules,
P(t1, t2, .... | w1, w2 ...) = P(t1, t2, ... tn, w1, w2, ... wn) / P(w1, w2, ... wn)

Since the sequence w1, w2 ... wn is observed and constant across all tag sequences, this is equivalent to maximizing P(t1, t2, ... tn, w1, w2, ... wn), which is equal to P(t1|S)*P(t2|t1)…P(E|tn)*P(w1|t1)*P(w2|t2)…P(wn|tn), where S and E denote the start and end of the sentence.

Now, P(t1|S), P(t2|t1), ... can be estimated by counting occurrences within the training data.
P(t2|t1) = count(t1, t2) / count(t1, *)

Similarly, P(w1|t1) = count(w1, t1) / count(*, t1)

Viterbi Algorithm

Now the problem is to find a tag sequence t1, ... tn that maximizes
P(t1|S)*P(t2|t1)…P(E|tn)*P(w1|t1)*P(w2|t2)…

A naive method is to find all possible combination of tag sequence and then evaluate the above probability.  The order of complexity will be O(|T|^n) where T is the number of possible tag values.  Notice that this is exponential complexity with respect to the length of the sentence.

However, there is a more efficient approach, the Viterbi algorithm, which leverages the Markov property together with dynamic programming.



The key element is M(k, L), the maximum probability of any length-k tag sequence that ends with tk = L.  M(k, L) is computed by looking back over the different choices of S for the length k-1 sequence and picking the one that maximizes M(k-1, S)*P(tk=L | tk-1=S)*P(wk|tk=L).  The complexity of this algorithm is O(n*|T|^2).

To recover the actual tag sequence, we also maintain a back pointer from every cell to the choice of S that leads to it.  Then we can trace back the path from the final cell M(n, STOP), where STOP marks the end of the sentence.
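Here is a small sketch of the Viterbi recurrence in Python, reusing the transition/emission dictionaries from the counting sketch above (illustrative code rather than a reference implementation).

def viterbi(words, tags, transition, emission):
    """Return the most likely tag sequence for `words` under the HMM."""
    # M[k][L]: max probability of any tag sequence for words[0..k] ending in tag L.
    # back[k][L]: the previous tag that achieved M[k][L].
    M = [{L: transition.get(("S", L), 0.0) * emission.get((L, words[0]), 0.0)
          for L in tags}]
    back = [{L: "S" for L in tags}]

    for k in range(1, len(words)):
        M.append({})
        back.append({})
        for L in tags:
            best_prev = max(tags, key=lambda S: M[k-1][S] * transition.get((S, L), 0.0))
            M[k][L] = (M[k-1][best_prev] * transition.get((best_prev, L), 0.0)
                       * emission.get((L, words[k]), 0.0))
            back[k][L] = best_prev

    # Close with the end-of-sentence transition, then trace the back pointers.
    last = max(tags, key=lambda L: M[-1][L] * transition.get((L, "E"), 0.0))
    path = [last]
    for k in range(len(words) - 1, 0, -1):
        path.append(back[k][path[-1]])
    return list(reversed(path))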

Notice that for rare words not observed in the training data, P(wk|tk=L) will be zero and cause the whole product to be zero.  Such words are often numbers, dates, etc.  One way to address this problem is to group these rare words into patterns (e.g. 3-digit number, year-2012 ... etc) and then compute P(group(wk) | tk=L).  However, such grouping is domain specific and has to be hand-tuned.

Reference

NLP course from Michael Collins of Columbia University

Tuesday, February 12, 2013

Basic Planning Algorithm

You can think of planning as a graph search problem where each node in the graph represents a possible "state" of the world, and a directed edge from nodeA to nodeB represents an "action" that transitions stateA to stateB.

Planning can also be thought of as a form of constraint optimization problem, although quite different from the one I described in my last blog.  In this case, the constraint is the goal state we want to achieve, and a sequence of actions needs to be found to meet that constraint.  The actions incur cost, and our objective is to minimize the total cost of the chosen actions.

Basic Concepts 

A "domain" defines the structure of the problem.
  • A set of object types.  e.g. ObjectTypeX, ObjectTypeY ... etc.
  • A set of relation types  e.g. [ObjectTypeX RelationTypeA ObjectTypeY] or [ObjectTypeX RelationTypeA ValueTypeY]
A "state" is composed of a set of relation instances.  It can either be a "reality state" or a "required state".

A reality state contains only +ve atoms, e.g. [(personX in locationA), (personX is male)].  Notice that -ve atoms never appear in a reality state: if personX is NOT in locationB, that tuple simply does not show up in the state.

A required state contains both +ve and -ve atoms, e.g. [(personX in locationA), NOT(personX is male)].  The required state is used to check against the reality state; it is reached if all of the following are true.
  • All +ve atoms in the required state are contained in the +ve atoms of the reality state
  • None of the -ve atoms in the required state are contained in the +ve atoms of the reality state
Notice that there can be a huge (or even infinite) number of nodes and edges in the graph if we were to expand it fully (with all possible states and actions).  Normally we express only a subset of nodes and edges analytically.  Instead of enumerating all possible states, we describe a state as the set of relations we care about; in particular, we describe the initial state of the environment with all the things we observed, and the goal state as what we want to reach.  Similarly, instead of enumerating every possible edge, we describe actions with variables, so that each action is a rule covering many concrete state transitions.


An "action" causes transition from one state to the other.  It is defined as action(variable1, variable2 ...) and contains the following components.
  • Pre-conditions: a required state containing a set of tuples (expressed by variables).  The action is feasible if the current reality state contains all the +ve atoms but not any -ve atoms specified in the pre-conditions.
  • Effects: a set of +ve atoms and -ve atoms (also expressed with variables).  After the action is taken, it removes all the -ve atoms from the current reality state and then inserts all the +ve atoms into it.
  • Cost of executing this action.
Notice that actions contain variables while the reality state does not.  Therefore, before an action can be executed, we need to bind the variables in its pre-conditions to specific values so that they match the current reality state.  This binding propagates to the variables in the action's effects, and new atoms are then inserted into / removed from the reality state, as sketched below.
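To make these definitions concrete, here is a minimal Python sketch of a reality state as a set of +ve atoms, together with the feasibility check and effect application of a grounded action.  All the names here (Action, feasible, apply, the move example) are my own illustration, not any real planning library.

from dataclasses import dataclass, field

State = frozenset   # a reality state: a frozenset of +ve atoms (tuples)

@dataclass
class Action:
    name: str
    pre_pos: set = field(default_factory=set)   # +ve atoms required to be present
    pre_neg: set = field(default_factory=set)   # -ve atoms required to be absent
    add: set = field(default_factory=set)       # +ve atoms inserted by the effect
    delete: set = field(default_factory=set)    # atoms removed by the effect
    cost: float = 1.0

    def feasible(self, state):
        return self.pre_pos <= state and not (self.pre_neg & state)

    def apply(self, state):
        return State((state - self.delete) | self.add)

# Example: a grounded move(personX, locationA, locationB) action.
move = Action(
    name="move(personX, locationA, locationB)",
    pre_pos={("personX", "in", "locationA")},
    add={("personX", "in", "locationB")},
    delete={("personX", "in", "locationA")},
)

start = State({("personX", "in", "locationA"), ("personX", "is", "male")})
if move.feasible(start):
    print(move.apply(start))

Variable binding then corresponds to generating such grounded Action instances from an action template before the search begins.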

Planning Algorithm

This can be thought of as a search problem.  Given an initial state and a goal state, our objective is to search for a sequence of actions such that the goal state is reached.



We can perform the search from the initial state, expand all the possible states that can be reached by taking some actions, and check during this process whether the goal state has been reached.  If so, terminate the process and return the path.

Forward planning builds the plan from the initial state.  It works as follows (a minimal code sketch follows the list) ...
  1. Put the initial state into the exploration queue, with an empty path.
  2. Pick a state (together with its path from initial state) from the exploration queue as the current state according to some heuristics.
  3. If this current state is the goal state, then return its path, which contains the sequence of actions, and we are done.  Else move on.
  4. For this current state, explore which actions are possible by checking whether the current state meets the pre-conditions (ie: contains all the +ve atoms and none of the -ve atoms specified in the pre-conditions).
  5. If an action is feasible, compute the next reachable state and its path (by appending this action to the current path), and insert the next state into the exploration queue.
  6. Repeat 5 for all feasible actions of the current state.
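Here is a minimal sketch of this loop in Python, building on the Action/State sketch above.  Picking the oldest queue element makes it a breadth-first search (see the heuristics section below); it is an illustration, not a production planner.

from collections import deque

def forward_plan(initial_state, goal_pos, goal_neg, actions):
    """Breadth-first forward search; returns a list of action names, or None."""
    queue = deque([(initial_state, [])])          # (reality state, path so far)
    visited = {initial_state}
    while queue:
        state, path = queue.popleft()             # oldest element first
        if goal_pos <= state and not (goal_neg & state):
            return path                           # goal state reached
        for action in actions:
            if action.feasible(state):
                nxt = action.apply(state)
                if nxt not in visited:            # avoid revisiting states
                    visited.add(nxt)
                    queue.append((nxt, path + [action.name]))
    return None

# e.g. forward_plan(start, {("personX", "in", "locationB")}, set(), [move])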

Alternatively, we can perform the search from the goal state.  We look at what needs to be accomplished and identify which actions can accomplish it (ie: the effects of the action meet the goal state).  Then we check whether those actions are feasible (ie: the initial state meets the action's pre-conditions).  If so we can execute the action; otherwise we take the action's pre-conditions as our sub-goal and expand our overall goal state.

Backward planning builds the plan from the goal state.  It works as follows ...
  1. Put the goal state into the exploration queue, with an empty path.
  2. Pick a regression state (a state that can reach the goal state, can be considered as a sub-goal) from the exploration queue according to some heuristics.
  3. If the regression state is contained in the initial state, then we are done and return the path as the plan.  Else move on.
  4. From this regression state, identify all "relevant actions": those actions that have at least one +ve effect contained in the regression state, whose +ve effects do not overlap with the -ve atoms of the regression state, and whose -ve effects do not overlap with the +ve atoms of the regression state.
  5. For each relevant action, compute the next regression state by removing the action's effects from the current regression state and adding the action's pre-conditions to it, then insert the next regression state into the exploration queue.
  6. Repeat 5 for all relevant actions of the current regression state.

Heuristic Function

In the above algorithms, we need to pick the next candidate from the exploration queue, and we can employ many strategies.
  • If we pick the oldest element in the queue, this is a breadth-first search
  • If we pick the youngest element in the queue, this is a depth-first search
  • We can pick the best element in the queue based on some value function.
Notice that what is "best" is very subjective and also domain specific.  A very popular approach is A* search, whose value function = g(thisState) + h(thisState).

Notice that g(thisState) is the accumulated cost of moving from the initial state to "thisState", while h(thisState) is a domain-specific function that estimates the cost from "thisState" to the goal state.  It can be proved that for A* search to return an optimal solution (ie: the least-cost path), the chosen h(state) function must not over-estimate (ie: it must under-estimate) the actual cost of moving from "thisState" to the goal state.
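As a sketch of how the exploration queue changes under A* (again illustrative, reusing the Action/State sketch above), the FIFO queue of the forward search becomes a priority queue keyed on f = g + h:

import heapq

def a_star_plan(initial_state, goal_pos, goal_neg, actions, h):
    """Forward search ordered by f = g + h; h must not over-estimate the remaining cost."""
    frontier = [(h(initial_state), 0.0, 0, initial_state, [])]   # (f, g, tie-break, state, path)
    best_g = {initial_state: 0.0}
    counter = 1                       # tie-breaker so states themselves are never compared
    while frontier:
        _, g, _, state, path = heapq.heappop(frontier)
        if goal_pos <= state and not (goal_neg & state):
            return path
        for action in actions:
            if action.feasible(state):
                nxt = action.apply(state)
                ng = g + action.cost
                if ng < best_g.get(nxt, float("inf")):
                    best_g[nxt] = ng
                    heapq.heappush(frontier, (ng + h(nxt), ng, counter, nxt, path + [action.name]))
                    counter += 1
    return None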

Here is some detail of A* search.

Monday, January 14, 2013

Optimization in R

Optimization is a very common problem in data analytics.  Given a set of variables (which we have control over), how do we pick the right values such that some benefit is maximized?  More formally, optimization is about determining a set of variables x1, x2, … that maximize or minimize an objective function f(x1, x2, …).

Unconstrained optimization

In the unconstrained case, the variables can be freely selected within their full range.

A typical solution is to compute the gradient vector of the objective function [∂f/∂x1, ∂f/∂x2, …], set it to [0, 0, …], and solve the resulting equations for x1, x2, …, which gives a local optimum.

In R, this can be done numerically, e.g. with optim(), which minimizes by default.

> f <- function(x){(x[1] - 5)^2 + (x[2] - 6)^2}
> initial_x <- c(10, 11)
> x_optimal <- optim(initial_x, f, method="CG")
> x_min <- x_optimal$par
> x_min
[1] 5 6

Equality constraint optimization

Moving on to the constrained case, let's say x1, x2, … are not independent and have to be related to each other in some particular way: g1(x1, x2, …) = 0, g2(x1, x2, …) = 0.

The optimization problem can be expressed as …
Maximize objective function: f(x1, x2, …)
Subjected to equality constraints:
  • g1(x1, x2, …) = 0
  • g2(x1, x2, …) = 0
A typical solution is to turn the constrained optimization problem into an unconstrained one using Lagrange multipliers.

Define a new function F as follows ...
F(x1, x2, …, λ1, λ2, …) = f(x1, x2, …) + λ1.g1(x1, x2, …) + λ2.g2(x1, x2, …) + …

Then solve for ...
[∂F/∂x1, ∂F/∂x2, …, ∂F/∂λ1, ∂F/∂λ2, …] = [0, 0, ….]
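As a tiny worked example (my own illustration): maximize f(x1, x2) = x1.x2 subject to g1(x1, x2) = x1 + x2 - 10 = 0.  Then F(x1, x2, λ1) = x1.x2 + λ1.(x1 + x2 - 10), and setting [∂F/∂x1, ∂F/∂x2, ∂F/∂λ1] = [x2 + λ1, x1 + λ1, x1 + x2 - 10] = [0, 0, 0] gives x1 = x2 = 5 (with λ1 = -5).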

Inequality constraint optimization

In this case, the constraints are inequalities.  We cannot use the plain Lagrange multiplier technique because it requires equality constraints, and there is no general solution for arbitrary inequality constraints.

However, we can put some restrictions on the form of the constraints.  In the following, we study two models where each constraint is restricted to be a linear function of the variables:
w1.x1 + w2.x2 + … >= 0

Linear Programming

Linear programming is a model in which both the objective function and the constraint functions are restricted to linear combinations of the variables.  The linear programming problem can be defined as follows ...

Maximize objective function: f(x1, x2) = c1.x1 + c2.x2

Subjected to inequality constraints:
  • a11.x1 + a12.x2 <= b1
  • a21.x1 + a22.x2 <= b2
  • a31.x1 + a32.x2 <= b3
  • x1 >= 0, x2 >=0
As an illustrative example, let's walk through a portfolio investment problem.  In this example, we want to find an optimal way to allocate the proportion of assets in our investment portfolio.
  • StockA gives 5% return on average
  • StockB gives 4% return on average
  • StockC gives 6% return on average
To set some constraints, let's say my investment in C must be less than the sum of A and B.  Also, investment in A cannot be more than twice that of B.  Finally, at least 10% of the portfolio must be in each stock.

To formulate this problem:

Variable: x1 = % investment in A, x2 = % in B, x3 = % in C

Maximize expected return: f(x1, x2, x3) = x1*5% + x2*4% + x3*6%

Subjected to constraints:
  • 10% < x1, x2, x3 < 100%
  • x1 + x2 + x3 = 1
  • x3 < x1 + x2
  • x1 < 2 * x2

> library(lpSolve)
> library(lpSolveAPI)
> # Set the number of vars
> model <- make.lp(0, 3)
> # Define the objective function: lpSolve minimizes by default, so negate the coefficients to maximize
> set.objfn(model, c(-0.05, -0.04, -0.06))
> # Add the constraints
> add.constraint(model, c(1, 1, 1), "=", 1)
> add.constraint(model, c(1, 1, -1), ">", 0)
> add.constraint(model, c(1, -2, 0), "<", 0)
> # Set the upper and lower bounds
> set.bounds(model, lower=c(0.1, 0.1, 0.1), upper=c(1, 1, 1))
> # Compute the optimized model
> solve(model)
[1] 0
> # Get the value of the optimized parameters
> get.variables(model)
[1] 0.3333333 0.1666667 0.5000000
> # Get the value of the objective function
> get.objective(model)
[1] -0.05333333
> # Get the value of the constraint
> get.constraints(model)
[1] 1 0 0


Quadratic Programming

Quadratic programming is a model in which the objective function is quadratic (products of up to two variables) while the constraint functions are still restricted to linear combinations of the variables.

The Quadratic Programming problem can be defined as follows ...

Minimize quadratic objective function:
f(x1, x2) = c1.x1^2 + c2.x1.x2 + c3.x2^2 - (d1.x1 + d2.x2)
Subject to constraints
  • a11.x1 + a12.x2 == b1
  • a21.x1 + a22.x2 == b2
  • a31.x1 + a32.x2 >= b3
  • a41.x1 + a42.x2 >= b4
  • a51.x1 + a52.x2 >= b5
Express the problem in Matrix form:

Minimize objective: ½.xT.D.x - dT.x
Subject to constraint: AT.x >= b
The first k columns of A are treated as equality constraints (the meq argument of solve.QP below)

As an illustrative example, let's continue with the portfolio investment problem.  Again, we want to find an optimal way to allocate the proportion of assets in our investment portfolio.
  • StockA gives 5% return on average
  • StockB gives 4% return on average
  • StockC gives 6% return on average
We also look into the variance of each stock (known as risk) as well as the covariance between stocks.

For investment, we not only want a high expected return, but also a low variance.  In other words, a stock with a high return but also a high variance is not very attractive.  Therefore, maximizing the expected return while minimizing the variance is the typical investment strategy.

One way to minimize variance is to pick multiple stocks (in a portfolio) to diversify the investment.  Diversification happens when the stock components within the portfolio move away from their averages in different directions (hence the overall variance is reduced).

The covariance matrix ∑ (between each pair of stocks) is given as follows:
1%, 0.2%, 0.5%
0.2%, 0.8%, 0.6%
0.5%, 0.6%, 1.2%

In this example, we want to achieve an overall return of at least 5.2% while minimizing the variance of the return.

To formulate the problem:

Variable: x1 = % investment in A, x2 = % in B, x3 = % in C

Minimize variance: xT∑x

Subjected to constraint:
  • x1 + x2 + x3 == 1
  • x1*5% + x2*4% + x3*6% >= 5.2%
  • 0% < x1, x2, x3 < 100%
> library(quadprog)
> mu_return_vector <- c(0.05, 0.04, 0.06) 
> sigma <- matrix(c(0.01, 0.002, 0.005, 
+                   0.002, 0.008, 0.006, 
+                   0.005, 0.006, 0.012), 
+                  nrow=3, ncol=3)
> D.Matrix <- 2*sigma
> d.Vector <- rep(0, 3)
> A.Equality <- matrix(c(1,1,1), ncol=1)
> A.Matrix <- cbind(A.Equality, mu_return_vector, 
                    diag(3))
> b.Vector <- c(1, 0.052, rep(0, 3))
> out <- solve.QP(Dmat=D.Matrix, dvec=d.Vector, 
                  Amat=A.Matrix, bvec=b.Vector, 
                  meq=1)
> out$solution
[1] 0.4 0.2 0.4
> out$value
[1] 0.00672
>

Integration with Predictive Analytics

Optimization is usually integrated with predictive analytics, which is another important part of data analytics.  For an overview of predictive analytics, here is my previous blog about it.

The overall processing can be depicted as follows:


In this diagram, we use machine learning to train a predictive model in batch mode.  Once the predictive model is available, observation data is fed into it in real time and a set of output variables is predicted.  These output variables are fed into the optimization model as environment parameters (e.g. the return of each stock, the covariance ... etc.), from which a set of optimal decision variables is recommended.

Wednesday, January 2, 2013

MapReduce: Detecting Cycles in Network Graph

I recently received an email from a reader of my blog on Map/Reduce algorithm design, asking how to detect whether a graph is acyclic using Map/Reduce.  I think this is an interesting problem and I can imagine a wide range of applications for it.

Although I haven't solved this exact problem in the past, I'd like to sketch out my thoughts on a straightforward approach, which may not be highly optimized.  My goal is to invite other readers who have solved this problem to share their tricks.

To define the problem:  Given a simple directed graph, we want to tell whether it contains any cycles.

Let's say the graph is represented as a list of edges, where each line represents an edge from one node to another node.  The graph can be split into multiple files.

node1, node2
node4, node7
node7, node1
....

Here is a general description of the algorithm.

We'll maintain two types of graph data.
  1. A set of link files where each line represents an edge in the graph.  These files are static.
  2. A set of path files where each line represents a path from one node to another node.  These files are updated in each round of map/reduce.
The general idea is to grow the path files until either no path can grow further or some path contains a cycle.  We'll use two global flags to keep track of this: a "change" flag that records whether any new path was discovered, and a "cycle" flag that records whether a cycle was detected.

The main driver program creates the initial path file by copying the link file and sets the initial flag condition: cycle = false, change = true.  In each round, the driver will
  • Check that cycle is still false and change is true; exit if this is not the case
  • Otherwise, clear the change flag and invoke the map/reduce round
Within each round, the map function will do the following
  • Emit the tuple [key=toNode, value=fromNode] if it is a path
  • Emit the tuple [key=fromNode, value=toNode] if it is a link
This ensures that a path and all the links that can extend it reach the same reducer, which will do the following (a rough sketch in code follows this list)
  • Check whether the beginning point of the path is the same as the endpoint of the link.  If so, a cycle is detected and the cycle flag is set to true.
  • Otherwise, compute the cross product of every path with every link to form new paths, each effectively extending a path's length by one.
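Here is a rough local simulation of one round in Python.  The plain functions stand in for the map and reduce tasks, and the shared flags dict stands in for the global counters a real Hadoop job would use; all names are my own illustration.

from collections import defaultdict

def map_round(paths, links):
    """Emit (key, value) pairs so a path and the links that can extend it co-locate."""
    for from_node, to_node in paths:
        yield to_node, ("path", from_node)        # key = endpoint of the path
    for from_node, to_node in links:
        yield from_node, ("link", to_node)        # key = starting point of the link

def reduce_round(key, values, flags):
    """Join paths ending at `key` with links starting at `key`."""
    path_starts = [v for kind, v in values if kind == "path"]
    link_ends = [v for kind, v in values if kind == "link"]
    new_paths = []
    for start in path_starts:
        for end in link_ends:
            if start == end:
                flags["cycle"] = True             # path start == link end => cycle
            else:
                new_paths.append((start, end))    # path extended by one edge
    return new_paths

def one_round(paths, links, flags):
    """Simulate shuffle + reduce for a single map/reduce round."""
    groups = defaultdict(list)
    for key, value in map_round(paths, links):
        groups[key].append(value)
    extended = set()
    for key, values in groups.items():
        extended.update(reduce_round(key, values, flags))
    if extended - paths:
        flags["change"] = True                    # new paths discovered this round
    return paths | extended

The driver would then loop along these lines: start with paths = set(links) and flags = {"cycle": False, "change": True}; while the cycle flag is false and the change flag is true, clear the change flag and call one_round again.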

The following diagram illustrates the algorithm ...