Data harvesting with MapReduce

Combine harvesters
(original image source)

“The combine harvester, […] is a machine that combines the tasks of harvesting, threshing and cleaning grain crops.” If you have acres upon acres of wheat and want to separate the grain from the chaff, a group of combines is what you really want. If you have a bonsai tree and want to trim it, a harvester may be less than ideal.

MapReduce is like a pack of harvesters, well-suited for weeding through a huge volumes of data, residing on a distributed storage system. However, a lot of machine learning work is more akin to trimming bonsai into elaborate patterns. Vice versa, it’s not uncommon to see trimmers used to harvest a wheat field. Well-established and respected researchers, as recently as this year write in their paper “Planetary Scale Views on a Large Instant-messaging Network“:

We gathered data for 30 days of June 2006. Each day yielded about 150 gigabytes of compressed text logs (4.5 terabytes in total). Copying the data to a dedicated eight-processor server with 32 gigabytes of memory took 12 hours. Our log-parsing system employed a pipeline of four threads that parse the data in parallel, collapse the session join/leave events into sets of conversations, and save the data in a compact compressed binary format. This process compressed the data down to 45 gigabytes per day. Processing the data took an additional 4 to 5 hours per day.

Doing the math, that’s five full days of processing to parse and compress the data on a beast of a machine. Even more surprisingly, I found this exact quote singled out among all the interesting results in the paper! Let me make clear that I’m not criticizing the study; in fact, both the dataset and the exploratory analysis are interesting in many ways. However, it is somewhat surprising that, at least among the research community, such a statement is still treated more like a badge of honor rather than an admission of masochism.

The authors should be applauded for their effort. Me, I’m an impatient sod. Wait one day for the results, I think I can do that. Two days, what the heck. But five? For an exploratory statistical analysis? I’d be long gone before that. And what if I found a serious bug half way down the road? That’s after more than two days of waiting, in case you weren’t counting. Or what if I decided I needed a minor modification to extract some other statistic? Wait another five days? Call me a Matlab-spoiled brat, but forget what I said just now about waiting one day. I changed my mind already. A few hours, tops. But we need a lot more studies like this. Consequently, we need the tools to facilitate them.

Hence my decision to frolic with Hadoop. This post focuses on exploratory data analysis tasks: the kind I usually do with Matlab or IPython/SciPy scripts, which involve many iterations of feature extraction, data summarization, model building and validation. This may be contrary to Hadoop’s design priorities: it is not intended for quick turnaround or interactive response times with modestly large datasets. However, it can still make life much easier.

Scale up on large datasets

First, we start with a very simple benchmark, which scans a 350GB text log. Each record is one line, consisting of a comma-separated list of key=value pairs. The job extracts the value for a specific key using a simple regular expression and computes the histogram of the corresponding values (i.e., how many times each distinct value appears in the log). The input consists of approximately 500M records and the chosen key is associated with about 130 distinct values.

Scalability: histogram

The plot above shows aggregate throughput versus number of nodes. HDFS and MapReduce cluster sizes are always equal, with HDFS rebalanced before each run. The job uses a split size of 256MB (or four HDFS blocks) and one reducer. All machines have a total of four cores (most Xeon, a few AMD) and one local disk. Disks range from ridiculously slow laptop-type drives (the most common type), to ridiculously fast SAS drives. Hadoop 0.16.2 (yes, this post took a while to write) and Sun’s 1.6.0_04 JRE were used in all experiments.

For such an embarrassingly parallel task, scaleup is linear. No surprises here, but it’s worth pointing out some numbers. As you can see from the plot, extracting simple statistics from this 350GB dataset took less than ten minutes with 39 nodes, down from several hours on one node. Without knowing the details of how the data were processed, if we assume similar throughput, then processing time of the raw instant messaging log could be roughly reduced from five days to just a few hours. In fact, when parsing a document corpus (about 1TB of raw text) to extract a document-term graph, we witnessed similar scale-up, going down from well over a day on a beast of a machine, to a couple of hours on the Hadoop cluster.

Hadoop is also reasonably simple to program with. It’s main abstraction is natural, even if your familiarity with functional programming concepts is next to none. Furthermore, most distributed execution details are, by default, hidden: if the code runs correctly on your laptop (with a smaller dataset, of course), then it will run correctly on the cluster.

Single core performance

Linear scaleup is good, but how about absolute performance? I implemented the same simple benchmark in C++, using Boost for regex matching. For a rough measure of sustained sequential disk throughput, I simply cat the same large file to /dev/null.

I collected measurements from various machines I had access to: (i) a five year old Mini-ITX system I use with my television at home, running Linux FC8 for this experiment, (ii) a two year old desktop at work, again with FC8, (iii) my three year old Thinkpad running Windows XP and Cygwin, and (iv) a recent IBM blade running RHEL4.

Single core performance

The hand-coded version in C++ is about 40% faster on the older machines and 33% faster on the blade [Note: I’m missing the C++ times for my laptop and it’s drive crashed since then — I was too lazy to reload the data and rerun everything, so I simply extrapolated from single-thread Hadoop assuming a 40% improvement, which seems reasonable enough for these back-of-the-envelope calculations]. Not bad, considering that Hadoop is written in Java and also incurs additional overheads to process each file split separately.

