Thursday, December 20, 2012

big data: self healing

While working on a recent project, I noticed an interesting side effect: self-healing data. In this post I will show a simple yet functioning flow that keeps data fresh and healthy.

Let me start by defining the environment and the problem we are trying to solve. For the environment, we will be using Hadoop + HBase + Surus ORM + Scheduler and the following data flow:
Figure 1: data flow
To better understand the problem, let's review a typical day at a product aggregation system. It starts with importing millions of products (manufacturer, specs, image URL, price, etc.). Later, we group all imported records by product id to come up with a consolidated picture of product availability and pricing.

And now let's add a bit of bitter reality to our idyll - many products arrive "broken". Some have a broken product id, some are missing a description or carry improper pricing. Yes, we have business rules in place to address most of these concerns at the import phase or in the first mapreduce job. However, some still break through. In the real world this number is measured in thousands.

As we follow the arrow of time, our database accumulates a number of broken products large enough to dissuade users. We need a "potion" to cure our data of the broken records, and here is the recipe.

Let's make a few assumptions:
  • Data Sources (DS) are updated every T1 interval
    For example: every 24 hours
  • The number of broken input records is statistically stable
    For example: 10 broken records per every 10000
  • The distribution of broken records is normal
    Thus, every data source provides a comparable number of broken records; records that were OK yesterday might be broken today and vice versa
Recipe:
  • Records from the DS are inserted into the log table. A timestamp identifying the DS update time is attached to every record.
  • First mapreduce job:
    - source: log table
    - sink: product table
    - aggregates log records from a specific time period (for instance, the last day)
    - every product record has a field "last_update" that holds the highest timestamp among its constituting log records
  • Second mapreduce job:
    - source: product table
    - sink: stash table
    - reviews all product records and marks the ones whose last_update value is older than T2, where T2 >= T1 (for instance, T2 can be 2 days)
    - all marked products are moved from the product table to the stash table
In the described flow we move all outdated records into a stash table. With the above assumptions in place, this means that broken products will _likely_ be eliminated from the product table within time frame T2. A minimal sketch of the second job's mapper is shown below.
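Here is such a sketch: a map-only pass over the product table, with the stash table configured as the job's sink. The column family "cf", the "last_update" qualifier holding epoch milliseconds (written by the first job), and the "stash.t2.millis" configuration key are illustrative assumptions, not names from the actual project.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class StaleProductMapper extends TableMapper<ImmutableBytesWritable, Put> {
    // hypothetical column family/qualifier holding the epoch-millis timestamp written by the first job
    private static final byte[] CF = Bytes.toBytes("cf");
    private static final byte[] LAST_UPDATE = Bytes.toBytes("last_update");

    private long threshold;  // oldest acceptable last_update

    @Override
    protected void setup(Context context) {
        // T2 is passed via job configuration; it defaults to 2 days in this sketch
        long t2 = context.getConfiguration().getLong("stash.t2.millis", 2L * 24 * 60 * 60 * 1000);
        threshold = System.currentTimeMillis() - t2;
    }

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result row, Context context)
            throws IOException, InterruptedException {
        byte[] raw = row.getValue(CF, LAST_UPDATE);
        if (raw == null) {
            return;  // no last_update recorded; skipped in this sketch
        }

        long lastUpdate = Bytes.toLong(raw);
        if (lastUpdate < threshold) {
            // copy the outdated record into the stash table (the job's sink);
            // a separate Delete pass would remove it from the product table afterwards
            Put put = new Put(rowKey.get());
            for (KeyValue kv : row.raw()) {
                put.add(kv);
            }
            context.write(rowKey, put);
        }
    }
}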

On a final note, let's compare this approach with setting the HBase table TTL [1] to T2. First, TTL removes key-value pairs from a record if they are not updated regularly. This is inefficient in our case, as some information (for instance specifications, tokens, comments) may be updated as rarely as once per record's life. With TTL in place, we would lose this information within T2. Secondly, the information is valuable. By moving product records into the stash table, we grant ourselves the ability to use that information later: for example, to track a product's lifetime price trend, do analytics on retired products, or even revive products once they are back in stores.
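For reference, a TTL is declared once per column family, in seconds. Below is a minimal sketch of what the TTL alternative would look like through the Java admin API of that era; the family name "cf" is again an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ProductTtl {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // TTL is defined per column family, in seconds; here T2 = 2 days
        HColumnDescriptor cf = new HColumnDescriptor("cf");
        cf.setTimeToLive(2 * 24 * 60 * 60);

        admin.disableTable("product");
        admin.modifyColumn("product", cf);  // replace the family descriptor
        admin.enableTable("product");
    }
}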

Saturday, October 06, 2012

Surus ORM - One Year Later

It has been a little more than a year since Surus - an HBase ORM [1] - became available via GitHub. For its birthday party, Surus got:
  • Support of multi-component rowKeys
    This feature is implemented in the HPrimaryKey class and the @HFieldComponent annotation
  • Support of List<T> properties, implemented via the @HListProperty annotation
  • Integration with HBase Explorer [2]
  • Code clean-up. Surus ORM now fits in fewer than 20 Java files
    As a result, all unrelated code was moved to the synergy-framework project [3]
For the first time, Surus ORM also has a road-map. Currently it contains support for multi-component properties.
Looking back on the past year, I see it as an interesting endeavour. Surus ORM is still free open source, and you are welcome to fork/use/contribute to it!

Cheers!

[1] Surus ORM
https://github.com/mushkevych/surus/

[2] HBase Explorer + Surus ORM integration
https://github.com/mushkevych/hbase-explorer

[3] Synergy-framework repository
https://github.com/mushkevych/synergy-framework

Thursday, September 27, 2012

HBase Explorer + Surus = Integration

HBase Explorer (HBE) [1] is a UI tool for manipulating and exploring HBase instances. I have been using it on big data projects for more than a year, and have gradually improved its integration with Surus to the point where:

  • Surus-covered tables are processed by Surus ORM [3]
  • HBE supports multiple ORMs and multi-component rowKeys via ORMInterface and ORMContext
  • All tables not covered by a custom ORM are processed by HBE's default pattern mechanism

Let's take a look at two screenshots:


Please note that the rowKey components change to match the table structure. On the backend, this is supported by two new methods added to the AbstractPrimaryKey class:

  • Map<String, Class> getComponents()
  • ImmutableBytesWritable generateRowKey(Map<String, Object> components)
The first is irreplaceable when it comes to finding out the rowKey structure, and the second is required to construct the actual rowKey from HTML parameters.
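For illustration, here is a hypothetical sketch of how a UI layer could combine those two methods to turn submitted HTML parameters into a rowKey. The conversion logic (String/Integer only) and the class name are assumptions; AbstractPrimaryKey itself comes from Surus ORM and its import is omitted here.

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import java.util.HashMap;
import java.util.Map;

public class RowKeyAssembler {
    // "primaryKey" is an AbstractPrimaryKey instance provided by the ORM;
    // "httpParams" holds the raw HTML form parameters
    public ImmutableBytesWritable assemble(AbstractPrimaryKey primaryKey, Map<String, String> httpParams) {
        Map<String, Object> components = new HashMap<String, Object>();

        // 1. discover the rowKey structure: component name -> declared type
        for (Map.Entry<String, Class> entry : primaryKey.getComponents().entrySet()) {
            String name = entry.getKey();
            String raw = httpParams.get(name);

            // 2. convert the HTML parameter into the declared component type
            //    (only String and Integer are handled in this sketch)
            Object value = (entry.getValue() == Integer.class) ? Integer.valueOf(raw) : raw;
            components.put(name, value);
        }

        // 3. let the ORM assemble the actual rowKey bytes
        return primaryKey.generateRowKey(components);
    }
}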

Next, let's review what you would need to do to plug a custom ORM into HBE. It takes just a few simple steps:
  1. Implement the ORMInterface interface
    Let's assume the class name will be "AnotherOrm"
  2. Register an "AnotherOrm" instance in the ORMContext static section:
        static {
            CONTEXT.add(new OrmSurus());
            CONTEXT.add(new AnotherOrm());
        }
  3. Build, deploy and use!
In summary: both Surus and HBE got cool features to make your life easier.
Cheers!

[1] HBase Explorer with Surus integration and multi-ORM support:
https://github.com/mushkevych/hbase-explorer

[2] Original HBase Explorer

[3] Surus ORM:
https://github.com/mushkevych/surus/

Friday, July 27, 2012

Cloudera Hadoop Certifications

And so it happened! After 4 weeks of studying and 2 exams, I am officially a Cloudera Certified Developer for Apache Hadoop and a Cloudera Certified Specialist in Apache HBase.

The policies that each candidate signs before the test begins prohibit disclosure of questions and answers, so don't look for them here :)
However, I will gladly share my training materials, as well as focus on the most problematic topics.

I studied from books [1] and [2], and read them cover to cover despite considering many chapters "well known". I strongly advise everybody seeking the certification to go through each chapter. It will save you training expenses of $1800 + $3000 + travel [3] and, like any reading, provide a solid knowledge base. You can also try to reduce certification costs [4] by asking Cloudera to grant you discount coupons.

Topics that I found intriguing:
- Write path and coherency model for HDFS
While reading the chapters, ask yourself: what happens to the files should the client die in the middle of the copy process? How does HBase handle real-time duplication of the WAL?
- InputFormat
The good news is that there are not many of them, so it should not take long to put together a sample for each. As stated in [5], you might be asked to identify the proper InputFormat given a sample (see the reminder after this list).
- HBase region lookup mechanism
Be sure to understand how a client finds -ROOT-, .META. and the data; when it queries ZooKeeper, and what the fail-over algorithms are.
- HFile format and performance dependency on block size
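As a reminder (a generic illustration, not exam material), here is how a stock InputFormat is wired into a job, with the other common formats noted in comments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class InputFormatReminder {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "input-format-reminder");

        // Sample input: text lines of "key<TAB>value" -> KeyValueTextInputFormat,
        // which makes the key the text before the first tab
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Other stock formats to recognize on sight:
        //   TextInputFormat         - plain text, one record per line, key = byte offset
        //   SequenceFileInputFormat - binary key/value files produced by a previous job
        //   NLineInputFormat        - text input where each split contains exactly N lines
        //   DBInputFormat           - rows read from a relational database via JDBC
    }
}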

In summary: the exams are not too scary, but give yourself 2 weeks per exam to prepare properly.
Cheers!

[1] Hadoop: The Definitive Guide, 2nd Edition

[2] HBase: The Definitive Guide


[3] Cloudera training schedule

[4] Direct Cloudera certification 

[5] List of probable exam topics
http://university.cloudera.com/certification/CCDH.html

Monday, June 11, 2012

JMX for Python

As soon as a head-less Python project crosses a certain level of complexity, you start noticing that something is missing - something with buttons, text areas and everything else needed to visualize data structures and the state of your project... something like a UI :)

In the world of Java you have JMX to address most of this... but what can we use in Python? While working on Scheduler [1], I found that the easiest way to implement "Python MX" is to use Werkzeug [2].

So, what do we need to get it working? First of all, we need an HTML template (here, we arrange the output in two columns):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<body>
<table style="width: 100%"> <tbody>
{%- for row in details.entries %}
<tr>
<td>{{ row[0] }}</td>
<td>{{ row[1] }}</td>
</tr>
{%- endfor %}
</tbody> </table>
</body>
</html>

Next, we need to expose a URL for the template:
from werkzeug.utils import cached_property, redirect
from werkzeug.wrappers import Response
from utils import render_template, expose, jinja_env


@expose('/')
@expose('/example_details/')
def example_details(request):
    details = ExampleDetails(jinja_env.globals['mbean'])
    return render_template('template_example.html', details=details)


def not_found(request):
    return render_template('not_found.html')


# And implement details themselves:
class ExampleDetails(object):
    def __init__(self, mbean):
        self.mbean = mbean

    @cached_property
    def entries(self):
        return [[self.mbean.property_1, self.mbean.property_2]]
Now, all that is left to do is start the MX and set the bean:
class MBeanExample(object):
    def __init__(self):
        self.property_1 = 'this is'
        self.property_2 = 'example'

    def start_mx(self):
        """ import MX module (which has back-reference import to self) and start it """
        from mx.mx import MX
        self.mx = MX(self)
        self.mx.start_mx_thread()


if __name__ == '__main__':
    source = MBeanExample()
    source.start_mx()
Where MX is:
from threading import Thread
from werkzeug.wrappers import Request
from werkzeug.wsgi import ClosingIterator, SharedDataMiddleware
from werkzeug.exceptions import HTTPException, NotFound
from werkzeug.serving import run_simple
from utils import STATIC_PATH, local, local_manager, url_map, jinja_env
import views


class MX(object):
    def __init__(self, mbean):
        local.application = self
        self.mbean = mbean
        jinja_env.globals['mbean'] = mbean
        self.dispatch = SharedDataMiddleware(self.dispatch, {
            '/static': STATIC_PATH
        })

    def dispatch(self, environ, start_response):
        local.application = self
        request = Request(environ)
        local.url_adapter = adapter = url_map.bind_to_environ(environ)
        try:
            endpoint, values = adapter.match()
            handler = getattr(views, endpoint)
            response = handler(request, **values)
        except NotFound as e:
            response = views.not_found(request)
            response.status_code = 404
        except HTTPException as e:
            response = e
        return ClosingIterator(response(environ, start_response),
                               [local_manager.cleanup])

    def __call__(self, environ, start_response):
        return self.dispatch(environ, start_response)

    def start_mx_thread(self, hostname=None, port=None):
        """Spawns a new HTTP server, residing on defined hostname and port
        :param hostname: the default hostname the server should listen on.
        :param port: the default port of the server.
        """
        if hostname is None:
            hostname = '0.0.0.0'
        if port is None:
            port = 5000
        reloader = False          # use_reloader: the default setting for the reloader
        debugger = False          # use_debugger: the default setting for the debugger
        evalex = True             # use_evalex: the default setting for the evalex flag of the debugger
        threaded = False          # threaded: the default threading setting
        processes = 1             # processes: the default number of processes to start
        reloader_interval = 1
        static_files = None       # static_files: optional dict of static files
        extra_files = None        # extra_files: optional list of extra files to track for reloading
        ssl_context = None        # ssl_context: optional SSL context for running server in HTTPS mode

        # pass run_simple as the thread target (rather than calling it in place),
        # so the HTTP server runs in a daemon thread and does not block the caller
        mx_thread = Thread(target=run_simple,
                           kwargs=dict(hostname=hostname,
                                       port=port,
                                       application=self,
                                       use_debugger=debugger,
                                       use_evalex=evalex,
                                       extra_files=extra_files,
                                       use_reloader=reloader,
                                       reloader_interval=reloader_interval,
                                       threaded=threaded,
                                       processes=processes,
                                       static_files=static_files,
                                       ssl_context=ssl_context))
        mx_thread.daemon = True
        mx_thread.start()
The utils.py module wires up the shared helpers used above (render_template, expose, jinja_env, url_map, local, local_manager and STATIC_PATH); see the Scheduler MX folder [3] for its full listing.


As soon as the coding is complete, navigate your browser to "localhost:5000":
And with some effort it can turn into:


All things considered, Python MX is not a trivial hack, but it is not a paramount task either. With great tools like Werkzeug our life is much easier.
For a real-world example, please refer to the Scheduler MX folder [3].

[1] Scheduler
[2] Werkzeug

Friday, May 18, 2012

Integer encoding for Hadoop

In short, integer encoding is a mechanism for compacting multiple integer values into a single byte array. It is well described in the literature [1], and in this post we will review an exemplary implementation.

Since we do not want to re-implement the encoding algorithms, we will reuse the Varint class from mahout-core [2].
The simplest use-case, for 2 integers, looks like:

import org.apache.log4j.Logger;
import org.apache.mahout.math.Varint;

import java.io.*;

/**
 * @author Bohdan Mushkevych
 * Description: module presents tuple of two int values: alpha and beta
 */
class Tuple2I {
    protected int alpha;
    protected int beta;

    public Tuple2I(int alpha, int beta) {
        this.alpha = alpha;
        this.beta = beta;
    }

    /**
     * @return int presenting alpha
     */
    public int getAlpha() {
        return alpha;
    }

    /**
     * @return int presenting beta
     */
    public int getBeta() {
        return beta;
    }
}

/**
 * @author Bohdan Mushkevych
 * Description: module contains logic for VarInt integer encoding: conversion to and from the byte array
 */
public class Encoder {
    protected ByteArrayOutputStream baos = new ByteArrayOutputStream();
    protected DataOutputStream dos = new DataOutputStream(baos);
    private Logger log = Logger.getRootLogger();

    public Encoder() {
    }

    /**
     * Method generates byte[] for varargs of integers
     * @param args varargs of Integer type
     * @return generated value
     */
    protected byte[] getByteArray(int... args) {
        byte[] result = null;
        try {
            baos.reset();
            for (int i : args) {
                Varint.writeSignedVarInt(i, dos);
            }
            result = baos.toByteArray();
        } catch (IOException e) {
            log.error("Exception on Integer encoding", e);
        }
        return result;
    }

    /**
     * Method decodes tuple from byte array
     * @param value encoded tuple
     * @return formed tuple instance
     */
    public Tuple2I decode(byte[] value) {
        ByteArrayInputStream bais = new ByteArrayInputStream(value);
        DataInputStream dis = new DataInputStream(bais);
        Tuple2I tuple = null;
        try {
            int alpha = Varint.readSignedVarInt(dis);
            int beta = Varint.readSignedVarInt(dis);
            tuple = new Tuple2I(alpha, beta);
        } catch (IOException e) {
            log.error("Exception on Integer decoding", e);
        }
        return tuple;
    }
}

Here we declared a structure of 2 integers - Tuple2I - followed by an Encoder example that encodes and decodes integers to and from a byte array.
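A quick usage sketch of the classes above (getByteArray is protected, so this snippet assumes it lives in the same package as Encoder):

public class EncoderDemo {
    public static void main(String[] args) {
        Encoder encoder = new Encoder();

        // pack two ints into a compact, variable-length byte array
        byte[] packed = encoder.getByteArray(42, -17);

        // and restore them
        Tuple2I tuple = encoder.decode(packed);
        System.out.println(tuple.getAlpha() + ", " + tuple.getBeta());  // prints: 42, -17
    }
}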

For real-world usages of the integer encoder, refer to Surus [3]. By widely adopting integer encoding for 3- and 4-integer tuples, I was able to reduce Mapper output by 20-30% and save about 30 minutes of computation time.

[1] Data-Intensive Text Processing with MapReduce
http://www.amazon.com/Data-Intensive-Processing-MapReduce-Synthesis-Technologies/dp/1608453421

[2] Typical Maven repository
http://mvnrepository.com/artifact/org.apache.mahout/mahout-core/

[3] Surus
https://github.com/mushkevych/surus

Friday, May 11, 2012

R: running by Hadoop mapreduce

Running R from Hadoop mapreduce ain't easy. Before any work can begin, one must configure the environment:
  1. R must be installed, along with all required libraries, on each mapred node in the Hadoop cluster.
  2. Communication is performed over the JNI interface via rJava/JRI.
    This package must also be installed on each mapred node in the cluster.
  3. The following env variables must be exported (paths are relative to the specific environment):
export R_HOME=/usr/lib64/R
JRI_HOME=/usr/lib64/R/site-library/rJava/jri
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JRI_HOME}/JRI.jar:${JRI_HOME}/JRIEngine.jar:${JRI_HOME}/REngine.jar
export JAVA_LIBRARY_PATH=${JRI_HOME}

You can safely book 1+ days for installation headaches. In my case, we had to cook RPMs for almost every R component.
After that is complete, you might want to exercise mapreduce parallelism with R, just to find out messages like:


WARNING: org.rosuda.JRI.Mutex was unlocked by other thread than locked! This may soon lead to a crash...

This leads us to the primary limitation of straightforward rJava/JRI usage (see P.S. for details):
There can be only 1 process/thread accessing an R instance per box.

In my circumstances this was not critical, as I was able to complete the computation on a single reducer within several hours... however, with ever larger result sets this may become a prohibitive restriction.

Let's see what the mapreduce R workhorse looks like:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.RMainLoopCallbacks;
import org.rosuda.JRI.Rengine;

import java.io.IOException;

/**
 * @author Bohdan Mushkevych
 * date: May 2012
 */
public class ExemplaryRReducer extends AbstractTableReducer<ImmutableBytesWritable, Writable> {
    private static Logger log = Logger.getLogger(ExemplaryRReducer.class);
    protected Rengine re;

    static class LoggingConsole implements RMainLoopCallbacks {
        private Logger log;

        LoggingConsole(Logger log) {
            this.log = log;
        }

        public void rWriteConsole(Rengine re, String text, int oType) {
            log.info("*** R: callback rWriteConsole(" + text + ")");
        }

        public void rBusy(Rengine re, int which) {
            log.info("*** R: callback rBusy(" + which + ")");
        }

        public void rShowMessage(Rengine re, String message) {
            log.info("*** R: callback rShowMessage \"" + message + "\"");
        }

        public String rReadConsole(Rengine re, String prompt, int addToHistory) {
            return null;
        }

        public String rChooseFile(Rengine re, int newFile) {
            return null;
        }

        public void rFlushConsole(Rengine re) {
        }

        public void rLoadHistory(Rengine re, String filename) {
        }

        public void rSaveHistory(Rengine re, String filename) {
        }
    }

    /**
     * R Engine initialization is in a different method than *setup* to enable Unit Testing
     * @param runMainLoop if set to <code>true</code> the event loop will be started as soon as possible,
     * otherwise no event loop is started. Running loop requires <code>initialCallbacks</code> to be set correspondingly as well.
     */
    public void initR(boolean runMainLoop) {
        // Call R and perform coefficient computing
        // just making sure we have the right version of everything
        if (!Rengine.versionCheck()) {
            throw new IllegalStateException("*** R: version mismatch - Java files don't match R library version.");
        }

        // --vanilla Combine --no-save, --no-restore, --no-site-file, --no-init-file and --no-environ
        // --slave   Make R run as quietly as possible
        // for more details run <code>R --help</code> from command line
        re = new Rengine(new String[]{"--vanilla", "--slave"}, runMainLoop, new LoggingConsole(log));

        // the engine creates R in a new thread, so we should wait until it's ready
        if (!re.waitForR()) {
            throw new IllegalStateException("*** R: cannot start the engine.");
        }

        try {
            // check if "reshape" package is installed
            re.eval("is.installed <- function(mypkg) is.element(mypkg, installed.packages()[,1])");
            REXP isInstalled = re.eval("is.installed(\"reshape\")");
            if (isInstalled.asBool().isFALSE()) {
                log.info("*** R: reshape package is missing. Installing locally.");
                // install "reshape" package if this is needed
                re.eval("install.packages(\"reshape\", repos=\"http://cran.stat.sfu.ca/\")");
            } else {
                log.info("*** R: reshape package is installed. Proceeding.");
            }
        } catch (Exception e) {
            log.error("*** Exception during R initialization: ", e);
        }

        try {
            // load "reshape" package
            re.eval("library(reshape)");
        } catch (Exception e) {
            log.error("*** Exception while loading reshape package: ", e);
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        initR(false);
    }

    /**
     * method performs R computations
     */
    public void performRComputations() {
        try {
            // clear workspace before new processing round
            re.eval("rm(list=ls())");
            // perform some useful computations
            re.eval("N <- SOMETHING_USEFUL");
        } catch (Exception e) {
            log.error("*** Exception on R stage: ", e);
        }
    }

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Writable> values, Context context) throws IOException, InterruptedException {
        // aggregate records
        for (Writable value : values) {
            Result singleResult = (Result) value;
            // do something useful with singleResult

            // perform R computations
            performRComputations();

            // place computation results into HBase
            // remember to configure Put object properly
            Put put = new Put();
            context.write(key, put);
        }
    }

    /**
     * R Engine closure is in a separate method than *cleanup* to enable Unit Testing
     */
    public void cleanR() {
        if (re == null) {
            return;
        }
        re.end();
        if (!re.waitForR()) {
            log.info("*** R: engine is stopped.");
        } else {
            log.info("*** R: engine turned to zombie.");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        cleanR();
        super.cleanup(context);
    }
}

I highly recommend the following reading:

[1] rJava/JRI source code repos:
http://www.rforge.net/rJava/svn.html
http://www.rforge.net/JRI/svn.html 

[2] Previous post with working example Java-to-R interaction:
http://mushkevych.blogspot.com/2012/04/r-running-by-java-process.html

[3] Rengine: public Mutex getRsync()
http://www.rosuda.org/r/nightly/javadoc/org/rosuda/JRI/Rengine.html#getRsync()


P.S.
There might be a way to start and maintain an R instance per rJava/JRI client; however, I was not lucky enough to identify it.

P.P.S.
It should be possible to share a single R instance between several Java processes/threads by direct synchronization at the JNI level [3].
However, the expected performance gains vs. code complexity must be weighed carefully.

Sunday, April 22, 2012

Resetting BIOS Password

Ain't it funny how passwords complicate our lives? My landlord has an old HP Pavilion with a Phoenix BIOS; some time ago she installed a boot-up password to feel secure, and then - she forgot it.

The brute-force approach says that you have to take out the CMOS battery and wait for 12 hours. But disassembling a laptop takes time and imposes unnecessary liability. So, after trying a list of default back-door passwords [1], the time came to either declare defeat or use Google for another 15 minutes :)

As a result, please consider dogber1.blogspot.ca [2] and its Web UI [3].
The BIOS in question was showing a check-sum after the third invalid password:
 

Dogber1 provides a .py script [2] to run on your box, or you can try your luck with the online tool [3]:
It took me 3 attempts to enter the BIOS and reset the password.
The question left unanswered: if I was able to enter the system in 15 minutes... how long does it take for the gentlemen who do this for a living?


[1] List of known back-door BIOS passwords
http://www.uktsupport.co.uk/reference/biosp.htm

[2] BIOS reset password solution
http://dogber1.blogspot.ca/2009/05/table-of-reverse-engineered-bios.html

[3] Web UI for dogber1 generator
http://bios-pw.org.ua