Big Data processing
Rabaa Youssef
Batch or Stream processing?
• One of the fundamental questions you need to ask when
planning your data architecture is whether to use batch or
stream processing.
Do you process data as it arrives, in real time or near-real time?
Do you wait for data to accumulate before running your job?
Batch processing
• An efficient way of processing large volumes of data
• Data is processed when:
• A group of transactions has been collected over a period of time (scheduled batch
processing: batch ETL jobs are typically run on a set schedule)
• The amount of data reaches a certain threshold
1. Collect data. 2. Load and process. 3. Produce batch results.
• Hadoop uses batch data processing (MapReduce paradigm; a minimal Java sketch follows below)
You wait to do everything at once, relying on the ability of your
system to handle it all.
• Batch processing ≠ Parallel processing (it could be sequential)
• Hadoop batch processing is parallel.
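To make the MapReduce paradigm concrete, here is a minimal word-count sketch against the standard Hadoop Java MapReduce API. It is an illustrative sketch rather than a job from the course; the input and output paths are placeholders passed on the command line.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map phase: emit (word, 1) for every token of every input line
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce phase: sum the counts collected for each word
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);   // local pre-aggregation on each mapper
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. an HDFS input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not already exist
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}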
Batch processing
A batch processing system:
• Has access to all the data.
• Might compute something big and complex.
• Is generally concerned with throughput rather than
the latency of individual components of the computation.
• Has latency measured in minutes or more.
Pros & Cons of Batch Processing
Advantages of Batch Processing
• Ideal for processing large volumes of data/transactions; more
efficient than processing each item individually.
• Processing can run during less-busy times or at a designated time.
• Cost efficiency.
Disadvantages of Batch Processing
• There is a time delay between the collection of data and getting the
result of the batch process => latency
• The master file is not always kept up to date.
• Hadoop MR: no in-memory/cache management (disk access
slows down the process)
Hadoop MapReduce
Questions
• Is the MapReduce paradigm able to process all kinds of
big data use cases?
• What about data streams?
• What if high latency is not acceptable?
When to use Batch processing?
To generalize, you should lean towards batch processing when:
• Data freshness is not a mission-critical issue
• You are working with large datasets and are running a
complex algorithm that requires access to the entire batch –
e.g., sorting/averaging the entire dataset, training an ML
model
• You need a global treatment of the entire dataset, e.g., decision
making in BI solutions (creating reports requires an overview of
the entire dataset, or at least a significant amount of data)
• You get access to the data in batches rather than in streams
• When you are joining tables in relational databases
Stream Processing
• Instead of processing the entire dataset once it is fully stored,
we process data as soon as it arrives in the storage layer
• Often very close to the time at which it was generated (although
this is not always the case).
• Could be asynchronous: no requirement regarding real-time results
availability
Stream Processing
• Usually involves a relatively simple transformation or
calculation to guarantee live results and avoid congestion
• Data stream processing: each event/data portion is
processed with no need for global visibility
Low latency, but lower throughput
• Example: filtering tweets and counting how many times
our product was mentioned: stream processing rather than batch
(see the sketch below)
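As a hedged sketch of the tweet-filtering example, here is what it could look like with Kafka Streams (one of the stream processors mentioned later). The topic names, broker address, and product keyword are illustrative assumptions.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ProductMentionCount {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-mention-count");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    // Each record is one tweet; keep only those mentioning our product,
    // then count them as they arrive (no global pass over the data).
    KStream<String, String> tweets = builder.stream("tweets"); // hypothetical input topic
    KTable<String, Long> mentions = tweets
        .filter((key, text) -> text != null && text.toLowerCase().contains("ourproduct"))
        .groupBy((key, text) -> "ourproduct")
        .count();

    mentions.toStream().to("product-mentions",
        Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}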
When to use stream processing?
• While stream processing and real-time processing are not
necessarily synonymous, we would use stream processing
when we need to analyze or serve data as close as possible to
when we get hold of it.
• Data generated in a continuous stream and arriving at high velocity
• Sub-second latency is crucial
Potential data loss: queues are needed to temporarily store
incoming data while it waits to be processed => consider a data
ingestion system (MOM/broker)
Examples: Storm (Twitter), Flink, Kafka Streams, Samza (LinkedIn)
Stream Processing solutions/architectures
Real-Time Processing
• Involves continuous input, process, and output of data.
• Processes in a short period of time.
• Every transaction/processing result is directly reflected in
the master file, so that it will always be up-to-date
=> synchronous APIs
• For tasks like fraud detection, real-time processing is very
useful. By processing transaction data, we can detect and
flag fraud in real time and stop fraudulent transactions
before they take place.
Real-Time Processing
• Computes a function of one data element, or processes
a small window of recent data.
• Computes something relatively simple
• When there is a need to compute in near real time, within
seconds at most, we go for real-time processing.
• In real-time processing, computations are generally
independent (no aggregation, for example); see the sketch below
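To illustrate "computing a function of one data element" with independent, per-event decisions (as in the fraud-detection use case), here is a minimal, hypothetical sketch; the Transaction record, the rule threshold, and the alert output are assumptions made for illustration.

import java.util.List;

public class FraudCheck {

  // Hypothetical transaction event: just the fields needed for the rule
  record Transaction(String id, String country, double amount) {}

  // A stateless check: the decision depends only on this one event,
  // no aggregation over other events is needed
  static boolean looksFraudulent(Transaction tx) {
    return tx.amount() > 10_000.0 || "BLOCKED".equals(tx.country());
  }

  // In a real system this loop would consume events from a broker (Kafka, MQ, ...)
  public static void main(String[] args) {
    List<Transaction> incoming = List.of(
        new Transaction("t1", "FR", 120.0),
        new Transaction("t2", "BLOCKED", 50.0),
        new Transaction("t3", "TN", 25_000.0));

    for (Transaction tx : incoming) {
      if (looksFraudulent(tx)) {
        System.out.println("ALERT: block transaction " + tx.id()); // e.g. call a synchronous API
      }
    }
  }
}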
Pros and Cons of Real-Time Processing
Advantages of Real-Time Processing
• No significant delay in response.
• Information is always up to date, enabling the
organization to take immediate action or respond to an
event/issue in the shortest possible span of time.
• Gain insights from the updated data, detect patterns
of either opportunities or threats.
Micro-batch Processing
• Latency/Throughput tradeoff
Incoming tasks to be executed are grouped into small
batches to achieve some of the performance advantage
of batch processing (high throughput) without
increasing the completion latency of each task too much.
Typically applied in systems where the amount of
incoming tasks is variable: the system grabs whatever
incoming tasks have been received and executes them as
a batch. This process is repeated.
Micro-batch Processing
A variant of batching that attempts to strike a better compromise
between latency and throughput than plain batching does.
How: wait for a short time interval (milliseconds or more) OR until a
batch-size threshold is reached, letting tasks pile up before processing
them: the batch cycle (see the sketch below).
The execution engine's visibility is limited to the current batch cycle.
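The batch cycle can be pictured as a loop that piles up tasks until either a size threshold or a time interval is reached, then processes them together. A minimal Java sketch, with illustrative thresholds and a placeholder process step:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MicroBatcher {
  private static final int MAX_BATCH_SIZE = 100;   // assumed size threshold
  private static final long MAX_WAIT_MS = 200;     // assumed batch interval

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public void submit(String task) {
    queue.add(task);
  }

  // One batch cycle: pile up tasks until the size or time threshold is hit,
  // then hand the whole batch to the (batch-oriented) processing step.
  public void runForever() throws InterruptedException {
    while (true) {
      List<String> batch = new ArrayList<>(MAX_BATCH_SIZE);
      long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
      while (batch.size() < MAX_BATCH_SIZE) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) break;                          // interval elapsed
        String task = queue.poll(remaining, TimeUnit.MILLISECONDS);
        if (task == null) break;                            // no new task before the deadline
        batch.add(task);
      }
      if (!batch.isEmpty()) {
        process(batch);   // high throughput: one call handles many tasks
      }
    }
  }

  private void process(List<String> batch) {
    System.out.println("processing batch of " + batch.size() + " tasks");
  }
}

Spark Streaming follows the same idea: the input stream is cut into small batches at a fixed batch interval, and each batch is processed by a regular (batch) Spark job.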
Micro-batch Processing
• Examples : Spark Streaming, Storm-Trident.
• Does not feel like natural streaming: processing is
intermittent, depending on the batch cycle and on
the required processing.
Interactive Processing/Applications
• Who interacts with whom?
The system interacts with a user (or another system)
=> synchronous
• Which types of processing can be interactive?
Batch processing? No, because of its latency.
Streaming? Yes, but it requires in-memory
processing (e.g., Spark) for lower latency.
Batch processing
Scripting languages
How to Analyze Large Data Sets in Hadoop
• Although the Hadoop framework is implemented in
Java, MapReduce applications do not need to be
written in Java
• To abstract the complexities of the Hadoop programming
model, a few application development languages
have emerged that build on top of Hadoop:
• Pig
• Hive
• Jaql
Pig, Hive, Jaql – Similarities
• Reduced program size over Java
• Applications are translated to map
and reduce jobs behind the scenes
• Extension points for extending
existing functionality
• Interoperability with other
languages
• Not designed for random
reads/writes or low-latency queries
Pig, Hive, Jaql – Differences
Characteristic              Pig          Jaql                    Hive                BigSQL
Developed by                Yahoo!       IBM                     Facebook            IBM
Language                    Pig Latin    Jaql                    HiveQL              ANSI SQL
Type of language            Data flow    Data flow               SQL                 SQL
Data structures supported   Complex      JSON, semi-structured   Mostly structured   Mostly structured
Schema                      Optional     Optional                Mandatory           Mandatory
Pig
• The Pig platform is able to handle many kinds of data,
hence the name
• Pig Latin is a data flow language
• Two components:
• Language Pig Latin
• Runtime environment
• Two execution modes:
• Local: good for testing and prototyping (pig -x local)
• Distributed (MapReduce): needs access to a Hadoop cluster and HDFS (pig -x mapreduce)
• This is the default mode
Pig
• Three steps in a typical Pig program:
• LOAD
• Load data from HDFS
• TRANSFORM
• Translated to a set of map and reduce tasks
• Relational operators: FILTER, FOREACH, GROUP, UNION, etc.
• DUMP or STORE
• Display results on the screen or store them in a file
• Pig data types:
• Simple types:
• int, long, float, double, chararray, bytearray, boolean
• Complex types:
• tuple: ordered set of fields, e.g. (John,18)
• bag: collection of tuples, e.g. {(John,18), (Mary,29)}
• map: set of key/value pairs, e.g. [name#John, phone#1234567]
Pig
• Example: wordcount.pig
lines = LOAD './all_web_pages' AS (line:chararray);
-- Extract words from each line and put them into a pig bag
-- datatype, then flatten the bag to get one word on each row
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
-- create a group for each word
word_groups = GROUP words BY word;
-- count the entries in each group
word_count = FOREACH word_groups GENERATE COUNT(words) AS count, group;
-- order the records by count
ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO './word_count_result';
How to run wordcount.pig?
– Local mode: bin/pig -x local wordcount.pig
– Distributed mode (MapReduce):
hadoop dfs -copyFromLocal all_web_pages input/all_web_pages
bin/pig -x mapreduce wordcount.pig
Hive
• Hive : developed by Facebook
• What is Hive?
• Data warehouse infrastructure built on top of Hadoop
• Provides an SQL-like language called HiveQL
• Allows SQL developers and business analysts to leverage existing SQL skills
• Offers built-in UDFs and indexing
• What is Hive not?
• Not designed for low-latency queries, unlike RDBMS such as DB2 and
Netezza
• Not schema-on-write: schema-on-read instead
• Not for OLTP
• Not fully SQL compliant; it only understands a limited set of commands
Big Difference: Schema on Run
Regular database: schema on load. Raw data passes through the
schema filter before being stored (pre-filtered data), and queries
read from that pre-filtered storage to produce output.
Big Data (Hadoop): schema on run. Raw data is stored unfiltered,
and the schema filter is applied only when the data is read, at
processing time, to produce output.
Hive
Benefits of schema on read:
• Flexibility in defining how your data is interpreted at load time
– This gives you the ability to evolve your "schema" as time goes on
– This allows you to have different versions of your "schema"
– This allows the original source data format to change without having to
consolidate to one data format
• You get to keep your original data
• You can load your data before you know what to do with it
• Gives you flexibility in being able to store unstructured, unclean,
and/or unorganized data
Hive
Downsides of schema on read:
• Less efficient because you have to reparse and reinterpret the
data every time (this can be expensive with formats like XML)
• Data is not self-documenting (i.e., you can't look at a schema to
figure out what the data is)
• More error prone and your analytics have to account for dirty
data
Hive
Components
• Shell : interface for users to submit queries and other operations
to the system.
• Driver : implements the notion of session handles and provides
execute and fetch APIs modeled on the JDBC/ODBC interface (see the
JDBC sketch below)
• Compiler : performs semantic analysis on the different query blocks and
query expressions and eventually generates an execution plan
• Engine : executes the execution plan. The plan is a DAG of stages.
• Metastore : stores all the structure information of tables and
partitions in the warehouse (column and column type
information, serializers and deserializers necessary to read and
write data, and the corresponding HDFS files where the data is stored).
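Because the Driver exposes execute/fetch APIs modeled on JDBC/ODBC, a client program can also query Hive through the standard Hive JDBC driver (HiveServer2). A minimal sketch; the host, port, credentials, and the table (taken from the movie-ratings example later in this section) are assumptions for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
  public static void main(String[] args) throws Exception {
    // Load the Hive JDBC driver (auto-loaded by recent JDBC versions)
    Class.forName("org.apache.hive.jdbc.HiveDriver");

    // HiveServer2 JDBC URL: host, port, and database are placeholders
    String url = "jdbc:hive2://localhost:10000/default";

    try (Connection conn = DriverManager.getConnection(url, "hive", ""); // assumed credentials
         Statement stmt = conn.createStatement()) {
      // The query is compiled into an execution plan (a DAG of stages) and run by the engine
      ResultSet rs = stmt.executeQuery(
          "SELECT movieid, COUNT(*) FROM movie_ratings GROUP BY movieid");
      while (rs.next()) {
        System.out.println(rs.getInt(1) + "\t" + rs.getLong(2));
      }
    }
  }
}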
Hive
Data models:
• Tables: analogous to tables in an RDBMS, composed of columns
• Partitions: for optimizing data access, e.g. range-partition tables by date
• Buckets: data in each partition may in turn be divided into buckets
based on the hash of a column in the table
Mapping to HDFS:
• Table -> directory
• Partitions -> sub-directories
• Buckets -> files
Hive
• Example: movie ratings analysis
-- create a table with tab-delimited text file format
hive> CREATE TABLE movie_ratings (
userid INT,
movieid INT,
rating INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
-- load data
hive> LOAD DATA INPATH 'hdfs://node/movie_data' OVERWRITE INTO
TABLE movie_ratings;
-- count ratings per movie and per rating value
hive> SELECT movieid, rating, COUNT(rating)
FROM movie_ratings
GROUP BY movieid, rating;