Thursday, April 19, 2012

R: running by Java process

After trying Multiple Linear Regression in a sandbox, let's try some integration.
In this post we will concentrate on how to install R and run it from a regular Java process; in the next post we will plug R into Hadoop MapReduce.

R is a programming language and software environment written in C and Fortran, so interaction with Java requires a JNI layer. That layer is provided by the Java/R Interface (JRI) project [1], which ships platform-specific .so files.
To prepare the environment, we need both R and JRI installed and configured. On Ubuntu this takes the following two commands:
sudo apt-get install r-base r-recommended r-base-dev
sudo apt-get install r-cran-rjava

For other platforms, follow the steps from [5] to install R and from [6] to install JRI.

To make the .so files visible to the Java process, we need to update LD_LIBRARY_PATH and pass -Djava.library.path to the JVM. For a deeper look at the reasoning behind this configuration, see [2] and [3].
On Ubuntu, the run.sh launcher script looks like this:

#!/bin/sh
# Launch RIntegrationExample with R and JRI (Java/R Interface) on the library path.
#
# Every location below is only a default; override any of them from the
# environment, e.g.:  JAVA=/usr/bin/java ./run.sh
#
# On Debian/Ubuntu, R_HOME is /usr/lib/R. The same tree also holds the rJava
# package used for JRI_HOME below. The previous default of /usr/lib64/R did not
# match that layout.
R_HOME=${R_HOME:-/usr/lib/R}
R_SHARE_DIR=${R_SHARE_DIR:-/usr/share/R/share}
R_INCLUDE_DIR=${R_INCLUDE_DIR:-/usr/share/R/include}
JRI_HOME=${JRI_HOME:-/usr/lib/R/site-library/rJava/jri}
JAVA=${JAVA:-/usr/lib/jvm/java-6-sun/jre/bin/java}

# JRI jars plus log4j, which the example uses for logging.
CLASSPATH=".:${JRI_HOME}/JRI.jar:${JRI_HOME}/REngine.jar:${JRI_HOME}/JRIEngine.jar:log4j-1.2.15.jar"

# Put R's and JRI's native-library directories first, but keep whatever library
# path the caller already had instead of discarding it.
LD_LIBRARY_PATH="${R_HOME}/lib:${R_HOME}/bin:${JRI_HOME}${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"

export R_HOME R_SHARE_DIR R_INCLUDE_DIR CLASSPATH LD_LIBRARY_PATH

# exec makes the JVM replace this shell, so its exit status is returned directly.
exec "${JAVA}" -Djava.library.path=".:${JRI_HOME}" -cp "${CLASSPATH}" RIntegrationExample
view raw run.sh hosted with ❤ by GitHub

With the environment configured, we can now turn to the code:

import org.apache.log4j.Logger;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.RMainLoopCallbacks;
import org.rosuda.JRI.Rengine;
import java.util.*;
/**
 * Example illustrating Java->R integration.
 * <p>
 * Builds a small sample data set in Java and pushes it into an R engine embedded through JRI.
 * In R, it reshapes the terms from "long" to "wide" format with the {@code reshape} package and
 * fits a multiple linear regression with {@code lm}. The regression coefficients are returned
 * to Java.
 *
 * @author Bohdan Mushkevych
 * date Apr 2012
 */
public class RIntegrationExample {
    private static final Logger log = Logger.getLogger(RIntegrationExample.class);

    /** Sample input: date key (e.g. 20120101) -> {term name -> term value}. */
    protected Map<Integer, Map<String, Integer>> terms = new HashMap<Integer, Map<String, Integer>>();
    /** Sample input: date key (e.g. 20120101) -> value the regression should explain. */
    protected Map<Integer, Double> value = new HashMap<Integer, Double>();

    /**
     * JRI callbacks that forward R console output to a log4j logger.
     * <p>
     * Interactive callbacks (console input, file chooser) are not supported and return
     * {@code null}. The flush and history callbacks are no-ops.
     */
    static class LoggingConsole implements RMainLoopCallbacks {
        private final Logger log;

        LoggingConsole(Logger log) {
            this.log = log;
        }

        public void rWriteConsole(Rengine re, String text, int oType) {
            // JRI reports regular output with oType == 0 and errors/warnings with a non-zero oType.
            // Surface the latter at WARN level so they are not lost among regular output.
            if (oType == 0) {
                log.info("*** Logging Console rWriteConsole(" + text + ")");
            } else {
                log.warn("*** Logging Console rWriteConsole(" + text + ")");
            }
        }

        public void rBusy(Rengine re, int which) {
            log.info("*** Logging Console rBusy(" + which + ")");
        }

