Let’s analyze some real data using MapReduce.
Common Crawl is a
web crawl of the entire web by a non-profit organization (though they seem to
have some sponsors to pay for resources and they’re even hiring employees).
Their datasets are provided in a public S3 bucket, free for anyone to
download. We will analyze the data using Hadoop (in my case on Amazon’s
EMR). At first I tried to use Disco, but it required a lot of effort and at some point I got stuck
with a problem too hard to justify investing more time.
Whether you use your own Hadoop cluster or EMR, you can use the nice library mrjob.
It can run locally in an inline mode (only one thread),
which makes debugging really
easy; locally in a multi-process mode; on a normal Hadoop cluster; and on EMR -
all with the same code and just a few different configuration options. This simplified
things for me a lot compared to Disco.
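For example, assuming a job file called linkgraph_job.py (the file name is just a placeholder), the very same script can be launched in all four modes by switching mrjob’s -r option:

```
python linkgraph_job.py -r inline input.wat   # single thread, easiest to debug
python linkgraph_job.py -r local input.wat    # several local subprocesses
python linkgraph_job.py -r hadoop input.wat   # an existing Hadoop cluster
python linkgraph_job.py -r emr input.wat      # Amazon Elastic MapReduce
```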
Common Crawl provides three types of datasets in the newer crawls:
- WARC files containing the request information, the response header and the response content
- WAT files containing meta information about the crawled pages
- WET files containing the textual content of the pages
To gain some experience and to do something fancy, I now want to try to
calculate the PageRank from the Common Crawl dataset, or at least a subset of
it, with the option to scale up easily - given enough money. For
this use case the WAT files are the right choice: for HTML pages they include
information about the outgoing links. This means we do not have to parse
the whole HTML source from the WARC or WET files.
To start with the data, we should first have a look at it:
This is an excerpt of one entry in a WAT file. The JSON line as well
as the URI have been truncated by me. We see that each entry carries a lot of meta
information, most of which is not important for us. Moreover, from
the WAT definition we also know that the interesting information is stored as JSON. So
basically we can ignore the HTTP-like header and just try to parse all lines
beginning with { as JSON.
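As a minimal sketch of that idea (the file name is just a placeholder), this is what it looks like in plain Python:

```python
import json

# Minimal sketch: skip the HTTP-like header lines and parse only the
# lines that start with '{' as JSON.
with open('example.warc.wat') as wat_file:
    for line in wat_file:
        if line.startswith('{'):
            metadata = json.loads(line)
            # ... work with the metadata dictionary here
```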
Creating a link graph from Common Crawl
To simplify things even further, we can just rely on the warc library.
In case you want to do the parsing on your own, I’d recommend the JSON
library ujson, which is much faster than the standard Python
implementation.
There is also some error handling going on, because we do not want the
process to be stopped just because one line of input data is invalid or
cannot be read as UTF-8.
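As an illustration of that error handling, here is a sketch only - it assumes the warc library exposes the record body as record.payload, and the error cases in my real job differ slightly:

```python
import warc
import ujson


def iter_wat_metadata(path):
    """Yield the parsed JSON metadata of every readable record in a WAT
    file, silently skipping records that are invalid JSON or not UTF-8."""
    wat_file = warc.open(path)
    try:
        for record in wat_file:
            try:
                payload = record.payload.read()
                if not payload.lstrip().startswith(b'{'):
                    continue  # e.g. the warcinfo record at the top
                yield ujson.loads(payload.decode('utf-8'))
            except (ValueError, UnicodeDecodeError):
                # one broken record should not stop the whole job
                continue
    finally:
        wat_file.close()
```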
So, let’s have a look at the main job class. It might not be the cleanest code,
because it was just a quick rewrite of the already finished Disco jobs. There
might also be far too many imports. I had to learn the hard way that you can
only have one MRJob per file with the mrjob library, because the non-inline
variants send the class’s file to the cluster as the job and execute that file there.
If you put your whole startup code inside the file that gets sent to the cluster,
it will just execute recursively (which is caught and gives a nice
error message).
In the mapper we just parse the structure as we saw it earlier. We parse
the URLs and only use the domain portions. Then we emit the domain with all
the domains it points to.
In the reducer we take in all results from the mappers and just join all
outgoing links from the same domain into one list.
There’s also a second step of mapper and reducer in this class, which makes
sure that we have one line per domain - even if it does not have any outgoing
links. This is important for the later steps of PageRank calculation and I call
this step link graph normalization.
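A rough sketch of such a two-step job with mrjob could look like the following. It reads the WAT JSON lines directly as job input instead of streaming the files from S3 the way my actual job does, and the WAT key names (Envelope, Payload-Metadata, HTML-Metadata, Links) are assumptions you should check against your data:

```python
from mrjob.job import MRJob
from mrjob.step import MRStep
from urllib.parse import urlparse
import ujson


class LinkGraphJob(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_extract_links,
                   reducer=self.reducer_join_links),
            MRStep(mapper=self.mapper_normalize,
                   reducer=self.reducer_join_links),
        ]

    def mapper_extract_links(self, _, line):
        # only the JSON lines of the WAT records are interesting
        if not line.startswith('{'):
            return
        try:
            envelope = ujson.loads(line)['Envelope']
            source = urlparse(
                envelope['WARC-Header-Metadata']['WARC-Target-URI']).netloc
            links = (envelope['Payload-Metadata']['HTTP-Response-Metadata']
                     ['HTML-Metadata']['Links'])
        except (ValueError, KeyError):
            # skip records that are not HTML responses or not valid JSON
            return
        targets = {urlparse(link['url']).netloc
                   for link in links if 'url' in link}
        targets.discard('')
        yield source, sorted(targets)

    def reducer_join_links(self, domain, link_lists):
        # join all outgoing links of the same domain into one list
        outgoing = set()
        for links in link_lists:
            outgoing.update(links)
        yield domain, sorted(outgoing)

    def mapper_normalize(self, domain, outgoing):
        # make sure every domain gets its own line, even if it only
        # appears as a link target and has no outgoing links itself
        yield domain, outgoing
        for target in outgoing:
            yield target, []


if __name__ == '__main__':
    LinkGraphJob.run()
```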
You might also notice the NamedTemporaryFile creation. This is needed
because otherwise the AWS API re-reads the file from S3 several times.
We avoid that with
some intermediate code that downloads the file once over the network and
temporarily stores it on the local disk - that’s what the NamedTemporaryFile
is there for.
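A sketch of such a helper could look roughly like this; the bucket name and the anonymous boto connection are assumptions about how the Common Crawl data was hosted, and the actual code in the job differs in the details:

```python
import warc
import boto
from tempfile import NamedTemporaryFile


def open_wat_from_s3(key_path):
    """Download one WAT file from S3 into a temporary file and open it,
    so the data only travels over the network once."""
    connection = boto.connect_s3(anon=True)
    bucket = connection.get_bucket('aws-publicdatasets', validate=False)
    key = bucket.get_key(key_path)

    temp_file = NamedTemporaryFile(suffix='.warc.wat.gz')
    key.get_contents_to_file(temp_file)
    temp_file.flush()

    # keep a reference to temp_file as long as the WARC file is in use,
    # otherwise the temporary file is deleted too early
    return warc.open(temp_file.name), temp_file
```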
Calculating the PageRank from the link graph
Next, we have to take the output of this first step (namely the link graph)
and calculate the PageRank for each website from this. The basic implementation
of PageRank is quite easy. We have a set of websites, and each website has an
initial PageRank. To calculate the real PageRank, an iterative approach is
used. Take each website \(w\) and its set of outgoing links \(l_w\). Website \(w\) then
passes on the value \(\frac{PR_w}{|l_w|}\) to each of the websites it links to.
Each website then sums up all incoming scores to obtain its new PageRank value.
This step is repeated several times and converges (in an ideal
situation) to the real PageRank.
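To make the update rule concrete, here is a tiny pure-Python sketch of one such iteration, without any of the corrections discussed below:

```python
def pagerank_step(link_graph, pagerank):
    """One basic PageRank iteration: every domain distributes its current
    score evenly over its outgoing links. Dangling nodes and the random
    surfer are not handled here yet."""
    new_pagerank = {domain: 0.0 for domain in pagerank}
    for domain, outgoing in link_graph.items():
        if not outgoing:
            continue  # dangling node, handled separately later
        share = pagerank[domain] / len(outgoing)
        for target in outgoing:
            new_pagerank[target] = new_pagerank.get(target, 0.0) + share
    return new_pagerank
```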
The problem is this ideal situation: it rarely holds. Dangling nodes (pages
without outgoing links) break the PageRank calculation, because they drain
PageRank out of the network. We also have a problem with cycles during the
calculation. Thus, two additional matrices have to be introduced:
- a matrix for dangling nodes, which makes each dangling node pass its own PageRank on to all websites in the graph
- a uniform matrix simulating a random surfer who gets bored of following links and just types a random website into the URL bar
I got stuck on these two matrices at first, because the naïve approach in
MapReduce would force you to emit a lot of intermediate values: the normal
link graph is very sparse, but the new matrices are not.
Michael Nielsen has a nice tutorial explaining these
problems and some neat solutions.
It is possible to incorporate both
matrices into MapReduce by introducing another MapReduce job that is executed
right before the main MapReduce job.
So, let’s first implement the job for the dangling node solution. Michael
Nielsen says that we can just calculate the inner product of the current PageRank
vector and the dangling node vector (i.e. a vector whose entries are 1 for
dangling nodes and 0 otherwise).
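A sketch of such a job could look like the following. It assumes that each input line has mrjob’s default output format, i.e. a JSON key (the domain) and a JSON value [pagerank, outgoing_links] separated by a tab - the format in my actual job may differ:

```python
from mrjob.job import MRJob
import ujson


class DanglingMassJob(MRJob):
    """Sketch: compute the inner product of the PageRank vector with the
    dangling-node indicator vector, i.e. the summed PageRank of all
    domains without outgoing links."""

    def mapper(self, _, line):
        key, value = line.split('\t', 1)
        pagerank, outgoing = ujson.loads(value)
        if not outgoing:
            yield 'dangling_mass', pagerank

    def combiner(self, key, masses):
        yield key, sum(masses)

    def reducer(self, key, masses):
        yield key, sum(masses)


if __name__ == '__main__':
    DanglingMassJob.run()
```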
Above, you also saw that we have another parameter size_of_web. We calculate
this, too, with a MapReduce job.
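Such a counting job only needs a few lines; a sketch, assuming one input line per domain in the normalized link graph:

```python
from mrjob.job import MRJob


class SizeOfWebJob(MRJob):
    """Sketch: count the domains in the normalized link graph, i.e. the
    number of lines, to obtain size_of_web."""

    def mapper(self, _, line):
        yield 'size_of_web', 1

    def combiner(self, key, counts):
        yield key, sum(counts)

    def reducer(self, key, counts):
        yield key, sum(counts)


if __name__ == '__main__':
    SizeOfWebJob.run()
```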
We will also need a job that distributes the initial PageRank to each page,
because a page itself cannot know how many pages the whole web contains.
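A sketch of such a job, with size_of_web passed in as a command line option (the option name is my own choice, the input format is again assumed to be the key/value output of the link graph step, and a recent mrjob version with configure_args is assumed):

```python
from mrjob.job import MRJob
import ujson


class InitialPageRankJob(MRJob):
    """Sketch: attach the uniform initial PageRank 1/N to every domain."""

    def configure_args(self):
        super(InitialPageRankJob, self).configure_args()
        self.add_passthru_arg('--size-of-web', type=int)

    def mapper(self, _, line):
        key, value = line.split('\t', 1)
        domain = ujson.loads(key)
        outgoing = ujson.loads(value)
        initial_rank = 1.0 / self.options.size_of_web
        yield domain, [initial_rank, outgoing]


if __name__ == '__main__':
    InitialPageRankJob.run()
```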
Putting it all together
After we have created all the job classes, we can finally put them together
and run the whole task. I noticed that the PageRank calculation itself,
including the initial PageRank distribution etc., is not really slow, so the
overhead of Hadoop costs more than just doing it locally. Thus, for the
real use case I only executed the first step (link graph creation) on EMR
and ran all other jobs locally, after downloading the data from S3. However,
I only tried it on 2 percent of the full dataset. Maybe at 100 percent, it’s worth
doing the calculation on a real Hadoop cluster.
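As an illustration of that split, here is a condensed sketch of the driver logic. It reuses the hypothetical LinkGraphJob class and pagerank_step function from the sketches above, assumes a recent mrjob version (parse_output/cat_output), and leaves out the dangling-node and random-surfer corrections:

```python
def run_pagerank(wat_paths, iterations=10):
    # step 1: build the link graph on EMR (the expensive part)
    link_graph_job = LinkGraphJob(args=['-r', 'emr'] + wat_paths)
    with link_graph_job.make_runner() as runner:
        runner.run()
        link_graph = dict(
            link_graph_job.parse_output(runner.cat_output()))

    # the remaining steps are cheap enough to run locally
    size_of_web = len(link_graph)
    pagerank = {domain: 1.0 / size_of_web for domain in link_graph}
    for _ in range(iterations):
        pagerank = pagerank_step(link_graph, pagerank)
    return pagerank
```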
Further reading
In case you are interested in the calculation of the PageRank and its
mathematical foundations and already have a university education in algebra,
I can recommend the book Google’s PageRank and Beyond by Langville
and Meyer.
I do not maintain a comments section. If you have any questions
or comments regarding my posts, please do not hesitate to send me
an e-mail to blog@stefan-koch.name.