DC Hadoop

Big Data Processing with Hadoop

HEMANT AMBULGEKAR
Associate Professor, Department of Computer Science and Engineering,
SGGS Institute of Engineering and Technology, Nanded-431606
Maharashtra
Contents
1. Introduction
2. Hadoop Ecosystem
3. Installation on Single System
4. Installation on Cluster of Nodes
5. Storage
6. Map Reduce Programming Model
7. MapReduce Example
Data Generation
◦ Google processes 60 PB per day.
◦ Facebook hosts more than 240 billion photos, growing at seven petabytes per month.
◦ eBay has 6.5 PB of user data and adds 50 TB/day (5/2009).
◦ The New York Stock Exchange generates about four to five terabytes of data per day.
◦ Ancestry.com, the genealogy site, stores around 10 petabytes of data.
◦ The Internet Archive stores around 18.5 petabytes of data.
◦ CERN’s Large Hadron Collider (LHC), near Geneva, Switzerland, produces about 30 PB a year.
Data on this scale requires powerful mechanisms for storage and analysis.
What is Big Data ?
◦ No single standard definition…
◦ Big Data is data whose scale, diversity, and complexity
require new architecture, techniques, algorithms, and
analytics to manage it and extract value and hidden
knowledge from it…
Characteristics (Volume)

Data Volume
Data volume is increasing exponentially.
The New York Times generates 5 TB per day.
Facebook takes up to 7 PB of storage per month.
The Internet Archive holds around 4 PB and generates 40 TB per month.
Characteristics (Variety)

Complexity: various formats, types, and structures
Text, numerical, images, audio, video, sequences, time
series, social media data, multi-dimensional arrays, etc…
Static data vs. streaming data
A single application can be generating/collecting many types
of data
Characteristics (Velocity)

Data is being generated fast and needs to be processed fast

Online Data Analytics
Late decisions ➔ missing opportunities
Examples
◦ E-Promotions: based on your current location, your purchase history, and what you like ➔
send promotions right now for the store next to you
◦ Enhancing fraud detection: monitoring customer transactions on a bank account ➔ any
abnormal transaction requires an immediate reaction from banks and credit card companies.
Characteristics (4 Vs)
Processing Challenges…
The problem is simple: although the storage capacities of hard drives have increased massively over the years,
access speeds—the rate at which data can be read from drives —have not kept up.

Data Read Operation (Disk Access Speed)


One typical drive from 1990 could store 1,370 MB and had a transfer speed of 4.4 MB/s, so a full drive could be
read in around five minutes.
Over 20 years later, one-terabyte drives are the norm, but the transfer speed is around 100 MB/s, so it takes more
than two and a half hours to read all the data off the disk.

Obvious Solution
◦ The obvious way to reduce the time is to read from multiple disks at once. Imagine if we had 100 drives,
each holding one hundredth of the data. Working in parallel, we could read the data in under two minutes.
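The figures above can be checked with a few lines of arithmetic. This is only a back-of-the-envelope sketch: the class and method names are made up for illustration, and one terabyte is taken as 10^6 MB (an assumption, since the slide does not say which convention it uses).

```java
// Back-of-the-envelope check of the disk figures quoted above.
final class DiskReadTime {

    /** Seconds needed to read sizeMb megabytes at rateMbPerSec, split evenly over the given number of drives. */
    static double secondsToRead(double sizeMb, double rateMbPerSec, int drives) {
        return (sizeMb / drives) / rateMbPerSec;
    }

    public static void main(String[] args) {
        double drive1990 = secondsToRead(1_370, 4.4, 1);            // the 1990 drive
        double oneTb = secondsToRead(1_000_000, 100, 1);            // 1 TB taken as 10^6 MB (assumption)
        double oneTbParallel = secondsToRead(1_000_000, 100, 100);  // same data spread over 100 drives

        System.out.printf("1990 drive:       %.1f minutes%n", drive1990 / 60);
        System.out.printf("1 TB, one drive:  %.2f hours%n", oneTb / 3600);
        System.out.printf("1 TB, 100 drives: %.1f minutes%n", oneTbParallel / 60);
    }
}
```

Under these assumptions the three results come to roughly five minutes, a little under three hours, and under two minutes, which agrees with the numbers on the slide.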

Other advantages of using multiple disks
◦ There’s more to being able to read and write data in parallel to or from multiple disks, though.
◦ Greater availability

Problem
◦ Combining data in some way.
Data Processing Approaches
Frameworks, architectures and Platforms

Ad-hoc parallel data processing


◦ Amazon Elastic MapReduce EMR
◦ Other Cloud Based

Cluster Environment
◦ Hadoop
◦ MapReduce Framework

Grid Computing
◦ MPI, shared filesystem, SAN
Hadoop
- Hadoop fills a gap in the market by effectively storing and providing
computational capabilities over substantial amounts of data.

- It’s a distributed system made up of a distributed file system and it offers a
way to parallelize and execute programs on a cluster of machines.

- You’ve most likely come across Hadoop as it’s been adopted by technology
giants like Yahoo!, Facebook, and Twitter to address their big data needs,
and it’s making inroads across all industrial sectors.
Comparison with Other Systems
Hadoop for solving business problems
Notable Examples:

➤ Enhancing fraud detection for banks and credit card companies

➤ Social media marketing analysis

➤ Shopping pattern analysis for retail product placement

➤ Traffic pattern recognition for urban development

➤ Content optimization and engagement

➤ Network analytics and mediation

➤ Large data transformation

The list of these examples could go on and on. Businesses are using Hadoop for strategic
Hadoop Ecosystem
HDFS — A foundational component of the Hadoop ecosystem is the Hadoop
Distributed File System (HDFS), a distributed file system that runs on large clusters of
commodity machines.
MapReduce — Hadoop’s main execution framework is MapReduce, a programming
model for distributed, parallel data processing, breaking jobs into mapping phases and
reduce phases (thus the name). Developers write MapReduce jobs for Hadoop, using
data stored in HDFS for fast data access.
HBase — A column-oriented NoSQL database built on top of HDFS, HBase is used for
fast read/write access to large amounts of data.
Zookeeper — Zookeeper is Hadoop’s distributed coordination service.
Oozie — A scalable workflow system, Oozie is integrated into the Hadoop stack, and is
used to coordinate execution of multiple MapReduce jobs.
Contd..
Pig —A data flow language and execution environment for exploring very large
datasets.
An abstraction over the complexity of MapReduce programming, the Pig platform
includes an execution environment and a scripting language (Pig Latin) used to
analyze Hadoop data sets. Its compiler translates Pig Latin into sequences of
MapReduce programs.
Hive — A distributed data warehouse. Hive manages data stored in HDFS and
provides a query language based on SQL (and which is translated by the runtime
engine to MapReduce jobs) for querying the data.
An SQL-like, high-level language used to run queries on data stored in Hadoop. Like
Pig, Hive was developed as an abstraction layer, but geared more toward database
analysts more familiar with SQL than Java programming.
HDFS Architecture
Installation
Standalone (or local) mode
There are no daemons running and everything runs in a single JVM.
Standalone mode is suitable for running MapReduce programs during
development, since it is easy to test and debug them.
Pseudodistributed mode
The Hadoop daemons run on the local machine, thus simulating a
cluster on a small scale.
Fully distributed mode
The Hadoop daemons run on a cluster of machines.
Contd..
Download a stable release, which is packaged as a gzipped tar file, from the
Apache Hadoop releases page, and unpack it somewhere on your
filesystem:
$ tar xzf hadoop-x.y.z.tar.gz
Before you can run Hadoop, you need to tell it where Java is located on
your system.
$ export JAVA_HOME=/usr/lib/jvm/java-6-sun
You also need to tell it where Hadoop is located on your system, and add its bin and sbin directories to your PATH:
$ export HADOOP_INSTALL=/home/tom/hadoop-x.y.z
$ export PATH=$PATH:$HADOOP_INSTALL/bin:$HADOOP_INSTALL/sbin
Standalone (or local) mode
$ hadoop version
Hadoop 2.6.0
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1214675
Compiled by hortonfo on Thu Jan 15 16:36:35 UTC 2015
Pseudo distributed mode
Hadoop can also be run on a single-node in a pseudo-distributed mode where each
Hadoop daemon runs in a separate Java process.
Each component in Hadoop is configured using an XML file
Configuration
Open etc/hadoop/core-site.xml in an editor such as nano and add the following:
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
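Every Hadoop site file follows the same shape: a configuration element holding property elements, each with a name and a value. As a rough illustration of that structure (this is not how Hadoop itself loads its configuration), the sketch below reads the pairs with the JDK's built-in DOM parser. The class name SiteFileSketch and its method are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class SiteFileSketch {

    /** Reads every property name/value pair from a Hadoop-style configuration document. */
    static Map<String, String> properties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> result = new LinkedHashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                result.put(name, value);
            }
            return result;
        } catch (Exception e) {
            // Malformed XML or a property without name/value ends up here
            throw new IllegalArgumentException("Not a valid site file", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<configuration><property>"
                + "<name>fs.defaultFS</name><value>hdfs://localhost:9000</value>"
                + "</property></configuration>";
        System.out.println(properties(xml)); // {fs.defaultFS=hdfs://localhost:9000}
    }
}
```

A check like this can catch a mistyped tag before the daemons are started, but it says nothing about whether the property names themselves are ones Hadoop recognizes.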
Configuring SSH
First, make sure that SSH is installed and a server is running. On Ubuntu, for
example, this is achieved with:
$ sudo apt-get install ssh
Then, to enable password-less login, generate a new SSH key with an empty
passphrase:
$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Test this with:


$ ssh localhost
If successful, you should not have to type in a password.
Starting and stopping the daemons
To start the HDFS daemons, type:
$ start-dfs.sh
To start the YARN daemons, type:
$ start-yarn.sh
These commands will start the HDFS daemons, and for YARN, a resource manager
and a node manager. The resource manager web UI is at http://localhost:8088/ .
You can stop the daemons with:
$ stop-dfs.sh
$ stop-yarn.sh
Hadoop Cluster Setup
Machine selection
1. Fix the names of the machines
Hadoop-NameNode, Hadoop-DataNode01, Hadoop-DataNode02, Hadoop-DataNode03, Hadoop-DataNode04, ……….

The setup was identical on all machines.

2. Edit the hosts file on each machine
$ sudo nano /etc/hosts
◦ 192.168.1.61 Hadoop-NameNode
◦ 192.168.1.62 Hadoop-DataNode01
◦ 192.168.1.63 Hadoop-DataNode02
◦ 192.168.1.64 Hadoop-DataNode03
Contd..
3. Enable SSH
$ sudo apt-get update
$ sudo apt-get install -y openssh-server
4. Create a user hduser and grant it sudo privileges
$ sudo adduser hduser
$ sudo usermod -a -G sudo hduser
5. Log off and ssh back in as hduser. Create an SSH key pair without a passphrase on each
machine and add the public key to the authorized keys locally
$ mkdir ~/.ssh
$ ssh-keygen -b 2048 -t rsa -f ~/.ssh/id_rsa -q -N ""
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Contd..
6. Add Public Key to all Nodes
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hduser@Hadoop-DataNode01
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hduser@Hadoop-DataNode02
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hduser@Hadoop-DataNode03
7. Create a group hadoop and add hduser to it
$ sudo addgroup hadoop
$ sudo usermod -a -G hadoop hduser
8. Change the owner of the hadoop folder to hduser
$ sudo chown -R hduser:hadoop /usr/local/hadoop/
Contd..
9. Create a folder for the logs and change the owner to hduser
$ sudo mkdir /var/log/hadoop/
$ sudo chown hduser:hadoop /var/log/hadoop
10. Edit .bashrc and add the following lines at the bottom of the file
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/
export PATH=$PATH:$JAVA_HOME/bin/
export HADOOP_PREFIX=/usr/local/hadoop/
export PATH=$PATH:$HADOOP_PREFIX/bin
export PATH=$PATH:$HADOOP_PREFIX/sbin
Then reload .bashrc with
$ source ~/.bashrc
Contd..
Edit the configuration files on Hadoop-NameNode, found under
/usr/local/hadoop/etc/hadoop/
- hadoop-env.sh
- yarn-env.sh
- core-site.xml
- hdfs-site.xml
- yarn-site.xml
- mapred-site.xml (copy mapred-site.xml.template to mapred-site.xml)
- slaves
Contd..
1. hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HADOOP_LOG_DIR=/var/log/hadoop
2. yarn-env.sh
export YARN_LOG_DIR=/var/log/hadoop
3. core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop-namenode:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hduser/data/hadoop/tmp/${user.name}</value>
  </property>
</configuration>
4. hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
</configuration>
5. yarn-site.xml
<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>hadoop-namenode</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>hadoop-namenode:8030</value>
  </property>
  ……….
</configuration>
MapReduce
MapReduce is a programming model for data processing.
Hadoop can run MapReduce programs written in various languages, such as Java, Ruby,
Python, and C++; the examples here are written in Java.
Most important, MapReduce programs are inherently parallel, thus
putting very large-scale data analysis into the hands of anyone with
enough machines.
A Weather Dataset

What’s the highest recorded global temperature for each year in the dataset?

To take advantage of the parallel processing that Hadoop provides, we need to express our
query as a MapReduce job.

MapReduce works by breaking the processing into two phases: the map phase and the reduce
phase.

Each phase has key-value pairs as input and output, the types of which may be chosen by the
programmer.

The input to our map phase is the raw NCDC data.

We choose a text input format that gives us each line in the dataset as a text value.

The key is the offset of the beginning of the line from the beginning of the file.
Consider the following sample lines of input data:

0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...

These lines are presented to the map function as the key-value pairs:

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)

(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)

(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)

(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)

(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
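The keys 0, 106, 212, ... are byte offsets, so each one is the previous key plus the length of the previous line and its newline. A minimal sketch of that bookkeeping follows. It assumes a single '\n' after every line and uses made-up 105-character records in place of the real NCDC lines, which are elided above; the class name LineOffsets is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LineOffsets {

    /** Returns the byte offset at which each line starts, assuming one '\n' byte after every line. */
    static List<Long> offsets(List<String> lines) {
        List<Long> result = new ArrayList<>();
        long position = 0;
        for (String line : lines) {
            result.add(position);
            position += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the newline
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical 105-character records standing in for the elided NCDC lines
        char[] filler = new char[105];
        Arrays.fill(filler, 'x');
        String record = new String(filler);

        System.out.println(offsets(Arrays.asList(record, record, record, record, record)));
        // [0, 106, 212, 318, 424]
    }
}
```

The map function in this example never uses the key; it only matters that the framework supplies some unique key per line.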
Contd..

The map function merely extracts the year and the air
temperature, and emits them as its
output (the temperature values have been interpreted as
integers):

(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
Contd..
The output from the map function is processed by the MapReduce framework before being sent to
the reduce function. This processing sorts and groups the key-value pairs by key.

So, continuing the example, our reduce function sees the following input:

(1949, [111, 78])

(1950, [0, 22, −11])

Each year appears with a list of all its air temperature readings. All the reduce function has to do
now is iterate through the list and pick up the maximum reading:
(1949, 111)
(1950, 22)

This is the final output: the maximum global temperature recorded in each
year.
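The whole data flow above can be imitated in plain Java, without Hadoop, to see what each stage contributes. The sketch below starts from the five (year, temperature) pairs emitted by the map function, groups them by key as the framework would, and then applies the reduce logic. The class and method names are hypothetical, and a TreeMap stands in for the framework's sort-and-group step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MaxTemperatureSketch {

    /** Shuffle step: group the map output by key, with keys in sorted order. */
    static Map<Integer, List<Integer>> groupByYear(int[][] mapOutput) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] pair : mapOutput) {
            grouped.computeIfAbsent(pair[0], year -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    /** Reduce step: keep the maximum reading for each year. */
    static Map<Integer, Integer> maxPerYear(Map<Integer, List<Integer>> grouped) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : grouped.entrySet()) {
            int max = Integer.MIN_VALUE;
            for (int reading : entry.getValue()) {
                max = Math.max(max, reading);
            }
            result.put(entry.getKey(), max);
        }
        return result;
    }

    public static void main(String[] args) {
        // The (year, temperature) pairs emitted by the map function in the example
        int[][] mapOutput = { {1950, 0}, {1950, 22}, {1950, -11}, {1949, 111}, {1949, 78} };

        Map<Integer, List<Integer>> grouped = groupByYear(mapOutput);
        System.out.println(grouped);             // {1949=[111, 78], 1950=[0, 22, -11]}
        System.out.println(maxPerYear(grouped)); // {1949=111, 1950=22}
    }
}
```

In a real job the grouping happens across machines, and each reduce call sees one key with its values, but the per-key logic is the same loop.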
Java MapReduce
We need three things: a map function, a reduce function, and some code to run
the job. The map function is represented by the Mapper class, whose map()
method we override.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
Contd..
The Mapper class is a generic type, with four formal type parameters that specify the
input key, input value, output key, and output value types of the map function.
In this example, the input key is a long integer offset, the input value is a line of text, the
output key is a year, and the output value is an air temperature (an integer).
Rather than use built-in Java types, Hadoop provides its own set of basic types that are
optimized for network serialization. These are found in the org.apache.hadoop.io
package.
Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and
IntWritable (like Java Integer).
The map() method also provides an instance of Context to write the output to. In this
case, we write the year as a Text object (since we are just using it as a key), and the
temperature is wrapped in an IntWritable.
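The parsing rules inside map() can be tried out without a cluster. The sketch below copies the mapper's column positions, sign handling, MISSING check, and quality filter into a plain method, and feeds it made-up fixed-width records that contain only the three fields the mapper reads. Everything else in these records is filler, and the names RecordParsingSketch, parse, and record are hypothetical.

```java
import java.util.Arrays;

final class RecordParsingSketch {

    static final int MISSING = 9999;

    /** Returns "year<TAB>temperature", or null when the reading is missing or of poor quality. */
    static String parse(String line) {
        String year = line.substring(15, 19);
        // Same sign handling as the mapper: skip a leading '+'
        int start = line.charAt(87) == '+' ? 88 : 87;
        int airTemperature = Integer.parseInt(line.substring(start, 92));
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            return year + "\t" + airTemperature;
        }
        return null;
    }

    /** Builds a hypothetical 93-character record; only year, temperature, and quality are meaningful. */
    static String record(String year, String signedTemp, char quality) {
        char[] chars = new char[93];
        Arrays.fill(chars, '0');
        year.getChars(0, 4, chars, 15);       // columns 15-18: year
        signedTemp.getChars(0, 5, chars, 87); // columns 87-91: sign and four digits
        chars[92] = quality;                  // column 92: quality code
        return new String(chars);
    }

    public static void main(String[] args) {
        System.out.println(parse(record("1950", "+0022", '1'))); // 1950	22
        System.out.println(parse(record("1950", "-0011", '1'))); // 1950	-11
        System.out.println(parse(record("1949", "+9999", '1'))); // null (missing reading)
        System.out.println(parse(record("1949", "+0111", '2'))); // null (quality code rejected)
    }
}
```

Keeping the parsing in a plain method like this also makes it easy to unit test separately from the Hadoop types.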
Reducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Track the largest reading seen for this year
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
The MapReduce job
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    // Input and output locations come from the command line
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    // Types of the keys and values emitted by the reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Running the MapReduce Job
$ mkdir weather_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d
weather_classes MaxTemperatureMapper.java MaxTemperatureReducer.java
MaxTemperature.java
$ jar -cvf /home/cnlab/dc/maxtemprature.jar -C weather_classes/ .

Assuming that:
• /home/cnlab/dc/weather/input - input directory in HDFS
• /home/cnlab/dc/weather/output - output directory in HDFS

Sample text-files as input:
$ bin/hadoop dfs -ls /home/cnlab/dc/weather/input
Weather.txt
Example 2

Symbol,date,open,high,low
AET,12-09-2011,12.21,15.14,13.14
RPT,13-10-2009,13.14,14.23,12.21
AET,12-03-2009,14.15,16.12,14.00
MRPL,12-03-2009,15.15,16.12,14.00
RPT,13-10-2009,13.14,14.23,12.21
AET,12-09-2011,12.21,15.14,13.14
MRPL,12-03-2009,13.15,16.12,14.00
RPT,13-10-2009,13.14,14.23,12.21
MRPL,12-03-2009,11.15,16.12,14.00
RPM,13-10-2009,13.14,14.23,12.21
Example 2

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class StockAnalyzerMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, FloatWritable> {

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, FloatWritable> output, Reporter reporter)
      throws IOException {
    String record = value.toString();

    if (record.startsWith("Symbol")) {
      // ignore header row
      return;
    }

    String[] fields = record.split(",");
    String symbol = fields[0];
    String high = fields[3];
    String low = fields[4];

    float lowValue = Float.parseFloat(low);
    float highValue = Float.parseFloat(high);
    float delta = highValue - lowValue;

    output.collect(new Text(symbol), new FloatWritable(delta));
  }
}
Reducer

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class StockAnalyzerReducer extends MapReduceBase
    implements Reducer<Text, FloatWritable, Text, FloatWritable> {

  @Override
  public void reduce(Text key, Iterator<FloatWritable> values,
      OutputCollector<Text, FloatWritable> output, Reporter reporter)
      throws IOException {
    // Float.MIN_VALUE is the smallest positive float, so start from negative infinity
    float maxValue = Float.NEGATIVE_INFINITY;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new FloatWritable(maxValue));
  }
}
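One detail worth checking in any max-style reducer is the starting value of the running maximum. Unlike Integer.MIN_VALUE, which is the most negative int, Float.MIN_VALUE is the smallest positive float, so it is not a safe lower bound. The small sketch below shows the difference on a made-up all-negative input; the class name MaxSeedSketch is hypothetical.

```java
final class MaxSeedSketch {

    /** Largest element of values, starting the running maximum from the given seed. */
    static float maxFrom(float seed, float[] values) {
        float max = seed;
        for (float v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    public static void main(String[] args) {
        float[] deltas = { -2.5f, -0.5f }; // hypothetical all-negative input

        // Seeded with Float.MIN_VALUE, the tiny positive seed beats every real value
        System.out.println(maxFrom(Float.MIN_VALUE, deltas) > 0);     // true

        // Seeded with negative infinity, the true maximum is returned
        System.out.println(maxFrom(Float.NEGATIVE_INFINITY, deltas)); // -0.5
    }
}
```

For the stock data here, high minus low should never be negative, so the practical effect is small, but a delta of exactly zero would still be reported as a tiny positive number with the Float.MIN_VALUE seed.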
Application

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class StockAnalyzerConfig {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: StockAnalyzerConfig <input> <output>");
      System.exit(1);
    }

    JobConf conf = new JobConf(StockAnalyzerConfig.class);
    conf.setJobName("Stock analyzer");

    Path inputPath = new Path(args[0]);
    FileInputFormat.addInputPath(conf, inputPath);

    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(conf, outputPath);

    conf.setMapperClass(StockAnalyzerMapper.class);
    conf.setReducerClass(StockAnalyzerReducer.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(FloatWritable.class);

    JobClient.runJob(conf);
  }
}
Running Stock Analyzer
$ mkdir stock_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d
stock_classes StockAnalyzerMapper.java StockAnalyzerReducer.java
StockAnalyzerConfig.java
$ jar -cvf /home/studcse/ds/stockanalyzer.jar -C stock_classes/ .

Run the application:


$ bin/hadoop jar /home/studcse/ds/stockanalyzer.jar org.myorg.StockAnalyzerConfig
/home/studcse/ds/stock/input/stock.txt /home/studcse/ds/stock/output
Output:
$ bin/hadoop dfs -cat /home/studcse/ds/stock/output/*
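The job's output is not reproduced here. To get a feel for what it should contain, the same computation can be sketched in plain Java over the sample rows from Example 2: for each symbol, the largest difference between high and low. This is a single-process stand-in rather than the Hadoop job itself, and the class name StockAnalyzerSketch is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class StockAnalyzerSketch {

    /** Map and reduce in one pass: the largest (high - low) seen for each symbol. */
    static Map<String, Float> maxDeltaPerSymbol(String[] records) {
        Map<String, Float> result = new TreeMap<>();
        for (String record : records) {
            if (record.startsWith("Symbol")) {
                continue; // ignore header row, as the mapper does
            }
            String[] fields = record.split(",");
            float delta = Float.parseFloat(fields[3]) - Float.parseFloat(fields[4]);
            result.merge(fields[0], delta, Float::max); // keep the larger delta per symbol
        }
        return result;
    }

    public static void main(String[] args) {
        String[] records = {
            "Symbol,date,open,high,low",
            "AET,12-09-2011,12.21,15.14,13.14",
            "RPT,13-10-2009,13.14,14.23,12.21",
            "AET,12-03-2009,14.15,16.12,14.00",
            "MRPL,12-03-2009,15.15,16.12,14.00",
            "RPT,13-10-2009,13.14,14.23,12.21",
            "AET,12-09-2011,12.21,15.14,13.14",
            "MRPL,12-03-2009,13.15,16.12,14.00",
            "RPT,13-10-2009,13.14,14.23,12.21",
            "MRPL,12-03-2009,11.15,16.12,14.00",
            "RPM,13-10-2009,13.14,14.23,12.21",
        };
        // One line per symbol; values are approximate because of float rounding
        maxDeltaPerSymbol(records).forEach(
            (symbol, delta) -> System.out.println(symbol + "\t" + delta));
    }
}
```

On the sample rows this gives about 2.12 for AET and MRPL and about 2.02 for RPT and RPM. The Hadoop job should produce one tab-separated line per symbol with the same values, up to float formatting.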
