Saturday, April 21, 2018

mdal: model driven ai language

AI, like any other technology, matures in phases. It is a field of active study, and much like COBOL or FORTRAN 60 years ago, AI techniques today are exercised mostly by a small cohort of specialists (22,000 by one recent estimate [1]), but will spread to the wider developer population (18,200,000 developers as of 2013 [2]) once the period of breakthroughs is over.

In this post I want to experiment with the nouns, verbs and syntax structures we might one day expect in high-level declarative AI languages, akin to what Apache Pig or SDPL [3] are for data processing today.

Most applications of AI/ML produce models. In very simplistic terms, these are numbers (weights) assigned to every node in a neural network, or coefficients (weights) of a regression formula. Models also have parameters. Among them: the number of inputs for a neural network, or the set of features for a regression.

Let's use the verb LOAD to load a model and the verb STORE to persist it.

For most deep neural networks, models are expensive to train and will likely be re-used the way open-source libraries are today.
Let's use the verb EXTEND to declare new derived models and the REUSE # LAYERS construct to specify the depth of model reuse. The verb TRAIN will commence model training, and the preposition WITH will point to the data repository.

Just as TensorFlow abstracts away the computing devices [4], and Synergy Flow [5] abstracts actions from the execution cluster, we should expect our declarative language to abstract training and execution as well. Hence, let's add --engine to the command-line arguments.

Two nouns complete the picture: INPUT and OUTPUT. Both need qualifiers: SCALE FOR to match the input data to the AI/ML model requirements and IN FORMAT to define the data format.

In summary, let's scribble an AI program in our hypothetical MDAL language:
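Every identifier, path and the connecting syntax below are made up; the point is only to see the verbs and nouns introduced above in one place:

    -- derive a new model from an expensive, pre-trained one
    LOAD MODEL base_model FROM 's3://models/imagenet';
    EXTEND MODEL product_model FROM base_model REUSE 12 LAYERS;

    -- train the derived model against a data repository
    TRAIN product_model WITH 's3://data/labeled_products'
        INPUT  IN FORMAT csv  SCALE FOR product_model
        OUTPUT IN FORMAT json;

    STORE MODEL product_model TO 's3://models/product_model';

The training/execution engine would be supplied outside the script, for instance via a hypothetical mdal --engine=tensorflow invocation.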


Cheers!

[1] https://www.bloomberg.com/news/articles/2018-02-07/just-how-shallow-is-the-artificial-intelligence-talent-pool

[2] https://www.computerworld.com/article/2483690/it-careers/india-to-overtake-u-s--on-number-of-developers-by-2017.html

[3] https://github.com/mushkevych/sdpl

[4] TensorFlow dynamic placer
http://download.tensorflow.org/paper/whitepaper2015.pdf

[5] https://github.com/mushkevych/synergy_flow/

Wednesday, November 02, 2016

sdpl: schema driven processing language

Practically every company I have worked for had a "meta-information" reader that altered workflow behavior: changing the data source, wrapping certain columns with a UDF, or passing performance tunings to the underlying engine.

The system would then expand further by injecting source-code headers and footers, managing primary keys, and so on. It quickly reached its limits: every new requirement meant reopening the rigid core, and every data or database schema change triggered cascading updates from the initial input through every JOIN and GROUP BY, causing costly and elusive bugs.

SDPL is not trying to fix all of that. For one, it is not a workflow engine and has nothing to do with performance tunings. However, it was written to address one particular issue: introducing schema manipulation and schema versioning to existing tools such as Pig, Spark or Hive.

SDPL stands for Schema Driven Processing Language. It introduces data schemas to Apache Pig and provides the most common use cases: LOAD DATA, STORE DATA, JOIN BY, GROUP BY and FILTER BY. Goodies such as CUBE can be written as quotations. The snippet below illustrates how to load data from a given data repository, associate it with a schema, join, project and store:
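The actual grammar lives in the Github repo [1]; purely for flavor, and with made-up identifiers and connecting keywords, such a script reads roughly like:

    LOAD DATA orders    WITH SCHEMA schema/orders.yaml    FROM REPOSITORY repo/warehouse.yaml;
    LOAD DATA customers WITH SCHEMA schema/customers.yaml FROM REPOSITORY repo/warehouse.yaml;

    report = JOIN orders BY customer_id, customers BY id;

    STORE DATA report TO REPOSITORY repo/reports.yaml;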

A schema is described in a YAML file. For instance:
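An illustrative schema file; the field names and the exact YAML keys are assumptions rather than the canonical SDPL format:

    # schema/orders.yaml
    fields:
      - name: id
        type: long
      - name: customer_id
        type: long
      - name: total_price
        type: decimal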

A data repository is a YAML file representing a connection string to your data storage, whether that is a DB, an S3 bucket or a local filesystem. For instance:
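Again illustrative, with assumed key names:

    # repo/warehouse.yaml
    type:   s3
    bucket: acme-warehouse
    path:   /orders/2016/11/
    format: csv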

SDPL compiles into Apache Pig; Spark and Hive are planned. Typical schema operations are shown below. For example, given two schemas:

we can compute field addition and removal:

Should you have a sizable body of existing code, you can still benefit from SDPL via schema loading, implicit projections and explicit EXPAND:

Schema versioning is performed in a minimalistic way: every field of the schema is declared with a version attribute. While loading the schema, all fields with field.version > ${VERSION} are skipped. Thus, the same schema can be reused by multiple scripts supporting different versions of the same data.
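To illustrate, with the same assumed YAML keys as above, a script pinned to VERSION=1 would simply never see total_price in the schema below:

    fields:
      - { name: id,          type: long,    version: 1 }
      - { name: customer_id, type: long,    version: 1 }
      - { name: total_price, type: decimal, version: 2 }   # skipped when VERSION=1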

SDPL is available under the BSD 3-Clause License from Github [1] and PyPI [2]. It uses Python 3 and ANTLR4 under the hood.
Give it a try!

Cheers!


[1] https://github.com/mushkevych/sdpl

[2] https://pypi.python.org/pypi/sdpl

Saturday, September 10, 2016

Synergy Scheduler with workflows

As of release 1.18, Synergy Scheduler [1] comes with a workflow engine - Synergy Flow [2]. During its design I was thinking about a system an engineer like myself might want to use.

First of all, Synergy Flow follows the "write once, run everywhere" principle, where "write" applies to the workflow definition and "run" to the execution cluster. For instance, Synergy Flow introduces a set of environment-agnostic filesystem actions: declared once, they run equally well on a developer's desktop and on an EMR cluster.

To achieve this, Synergy Flow abstracts the cluster and its filesystem. While it sounds like a Goliath task, it only takes 6 methods to add a new type of execution cluster, and 7 methods for a new filesystem.
Currently local, EMR and Qubole clusters are supported, as well as local and S3 filesystems.
In fact, most of the mentioned methods are pretty trivial: cp, launch, terminate, etc.

And finally, I wanted to avoid a skein of hundreds or even thousands of small steps that are difficult, and sometimes impossible, to inspect without specialized tools such as [3]. To address this, Synergy Flow introduces the concepts of Action and Step. A Step is an atomic element of the workflow. It contains three categories of actions: a list of pre-actions, a single main action and a list of post-actions.
The idea is simple (a minimal sketch follows the list below):
  • Each step is centered on the main action - whether it is a Pig query, a Spark script or any other process
  • Pre-actions are actions that have to be completed before the main action could take place
    For instance: an RDBMS export and further data cleansing, FTP download, etc
  • Post-actions are actions that have to be completed after the main action
    For instance: RDBMS import, FTP upload, etc
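A minimal Python sketch of the concept; the class and method names are illustrative rather than the actual Synergy Flow API:

    class Action(object):
        """A single unit of work; real actions would wrap cp, launch, a Pig run, etc."""
        def __init__(self, name, func):
            self.name = name
            self.func = func

        def run(self, context):
            self.func(context)

    class Step(object):
        """Atomic element of the workflow: pre-actions, one main action, post-actions."""
        def __init__(self, name, main_action, pre_actions=None, post_actions=None):
            self.name = name
            self.main_action = main_action
            self.pre_actions = pre_actions or []
            self.post_actions = post_actions or []

        def run(self, context):
            for action in self.pre_actions:      # e.g. RDBMS export, FTP download
                action.run(context)
            self.main_action.run(context)        # e.g. a Pig query or a Spark script
            for action in self.post_actions:     # e.g. RDBMS import, FTP upload
                action.run(context)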
Synergy Flow is fully integrated with the Synergy Scheduler: job life-cycle, workflow run modes, run-time UI dashboard.
You can define how the system should behave in case of a failure: retry the Step from the beginning, or continue from the last known successful action.
BTW: Synergy Flow allows you to execute the workflow on multiple concurrent EMR clusters, thus eliminating that concurrency bottleneck.

Give it a try!

Cheers!

[1] Synergy Scheduler
https://github.com/mushkevych/scheduler

[2] Synergy Flow
https://github.com/mushkevych/synergy_flow

[3] Cytoscape
http://www.cytoscape.org/

Thursday, November 12, 2015

Synergy Scheduler

Synergy Scheduler started as a side-project to orchestrate the data pipeline for a large project called... well, Synergy :) In late 2011 the Scheduler got its first UI and a Github repository, and I believe its two most attractive features at the time were a custom DB driver and a pythonic repeating timer.

Built by an engineer for engineers, the Scheduler was rough, and few outsiders were willing to like it. Every new deployment required revisiting anywhere from 30% to 50% of the codebase; features were rarely portable; maintenance was a nightmare.
It became clear that if Scheduler was to survive, it needed a significant overhaul to make it much simpler to operate and at least an order of magnitude easier to deploy and maintain.

Active development started in 2013, and for the next 18 months every release was very much backward-incompatible. While hardly newsworthy, the trend toward productizing the Scheduler was positive. Below are the milestones:
  • v1.0 generalization! Gone were the days of total re-factoring - now you only needed to re-visit 25% of the codebase
  • v1.2 support for external configuration allowed Scheduler deployment as a Python .egg
  • v1.3 first version available via pypi
  • v1.7 configuration shrank from 7 to 2 files; introduction of the object-to-document mapping; new tiled UI
  • v1.11 garbage collector re-factoring; UI responsiveness
  • v1.15 Python3 compatibility
It is an interesting exercise to observe Scheduler's evolution thru the prism of its principal architectural decisions:
  • message queue: initially used for one-way communication with its subsidiaries, it ultimately became a two-way information highway keeping Scheduler up to date
  • document-based persistence: allowed quick development iterations, while keeping schema's maintenance cost low
  • notion of the timeperiod and the distinct life cycles for jobs and tasks (units of work in SS vocabulary) made it possible to build a lean timetable and a simple garbage collector
Parsing years of commits, I see how lucky I am as an engineer to have the Synergy Scheduler. It served as a test bed for many major techniques and approaches: multi-process deployment, multi-threaded core, message-driven communication bus, heap-based garbage collector, job trees, AJAX and template-driven front-end, etc.

Currently, Synergy Scheduler scores 14400 lines of code, 1600 lines of comments, 116 unit tests, and has two spin-offs:
  • launch.py: a toolset to deploy, launch and test Python projects
  • synergy odm: object-to-document mapping
While demanding and often difficult, this was and still is a rewarding journey. After going thru all the hurdles of the open-source release cycles, I came to appreciate the work that many of us take for granted, and truly admire the depth, breadth and high standard of the open-source community.

Cheers!

[1] Synergy Scheduler at github: https://github.com/mushkevych/scheduler

[2] launch.py at github: https://github.com/mushkevych/launch.py

[3] Synergy ODM at github: https://github.com/mushkevych/synergy_odm

Sunday, February 16, 2014

CSV logging in Log4J2

Full-featured CSV logging should satisfy the following requirements:
  1. Dynamic CSV file header
    i.e. any CSV header change triggers automatic file rotation, and a new filename
  2. Usage of structured messages
    i.e. rather than a simple String object, the message is presented as a Map<String, String>, where every key-value pair maps a CSV column name to its value
  3. Dynamic mapping between message fields and the CSV header
    i.e. the message may carry superfluous information, and it is up to the runtime configuration (.xml file) to select the subset recorded in the .log file
  4. Dynamic routing
    i.e. messages are routed dynamically to their destination files based on the value of one or more fields of the structured message
To be honest, Log4J2 still has some way to go before CSV logging is fully featured. As of the time of writing:
  • #2 is implemented in full
  • #3 and #4 are implemented partially 
  • #1 has been requested and documented in [2] and [3]
The absence of a feature, however, does not stop Product Management from coming up with new requirements, so let's review what can be done today.

Log4J2 has two great features: EventLogger and StructuredDataMessage. Official documentation is available at [1], but in short, they give us the ability to:
  • Perform dynamic routing based on the message.type field
  • Perform logging in a transactional context:
    i.e. accumulate messages during the transaction and flush them to the log file only if the transaction is considered successful
First, our exemplary EventLogger transactional wrapper looks like this:
Second, the log4j2.xml file refers to message.type for routing (see the in-line comments for an explanation):
Finally, the workaround for the CSV header is to store it in another file with the same message.type routing key, and later use this separate .header file during CSV processing. For instance, in the example below message.type is the "tableName":

Cheers!


[1] Event logging in Log4J2:
http://logging.apache.org/log4j/2.x/manual/eventlogging.html

[2] RollingFile Appender - callbacks when rolling
https://issues.apache.org/jira/browse/LOG4J2-491

[3] Allow Layout header and footer to be configured in files and to contain properties
https://issues.apache.org/jira/browse/LOG4J2-496

Wednesday, December 04, 2013

Docker for Hadoop... or how to build, init and run a Hadoop Cluster in minutes

It literally takes seconds to start a Docker container with a pseudo-distributed Hadoop cluster. Most of the credit goes, of course, to the Docker script below and a bunch of configuration files... but let's not get ahead of ourselves and start slowly :)

In short: Docker is a lightweight container that allows you to run your process(es) in complete isolation from the rest of the system. Almost like a Virtual Machine, but faster and lighter.

In this post we will review the Docker skeleton to build, init and run Hadoop (HDFS, Yarn, HBase, Pig) in a Pseudo-Distributed mode. Let's start with the project's filesystem tree structure:


Here, we have three main categories:
  • Hadoop configuration files (found in hadoop, hbase, zookeeper, pig folders)
  • Docker scripts: Dockerfile, local_env.sh, build.sh and run.sh
  • Hadoop util scripts (found in etc, root_scripts directories and build.sh, run.sh scripts)
Hadoop configuration files and util scripts could be copied from my Github [1]. Tiny docker-helper scripts are as follows:
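The real scripts are in the repo [1]; roughly, they boil down to something like this (the image name and exact flags are illustrative):

    # build.sh - build the image from the Dockerfile in the current folder
    docker build -t hadoop-pseudo .

    # run.sh - start an interactive container, mounting the host folders described below
    docker run -i -t \
        -v /var/hstation/dfs:/dfs \
        -v /var/hstation/workspace:/workspace \
        -v /var/hstation/logs:/logs \
        hadoop-pseudo /bin/bash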

Now, with the preliminaries complete, let's see the Dockerfile itself:


This Docker image is based on Ubuntu 12.04 (Precise Pangolin) and covers all the required components: Oracle JDK, Hadoop + ecosystem, basic system utils. Installation instructions are as follows:
  1. Pre-configure local environment:
    $> ./local_env.sh
     
  2. Build the container (it will take a minute or two):
    $> ./build.sh
     
  3. Run the container:
    $> ./run.sh
  4. Once in the container - emulate a login (and hence read the env variables):
    #> su -
  5. HDFS Initialization (once only):
    #> ./hdfs_format.sh
    #> ./hadoop_pseudo_start.sh
    #> ./hdfs_init.sh
     
  6. Restart the cluster to finalize initialization:
    #> ./hadoop_pseudo_stop.sh
    #> ./clear_hadoop_logs.sh
    #> ./hadoop_pseudo_start.sh
  7. Enjoy your cluster:
    #> hdfs dfs -ls -R /
    #> hbase shell
            status 'simple'
    #> pig
By default, the container's filesystem state is reset at each run. In other words, all your data is gone the moment you exit the container. The natural solution is to move the HDFS mount point and a few other folders outside of the container:

Host OS Filesystem        Container Filesystem    Description
/var/hstation/dfs         /dfs                    Hosts the HDFS filesystem
/var/hstation/workspace   /workspace              Folder to exchange data to/from the container
/var/hstation/logs        /logs                   Contains Hadoop/HBase/Zookeeper/Pig logs

We also expose the HTTP ports that allow us to connect to the Hadoop processes inside the container:

Exposed Container Port                  Description
http://CONTAINER_IP:8088/cluster        Resource Manager
http://CONTAINER_IP:19888/jobhistory    Job History
http://CONTAINER_IP:50070               HDFS Name Node
http://CONTAINER_IP:60010               HBase Master
http://CONTAINER_IP:8042/node           Yarn Node Manager

In the table above, CONTAINER_IP is found by running the following command in your container:
#> domainname -i


To sum things up, the container build takes about 10 minutes, plus another 2-3 minutes to start and initialize the container for the first time. From that moment on, it takes literally seconds until your Hadoop sandbox is ready to crunch data.

Cheers!

[1] https://github.com/mushkevych/configurations/tree/master/CDH4.pseudo-distributed

Tuesday, August 20, 2013

Puppet+Hiera+Jenkins. Jenkins Integration

Continuation. By now we should have a working bundle of Puppet and Hiera, and in this post we will focus on Jenkins integration, as well as management of the apps configuration.

First, let's recall that roles contain the class {'configuration':} declaration. This tiny module allows us to keep a current copy of the application configuration on the target nodes.
The module's file tree looks as follows:


The manifest file itself (init.pp) is as follows:
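A condensed sketch of such a manifest, built around a recursive file resource; the source URL assumes a file-server mount named configuration, and the real module in the repo may differ in details:

    class configuration {
      # ${environment} comes from the agent's puppet.conf; ${application} is set by the role
      file { "/biz/configuration/${application}":
        ensure  => directory,
        recurse => true,
        source  => "puppet:///configuration/${environment}/${application}",
      }
    }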


It can be summarized as a procedure that, based on the ${environment} and ${application} names, copies configuration files from /biz/puppet/hieradata/${environment}/${application} on the Puppet Master to /biz/configuration/${application} on the Puppet Agent node.

It is important that the destination path has no reference to ${environment}. This later allows Jenkins to perform configuration updates in a completely environment-agnostic way, with only the ${application} name being required.

Consider the file tree under the files folder. It is grouped by environment and application name. Should you want to add a new application, say HelloWorld, to the CI env, you would have to:
  1. Make sure that the folder /biz/puppet/hieradata/ci exists
  2. Create new subfolder /biz/puppet/hieradata/ci/HelloWorld
  3. Place HelloWorld configuration files into /biz/puppet/hieradata/ci/HelloWorld 
  4. Create/update Jenkins script to take the application configuration files from /biz/configuration/${application}
  5. Create/update a role to reference the HelloWorld:


The flow can be illustrated as:
Fig 1: Deployment process

Last but not least, let's describe the development process on a Puppet+Hiera+Jenkins-governed cluster:

Fig 2: Development process
Fig 2 illustrates that the framework significantly fast-tracks configuration changes. In summary, we have shown a configuration management framework targeted at small-to-medium sized projects, where you are comfortable with file-based configuration for your applications.

Cheers!

Puppet+Hiera+Jenkins. Custom Modules

Continuation. In this second post of the series we will focus on custom modules for the Puppet+Hiera+Jenkins framework. For the sake of brevity we will illustrate our efforts with the example of a single QA environment.

As defined in puppet.conf, the site.pp file can be found at /biz/puppet/hieradata/${environment}. For the QA environment that is /biz/puppet/hieradata/qa:


Here, the manifest (aka site.pp) assigns a single role to every node in the cluster. Nodes are identified by their fully qualified domain names. Let's assume that:
  • Puppet Master lives at ip-10-0-8-10.sampup.com
  • ip-10-0-8-11.sampup.com and ip-10-0-8-12.sampup.com are Puppet Agents. 
Roles are defined in /biz/puppet/modules/role module:


You might be asking what class {'configuration':} is about. This is where the actual application configuration is delivered to the agent nodes; we will address this module in the last post of the series.

Corresponding profiles are defined in /biz/puppet/modules/profile module:


Let's discuss the Hiera configuration. There is a lot of reading available [1]; however, it can be summarized as follows:
  1. Hiera data files are named after the node's fully qualified domain name
    For instance: ip-10-0-8-12.sampup.com.json
  2. Files contain key-value pairs
    For instance: "ntp::restrict" : true
  3. Each key comprises the module name and the property name, joined by ::
    For instance, in "ntp::restrict" : true
    - ntp is the module name
    - restrict is the property name
    - true is the value
  4. Declared key-values are applied during the invocation of the parameterized Puppet classes
  5. If Hiera finds no file matching the node's domain name, it falls back to common.json
    Puppet will resolve to the default module parameters should it find no relevant key-value pairs.
/biz/puppet/hieradata/qa/common.json
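For illustration, reusing the ntp key from the example above (the value itself is made up):

    {
        "ntp::restrict" : true
    }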


/biz/puppet/hieradata/qa/ip-10-0-8-12.sampup.com.json
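And an illustrative node-specific override of the same key:

    {
        "ntp::restrict" : false
    }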


Continue with Jenkins and application configuration.

[1] Hiera reading
http://docs.puppetlabs.com/hiera/1/puppet.html#hiera-lookup-functions

Puppet+Hiera+Jenkins. Concepts

Wouldn't it be great if we could transfer a large part of the application configuration from Development to Ops? For instance, a typical Java EE application holds tons of external references (such as DB URL, user, password, security certificates, etc). A typical configuration change often takes hours and distracts developers, build engineers and QA. Lots of waste for a trivial change.

In this series of three posts we will show a configuration management framework, built from Puppet, Hiera and Jenkins, that supports multiple environments:
  • In this post we will review concepts and Puppet's configuration files
  • Second post will be devoted to custom modules and interaction with Hiera
  • Final, third post will focus on Jenkins integration
Our target environment can be pictured as follows:
Fig 1: Deployment schema for two environments

We will build on top of the practices described in [1], and I strongly advise reading that article before proceeding further.

Conceptually, the workflow consists of the following steps:
1. Platform setup: during this step the system interfaces and services of the nodes are configured (networking, file-system permissions, user and group creation, RDBMS, HTTP servers, etc)
Fig 2: Puppet workflow


2.  Application setup: during this step business applications are installed and configured on the nodes
Fig 3: Jenkins workflow

Our skeleton's file structure will look as follows:


Three folders form a top-level hierarchy:
  • /etc/puppet: soon after the puppet package is installed, this folder contains the standard puppet modules
    In addition, we will place there two configuration files:
    - puppet.conf describing puppet search paths and config
    - hiera.yaml describing Hiera search paths and config
  • /biz/puppet/modules contains all of the custom modules
  • /biz/puppet/hieradata holds Hiera data files grouped by environment
Let's review the puppet configuration files from /etc/puppet.

puppet.conf
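An illustrative version; the paths and the master's domain name follow the examples used throughout this series, so treat it as a sketch rather than a drop-in config:

    [main]
        modulepath = /biz/puppet/modules:/etc/puppet/modules

    [master]
        manifest = /biz/puppet/hieradata/$environment/site.pp

    [agent]
        server      = ip-10-0-8-10.sampup.com
        environment = qa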

Normally this file would be split in two: one with the [main] and [master] sections deployed to the Puppet Master node, and another with the [main] and [agent] sections deployed to the Puppet Agent nodes.
puppet.conf provides a wide spectrum of settings [3], but here we define only the most critical ones, such as:
  • modulepath defines path to the custom modules
  • manifest defines where site.pp files are located
    Note that in our case site.pp files are environment-specific. For instance site.pp for QA environment is stored in /biz/puppet/hieradata/qa/ folder
  • server identifies the Puppet Master domain name
  • environment defines environment of the Agent node. For instance: qa, ci, etc
    Details on the environment setting could be found at [2]

hiera.yaml
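An illustrative version, using the Hiera 1.x syntax and the datadir layout of this series:

    :backends:
      - json
    :json:
      :datadir: /biz/puppet/hieradata/%{::environment}
    :hierarchy:
      - "%{::fqdn}"
      - common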
Above, we declared that configuration files for particular nodes in the cluster will be in JSON format, grouped by environment and identified by the full domain name.
Note that the attribute -common under :hierarchy: denotes common settings for the environment and in our case refers to the /biz/puppet/hieradata/${environment}/common.json file.

Continue with custom modules.

[1] Craig Dunn: Designing Puppet – Roles and Profiles
http://www.craigdunn.org/2012/05/239/

[2] Defining environment in Puppet/Hiera
http://docs.puppetlabs.com/guides/environment.html

[3] Puppet.conf in details
http://docs.puppetlabs.com/references/latest/configuration.html

Friday, July 26, 2013

OrderedSet for Python 2.7

While working on the Kosaraju algorithm for Coursera course [1] homework, I wrote a simple and (hopefully) convenient OrderedSet for Python 2.7

Feel free to use and share it:
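In essence it boils down to something like the sketch below, backed by collections.OrderedDict:

    import collections

    class OrderedSet(collections.MutableSet):
        """A set that remembers the insertion order of its elements."""

        def __init__(self, iterable=None):
            self._data = collections.OrderedDict()
            if iterable is not None:
                for item in iterable:
                    self.add(item)

        def __contains__(self, item):
            return item in self._data

        def __iter__(self):
            return iter(self._data)

        def __len__(self):
            return len(self._data)

        def add(self, item):
            # re-adding an existing item keeps its original position
            self._data[item] = None

        def discard(self, item):
            self._data.pop(item, None)

        def __repr__(self):
            return '%s(%r)' % (self.__class__.__name__, list(self._data))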

And just in case you are looking for unit tests or simple usage examples, please refer to the gist below:
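For instance:

    s = OrderedSet(['b', 'a', 'b', 'c'])
    assert list(s) == ['b', 'a', 'c']    # duplicates collapsed, insertion order kept
    s.add('d')
    s.discard('a')
    assert list(s) == ['b', 'c', 'd']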

Cheers!

[1] Algorithms: Design and Analysis, Part 1
https://class.coursera.org/algo-004/class/index

Saturday, May 18, 2013

launch.py

The good news is that you don't have to write any extra code to run your Python application in a separate process (so-called daemonization). All you need is the fully qualified name of the method/function to execute. Something like:

    some_package.some_class.SomeClass.start_me
or
    some_package.some_script.main_function

Everything else is handled by launch.py [1].

launch.py is a set of friendly Python tools to install a Virtual Environment and to start and stop your application. In addition, it gives you a single interface to test and analyze your code, and it provides configuration management.

In this post we will outline two features: installation and daemonization.

Installation. It's simple. launch.py will create a Virtual Environment for you and make sure that your application is executed within it. What you need to do is to download all of the required libraries for your application and place them in folder:
    launch.py/vendor 

The installation order of the libraries is defined by the script:
    launch.py/scripts/install_virtualenv.sh


Once this step is complete, just run ./launch.py -i to install the Virtual Environment along with all of the required libraries.

launch.py is also there to help with the application start and stop. There are two modes to run your application:
  • daemonized
    the application is started in a separate process
  • interactive
    the application is executed in the same command-line terminal where you have called launch.py and uses shared stdin, stdout and stderr to interact with the user
To use this feature simply follow the guide:
  1. Write the actual code
    The following assumptions are in place:
    - the starter method or function has the signature starter_name(*args)
    - classes implement __init__(self, process_name)
  2. Register your fully specified function/method name in launch.py/system/process_context.py as follows:
    PROCESS_CONTEXT = {
        ...
        'YOUR_PROCESS_NAME': _create_context_entry(
            process_name='YOUR_PROCESS_NAME',
            classname='workers.some_script.main',
            token='YOUR_PROCESS_NAME',
            time_qualifier=''),
        ...
    }

    Should you wish to instantiate a class and start one of its methods, simply include the class name in the path:

            classname='workers.some_module.YourPythonClass.starter_name'
    

  3. ./launch.py --run --app YOUR_PROCESS_NAME will start your class' or script's starter method/function in a separate process (daemonized mode) 
  4. Add --interactive to the command above and run it in an interactive mode
In summary, launch.py is here to make your life easier. You can find details and read about many other features of the launch.py framework in its wiki [2].

Cheers!

[1] launch.py on the Github
https://github.com/mushkevych/launch.py

[2] launch.py WIKI
https://github.com/mushkevych/launch.py/wiki

Friday, March 08, 2013

PMP Audit and Exam

If you are like me, then after 4 months of studying PMP Exam Prep in transit, you registered at pmi.org and applied for the PMP exam. And (if you are like me) you have seen the sacred "your application has been chosen for the audit". Feel free to be like me and give yourself 20 minutes to panic: I recall running into the bathroom and confessing to my wife that I had just wasted $630 and 4 months of transit reading. I do recall the sticky heat that covered me from head to toe and, as mentioned earlier, it took me 20 minutes to pull myself together.

Now, suppose you have gotten yourself together and are ready to face reality: you have 90 days to complete the audit. In fact, it took me 40 days to notify my former bosses, gather all of the envelopes and send the application to PMI. Never shall I forget Canada Post and the 10 long days it took my documents to cross the border and reach the recipient.

After the PMI approval I spent another 2 months reading and preparing for the exam, and today I finally passed it! My first impressions are as follows:
  • I find that fingerprinting, a metal detector and turning out your pockets are overkill for a certification exam
  • Despite the $630 price tag there was no coffee or even a potable water dispenser (at least this is true for the certification center at Metrotown in Burnaby, BC)
  • As mentioned in hundreds of posts from around the world, PMI requires you to immerse yourself in their "world of PMI-ism", and your exam success is fully driven by the level of immersion
In summary:
  • I used PMP Exam Prep [1] as my primary source
  • PM Fastrack [2] turned out to be a very useful tool
  • It took me 6 months to prepare for the exam and an additional 40 days to prepare the document package for the PMI audit.
  • Don't forget to enrol with PMI before applying for the PMP. This will save you $25.
  • Remember to fuel yourself with coffee before entering the examination room, as you might have no other opportunity before the end of the exam.
Cheers!

[1] PMP Exam Prep:
http://store.rmcproject.com/Detail.bok?no=392

[2] PM Fastrack:
http://store.rmcproject.com/Detail.bok?no=310

Friday, January 11, 2013

HBase: secondary index

As your HBase project moves forward, you will likely face a request to search by criteria that are neither included in the primary index nor can be included in it. In other words, you will face the problem of fast and efficient search by a secondary index. For instance: select all eReaders in a specific price range. In this post we will review an approach to constructing such a secondary index.

As usual, we will work in the realm of Surus ORM [1] and the Synergy Mapreduce Framework [2], and will start with the definition of a model. For illustration purposes we will use a simplified variant of a "product" class that has lowest and highest prices and can belong to only one category. For instance:

ID                     category    priceLowest    priceHighest    manufacturer
Sony eReader PRST2BC   E-READER    8900           12900           SONY


Instances will reside in a table product:

To satisfy our search requests, we would like to get the following structure:

ID          products
E-READER    Sony eReader PRST2BC: { priceLowest: 8900, priceHighest: 12900, manufacturer: SONY }
            Kobo ...:             { ... }
            ...:                  { ... }
Here, any search within a given category allows us to quickly narrow down to products in a specific price range or from a specific manufacturer.

To create an index as described above, we need a new model to hold the filtration criteria and a mapreduce job to periodically update it.
Secondary index model:

and its corresponding grouping table:

The mapreduce job implies that the Job Runner will use the product table as the source and the grouping table as the sink. The job's mapper:
and a reducer:
As an alternative to a secondary index you can use filtering, for instance SingleColumnValueFilter:
However, the SingleColumnValueFilter approach is insufficient for large tables and frequent searches; stretching it too far will cause performance degradation across the cluster.

To sum it up, secondary indexes are not trivial, but at the same time they are not the pinnacle of complexity. While designing them, one should look carefully at the filtration criteria and the "long-term" perspective.

Hopefully this tutorial will be of help.
Cheers!

[1] Surus ORM
https://github.com/mushkevych/surus

[2] Synergy Mapreduce Framework
https://github.com/mushkevych/synergy-framework

Thursday, December 20, 2012

big data: self healing

While working on a recent project, I noticed an interesting side-effect: self-healing data. In this post I will try to show a simple yet functioning flow that keeps data fresh and healthy.

To begin, let me define the environment and the problem we are trying to solve. For the environment, we will be using Hadoop + HBase + Surus ORM + Scheduler and the following data flow:
Figure 1: data flow
To better understand the problem, let's review a typical day at a product aggregation system. It starts with importing millions of products (manufacturer, specs, image url, price, etc). Later, we group all imported records by product id to come up with a consolidated picture of product availability and pricing.

And now let's add a bit of bitter reality to our idyll: many products arrive "broken". Some have a broken product id, some a missing description or improper pricing. Yes, we have business rules in place to address most of these concerns at the import phase or in the first mapreduce job. However, some still break through. In the real world this number is counted in thousands.

As we follow the arrow of time, our database accumulates a number of broken products large enough to dissuade users. We need a "potion" to cure our data of the broken records, and here is the recipe.

Let's make a few assumptions:
  • Data Sources (DS) are being updated every T1 interval
    For example: every 24 hours
  • Number of broken input records is statistically the same
    For example: 10 broken records per every 10000
  • Distribution of broken records is normal
    Thus, every data source is providing comparable number of broken records; records which were OK yesterday might be broken today and vice versa
Recipe:
  • Records from DS are inserted into log table. Timestamp, identifying DS update time, is attached to every record.
  • First mapreduce job:
    - source: log table
    - sink: product table
    - aggregates log records from specific timeperiod (for instance, from last day)
    - every product record has a field "last_update" that holds highest timestamp from constituting log records
  • Second mapreduce job:
    - source: product table
    - sink: stash table
    - reviews all product records and marks ones that have last_update value older than T2, where T2 >= T1 (for instance T2 can be 2 days)
    - all marked products are moved from product table to stash table
In the described flow we move all outdated records into a stash table. With the above assumptions in place, this means that broken products will _likely_ be eliminated from the product table within the time frame T2.

On a final note, let's compare this approach with setting the HBase table TTL [1] to T2. First, TTL removes key-value pairs from a record if they are not updated regularly. This is inefficient in our case, as some information (for instance specifications, tokens, comments) may be updated as rarely as once in a record's lifetime; with TTL in place, we would lose this information after T2. Second, the information is valuable. By moving product records into the stash table, we grant ourselves the ability to use that information later: to track a product's lifetime price trend, run analytics on retired products, or even revive products once they are back in stores.

Saturday, October 06, 2012

Surus ORM - One Year Later

It has been a little more than a year since Surus, an HBase ORM [1], became available via Github. For its birthday party, Surus got:
  • Support of multi-component rowKeys
    This feature is implemented in HPrimaryKey class and @HFieldComponent annotation
  • Support of List<T> properties, which is implemented as @HListProperty annotation
  • Integration with HBase Explorer [2] 
  • Code clean-up. Now, Surus ORM fits in less than 20 java files
    As a result, all non-related code was moved to synergy-framework project [3]
For the first time, Surus ORM also has a road-map. Currently it contains support for multi-component properties.
Looking back on the past year, I see it as an interesting endeavour. Surus ORM is still Free Open Source, and you are welcome to fork/use/contribute to it!

Cheers!

[1] Surus ORM
https://github.com/mushkevych/surus/

[2] HBase Explorer + Surus ORM integration
https://github.com/mushkevych/hbase-explorer

[3] Synergy-framework repository
https://github.com/mushkevych/synergy-framework

Thursday, September 27, 2012

HBase Explorer + Surus = Integration

HBase Explorer (HBE) [1] is a UI tool for manipulating and exploring HBase instances. I have been using it on big data projects for more than a year, and have gradually improved its integration with Surus to the point where:

  • Surus-covered tables are being processed by Surus ORM [3]
  • HBE supports multiple ORM and multi-component rowKeys via ORMInterface and ORMContext
  • All tables, not covered by custom ORM are processed by HBE default pattern mechanism

Let's take a look at two screenshots:


Please note that the rowKey components change to match the table structure. On the backend, this is supported by two new methods added to the AbstractPrimaryKey class:

  • Map<String, Class> getComponents()
  • ImmutableBytesWritable generateRowKey(Map<String, Object> components);
The first is irreplaceable when it comes to discovering the rowKey structure, and the second is required to construct the actual rowKey from HTML parameters.

Next, let's review what you would need to do to plug a custom ORM into HBE. It takes just a few simple steps:
  1. Implement interface ORMInterface
    Let's assume class' name will be "AnotherOrm"
  2. Register "AnotherOrm" instance in ORMContext static section:
        static {
            CONTEXT.add(new OrmSurus());
            CONTEXT.add(new AnotherOrm());
        }
  3. Build, deploy and use!
In summary: both Surus and HBE got cool features to make your life easier.
Cheers!

[1] HBase Explorer with Surus Integration and multi-ORM support:
https://github.com/mushkevych/hbase-explorer

[2] Original HBase Explorer:

[3] Surus ORM:
https://github.com/mushkevych/surus

Friday, July 27, 2012

Cloudera Hadoop Certifications

And so it happened! After 4 weeks of studying and 2 exams, I am officially a Cloudera Certified Developer for Apache Hadoop and a Cloudera Certified Specialist in Apache HBase.

Policies that each candidate signs before the test begins prohibit disclosure of questions and answers, so don't look for them here :)
However, I will gladly share my training materials, as well as focus on most problematic topics.

I studied from books [1] and [2] and read them cover to cover, despite considering many chapters "well known". I strongly advise everybody seeking the certification to go through each chapter. It will save you training expenses of $1800 + $3000 + travel [3] and, like any reading, provide a solid knowledge base. You can also try to reduce the certification costs [4] by asking Cloudera to grant you discount coupons.

Topics that I found intriguing:
- Write path and coherency model for HDFS
While reading the chapters, ask yourself: what happens to the files should the client die in the middle of the copy process; how does HBase handle real-time duplication of the WAL.
- InputFormat
The good news is that there are not many of them, so it should not take long to work through a sample for each. As stated in [5], you might be asked to identify the proper InputFormat given a sample.
- HBase Region Lookups mechanism
Be sure to understand how the client finds -ROOT-, .META. and the data itself, when it queries ZooKeeper, and what the fail-over algorithms are.
- HFile format and performance dependency on block size

In summary: exams are not too scary, but give yourself 2 weeks per exam to prepare properly.
Cheers!

[1] Hadoop: The Definitive Guide, 2nd Edition

[2] HBase: The Definitive Guide


[3] Cloudera training schedule

[4] Direct Cloudera certification 

[5] List of probable exam topics
http://university.cloudera.com/certification/CCDH.html

Monday, June 11, 2012

JMX for Python

As soon as a head-less Python project crosses a certain level of complexity, you start noticing that something is missing: something with buttons, text areas and everything else needed to visualize the data structures and the state of your project... something like a UI :)

In the world of Java you have JMX to address most of this... but what can we use in Python? While working on the Scheduler [1], I found that the easiest way to implement a "Python MX" is to use werkzeug [2].

So, what do we need to get it working? First of all, we need an HTML template (here, we arranged the output in two columns):

Next, we need to expose URL for the template:
Now, all that is left to do is to start the MX and set the Bean:
Where MX is:
And utils.py looks like:
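Stitched together, a stripped-down version of the idea looks roughly like this; the names are illustrative and the real thing lives in the Scheduler MX folder [3]:

    from werkzeug.serving import run_simple
    from werkzeug.wrappers import Request, Response

    # the "bean": any object exposing the state we want to render
    class SchedulerBean(object):
        def entries(self):
            return {'uptime': '1d 2h', 'queued_jobs': 42, 'version': '1.x'}

    BEAN = SchedulerBean()

    PAGE = u'<html><body><table>%s</table></body></html>'

    @Request.application
    def mx_application(request):
        # render every bean entry as a two-column table row
        rows = ''.join('<tr><td>%s</td><td>%s</td></tr>' % (k, v)
                       for k, v in sorted(BEAN.entries().items()))
        return Response(PAGE % rows, mimetype='text/html')

    if __name__ == '__main__':
        # serves the dashboard at http://localhost:5000
        run_simple('localhost', 5000, mx_application)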


As soon as the coding is complete, navigate your browser to localhost:5000:
And with some effort it can turn into:


All things considered, Python MX is not a trivial hack, but it is not the pinnacle of complexity either. With great tools like Werkzeug our life is much easier.
For a real-world example, please refer to the Scheduler MX folder [3].

[1] Scheduler
https://github.com/mushkevych/scheduler

[2] Werkzeug
http://werkzeug.pocoo.org/

Friday, May 18, 2012

Integer encoding for Hadoop

In short, integer encoding is a mechanism to compact multiple integer values into a single byte array. It is well described in the literature [1], and in this post we will review an exemplary implementation.

Rather than re-implement the encoding algorithms, we will reuse the Varint class from mahout-core [2].
The simplest use-case, for 2 integers, looks like this:


Here, we declared a structure of 2 integers, Tuple2I, and followed it with an Encoder example that encodes and decodes the integers to and from a byte array.

For real-world usages of the integer encoder, refer to Surus [3]. By widely adopting integer encoding for 3- and 4-integer tuples, I was able to reduce the Mapper output by 20-30% and save about 30 minutes of computation time.

[1] Data-Intensive Text Processing with MapReduce
http://www.amazon.com/Data-Intensive-Processing-MapReduce-Synthesis-Technologies/dp/1608453421

[2] Typical Maven repository
http://mvnrepository.com/artifact/org.apache.mahout/mahout-core/

[3] Surus
https://github.com/mushkevych/surus

Friday, May 11, 2012

R: running by Hadoop mapreduce

Running R from Hadoop mapreduce ain't easy. Before any work can begin, one must configure the environment:
  1. R must be installed along with all required libraries on each mapred node in the Hadoop cluster.
  2. Communication is performed over JNI via the rJava/JRI interface.
    This package must also be installed on each mapred node in the cluster.
  3. The following env variables must be exported (paths will vary with your environment):
export R_HOME=/usr/lib64/R
JRI_HOME=/usr/lib64/R/site-library/rJava/jri
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JRI_HOME}/JRI.jar:${JRI_HOME}/JRIEngine.jar:${JRI_HOME}/REngine.jar
export JAVA_LIBRARY_PATH=${JRI_HOME}

You can safely book a day or more for installation headaches; in my case we had to cook RPMs for almost every R component.
After the installation is complete, you might want to exercise mapreduce parallelism with R, just to discover messages like:


WARNING: org.rosuda.JRI.Mutex was unlocked by other thread than locked! This may soon lead to a crash...

This leads us to the primary limitation of straightforward rJava/JRI usage (see P.S. for details):
There can be only 1 process/thread accessing the R instance per box.

In my circumstances this was not critical, as I was able to complete the computation on a single reducer within several hours... however, with ever larger result sets this may become a prohibitive restriction.

Let's see what the mapreduce R workhorse looks like:


I highly recommend the following reading:

[1] rJava/JRI source code repos:
http://www.rforge.net/rJava/svn.html
http://www.rforge.net/JRI/svn.html 

[2] Previous post with working example Java-to-R interaction:
http://mushkevych.blogspot.com/2012/04/r-running-by-java-process.html

[3] Rengine: public Mutex getRsync()
http://www.rosuda.org/r/nightly/javadoc/org/rosuda/JRI/Rengine.html#getRsync()


P.S.
There might be a way to start and maintain an R instance per rJava/JRI client, however I was not lucky to identify it.

P.P.S.
It should be possible to share a single R instance between several Java processes/threads by direct synchronization at the JNI level [3].
However, the expected performance gains vs. the added code complexity must be weighed carefully.