Processing one input file per map process with Hadoop Streaming

This is just a very short post about Hadoop Streaming which I hope will be helpful for people who stumble upon the same problem. For those who don’t know Hadoop Streaming: it enables you to create MapReduce jobs with arbitrary executables/scripts as the mapper and/or reducer, so you don’t have to write a Java wrapper application to run them on your Hadoop cluster. This allows you, for example, to run a file compression utility in parallel over a large set of files with a single command-line call.

Problem statement

Suppose you would like to process a set of files using Hadoop Streaming with one map task per input file. The official FAQ recommends the following:
Generate a file containing the full HDFS path of the input files. Each map task would get one file name as input.1

I’ve followed this advice, but for some reason I couldn’t figure out, Hadoop created tasks in a different fashion than I had expected. 2 For example, in one use case I had only two files listed in my input file on a 10-node test cluster. Depending on the value I set for mapreduce.map.tasks while debugging, it either (1) processed just one file at a time, but with two mappers working on it, or (2) processed both files at the same time, again with two mappers each. For the mapper I’m using this is a no-go, because the output file for the corresponding input file is then written to by both mapper processes, causing inconsistencies in the output.

Problem solution

The solution was quite straightforward. Instead of using the default TextInputFormat, I passed org.apache.hadoop.mapred.lib.NLineInputFormat as the parameter for the -inputformat switch. In my mapper script I now receive not just the file name as input, but a key/value pair, the key being the start position of the line in the input file and the value being the line contents, separated by a tab. My mapper script is written in Python, so after splitting the input line on the tab character and taking the second field, I have the input file name for my map job and can do the actual processing.
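For illustration, a corresponding streaming invocation could look roughly like this; the location of the streaming jar, the HDFS paths and the mapper script name are placeholders that depend on your installation:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
    -input /user/hadoop/filelist.txt \
    -output /user/hadoop/streaming-output \
    -mapper mymapper.py \
    -file mymapper.py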

Notes:

  1. http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html#How_do_I_process_files_one_per_map
  2. Admittedly, one thing I haven’t checked is whether this happens because I’m not using hdfs:// style URLs, but the corresponding NFS file paths instead, since I operate on the HDFS via the HDFS NFS Gateway. But even if the behaviour were different for hdfs:// URLs, the solution will still be useful for other NFS gateway users.

Finding “Mutual Friends” with MapReduce / Hadoop

A feature every Facebook user knows is the “Mutual Friends” list you see when visiting somebody’s profile. As the name says, it shows the friends that you have in common with that person. Facebook’s implementation of this is, of course, not public. Broadly speaking, there are two options: they could either generate the list in real time as you open a profile page and cache the result for later retrieval, or they could compute it batch-style and read the results whenever necessary. I guess it is safe to say that Facebook is not using the very basic MapReduce algorithm I’m going to show in the remainder of this post, but it is interesting nonetheless for illustrating how this problem can be parallelized.

How many different pairs of users exist for a user base of size n? The question boils down to the handshake problem, i.e. “In a room of n people, how many different handshakes are possible?”. The result can easily be calculated using the binomial coefficient: it is n choose 2. For a user base of 100 people these are only a modest 4950 pairs, but for a user base as large as Facebook’s, with 1.35 billion monthly active users, it is an impressive ~9.1×10^17 pairs. Assuming that every user can be identified by a 4-byte integer and that there is only one common friend per pair on average, each pair takes only a modest 12 bytes to store, but summed up we end up in the exabyte range. Of course, for a substantial share of user pairs there will be no common friends at all (despite Facebook’s global nature, many users only have friends within their home country and thus no mutual friends with users from any other country), but the resulting table will still look huge compared to your hard drive’s capacity.
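As a quick back-of-the-envelope check (assuming 4-byte user IDs and, on average, one common friend per pair):

\binom{1.35 \times 10^{9}}{2} = \frac{1.35 \times 10^{9}\,(1.35 \times 10^{9} - 1)}{2} \approx 9.1 \times 10^{17} \text{ pairs}

9.1 \times 10^{17} \text{ pairs} \times 12 \text{ bytes/pair} \approx 1.1 \times 10^{19} \text{ bytes} \approx 11 \text{ exabytes}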

The algorithm 1

The algorithm is implemented using one mapper and one reducer.

Mapper

Suppose that for each person the friends are stored as: Person -> [Friends]. For each friend in the list, the mapper emits as key the pair consisting of the person and that friend, and as value the person’s friend list again. Or in pseudocode: for(each friend in the friends list) { Emit(<friend pair>, <friend list>); }
The key point here is that the two members of each pair are put into a consistent order (e.g. sorted lexicographically) before the pair is emitted, so both sides of a friendship produce the same key and are therefore processed by the same reducer. Take as an example the following person and her friends:
Amy -> [Betty, Charlie, Daniel, Eric]
The map phase now outputs the following pairs:
[Amy, Betty] -> [Betty, Charlie, Daniel, Eric]
[Amy, Charlie] -> [Betty, Charlie, Daniel, Eric]
[Amy, Daniel] -> [Betty, Charlie, Daniel, Eric]
[Amy, Eric] -> [Betty, Charlie, Daniel, Eric]

Reducer

The reducer takes as input the key/value pairs of the map phase. For each pair (the key) it calculates the intersection of the corresponding friend lists. Again in pseudocode: for(each pair of users from the map phase) { Emit(<pair>, <intersection of friend lists>); }

Suppose Charlie’s friends list is as follows:
Charlie -> [Amy, Betty, Daniel, Jesse, Mike]
Then the relevant key/value pair of the map phase for calculating Amy’s and Charlie’s common friends is:
[Amy, Charlie] -> [Amy, Betty, Daniel, Jesse, Mike]

The intersection of these two lists is:
[Betty, Charlie, Daniel, Eric] ∩ [Amy, Betty, Daniel, Jesse, Mike] = [Betty, Daniel]

In other words, Amy and Charlie have two mutual friends, Betty and Daniel.

Java Hadoop implementation

For simplicity we assume that our friend lists are stored in a plain text file with one line per user and all entries separated by spaces, e.g.
Amy Betty Charlie Daniel Eric, where Amy is the user in question and the remaining entries are her friends. Of course, in a real-life application you would use a database as input and work with the users’ unique IDs instead of their names, but for illustrating the algorithm the two are equivalent. The complete source code is available in the repository linked in the next section.
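As a rough sketch of what such an implementation can look like for this input format (class and variable names are illustrative and may differ from the repository version; the output format mirrors the single-letter test data shown in the next section):

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MutualFriends {

    // Mapper: for every friend of a person, emit the (lexicographically ordered)
    // pair as the key and the person's complete friend list as the value.
    public static class FriendPairMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] tokens = line.toString().trim().split("\\s+");
            if (tokens.length < 2) {
                return; // user without friends
            }
            String person = tokens[0];
            String friendList = String.join(" ", Arrays.copyOfRange(tokens, 1, tokens.length));
            for (int i = 1; i < tokens.length; i++) {
                String friend = tokens[i];
                // Ordering the pair guarantees that both members of a friendship
                // produce the same key, e.g. "AB" for A->B as well as for B->A.
                String pair = person.compareTo(friend) < 0 ? person + friend : friend + person;
                context.write(new Text(pair), new Text(friendList));
            }
        }
    }

    // Reducer: each key receives the two friend lists of the pair;
    // their intersection is the list of mutual friends.
    public static class MutualFriendsReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text pair, Iterable<Text> friendLists, Context context)
                throws IOException, InterruptedException {
            Set<String> common = null;
            for (Text friendList : friendLists) {
                Set<String> friends =
                        new LinkedHashSet<>(Arrays.asList(friendList.toString().split("\\s+")));
                if (common == null) {
                    common = friends;
                } else {
                    common.retainAll(friends);
                }
            }
            // Concatenation without a separator mirrors the single-letter test output below.
            context.write(pair, new Text(String.join("", common)));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "MutualFriends");
        job.setJarByClass(MutualFriends.class);
        job.setMapperClass(FriendPairMapper.class);
        job.setReducerClass(MutualFriendsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}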

Downloading and Running

I have made the source code, along with a Maven build file, available as a project on GitHub: https://github.com/mgoettsche/MutualFriendsHadoopMapReduce. Of course, to execute it you need a running Hadoop installation. I’m not going to cover the Hadoop setup here, but the official single node setup guide should get you started quickly, depending on your previous exposure to Linux system administration.
Here is an example run of the application:

hadoop@HadoopVM:~/mutualfriends$ hadoop fs -cat mutualfriends/input/input.txt
A B C D
B A C D E
C A B D E
D A B C E
E B C D
hadoop@HadoopVM:~/mutualfriends$ hadoop jar MutualFriendsMapReduce/MutualFriendsMapReduce-1.0-SNAPSHOT.jar MutualFriends mutualfriends/input mutualfriends/output              
15/01/09 10:13:14 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/01/09 10:13:14 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/01/09 10:13:14 INFO input.FileInputFormat: Total input paths to process : 1
15/01/09 10:13:14 INFO mapreduce.JobSubmitter: number of splits:1
15/01/09 10:13:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1420483488341_0002
15/01/09 10:13:15 INFO impl.YarnClientImpl: Submitted application application_1420483488341_0002
15/01/09 10:13:15 INFO mapreduce.Job: The url to track the job: http://HadoopVM:8088/proxy/application_1420483488341_0002/
15/01/09 10:13:15 INFO mapreduce.Job: Running job: job_1420483488341_0002
15/01/09 10:13:21 INFO mapreduce.Job: Job job_1420483488341_0002 running in uber mode : false
15/01/09 10:13:21 INFO mapreduce.Job:  map 0% reduce 0%
15/01/09 10:13:26 INFO mapreduce.Job:  map 100% reduce 0%
15/01/09 10:13:32 INFO mapreduce.Job:  map 100% reduce 100%
15/01/09 10:13:32 INFO mapreduce.Job: Job job_1420483488341_0002 completed successfully
... OUTPUT TRUNCATED ...

hadoop@HadoopVM:~/mutualfriends$ hadoop fs -cat mutualfriends/output/part-r-00000
AB      CD
AC      BD
AD      BC
BC      ADE
BD      ACE
BE      CD
CD      ABE
CE      BD
DE      BC

The last command shows the resulting output for the given test data. The first column is the user pair and the second the mutual friends, e.g. users C and D have A, B and E as mutual friends.

Notes:

  1. I’m borrowing from Steve Krenzel’s article in this section

Accessing Twitter using Spark Streaming: A minimal example application

In recent years, Twitter data has been used in various studies. An often-cited example is the prediction of stock market changes based on the Twitter mood 1. Bollen et al. employed mood-tracking tools to measure the mood in six dimensions and found that, using this data, they were able to predict the up and down changes of the Dow Jones with an accuracy of 87.6%. 2

Another interesting application of live Twitter data for predictive purposes is earthquake warnings. Researchers at the University of Tokyo 3 developed a keyword-based classifier to scan for earthquake-related tweets and were able to detect earthquakes in Japan with a probability of 96%. Based on this system they run a notification service and claim to be, on average, faster than the Japan Meteorological Agency.

Long story short, examples like these show that interesting knowledge can be extracted from a large pool of seemingly banal short messages. In the remainder of this post I’m going to demonstrate with a minimal runnable example how Twitter live data can be accessed using Apache Spark, an open-source engine for data processing currently gaining ever more popularity.

Apache Spark comes with a component called Spark Streaming for, guess what, performing streaming analytics. To do so, it delivers the data from the stream source to the processing nodes in small batches. A great point to note here is that you can use the same operations in batch and streaming mode, which makes it easy to switch between the two.

Setup

First, you will need to have Spark installed. The application works in standalone mode, so no server configuration is required. Second, to connect to Twitter, you are required to create an application on their developer portal at apps.twitter.com (if this step is unclear to you, please refer to e.g. this guide). Third and last, you can download the complete source code from: github.com/mgoettsche/SparkTwitterHelloWorldExample

The Application

Below is the code for a minimal runnable Spark application using the Twitter streaming API. All the application does is print the first ten (or fewer, if Twitter provided fewer) tweets of each one-second interval in an infinite loop. For the application to work, copy and paste the relevant parameters of your Twitter application into the credential settings in lines 14-17.
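What follows is a condensed sketch of such an application; the class name and the credential placeholders are illustrative, and the buildable version is in the repository linked above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;

import twitter4j.Status;

public class SparkTwitterHelloWorld {
    public static void main(String[] args) throws Exception {
        // Twitter4J picks up the OAuth credentials from these system properties.
        // Replace the placeholders with the values from your app on apps.twitter.com.
        System.setProperty("twitter4j.oauth.consumerKey", "YOUR_CONSUMER_KEY");
        System.setProperty("twitter4j.oauth.consumerSecret", "YOUR_CONSUMER_SECRET");
        System.setProperty("twitter4j.oauth.accessToken", "YOUR_ACCESS_TOKEN");
        System.setProperty("twitter4j.oauth.accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET");

        // Local standalone mode; at least two threads are needed, one for
        // receiving the stream and one for processing it.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkTwitterHelloWorld");
        // One-second batch interval.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        // The public sample stream as a DStream of Twitter4J Status objects.
        JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc);

        // print() outputs the first ten elements of every batch, i.e. up to
        // ten tweets per one-second interval.
        stream.map(new Function<Status, String>() {
            @Override
            public String call(Status status) {
                return status.getText();
            }
        }).print();

        jssc.start();
        jssc.awaitTermination();
    }
}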

As you can see from the names of the access properties, Spark uses the Twitter4J library, and the stream provides objects of class twitter4j.Status, which, besides getText(), offers methods for accessing data other than just the status message.

For example, you can also access the location from which the user posted the status using getGeoLocation(), which returns a twitter4j.GeoLocation object (or null if no location is attached). Posting with the current location is an opt-in feature, so not all tweets have a location attached. Luckily, Spark makes it easy to filter streams 4. For this, lines 29-33 of the listing need to be replaced.
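One way this replacement could look is sketched below; it assumes the stream variable from the sketch above and additionally needs an import of org.apache.spark.streaming.api.java.JavaDStream:

// Keep only the tweets that carry a geolocation.
JavaDStream<Status> geoTagged = stream.filter(new Function<Status, Boolean>() {
    @Override
    public Boolean call(Status status) {
        // getGeoLocation() returns null for tweets without an attached location.
        return status.getGeoLocation() != null;
    }
});

// Print location and text of (up to) the first ten geotagged tweets per batch.
geoTagged.map(new Function<Status, String>() {
    @Override
    public String call(Status status) {
        return status.getGeoLocation() + ": " + status.getText();
    }
}).print();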

This will only keep the tweets with the user’s location attached and output their location and text to the standard output.

Building and Running

I have included a Maven pom.xml for building the application. There is nothing special about it, except perhaps that it uses the maven-assembly-plugin to build a JAR that includes the Spark Streaming Twitter library, in order to avoid ClassNotFoundExceptions when deploying the app. To build, simply execute mvn package in the project directory; to run the application, submit the resulting assembly JAR with spark-submit.

Example output:

[Screenshot: SparkStreamingTwitterExampleOutput]

Wrapping up

The application I presented here does not actually do anything with the received tweets yet, but rather just demonstrates how to establish the Spark/Twitter connection. As a first further step, one could experiment with processing the tweets by e.g. extracting and counting hashtags or creating usage statistics based on the geolocation. Be aware, though, that via the public streaming API you only receive a small sample of all tweets. If I find the time, I will write a post with another sample application that does some processing of the statuses.

Notes:

  1. Bollen J, Mao H, Zeng XJ (2011), Twitter mood predicts the stock market. Journal of Computational Science 2: 1–8. PDF
  2. This may at first sound like the holy grail of becoming rich using data analytics. However, the study was performed on an ex-post basis and thus can’t serve well as a trading guide.
  3. Sakaki T, Okazaki M, Matsuo Y (2010), Earthquake shakes Twitter users: real-time event detection by social sensors. WWW ’10 Proceedings of the 19th international conference on World wide web, pages 851-860. PDF
  4. This is a local filter, i.e. it filters the tweets after receiving them. Twitter also offers to pass filter parameters directly to the API call for server-side filtering.