        public void rShowMessage(Rengine re, String message) {
            log.info("*** Logging Console rShowMessage \"" + message + "\"");
        }

        public String rReadConsole(Rengine re, String prompt, int addToHistory) {
            return null;
        }

        public String rChooseFile(Rengine re, int newFile) {
            return null;
        }

        public void rFlushConsole(Rengine re) {
        }

        public void rLoadHistory(Rengine re, String filename) {
        }

        public void rSaveHistory(Rengine re, String filename) {
        }
    }

    /**
     * Fills {@link #terms} and {@link #value} with three days of sample data.
     * Not every term is present on every day.
     */
    public RIntegrationExample() {
        Map<String, Integer> term_1 = new HashMap<String, Integer>();
        term_1.put("term_1", 10);
        term_1.put("term_2", 11);
        term_1.put("term_3", 7);
        term_1.put("term_4", 8);
        term_1.put("term_5", 9);
        Map<String, Integer> term_2 = new HashMap<String, Integer>();
        term_2.put("term_1", 8);
        term_2.put("term_2", 12);
        term_2.put("term_5", 10);
        Map<String, Integer> term_3 = new HashMap<String, Integer>();
        term_3.put("term_3", 5);
        term_3.put("term_4", 10);
        term_3.put("term_5", 12);
        terms.put(20120101, term_1);
        terms.put(20120102, term_2);
        terms.put(20120103, term_3);
        value.put(20120101, 70.5);
        value.put(20120102, 50.5);
        value.put(20120103, 48.2);
    }

    /**
     * Takes the terms map and the values map and performs two transformations:
     * 1. transforms terms from "long" to "wide" format
     * details in: http://mushkevych.blogspot.com/2012/04/r-from-long-to-wide.html
     * 2. computes Multiple Linear Regression
     * details in: http://mushkevych.blogspot.com/2012/04/r-multiple-linear-regression.html
     * <p>
     * The response vector follows the sorted keys of {@code values}. It must line up with the
     * date-ordered rows of the wide term table, so both maps are expected to contain the same
     * set of date keys; a warning is logged when they differ. A term absent on some date
     * contributes 0 for that date.
     *
     * @param terms  in format:
     *               datetime : {term_1 : value_1, term_2 : value_2, ...}
     * @param values in format:
     *               datetime : value
     * @return linear regression's coefficients in format
     *         {(Intercept) : value, term_1 : value, ..., term_N : value},
     *         or {@code null} if any R step failed (the cause is logged)
     * @throws IllegalStateException if the JRI Java classes do not match the native library
     *                               version, or if R cannot be started
     */
    public Map<String, Double> calculateCoefficient(Map<Integer, Map<String, Integer>> terms, Map<Integer, Double> values) {
        // just making sure we have the right version of everything
        if (!Rengine.versionCheck()) {
            throw new IllegalStateException("*** Version mismatch - Java files don't match R library version.");
        }
        Rengine re = obtainEngine();
        // the engine creates R in a new thread, so we should wait until it's ready
        if (!re.waitForR()) {
            throw new IllegalStateException("*** Cannot load R.");
        }
        if (!terms.keySet().equals(values.keySet())) {
            log.warn("*** terms and values cover different dates; regression rows may be misaligned");
        }

        int aggregatedLength = 0;
        for (Map<String, Integer> termValues : terms.values()) {
            aggregatedLength += termValues.size();
        }

        Map<String, Double> result = null;
        try {
            // declare data frame for terms in "long" format: one row per (date, term) pair
            evalChecked(re, "library(reshape)");
            evalChecked(re, "N <- " + aggregatedLength);
            evalChecked(re, "DF <- data.frame(date=rep(NA, N), term=rep(\"\", N), value=rep(NA, N), stringsAsFactors=FALSE)");
            int index = 1;
            List<Integer> keys = new ArrayList<Integer>(terms.keySet());
            Collections.sort(keys);
            // fill terms data frame with actual data
            for (Integer key : keys) {
                for (Map.Entry<String, Integer> entry : terms.get(key).entrySet()) {
                    String command = "DF[" + index + ",]<-c(" + key + "," + toRString(entry.getKey())
                            + "," + toRNumber(entry.getValue()) + ")";
                    log.debug("debug: " + command);
                    evalChecked(re, command);
                    index++;
                }
            }
            // c() above mixes a string with numbers, so every column arrives as character;
            // convert "value" back to numeric
            evalChecked(re, "DF <- transform(DF, value = as.numeric(value))");

            // declare data frame for the response values: one row per date
            evalChecked(re, "M <- " + values.size());
            evalChecked(re, "VAL <- data.frame(date=rep(NA, M), value=rep(NA, M), stringsAsFactors=FALSE)");
            index = 1;
            keys = new ArrayList<Integer>(values.keySet());
            Collections.sort(keys);
            for (Integer key : keys) {
                String command = "VAL[" + index + ",]<-c(" + key + "," + toRNumber(values.get(key)) + ")";
                log.debug("debug: " + command);
                evalChecked(re, command);
                index++;
            }

            evalChecked(re, "Y <- VAL$value");
            evalChecked(re, "wide <- cast(data=DF, formula=date~term, value=\"value\", fun.aggregate=sum, fill=0)");
            // drop the leading "date" column; the remaining columns are the regressors
            evalChecked(re, "X <- wide[, 2:length(wide)]");
            evalChecked(re, "X. <- data.matrix(X)");
            evalChecked(re, "LMR = lm(formula=Y ~ X.)");
            evalChecked(re, "COEF = coefficients(LMR)");
            String[] arrayNames = evalChecked(re, "names(COEF)").asStringArray();
            double[] arrayCoef = evalChecked(re, "COEF").asDoubleArray();
            if (arrayNames == null || arrayCoef == null || arrayNames.length != arrayCoef.length) {
                throw new IllegalStateException("*** Unexpected shape of R coefficients");
            }

            result = new HashMap<String, Double>();
            for (int i = 0; i < arrayCoef.length; i++) {
                // lm names matrix-column coefficients "X.<column>"; strip that prefix
                String name = arrayNames[i].startsWith("X.") ? arrayNames[i].substring(2) : arrayNames[i];
                result.put(name, arrayCoef[i]);
            }
        } catch (Exception e) {
            log.error("*** Exception on R stage: ", e);
            result = null;
        }
        return result;
    }

