As usual, we will work in the realm of Surus ORM [1] and Synergy Mapreduce Framework [2], and will start with the definition of a model. For illustration purposes we will use a simplified variant of a "product" class that has lowest and highest prices and can belong to only one category. For instance:
ID | category | priceLowest | priceHighest | manufacturer |
Sony eReader PRST2BC | E-READER | 8900 | 12900 | SONY |
public class Product {
    @HRowKey(components = {
            @HFieldComponent(name = Constants.ID, length = Constants.LENGTH_STRING_DEFAULT, type = String.class)
    })
    public byte[] key;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.CATEGORY)
    public String category;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.PRICE_LOWEST)
    public int priceLowest;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.PRICE_HIGHEST)
    public int priceHighest;

    @HProperty(family = Constants.FAMILY_STAT, identifier = Constants.MANUFACTURER)
    public String manufacturer;

    public Product() {
    }
}
Instances will reside in the product table:
<TableSchema name="product">
    <ColumnSchema name="stat" BLOCKCACHE="true" COMPRESSION="snappy" VERSIONS="1" IN_MEMORY="true"/>
</TableSchema>
To satisfy our search requests, we would like to get the following structure:
ID | products | | |
 | Sony eReader PRST2BC | Kobo ... | ... |
E-READER | { priceLowest: 8900, priceHighest: 12900, manufacturer: SONY } | { ... } | { ... } |
To create an index as described above, we need a new model to hold the filtering criteria and a mapreduce job to update it periodically.
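Before moving to HBase types, the target structure can be pictured as nested maps: category, then product id, then attributes. The following is a minimal in-memory sketch, assuming a search request of the form "products of a category whose lowest price fits a budget". The class `IndexLookupSketch` and its methods are hypothetical names used for illustration only; they are not part of Surus or Synergy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the index: category -> product id -> attributes. */
final class IndexLookupSketch {

    /** Adds one product under its category. */
    static void put(Map<String, Map<String, Map<String, Object>>> index, String category,
                    String productId, int priceLowest, int priceHighest, String manufacturer) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("priceLowest", priceLowest);
        attributes.put("priceHighest", priceHighest);
        attributes.put("manufacturer", manufacturer);
        index.computeIfAbsent(category, c -> new LinkedHashMap<>()).put(productId, attributes);
    }

    /** Returns ids of products in the category whose lowest price does not exceed the budget. */
    static List<String> search(Map<String, Map<String, Map<String, Object>>> index,
                               String category, int budget) {
        List<String> matches = new ArrayList<>();
        Map<String, Map<String, Object>> products = index.get(category);
        if (products == null) {
            return matches; // unknown category: nothing to filter
        }
        for (Map.Entry<String, Map<String, Object>> entry : products.entrySet()) {
            int priceLowest = (Integer) entry.getValue().get("priceLowest");
            if (priceLowest <= budget) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Map<String, Object>>> index = new LinkedHashMap<>();
        put(index, "E-READER", "Sony eReader PRST2BC", 8900, 12900, "SONY");
        System.out.println(search(index, "E-READER", 10000));
    }
}
```

The point of the layout is that one lookup by category returns every candidate product, so the remaining filtering touches a single record instead of the whole product table.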
Secondary index model:
public class Grouping {
    @HRowKey(components = {
            @HFieldComponent(name = Constants.TIMEPERIOD, length = Bytes.SIZEOF_INT, type = Integer.class),
            @HFieldComponent(name = Constants.CATEGORY, length = Constants.LENGTH_CATEGORY_NAME, type = String.class)
    })
    public byte[] key;

    /**
     * format of the storage:
     * {product_id : {
     *     price_highest: int
     *     price_lowest: int
     *     manufacturer: String
     * }}
     */
    @HMapFamily(family = Constants.FAMILY_PRODUCT, keyType = String.class, valueType = Map.class)
    @HNestedMap(keyType = String.class, valueType = byte[].class)
    public Map<String, Map<String, byte[]>> product = new HashMap<String, Map<String, byte[]>>();

    public Grouping() {
    }

    // returns null when either the product or the requested attribute is absent
    protected String getStringEntry(String prodId, String key) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null || !entry.containsKey(key)) {
            return null;
        }
        return Bytes.toString(entry.get(key));
    }

    // returns null when either the product or the requested attribute is absent
    protected Integer getIntegerEntry(String prodId, String key) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null || !entry.containsKey(key)) {
            return null;
        }
        return Bytes.toInt(entry.get(key));
    }

    // creates the nested map for the product on first use
    protected void setEntry(String prodId, String key, byte[] value) {
        Map<String, byte[]> entry = product.get(prodId);
        if (entry == null) {
            entry = new HashMap<String, byte[]>();
        }
        entry.put(key, value);
        product.put(prodId, entry);
    }

    public Integer getPriceHighest(String prodId) {
        return getIntegerEntry(prodId, Constants.PRICE_HIGHEST);
    }

    public void setPriceHighest(String prodId, int price) {
        setEntry(prodId, Constants.PRICE_HIGHEST, Bytes.toBytes(price));
    }

    public Integer getPriceLowest(String prodId) {
        return getIntegerEntry(prodId, Constants.PRICE_LOWEST);
    }

    public void setPriceLowest(String prodId, int price) {
        setEntry(prodId, Constants.PRICE_LOWEST, Bytes.toBytes(price));
    }

    public String getManufacturer(String prodId) {
        return getStringEntry(prodId, Constants.MANUFACTURER);
    }

    public void setManufacturer(String prodId, String manufacturer) {
        setEntry(prodId, Constants.MANUFACTURER, Bytes.toBytes(manufacturer));
    }
}
and its corresponding grouping table:
<TableSchema name="grouping">
    <ColumnSchema name="product" BLOCKCACHE="true" COMPRESSION="snappy" VERSIONS="1" IN_MEMORY="true" TTL="604800"/>
</TableSchema>
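The row key of the grouping table is a composite of two fixed-width components: a 4-byte timeperiod followed by the category name. A rough plain-Java sketch of such a layout is given below. It assumes big-endian integers and zero-padding of the category up to a fixed width; how Surus actually pads the field is not shown here, and `CompositeKeySketch` as well as the width of 32 bytes are hypothetical stand-ins for the real key class and `Constants.LENGTH_CATEGORY_NAME`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of a fixed-width timeperiod+category row key. */
final class CompositeKeySketch {
    // Assumed width; the real value lives in Constants.LENGTH_CATEGORY_NAME.
    static final int CATEGORY_LENGTH = 32;

    /** Packs the timeperiod (4 bytes, big-endian) followed by the zero-padded category. */
    static byte[] encode(int timePeriod, String category) {
        byte[] raw = category.getBytes(StandardCharsets.UTF_8);
        if (raw.length > CATEGORY_LENGTH) {
            throw new IllegalArgumentException("category exceeds " + CATEGORY_LENGTH + " bytes");
        }
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + CATEGORY_LENGTH);
        buffer.putInt(timePeriod);
        buffer.put(raw); // the remaining bytes stay zero and act as padding
        return buffer.array();
    }

    /** Reads the leading 4 bytes back as the timeperiod. */
    static int decodeTimePeriod(byte[] key) {
        return ByteBuffer.wrap(key).getInt();
    }

    /** Reads the category and strips the trailing zero padding. */
    static String decodeCategory(byte[] key) {
        int end = key.length;
        while (end > Integer.BYTES && key[end - 1] == 0) {
            end--;
        }
        return new String(key, Integer.BYTES, end - Integer.BYTES, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = encode(20121101, "E-READER"); // the timeperiod value is made up
        System.out.println(decodeTimePeriod(key) + " / " + decodeCategory(key));
    }
}
```

Placing the timeperiod first keeps all categories of one period adjacent in the table, so a range scan over a single period stays contiguous.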
The mapreduce job implies that the Job Runner will use the product table as its source and the grouping table as its sink. The job's mapper:
// TcPrimaryKey stands for Timeperiod+Category primary key
private TcPrimaryKey pkTc = new TcPrimaryKey();
private EntityService<Product> esProduct = new EntityService<Product>(Product.class);

@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    Product product = esProduct.parseResult(value);
    // re-key every product row by (timeperiod, category), so that the reducer receives one group per category
    ImmutableBytesWritable convertedKey = pkTc.generateKey(timePeriod, product.category);
    context.write(convertedKey, value);
}
The job's reducer:
private EntityService<Product> esProduct = new EntityService<Product>(Product.class);
private EntityService<Grouping> esGrouping = new EntityService<Grouping>(Grouping.class);

@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) throws IOException, InterruptedException {
    Grouping targetDocument = new Grouping();
    // the category is already encoded in the timeperiod+category row key
    targetDocument.key = key.get();
    for (Result singleResult : values) {
        Product sourceDocument = esProduct.parseResult(singleResult);
        // index each product under its own row key, not under the grouping key
        String prodId = Bytes.toString(sourceDocument.key);
        targetDocument.setPriceHighest(prodId, sourceDocument.priceHighest);
        targetDocument.setPriceLowest(prodId, sourceDocument.priceLowest);
        targetDocument.setManufacturer(prodId, sourceDocument.manufacturer);
    }
    try {
        Put put = esGrouping.insert(targetDocument);
        put.setWriteToWAL(false);
        context.write(key, put);
    } catch (OutOfMemoryError e) {
        // ...
    }
}
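The data flow of the job can be followed without a cluster. The sketch below imitates the three stages in plain Java: the map step derives a grouping key from the timeperiod and the category, the shuffle collects rows that share a key, and the reduce step folds each group into one document with a nested entry per product id, matching the storage format documented in the Grouping model. `GroupingJobSketch`, `ProductRow` and the `"timeperiod|category"` string key are hypothetical simplifications; the real job works on HBase `Result` and `Put` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java imitation of the product -> grouping job. */
final class GroupingJobSketch {

    /** Simplified stand-in for the Product model. */
    static final class ProductRow {
        final String id;
        final String category;
        final int priceLowest;
        final int priceHighest;
        final String manufacturer;

        ProductRow(String id, String category, int priceLowest, int priceHighest, String manufacturer) {
            this.id = id;
            this.category = category;
            this.priceLowest = priceLowest;
            this.priceHighest = priceHighest;
            this.manufacturer = manufacturer;
        }
    }

    /** Map step: derive the grouping key from the timeperiod and the product category. */
    static String map(int timePeriod, ProductRow row) {
        return timePeriod + "|" + row.category;
    }

    /** Shuffle step: collect all rows that were mapped to the same key. */
    static Map<String, List<ProductRow>> shuffle(int timePeriod, List<ProductRow> rows) {
        Map<String, List<ProductRow>> grouped = new TreeMap<>();
        for (ProductRow row : rows) {
            grouped.computeIfAbsent(map(timePeriod, row), k -> new ArrayList<>()).add(row);
        }
        return grouped;
    }

    /** Reduce step: one document per key, holding one nested entry per product id. */
    static Map<String, Map<String, Object>> reduce(List<ProductRow> values) {
        Map<String, Map<String, Object>> document = new LinkedHashMap<>();
        for (ProductRow row : values) {
            Map<String, Object> entry = new LinkedHashMap<>();
            entry.put("price_highest", row.priceHighest);
            entry.put("price_lowest", row.priceLowest);
            entry.put("manufacturer", row.manufacturer);
            document.put(row.id, entry); // keyed by the product's own id
        }
        return document;
    }
}
```

Note that the nested entries are keyed by the product id; keying them by the group key would make every product of a category overwrite the previous one.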
A scanner that filters rows by manufacturer:
public ResultScanner getProductScanner(HTableInterface hTable,
                                       String manufacturer) throws IOException {
    ProductPrimaryKey pkProduct = new ProductPrimaryKey();
    FilterList flMaster = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    // the manufacturer criterion is optional: a blank value adds no filter
    if (manufacturer != null && !manufacturer.trim().isEmpty()) {
        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(Constants.FAMILY_STAT),
                Bytes.toBytes(Constants.MANUFACTURER),
                CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes(manufacturer)));
        flMaster.addFilter(filter);
    }
    Scan scan = new Scan();
    scan.setFilter(flMaster);
    return hTable.getScanner(scan);
}
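The scanner above follows an "optional criterion" pattern: a filter is added only when a value is supplied, and a row must pass every filter that was added. A small plain-Java sketch of that logic follows, with `java.util.function.Predicate` standing in for HBase filters. `MustPassAllSketch` and the map-based row representation are assumptions made for illustration, and the sketch only approximates the behaviour of `FilterList.Operator.MUST_PASS_ALL`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical imitation of an all-must-pass filter list with optional criteria. */
final class MustPassAllSketch {

    /** Builds the predicate list; a null or blank manufacturer contributes no predicate. */
    static List<Predicate<Map<String, String>>> buildFilters(String manufacturer) {
        List<Predicate<Map<String, String>>> filters = new ArrayList<>();
        if (manufacturer != null && !manufacturer.trim().isEmpty()) {
            filters.add(row -> manufacturer.equals(row.get("manufacturer")));
        }
        return filters;
    }

    /** Keeps the rows accepted by every predicate; an empty list accepts all rows. */
    static List<Map<String, String>> scan(List<Map<String, String>> rows,
                                          List<Predicate<Map<String, String>>> filters) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> row : rows) {
            boolean pass = true;
            for (Predicate<Map<String, String>> filter : filters) {
                if (!filter.test(row)) {
                    pass = false;
                    break;
                }
            }
            if (pass) {
                result.add(row);
            }
        }
        return result;
    }
}
```

Every row is still visited and tested one by one, which is the cost that a precomputed grouping record avoids.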
To sum up, secondary indexes are not trivial, but they are not the height of complexity either. While designing them, one should look carefully at the filtering criteria and the "long-term" perspective.
Hopefully, this tutorial will be of help.
Cheers!
[1] Surus ORM
https://github.com/mushkevych/surus
[2] Synergy Mapreduce Framework
https://github.com/mushkevych/synergy-framework