Feature grouping based on string distances using the ‘stringdist’ R package

I’m currently working on Kaggle’s “What’s Cooking” challenge. Basically, using data provided by the recipe site Yummly, the task is to predict a dish’s cuisine based on its ingredients. An example entry from the dataset looks like this:
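Schematically, each record is a JSON object with an id, the cuisine label and a list of free-text ingredient strings; the values shown here are made up for illustration rather than copied from the training file:

{
  "id": 12345,
  "cuisine": "italian",
  "ingredients": ["spaghetti", "olive oil", "garlic", "fresh basil", "mozzarella cheese"]
}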

The 39774 recipes in the training set contain a total of 6716 unique ingredients. Or rather, unique in the sense that their strings are not equal: in fact, quite a few ingredients are identical, or at least very similar, but appear under different names. In the remainder of this post I’m going to describe how I use the ‘stringdist’ R package to tackle the problem of cleaning the dataset by grouping such ingredients together.

The challenge: Identical Ingredients with different names

Since the dataset covers recipes from 20 different cuisines, there is a large number of truly unique ingredients. However, the input on Yummly appears to be free text, which leads to near-duplicates (ingredients_unique, used below, is the character vector of de-duplicated ingredient strings). The various mozzarella entries that I return to at the end of this post are a good example: the same base ingredient appears under a handful of slightly different names.

Now, arguably, these are not all the same from a culinary perspective. However, they are at least closely related and belong primarily to the Italian cuisine. In a cleaned dataset we would like to group them together, so as to reduce the number of near-identical features and make the remaining ones more meaningful. At the same time, it is important to avoid false groupings, as the distortion they introduce is likely more harmful than the benefit of a correct grouping.

The stringdist package

stringdist is an R package for measuring the distance between strings (i.e. how much they differ). It implements several metrics for this, including the well-known Levenshtein distance and the cosine similarity, and even the phonetic Soundex algorithm, which however turned out to be inadequate for the task at hand. While the stringdist function measures the distance between two strings, stringdistmatrix takes two character vectors and produces a matrix by comparing each string of vector A with each string of vector B.

Applying stringdist to the ingredients

The idea is to use stringdist to find ingredients that are so close together by the chosen metric that it is safe to treat them as identical without manual checking. Of course, for this number of ingredients a manual approach would still be feasible, but for similar use cases of much higher dimension it probably is not. To start with, we compute the pairwise Levenshtein distances and inspect those pairs with the lowest possible non-zero distance of 1 (a distance of 0 means the strings are identical).
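The post computes this with stringdist’s stringdistmatrix in R; purely to make the step concrete, here is a rough Python stand-in with a hand-rolled Levenshtein distance. ingredients_unique is shortened to a few illustrative strings taken from the examples below, and the quadratic loop is slow but workable for a few thousand entries.

from itertools import combinations

# tiny illustrative stand-in for the ~6716 unique ingredient strings
ingredients_unique = ["lemongrass", "lemon grass", "thyme sprig", "thyme sprigs", "salt", "malt"]

def levenshtein(a, b):
    # classic dynamic-programming edit distance
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # deletion
                            curr[j - 1] + 1,      # insertion
                            prev[j - 1] + cost))  # substitution
        prev = curr
    return prev[-1]

# all pairs of ingredient names that differ by exactly one edit
close_pairs = [(a, b) for a, b in combinations(ingredients_unique, 2)
               if levenshtein(a, b) == 1]
print(close_pairs)

Run over the full vector of unique ingredients, this is the kind of list discussed next.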

The resulting list contains many true positives, e.g.:

“lemongrass” “lemon grass”
“sambal olek” “sambal ulek”
“thyme sprig” “thyme sprigs”
“crema mexican” “crema mexicana”
“self raising flour” “self rising flour”

But, unfortunately, also some very clear false positives:

“ice” “rice”
“jam” “ham”
“malt” “salt”
“peas” “pears”
“beer” “beef”

Clearly, the list as it is can’t be used to perform the grouping. Let’s do the same with a continuous measure, the cosine distance, and look at those pairs with a value greater than 0 and smaller than or equal to 0.03.

This finds other true positives, such as:

“whole wheat spaghettini” “whole wheat thin spaghetti”
“skinless and boneless chicken breast fillet” “skinless chicken breast fillets”
“garlic chili sauce” “Thai chili garlic sauce”

But also other false positives (omitted here). One characteristic is that it finds duplicates with reordered or additional adjectives or words, which often describe the same base ingredient as above, but not always (I wouldn’t want to use “unsalted peanut butter” instead of “unsalted butter”). Again, the list can’t be used unfiltered. Now, both lists contained true and false positives. Is there a way to get just the true positives, or at least a subset of them? Fortunately, there is, although it comes at the cost of also losing some true positives: combining different measures. What if, for example, we took all ingredient pairs with a Levenshtein distance of 1 and a cosine distance smaller than or equal to 0.03, but larger than 0?
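Again only as a rough Python stand-in for the R code: stringdist’s cosine distance operates on character q-gram counts, which the sketch below approximates with bigrams, reusing levenshtein() and ingredients_unique from the earlier snippet. The exact pairs that survive depend on the q-gram length and the thresholds, so the counts will differ from the ones reported in this post.

from collections import Counter
from itertools import combinations
from math import sqrt

def qgram_profile(s, q=2):
    # counts of overlapping character q-grams of the string
    return Counter(s[i:i + q] for i in range(max(len(s) - q + 1, 1)))

def cosine_distance(a, b, q=2):
    # 1 minus the cosine similarity of the two q-gram count vectors
    pa, pb = qgram_profile(a, q), qgram_profile(b, q)
    dot = sum(pa[g] * pb[g] for g in pa)
    norm = sqrt(sum(v * v for v in pa.values())) * sqrt(sum(v * v for v in pb.values()))
    return 1.0 - dot / norm if norm else 1.0

# keep a pair only if BOTH criteria agree: an edit distance of exactly 1
# and a small but non-zero cosine distance
combined = [(a, b) for a, b in combinations(ingredients_unique, 2)
            if levenshtein(a, b) == 1 and 0 < cosine_distance(a, b) <= 0.03]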

This results in a list of 41 distinct pairs, which (I’m not going to paste them here, I hope you believe me) are indeed all true duplicates. Compared with the 198 cosine and 161 Levenshtein matches this is not much, but following the hypothesis that false negatives hurt less than false positives, the combination of the metrics is an improvement. I’m still in the process of tweaking the thresholds, and there is likely a better combination. For example, accepting a combined score of up to 1.1 yields 91 pairs, almost all of which are, by visual inspection, true positives.

What about the mozzarellas?

I’ve chosen the different variants of mozzarella as an example of duplicates. Unfortunately, none of these can be found using the combined filter described above, because their edit distances are too large: the average Levenshtein distance between them is about 13.

If anything, this is a slight underestimate, because the zeros on the diagonal of the distance matrix pull the average down, but 13 is already far too high for a filter criterion. For brevity I won’t include the averages for the other distance metrics here, but they are not suitable either. A more promising approach would be to match the ingredients against a second, duplicate-free ingredient database, but the competition’s rules forbid the use of external databases.

To summarize, the stringdist package (or, more generally, the underlying metrics) is a good starting point for problems like this one. The weighting of false positives against false negatives depends on the particular task: while in this case avoiding false positives appears more important than catching all true positives, it can be different in other cases.

QuandlSync: Keep up-to-date copies of Quandl datasets

Quandl is a great search engine for (mostly) financial, economic and social datasets. There are millions of regularly updated datasets from central banks, international organisations, federal statistics bureaus etc. available for free in different formats, alongside premium datasets that require a subscription. Quandl provides an easy-to-use API and packages for Python, R and others. While it is easy to download single datasets using the API (e.g. Quandl.get("WIKI/GOOGL", type="xts") in R will give you an EOD history of Google’s stock price), downloading a copy of a whole collection is not as easy. A collection is a database of datasets, e.g. the EOD data of all major US stocks. Downloading and regularly updating one or more full collections is useful for a variety of analysis tasks that require up-to-date data.
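For reference, a roughly equivalent call with the Python client might look as follows; this is a sketch rather than exact documentation, since the module has historically been named Quandl (lowercase quandl in newer releases) and YOUR_API_KEY is a placeholder:

import Quandl  # newer releases of the client ship as the lowercase module 'quandl'

# fetch Google's end-of-day price history as a pandas DataFrame
googl = Quandl.get("WIKI/GOOGL", authtoken="YOUR_API_KEY")
print(googl.tail())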

To make this task easy I have written a small Python tool called QuandlSync that does exactly this. The code plus some documentation is available on GitHub.

Processing one input file per map process with Hadoop Streaming

This is just a very short post about Hadoop Streaming which I hope will be helpful for people who stumble upon the same problem. For those who don’t know Hadoop Streaming: it enables you to create MapReduce jobs with arbitrary executables or scripts as the mapper and/or reducer, so you don’t have to write a Java wrapper application to run them on your Hadoop cluster. This allows you, for example, to run a file compression utility over a large set of files in parallel with a single command line call.

Problem statement

Suppose you would like to process a set of files using Hadoop Streaming with one map task per input file. The official FAQ recommends the following:
Generate a file containing the full HDFS path of the input files. Each map task would get one file name as input.1

I’ve followed this advice, but for some reason I couldn’t figure out, Hadoop created tasks in a different fashion than I had expected.2 For example, in one use case I had only two files listed in my input file on a 10-node test cluster. Depending on the value I set for mapreduce.map.tasks while debugging, it either (1) processed just one file at a time, but with two mappers each, or (2) processed both at the same time, again with two mappers each. For the mapper I’m using this is a no-go, because the output file for a given input file is then written to by two mapper processes, causing inconsistencies in the output.

Problem solution

The solution was quite straightforward. Instead of using the default TextInputFormat, I passed org.apache.hadoop.mapred.lib.NLineInputFormat as the parameter for the -inputformat switch. My mapper script now receives not just the file name as input, but a key/value pair separated by a tab: the key is the starting position of the line within the input file and the value is the line’s contents, i.e. the file path. My mapper is written in Python, so after splitting the input on the tab character I have the input file name for my map task and can do the actual processing.
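A minimal sketch of that input handling; the echo at the end stands in for the actual per-file work:

import sys

for line in sys.stdin:
    # NLineInputFormat hands the mapper "<start position>\t<line contents>";
    # the value part is the file path listed in the input file
    _offset, _, input_path = line.rstrip("\n").partition("\t")
    if not input_path:
        continue
    # the actual per-file processing would go here; as a stand-in, echo the path
    print(input_path)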


  1. http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html#How_do_I_process_files_one_per_map
  2. Admittedly, one thing I haven’t checked is whether this happens because I’m not using hdfs:// style URLs but the corresponding NFS file paths instead, since I access HDFS via the HDFS NFS Gateway. But even if the behaviour were different for hdfs:// URLs, the solution should still be useful for other NFS Gateway users.

Finding “Mutual Friends” with MapReduce / Hadoop

A feature every Facebook user knows is the “Mutual Friends” list you see when visiting somebody’s profile. As the name says, it shows the friends that you have in common with that person. Facebook’s implementation is, of course, not public. Broadly speaking, there are two options: they could either generate the list in real time as you open a profile page and cache the result for later retrieval, or they could precompute it batch-style and read the results whenever necessary. I guess it is safe to say that Facebook is not using the very basic MapReduce algorithm I’m going to show in the remainder of this post, but it is interesting nonetheless as an illustration of how this problem can be parallelized.

How many different pairs of users exist for a user base of size n? The question boils down to the handshake problem, i.e. “In a room of n people, how many different handshakes are possible?”. The answer is the binomial coefficient n choose 2 = n(n-1)/2. For a user base of 100 people these are only a modest 4950 pairs, but for a user base as large as Facebook’s, with 1.35 billion monthly active users, it is an impressive ~9.11×10^17 pairs. Assuming that every user can be identified by a 4-byte integer and that there is on average only one common friend per pair, each pair takes only a modest 12 bytes to store, but summed up we are in the range of exabytes. Of course, for a substantial share of user combinations there will be no common friends at all (despite Facebook’s global nature, many users only have friends within their home country, and hence no mutual friends with users of any other country), but the resulting table will still look huge compared to your hard drive’s capacity.
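As a quick sanity check of these numbers, a throwaway Python calculation (nothing more than the arithmetic spelled out):

from math import comb

n = 1_350_000_000                 # monthly active users
pairs = comb(n, 2)                # the handshake count: n choose 2
storage = pairs * 12              # 8 bytes for the two 4-byte user IDs + 4 bytes for one common friend
print(f"{pairs:.2e} pairs, roughly {storage / 1e18:.0f} exabytes")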

The algorithm 1

The algorithm is implemented using one mapper and one reducer.


Suppose for each person the friends are stored as: Person -> [Friends]. For each friend in the list, the mapper emits as the key the pair consisting of the person and that friend, and as the value the person’s friend list again. Or in pseudocode: for(each friend in the friends list) { Emit(<friend pair>, <friends>); }
The key point here is that the pair is emitted in a canonical (sorted) order, so that the two pairs built from either person’s friend list (e.g. (Amy, Charlie) and (Charlie, Amy) below) become the same key and are therefore processed by the same reducer. Take as an example the following person and her friends:
Amy -> [Betty, Charlie, Daniel, Eric]
The map phase now outputs the following pairs:
[Amy, Betty] -> [Betty, Charlie, Daniel, Eric]
[Amy, Charlie] -> [Betty, Charlie, Daniel, Eric]
[Amy, Daniel] -> [Betty, Charlie, Daniel, Eric]
[Amy, Eric] -> [Betty, Charlie, Daniel, Eric]


The reducer takes as input the key/value pairs of the map phase. For each pair of users (the key), it calculates the intersection of the friend lists emitted for that key. Again in pseudocode: for(each pair of users from the map phase) { Emit(<pair>, <intersection of friends lists>) }

Suppose Charlie’s friends list is as follows:
Charlie -> [Amy, Betty, Daniel, Jesse, Mike]
Then the second key/value pair emitted for Amy’s and Charlie’s common friends, this time while processing Charlie’s list and under the same sorted key, is:
[Amy, Charlie] -> [Amy, Betty, Daniel, Jesse, Mike]

The intersection of these two lists is:
[Betty, Charlie, Daniel, Eric] ∩ [Amy, Betty, Daniel, Jesse, Mike] = [Betty, Daniel]

In other words, Amy and Charlie have two mutual friends, Betty and Daniel.
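Before turning to the Java implementation, here is a compact Python simulation of the two phases using the example above. This is an illustration only, not the code from the repository; the shuffle step is simply a dictionary grouping values by key, and only the two friend lists needed for the Amy/Charlie pair are filled in.

from collections import defaultdict

friends = {
    "Amy":     ["Betty", "Charlie", "Daniel", "Eric"],
    "Charlie": ["Amy", "Betty", "Daniel", "Jesse", "Mike"],
}

# map phase: emit (sorted pair) -> friend list
map_output = []
for person, flist in friends.items():
    for friend in flist:
        pair = tuple(sorted((person, friend)))   # canonical key, so both sides meet
        map_output.append((pair, set(flist)))

# shuffle: group the emitted values by key
grouped = defaultdict(list)
for pair, flist in map_output:
    grouped[pair].append(flist)

# reduce phase: intersect the lists that met under the same key
for pair, lists in grouped.items():
    if len(lists) == 2:                          # both persons' lists arrived
        print(pair, sorted(lists[0] & lists[1])) # ('Amy', 'Charlie') ['Betty', 'Daniel']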

Java Hadoop implementation

For simplicity we assume that our friends lists are stored in a plain text file with one line per user and everything separated by spaces, e.g.
Amy Betty Charlie Daniel Eric, where Amy is the user in question and the remaining entries are her friends. Of course, in a real-life application you would use a database as input and work not with the users’ names but with their unique IDs, but for the purpose of illustrating the algorithm both are equivalent. The complete Java source code is available in the GitHub repository linked below.

Downloading and Running

I have made the source code, along with a Maven build file, available on GitHub: https://github.com/mgoettsche/MutualFriendsHadoopMapReduce. Of course, to execute it you need a running Hadoop installation. I’m not going to cover the Hadoop setup here, but the official single node setup guide should get you started fairly quickly, depending on your previous exposure to Linux system administration.
Here is an example run of the application:

hadoop@HadoopVM:~/mutualfriends$ hadoop fs -cat mutualfriends/input/input.txt
hadoop@HadoopVM:~/mutualfriends$ hadoop jar MutualFriendsMapReduce/MutualFriendsMapReduce-1.0-SNAPSHOT.jar MutualFriends mutualfriends/input mutualfriends/output              
15/01/09 10:13:14 INFO client.RMProxy: Connecting to ResourceManager at /
15/01/09 10:13:14 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/01/09 10:13:14 INFO input.FileInputFormat: Total input paths to process : 1
15/01/09 10:13:14 INFO mapreduce.JobSubmitter: number of splits:1
15/01/09 10:13:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1420483488341_0002
15/01/09 10:13:15 INFO impl.YarnClientImpl: Submitted application application_1420483488341_0002
15/01/09 10:13:15 INFO mapreduce.Job: The url to track the job: http://HadoopVM:8088/proxy/application_1420483488341_0002/
15/01/09 10:13:15 INFO mapreduce.Job: Running job: job_1420483488341_0002
15/01/09 10:13:21 INFO mapreduce.Job: Job job_1420483488341_0002 running in uber mode : false
15/01/09 10:13:21 INFO mapreduce.Job:  map 0% reduce 0%
15/01/09 10:13:26 INFO mapreduce.Job:  map 100% reduce 0%
15/01/09 10:13:32 INFO mapreduce.Job:  map 100% reduce 100%
15/01/09 10:13:32 INFO mapreduce.Job: Job job_1420483488341_0002 completed successfully

hadoop@HadoopVM:~/mutualfriends$ hadoop fs -cat mutualfriends/output/part-r-00000
AB      CD
AC      BD
AD      BC
BC      ADE
BD      ACE
BE      CD
CD      ABE
CE      BD
DE      BC

The last command shows the resulting output for the given test data, in which users are abbreviated to single letters. The first column is the user pair and the second column the mutual friends; e.g. users C and D have A, B and E as mutual friends.


  1. I’m borrowing from Steve Krenzel’s article in this section

Accessing Twitter using Spark Streaming: A minimal example application

In recent years, Twitter data has been used in various studies. An often-cited example is the prediction of stock market changes based on the Twitter mood.1 Bollen et al. employed mood-tracking tools to measure the public mood along 6 dimensions and found that, using this data, they were able to predict the daily up and down changes of the Dow Jones with an accuracy of 87.6%.2

Another interesting application of live Twitter data for predictive purposes is earthquake warning. Researchers at the University of Tokyo3 developed a classifier that scans tweets for earthquake-related posts based on keywords and other features, and were able to detect earthquakes in Japan with a probability of 96%. Based on this system they run a notification service and claim to be, on average, faster than the Japan Meteorological Agency.

Long story short, examples like these show that interesting knowledge can be extracted from a large pool of seemingly banal short messages. In the remainder of this post I’m going to demonstrate, with a minimal runnable example, how live Twitter data can be accessed using Apache Spark, an open-source data processing engine that is currently gaining ever more popularity.

Apache Spark comes with a component called Spark Streaming for, guess what, performing streaming analytics. To make this work, it feeds the data from the stream source to the processing nodes in small batches. A nice property is that you can use largely the same operations in batch and streaming mode, which makes it easy to switch between the two.
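To give a feel for the micro-batch programming model, here is the classic word-count sketch in Python against a plain text socket. It is deliberately not the Twitter application of this post, just a minimal Spark Streaming example (run nc -lk 9999 in another terminal to feed it text):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "MicroBatchSketch")
ssc = StreamingContext(sc, 1)                     # one-second batches
lines = ssc.socketTextStream("localhost", 9999)   # any stream source is handled the same way
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                   # print a sample of each batch's result
ssc.start()
ssc.awaitTermination()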


First, you will need to have Spark installed. The application works in standalone mode, so no server configuration is required. Second, to connect to Twitter you need to create an application on the developer portal at apps.twitter.com (if this step is unclear to you, please refer e.g. to this guide). Third and last, you can download the complete source code from github.com/mgoettsche/SparkTwitterHelloWorldExample

The Application

The repository contains the code for a minimal runnable Spark application using the Twitter Streaming API. All it does is print the first ten tweets (or fewer, if Twitter delivered fewer) of each one-second interval in an infinite loop. For the application to work, copy the relevant parameters of your Twitter application into the configuration lines near the top of the source file (lines 14-17).

As you can see from the names of the access properties, Spark uses the Twitter4J library, and the stream provides objects of the twitter4j.Status class, which, besides getText(), offers methods for accessing data other than just the status message.

For example, you can also access the location from which the user posted the status using getGeoLocation(), which returns a GeoLocation object. Posting with the current location is an opt-in feature, so not all tweets have a location attached. Luckily, Spark makes it easy to filter streams.4 Replacing lines 29-33 with a filter that keeps only statuses whose getGeoLocation() is not null will retain just the tweets with the user’s location attached and output their location and text to the standard output.

Building and Running

I have included a Maven pom.xml for building the application. There is nothing special about it, except perhaps that it uses the maven-assembly-plugin to build a JAR that includes the Spark Streaming Twitter library, to avoid ClassNotFoundExceptions when deploying the app. To build, run the usual Maven package goal, and to execute, submit the assembled JAR to Spark (e.g. with spark-submit); the application then prints a sample of up to ten tweets for every one-second batch to the console.

Wrapping up

The application I presented here does not actually do anything with the received tweets yet; it just demonstrates how to establish the Spark/Twitter connection. As a first further step, one could experiment with processing the tweets, e.g. by extracting and counting hashtags or creating usage statistics based on the geolocation. Be aware, though, that via the public streaming API you only receive a small sample of all tweets. If I find the time, I will write a post with another sample application that does some processing of the statuses.


  1. Bollen J, Mao H, Zeng XJ (2011), Twitter mood predicts the stock market. Journal of Computational Science 2: 1–8. PDF
  2. This may at first sound like the holy grail of becoming rich using data analytics. However, the study was performed on an ex-post basis and thus can’t serve well as a trading guide.
  3. Sakaki T, Okazaki M, Matsuo Y (2010), Earthquake shakes Twitter users: real-time event detection by social sensors. WWW ’10 Proceedings of the 19th international conference on World wide web, pages 851-860. PDF
  4. This is a local filter, i.e. it filters the tweets after receiving them. Twitter also offers to pass filter parameters directly to the API call for server-side filtering.