Friday, May 18, 2012

Integer encoding for Hadoop

In short, integer encoding is a mechanism to compact multiple integer values into a single byte array. It is well described in the literature [1], and in this post we will review an exemplary implementation.

Rather than re-implementing the encoding algorithms, we will reuse the Varint class from mahout-core [2].
The simplest use-case for 2 integers looks like this:

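Below is a minimal sketch of such a pair, built on Mahout's org.apache.mahout.math.Varint helpers; the class and method names are illustrative, not a definitive implementation:

// Tuple2I.java - plain structure of 2 integers
public class Tuple2I {
    private final int first;
    private final int second;

    public Tuple2I(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() { return first; }
    public int getSecond() { return second; }
}

// Encoder.java - encodes/decodes a Tuple2I to and from a compact byte array
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.mahout.math.Varint;

public class Encoder {
    public byte[] encode(Tuple2I tuple) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        // variable-length encoding: small values occupy 1-2 bytes instead of a fixed 4
        Varint.writeSignedVarInt(tuple.getFirst(), out);
        Varint.writeSignedVarInt(tuple.getSecond(), out);
        out.close();
        return baos.toByteArray();
    }

    public Tuple2I decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new Tuple2I(Varint.readSignedVarInt(in), Varint.readSignedVarInt(in));
    }
}

In a mapreduce job the resulting byte array would typically be wrapped into a BytesWritable before being emitted from the Mapper.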

Here, we declared a structure of 2 integers - Tuple2I - and followed it with an Encoder example that encodes and decodes the integers to and from a byte array.

For real-world usage of the integer encoder, refer to Surus [3]. By widely adopting integer encoding for 3- and 4-integer tuples, I was able to reduce Mapper output by 20-30% and save about 30 minutes of computation time.

[1] Data-Intensive Text Processing with MapReduce
http://www.amazon.com/Data-Intensive-Processing-MapReduce-Synthesis-Technologies/dp/1608453421

[2] Typical Maven repository
http://mvnrepository.com/artifact/org.apache.mahout/mahout-core/

[3] Surus
https://github.com/mushkevych/surus

Friday, May 11, 2012

R: running by Hadoop mapreduce

Running R from Hadoop mapreduce ain't easy. Before any work can begin, one must configure the environment:
  1. R must be installed along with all required libraries on each mapred node in the Hadoop cluster.
  2. Communication is performed over JNI via the rJava/JRI package.
    This package must also be installed on each mapred node in the cluster.
  3. The following env variables must be exported (paths are specific to your environment):
export R_HOME=/usr/lib64/R
JRI_HOME=/usr/lib64/R/site-library/rJava/jri
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JRI_HOME}/JRI.jar:${JRI_HOME}/JRIEngine.jar:${JRI_HOME}/REngine.jar
export JAVA_LIBRARY_PATH=${JRI_HOME}

You can safely book 1+ days for installation headaches. In my case, we had to cook RPMs for almost every R component.
After the setup is complete, you might want to exercise mapreduce parallelism with R, only to run into messages like:


WARNING: org.rosuda.JRI.Mutex was unlocked by other thread than locked! This may soon lead to a crash...

This leads us to the primary limitation of straightforward rJava/JRI usage (see P.S. for details):
there can be only 1 process/thread accessing the R instance per box.

In my circumstances this was not critical, as I was able to complete the computation on a single reducer within several hours... however, with ever-larger result sets this may become a prohibitive restriction.

Let's see what the mapreduce R workhorse looks like:

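What follows is a minimal sketch of such a reducer, assuming the org.apache.hadoop.mapreduce API and a hypothetical per-key statistic (a median) computed in R; names like RWorkhorseReducer and "observations" are illustrative only:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.Rengine;

public class RWorkhorseReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private Rengine rengine;

    @Override
    protected void setup(Context context) {
        // one embedded R instance per reducer JVM (see the limitation above)
        // --vanilla: do not load or save the R workspace; no main loop, no callbacks
        rengine = new Rengine(new String[]{"--vanilla"}, false, null);
        if (!rengine.waitForR()) {
            throw new IllegalStateException("unable to start embedded R via rJava/JRI");
        }
    }

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // collect the values for this key into a plain double[]
        List<Double> buffer = new ArrayList<Double>();
        for (DoubleWritable value : values) {
            buffer.add(value.get());
        }
        double[] observations = new double[buffer.size()];
        for (int i = 0; i < observations.length; i++) {
            observations[i] = buffer.get(i);
        }

        // hand the vector to R and let R do the actual math
        rengine.assign("observations", observations);
        REXP result = rengine.eval("median(observations)");
        if (result != null) {
            context.write(key, new DoubleWritable(result.asDouble()));
        }
    }

    @Override
    protected void cleanup(Context context) {
        // shut the embedded R instance down
        if (rengine != null) {
            rengine.end();
        }
    }
}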

I highly recommend the following reading:

[1] rJava/JRI source code repos:
http://www.rforge.net/rJava/svn.html
http://www.rforge.net/JRI/svn.html 

[2] Previous post with a working example of Java-to-R interaction:
http://mushkevych.blogspot.com/2012/04/r-running-by-java-process.html

[3] Rengine: public Mutex getRsync()
http://www.rosuda.org/r/nightly/javadoc/org/rosuda/JRI/Rengine.html#getRsync()


P.S.
There might be a way to start and maintain an R instance per rJava/JRI client; however, I was not lucky enough to identify it.

P.P.S.
It should be possible to share a single R instance between several Java processes/threads by synchronizing directly at the JNI level [3].
However, the expected performance gains vs. code complexity must be weighed carefully.
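
For the thread-sharing case, a rough sketch of what such synchronization might look like, guarding every eval() with the Mutex exposed by Rengine.getRsync() [3] (the wrapper class below is hypothetical):

import org.rosuda.JRI.Mutex;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.Rengine;

// hypothetical wrapper: several threads share one Rengine, each eval() guarded by R's own Mutex
public class SharedREvaluator {
    private final Rengine engine;

    public SharedREvaluator(Rengine engine) {
        this.engine = engine;
    }

    public REXP evalSynchronized(String expression) {
        Mutex rSync = engine.getRsync();
        rSync.lock();
        try {
            return engine.eval(expression);
        } finally {
            rSync.unlock();
        }
    }
}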