Friday, January 11, 2013

HBase: secondary index

As your HBase project moves forward, you will likely face a request to search by criteria that are not part of the primary index (the row key) and cannot be added to it. In other words, you will need fast and efficient search by a secondary index. For instance: select all eReaders in a specific price range. In this post, we will review one approach to constructing a secondary index.

As usual, we will work in the realm of Surus ORM [1] and Synergy Mapreduce Framework [2], and will start with the definition of a model. For illustration purposes we will use a simplified variant of a "product" class that has lowest and highest prices and can belong to only one category. For instance:

ID                   | category | priceLowest | priceHighest | manufacturer
Sony eReader PRST2BC | E-READER | 8900        | 12900        | SONY

public class Product {
    // The row key consists of a single fixed-length String component: the product ID
    @HRowKey (components = {
        @HFieldComponent(name = Constants.ID, length = Constants.LENGTH_STRING_DEFAULT, type = String.class)
    })
    public byte[] key;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.CATEGORY)
    public String category;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.PRICE_LOWEST)
    public int priceLowest;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.PRICE_HIGHEST)
    public int priceHighest;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.MANUFACTURER)
    public String manufacturer;

    public Product() {
    }
}

Instances will reside in the product table:
<TableSchema name="product">
    <ColumnSchema name="stat" BLOCKCACHE="true" COMPRESSION="snappy" VERSIONS="1" IN_MEMORY="true"/>
</TableSchema>

To satisfy our search requests, we would like to get the following structure:
ID       | products
         | Sony eReader PRST2BC    | Kobo ... | ...
E-READER | { priceLowest: 8900,    | { ... }  | { ... }
         |   priceHighest: 12900,  |          |
         |   manufacturer: SONY }  |          |
Here, any search within a specified category would allow us to quickly select products by price range or manufacturer.
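To make the idea concrete, here is a minimal sketch of filtering one such index row in memory. It is not part of Surus: the class name CategoryIndexSearch and its methods are hypothetical, the entry names price_lowest and price_highest are taken from the storage format comment of the index model and are assumed to match the constants, and the two ByteBuffer helpers are plain-Java stand-ins for HBase's Bytes.toBytes(int) and Bytes.toInt(byte[]). "In range" is interpreted here as an overlap between the product's price interval and the requested one, which is also an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Surus): filters one index row in memory.
class CategoryIndexSearch {

    // Stand-in for HBase's Bytes.toBytes(int): 4 bytes, big-endian.
    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Stand-in for HBase's Bytes.toInt(byte[]).
    static int bytesToInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Returns IDs of products whose [priceLowest, priceHighest] interval
    // overlaps the requested [min, max] range.
    static List<String> findByPriceRange(Map<String, Map<String, byte[]>> products, int min, int max) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Map<String, byte[]>> entry : products.entrySet()) {
            byte[] lowest = entry.getValue().get("price_lowest");
            byte[] highest = entry.getValue().get("price_highest");
            if (lowest == null || highest == null) {
                continue; // incomplete entry: skip rather than guess
            }
            if (bytesToInt(lowest) <= max && bytesToInt(highest) >= min) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // One row of the index: all products of the E-READER category
        Map<String, Map<String, byte[]>> eReaders = new HashMap<>();
        Map<String, byte[]> sony = new HashMap<>();
        sony.put("price_lowest", intToBytes(8900));
        sony.put("price_highest", intToBytes(12900));
        sony.put("manufacturer", "SONY".getBytes(StandardCharsets.UTF_8));
        eReaders.put("Sony eReader PRST2BC", sony);

        System.out.println(findByPriceRange(eReaders, 8000, 10000));
    }
}
```

The point of the sketch is that once a category row is fetched by its key, the remaining filtering is a cheap loop over a single row instead of a scan over the whole product table.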

To create an index as described above, we need a new model to hold the filtration criteria and a mapreduce job to periodically update it.
Secondary index model:
public class Grouping {
    // Composite row key: timeperiod (int) followed by a fixed-length category name
    @HRowKey(components = {
        @HFieldComponent(name = Constants.TIMEPERIOD, length = Bytes.SIZEOF_INT, type = Integer.class),
        @HFieldComponent(name = Constants.CATEGORY, length = Constants.LENGTH_CATEGORY_NAME, type = String.class)
    })
    public byte[] key;

    /**
     * format of the storage:
     * {product_id : {
     *     price_highest: int
     *     price_lowest: int
     *     manufacturer: String
     * }}
     */
    @HMapFamily(family = Constants.FAMILY_PRODUCT, keyType = String.class, valueType = Map.class)
    @HNestedMap(keyType = String.class, valueType = byte[].class)
    public Map<String, Map<String, byte[]>> product = new HashMap<String, Map<String, byte[]>>();

    public Grouping() {
    }

    protected String getStringEntry(String prodId, String key) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null || !entry.containsKey(key)) {
            return null;
        }
        return Bytes.toString(entry.get(key));
    }

    protected Integer getIntegerEntry(String prodId, String key) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null || !entry.containsKey(key)) {
            return null;
        }
        return Bytes.toInt(entry.get(key));
    }

    protected void setEntry(String prodId, String key, byte[] value) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null) {
            entry = new HashMap<String, byte[]>();
        }
        entry.put(key, value);
        product.put(prodId, entry);
    }

    public Integer getPriceHighest(String prodId) {
        return getIntegerEntry(prodId, Constants.PRICE_HIGHEST);
    }

    public void setPriceHighest(String prodId, int price) {
        setEntry(prodId, Constants.PRICE_HIGHEST, Bytes.toBytes(price));
    }

    public Integer getPriceLowest(String prodId) {
        return getIntegerEntry(prodId, Constants.PRICE_LOWEST);
    }

    public void setPriceLowest(String prodId, int price) {
        setEntry(prodId, Constants.PRICE_LOWEST, Bytes.toBytes(price));
    }

    // Returns the manufacturer name, or null if the product has no such entry
    public String getManufacturer(String prodId) {
        return getStringEntry(prodId, Constants.MANUFACTURER);
    }

    public void setManufacturer(String prodId, String manufacturer) {
        setEntry(prodId, Constants.MANUFACTURER, Bytes.toBytes(manufacturer));
    }
}

and its corresponding grouping table:
<TableSchema name="grouping">
    <ColumnSchema name="product" BLOCKCACHE="true" COMPRESSION="snappy" VERSIONS="1" IN_MEMORY="true" TTL="604800"/>
</TableSchema>
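The row key of the grouping table combines a time period with a category name. The sketch below shows one way such a composite key could be laid out in plain Java. It is an illustration only, not the actual key class of Synergy or Surus: the class name TimeperiodCategoryKey, the value 32 for LENGTH_CATEGORY_NAME, and zero-byte padding are all assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a timeperiod + category composite row key.
class TimeperiodCategoryKey {

    // Assumed value; the real Constants.LENGTH_CATEGORY_NAME is not shown in this post.
    static final int LENGTH_CATEGORY_NAME = 32;

    // Layout: 4 bytes of big-endian timeperiod, then the category name
    // padded with zero bytes up to LENGTH_CATEGORY_NAME.
    static byte[] generateKey(int timePeriod, String category) {
        byte[] name = category.getBytes(StandardCharsets.UTF_8);
        if (name.length > LENGTH_CATEGORY_NAME) {
            throw new IllegalArgumentException("category name is too long: " + category);
        }
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + LENGTH_CATEGORY_NAME);
        buffer.putInt(timePeriod);
        buffer.put(name); // the remaining bytes stay zero
        return buffer.array();
    }

    static int parseTimePeriod(byte[] key) {
        return ByteBuffer.wrap(key).getInt();
    }

    static String parseCategory(byte[] key) {
        int end = Integer.BYTES;
        while (end < key.length && key[end] != 0) {
            end++; // stop at the first padding byte
        }
        return new String(key, Integer.BYTES, end - Integer.BYTES, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = generateKey(20130111, "E-READER");
        System.out.println(key.length + " bytes: " + parseTimePeriod(key) + " / " + parseCategory(key));
    }
}
```

With the time period placed first, all categories of one period sit next to each other in the table, so a single period can be read with a prefix scan. For non-negative time periods, the big-endian encoding also keeps rows sorted chronologically.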

The mapreduce job implies that the Job Runner will use the product table as the source and the grouping table as the sink. The job's mapper:
// TcPrimaryKey stands for Timeperiod+Category primary key
private TcPrimaryKey pkTc = new TcPrimaryKey();
private EntityService<Product> esProduct = new EntityService<Product>(Product.class);

@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    Product product = esProduct.parseResult(value);
    // timePeriod is not defined in this snippet; presumably it is a mapper field initialized elsewhere
    ImmutableBytesWritable convertedKey = pkTc.generateKey(timePeriod, product.category);
    // Re-key the source row by timeperiod+category, so that all products of a category reach the same reducer
    context.write(convertedKey, value);
}
and a reducer:
private EntityService<Product> esProduct = new EntityService<Product>(Product.class);
private EntityService<Grouping> esGrouping = new EntityService<Grouping>(Grouping.class);

@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) throws IOException, InterruptedException {
    Grouping targetDocument = new Grouping();
    // The reduce key already encodes timeperiod+category, so it becomes the row key as-is
    targetDocument.key = key.get();
    for (Result singleResult : values) {
        Product sourceDocument = esProduct.parseResult(singleResult);
        // Nested entries are keyed by the product ID (the Product row key), not by the grouping key.
        // trim() drops fixed-length padding; how Surus pads the key is an assumption here.
        String prodId = Bytes.toString(sourceDocument.key).trim();
        targetDocument.setPriceHighest(prodId, sourceDocument.priceHighest);
        targetDocument.setPriceLowest(prodId, sourceDocument.priceLowest);
        targetDocument.setManufacturer(prodId, sourceDocument.manufacturer);
    }
    try {
        Put put = esGrouping.insert(targetDocument);
        put.setWriteToWAL(false);
        context.write(key, put);
    } catch (OutOfMemoryError e) {
        // ...
    }
}
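The data flow of the job can be followed without a cluster. The sketch below replays the map, shuffle and reduce steps in memory with plain Java collections. It is an illustration under stated assumptions, not the job itself: GroupingSimulation and its nested Product class are hypothetical stand-ins, the string "timePeriod/category" replaces the binary composite key, and the two "Hypothetical ..." products are made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration only; no Hadoop or HBase classes are involved.
class GroupingSimulation {

    // Simplified stand-in for the Product model.
    static final class Product {
        final String id;
        final String category;
        final int priceLowest;
        final int priceHighest;
        final String manufacturer;

        Product(String id, String category, int priceLowest, int priceHighest, String manufacturer) {
            this.id = id;
            this.category = category;
            this.priceLowest = priceLowest;
            this.priceHighest = priceHighest;
            this.manufacturer = manufacturer;
        }
    }

    // "Map" + "shuffle": key every product by timePeriod/category and group by that key.
    static Map<String, List<Product>> mapAndShuffle(int timePeriod, List<Product> products) {
        Map<String, List<Product>> grouped = new TreeMap<>();
        for (Product p : products) {
            String key = timePeriod + "/" + p.category;
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(p);
        }
        return grouped;
    }

    // "Reduce": fold one group into a single index row, product ID -> attributes.
    static Map<String, Map<String, Object>> reduce(List<Product> group) {
        Map<String, Map<String, Object>> row = new TreeMap<>();
        for (Product p : group) {
            Map<String, Object> attributes = new TreeMap<>();
            attributes.put("price_lowest", p.priceLowest);
            attributes.put("price_highest", p.priceHighest);
            attributes.put("manufacturer", p.manufacturer);
            row.put(p.id, attributes);
        }
        return row;
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
            new Product("Sony eReader PRST2BC", "E-READER", 8900, 12900, "SONY"),
            new Product("Hypothetical Reader X", "E-READER", 7900, 9900, "ACME"),  // made-up sample
            new Product("Hypothetical Tablet Y", "TABLET", 19900, 24900, "ACME")); // made-up sample
        for (Map.Entry<String, List<Product>> group : mapAndShuffle(20130111, products).entrySet()) {
            System.out.println(group.getKey() + " -> " + reduce(group.getValue()));
        }
    }
}
```

Each reduce call produces exactly one row of the index, which is why the real reducer emits a single Put per key.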
As an alternative to a secondary index, you can use filtering. For instance, SingleColumnValueFilter:
public ResultScanner getProductScanner(HTableInterface hTable,
                                       String manufacturer) throws IOException {
    FilterList flMaster = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    if (manufacturer != null && !manufacturer.trim().isEmpty()) {
        // Keep only rows whose stat:manufacturer column equals the requested value.
        // By default, rows that lack the column also pass; filter.setFilterIfMissing(true) excludes them.
        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(Constants.FAMILY_STAT),
                Bytes.toBytes(Constants.MANUFACTURER),
                CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes(manufacturer)));
        flMaster.addFilter(filter);
    }
    // No start/stop row is set, so this scan covers the whole table
    Scan scan = new Scan();
    scan.setFilter(flMaster);
    return hTable.getScanner(scan);
}
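However, the SingleColumnValueFilter approach does not scale to large tables and frequent searches: the filter is evaluated on the region servers, but without a row key range every row of the table is still read and checked. Stretching it too far will cause performance degradation across the cluster.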

To sum it up, secondary indexes are not trivial, but neither are they the pinnacle of complexity. While designing them, one should look carefully at the filtration criteria and the "long-term" perspective.

Hopefully you will find this tutorial helpful.
Cheers!

[1] Surus ORM
https://github.com/mushkevych/surus

[2] Synergy Mapreduce Framework
https://github.com/mushkevych/synergy-framework