- R must be installed along with all required libraries on each mapred node in the Hadoop cluster.
- Communication is performed over JNI via the rJava/JRI interface.
This package must also be installed on each mapred node in the cluster.
- The following environment variables must be exported (paths depend on the specific environment):
export R_HOME=/usr/lib64/R
JRI_HOME=/usr/lib64/R/site-library/rJava/jri
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JRI_HOME}/JRI.jar:${JRI_HOME}/JRIEngine.jar:${JRI_HOME}/REngine.jar
export JAVA_LIBRARY_PATH=${JRI_HOME}
Expect to spend at least a day on installation headaches. In my case, we had to build RPMs for almost every R component.
Once installation is complete, you might want to exercise MapReduce parallelism with R, only to find messages like:
WARNING: org.rosuda.JRI.Mutex was unlocked by other thread than locked! This may soon lead to a crash...
This leads us to the primary limitation of straightforward rJava/JRI usage (see P.S. for details):
There can be only one process/thread accessing an R instance per box.
In my case this was not critical, as I was able to complete the computation on a single reducer within several hours; however, with ever-larger result sets this may become a prohibitive restriction.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.log4j.Logger; | |
import org.rosuda.JRI.REXP; | |
import org.rosuda.JRI.RMainLoopCallbacks; | |
import org.rosuda.JRI.Rengine; | |
import java.io.IOException; | |
/** | |
* @author Bohdan Mushkevych | |
* date: May 2012 | |
*/ | |
public class ExemplaryRReducer extends AbstractTableReducer<ImmutableBytesWritable, Writable> { | |
private static Logger log = Logger.getLogger(ExemplaryRReducer.class); | |
protected Rengine re; | |
static class LoggingConsole implements RMainLoopCallbacks { | |
private Logger log; | |
LoggingConsole(Logger log) { | |
this.log = log; | |
} | |
public void rWriteConsole(Rengine re, String text, int oType) { | |
log.info("*** R: callback rWriteConsole(" + text + ")"); | |
} | |
public void rBusy(Rengine re, int which) { | |
log.info("*** R: callback rBusy(" + which + ")"); | |
} | |
public void rShowMessage(Rengine re, String message) { | |
log.info("*** R: callback rShowMessage \"" + message + "\""); | |
} | |
public String rReadConsole(Rengine re, String prompt, int addToHistory) { | |
return null; | |
} | |
public String rChooseFile(Rengine re, int newFile) { | |
return null; | |
} | |
public void rFlushConsole(Rengine re) { | |
} | |
public void rLoadHistory(Rengine re, String filename) { | |
} | |
public void rSaveHistory(Rengine re, String filename) { | |
} | |
} | |
/** | |
* R Engine initialization is in a different method than *setup* to enable Unit Testing | |
* @param runMainLoop if set to <code>true</code> the the event loop will be started as soon as possible, | |
* otherwise no event loop is started. Running loop requires <code>initialCallbacks</code> to be set correspondingly as well. | |
*/ | |
public void initR(boolean runMainLoop) { | |
// Call R and perform coefficient computing | |
// just making sure we have the right version of everything | |
if (!Rengine.versionCheck()) { | |
throw new IllegalStateException("*** R: version mismatch - Java files don't match R library version."); | |
} | |
// --vanilla Combine --no-save, --no-restore, --no-site-file, --no-init-file and --no-environ | |
// --slave Make R run as quietly as possible | |
// for more details run <code>R --help</code> from command line | |
re = new Rengine(new String[]{"--vanilla", "--slave"}, runMainLoop, new LoggingConsole(log)); | |
// the engine creates R is a new thread, so we should wait until it's ready | |
if (!re.waitForR()) { | |
throw new IllegalStateException("*** R: cannot start the engine."); | |
} | |
try { | |
// check if "reshape" package is installed | |
re.eval("is.installed <- function(mypkg) is.element(mypkg, installed.packages()[,1])"); | |
REXP isInstalled = re.eval("is.installed(\"reshape\")"); | |
if (isInstalled.asBool().isFALSE()) { | |
log.info("*** R: reshape package is missing. Installing locally."); | |
// install "reshape" package if this is needed | |
re.eval("install.packages(\"reshape\", repos=\"http://cran.stat.sfu.ca/\")"); | |
} else { | |
log.info("*** R: reshape package is installed. Proceeding."); | |
} | |
} catch (Exception e) { | |
log.error("*** Exception during R initialization: ", e); | |
} | |
try { | |
// load "reshape" package | |
re.eval("library(reshape)"); | |
} catch (Exception e) { | |
log.error("*** Exception while loading reshape package: ", e); | |
} | |
} | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
super.setup(context); | |
initR(false); | |
} | |
/** | |
* method performs R computations | |
*/ | |
public void performRComputations() { | |
try { | |
// clear workspace before new processing round | |
re.eval("rm(list=ls())"); | |
// perform some useful computations | |
re.eval("N <- SOMETHING_USEFUL"); | |
} catch (Exception e) { | |
log.error("*** Exception on R stage: ", e); | |
} | |
} | |
@Override | |
protected void reduce(ImmutableBytesWritable key, Iterable<Writable> values, Context context) throws IOException, InterruptedException { | |
// aggregate records | |
for (Writable value : values) { | |
Result singleResult = (Result) value; | |
// do something useful with singleResult | |
// perform R computations | |
performRComputations(); | |
// place computation results into HBase | |
// remember to configure Put object properly | |
Put put = new Put(); | |
context.write(key, put); | |
} | |
} | |
/** | |
* R Engine closure is in a separate method than *cleanup* to enable Unit Testing | |
*/ | |
public void cleanR() { | |
if (re == null) { | |
return; | |
} | |
re.end(); | |
if (!re.waitForR()) { | |
log.info("*** R: engine is stopped."); | |
} else { | |
log.info("*** R: engine turned to zombie."); | |
} | |
} | |
@Override | |
protected void cleanup(Context context) throws IOException, InterruptedException { | |
cleanR(); | |
super.cleanup(context); | |
} | |
} |
I highly recommend the following reading:
[1] rJava/JRI source code repos:
http://www.rforge.net/rJava/svn.html
http://www.rforge.net/JRI/svn.html
[2] Previous post with working example Java-to-R interaction:
http://mushkevych.blogspot.com/2012/04/r-running-by-java-process.html
[3] Rengine: public Mutex getRsync()
http://www.rosuda.org/r/nightly/javadoc/org/rosuda/JRI/Rengine.html#getRsync()
P.S.
There might be a way to start and maintain an R instance per rJava/JRI client; however, I was not able to find it.
P.P.S.
It should be possible to share a single R instance between several Java processes/threads by synchronizing directly at the JNI level [3].
However, the expected performance gains must be weighed carefully against the added code complexity.
No comments:
Post a Comment