    /**
     * Returns the embedded R engine, creating it on first use.
     * <p>
     * R can be embedded only once per process, so an engine created by an earlier call is
     * reused rather than constructing a second one.
     */
    private static Rengine obtainEngine() {
        Rengine re = Rengine.getMainEngine();
        if (re == null) {
            re = new Rengine(new String[]{"--no-restore", "--slave", "--no-save"}, false, new LoggingConsole(log));
        }
        return re;
    }

    /**
     * Evaluates {@code command} in R.
     * <p>
     * {@code Rengine.eval} returns {@code null} when R reports an error. This method turns that
     * into an exception naming the failing command, instead of an NPE further down the line.
     */
    private static REXP evalChecked(Rengine re, String command) {
        REXP result = re.eval(command);
        if (result == null) {
            throw new IllegalStateException("*** R failed to evaluate: " + command);
        }
        return result;
    }

    /** Renders {@code s} as a double-quoted R string literal, escaping backslashes and quotes. */
    private static String toRString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * Renders a number as an R literal.
     * <p>
     * {@code null} becomes {@code NA}, and infinities become {@code Inf}/{@code -Inf}. Java's
     * "Infinity" is not valid R; Java's "NaN" already is.
     */
    private static String toRNumber(Number n) {
        if (n == null) {
            return "NA";
        }
        double d = n.doubleValue();
        if (Double.isInfinite(d)) {
            return d > 0 ? "Inf" : "-Inf";
        }
        return n.toString();
    }

    /**
     * Runs the regression on the sample data and prints each coefficient to stdout.
     * Prints a note to stderr instead when the calculation failed.
     */
    public void testCoefficientCalculation() {
        Map<String, Double> result = calculateCoefficient(terms, value);
        if (result == null) {
            System.err.println("*** Coefficient calculation failed; see log for details.");
            return;
        }
        for (Map.Entry<String, Double> entry : result.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    /**
     * Runs the sample regression, then shuts the embedded R engine down.
     */
    public static void main(String[] args) {
        RIntegrationExample example = new RIntegrationExample();
        try {
            example.testCoefficientCalculation();
        } finally {
            // R runs in its own engine thread; stop it explicitly once we are done.
            Rengine re = Rengine.getMainEngine();
            if (re != null) {
                re.end();
            }
        }
    }
}



[1] Java/R Interface
http://www.rforge.net/rJava/

[2] Talking R through Java
http://binfalse.de/2011/02/talking-r-through-java/

[3] java.library.path and LD_LIBRARY_PATH
http://kalblogs.blogspot.ca/2009/01/java.html

[4] How to convert a data frame column to numeric type?
http://stackoverflow.com/questions/2288485/how-to-convert-a-data-frame-column-to-numeric-type

[5] CRAN mirrors: choose your favourite location and follow the R installation instructions:
http://cran.stat.sfu.ca/mirrors.html

[6] rJava package on CRAN
http://cran.r-project.org/web/packages/rJava/

No comments: