Big Data Analytics
18CS72
Module 2
Sudeep Manohar
Department of ISE
JNNCE, Shivamogga
Introduction to Hadoop
2.1 Big Data Programming Model
Centralized computing
Data is transferred from multiple distributed data sources to a
central server.
Analysis, reporting, visualization and business-intelligence
tasks are computed centrally.
The collected data are the inputs to the central server.
Distributed computing
uses the databases at multiple computing nodes with data
sharing between the nodes during computation.
Requirements:
Cooperation (sharing) - Between the DBs in a transparent
manner
Location independence - Analysis results should be
independent of geographical locations
Big Data systems follow the CAP theorem. The CAP theorem
states that of the three properties (consistency, availability and
partition tolerance), a distributed system can guarantee at most
two at the same time for applications, services and processes.
Big Data Store Model
Data is stored in the file system in the form of data blocks (the
physical division of data).
The data blocks are distributed across multiple nodes.
DataNodes sit in the racks of a cluster. Racks are scalable.
Data blocks are replicated across DataNodes so that, if a node
or link fails, the data block can still be accessed from a replica
on the same or another rack.
Big Data Programming Model
In the Big Data programming model, application jobs and tasks
(or subtasks) are scheduled on the same servers that store the
data to be processed.
2.2 Hadoop and its ecosystem
Apache initiated the project to develop a framework for Big Data
storage and processing.
Doug Cutting and Michael J. Cafarella were the creators of
Hadoop.
Hadoop is named after Cutting's son's stuffed toy elephant.
The project consisted of two components: one for storing data
as blocks across the clusters, and the other for running
computations on each individual cluster in parallel with the others.
Hadoop is a computing environment in which input data is
stored, processed, and the results are stored back.
Infrastructure consists of cloud for clusters.
A cluster consists of sets of computers or PCs
Hadoop enables distributed processing of large datasets
(above 10 million bytes) across clusters of computers using a
programming model called MapReduce.
The system is scalable, self-manageable and self-healing, and
uses a distributed file system.
2.2.1 Hadoop Core Components
Hadoop Common - The common module contains the libraries
and utilities that are required by the other modules of Hadoop.
This includes serialization, Java RPC (Remote Procedure Call)
and file-based data structures.
Hadoop Distributed File System (HDFS) - A Java-based
distributed file system which can store all kinds of data on the
disks at the clusters.
MapReduce v1 - Software programming model in Hadoop 1
using Mapper and Reducer. The v1 processes large sets of data
in parallel and in batches.
YARN - Software for managing computing resources. User
application tasks or subtasks run in parallel on Hadoop; YARN
schedules them and handles their requests for resources during
distributed execution of the tasks.
MapReduce v2 - Hadoop 2 YARN-based system for parallel
processing of large datasets and distributed processing of the
application tasks.
Spark
Spark provisions for in-memory analytics
It also enables OLAP and real-time processing
Spark does faster processing of Big Data
2.2.2 Features of Hadoop
Fault-efficient, scalable, flexible and modular design
Robust design of HDFS
Store and process Big Data
Distributed clusters computing model with data locality
Hardware fault-tolerant
Open-source framework
Java and Linux based
2.2.3 Hadoop Ecosystem Components
2.3 Hadoop Distributed File System
HDFS is a core component of Hadoop.
HDFS is designed to run on a cluster of computers and servers
at cloud-based utility services.
HDFS stores Big Data which may range from GBs (1 GB = 2^30
B) to PBs (1 PB = 10^15 B, nearly 2^50 B).
HDFS stores the data in a distributed manner in order to
compute fast.
The distributed data store in HDFS stores data in any format
regardless of schema
2.3.1 HDFS Storage
Hadoop data store concept implies storing the data at a
number of clusters.
Each cluster has a number of data stores, called racks.
Each rack stores a number of DataNodes.
Each DataNode has a large number of data blocks.
The racks are distributed across a cluster.
The nodes have processing and storage capabilities.
Data blocks are replicated, by default, on at least three
DataNodes, on the same or remote racks.
A file containing the data is divided into data blocks.
The default data block size is 64 MB.
Hadoop HDFS features are as follows
i. Create, append, delete, rename and attribute modification
functions
ii. Content of an individual file cannot be modified or replaced;
new data can only be appended at the end of the file
iii. Write once but read many times during usages and
processing
iv. Average file size can be more than 500 MB.
Figure 2.3 A Hadoop cluster example
Problem
Consider data storage for university students. Each student's data,
stuData, is in a file of size less than 64 MB (1 MB = 2^20 B). A data
block stores the full file data for a student, stuData_idN, where N = 1 to
500.
i. How will the files of each student be distributed in a Hadoop cluster?
How many students' data can be stored in one cluster? Assume that
each rack has two DataNodes, each with 64 GB (1 GB = 2^30 B) of
memory. Assume that the cluster consists of 120 racks, and thus
240 DataNodes.
ii. What is the total memory capacity of the cluster in TB (1 TB = 2^40 B)
and of the DataNodes in each rack?
iii. Show the distributed blocks for the students with ID = 96 and 1025.
Assume default replication in the DataNodes = 3.
iv. What changes when a stuData file is 128 MB in size?
Solution
i. For each student file one data block suffices. Maximum
number of student files: 81920
ii. Total memory capacity of the cluster = 15 TB. Total memory
capacity of the DataNodes in each rack = 2 x 64 GB = 128 GB
iii. Since the size of a student file is less than 64 MB, each file
is kept as a single block, replicated on three DataNodes.
iv. Each file now needs two 64 MB blocks, so the cluster can
hold only half as many student files (40960).
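A worked calculation for parts i and ii (a sketch that assumes the 64 MB
block size and the default replication factor of 3 stated above):
Total capacity = 240 DataNodes x 64 GB = 15360 GB = 15 TB
Number of 64 MB blocks = 15 TB / 64 MB = (15 x 2^40) / (64 x 2^20) = 245760
Each student file fits in one block and is replicated three times, so the
maximum number of student files = 245760 / 3 = 81920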
2.3.1.1 Hadoop Physical organization
Figure 2.4 The client, master NameNode, MasterNodes and slave nodes
Clients, as the users, run applications with the help of
Hadoop ecosystem projects/applications such as Hive, Mahout
and Pig.
A single MasterNode provides HDFS, MapReduce and HBase
services using threads in small to medium-sized clusters.
Zookeeper is used by HBase for metadata storage
The MasterNode receives client connections, maintains the
description of the global file system namespace, and the
allocation of file blocks.
It also monitors the state of the system in order to detect any
failure.
The Master consists of three components: NameNode,
Secondary NameNode and JobTracker.
The NameNode stores all the file-system-related information, such
as:
which part of the cluster each file section (block) is stored in
the last access time for the files
user permissions, i.e., which user has access to the file
The Secondary NameNode is not a hot standby for the NameNode;
it keeps a checkpoint copy of the NameNode metadata. Thus, the
stored metadata can be rebuilt easily in case of NameNode failure.
The JobTracker coordinates the parallel processing of data.
Hadoop 1 vs Hadoop 2
Single Name Node failure in Hadoop 1 is an operational
limitation
In Hadoop 1, scaling was restricted to a few thousand
DataNodes and a limited number of clusters.
Hadoop 2 provides multiple NameNodes, which enables higher
resource availability.
2.3.1.2 HDFS commands
2.4 MapReduce Framework And Programming Model
Mapper means the software that performs the assigned task after
organizing the data blocks imported using the keys.
A key is specified in the command line of the Mapper.
The command maps the key to the data, which an application
uses.
Reducer means software for reducing the mapped data by
using the aggregation, query or user-specified function.
(Aggregation function examples: count, sum, maximum,
minimum, deviation and standard deviation)
MapReduce allows writing applications that reliably process huge
amounts of data, in parallel, on large clusters of servers.
The parallel programs of MapReduce are useful for performing
large scale data analysis using multiple machines in the
cluster.
Features of MapReduce framework are as follows:
Provides automatic parallelization and distribution of
computation based on several processors
Processes data stored on distributed clusters of DataNodes
and racks
Allows processing large amount of data in parallel
Provides scalability for usages of large number of servers
Provides Map Reduce batch-oriented programming model in
Hadoop version 1
Provides additional processing modes in Hadoop 2 YARN-
based system and enables required parallel processing. For
example, for queries, graph databases, streaming data,
messages, real-time OLAP and ad hoc analytics with Big Data 3V
characteristics.
2.5 Hadoop YARN
YARN is a resource management platform. It manages the
computing resources.
YARN manages the schedules for running the subtasks. Each
subtask uses the resources in its allotted time interval.
YARN separates the resource management and processing
components.
YARN stands for Yet Another Resource Negotiator; it manages
and allocates resources for the application subtasks and
provides those resources to them in the Hadoop system.
Hadoop 2 Execution Model
Figure 2.5 YARN based Execution Model
A Client Node submits the request of an application to the Resource
Manager (RM)
The RM keeps information of all the slave Node Managers (NM).
Information is about the location (Rack Awareness) and the number
of resources (data blocks and servers) they have.
The RM also renders the Resource Scheduler service that decides
how to assign the resources
A NM creates an Application Master (AM) instance (AMI) and starts
it up. The AMI initializes itself and registers with the RM.
The AM estimates the resources requirement for running an
application program or sub-task and sends their requests for the
necessary resources to the RM
All active NMs periodically send a heartbeat (controlling signal) to
the RM, signaling their presence.
2.6 Hadoop Ecosystem tools
ZooKeeper (coordination service) - Provisions a high-performance
coordination service for distributed running of applications and tasks
Avro (data serialization and transfer utility) - Provisions data
serialization during data transfer between application and
processing layers
Oozie (workflow scheduler) - Provides a way to package and bundle
multiple coordinator and workflow jobs and manage the lifecycle
of those jobs
Sqoop (SQL-to-Hadoop, a data-transfer software) - Provisions for
data transfer between data stores such as relational DBs and Hadoop
Flume (large data-transfer utility) - Provisions for reliable data
transfer and provides for recovery in case of failure. Transfers large
amounts of data in applications, such as those related to social-media
messages
Ambari (a web-based tool) - Provisions, monitors, manages and views
the functioning of the cluster, MapReduce, Hive and Pig APIs
Chukwa (a data collection system) - Provisions and manages a data
collection system for large and distributed systems
HBase (a structured data store using a database) - Provisions a
scalable and structured database for large tables
Cassandra (a database) - Provisions a scalable and fault-tolerant
database for multiple masters
Hive (a data warehouse system) - Provisions data aggregation, data
summarization, data warehouse infrastructure, ad hoc (unstructured)
querying and an SQL-like scripting language for query processing
using HiveQL
Pig (a high-level dataflow language) - Provisions dataflow (DF)
functionality and the execution framework for parallel computations
Mahout (a machine learning software) - Provisions scalable machine
learning and library functions for data mining and analytics
HDFS Design Features
Designed for Big Data processing
Not designed as a true parallel file system
Assumes a large-file, write-once / read-many model
Rigorously restricts data writing to one user at a time
All additional writes are "append-only"
Design is based on the design of the Google File System (GFS)
Files may be appended, but random seeks are not permitted
Data storage and processing happen on the same server
nodes
"Moving computation is cheaper than moving data“
A specialized file system is used, which is not designed for
HDFS Components
The design of HDFS is based on two types of nodes: a
NameNode and multiple DataNodes.
In a basic design, a single NameNode manages all the
metadata needed to store and retrieve the actual data from
the DataNodes.
No data is actually stored on the NameNode, however.
For a minimal Hadoop installation, there needs to be a single
NameNode daemon and a single DataNode daemon running
on at least one machine
The design is a master/slave architecture in which the master
(NameNode) manages the file system namespace and
regulates access to files by clients
File system namespace operations such as opening, closing,
and renaming files and directories are all managed by the
NameNode
The NameNode also determines the mapping of blocks to
DataNodes and handles DataNode failures
The NameNode manages block creation, deletion, and
replication.
The slaves (DataNodes) are responsible for serving read and
write requests from the file system to the clients.
The mappings between data blocks and the physical
DataNodes are not kept in persistent storage on the
NameNode but are kept in memory
Upon startup, each DataNode provides a block report (which it
keeps in persistent storage) to the NameNode. The block
reports are sent every 10 heartbeats.
The reports enable the NameNode to keep an up-to-date
account of all data blocks in the cluster
The NameNode is a metadata server or "data traffic cop."
HDFS provides a single namespace that is managed by the
NameNode
The purpose of the Secondary NameNode is to perform
periodic checkpoints that evaluate the status of the
NameNode
The NameNode keeps all system metadata in memory for fast
access. It maintains two disk files that track changes to the
metadata:
An image of the file system state when the NameNode was
started. This file begins with fsimage_* and is used only at
startup by the NameNode
A series of modifications made to the file system after the
NameNode was started. These files begin with edits_* and reflect
the changes made after the fsimage file was read.
The location of these files is set by the
dfs.namenode.name.dir property in the hdfs-site.xml file
The Secondary NameNode periodically downloads the fsimage and
edits files, joins them into a new fsimage, and uploads the new
fsimage file to the NameNode.
If the Secondary NameNode were not running, a restart of the
NameNode could take a long time due to the number of changes
to the file system.
HDFS Block Replication
When HDFS writes a file, it is replicated across the cluster.
The amount of replication is based on the value of
dfs.replication in the hdfs-site.xml file.
This default value can be overridden with the hdfs dfs -setrep
command
For Hadoop clusters containing more than eight DataNodes,
the replication value is usually set to 3
In a Hadoop cluster of eight or fewer DataNodes but more
than one DataNode, a replication factor of 2 is adequate
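For example, the replication factor of an existing file or directory can
be changed from the command line; the path below is illustrative, and
the -w flag waits until the new replication level is reached:
$ hdfs dfs -setrep -w 2 /user/hdfs/stuff/test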
The HDFS default block size is often 64MB. In a typical
operating system, the block size is 4KB or 8KB
The HDFS default block size is not the minimum block size,
however. If a 20KB file is written to HDFS, it will create a
block that is approximately 20KB in size
If a file of size 80MB is written to HDFS, a 64MB block and a
16MB block will be created
HDFS Safe Mode
When the NameNode starts, it enters a read-only safe mode
where blocks cannot be replicated or deleted.
Safe Mode enables the NameNode to perform two important
processes:
The previous file system state is reconstructed by loading
the fsimage file into memory and replaying the edit log.
The mapping between blocks and data nodes is created by
waiting for enough of the DataNodes to register so that at
least one copy of the data is available.
Not all DataNodes are required to register before HDFS exits
from Safe Mode. The registration process may continue for
some time.
HDFS may also enter Safe Mode for maintenance using the
hdfs dfsadmin -safemode command or when there is a file
system issue that must be addressed by the administrator.
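A short sketch of the corresponding administrative commands (standard
hdfs dfsadmin options):
$ hdfs dfsadmin -safemode get     # report whether Safe Mode is on
$ hdfs dfsadmin -safemode enter   # put HDFS into Safe Mode for maintenance
$ hdfs dfsadmin -safemode leave   # return to normal operation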
Rack Awareness
Rack awareness deals with data locality
One of the main design goals of Hadoop MapReduce is to
move the computation to the data
A typical Hadoop cluster will exhibit three levels of data
locality:
Data resides on the local machine (best).
Data resides in the same rack (better).
Data resides in a different rack (good).
When the YARN scheduler is assigning MapReduce containers
to work as mappers, it will try to place the container first on
the local machine, then on the same rack, and finally on
another rack
The NameNode tries to place replicated data blocks on
multiple racks for improved fault tolerance.
In such a case, an entire rack failure will not cause data loss
or stop HDFS from working. Performance may be degraded,
however.
HDFS can be made rack-aware by using a user-derived script
that enables the master node to map the network topology of
the cluster.
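As a sketch, assuming the script has been registered through the standard
net.topology.script.file.name property in core-site.xml, the current
setting and the resulting rack assignments can be inspected from the
command line:
$ hdfs getconf -confKey net.topology.script.file.name
$ hdfs dfsadmin -printTopology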
NameNode High Availability
With early Hadoop installations, the NameNode was a single
point of failure that could bring down the entire Hadoop
cluster.
NameNode hardware often employed redundant power
supplies and storage to guard against such problems, but it
was still susceptible to other failures
The solution was to implement NameNode High Availability
(HA) as a means to provide true failover service
A HA Hadoop cluster has two (or more) separate NameNode
machines.
Each machine is configured with exactly the same software.
One of the NameNode machines is in the Active state, and the
other is in the Standby state.
The Active NameNode is responsible for all client HDFS
operations in the cluster.
The Standby NameNode maintains enough state to provide a
fast failover
To guarantee the file system state is preserved, both the
Active and Standby NameNodes receive block reports from the
DataNodes.
The Active node also sends all file system edits to a quorum of
JournalNodes.
At least three physically separate JournalNode daemons are
required, because edit log modifications must be written to a
majority of the JournalNodes.
This design will enable the system to tolerate the failure of a
single JournalNode machine.
The Standby node continuously reads the edits from the
JournalNodes to ensure its namespace is synchronized with
that of the Active node.
In the event of an Active NameNode failure, the Standby node
reads all remaining edits from the JournalNodes before
promoting itself to the Active state.
A Secondary NameNode is not required in the HA
configuration because the Standby node also performs the
tasks of the Secondary NameNode.
Apache Zookeeper is used to monitor the NameNode health.
Zookeeper is a highly available service for maintaining small
amounts of coordination of data, notifying clients of changes
in that data, and monitoring clients for failures.
HDFS failover relies on ZooKeeper for failure detection and for
Standby to Active NameNode election.
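A brief sketch of checking and exercising failover from the command line;
nn1 and nn2 are illustrative NameNode service IDs as defined in
hdfs-site.xml:
$ hdfs haadmin -getServiceState nn1
$ hdfs haadmin -getServiceState nn2
$ hdfs haadmin -failover nn1 nn2    # make nn2 the Active NameNode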
HDFS NameNode Federation
Older versions of HDFS provided a single namespace for the
entire cluster managed by a single NameNode.
Federation adds support for multiple NameNodes/namespaces
to the HDFS file system
The key benefits are as follows:
Namespace scalability - HDFS cluster storage scales horizontally
without placing a burden on the NameNode.
Better performance - Adding more NameNodes to the cluster
scales the file system read/write operations throughput by
separating the total namespace.
System isolation - Multiple NameNodes enable different categories
of applications to be distinguished, and users can be isolated to
different namespaces.
HDFS Checkpoints and Backups
An HDFS BackupNode maintains an up-to-date copy of the file
system namespace both in memory and on disk
The BackupNode does not need to download the fsimage and
edits files from the active NameNode because it already has
an up-to-date namespace state in memory
A NameNode supports one BackupNode at a time
HDFS Snapshots
HDFS snapshots are similar to backups, but are created by
administrators using the hdfs dfs -createSnapshot command.
HDFS snapshots are read-only point-in-time copies of the file
system.
They offer the following features:
Snapshots can be taken of a sub-tree of the file system or
the entire file system.
Snapshots can be used for data backup, protection against
user errors, and disaster recovery.
Snapshot creation is instantaneous.
Blocks on the DataNodes are not copied, because the
snapshot files record the block list and the file size.
There is no data copying, although it appears to the user that
there are duplicate files.
Snapshots do not adversely affect regular HDFS operations.
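A minimal sketch (the directory name is illustrative): snapshots must first
be allowed on a directory, after which they appear under its .snapshot path:
$ hdfs dfsadmin -allowSnapshot /user/hdfs/stuff
$ hdfs dfs -createSnapshot /user/hdfs/stuff snap1
$ hdfs dfs -ls /user/hdfs/stuff/.snapshot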
HDFS user commands
Syntax: hdfs [--config confdir] COMMAND
• hdfs version
– Reports the Hadoop version, e.g., Hadoop 2.6.0.2.2.4.3-2
• hdfs dfs -ls /
– Lists files in the root HDFS directory
• hdfs dfs -ls OR hdfs dfs -ls /user/hdfs
– Lists the files in the user home directory
• hdfs dfs -mkdir stuff
– Creates a directory
• hdfs dfs -put test stuff
– Copies files to HDFS
• hdfs dfs -get stuff/test test-local
– Copies files from HDFS
• hdfs dfs -cp stuff/test test.hdfs
– Copies files within HDFS
• hdfs dfs -rm test.hdfs
– Deletes files within HDFS
• hdfs dfs -rm -r -skipTrash stuff
– Deletes a directory in HDFS
MapReduce Model
There are two stages: a mapping stage and a reducing stage.
In the mapping stage, a mapping procedure is applied to input
data.
The map is usually some kind of filter or sorting process.
Example: counting the occurrences of a word in a book. The Map
step divides up the task, and the Reduce step counts and sums
the partial results.
1. Input Splits. As mentioned, HDFS distributes and replicates
data over multiple servers. The default data chunk or block
size is 64MB. Thus, a 500MB file would be broken into 8
blocks and written to different machines in the cluster. The
data are also replicated on multiple machines (typically
three machines).
2. Map Step. The mapping process is where the parallel nature
of Hadoop comes into play. For large amounts of data, many
mappers can be operating at the same time. The user
provides the specific mapping process. MapReduce will try to
execute the mapper on the machines where the block
resides. Because the file is replicated in HDFS, the least busy
node with the data will be chosen
3. Combiner step. It is possible to provide an optimization or
pre-reduction as part of the map stage where key-value
pairs are combined prior to the next stage. The combiner
stage is optional.
4. Shuffle step. Before the parallel reduction stage can
complete, all similar keys must be combined and counted by
the same reducer process. Therefore, results of the map
stage must be collected by key-value pairs and shuffled to
the same reducer process. If only a single reducer process is
used, the Shuffle stage is not needed.
5. Reduce Step. The final step is the actual reduction. In this
stage, the data reduction is performed as per the
programmer's design. The reduce step is also optional. The
results are written to HDFS. Each reducer will write an output
file. For example, a MapReduce job running four reducers will
create files called part-0000, part-0001, part-0002, and part-
0003.
Mapper Script
#!/bin/bash
# Simple word-count mapper: emits "see, 1" or "run, 1" for each matching token
while read line ; do
  for token in $line; do
    if [ "$token" = "see" ]; then
      echo "see, 1"
    elif [ "$token" = "run" ]; then
      echo "run, 1"
    fi
  done
done
Reducer script
#!/bin/bash
# Simple reducer: counts the "see" and "run" records emitted by the mapper
Rcount=0
Scount=0
while read line ; do
  if [ "$line" = "see, 1" ]; then
    Scount=$((Scount+1))
  elif [ "$line" = "run, 1" ]; then
    Rcount=$((Rcount+1))
  fi
done
echo "run, $Rcount"
echo "see, $Scount"
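These mapper and reducer scripts can be run with the Hadoop streaming
interface. A minimal sketch, assuming the scripts are saved as mapper.sh
and reducer.sh; the streaming jar path and the HDFS directories shown
below are illustrative and depend on your installation:
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /Demo/input.txt -output /stream-output \
    -mapper mapper.sh -reducer reducer.sh \
    -file mapper.sh -file reducer.sh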
To compile and run the program from the command line,
perform the following steps:
1. Make a local wordcount_classes directory.
$ mkdir wordcount_classes
2. Compile the WordCount program using the `hadoop
classpath` command to include all the available Hadoop class
paths.
$ javac -cp `hadoop classpath` -d wordcount_classes WordCount.java
3. The jar file can be created using the following command:
$ jar -cvf wordcount.jar -C wordcount_classes/ .
4. To run the example, create an input directory in HDFS and
place a text file in the new directory. For this example, we will
use a text file, input.txt:
$ hdfs dfs -mkdir /Demo
$ hdfs dfs -put input.txt /Demo
5. Run the WordCount application using the following command:
$ hadoop jar wordcount.jar WordCount /Demo /output
Listing, Killing and Job Status
The jobs can be managed using the mapred job command.
The most important options are -list, -kill, and -status.
In addition, the yarn application command can be used to
control all applications running on the cluster.
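For example (the job and application IDs are placeholders taken from the
listing output):
$ mapred job -list
$ mapred job -status <job-id>
$ mapred job -kill <job-id>
$ yarn application -list
$ yarn application -kill <application-id>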
Hadoop Log Management
The MapReduce logs provide a comprehensive listing of both
mappers and reducers
The actual log output consists of three files—stdout, stderr,
and syslog (Hadoop system messages)—for the application
Log aggregation
Logs are aggregated in HDFS and can be displayed in the
YARN Resource Manager user interface or examined with the
yarn logs command.
Command-Line Log Viewing
MapReduce logs can also be viewed from the command line.
The yarn logs command enables the logs to be easily viewed
together without having to hunt for individual log files on the
cluster nodes.
$ yarn logs
Retrieve logs for completed YARN applications.
usage: yarn logs -applicationId <application ID> [OPTIONS]
General options are:
-appOwner <Application Owner>
-containerId <Container ID>
Essential Hadoop tools
Apache Pig
Apache Pig is a high-level language that enables programmers
to write complex MapReduce transformations using a simple
scripting language.
Pig Latin (a data flow language) defines a set of
transformations on a data set such as aggregate, join, and
sort
Apache Pig has several usage modes.
The first is a local mode in which all processing is done on the
local machine.
The non-local (cluster) modes are MapReduce and Tez.
Pig example – Walk through
Copy the passwd file to a working directory for local Pig
operation:
$ cp /etc/passwd .
Copy the data file into HDFS for Hadoop MapReduce operation
$ hdfs dfs -put passwd passwd
You can confirm the file is in HDFS by entering the following
command
$ hdfs dfs -ls passwd
-rw-r--r-- 2 hdfs hdfs 2526 2015-03-17 11:08 passwd
In the following example of local Pig operation, all processing
is done on the local machine (Hadoop is not used)
First, the interactive command line is started
$ pig -x local
If Pig starts correctly, you will see a grunt> prompt. Pig
commands must end with a semicolon (;)
Load the passwd file and then grab the user name and dump
it to the terminal
grunt> A = load 'passwd' using PigStorage(':');
grunt> B = foreach A generate $0 as id;
grunt> dump B;
The processing will start and a list of user names will be
printed to the screen. To exit the interactive session, enter the
command quit.
grunt> quit
To use Hadoop MapReduce, start Pig as follows (or just enter
pig)
$ pig -x mapreduce
The same sequence of commands can be entered at the
grunt> prompt.
Because we are running this application under Hadoop, make
sure the file is placed in HDFS.
If you are using the Hortonworks HDP distribution with tez
installed, the tez engine can be used as follows:
$ pig -x tez
Pig can also be run from a script. This script, which is
repeated here, is designed to do the same things as the
interactive version
/* id.pig */
A = load 'passwd' using PigStorage(':'); -- load the passwd file
B = foreach A generate $0 as id; -- extract the user IDs
dump B;
store B into 'id.out'; -- write the results to a directory name id.out
Comments are delineated by /* */ and -- at the end of a line.
First, ensure that the id.out directory is not in your local
directory, and then start Pig with the script on the command
line:
$ /bin/rm -r id.out/
$ pig -x local id.pig
If the script worked correctly, you should see at least one data
file with the results and a zero-length file with the name
_SUCCESS.
To run the MapReduce version, use the same procedure; the
only difference is that now all reading and writing takes place
in HDFS.
$ hdfs dfs -rm -r id.out
$ pig id.pig
Apache Hive
Apache Hive is a data warehouse infrastructure built on top of
Hadoop for providing data summarization, ad hoc queries, and
the analysis of large data sets using a SQL-like language
called HiveQL.
Hive offers the following features:
Tools to enable easy data extraction, transformation, and loading
(ETL)
A mechanism to impose structure on a variety of data formats
Access to files stored either directly in HDFS or in other data
storage systems such as HBase
Query execution via MapReduce and Tez (optimized MapReduce)
Hive Example – Walk through
To start Hive, simply enter the hive command. If Hive starts
correctly, you should get a hive> prompt.
$ hive
hive>
As a simple test, create and drop a table. Note that Hive
commands must end with a semicolon (;)
hive> CREATE TABLE pokes (foo INT, bar STRING);
OK
Time taken: 1.705 seconds
hive> SHOW TABLES;
OK
pokes
Time taken: 0.174 seconds, Fetched: 1 row(s)
hive> DROP TABLE pokes;
OK
Time taken: 4.038 seconds
A more detailed example can be developed using a web server
log file to summarize message types.
First, create a table using the following command:
hive> CREATE TABLE logs(t1 string, t2 string, t3 string, t4
string, t5 string, t6 string, t7 string) ROW FORMAT
DELIMITED FIELDS TERMINATED BY ' ';
OK
Time taken: 0.129 seconds
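To continue the example, the log file can be loaded and summarized. A
sketch, assuming a local file named sample.log whose fourth field (t4)
holds the message severity; the file name and field layout are assumptions:
hive> LOAD DATA LOCAL INPATH 'sample.log' OVERWRITE INTO TABLE logs;
hive> SELECT t4 AS sev, COUNT(*) AS cnt FROM logs GROUP BY t4;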
Apache Sqoop
Sqoop is a tool designed to transfer data between Hadoop and
relational databases.
One can use Sqoop to import data from a relational database
management system (RDBMS) into the Hadoop Distributed File
System (HDFS), transform the data in Hadoop, and then
export the data back into an RDBMS
Sqoop can be used with any Java Database Connectivity
(JDBC)-compliant database and has been tested on Microsoft
SQL Server, PostgreSQL, MySQL, and Oracle
Apache Sqoop Import Method
The figure describes the Sqoop data import (to HDFS) process.
The data import is done in two steps. In the first step, shown
in the figure, Sqoop examines the database to gather the
necessary metadata for the data to be imported.
The second step is a map-only (no reduce step) Hadoop job
that Sqoop submits to the cluster. This job does the actual
data transfer using the metadata captured in the previous
step.
The imported data are saved in an HDFS directory. By default,
Sqoop names the directory after the imported table.
By default, these files contain comma-delimited fields, with
new lines separating different records.
One can easily override the format in which data are copied
over by explicitly specifying the field separator and record
terminator characters.
Once placed in HDFS, the data are ready for processing.
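A minimal import sketch; the connection string, credentials, and table
name are illustrative, and -m sets the number of map tasks:
$ sqoop import --connect jdbc:mysql://dbhost/salesdb \
    --username dbuser -P \
    --table customers --target-dir /user/hdfs/customers -m 2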
Apache Sqoop Export Method
• Data export is done in two steps, as shown in the figure.
• As in the import process, the first step is to examine the
database for metadata. The export step again uses a map-only
Hadoop job to write the data to the database.
• Sqoop divides the input data set into splits, then uses
individual map tasks to push the splits to the database.
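A matching export sketch; again, the connection string, table, and HDFS
directory are illustrative:
$ sqoop export --connect jdbc:mysql://dbhost/salesdb \
    --username dbuser -P \
    --table results --export-dir /user/hdfs/results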
Apache Sqoop Version Changes
Apache Flume
Apache Flume is an independent agent designed to collect,
transport, and store data into HDFS.
Often data transport involves a number of Flume agents that
may traverse a series of machines and locations.
Flume is often used for log files, social media-generated data,
email messages, and just about any continuous data source
A Flume agent is composed of three components.
Source. The source component receives data and sends it to a
channel. It can send the data to more than one channel. The
input data can be from a real-time source (e.g., weblog) or
another Flume agent.
Channel. A channel is a data queue that forwards the source
data to the sink destination. It can be thought of as a buffer
that manages input (source) and output (sink) flow rates.
Sink. The sink delivers data to destination such as HDFS, a
local file, or another Flume agent.
Flume agents may be placed in a pipeline, possibly to traverse
several machines or domains.
This configuration is normally used when data are collected on
one machine (e.g., a web server) and sent to another machine
that has access to HDFS.
In a Flume pipeline, the sink from one agent is connected to
the source of another
The data transfer format normally used by Flume, which is
called Apache Avro, provides several useful features.
First, Avro is a data serialization/deserialization system that
uses a compact binary format.
The schema is sent as part of the data exchange and is
defined using JSON (JavaScript Object Notation).
Avro also uses remote procedure calls (RPCs) to send data.
That is, an Avro sink will contact an Avro source to send data.
In another useful configuration, Flume is used to consolidate
several data sources before committing them to HDFS.
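A minimal single-agent sketch; the agent, component, and file names are
illustrative, while netcat, memory, and logger are standard Flume source,
channel, and sink types. The configuration file, simple-agent.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The agent is then started with:
$ flume-ng agent --conf conf --conf-file simple-agent.conf --name a1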
Apache Oozie
Oozie is a workflow director system designed to run and
manage multiple related Apache Hadoop jobs.
For instance, complete data input and analysis may require
several discrete Hadoop jobs to be run as a workflow in which
the output of one job serves as the input for a successive job.
Oozie is designed to construct and manage these workflows.
Oozie is not a substitute for the YARN scheduler. YARN
manages resources for individual Hadoop jobs, and Oozie
provides a way to connect and control Hadoop jobs on the
cluster.
Oozie workflow jobs are represented as directed acyclic
graphs (DAGs) of actions
Three types of Oozie jobs are permitted:
Workflow—a specified sequence of Hadoop jobs with outcome-
based decision points and control dependency. Progress from
one action to another cannot happen until the first action is
complete.
Coordinator—a scheduled workflow job that can run at various
time intervals or when data become available.
Bundle—a higher-level Oozie abstraction that will batch a set of
coordinator jobs.
Oozie is integrated with the rest of the Hadoop stack,
supporting several types of Hadoop jobs out of the box (e.g.,
Java MapReduce, Streaming MapReduce, Pig, Hive, and Sqoop)
as well as system-specific jobs (e.g., Java programs and shell
scripts). Oozie also provides a CLI and a web UI for monitoring
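For example, a workflow can be submitted and monitored from the CLI. A
sketch, assuming the Oozie server runs at its default URL and that
job.properties points to a workflow definition stored in HDFS:
$ oozie job -oozie http://localhost:11000/oozie -config job.properties -run
$ oozie job -oozie http://localhost:11000/oozie -info <job-id>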
Simple Oozie workflow
In this case, Oozie runs a basic MapReduce operation. If the
application was successful, the job ends; if an error occurred,
the job is killed.
Oozie workflow definitions are written in hPDL (an XML Process
Definition Language). Such workflows contain several types of
nodes:
Control flow nodes define the beginning and the end of a
workflow. They include start, end, and optional fail nodes.
Action nodes are where the actual processing tasks are
defined. When an action node finishes, the remote systems
notify Oozie and the next node in the workflow is executed.
Action nodes can also include HDFS commands.
Fork/join nodes enable parallel execution of tasks in the
workflow. The fork node enables two or more tasks to run at
the same time. A join node represents a rendezvous point that
must wait until all forked tasks complete.
Decision nodes enable decisions to be made about the
previous task. Control decisions are based on the results of
the previous action (e.g., file size or file existence). Decision
nodes are essentially switch-case statements that use JSP EL
(Java Server Pages Expression Language) expressions that
evaluate to either true or false.
Apache Hbase
Apache HBase is an open source, distributed, versioned,
nonrelational database modeled after Google’s Bigtable.
Like Bigtable, HBase leverages the distributed data storage
provided by the underlying distributed file systems spread
across commodity servers
Apache Hbase provides Bigtable-like capabilities on top of
Hadoop and HDFS.
Some of the more important features include the following
capabilities:
Linear and modular scalability
Strictly consistent reads and writes
Automatic and configurable sharding of tables
Automatic failover support between Region Servers
Convenient base classes for backing Hadoop MapReduce jobs
with Apache HBase tables
Easy-to-use Java API for client access
HBase Data Model Overview
A table in HBase is similar to other databases, having rows
and columns.
Columns in HBase are grouped into column families, all with
the same prefix. For example, consider a table of daily stock
prices. There may be a column family called “price” that has
four members—price:open, price:close, price:low, and
price:high.
A column does not need to be a family. For instance, the stock
table may have a column named “volume” indicating how
many shares were traded.
Specific HBase cell values are identified by a row key, column
(column family and column), and version (timestamp).
It is possible to have many versions of data within an HBase
cell.
A version is specified as a timestamp and is created each time
data are written to a cell.
Almost anything can serve as a row key, from strings to binary
representations of longs to serialized data structures.
Rows are lexicographically sorted with the lowest order
appearing first in a table.
The empty byte array denotes both the start and the end of a
table’s namespace.
All table accesses are via the table row key, which is
considered its primary key.