Tuesday, March 27, 2012

Recovering from OutOfMemoryError in Hadoop mapreduce

Encountering an OutOfMemoryError in any environment indicates that something is very wrong. There is a good discussion at Stack Overflow [1] that boils down to "you can catch the error, but is it worth recovering?".

In terms of the Hadoop MapReduce framework, I am convinced that it is worth catching OutOfMemoryError and being able to recover from it. Let me illustrate:


Below is a typical dummy mapper. For illustration purposes, it introduces a variable bigArray to be blamed for the OutOfMemoryError.
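
The original gist is not reproduced here, so the following is only a minimal sketch of such a mapper; the class name, the input/output types and the allocation that triggers the error are illustrative:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DummyMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // keep ALL declarations and references inside the try block, so that
            // once the OOME is caught they become unreachable and collectable
            byte[] bigArray = new byte[Integer.MAX_VALUE];   // provokes the OutOfMemoryError
            String record = value.toString();

            context.write(new Text(record), new Text(String.valueOf(bigArray.length)));
        } catch (OutOfMemoryError e) {
            // the offending record is skipped; a counter tracks how many we lost
            context.getCounter("DummyMapper", "OOME_SKIPPED").increment(1L);
        }
    }
}
```

Nothing allocated for the failed record survives outside the try block, which is exactly what lets the JVM reclaim the memory and the mapper continue with the next record.
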
Now, let's answer a few questions:

  • Is it abnormal when the system throws an OOME?
    Yes. However, when we are dealing with user-generated content, we cannot always protect ourselves from it.
    For example: in social networks, 99.99% of accounts carry 10-20 MB of statistics data, while the remaining 0.01% are fake/misused/flash-mobbed accounts that barely fit into 200-300 MB. Handling those is a challenge.
  • How do we prevent the system from melting down?
    First of all, keep all declarations and all references inside the try block. When the OOME is thrown and the block is abandoned, the objects referenced only from inside it become unreachable and can be garbage-collected, so processing can proceed.
  • Is it OK to skip 0.01% of problematic data?
    This is debatable; however, I am convinced that it is.
    As Tom White put it, "In an ideal world, your code would cope gracefully with all of these conditions. In practice, it is often expedient to ignore the offending records" [2]
To summarize the above, let me also present a screenshot from a real-world mapper. It shows that out of 274,022 records, 9 caused OutOfMemoryErrors, or roughly 0.003%.

Figure 1: real-world mapper output



[1] Stack Overflow
http://stackoverflow.com/questions/2679330/catching-java-lang-outofmemoryerror

[2] Tom White: Hadoop: The Definitive Guide, second edition
Chapter 6, page 185 "Skipping Bad Records"
http://shop.oreilly.com/product/0636920010388.do

Monday, March 26, 2012

Behaviour analysis

So, what is behavior analysis and how do we extract that data? Let's introduce a chart (to attract the reader's attention):

Figure 1: activity chart
Above, horizontal blue bars visualize account "lifespans"; an account is considered "inactive" if its last activity was more than N days ago. We can safely take N = 30. Red and yellow stars represent activities; for example: posting a photo, or messaging other users.

The bottom line presents a summary for the time period. It has a lot in common with traditional web analytics, where multiple counters are tracked per time period (whether it is visits per hour or the number of "likes" per day). Looking at the chart, we can say that in the last period we had 2 active accounts that produced 1 event of type "yellow star".

Now, what shall we do when our boss asks "why are we losing accounts"? First, we must translate it into engineering language: "what behaviour patterns are intrinsic to <successful> and <failing> accounts, and how do they differ?"

The approach from the chart above gives us good insight into feature usage (yesterday was St. Valentine's Day, and users sent 1M messages greeting each other), but it lacks the ability to foresee whether a user will stay or is likely to leave.

What we need is to:

  • Segregate accounts by lifespan criteria (or other criteria of success)
  • Align account timelines (day 0 is the sign-up date)
  • Aggregate user data to get the pattern for accounts with a given lifespan

Figure 2: behavior data patterns

Figure 2 illustrates timeline alignment and data aggregation. To avoid clutter, it lacks segregation by lifespan.
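
As a minimal sketch of the alignment and aggregation steps (the Event type and its field names are hypothetical, not taken from the actual pipeline), the core idea is simply to convert every event timestamp into a day offset from the sign-up date and count events per offset and type:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class BehaviorAggregator {

    /** Hypothetical event record: type ("yellow star", "purple star", ...) and timestamp. */
    public static class Event {
        final String type;
        final long timestampMillis;

        Event(String type, long timestampMillis) {
            this.type = type;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Aligns an account's events against its sign-up date (day 0) and
     * counts events of each type per day offset.
     */
    public static Map<Long, Map<String, Integer>> alignAndAggregate(
            long signUpMillis, List<Event> events) {
        Map<Long, Map<String, Integer>> perDay = new HashMap<Long, Map<String, Integer>>();
        for (Event event : events) {
            // day 0 is the sign-up date; day N is N days after sign-up
            long dayOffset = TimeUnit.MILLISECONDS.toDays(event.timestampMillis - signUpMillis);
            Map<String, Integer> counters = perDay.get(dayOffset);
            if (counters == null) {
                counters = new HashMap<String, Integer>();
                perDay.put(dayOffset, counters);
            }
            Integer current = counters.get(event.type);
            counters.put(event.type, current == null ? 1 : current + 1);
        }
        return perDay;
    }
}
```

Aggregating these per-day maps across all accounts in the same lifespan bucket yields the behaviour pattern for that bucket.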

In summary, we have engineered a mechanism to extract behavioural data. For illustration purposes, let's assume that a successful account adds no fewer than 10 friends and is followed by no fewer than 2 other accounts within the first 5 days of its lifespan. In our terminology, that would be 10 purple stars and 2 green ones.

Now we can compare an account's statistics against known behavioural patterns and predict the account's lifespan. From a business perspective, this information can be used to tailor the onboarding procedure to maximize the user's lifespan (and thus the user's engagement with the social network).
For example: a UI that reminds users to identify their friends, and proposes that those friends follow the newcomer.

In the real world, there are multiple categories contributing to "success". What if we represent them as fractions of 100%, where 100% stands for fully matching the appropriate behaviour pattern?
Wait... doesn't it seem familiar?

Figure 3: typical Social Network "profile completion" bar

Friday, March 23, 2012

CSV import into HBase

This week was about extracting users' behavior patterns, grouped by account lifespan. However, before the processing itself could take place, I had to copy data from PostgreSQL to HBase... and it turned out to be a surprisingly involved problem.

The first question that comes to mind: why bother? Wasn't this already done by our forefathers and isn't it known to us as Sqoop? True, but I have two excuses:
  • I wanted to pre-process the data from PostgreSQL before placing it into HBase. With Sqoop, a set of additional map/reduce jobs would be required
  • The input data came in multiple CSV dumps, so with straightforward Sqoop I would need a set of map/reduce jobs for each of them
Trying to avoid as much additional work as possible, I first came up with the idea of an omnivorous REST interface (hey, it's only 3 GB). However, even with thread pooling and connection pooling, the most I squeezed out of it was about 130 commits per second (totalling 150-200 MB per day).

At this point two things became apparent:
  • Even at 3 GB, the import must be moved to the Hadoop cluster
  • It has to be either a map/reduce job or a direct HBase tunnel
And so CSVImporter was born: a Server-Worker design based on Surus [1], with thread pooling, connection pooling and a write buffer. It also uses SuperCSV [2].
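
The CSVImporter source itself is not listed in this post; as a rough, single-threaded sketch of its write path (the table, family and column names are hypothetical, and the Server-Worker machinery is omitted), a SuperCSV reader feeding a write-buffered HTable looks roughly like this:

```java
import java.io.FileReader;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;

public class CsvToHBase {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "user_statistics");   // hypothetical table name
        table.setAutoFlush(false);                            // enable the client-side write buffer
        table.setWriteBufferSize(12 * 1024 * 1024);           // 12 MB buffer

        CsvListReader reader = new CsvListReader(
                new FileReader(args[0]), CsvPreference.STANDARD_PREFERENCE);
        try {
            List<String> row;
            while ((row = reader.read()) != null) {
                // column 0 is assumed to hold the row key; the rest are data columns
                Put put = new Put(Bytes.toBytes(row.get(0)));
                for (int i = 1; i < row.size(); i++) {
                    String cell = row.get(i) == null ? "" : row.get(i);
                    put.add(Bytes.toBytes("stat"),         // column family
                            Bytes.toBytes("col_" + i),     // qualifier
                            Bytes.toBytes(cell));
                }
                table.put(put);   // buffered locally; flushed when the buffer fills up
            }
        } finally {
            reader.close();
            table.flushCommits();
            table.close();
        }
    }
}
```

The client-side write buffer is what makes the difference compared to the REST approach: puts are accumulated locally and shipped to the region servers in batches instead of one commit at a time.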

Performance: 403 MB per hour (the largest 2.8 GB CSV dump was imported within 7 hours 11 minutes).


Sunday, March 18, 2012

Repeating Timer in Python

While working on Scheduler [1], I've encountered an inconvenience caused by the lack of an auto-repeat timer in the standard Python API.
In this post I will present a RepeatTimer class with the following features:
  • RepeatTimer (like its threading.Timer ancestor) is executed in a separate thread
  • RepeatTimer executes a function when it triggers
  • The interface provides the ability to trigger the RepeatTimer immediately and start a new countdown
  • The interface provides the ability to set a new interval that will be applied after the upcoming triggering
  • The cancel method disengages the RepeatTimer and kills its thread
The gist below presents the code:
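
A minimal sketch matching the features listed above (the trigger and change_interval method names are illustrative and may differ from the original gist):

```python
import threading


class RepeatTimer(threading.Thread):
    """Repeats call_back every `interval` seconds until cancel() is called."""

    def __init__(self, interval, call_back, args=None, kwargs=None):
        super(RepeatTimer, self).__init__()
        self.interval = interval
        self.call_back = call_back
        self.args = args or []
        self.kwargs = kwargs or {}
        self.finished = threading.Event()   # set by cancel()
        self.skip_wait = threading.Event()  # set by trigger()

    def run(self):
        while not self.finished.is_set():
            # either the interval elapses or trigger()/cancel() interrupts the wait
            self.skip_wait.wait(self.interval)
            self.skip_wait.clear()
            if not self.finished.is_set():
                self.call_back(*self.args, **self.kwargs)

    def trigger(self):
        """Fire the callback immediately and start a new countdown."""
        self.skip_wait.set()

    def change_interval(self, interval):
        """The new interval takes effect after the upcoming triggering."""
        self.interval = interval

    def cancel(self):
        """Disengage the timer and let its thread exit."""
        self.finished.set()
        self.skip_wait.set()
```

Usage follows threading.Thread: construct the timer, call start(), and later cancel().
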
[1] Scheduler at Github
https://github.com/mushkevych/scheduler

Thursday, March 08, 2012

Truss Extraction Algorithm in MapReduce & HBase

In this post I will cover an implementation of the Truss Extraction Algorithm (TEA) [1], adapted for the Hadoop MapReduce & HBase environment.

First of all, let us cite Jonathan Cohen's work to clarify what we are actually after:

A k-truss is a non-trivial, one-component subgraph such that each edge is reinforced by at least k-2 pairs of edges making a triangle with that edge. (Non-trivial here excludes an isolated vertex as a truss.)

The fundamental question we have to answer before getting drowned in bits and bytes: what is so valuable about a truss? How can we benefit from the ability to associate a network player with one group or another? As the proverb goes, "Tell me your friends, and I'll tell you who you are".

Since we have mentioned graphs, let us also provide an illustration:


Figure 1: Typical example of a social graph,
where nodes represent players and edges represent social connections.

Figure 2: Trusses extracted from the example in Figure 1.
In this case the support threshold was set to 4.

The discussed algorithm is an adaptation of Cohen's Truss Extraction Algorithm to the MapReduce environment [2]. The main improvement I bring in here is the use of HBase for holding the graph definition, the working structures and the final truss graphs.

Overall, there are 6 steps in the algorithm. 

Step 0: Graph simplification
This is a business-sensitive step whose functionality lies outside of the TEA body, and thus it will be described only at a high level.

Input: social data (e.g. number of private messages, number of comments, the user's contact list, etc.) for a relationship between user_a and user_b.
Each positive fact results in a positive value, and each undesired fact in a negative one.
We then sum all values for a given relationship, and if the sum crosses a predefined threshold, we consider the relationship worth processing.

Output: table TABLE_TRUSS_TEMP in format:

column family: user

             |   user_a    |   user_b    |   user_c    |   user_d
 row user_a  |      -      | d(A), d(B)  | d(A), d(C)  | d(A), d(D)
 row user_b  | d(B), d(A)  |      -      | d(B), d(C)  | d(B), d(D)
 row user_c  | d(C), d(A)  | d(C), d(B)  |      -      | d(C), d(D)
 row user_d  | d(D), d(A)  | d(D), d(B)  | d(D), d(C)  |      -

where d(X) denotes the valency (degree) of node X.
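
As a hedged sketch of how the Step 0 output could be materialized (the helper class, the scoring map and the "d(A), d(B)" cell encoding are illustrative; the valencies are assumed to be pre-computed):

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class GraphSimplification {

    private static final byte[] FAMILY_USER = Bytes.toBytes("user");

    /** Sums per-fact weights (positive for desired facts, negative for undesired ones). */
    static long relationshipScore(Map<String, Long> weightedFacts) {
        long score = 0;
        for (long weight : weightedFacts.values()) {
            score += weight;
        }
        return score;
    }

    /** Stores the symmetric pair of cells for one qualifying relationship. */
    static void storeEdge(HTable trussTemp, String userA, String userB,
                          long valencyA, long valencyB, long score, long threshold)
            throws IOException {
        if (score < threshold) {
            return;   // relationship is not worth processing
        }
        Put rowA = new Put(Bytes.toBytes(userA));
        rowA.add(FAMILY_USER, Bytes.toBytes(userB), Bytes.toBytes(valencyA + ", " + valencyB));
        trussTemp.put(rowA);

        Put rowB = new Put(Bytes.toBytes(userB));
        rowB.add(FAMILY_USER, Bytes.toBytes(userA), Bytes.toBytes(valencyB + ", " + valencyA));
        trussTemp.put(rowB);
    }
}
```
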

Step 1: Updating valencies in the table TABLE_TRUSS_TEMP
Steps 1-2-3-4 are executed in a loop. Step 1 is skipped on the first iteration, since TABLE_TRUSS_TEMP is still in the initial state left by Step 0.

***mapper***
reads table in format:
key: UserA    value for column B: d(A), d(B)
key: UserB    value for column A: d(B), d(A)

output in format:
key: (A, B)    value: A, d(A)
key: (A, B)    value: B, d(B)
where A < B by numerical comparison

***reducer***
receives input in format:
key: (A, B)    value: A, d(A)
key: (A, B)    value: B, d(B)
where A < B by numerical comparison

output format:
key: UserA    value for column B: d(A), d(B)
key: UserB    value for column A: d(B), d(A)
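
A hedged sketch of the Step 1 mapper (the class name and the key encoding are mine, not taken from the original code): a TableMapper derives a row's fresh valency from the number of columns still present in that row.

```java
import java.io.IOException;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class ValencyMapper extends TableMapper<Text, Text> {

    private static final byte[] FAMILY_USER = Bytes.toBytes("user");

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result row, Context context)
            throws IOException, InterruptedException {
        String a = Bytes.toString(rowKey.get());
        NavigableMap<byte[], byte[]> neighbours = row.getFamilyMap(FAMILY_USER);
        int valencyA = neighbours.size();   // d(A) = number of neighbours still present

        for (byte[] qualifier : neighbours.keySet()) {
            String b = Bytes.toString(qualifier);
            // both endpoints of the edge must meet in one reducer call
            context.write(new Text(edgeKey(a, b)), new Text(a + "," + valencyA));
        }
    }

    /** Orders the two vertex ids numerically, lower id first. */
    static String edgeKey(String a, String b) {
        return Long.parseLong(a) < Long.parseLong(b) ? a + ":" + b : b + ":" + a;
    }
}
```

The corresponding reducer simply combines the two values it receives per edge and writes the refreshed "d(A), d(B)" and "d(B), d(A)" cells back into TABLE_TRUSS_TEMP as two Puts.
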

Step 2: First phase of triangle enumeration
The truss extraction algorithm is built on top of the triangle enumeration algorithm [3]. In this and the next step we calculate the number of triangles for every edge in the graph.
In social networks this is equivalent to the number of mutual friends in a relationship.

***mapper***
reads TABLE_TRUSS_TEMP in format:
key: UserA    value for column B: d(A), d(B)
key: UserB    value for column A: d(B), d(A)
and emits output in format:
key: vertex V    value: edge (V', V''), d(V'), d(V'')
where V is the endpoint with the lower valency: V := d(V') < d(V'') ? V' : V'';
in case d(V') == d(V''), the endpoint that is lower by numerical comparison is chosen as V

***reducer***
input:
key: vertex A    value: [[(A, B), d(A), d(B)], [(A, C), d(A), d(C)] ...]
where:
d(A) < d(B) or if d(A) == d(B) : A < B by numerical comparison
d(A) < d(C) or if d(A) == d(C) : A < C by numerical comparison

outputs to file "step2.out":
key: edge (B, C)    value (A, B), (A, C)
key: edge (B, C)    value (B, C)
where d(B) < d(C)
in case d(B) == d(C), B < C by numerical comparison
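
A hedged sketch of the Step 2 mapper (the value encoding "A:B,d(A),d(B)" and the decision to emit each edge only from its anchor's row are my choices, not necessarily those of the original code):

```java
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class TriangleEnumMapper extends TableMapper<Text, Text> {

    private static final byte[] FAMILY_USER = Bytes.toBytes("user");

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result row, Context context)
            throws IOException, InterruptedException {
        long a = Long.parseLong(Bytes.toString(rowKey.get()));
        NavigableMap<byte[], byte[]> neighbours = row.getFamilyMap(FAMILY_USER);

        for (Map.Entry<byte[], byte[]> cell : neighbours.entrySet()) {
            long b = Long.parseLong(Bytes.toString(cell.getKey()));
            // the cell value is "d(A), d(B)" as written by Step 1
            String[] valencies = Bytes.toString(cell.getValue()).split(",\\s*");
            long dA = Long.parseLong(valencies[0]);
            long dB = Long.parseLong(valencies[1]);

            // anchor the edge at the endpoint with the lower valency,
            // breaking ties by the numerically lower vertex id
            long anchor = (dA < dB || (dA == dB && a < b)) ? a : b;

            // emit each edge only once: from the row of its anchor vertex
            if (anchor == a) {
                context.write(new Text(String.valueOf(a)),
                              new Text(a + ":" + b + "," + dA + "," + dB));
            }
        }
    }
}
```

The reducer then pairs up the edges anchored at the same low-degree vertex into triangle candidates, as described above.
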

Step 3: Second phase of triangle enumeration 
***mapper***
Identity mapper with input from "step2.out"

***reducer***
input in format:
key: edge (B, C)    value (A, B), (A, C)
key: edge (B, C)    value (B, C)

outputs to file "step3.out":
each edge is emitted once for every triangle it is in,
so, in summary we have:
key: edge (A, B)    value: 2 (number of triangles AB is in)

For every edge in an open pair and for every solitary edge, we emit ZERO:
key: edge (K, L)    value: 0 (ZERO)
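
A hedged sketch of the Step 3 reducer, assuming the "(B, C)" record acts as an existence marker for the closing edge and that the two wings of a candidate are encoded as "A:B;A:C" (both encoding choices are mine):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TriangleCloseReducer extends Reducer<Text, Text, Text, Text> {

    private static final Text ONE = new Text("1");
    private static final Text ZERO = new Text("0");

    @Override
    protected void reduce(Text edge, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        boolean edgeExists = false;                     // did we see the "(B, C)" marker?
        List<String> candidates = new ArrayList<String>();

        for (Text value : values) {
            String v = value.toString();
            if (v.equals(edge.toString())) {
                edgeExists = true;                      // the closing edge is present in the graph
            } else {
                candidates.add(v);                      // "A:B;A:C" -> the two wings through vertex A
            }
        }

        if (edgeExists && !candidates.isEmpty()) {
            // every candidate closes a real triangle: credit all three of its edges
            for (String candidate : candidates) {
                String[] wings = candidate.split(";");
                context.write(new Text(wings[0]), ONE);
                context.write(new Text(wings[1]), ONE);
                context.write(edge, ONE);
            }
        } else {
            // open pair or solitary edge: emit ZERO so that Step 4 sees it
            context.write(edge, ZERO);
        }
    }
}
```
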

Step 4: Dropping edges with insufficient support
Once we have the tuple [edge, number of triangles it is in], we can determine whether the edge's support is sufficient; if it is not, the edge is dropped.

***mapper***
Identity mapper with input from "step3.out"

***reducer***
input example:
key: edge (A, B)    value: 2 (number of triangles AB is in)
key: edge (C, D)    value: 5 (number of triangles CD is in)

output:
emits a Delete for every edge that has _insufficient_ support (the number of supporting triangles is below the threshold)
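
A hedged sketch of the Step 4 reducer (the "truss.support.threshold" configuration key, the counter name and the "A:B" edge encoding are illustrative): it sums the per-edge triangle counts and issues Deletes for both directed cells of an unsupported edge.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class EdgeSupportReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    private static final byte[] FAMILY_USER = Bytes.toBytes("user");

    private int supportThreshold;

    @Override
    protected void setup(Context context) {
        // k-2 for a k-truss; passed in through the job configuration (key name is illustrative)
        supportThreshold = context.getConfiguration().getInt("truss.support.threshold", 2);
    }

    @Override
    protected void reduce(Text edge, Iterable<Text> counts, Context context)
            throws IOException, InterruptedException {
        int triangles = 0;
        for (Text count : counts) {
            triangles += Integer.parseInt(count.toString());
        }

        if (triangles < supportThreshold) {
            // insufficient support: remove both directed cells of the edge
            String[] vertices = edge.toString().split(":");   // assumed "A:B" encoding

            Delete forward = new Delete(Bytes.toBytes(vertices[0]));
            forward.deleteColumns(FAMILY_USER, Bytes.toBytes(vertices[1]));
            context.write(new ImmutableBytesWritable(forward.getRow()), forward);

            Delete backward = new Delete(Bytes.toBytes(vertices[1]));
            backward.deleteColumns(FAMILY_USER, Bytes.toBytes(vertices[0]));
            context.write(new ImmutableBytesWritable(backward.getRow()), backward);

            context.getCounter("TrussExtraction", "EDGES_DROPPED").increment(1L);
        }
    }
}
```
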

Workflow: Looping condition
Steps 1-2-3-4 are repeated in a loop until the number of dropped edges is 0
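
The looping condition is easy to express through a job counter such as the EDGES_DROPPED counter sketched above. Below is a hedged sketch of the driver; the runStepN() methods are stubs standing in for the actual job-submission code.

```java
public class TrussExtractionDriver {

    public static void main(String[] args) throws Exception {
        long droppedEdges;
        int iteration = 0;
        do {
            if (iteration > 0) {
                runStep1();          // Step 1 is skipped on the very first pass
            }
            runStep2();
            runStep3();
            droppedEdges = runStep4();
            iteration++;
        } while (droppedEdges > 0);
        System.out.println("Truss extraction converged after " + iteration + " iterations");
    }

    private static void runStep1() throws Exception { /* submit the Step 1 job and wait */ }
    private static void runStep2() throws Exception { /* submit the Step 2 job and wait */ }
    private static void runStep3() throws Exception { /* submit the Step 3 job and wait */ }

    /**
     * Placeholder: would submit the Step 4 job, wait for completion and return
     * job.getCounters().findCounter("TrussExtraction", "EDGES_DROPPED").getValue().
     */
    private static long runStep4() throws Exception {
        return 0L;
    }
}
```
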

Step 5: Truss Identification Algorithm (TIA)
At this point, we have dropped from TABLE_TRUSS_TEMP all edges that are not part of any truss. TIA is a 3-step procedure and goes beyond the topic of this post.
However, we can assume that at the end of TIA we get the table TABLE_TRUSS in the following format:


column family: user

             |  user_a  |  user_b  |  user_c  |  user_d
 Truss_id_1  |    1     |    1     |    1     |    1

where Truss_id_1 is a unique truss identifier that tolerates insignificant changes to the truss structure (for instance: a non-prominent player has left or joined the truss).


Performance:
For a social graph with:
  • 64.5K nodes
  • the number of edges per node varying from 1 to 15,723
and an environment of:
  • AWS hosting
  • 5 m1.large instances (8 GB RAM, 2 cores)
the algorithm completes in 25 iterations, taking on average 1 day 7 hours.



[1] Truss extraction algorithm

[2] Truss Extraction algorithm in MapReduce

[3] Triangle enumeration