Wednesday, November 02, 2016

sdpl: schema driven processing language

Practically every company I have worked for had a "meta-information" reader that altered the workflow behavior: changing the data source, wrapping certain columns with UDFs, or passing performance tunings to the underlying engine.

Such a system would then expand further: injecting source code headers/footers, managing primary keys, etc. Soon, however, it would reach its limits, as every new requirement meant reopening the rigid core, and every data or database schema change triggered cascading updates: from the initial input, through every JOIN and GROUP BY, causing costly and elusive bugs.

SDPL is not trying to fix all of that. For one, it is not a workflow engine and has nothing to do with performance tunings. However, it was written to address one particular issue: introducing schema manipulation and schema versioning to existing tools, such as Pig, Spark or Hive.

SDPL stands for Schema Driven Processing Language. It introduces data schemas to Apache Pig and covers the most common use cases: LOAD DATA, STORE DATA, JOIN BY, GROUP BY and FILTER BY. Goodies such as CUBE can be written as quotations. The snippet below illustrates how to load data from a given data repository, associate it with a schema, join, project and store:
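The exact grammar is best checked against the project's own samples; the snippet below is only an approximation assembled from the keywords above, with made-up schema, repository and field names:

    -- illustrative sketch: syntax approximated, file and field names are made up
    orders = LOAD DATA FROM 'warehouse.repo.yaml' TABLE 'orders' WITH SCHEMA 'orders.schema.yaml';
    users  = LOAD DATA FROM 'warehouse.repo.yaml' TABLE 'users'  WITH SCHEMA 'users.schema.yaml';

    enriched = JOIN orders BY user_id WITH users BY id;

    STORE DATA enriched INTO 'output.repo.yaml' TABLE 'enriched_orders';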

The schema is described in a YAML file. For instance:
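The exact keys are best taken from the project's samples; a hypothetical schema file might look along these lines:

    # hypothetical layout: key names are illustrative, not the exact SDPL format
    table: orders
    fields:
      - name: order_id
        type: long
        version: 1
      - name: user_id
        type: long
        version: 1
      - name: discount_code
        type: string
        version: 7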

A data repository is a YAML file representing a connection string to your data storage, whether it is a DB, an S3 bucket or a local filesystem. For instance:
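Again, the keys below are illustrative rather than the exact SDPL format; the point is that the connection details live outside the processing logic:

    # hypothetical repository file describing an S3 bucket
    kind: s3
    bucket: acme-warehouse
    folder: /prod/orders
    aws_access_key_id: ${AWS_ACCESS_KEY_ID}
    aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}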

SDPL compiles into Apache Pig; Spark and Hive targets are planned. Typical schema operations are shown below. For example, given two schemas:
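For illustration, assume two hypothetical schemas, schema_a and schema_b, that overlap on most fields:

    # schema_a.yaml (illustrative)
    fields:
      - {name: id,      type: long,   version: 1}
      - {name: name,    type: string, version: 1}
      - {name: country, type: string, version: 1}

    # schema_b.yaml (illustrative)
    fields:
      - {name: id,    type: long,   version: 1}
      - {name: name,  type: string, version: 1}
      - {name: email, type: string, version: 1}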

we can compute field addition and removal:
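On the illustrative pair above, this boils down to set arithmetic on the field lists:

    -- illustrative only: expressed as set operations on the schemas' fields
    added   = schema_b - schema_a   -- {email}
    removed = schema_a - schema_b   -- {country}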

Should you have a sizable body of existing code, you can still benefit from SDPL by loading the schema and relying on implicit projections and explicit EXPAND:
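A rough sketch of the idea, with approximated syntax: the schema is loaded once, and EXPAND splices the full, versioned field list into otherwise hand-written statements:

    -- illustrative sketch: syntax approximated
    orders = LOAD DATA FROM 'warehouse.repo.yaml' TABLE 'orders' WITH SCHEMA 'orders.schema.yaml';
    -- EXPAND(orders) stands for the complete, versioned column list,
    -- so existing statements no longer enumerate fields by hand
    projected = FOREACH orders GENERATE EXPAND(orders);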

Schema versioning is performed in a minimalistic way: every field of the schema is declared with a version attribute. While loading the schema, all fields with field.version > ${VERSION} are skipped. Thus, you can reuse the same schema in multiple scripts supporting different versions of the same data.
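For instance, with the hypothetical orders schema above, compiling with VERSION=5 would skip the discount_code field, while VERSION=7 or higher would include it:

    # hypothetical field entry: skipped whenever the script is compiled with VERSION < 7
    - name: discount_code
      type: string
      version: 7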

SDPL is available under the BSD 3-Clause License from Github [1] and PyPI [2]. It uses Python 3 and ANTLR4 under the hood.
Give it a try!

Cheers!


[1] https://github.com/mushkevych/sdpl

[2] https://pypi.python.org/pypi/sdpl

Saturday, September 10, 2016

Synergy Scheduler with workflows

As of release 1.18, Synergy Scheduler [1] comes with a workflow engine - Synergy Flow [2]. While designing it, I was thinking about the kind of system an engineer like myself would want to use.

First of all, Synergy Flow follows the "write once, run everywhere" principle, where "write" applies to the workflow definition and "run" to the execution cluster. For instance, Synergy Flow introduces a set of environment-agnostic filesystem actions: declared once, they run equally well on a developer's desktop and on an EMR cluster.

To achieve this, Synergy Flow abstracts the cluster and its filesystem. While it sounds like a Goliath task, it takes only 6 methods to add a new type of execution cluster, and 7 methods for a new filesystem.
Currently local, EMR and Qubole clusters are supported, as well as local and S3 filesystems.
In fact, most of the mentioned methods are pretty trivial: cp, launch, terminate, etc.
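As a rough Python sketch of what such an abstraction boils down to (method names are illustrative, not the actual Synergy Flow interface):

    from abc import ABC, abstractmethod

    class AbstractFilesystem(ABC):
        """Illustrative only: a handful of trivial methods per backend (local, S3, ...)."""

        @abstractmethod
        def mkdir(self, uri): ...

        @abstractmethod
        def rmdir(self, uri): ...

        @abstractmethod
        def cp(self, uri_source, uri_target): ...

        @abstractmethod
        def mv(self, uri_source, uri_target): ...

        @abstractmethod
        def exists(self, uri): ...

    class AbstractCluster(ABC):
        """Illustrative only: the execution-cluster side (local, EMR, Qubole)."""

        @abstractmethod
        def launch(self): ...

        @abstractmethod
        def terminate(self): ...

        @abstractmethod
        def run_script(self, uri_script, **params): ...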

And finally, I wanted to avoid a skein of hundreds or even thousands of small steps that are difficult, and sometimes impossible, to inspect without specialized tools such as [3]. To address this, Synergy Flow introduces the concepts of Action and Step. A Step is an atomic element of the workflow and contains three categories of actions: a list of pre-actions, a single main action, and a list of post-actions.
The idea is simple (see the sketch after this list):
  • Each step is centered on the main action, whether it is a Pig query, a Spark script or any other process
  • Pre-actions are actions that have to be completed before the main action can take place
    For instance: an RDBMS export and further data cleansing, an FTP download, etc.
  • Post-actions are actions that have to be completed after the main action
    For instance: an RDBMS import, an FTP upload, etc.
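A hypothetical Python sketch of the Step/Action idea (class names and actions below are illustrative, not the actual synergy_flow API):

    from dataclasses import dataclass, field
    from typing import Callable, List

    # Illustrative only: mirrors the Step/Action idea described above,
    # not the actual synergy_flow classes.
    @dataclass
    class Action:
        name: str
        run: Callable[[], None]

    @dataclass
    class Step:
        name: str
        main: Action
        pre_actions: List[Action] = field(default_factory=list)
        post_actions: List[Action] = field(default_factory=list)

        def execute(self):
            # pre-actions complete first, then the main action, then post-actions
            for action in (*self.pre_actions, self.main, *self.post_actions):
                action.run()

    step = Step(
        name='daily_aggregation',
        pre_actions=[Action('rdbms_export', lambda: print('export orders table'))],
        main=Action('pig_query', lambda: print('run aggregate.pig')),
        post_actions=[Action('rdbms_import', lambda: print('import aggregated results'))],
    )
    step.execute()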
Synergy Flow is fully integrated with the Synergy Scheduler: job life-cycle, workflow run modes, run-time UI dashboard.
You can define how the system should behave in case of a failure: retry the Step from the beginning, or continue from the last known successful action.
BTW: Synergy Flow allows you to execute the workflow on multiple concurrent EMR clusters, thus eliminating the concurrency bottleneck.

Give it a try!

Cheers!

[1] Synergy Scheduler
https://github.com/mushkevych/scheduler

[2] Synergy Flow
https://github.com/mushkevych/synergy_flow

[3] Cytoscape
http://www.cytoscape.org/