Friday, May 18, 2012

Integer encoding for Hadoop

In short, integer encoding is a mechanism for compacting multiple integer values into a single byte array. It is well described in the literature [1], and in this post we will review an exemplary implementation.

Rather than re-implement the encoding algorithms, we will reuse the Varint class from mahout-core [2].
The simplest use case for two integers looks like this:

import org.apache.log4j.Logger;
import org.apache.mahout.math.Varint;

import java.io.*;

/**
 * @author Bohdan Mushkevych
 * Description: represents a tuple of two int values: alpha and beta
 */
class Tuple2I {
    protected int alpha;
    protected int beta;

    public Tuple2I(int alpha, int beta) {
        this.alpha = alpha;
        this.beta = beta;
    }

    /**
     * @return int representing alpha
     */
    public int getAlpha() {
        return alpha;
    }

    /**
     * @return int representing beta
     */
    public int getBeta() {
        return beta;
    }
}

/**
 * @author Bohdan Mushkevych
 * Description: contains the logic for VarInt integer encoding: conversion to and from the byte array
 */
public class Encoder {
    protected ByteArrayOutputStream baos = new ByteArrayOutputStream();
    protected DataOutputStream dos = new DataOutputStream(baos);
    private Logger log = Logger.getRootLogger();

    public Encoder() {
    }

    /**
     * Method generates byte[] for varargs of integers
     * @param args varargs of int type
     * @return generated byte array
     */
    protected byte[] getByteArray(int... args) {
        byte[] result = null;
        try {
            baos.reset();
            for (int i : args) {
                Varint.writeSignedVarInt(i, dos);
            }
            result = baos.toByteArray();
        } catch (IOException e) {
            log.error("Exception on Integer encoding", e);
        }
        return result;
    }

    /**
     * Method decodes a tuple from the byte array
     * @param value encoded tuple
     * @return formed tuple instance
     */
    public Tuple2I decode(byte[] value) {
        ByteArrayInputStream bais = new ByteArrayInputStream(value);
        DataInputStream dis = new DataInputStream(bais);
        Tuple2I tuple = null;
        try {
            int alpha = Varint.readSignedVarInt(dis);
            int beta = Varint.readSignedVarInt(dis);
            tuple = new Tuple2I(alpha, beta);
        } catch (IOException e) {
            log.error("Exception on Integer decoding", e);
        }
        return tuple;
    }
}

Here we declared a structure of two integers, Tuple2I, followed by an Encoder example that encodes integers to and decodes them from a byte array.
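
A minimal round-trip sketch (values are hypothetical; since getByteArray is protected, the calling code is assumed to live in the same package or in a subclass):

Encoder encoder = new Encoder();
byte[] packed = encoder.getByteArray(42, -7);   // varint-encode two ints into a single byte[]
Tuple2I tuple = encoder.decode(packed);         // decode them back
assert tuple.getAlpha() == 42 && tuple.getBeta() == -7;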

For real-world usages of the integer encoder, refer to Surus [3]. By applying integer encoding widely to 3- and 4-integer tuples, I was able to reduce Mapper output by 20-30% and save about 30 minutes of computation time.

[1] Data-Intensive Text Processing with MapReduce
http://www.amazon.com/Data-Intensive-Processing-MapReduce-Synthesis-Technologies/dp/1608453421

[2] Typical Maven repository
http://mvnrepository.com/artifact/org.apache.mahout/mahout-core/

[3] Surus
https://github.com/mushkevych/surus

Friday, May 11, 2012

R: running by Hadoop mapreduce

Running R from Hadoop mapreduce ain't easy. Before any work can begin, one must configure the environment:
  1. R must be installed along with all required libraries on each mapred node in the Hadoop cluster.
  2. Communication is performed over JNI via the rJava/JRI interface.
    This package must also be installed on each mapred node in the cluster.
  3. The following env variables must be exported (paths are relative to the specific environment; a sketch for passing them through to the task JVMs follows the exports):
export R_HOME=/usr/lib64/R
JRI_HOME=/usr/lib64/R/site-library/rJava/jri
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JRI_HOME}/JRI.jar:${JRI_HOME}/JRIEngine.jar:${JRI_HOME}/REngine.jar
export JAVA_LIBRARY_PATH=${JRI_HOME}
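
The exports above cover the shell that submits the job; the task JVMs on the cluster need the equivalent settings as well. A hedged sketch of one way to pass them along, assuming Hadoop 1.x / MRv1 property names (mapred.child.env, mapred.child.java.opts) and the same paths as the exports; the driver class and the paths are environment-specific:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class RJobDriver {
    public static Job createJob() throws Exception {
        Configuration conf = new Configuration();
        // sketch only: MRv1 property names; paths mirror the exports above
        conf.set("mapred.child.env", "R_HOME=/usr/lib64/R");
        conf.set("mapred.child.java.opts",
                "-Djava.library.path=/usr/lib64/R/site-library/rJava/jri");
        return new Job(conf, "r-backed-job");
    }
}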

You can safely book 1+ days for installation headaches. In my case, we had to cook RPMs for almost every R component.
Once the setup is complete, you might want to exercise mapreduce parallelism with R, only to run into messages like:


WARNING: org.rosuda.JRI.Mutex was unlocked by other thread than locked! This may soon lead to a crash...

This leads us to the primary limitation of straightforward rJava/JRI usage (see P.S. for details):
There can be only one process/thread accessing the R instance per box.

In my circumstances this was not critical, as I was able to complete the computation on a single reducer within several hours... however, with ever larger result sets this may become a prohibitive restriction.
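
One hedged way to enforce that single-reducer setup is to cap the job at one reduce task via the standard org.apache.hadoop.mapreduce.Job API (here, the Job instance from the driver sketch above):

// with a single reduce task in the whole job, at most one JVM talks to R at a time,
// which respects the one-R-instance-per-box limitation described above
job.setNumReduceTasks(1);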

Let's see what the mapreduce R workhorse looks like:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.RMainLoopCallbacks;
import org.rosuda.JRI.Rengine;

import java.io.IOException;

/**
 * @author Bohdan Mushkevych
 * date: May 2012
 */
public class ExemplaryRReducer extends AbstractTableReducer<ImmutableBytesWritable, Writable> {
    private static Logger log = Logger.getLogger(ExemplaryRReducer.class);
    protected Rengine re;

    static class LoggingConsole implements RMainLoopCallbacks {
        private Logger log;

        LoggingConsole(Logger log) {
            this.log = log;
        }

        public void rWriteConsole(Rengine re, String text, int oType) {
            log.info("*** R: callback rWriteConsole(" + text + ")");
        }

        public void rBusy(Rengine re, int which) {
            log.info("*** R: callback rBusy(" + which + ")");
        }

        public void rShowMessage(Rengine re, String message) {
            log.info("*** R: callback rShowMessage \"" + message + "\"");
        }

        public String rReadConsole(Rengine re, String prompt, int addToHistory) {
            return null;
        }

        public String rChooseFile(Rengine re, int newFile) {
            return null;
        }

        public void rFlushConsole(Rengine re) {
        }

        public void rLoadHistory(Rengine re, String filename) {
        }

        public void rSaveHistory(Rengine re, String filename) {
        }
    }

    /**
     * R Engine initialization is in a different method from *setup* to enable Unit Testing
     * @param runMainLoop if set to <code>true</code> the event loop will be started as soon as possible,
     *                    otherwise no event loop is started. Running the loop requires <code>initialCallbacks</code> to be set correspondingly as well.
     */
    public void initR(boolean runMainLoop) {
        // Call R and perform coefficient computing
        // just making sure we have the right version of everything
        if (!Rengine.versionCheck()) {
            throw new IllegalStateException("*** R: version mismatch - Java files don't match R library version.");
        }

        // --vanilla Combine --no-save, --no-restore, --no-site-file, --no-init-file and --no-environ
        // --slave   Make R run as quietly as possible
        // for more details run <code>R --help</code> from the command line
        re = new Rengine(new String[]{"--vanilla", "--slave"}, runMainLoop, new LoggingConsole(log));

        // the engine creates R in a new thread, so we should wait until it's ready
        if (!re.waitForR()) {
            throw new IllegalStateException("*** R: cannot start the engine.");
        }

        try {
            // check if "reshape" package is installed
            re.eval("is.installed <- function(mypkg) is.element(mypkg, installed.packages()[,1])");
            REXP isInstalled = re.eval("is.installed(\"reshape\")");
            if (isInstalled.asBool().isFALSE()) {
                log.info("*** R: reshape package is missing. Installing locally.");
                // install "reshape" package if needed
                re.eval("install.packages(\"reshape\", repos=\"http://cran.stat.sfu.ca/\")");
            } else {
                log.info("*** R: reshape package is installed. Proceeding.");
            }
        } catch (Exception e) {
            log.error("*** Exception during R initialization: ", e);
        }

        try {
            // load "reshape" package
            re.eval("library(reshape)");
        } catch (Exception e) {
            log.error("*** Exception while loading reshape package: ", e);
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        initR(false);
    }

    /**
     * method performs R computations
     */
    public void performRComputations() {
        try {
            // clear workspace before new processing round
            re.eval("rm(list=ls())");
            // perform some useful computations
            re.eval("N <- SOMETHING_USEFUL");
        } catch (Exception e) {
            log.error("*** Exception on R stage: ", e);
        }
    }

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Writable> values, Context context) throws IOException, InterruptedException {
        // aggregate records
        for (Writable value : values) {
            Result singleResult = (Result) value;
            // do something useful with singleResult

            // perform R computations
            performRComputations();

            // place computation results into HBase
            // remember to configure Put object properly
            Put put = new Put();
            context.write(key, put);
        }
    }

    /**
     * R Engine closure is in a separate method from *cleanup* to enable Unit Testing
     */
    public void cleanR() {
        if (re == null) {
            return;
        }
        re.end();
        if (!re.waitForR()) {
            log.info("*** R: engine is stopped.");
        } else {
            log.info("*** R: engine turned to zombie.");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        cleanR();
        super.cleanup(context);
    }
}

I highly recommend the following reading:

[1] rJava/JRI source code repos:
http://www.rforge.net/rJava/svn.html
http://www.rforge.net/JRI/svn.html 

[2] Previous post with working example Java-to-R interaction:
http://mushkevych.blogspot.com/2012/04/r-running-by-java-process.html

[3] Rengine: public Mutex getRsync()
http://www.rosuda.org/r/nightly/javadoc/org/rosuda/JRI/Rengine.html#getRsync()


P.S.
There might be a way to start and maintain an R instance per rJava/JRI client; however, I was not able to identify it.

P.P.S.
It should be possible to share a single R instance between several Java processes/threads by synchronizing directly at the JNI level [3].
However, the expected performance gains vs. code complexity must be weighed carefully.
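
For illustration, a rough and untested sketch of what that synchronization might look like inside a single JVM, assuming re is the Rengine instance from the reducer above and that org.rosuda.JRI.Mutex exposes lock()/unlock() in your JRI build (verify against [3]):

import org.rosuda.JRI.Mutex;
import org.rosuda.JRI.REXP;

// guard every call into the shared engine with its own mutex (see [3])
Mutex rSync = re.getRsync();
rSync.lock();
try {
    REXP result = re.eval("N <- SOMETHING_USEFUL");
} finally {
    rSync.unlock();   // always release, even if eval() throws
}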