Perhaps I’m flaunting my ignorance but, surprisingly, this workload was CPU-bound and not I/O-bound—with the exception of my laptop, which has a really crappy 2.5″ drive (and Windows XP). Scanning raw text logs is a rather representative workload for real-world data analysis (e.g., AWK was built at AT&T for this purpose).

The blade has a really fast SAS drive (suspiciously fast, except perhaps if it runs at 15K RPM) and the results are particularly instructive. The drive reaches 120MB/sec sustained read throughput. Stated differently, the 3GHz CPU can only dwell on each byte for 24 cycles on average, if it’s to keep up with the drive’s read rate. Even on the other machines, the break-even point is between 30-60 cycles [Note: The laptop drive seems to be an exception, but I wouldn’t be so sure that at least part of the inefficiency isn’t due to Cygwin].

On the other hand, the benchmark throughput translates into 150-500 cycles per byte, on average. If I get the chance, I’d like to instrument the code with PAPI, validate these numbers and perhaps obtain a breakdown (into average cycles for regex state machine transition per byte, average cycles for hash update per record, etc). I would never have thought the numbers to be so high and I still don’t quite believe it. In any case, if we believe these measurements, at least 4-6 cores are needed to handle the sequential read throughput from a single drive!

The common wisdom in algorithms and databases textbooks, as far as I remember, was that when disk I/O is involved, CPU cycles can be more or less treated as a commodity. Perhaps this is an overstatement, but I didn’t expect it to be so off the mark.

This also raises another interesting question, which was the original motivation for measuring on a broad set of machines: what would be the appropriate cost-performance balance between CPU and disk for a purpose-built machine? I thought one might be able to get away with a setup similar to active disks: a really cheap and power-efficient Mini-ITX board, attached to a couple of moderately priced drives. For example, see this configuration, which was once used in the WayBack machine (I just found out that the VIA-based models have apparently been withdrawn, but the pages are still there for now). This does not seem to be the case.

The blades may be ridiculously expensive, perhaps even a complete waste of money for a moderately tech-savvy person. However, you can’t just throw together any old motherboard and hard disk, and magically turn them into a “supercomputer.” This is common sense, but some of the hype might have you believe the opposite.

Performance on smaller datasets

Once the original, raw data is processed, the representation of the features relevant to the analysis task typically occupies much less space. In this case, a bipartite graph extracted from the same 350GB text logs (the details don’t really matter for this discussion) takes up about 3GB, or two orders of magnitude less space.

Scalability: coclustering iteration

The graph shows aggregate throughput for one iteration of an algorithm similar to k-means clustering. This is fundamentally very similar to computing a simple histogram. In both cases, the output size is very small compared to the input size: the histogram has size proportional to the number of distinct values, whereas the cluster centers occupy space proportional to k. Furthermore, both computations iterate over the entire dataset and perform a hash-based group-by aggregation. For k-means, each point is “hashed” based on its distance to the closest cluster center, and the aggregation involves a vector sum.

Nothing much to say here, except that the linear scaleup tapers off after about 10-15 nodes, essentially due to lack of data: the fixed per-split overheads start dominating the total processing time. Hadoop is not really built to process datasets of modest size, but fundamentally I see nothing to prevent MapReduce from doing so. More importantly, when the dataset becomes really huge, I would expect Hadoop to scale almost-linearly with more nodes.

Hadoop can clearly help pre-process the raw data quickly. Once the relevant features are extracted, they may occupy at least an order of magnitude less space. It may be possible to get away with single-node processing on the appropriate representation of the features, at least for exploratory tasks.  Sometimes it may even be better to use a centralized approach.

Summary

My focus is on exploratory analysis of large datasets, which is a pre-requisite for the design of mining algorithms. Such tasks typically involve (i) raw data pre-processing and feature extraction stages, and (ii) model building and testing stages. Distributed data processing platforms and, in particular, Hadoop are well-suited for such tasks, especially the feature extraction stages.  In fact, tools such as Sawzall (which is akin to AWK, but on top of Google’s MapReduce and protocol buffers), excel at the feature extraction and summarization stages.

The original, raw data may reside in a traditional database, but more often than not they don’t: packet traces, event logs, web crawls, email corpora, sales data, issue-tracking ticket logs, and so on. Hadoop is especially well-suited for “harvesting” those features out of the original data. In its present form, it can also help in model building stages, if the dataset is really large.

In addition to reducing processing time, Hadoop is also quite easy to use. My experience is that the programming effort compares very favorably to the usual approach of writing my own, quick Python scripts for data pre-processing. Furthermore, there are ongoing efforts for even further simplification (e.g., Cascading and Pig).

I was somewhat surprised with the CPU vs I/O trade-offs for what I would consider real-world data processing tasks. Perhaps also influenced by the original work on active disks (one of the inspirations for MapReduce), which suggested using the disk controller to process data. However, there is a cross-over point for the performance of active disks versus centralized processing; I was way off with my initial guess on how much CPU power it takes for a reasonably low cross-over point (which is workload-dependent, of course, and any results herein should be treated as indicative and not conclusive).

Footnote: For what it’s worth, I’ve put up some of the code (and hope to document it sometime). Also, thanks to Stavros Harizopoulos for pointing out the simple cycles-per-byte metric.

  • Hi,

    I was curious about the code you provided. Are the results above generated using the code? Also I wanted to know how to install and run your code and get more insight on the whole process.

    Nice work!