The first question that comes to mind: why bother? Wasn't this done by our forefathers and known to us as Sqoop? True, but I have two excuses:
- I wanted to pre-process data from PostgreSQL before placing it into HBase. With Sqoop, a set of additional map/reduce jobs would be required
- The input data came in multiple CSV dumps, so with straightforward Sqoop I would need a set of map/reduce jobs for each of them
Trying to avoid as much additional work as possible, I first came up with the idea of an omnivore REST interface (hey, it's only 3 GB). However, even with thread-pooling and connection-pooling, the most I squeezed out of it was about 130 commits per second (totalling 150-200 MB per day).
At this point two things became apparent:
- Even with only 3 GB, the import must be moved to the Hadoop cluster
- It has to be either map/reduce or a direct HBase tunnel
And so CSVImporter was born: a Server-Worker design based on Surus [1], with thread-pooling, connection-pooling and a write-buffer. It also uses SuperCSV [2].
Performance: 403 MB per hour (the largest dump, a 2.8 GB CSV, was imported in 7 hours 11 minutes).
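A note before the listing: the connection-pooling and write-buffer are handled inside Surus's PoolManager, which is not shown below. For orientation only, here is a minimal sketch of how a client-side write buffer was typically enabled with the HBase API of that era (the table name and buffer size are my assumptions, not PoolManager's actual settings):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

// minimal sketch, NOT the actual PoolManager internals:
// batch Puts on the client side instead of paying a round-trip per record
HTable table = new HTable(HBaseConfiguration.create(), "user_log"); // table name is illustrative
table.setAutoFlush(false);                  // do not flush on every Put
table.setWriteBufferSize(12 * 1024 * 1024); // accumulate ~12 MB of Puts before flushing
// ... many table.put(...) calls ...
table.flushCommits();                       // persist whatever is left in the buffer

Buffering like this turns thousands of tiny RPCs into a handful of large ones, and is largely what lets the importer outrun the one-commit-per-request REST approach.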
package com.reinvent.synergy.data.csvimport;

import com.reinvent.synergy.data.model.Constants;
import com.reinvent.synergy.data.model.UserLog;
import com.reinvent.synergy.data.system.PoolManager;
import com.reinvent.synergy.data.system.TableContext;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.supercsv.io.CsvMapReader;
import org.supercsv.io.ICsvMapReader;
import org.supercsv.prefs.CsvPreference;

import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author Bohdan Mushkevych
 * date: 22 Mar 2012
 * Description: opens a CSV file and streams its records to workers for processing
 */
public class CsvImportServer<T> extends Thread {
    public static final String PROPERTY_LOG4J = "log4j.configuration";
    public static final int NUMBER_OF_CONCURRENT_THREADS = 5;

    protected PoolManager<T> poolManager;
    protected Logger logger;
    protected String filePath;
    protected String tableName;
    protected String familyName;
    protected Class<T> clazzDataModel;
    protected BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>(NUMBER_OF_CONCURRENT_THREADS * 11);
    protected ExecutorService threadPool = new ThreadPoolExecutor(NUMBER_OF_CONCURRENT_THREADS,
            NUMBER_OF_CONCURRENT_THREADS,
            0L,
            TimeUnit.MILLISECONDS,
            blockingQueue);

    static {
        PropertyConfigurator.configure(System.getProperty(PROPERTY_LOG4J));
    }

    public CsvImportServer(String filePath, String familyName, Class<T> clazzDataModel, String tableName) {
        this.filePath = filePath;
        this.familyName = familyName;
        this.clazzDataModel = clazzDataModel;
        this.tableName = tableName;
        this.poolManager = TableContext.getPoolManager(tableName);
        logger = Logger.getLogger(this.clazzDataModel.getSimpleName());
        logger.info(String.format("Started Synergy CSV Importer from %s for %s:%s", this.filePath, this.tableName, this.familyName));
    }

    /**
     * Algorithm:
     * - Open the CSV file and feed CSV maps into the thread-pool queue.
     * - The queue must not be overfilled, as that causes OOM exceptions and degrades performance.
     */
    public void run() {
        Map<String, String> map;
        ICsvMapReader reader = null;
        try {
            reader = new CsvMapReader(new FileReader(this.filePath), CsvPreference.STANDARD_PREFERENCE);
            String[] header = reader.getCSVHeader(true);
            // the read happens in the loop condition, so a failed submit does not stall the import on the same row
            while ((map = reader.read(header)) != null) {
                // back-pressure: keep some headroom in the queue before submitting the next task
                while (blockingQueue.remainingCapacity() < NUMBER_OF_CONCURRENT_THREADS) {
                    Thread.yield();
                }
                try {
                    threadPool.submit(new CsvImportWorker(map, familyName, poolManager));
                } catch (Exception e) {
                    logger.error("Exception at worker level", e);
                }
            }
            logger.info(String.format("Imported %d events%n", reader.getLineNumber()));
            shutdownAndFlush();
        } catch (IOException e) {
            logger.error("Server side exception.", e);
        } catch (Exception e) {
            logger.error("Unexpected server side exception.", e);
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                logger.error("Exception on closing CSV reader.", e);
            }
        }
    }

    /**
     * Since we use both thread-pooling and resource-pooling, we must make sure both are properly disposed of.
     * The thread pool is given 5 minutes for a normal shutdown, while the resource pool defaults to 30 seconds.
     * @throws InterruptedException if interrupted while awaiting thread-pool termination
     */
    void shutdownAndFlush() throws InterruptedException {
        threadPool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!threadPool.awaitTermination(300, TimeUnit.SECONDS)) {
                threadPool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    logger.error("Thread Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if the current thread was also interrupted
            threadPool.shutdownNow();
            logger.error("Interrupted Exception on Thread Pool closing. Re-shutting down.");
        }
        poolManager.flushTable();
        logger.info("HBase flush requested. Waiting to safely persist data");
        while (poolManager.isFlushRequested()) {
            Thread.yield();
            poolManager.flushTable();
        }
        logger.info("HBase flush completed. Exiting");
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Wrong number of parameters. Usage: csv_file_path column_family");
            return;
        }
        CsvImportServer<UserLog> server = new CsvImportServer<UserLog>(args[0], args[1], UserLog.class, Constants.TABLE_USER_LOG);
        server.start();
    }
}
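A note on the throttling loop in run(): spinning on remainingCapacity() keeps headroom in the queue, because an unbounded backlog of CSV maps is exactly what causes the OOM mentioned in the javadoc. A standard alternative, shown here only as a sketch (not what the importer actually does), is to let the executor itself apply the back-pressure:

import java.util.concurrent.*;

// sketch: a bounded queue plus CallerRunsPolicy makes the submitting thread
// run the task itself when the queue is full, throttling the CSV reader naturally
ExecutorService pool = new ThreadPoolExecutor(
        5, 5,                                  // same pool size as above
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(55), // same capacity as above (5 * 11)
        new ThreadPoolExecutor.CallerRunsPolicy());

The trade-off: CallerRunsPolicy stalls the reader thread while it executes a task, whereas the yield-loop keeps the reader hot at the cost of burning CPU while waiting.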
package com.reinvent.synergy.data.csvimport;

import com.reinvent.synergy.data.rest.RequestHandler;
import com.reinvent.synergy.data.system.PoolManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import java.util.Map;

/**
 * @author Bohdan Mushkevych
 * date 21 Mar 2012
 * Description: receives a CSV mapping from the Server and converts it into an HBase Put
 */
public class CsvImportWorker implements Runnable {
    protected static Logger logger = Logger.getLogger(CsvImportWorker.class.getSimpleName());
    protected PoolManager poolManager;
    protected Map<String, String> map;
    protected String columnFamily; // retained for the Put construction below

    public CsvImportWorker(Map<String, String> map, String strColumnFamily, PoolManager poolManager) {
        this.map = map;
        this.columnFamily = strColumnFamily;
        this.poolManager = poolManager;
    }

    public void run() {
        HTable hTable = null;
        try {
            hTable = poolManager.getTable();
            if (map != null) {
                Integer userId = Integer.valueOf(map.get(RequestHandler.PARAMETER_USER_ID));
                String timestamp = map.get(RequestHandler.PARAMETER_CREATED_AT);
                Put putA = new Put(Bytes.toBytes(userId));
                // ***
                // SOMETHING VERY USEFUL HERE
                // ***
                hTable.put(putA);
            }
        } catch (Exception e) {
            logger.error("Unexpected error during processing", e);
        } finally {
            if (hTable != null) {
                poolManager.putTable(hTable);
            }
        }
    }
}
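The elided section above is where the Put actually gets populated. Purely for illustration, with the HBase API of that era, adding a cell to the Put would look like the following (the "created_at" qualifier is a hypothetical example, not the real schema):

// hypothetical cell; the real column mapping lives in the elided section
putA.add(Bytes.toBytes(columnFamily),  // column family handed over by the server
         Bytes.toBytes("created_at"),  // qualifier: illustrative only
         Bytes.toBytes(timestamp));    // value taken from the CSV map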
[1] Surus at GitHub
https://github.com/mushkevych/surus
[2] SuperCSV
http://supercsv.sourceforge.net/codeExamples_general.html