Thursday, December 20, 2012

big data: self healing

While working on a recent project, I noticed an interesting side effect of its design: self-healing data. In this post I will show a simple yet functional flow that keeps data fresh and healthy.

To begin, let me define the environment and the problem we are trying to solve. The environment is Hadoop + HBase + Surus ORM + Scheduler, with the following data flow:
Figure 1: data flow
To better understand the problem, let's review a typical day at a product aggregation system. It starts with importing millions of products (manufacturer, specs, image URL, price, etc.). Later, we group all imported records by product id to build a consolidated picture of product availability and pricing.

Now let's add a bit of bitter reality to our idyll: many products arrive "broken". Some have a broken product id, others a missing description or incorrect pricing. Yes, we have business rules in place to address most of these concerns during import or in the first mapreduce job. However, some records still break through; in the real world they number in the thousands.

As time goes on, our database accumulates enough broken products to drive users away. We need a "potion" to cure our data of the broken records, and here is the recipe.

Let's make a few assumptions:
  • Data sources (DS) are updated every interval T1
    For example: every 24 hours
  • The number of broken input records is statistically stable
    For example: 10 broken records per every 10,000
  • The distribution of broken records is normal
    Thus, every data source provides a comparable number of broken records; records that were fine yesterday might be broken today and vice versa
Recipe:
  • Records from the DS are inserted into a log table. A timestamp identifying the DS update time is attached to every record.
  • First mapreduce job (a reducer sketch follows the recipe):
    - source: log table
    - sink: product table
    - aggregates log records from a specific time period (for instance, the last day)
    - every product record has a field "last_update" that holds the highest timestamp among its constituting log records
  • Second mapreduce job (a mapper sketch follows below as well):
    - source: product table
    - sink: stash table
    - reviews all product records and marks those whose last_update value is older than T2, where T2 >= T1 (for instance, T2 can be 2 days)
    - all marked products are moved from the product table to the stash table
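
To make the first job concrete, here is a minimal reducer sketch in plain HBase MapReduce (the actual project uses Surus ORM, which is not shown here). It keeps only the part that matters for self-healing: "last_update" receives the highest timestamp among the constituting log records. The mapper (omitted) is assumed to emit (product id, log record timestamp) pairs; table, column family and qualifier names are illustrative.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;

// Reducer of the first job (sketch): merges log records of one product id
// into a single product row; "last_update" keeps the highest log timestamp.
public class ProductConsolidateReducer
    extends TableReducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable> {

  private static final byte[] CF = Bytes.toBytes("d");            // illustrative family
  private static final byte[] LAST_UPDATE = Bytes.toBytes("last_update");

  @Override
  protected void reduce(ImmutableBytesWritable productId,
                        Iterable<LongWritable> logTimestamps,
                        Context context) throws IOException, InterruptedException {
    long lastUpdate = Long.MIN_VALUE;
    for (LongWritable ts : logTimestamps) {
      lastUpdate = Math.max(lastUpdate, ts.get());
      // ... merging of specs, pricing, etc. from the log records goes here
    }
    Put put = new Put(productId.get());
    put.add(CF, LAST_UPDATE, Bytes.toBytes(lastUpdate));
    context.write(productId, put);
  }
}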
In the described flow we move all outdated records into the stash table. With the above assumptions in place, this means that broken products will _likely_ be eliminated from the product table within the time frame T2: a genuine product keeps being refreshed every T1 by new imports, while a broken record is unlikely to be reproduced, stops receiving updates and ages past T2.
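
A similarly minimal sketch of the second job's mapper, assuming the job is configured with HBase's MultiTableOutputFormat so a single pass can both copy a stale row to the stash table and delete it from the product table. The schema names and the 2-day T2 are, again, illustrative.

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

// Mapper of the second job (sketch): scans the product table and, for every
// row whose last_update is older than T2, writes a copy to the stash table
// and a delete to the product table.
public class StashMapper extends TableMapper<ImmutableBytesWritable, Writable> {

  private static final byte[] PRODUCT_TABLE = Bytes.toBytes("product");
  private static final byte[] STASH_TABLE = Bytes.toBytes("stash");
  private static final byte[] CF = Bytes.toBytes("d");
  private static final byte[] LAST_UPDATE = Bytes.toBytes("last_update");
  private static final long T2_MILLIS = 2L * 24 * 60 * 60 * 1000; // T2 = 2 days

  @Override
  protected void map(ImmutableBytesWritable row, Result columns, Context context)
      throws IOException, InterruptedException {
    byte[] rawTs = columns.getValue(CF, LAST_UPDATE);
    long lastUpdate = (rawTs == null) ? 0L : Bytes.toLong(rawTs);
    if (System.currentTimeMillis() - lastUpdate < T2_MILLIS) {
      return; // fresh record, leave it in the product table
    }
    // Copy the whole row into the stash table ...
    Put toStash = new Put(row.get());
    for (KeyValue kv : columns.raw()) {
      toStash.add(kv);
    }
    context.write(new ImmutableBytesWritable(STASH_TABLE), toStash);
    // ... then remove it from the product table.
    context.write(new ImmutableBytesWritable(PRODUCT_TABLE), new Delete(row.get()));
  }
}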

On a final note, let's compare this approach with setting the HBase table TTL [1] to T2. First, TTL removes key-value pairs from a record if they are not updated regularly. This does not suit our case, as some information (for instance specifications, tokens, comments) may be updated as rarely as once in a record's lifetime; with TTL in place, we would lose this information after T2. Second, the information is valuable. By moving product records into the stash table, we grant ourselves the ability to use that information later: to track a product's lifetime price trend, to run analytics on retired products, or even to revive products once they are back in stores.
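
For reference, here is a sketch of what the TTL-based alternative would look like when declaring the table (names are made up); any cell not rewritten within T2 would simply disappear, which is exactly the drawback described above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// TTL alternative (sketch): cells in the "d" family expire 2 days after
// their last write, with no chance to recover them afterwards.
public class ProductTableWithTtl {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    HColumnDescriptor family = new HColumnDescriptor("d");
    family.setTimeToLive(2 * 24 * 60 * 60); // TTL in seconds (T2 = 2 days)

    HTableDescriptor table = new HTableDescriptor("product");
    table.addFamily(family);
    admin.createTable(table);
  }
}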