Let’s analyze some real data using MapReduce. Common Crawl is a non-profit organization that crawls the entire web (they seem to have some sponsors to pay for the resources, and they are even hiring employees). Their datasets are provided for free in a public S3 bucket. We will analyze the data using Hadoop (in my case on Amazon’s EMR). At first I tried to use Disco, but it required a lot of effort, and at some point I got stuck with a problem too hard to justify investing more time.

If you use a custom Hadoop cluster or EMR you can use the nice library mrjob, which can run locally in an inline mode (single-threaded, which makes debugging really easy), locally in a multi-process mode, on a normal Hadoop cluster, and on EMR - all with the same code and only a few configuration differences. This simplified things a lot for me compared to Disco.
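
To illustrate, here is a minimal sketch of a hypothetical word-count job (the file name wordcount.py and the class name are placeholders, not part of the project described below); only the -r flag changes between environments:

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        # emit one count per word in the input line
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount().run()

# python wordcount.py -r inline input.txt   # single thread, easiest to debug
# python wordcount.py -r local input.txt    # parallel processes on one machine
# python wordcount.py -r hadoop input.txt   # existing Hadoop cluster
# python wordcount.py -r emr input.txt      # Amazon Elastic MapReduce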

Common Crawl provides three types of datasets in the newer crawls:

  • WARC files containing the request information, the response header and the response content
  • WAT files containing meta information from the crawled pages
  • WET files containing the textual response of the pages

To gain some experience and to do something fancy, I now want to try to calculate PageRank from the Common Crawl dataset, or at least a subset of it, with the possibility to easily scale up - given enough money. For this use case the WAT files are the right choice: for HTML pages they include information about the outgoing links. This means we do not have to parse the whole HTML source from the WARC or WET files.

To start with the data, we should first have a look at it:

WARC/1.0
WARC-Type: metadata
WARC-Target-URI: http://0pointer.de/photos/ ...
WARC-Date: 2014-07-10T02:13:47Z
WARC-Record-ID: <urn:uuid:43545e2c-ee30-484d-8dbf-2474240d9f47>
WARC-Refers-To: <urn:uuid:cdcd12ad-fa7b-43e4-b461-c21099b5f377>
Content-Type: application/json
Content-Length: 1525

{"Envelope":{"Format":"WARC","WARC-Header-Length":"422", ...

This is an excerpt of one entry in a WAT file. The JSON line as well as the URI have been truncated by me. We see that there is a lot of meta information for each entry, which is not really important for us. Moreover, from the WAT definition we also know that the actual information is stored as JSON. So basically we can ignore the HTTP-like header and just try to parse all lines beginning with { as JSON.

To simplify things even further we can just rely on the warc library. In case you want to do the parsing on your own, I’d recommend the JSON library ujson, which is much faster than the standard Python implementation.
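
For illustration, here is a minimal sketch of what the manual approach could look like (the jobs below use the warc library instead; the helper name iter_wat_records is just made up for this sketch). It treats every line that starts with { as a JSON record and ignores everything else:

import ujson as json

def iter_wat_records(lines):
    # yield the parsed JSON records of an already decompressed WAT file
    for line in lines:
        line = line.strip()
        if not line.startswith('{'):
            continue  # skip the WARC and HTTP-like header lines
        try:
            yield json.loads(line)
        except ValueError:
            pass  # ignore invalid or truncated JSON lines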

There is also some error handling going on, because we do not want the process to be stopped just because one line of input data is invalid or cannot be read as UTF-8.

So, let’s have a look at the main job class. It might not be the cleanest code, because it is a quick rewrite of the already finished Disco jobs. There are probably also far too many imports. I had to learn the hard way that you can only have one MRJob per file with the mrjob library, because the non-inline variants send the class file as the job to the server and execute that file. If you have your whole startup code inside the file that is sent to the server, it will just execute itself recursively (which is caught and gives a nice error message).

import urlparse
import ujson as json
import sys
import time
import itertools
import getopt

from shutil import copyfile

import boto
import warc

from boto.s3.key import Key
from gzip import GzipFile
from mrjob.job import MRJob
from mrjob.launch import _READ_ARGS_FROM_SYS_ARGV
from mrjob.step import MRStep
from mrjob.protocol import RawValueProtocol, JSONProtocol, TextValueProtocol

class MRCalculateLinkGraph(MRJob):
    HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
    INPUT_PROTOCOL = RawValueProtocol

    def map_linkgraph(self, _, commoncrawl_file):
        # NLineInputFormat may prefix each line with the byte offset and a tab,
        # so keep only the actual S3 key
        if '\t' in commoncrawl_file:
            commoncrawl_file = commoncrawl_file.split('\t')[1]

        try:
            conn = boto.connect_s3(anon=True)
            bucket = conn.get_bucket('aws-publicdatasets', validate=True)
            key = Key(bucket, commoncrawl_file)

            # buffer the S3 object in a local temporary file so that it is
            # downloaded from S3 only once
            import tempfile
            fp = tempfile.NamedTemporaryFile('w+b')
            content = key.read(4096)
            while content:
                fp.write(content)
                content = key.read(4096)
            fp.seek(0)

            f = warc.WARCFile(fileobj=GzipFile(fileobj=fp, mode='rb'))
            for record in f:
                if record['content-type'] != 'application/json':
                    continue

                payload = record.payload.read()

                try:
                    page_info = json.loads(payload)
                    page_url = page_info['Envelope']['WARC-Header-Metadata']['WARC-Target-URI']
                    page_domain = urlparse.urlparse(page_url).netloc
                    links = page_info['Envelope']['Payload-Metadata']['HTTP-Response-Metadata']['HTML-Metadata']['Links']
                    domains = set(filter(None, [urlparse.urlparse(url['url']).netloc for url in links]))
        
                    if len(domains) > 0:
                        yield page_domain, list(domains)
                except (KeyError, UnicodeDecodeError):
                    pass

            f.close()
        except:
            # skip input files that cannot be downloaded or read at all
            pass

    def reduce_linkgraph(self, domain, links):
        links = list(set(itertools.chain(*links)))
        yield domain, {'id': domain, 'state': 0, 'outgoing': links}

    def map_normalize(self, domain, data):
        yield data['id'], data

        for outgoing in data['outgoing']:
            yield outgoing, None

    def reduce_normalize(self, domain, data):
        self.increment_counter('linkgraph', 'size', 1)
        try:
            node = list(filter(lambda x: x is not None, data))[0]
        except IndexError:
            node = {'id': domain, 'state': 0, 'outgoing': []}
        
        yield domain, node

    def steps(self):
        return [MRStep(mapper=self.map_linkgraph, reducer=self.reduce_linkgraph),
                MRStep(mapper=self.map_normalize, reducer=self.reduce_normalize)]

if __name__ == '__main__':
    MRCalculateLinkGraph().run()

In the mapper we just parse the structure as we saw it earlier. We parse the URLs and only use the domain portion. Then we emit each domain together with all the domains it points to.

In the reducer we take in all results from the mappers and just join all outgoing links from the same domain into one list.

There’s also a second step of mapper and reducer in this class, which makes sure that we have one line per domain - even if it does not have any outgoing links. This is important for the later steps of PageRank calculation and I call this step link graph normalization.

You might also have noticed the NamedTemporaryFile. It is needed because otherwise the AWS API would re-read the file from S3 several times. We solve this by reading the file from the network once and temporarily storing it on the local disk - that is what the NamedTemporaryFile is there for.

Next, we have to take the output of this first step (namely the link graph) and calculate the PageRank for each website from it. The basic implementation of PageRank is quite easy. We have a set of websites, and each website has an initial PageRank. To calculate the real PageRank, an iterative approach is used. Take each website \(w\) and its set of outgoing links \(l_w\). Website \(w\) then passes on the value \(\frac{PR_w}{|l_w|}\) to each website it links to. Each website then sums up all incoming values to calculate its new PageRank. This step is repeated several times and converges (in an ideal situation) to the real PageRank.
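
Written as a formula, this plain update rule is (with \(l_w\) denoting the set of outgoing links of website \(w\)):

\[ PR^{(t+1)}_v = \sum_{w \rightarrow v} \frac{PR^{(t)}_w}{|l_w|} \]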

The problem is this ideal situation. There is a problem with dangling nodes, which break the PageRank calculation: they drain PageRank out of the net. We also have a problem with cycles during the calculation. Thus, two advanced approaches have to be introduced (the combined update rule is shown after this list):

  • a matrix for dangling nodes, which makes dangling nodes pass their own PageRank on to all websites in the graph
  • a uniform matrix simulating a random surfer who gets bored of following links and just types a random website into the URL bar
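
Combining both refinements with a damping factor \(d\) yields the update rule that the reducer of the PageRankJob shown later implements, where \(N\) is the number of websites in the graph and \(D\) is the total PageRank currently held by dangling nodes:

\[ PR_v = d \sum_{w \rightarrow v} \frac{PR_w}{|l_w|} + d \cdot \frac{D}{N} + \frac{1 - d}{N} \]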

I got stuck with these two matrices, because the naïve approach in MapReduce would force you to emit a lot of intermediate values. The normal link graph is very sparse, but the new matrices are not. Michael Nielsen has a nice tutorial explaining these problems and some neat solutions. It is, however, possible to incorporate both matrices into MapReduce by introducing another MapReduce job that is executed right before the main MapReduce job.

So, let’s first implement the job for the dangling node solution. Michael Nielsen shows that we can just calculate the inner product of the current PageRank vector and the dangling node vector (i.e. the vector that is 1 if a node is a dangling node and 0 otherwise).
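
In formula form, with \(PR_i\) the current PageRank of node \(i\) and \(d_i = 1\) if node \(i\) is dangling and \(0\) otherwise:

\[ D = \sum_i PR_i \cdot d_i = \sum_{i\,:\,l_i = \emptyset} PR_i \]

That is exactly what the mapper and reducer below compute: the mapper emits the state of every dangling node (and 0 for all other nodes), and the reducer sums these values up.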

from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol, TextValueProtocol

class DanglingNodeJob(MRJob):
    INPUT_PROTOCOL = JSONProtocol
    OUTPUT_PROTOCOL = TextValueProtocol

    def mapper(self, website, node):
        if len(node['outgoing']) == 0:
            yield '_', str(node['state'])
        else:
            yield '_', str(0)

    def reducer(self, _, states):
        yield '_', str(sum(list(map(float, states))))

if __name__ == '__main__':
    DanglingNodeJob().run()

Next, we implement the main PageRank class. For this I make use of the graph pattern in MapReduce: every node re-emits itself under its own key and additionally sends a message to each node it links to; the reducer then reunites each node with all messages addressed to it.

from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol

class PageRankJob(MRJob):
    INPUT_PROTOCOL = JSONProtocol

    def configure_options(self):
        super(PageRankJob, self).configure_options()
        self.add_passthrough_option('--graph-size', dest='size_of_web', type='int', default=0) 
        self.add_passthrough_option('--dangling-node-pr', dest='dangling_node_pr', type='float', default=0) 
        self.add_passthrough_option('--damping-factor', dest='damping_factor', type='float', default=0.85) 

    def mapper(self, website, node):
        yield website, ('node', node)

        if node['outgoing']:
            # every linked website receives an equal share of this node's PageRank
            msg = node['state'] / len(node['outgoing'])
            for outgoing in node['outgoing']:
                yield outgoing, ('msg', msg)

    def reducer(self, website, data):
        node = None
        msgs = []

        for msg_type, msg_val in data:
            if msg_type == 'node':
                node = msg_val
            elif msg_type == 'msg':
                msgs.append(msg_val)

        if node is not None:
            node['state'] = self.options.damping_factor * sum(msgs) \
                    + self.options.damping_factor * self.options.dangling_node_pr / self.options.size_of_web \
                    + (1 - self.options.damping_factor) / self.options.size_of_web
            yield website, node


if __name__ == '__main__':
    PageRankJob().run()

Above, you also saw another parameter, size_of_web (the number of nodes in the link graph). We have to calculate this with a MapReduce job as well.

from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol, TextValueProtocol

class CountGraphNodesJob(MRJob):
    INPUT_PROTOCOL = JSONProtocol
    OUTPUT_PROTOCOL = TextValueProtocol

    def mapper(self, website, node):
        yield '_', website

        for outgoing in node['outgoing']:
            yield '_', outgoing

    def reducer(self, _, websites):
        yield 'count', str(len(set(websites)))


if __name__ == '__main__':
    CountGraphNodesJob().run()

We will also need one job that distributes the initial PageRank to each page, because the pages themselves cannot know how many pages the whole web has:

from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol

class DistributeInitialPageRankJob(MRJob):
    INPUT_PROTOCOL = JSONProtocol

    def configure_options(self):
        super(DistributeInitialPageRankJob, self).configure_options()
        self.add_passthrough_option('--graph-size', dest='size_of_web', type='int', default=0) 

    def mapper(self, website, node):
        node['state'] = 1.0 / self.options.size_of_web
        yield website, node


if __name__ == '__main__':
    DistributeInitialPageRankJob().run()

Putting it all together

After we have created all job classes, we can finally put them together and run the whole task. I noticed that the PageRank calculation, including the initial PageRank distribution and so on, is not really slow, so the overhead of Hadoop costs more than just doing it locally. Thus, in my real use case I only executed the first step (link graph creation) on EMR and ran all other jobs locally, after downloading the data from S3. However, I only tried this on 2 percent of the full dataset. Maybe with 100 percent it is worth doing the calculation on a real Hadoop system.

from awsplayground.tools.mrjob.count_graph_nodes import CountGraphNodesJob
from awsplayground.tools.mrjob.calc_link_graph_job import MRCalculateLinkGraph
from awsplayground.tools.mrjob.dangling_node_pr import DanglingNodeJob
from awsplayground.tools.mrjob.page_rank import PageRankJob
from awsplayground.tools.mrjob.initial_pagerank import DistributeInitialPageRankJob

import time
import sys

output_dir = 's3://<something>/<something-else>'
input_file = sys.argv[1]
runner_type = 'emr'
emr_args = ['--cluster-id', 'j-<cluster-nr>', '--conf-path', 'mrjob.conf']

current_time = time.strftime("%Y%m%d-%H%M%S")
job1 = MRCalculateLinkGraph(['-r', runner_type, input_file,
    '--output', output_dir + '-linkgraph-' + current_time] + emr_args)
job1.set_up_logging()
with job1.make_runner() as runner1:
    runner1.run()
    link_graph_result = runner1.get_output_dir()

    job2 = CountGraphNodesJob(['-r', runner1.alias, link_graph_result] + emr_args)
    job2.set_up_logging()
    with job2.make_runner() as runner2: 
        runner2.run()
        secondOutput = runner2.get_output_dir()
        graph_size = int(list(runner2.stream_output())[0].strip())

    job4 = DistributeInitialPageRankJob(['-r', runner1.alias, link_graph_result, '--graph-size', str(graph_size)] + emr_args)
    job4.set_up_logging()
    with job4.make_runner() as runner4:
        runner4.run()
        current_graph = runner4.get_output_dir()

        max_iterations = 15 # I read normally you want like 30 iterations, but we only have a part of the whole data
        for i in range(max_iterations):
            job5 = DanglingNodeJob(['-r', runner1.alias, current_graph] + emr_args)
            job5.set_up_logging()
            with job5.make_runner() as runner5:
                runner5.run()
                dangling_node_pr = float(list(runner5.stream_output())[0].strip())
            
            current_time = time.strftime("%Y%m%d-%H%M%S")
            # write every iteration to an explicit output location, so that the
            # result survives the runner cleanup and can be used in the next iteration
            iteration_output = output_dir + '-' + str(i) + '-' + current_time
            args = ['-r', runner1.alias, current_graph,
                    '--graph-size', str(graph_size),
                    '--dangling-node-pr', str(dangling_node_pr),
                    '--damping-factor', str(0.85),
                    '--output', iteration_output] + emr_args

            job6 = PageRankJob(args)
            job6.set_up_logging()
            with job6.make_runner() as runner6:
                runner6.run()
                print(runner6.get_output_dir())

            # feed the updated PageRank values into the next iteration
            current_graph = iteration_output

Further reading

In case you are interested in the calculation of PageRank and its mathematical foundations, and you already have a university-level education in algebra, I can recommend the book Google’s PageRank and Beyond by Langville and Meyer.

I do not maintain a comments section. If you have any questions or comments regarding my posts, please do not hesitate to send me an e-mail to blog@stefan-koch.name.