Developing a Map Reduce Application
    • Writing a program in MapReduce follows a certain pattern.
    • You start by writing your map and reduce functions, ideally with
      unit tests to make sure they do what you expect.
    • Then you write a driver program to run a job, which can run from
      your IDE using a small subset of the data to check that it is
      working.
Conti…
   • If it fails, you can use your IDE’s debugger to find the source of the
     problem.
   • When the program runs as expected against the small dataset,
     you are ready to unleash it on a cluster.
   • Running against the full dataset is likely to expose some more
     issues, which you can fix by expanding your tests and altering your
     mapper or reducer to handle the new cases.
Conti…
   • After the program is working, you may wish to do some
     tuning:
       • First, by running through some standard checks for making
         MapReduce programs faster
       • Second, by doing task profiling.
   • Profiling distributed programs is not easy, but Hadoop has
     hooks to aid in the process.
   • Before we start writing a MapReduce program, we need to
     set up and configure the development environment.
Conti…
   • Components in Hadoop are configured using Hadoop’s own
     configuration API.
   • An instance of the Configuration class represents a collection of
     configuration properties and their values.
   • Each property is named by a String, and the type of a value may
     be one of several, including Java primitives such as boolean, int,
     long, and float; other useful types such as String, Class, and
     java.io.File; and collections of Strings.
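   • For illustration, a minimal sketch of using the Configuration API (the
     configuration-1.xml resource and the property names are hypothetical):

     import org.apache.hadoop.conf.Configuration;

     public class ConfigurationExample {
         public static void main(String[] args) {
             Configuration conf = new Configuration();
             // configuration-1.xml is a hypothetical resource containing <property> entries.
             conf.addResource("configuration-1.xml");

             // Properties are named by Strings and read back with typed getters.
             String colour = conf.get("color", "red");          // String, with a default
             int size = conf.getInt("size", 0);                 // int, with a default
             boolean debug = conf.getBoolean("debug", false);   // boolean, with a default

             System.out.printf("color=%s size=%d debug=%b%n", colour, size, debug);
         }
     }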
Unit tests with MRUnit
• MRUnit is a JUnit-based Java library that allows us to unit test Hadoop
  MapReduce programs.
• This makes it easy to develop as well as to maintain Hadoop
  MapReduce code bases.
• MRUnit supports testing Mappers and Reducers separately as well as
  testing MapReduce computations as a whole.
Conti…
   • Hadoop MapReduce jobs have a unique code architecture that
     follows a specific template with specific constructs.
   • This architecture raises interesting issues when doing test-driven
     development (TDD) and writing unit tests.
   • With MRUnit, you can craft test input, push it through your
     mapper and/or reducer, and verify its output all in a JUnit test.
   • As with other JUnit tests, this allows you to debug your code using
     the JUnit test as a driver.
Conti…
   • A map/reduce pair can be tested using MRUnit’s
     MapReduceDriver, and a combiner can be tested using
     MapReduceDriver as well.
   • A PipelineMapReduceDriver allows you to test a workflow of
     map/reduce jobs.
   • Currently, partitioners do not have a test driver under MRUnit.
   • MRUnit allows you to do TDD(Test Driven Development) and write
     lightweight unit tests which accommodate Hadoop’s specific
     architecture and constructs.
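   • For illustration, a minimal MRUnit sketch (WordCountMapper and
     WordCountReducer are hypothetical classes) that tests a mapper and a
     reducer separately:

     import java.util.Arrays;
     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mrunit.mapreduce.MapDriver;
     import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
     import org.junit.Before;
     import org.junit.Test;

     public class WordCountTest {
         private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
         private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;

         @Before
         public void setUp() {
             mapDriver = MapDriver.newMapDriver(new WordCountMapper());
             reduceDriver = ReduceDriver.newReduceDriver(new WordCountReducer());
         }

         @Test
         public void mapperEmitsOnePerWord() throws Exception {
             // Craft test input, push it through the mapper, and verify its output.
             mapDriver.withInput(new LongWritable(0), new Text("cat cat dog"))
                      .withOutput(new Text("cat"), new IntWritable(1))
                      .withOutput(new Text("cat"), new IntWritable(1))
                      .withOutput(new Text("dog"), new IntWritable(1))
                      .runTest();
         }

         @Test
         public void reducerSumsCounts() throws Exception {
             reduceDriver.withInput(new Text("cat"),
                                    Arrays.asList(new IntWritable(1), new IntWritable(1)))
                         .withOutput(new Text("cat"), new IntWritable(2))
                         .runTest();
         }
     }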
Conti…
    Example
   • We’re processing road surface data used to create maps.
   • The input contains both linear surfaces and intersections. The
     mapper takes a collection of these mixed surfaces as input,
     discards anything that isn’t a linear road surface, i.e.,
     intersections, and then processes each road surface and writes it
     out to HDFS.
   • We can keep count and eventually print out how many non-road
     surfaces are input.
   • For debugging purposes, we can additionally print out how many
     road surfaces were processed.
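   • A minimal sketch of such a mapper (the record layout and class names
     are hypothetical), using counters to keep the debug counts mentioned
     above:

     import java.io.IOException;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Mapper;

     public class RoadSurfaceMapper extends Mapper<LongWritable, Text, Text, Text> {

         enum SurfaceCounters { ROADS, NON_ROADS }

         @Override
         protected void map(LongWritable key, Text value, Context context)
                 throws IOException, InterruptedException {
             // Assume each input record starts with a surface-type field, e.g. "ROAD|...".
             String line = value.toString();
             if (line.startsWith("ROAD")) {
                 context.getCounter(SurfaceCounters.ROADS).increment(1);
                 context.write(new Text("road"), value);   // keep linear road surfaces
             } else {
                 // Intersections and other non-road surfaces are counted and discarded.
                 context.getCounter(SurfaceCounters.NON_ROADS).increment(1);
             }
         }
     }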
Anatomy of a Map Reduce job run
     • The Hadoop framework comprises two main components:
          • Hadoop Distributed File System (HDFS) for Data
            Storage
          • MapReduce for Data Processing.
    • A typical Hadoop MapReduce job is divided into a set of Map and
      Reduce tasks that execute on a Hadoop cluster.
Conti…
• The execution flow occurs as follows:
      • Input data is split into small subsets of data.
      • Map tasks work on these data splits.
      • The intermediate data produced by the Map tasks is then
        passed to the Reduce tasks after an intermediate process
        called ‘shuffle’.
      • The Reduce task(s) work on this intermediate data to
        generate the result of a MapReduce job.
Failures, Shuffle and Sort
    • Shuffle phase in Hadoop transfers the map output from Mapper
      to a Reducer in MapReduce.
    • Sort phase in MapReduce covers the merging and sorting of map
      outputs.
    • Data from the mapper are grouped by the key, split among
      reducers and sorted by the key.
    • Every reducer obtains all values associated with the same key.
    • The shuffle and sort phases in Hadoop occur simultaneously and are
      performed by the MapReduce framework.
Conti…
   • The process of transferring data from the mappers to the reducers is
     known as shuffling, i.e., the process by which the system performs
     the sort and transfers the map output to the reducer as input.
   • Sorting in Hadoop helps the reducer easily determine when a new
     reduce call (a new key group) should start, which saves time for the
     reducer. The reducer starts a new reduce call when the next key in
     the sorted input data differs from the previous one. Each reduce call
     takes key-value pairs as input and generates key-value pairs as
     output.
Job Scheduling
    • Early versions of Hadoop had a very simple approach to
      scheduling users’ jobs: they ran in order of submission, using a
      FIFO scheduler.
    • Typically, each job would use the whole cluster, so jobs had to
      wait their turn.
    • Although a shared cluster offers great potential for offering large
      resources to many users, the problem of sharing resources fairly
      between users requires a better scheduler.
    • Production jobs need to complete in a timely manner while
      allowing users who are making smaller ad hoc queries to get
      results back in a reasonable time.
     • The ability to set a job’s priority was added, via the
       mapred.job.priority property (or the equivalent setJobPriority()
       method).
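     • For illustration, a small sketch (using the classic org.apache.hadoop.mapred
       API) of setting a job’s priority programmatically:

       import org.apache.hadoop.mapred.JobConf;
       import org.apache.hadoop.mapred.JobPriority;

       public class PriorityExample {
           public static void configure(JobConf conf) {
               // Equivalent to setting mapred.job.priority=HIGH for this job.
               conf.setJobPriority(JobPriority.HIGH);
           }
       }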
Conti…
   • When the job scheduler is choosing the next job to run, it selects
     the one with the highest priority.
   • However, with the FIFO scheduler, priorities do not support
     preemption, so a high-priority job can still be blocked by a long-
     running low priority job that started before the high-priority job
     was scheduled.
   • MapReduce in Hadoop comes with a choice of schedulers.
   • The default is the original FIFO queue-based scheduler, and there
     are also multiuser schedulers called:
         • The Fair Scheduler
         • The Capacity Scheduler.
Conti…
    Fair Scheduler
   • Aims to give every user a fair share of the cluster capacity over time.
   • If a single job is running, it gets all of the cluster.
   • As more jobs are submitted, free task slots are given to the jobs in such
     a way as to give each user a fair share of the cluster.
   • A short job belonging to one user will complete in a reasonable time
     even while another user’s long job is running, and the long job will still
     make progress.
   • Jobs are placed in pools, and by default, each user gets their own pool.
   • The Fair Scheduler supports preemption, so if a pool has not received
     its fair share for a certain period of time, then the scheduler will kill
     tasks in pools running over capacity in order to give the slots to the
     pool running under capacity.
Conti…
    Capacity Scheduler
   • Takes a slightly different approach to multiuser scheduling.
   • A cluster is made up of a number of queues (like the Fair Scheduler’s
     pools), which may be hierarchical (so a queue may be the child of
     another queue), and each queue has an allocated capacity.
   • This is like the Fair Scheduler, except that within each queue, jobs
     are scheduled using FIFO scheduling (with priorities).
   • The Capacity Scheduler allows users or organizations to simulate a
     separate MapReduce cluster with FIFO scheduling for each user or
     organization.
   • The Fair Scheduler, by contrast, enforces fair sharing within each
     pool, so running jobs share the pool’s resources.
Task Execution
     • After the tasktracker has been assigned a task, the next step is
       for it to run the task.
    • First, it localizes the job JAR by copying it from the shared
      filesystem to the tasktracker’s filesystem.
    • It also copies any files needed from the distributed cache by the
      application to the local disk.
    • Second, it creates a local working directory for the task and un-jars
      the contents of the JAR into this directory.
Conti…
   • Third, it creates an instance of TaskRunner to run the task.
   • TaskRunner launches a new Java Virtual Machine to run each task
     so that any bugs in the user-defined map and reduce functions
     don’t affect the task tracker (by causing it to crash or hang, for
     example).
   • It is, however, possible to reuse the JVM between tasks.
   • The child process communicates with its parent through the
     umbilical interface.
   • This way it informs the parent of the task’s progress every few
     seconds until the task is complete.
Map Reduce Types
     • Hadoop’s MapReduce programming model processes data by
       representing the input and output of the map and reduce
       functions as key-value pairs.
     • These are subject to parallel execution over datasets spread
       across a wide array of machines in a distributed architecture.
     • The programming paradigm is essentially functional in nature,
       combining the techniques of map and reduce.
Conti…
   • Mapping is the core technique of processing a list of data
     elements that come in pairs of keys and values.
   • The map function applies to individual elements defined as key-
     value pairs of a list and produces a new list.
   • The general forms of Hadoop’s map and reduce functions can be
     illustrated as follows:
                  map: (K1, V1) → list(K2, V2)
                  reduce: (K2, list(V2)) → list(K3, V3)
Conti…
   • The input parameters of the key and value pair, represented by K1
     and V1 respectively, are different from the output pair type: K2
     and V2.
   • The reduce function accepts input in the same format as the map
     output, but the output types of the reduce operation may again be
     different: K3 and V3.
Conti…
• The Java API for this is as follows:
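   • In the classic org.apache.hadoop.mapred API, the map and reduce
     methods take roughly this abridged form (imports omitted):

     public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
         void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
             throws IOException;
     }

     public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
         void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output,
                     Reporter reporter) throws IOException;
     }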
Conti…
   • The OutputCollector is the generalized interface of the Map-
     Reduce framework to facilitate the collection of data output either
     by the Mapper or the Reducer.
   • These outputs are nothing but the intermediate output of the job.
   • Therefore, they must be parameterized with their types.
   • The Reporter allows the Map-Reduce application to report
     progress and to update counters and status information.
Conti…
   • If the combine function is used, it has the same form as the
     reduce function and the output is fed to the reduce function.
   • This may be illustrated as follows:
          map: (K1, V1) → list (K2, V2)
          combine: (K2, list(V2)) → list (K2, V2)
          reduce: (K2, list(V2)) → list (K3, V3)
   • Note that the combine and reduce functions have the same form,
     except that the combiner’s output types match its input types
     (K3 is K2 and V3 is V2).
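   • Because the forms match, the same reducer class can often be
     registered as the combiner; a minimal driver sketch (SumReducer is a
     hypothetical reducer):

     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Job;

     public class CombinerSetup {
         public static void configure(Job job) {
             job.setMapOutputKeyClass(Text.class);            // K2
             job.setMapOutputValueClass(IntWritable.class);   // V2
             // Combine has the same form as reduce, so one class can serve both roles.
             job.setCombinerClass(SumReducer.class);   // combine: (K2, list(V2)) → list(K2, V2)
             job.setReducerClass(SumReducer.class);    // reduce:  (K2, list(V2)) → list(K3, V3)
         }
     }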
Conti…
   • The partition function operates on the intermediate key-value
     types.
   • It controls the partitioning of the keys of the intermediate map
     outputs.
   • The partition is typically derived from the key using a hash function.
   • The total number of partitions is the same as the number of
     reduce tasks for the job.
   • The partition is determined by the key alone; the value is ignored.
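   • A minimal sketch of a hash-based partitioner (in the spirit of Hadoop’s
     own HashPartitioner), deriving the partition from the key alone:

     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Partitioner;

     public class KeyHashPartitioner extends Partitioner<Text, IntWritable> {
         @Override
         public int getPartition(Text key, IntWritable value, int numReduceTasks) {
             // Mask off the sign bit so the result is non-negative, then take it
             // modulo the number of reduce tasks; the value is ignored.
             return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
         }
     }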
Input Formats
    • Hadoop has to accept and process a variety of formats, from text
      files to databases.
     • A chunk of input, called an input split, is processed by a single map.
     • Each split is further divided into logical records that are given to
       the map to process as key-value pairs.
    • In the context of a database, the split means reading a range of
      tuples from an SQL table, as done by the DBInputFormat and
      producing LongWritables containing record numbers as keys and
      DBWritables as values.
Conti…
   • The Java API for input splits is as follows:
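   • In the classic org.apache.hadoop.mapred API, this is (in abridged
     form, imports omitted):

     public interface InputSplit extends Writable {
         long getLength() throws IOException;
         String[] getLocations() throws IOException;
     }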
   • The InputSplit represents the data to be processed by an individual
     Mapper.
   • It has a length in bytes and a set of storage locations for the input
     data.
Conti…
   • It is the responsibility of the InputFormat to create the input splits
     and divide them into records.
   • The JobClient invokes the getSplits() method, passing the desired
     number of splits as an argument.
   • Once the splits are calculated, they are sent to the jobtracker.
   • The jobtracker uses their storage locations to schedule map tasks
     on the tasktrackers.
Conti…
   • The tasktracker then passes the split to the getRecordReader()
     method on the InputFormat to obtain a RecordReader for the split.
   • The FileInputFormat is the base class for the file data source.
   • It has the responsibility to identify the files that are to be included
     as the job input and the definition for generating the split.
   • Hadoop also handles unstructured data, which often comes in
     textual form; TextInputFormat is the default InputFormat for such
     data.
Conti…
   • SequenceFileInputFormat reads binary input stored as sequences
     of binary key-value pairs.
   • DBInputFormat provides the capability to read data from a
     relational database using JDBC.
Output Formats
• The output format classes are similar to their corresponding input
  format classes and work in the reverse direction.
Conti…
 For example:
• TextOutputFormat is the default output format. It writes records as
  plain text files; its keys and values may be of any type, since it
  transforms them into strings by invoking the toString() method.
• Each key-value pair is separated by a tab character, although this
  can be customized via the separator property of the text output
  format.
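• A minimal driver sketch showing where the input format, output format,
  and separator are configured (MyMapper, MyReducer, and the paths are
  hypothetical):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

  public class FormatDriver {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // Change TextOutputFormat's key-value separator from a tab to a comma
          // (property name used by newer releases; older releases use
          // mapred.textoutputformat.separator).
          conf.set("mapreduce.output.textoutputformat.separator", ",");

          Job job = Job.getInstance(conf, "format example");
          job.setJarByClass(FormatDriver.class);
          job.setMapperClass(MyMapper.class);       // hypothetical mapper
          job.setReducerClass(MyReducer.class);     // hypothetical reducer
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);

          job.setInputFormatClass(TextInputFormat.class);     // the default, shown explicitly
          job.setOutputFormatClass(TextOutputFormat.class);   // the default, shown explicitly

          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }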
Conti…
   • For binary output, there is SequenceFileOutputFormat to write a
     sequence of binary output to a file. Binary outputs are particularly
     useful if the output becomes an input to a further MapReduce
     job.
   • Output to relational databases is handled by DBOutputFormat,
     which sends the reduce output to a SQL table. Similarly, HBase’s
     TableOutputFormat enables a MapReduce program to write its
     output to an HBase table.
Map Reduce Features
     Scalability
    • Apache Hadoop is a highly scalable framework. This is because of
      its ability to store and distribute huge data across plenty of
      servers.
     Flexibility
    • MapReduce programming enables companies to access new
      sources of data. It enables companies to operate on different
      types of data.
Conti…
    Security and Authentication
   • The MapReduce programming model uses the HBase and HDFS
     security platforms, which allow only authenticated users to
     operate on the data.
    Cost-effective solution
   • Hadoop’s scalable architecture with the MapReduce programming
     framework allows the storage and processing of large data sets in
     a very affordable manner.
Conti…
    Fast
   • Even if we are dealing with large volumes of unstructured data,
     Hadoop MapReduce just takes minutes to process terabytes of
     data. It can process petabytes of data in just an hour.
    A simple model of programming
   • One of the most important features is that it is based on a simple
     programming model.
Conti…
    Parallel Programming
   • It divides the tasks in a manner that allows their execution in parallel.
     Parallel processing allows multiple processors to execute these divided
     tasks.
    Availability
   • If any particular node suffers from a failure, then there are always other
     copies present on other nodes that can still be accessed whenever needed.
    Resilient nature
   • One of the major features offered by Apache Hadoop is its fault tolerance.
     The Hadoop MapReduce framework has the ability to quickly recognize
     faults that occur.
Real-world Map Reduce
     • A real-world MapReduce example on e-commerce transaction data
       is described here using Python streaming.
     • A real-world e-commerce transaction dataset from a UK-based
       retailer is used.
    • https://idevji.com/blog/2018/08/08/mapreduce-real-world-
      example/
Conti…
    Outline
   • The dataset consists of real-world e-commerce data from a UK-based
     retailer
   • The dataset is provided by Kaggle
   • Our goal is to find the country-wise total sales
   • The mapper multiplies quantity and unit price
   • The mapper emits a key-value pair of country, sales
   • The reducer sums up all pairs for the same country
   • The final output is country, sales for all countries
Conti…
    Data
   • Download: Link to Kaggle Dataset
   • Source: The dataset has real-life transaction data from a UK retailer.
   • Format: CSV
   • Size: in….. MB
   • Columns:
                  •   InvoiceNo
                  •   StockCode
                  •   Description
                  •   Quantity
                  •   InvoiceDate
                  •   UnitPrice
                  •   CustomerID
                  •   Country
Conti…
    Problem
   • In this MapReduce real-world example, we calculate the total sales
     for each country from the given dataset.
    Approach
   • Firstly, our data doesn’t have a Total column so it is to be
     computed using Quantity and UnitPrice columns as
                 Total = Quantity * UnitPrice.
Conti…
    What Mapper Does
   • Read the data
   • Convert data into proper format
   • Calculate total
   • Print output as key-value pair CountryName:Total
Conti…
    What Reducer Does
   • Read input from mapper
   • Check for an existing country key in the dictionary
   • Add total to existing total value
   • Print all key-value pairs
Conti…
   • Python Code for Mapper (MapReduce Real World Example)
Conti…
   • Python Code for Reducer (MapReduce Real World Example)
   • Output:

              Country                Score
              Canada                 3599.68
              Brazil                 1143.6
              Italy                  16506.03
              Czech Republic         707.72
              USA                    1730.92
              Lithuania              1661.06
              Unspecified            4746.65
              France                 197194.15
              Norway                 34908.13
              Bahrain                548.4
              Israel                 7867.42
              Australia              135330.19
              Singapore              9054.69
              Iceland                4299.8
              Channel Islands        19950.54
              Germany                220791.78
              Belgium                40752.83
              European Community     1291.75
              Hong Kong              10037.84
              Spain                  54632.86
              EIRE                   262112.48
              Netherlands            283440.66
              Denmark                18665.18
              Poland                 7193.34
              Finland                22226.69
              Saudi Arabia           131.17
              Sweden                 36374.15
              Malta                  2503.19
              Switzerland            56199.23
              Portugal               29272.34
              United Arab Emirates   1877.08
              Lebanon                1693.88
              RSA                    1002.31
              United Kingdom         8148025.164
              Austria                10149.28
              Greece                 4644.82
              Japan                  34616.06
              Cyprus                 12791.31
Conti…
    Conclusions
   • The mapper picks up a record and emits the country and total for that
     record
   • The mapper repeats this process for all 5.42k records
   • Now we have 5.42k key-value pairs
   • The reducer’s role is to combine these pairs until all keys are unique
THANK YOU