Big Data
Streaming Spark
Prafullata Kiran Auradkar
Department of Computer Science and Engineering
prafullatak@pes.edu
Acknowledgements:
Significant portions of the slide deck presented in Unit 4 of this course were created by Dr. K V Subramaniam, and I would like to acknowledge and thank him for the same. I may have supplemented them with content from books and other sources on the Internet, and I sincerely thank and acknowledge the original authors/publishers, with whom the credit/rights remain. These slides are intended for classroom presentation only.
BIG DATA
Overview of lecture
• Introduction to streaming analysis
• Streaming query types
• Streaming analysis requirements
• Streaming spark – Dstreams
• Job execution
• Stateless Transformations
• Stateful processing
• Fault Tolerance
• Performance
• Putting it all together
Introduction to streaming analysis
BIG DATA
Examples of Streaming
• Sensor data, e.g.,
• Temperature sensor in the ocean
■ Reports GPS
■ 1 sensor every 150 square miles => 1,000,000 sensors
■ 10 readings/sec => 3.5 TB/day
• Images
• London: 6 million video cameras
• Internet / Web traffic
• Google: hundreds of millions of queries/day
• Yahoo: billions of clicks/day
BIG DATA
Motivation
• Many important applications must process large streams of live data and provide results in near-real-time
  • Social network trends
  • Website statistics
  • Intrusion detection systems
  • Transportation systems, e.g., Uber
  • etc.
• Require large clusters to handle workloads
• Require latencies of a few seconds
Streaming data model and queries
BIG DATA
Stream Data Model
• Multiple streams
• Different rates, not synchronized
• Archival store
• Offline analysis, not real-time
• Working store
• Disk or memory
• Summaries
• Parts of streams
• Queries
• Standing queries
• Ad-hoc queries
BIG DATA
Examples of Stream Queries
• Standing queries: produce outputs at appropriate time
• Query is continuously running
• Constantly reading new data
• Query execution can be optimized
• Example: maximum temperature ever recorded
• Ad hoc query: not predetermined, arbitrary query
• Need to store stream
• Approach: store sliding window in SQL DB
• Do SQL query
• Example: number of unique users over last 30 days
• Store logins for last 30 days
BIG DATA
Exercise 1
Consider the queries below. Which among them are STANDING QUERIES and which are AD HOC?
• Alert when temperature > threshold
• Display average of last n temperature readings; n arbitrary
• List of countries from which visits have been received over the last year
• Alert if website receives a visit from a black-listed country
BIG DATA
Exercise 1 Solution
Consider the queries below. Which among them are STANDING QUERIES and which are AD HOC?
Solution:
• Alert when temperature > threshold – standing
• Display average of last n temperature readings – ad hoc
• List of countries from which visits have been received over the last year – ad hoc
• Alert if website receives a visit from a black-listed country – standing
BIG DATA
Issues in Stream Processing
• Velocity
• Streams can have high data rate
• Need to process very fast
• Volume
• Low data rate, but large number of streams
• Ocean sensors, pollution sensors
• Need to store in memory
• May not have huge memory
• Approximate solutions
BIG DATA
Need for a framework …
… for building such complex stream processing applications
But what are the requirements
from such a framework?
Streaming analysis framework requirements
BIG DATA
Requirements
• Scalable to large clusters
• Second-scale latencies
• Simple programming model
BIG DATA
Exercise 2
CAN WE USE HADOOP?
• Consider the simple program shown below.
• The input is a stream of records from the stock market.
• Each time a stock is sold, a new record is created.
• The record contains a field num_stock, which is the number of stocks sold.
• Find_max is a program that updates a variable Max_num_stock, which is the maximum of num_stock seen so far.
[Diagram: Input Stock Data Stream (num_stock) → Find_max → Max_num_stock]
BIG DATA
Exercise 2 - Solution
CAN WE USE HADOOP?
• Write the pseudo-code for Find_max:
  • If num_stock > Max_num_stock then Max_num_stock = num_stock
• Can this be implemented in Hadoop?
  • We need to process one record at a time, but Hadoop processes a full file in the Map.
  • Max_num_stock is a global variable.
  • Similar problems can arise in Spark.
• Does your solution assume that Find_max runs on a single node? Is this a reasonable assumption?
  • No; the number of transactions could be more than what a single node can handle.
• A minimal Spark comparison sketch follows below.
[Diagram: Input Stock Data Stream (num_stock) → Find_max → Max_num_stock]
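For comparison, here is a minimal Spark (batch) sketch of the per-batch maximum; the object name and sample values are hypothetical. It shows that the maximum within one batch is a simple distributed reduce, while Max_num_stock across batches is global state that neither Hadoop nor a plain batch job maintains for us.

import org.apache.spark.{SparkConf, SparkContext}

object FindMaxBatch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("FindMax").setMaster("local[*]"))

    // Hypothetical input: one batch of num_stock values
    val numStock = sc.parallelize(Seq(120L, 75L, 310L, 42L))

    // Maximum *within* this batch: a distributed reduce
    val maxInBatch = numStock.reduce((a, b) => math.max(a, b))
    println(s"Max_num_stock for this batch: $maxInBatch")

    // Carrying the maximum *across* batches requires state shared between
    // jobs, which is exactly the part that is awkward in Hadoop MapReduce.
    sc.stop()
  }
}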
BIG DATA
Case study: Conviva, Inc.
• Real-time monitoring of online video metadata (HBO, ESPN, ABC, SyFy, …)
• Since we can’t use Hadoop: two processing stacks
  • Custom-built distributed stream processing system
    • 1000s of complex metrics on millions of video sessions
    • Requires many dozens of nodes for processing
  • Hadoop backend for offline analysis
    • Generating daily and monthly reports
    • Similar computation as the streaming system
BIG DATA
Case study: XYZ, Inc.
• Any company that wants to process live streaming data has this problem
• Two processing stacks
  • Custom-built distributed stream processing system
    • 1000s of complex metrics on millions of video sessions
    • Requires many dozens of nodes for processing
  • Hadoop backend for offline analysis
    • Generating daily and monthly reports
    • Similar computation as the streaming system
• Twice the effort to implement any new function
• Twice the number of bugs to solve
• Twice the headache
BIG DATA
Requirements
• Scalable to large clusters
• Second-scale latencies
• Simple programming model
• Integrated with batch & interactive processing
BIG DATA
Stateful Stream Processing
• Traditional streaming systems have an event-driven, record-at-a-time processing model
  • Each node has mutable state
  • For each record, update state & send new records
• State is lost if a node dies!
• Making stateful stream processing fault-tolerant is challenging
[Diagram: input records flow into node 1 and node 2, each holding mutable state, which feed node 3]
BIG DATA
Exercise 3: Stateful Stream Processing
• Traditional streaming systems have an event-driven, record-at-a-time processing model
  • Each node has mutable state
  • For each record, update state & send new records
• State is lost if a node dies!
• Making stateful stream processing fault-tolerant is challenging
• Question: in the stock market example, where is the state?
[Diagram: input records flow into node 1 and node 2, each holding mutable state, which feed node 3]
BIG DATA
Exercise 3: (solution): Stateful Stream Processing
In the stock market example, where is the state?
• Write the pseudo-code for Find_max:
  • If num_stock > Max_num_stock then Max_num_stock = num_stock
• Max_num_stock is a global state
  • Its value depends upon the entire stream sequence
  • The first num_stock could have been the largest
[Diagram: Input Stock Data Stream (num_stock) → Find_max → Max_num_stock]
BIG DATA
Existing Streaming Systems
• Storm
• Replays record if not processed by a node
• Processes each record at least once
• May update mutable state twice!
• Mutable state can be lost due to failure!
• Trident – Use transactions to update state
• Processes each record exactly once
• Per state transaction updates slow
https://storm.apache.org/
https://storm.apache.org/releases/current/Trident-tutorial.html
BIG DATA
Requirements
• Scalable to large clusters
• Second-scale latencies
• Simple programming model
• Integrated with batch & interactive processing
• Efficient fault-tolerance in stateful computations
Spark Streaming
BIG DATA
What is Spark Streaming?
• Framework for large scale stream processing
• Scales to 100s of nodes
• Can achieve second scale latencies
• Integrates with Spark’s batch and
interactive processing
• Provides a simple batch-like API for
implementing complex algorithms
• Can absorb live data streams from Kafka,
Flume, ZeroMQ, etc.
BIG DATA
Exercise 4:
Can we modify Hadoop?
• Suppose we don’t want instantaneous updates
  • Say 1-second updates are acceptable
• Can we modify Hadoop to do stream processing?
  • Ignore the global-variable problem for now
[Diagram: Input Stock Data Stream (num_stock) → Find_max (If num_stock > Max_num_stock then Max_num_stock = num_stock) → Max_num_stock]
BIG DATA
Exercise 4: Solution
• We can batch the input records together every second into an HDFS file (Batcher)
• This file can be processed using MapReduce
• We can produce an update every second
• We have ignored the global-variable problem for now
[Diagram: Input Stock Data Stream (num_stock) → Batcher: group together records every second into an HDFS file → Find_max (If num_stock > Max_num_stock then Max_num_stock = num_stock) → Max_num_stock]
BIG DATA
Discretized Stream Processing
Run a streaming computation as a series of very small, deterministic batch jobs:
• Chop up the live data stream into batches of X seconds (Spark Streaming)
• Spark treats each batch of data as an RDD and processes it using RDD operations
• Finally, the processed results of the RDD operations are returned in batches
[Diagram: live data stream → Spark Streaming → batches of X seconds → Spark → processed results]
BIG DATA
Discretized Stream Processing
Run a streaming computation as a series of very small, deterministic batch jobs:
• Batch sizes as low as ½ second, latency ~1 second
• Potential for combining batch processing and stream processing in the same system
[Diagram: live data stream → Spark Streaming → batches of X seconds → Spark → processed results]
Streaming Spark - DStreams
BIG DATA
Remember!!!!
• In Spark (not Streaming Spark)
• Every distributed dataset is an RDD
• There are two kinds of RDDs
• Pair RDDs are RDDs that consist of key-
value pairs
• Special operations, such as reduceByKey
are defined on pair RDDs
BIG DATA
Example 1 – Get hashtags from Twitter
val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
[Diagram: Twitter Streaming API → tweets DStream, with batches @ t, t+1, t+2; each batch is stored in memory as an immutable, distributed RDD]
BIG DATA
Example 1 – Get hashtags from Twitter
val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
new DStream; transformation: modify data in one DStream to create another DStream
getTags is a function. A DStream is a sequence of RDDs; the function is applied to each RDD, and the result is another DStream. (A hypothetical sketch of getTags follows below.)
[Diagram: tweets DStream (batches @ t, t+1, t+2) → flatMap → hashTags DStream, e.g. [#cat, #dog, …]]
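getTags is not defined in the slides; the following is a hypothetical sketch, assuming each element of the tweets DStream is a twitter4j.Status (as with the old twitterStream source) and that hashtags are simply words beginning with '#'.

// Hypothetical helper: extract hashtags from a tweet's text.
def getTags(status: twitter4j.Status): Seq[String] =
  status.getText
    .split("\\s+")                 // split the tweet into words
    .filter(_.startsWith("#"))     // keep only hashtags
    .toSeq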
BIG DATA
Example 1 – Get hashtags from Twitter
val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")
output operation: push data to external storage
[Diagram: tweets DStream (batches @ t, t+1, t+2) → flatMap → hashTags DStream → save; every batch saved to HDFS]
BIG DATA
Java Example
Scala
val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")
Java
JavaDStream<Status> tweets = ssc.twitterStream(<Twitter username>, <Twitter password>);
JavaDStream<String> hashTags = tweets.flatMap(new Function<...>() { ... });
hashTags.saveAsHadoopFiles("hdfs://...");
Function object to define the transformation
Streaming Spark - execution
BIG DATA
Streaming Spark Flow
Dstreams and Receivers
• Twitter, HDFS, Kafka, Flume
Transformations
• Standard RDD operations – map,
countByValue, reduce, join, …
• Stateful operations – window,
countByValueAndWindow, …
Output Operations on Dstreams
• saveAsHadoopFiles – saves to
HDFS
• foreach – do anything with each
batch of results
BIG DATA
Dstreams and Receivers
• Streaming Spark processes data in batches
  • Together these batches form the DStream
• Every DStream is associated with a Receiver
  • Receivers read data from a source and store it in Spark memory for processing
• Types of sources
  • Basic – file systems and sockets
  • Advanced – Kafka, Flume
• Relationship between DStream and RDD: a DStream is a sequence of RDDs, one per batch interval (a minimal setup sketch follows below)
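A minimal setup sketch, using a basic socket source rather than Kafka or Flume; the host, port, batch interval, and app name are assumptions for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// At least 2 local cores: one is occupied by the long-running receiver.
val conf = new SparkConf().setAppName("ReceiverSketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(1))   // 1-second batch interval

// Basic source: a socket receiver reading text lines (host/port hypothetical).
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()                                        // output operation

ssc.start()              // starts the receiver as a long-running job
ssc.awaitTermination()   // driver schedules processing tasks every interval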
BIG DATA
Dstreams and Receivers
• How Streaming Spark runs the job:
  • A Receiver is started on an executor as a long-running task
  • The driver launches tasks to process the received blocks at every batch interval
Source: https://www.youtube.com/watch?v=NHfggxItokg
BIG DATA
Transformations in Spark
• Stateless transformations
• Stateful Transformations
Spark Streaming – stateless processing
BIG DATA
Stateless transformations in Spark
• Transformation is applied to every batch of data independently.
• No information is carried forward between one batch and the
next batch
• Examples (each applied to every batch independently; see the sketch below)
  • map()
  • flatMap()
  • filter()
  • repartition()
  • reduceByKey()
  • groupByKey()
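A minimal sketch of stateless transformations, reusing the hypothetical lines DStream from the receiver sketch earlier; each operation is applied to every batch independently, and nothing carries over between batches.

// Stateless: every transformation below runs on each batch's RDD by itself.
val words     = lines.flatMap(_.split(" "))   // flatMap per batch
val longWords = words.filter(_.length > 5)    // filter per batch
val pairs     = words.map(w => (w, 1))        // pair DStream: (word, 1)
val counts    = pairs.reduceByKey(_ + _)      // counts *within* one batch only
counts.print()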
BIG DATA
Class Exercise : Stateless stream processing (10 mins)
• Consider a DStream of stock quotes, generated as in the earlier example, that contains
  • a sequence of tuples of the form <company name, stock sold>
• Find the total shares sold per company in the last 1 minute
• Show the Streaming Spark design for the same
BIG DATA
Solution – count stock in every window
[Diagram: Stocks sold DStream (batches @ t, t+1, t+2) → reduceByKey on each batch → counts DStream]
Every batch is processed independently, so there is no state across time. A minimal sketch follows below.
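A minimal sketch of the exercise, assuming quotes arrive as text lines of the form "<company>,<shares>" on a socket (a hypothetical source) and that the batch interval is set to one minute so each batch covers exactly the last minute.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setAppName("StockTotals").setMaster("local[2]"),
  Minutes(1))                                   // one batch per minute

// Hypothetical source and record format: "<company>,<shares>"
val quotes = ssc.socketTextStream("localhost", 9999)

val sold = quotes.map { line =>
  val Array(company, shares) = line.split(",")
  (company, shares.toLong)                      // pair DStream: (company, shares sold)
}

// Each 1-minute batch is reduced independently -- stateless processing.
val totalsPerMinute = sold.reduceByKey(_ + _)
totalsPerMinute.print()

ssc.start()
ssc.awaitTermination()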
Stateful processing
BIG DATA
Stateful transformations
• Sometimes we need to keep some state across different
batches of data
• For example, what’s the max amount of stock sold across
the whole day for a company?
• In this case we need to store max value for each
company
• How do we store state across batches?
  • First, we ensure that the data is in pair RDDs (key, value format)
  • This helps ensure that we have a state per key
BIG DATA
Stateful transformations
• Spark provides two options
• Window operator – when we want a state to be
maintained across short periods of time
• Session based – where state is maintained for longer
BIG DATA
Example 3 – Count the hashtags over last 10 mins
val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
val tagCounts = hashTags.window(Minutes(10), Seconds(1)).countByValue()
sliding window operation: window length = Minutes(10), sliding interval = Seconds(1)
BIG DATA
Example 3 – Counting the hashtags over last 10 mins
val tagCounts = hashTags.window(Minutes(10),Seconds(1)).countByValue()
[Diagram: hashTags DStream batches at t-1 … t+3; a sliding window over them feeds countByValue, producing the tagCounts DStream; the count is over all the data in the window]
BIG DATA
Class Exercise: (5 mins)
val tagCounts = hashTags.window(Minutes(10), Seconds(1)).countByValue()
How can we make this count operation more efficient?
BIG DATA
Smart window-based countByValue
val tagCounts = hashtags.countByValueAndWindow(Minutes(10), Seconds(1))
[Diagram: hashTags DStream batches at t-1 … t+3; as the window slides, countByValue adds the counts from the new batch entering the window (+) and subtracts the counts from the batch leaving the window (–), updating tagCounts incrementally]
BIG DATA
Smart window-based reduce
• Technique to incrementally compute count generalizes to many
reduce operations
• Need a function to “inverse reduce” (“subtract” for counting)
• Could have implemented counting as:
hashTags.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), …)
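A minimal sketch of the incremental windowed reduce, assuming the ssc and the (company, shares) pair DStream sold from the earlier stock sketch; the inverse function subtracts the batches that slide out of the window, and this incremental form requires a checkpoint directory (path hypothetical).

import org.apache.spark.streaming.{Minutes, Seconds}

ssc.checkpoint("hdfs://namenode:9000/checkpoints")  // required for the inverse-reduce form

val windowedTotals = sold.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,   // add values entering the window
  (a: Long, b: Long) => a - b,   // subtract values leaving the window
  Minutes(10),                   // window length
  Seconds(1))                    // sliding interval

windowedTotals.print()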
BIG DATA
Session based state
▪ Maintaining arbitrary state, track sessions
- Maintain per-user mood as state, and update it with his/her tweets
tweets.updateStateByKey(tweet => updateMood(tweet))
• updateStateByKey uses the
current mood and the mood in the
tweet to update the user’s mood
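A minimal session-style sketch with updateStateByKey, keeping a running total of shares sold per company across all batches (again assuming the ssc and sold pair DStream from the earlier stock sketch). Note that the real update function receives all new values for a key in the current batch plus the previous state, which is slightly more general than the simplified call shown on the slide.

ssc.checkpoint("hdfs://namenode:9000/checkpoints")  // stateful ops need a checkpoint dir (path hypothetical)

// newShares: all values for this company in the current batch
// runningTotal: the state carried over from previous batches
def updateTotal(newShares: Seq[Long], runningTotal: Option[Long]): Option[Long] =
  Some(runningTotal.getOrElse(0L) + newShares.sum)

val totalsSoFar = sold.updateStateByKey(updateTotal _)
totalsSoFar.print()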
BIG DATA
Exercise 4 – Maintaining State (10 minutes)
▪ Consider the code below (maintaining arbitrary state to track sessions: maintain per-user mood as state and update it with his/her tweets):
  tweets.updateStateByKey(tweet => updateMood(tweet))
▪ What has to be the structure of the RDDs in the DStream tweets?
  ▪ Hint: note that updateStateByKey needs a key
▪ What does the function updateMood do?
  ▪ Hint: note that it should update the per-user mood
BIG DATA
Exercise 4 – Maintaining State: solution
▪ Maintaining arbitrary state, track sessions: maintain per-user mood as state and update it with his/her tweets
  tweets.updateStateByKey(tweet => updateMood(tweet))
▪ What has to be the structure of the RDDs in the DStream tweets?
  ▪ They must consist of key-value pairs with user as key and mood as value, e.g.:
    ▪ Dinkar: Happy
    ▪ KVS: VeryHappy
    ▪ …
▪ What does the function updateMood do?
  ▪ It computes the new mood based upon the old mood and the tweet
  ▪ For example, suppose user KVS (key) tweets “Eating icecream” (value)
BIG DATA
Exercise 4 – Maintaining State: solution
▪ Maintaining arbitrary state, track sessions: maintain per-user mood as state and update it with his/her tweets
  tweets.updateStateByKey(tweet => updateMood(tweet))
▪ The RDDs in tweets consist of key-value pairs with user as key and mood as value:
  ▪ Dinkar: Happy
  ▪ KVS: VeryHappy
  ▪ …
▪ Suppose user Dinkar (key) tweets “Eating icecream” (value):
  ▪ updateStateByKey finds the current mood: Happy
  ▪ The current mood (Happy) and the tweet (“Eating icecream”) are passed to updateMood
  ▪ updateMood calculates the new mood as VeryHappy
  ▪ updateStateByKey stores the new mood for Dinkar as VeryHappy
▪ A hypothetical sketch of updateMood follows below.
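A hypothetical sketch of updateMood for this exercise. As noted above, updateStateByKey actually passes all of a user's tweets in the current batch together with the previous mood, so updateMood is written in that shape; the mood-inference rule itself is invented purely for illustration.

// tweets is assumed to be a pair DStream of (user, tweetText).
def updateMood(newTweets: Seq[String], oldMood: Option[String]): Option[String] = {
  val previous = oldMood.getOrElse("Neutral")
  val mood = newTweets.lastOption match {
    case Some(text) if text.contains("icecream") => "VeryHappy"  // invented rule
    case Some(_)                                  => previous
    case None                                     => previous
  }
  Some(mood)
}

// val moods = tweets.updateStateByKey(updateMood _)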
Fault Tolerant stateful processing
BIG DATA
Fault-tolerant Stateful Processing
• All intermediate data are RDDs, hence can be recomputed if lost.
BIG DATA
Fault-tolerance
▪ RDDs remember the sequence of operations that created them from the original fault-tolerant input data
▪ Batches of input data are replicated in the memory of multiple worker nodes, and are therefore fault-tolerant
▪ Data lost due to worker failure can be recomputed from the input data
[Diagram: the tweets input-data RDD, replicated in memory, feeds flatMap to produce the hashTags RDD; lost partitions are recomputed on other workers]
BIG DATA
Fault Tolerance
• What happens if there is a failure during
  • Stateless processing?
    • The previous history is not required; the processing can simply be recomputed.
  • Stateful processing?
    • State from previous batches is required for the computation.
    • How much state should be retained?
BIG DATA
Checkpointing
• Sometimes there may be too much data to be stored
  • For a streaming algorithm, we may have to store all of the streams
• Checkpointing
  • Stores an RDD
  • Forgets the lineage
• A checkpoint at t+2 will
  • store the hashTags and tagCounts at t+2
  • forget the rest of the lineage
[Diagram: hashTags and tagCounts DStreams at t-1 … t+3, with a checkpoint taken at t+2]
A minimal checkpointing sketch follows below.
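A minimal checkpointing sketch, assuming a hypothetical HDFS path and application name. ssc.checkpoint tells Spark where to periodically save state RDDs (e.g. tagCounts) and truncate their lineage; StreamingContext.getOrCreate lets a restarted driver rebuild the context from the checkpoint instead of recomputing the whole stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:9000/streaming-checkpoints"  // hypothetical path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(
    new SparkConf().setAppName("CheckpointedApp"), Seconds(1))
  ssc.checkpoint(checkpointDir)   // state RDDs saved here; lineage is truncated
  // ... define DStream transformations (e.g. countByValueAndWindow) here ...
  ssc
}

// On restart after a failure, rebuild from the checkpoint if one exists.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()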
Performance
BIG DATA
Performance
• Can process 6 GB/sec (60M records/sec) of data on 100 nodes at sub-second
latency
- Tested with 100 streams of data on 100 EC2 instances with 4 cores each
BIG DATA
Fast Fault Recovery
• Recovers from faults/stragglers within 1 sec
BIG DATA
Real Applications: Conviva
• Real-time monitoring of video metadata
• Achieved 1-2 second latency
• Millions of video sessions processed
• Scales linearly with cluster size
[Figure: active sessions processed (millions) vs. number of nodes in the cluster]
BIG DATA
Real Applications: Mobile Millennium Project
• Traffic transit time estimation using online machine learning on GPS observations
  • Markov chain Monte Carlo simulations on GPS observations
  • Very CPU intensive; requires dozens of machines for useful computation
  • Scales linearly with cluster size
[Figure: GPS observations processed per second vs. number of nodes in the cluster]
Putting it all together
BIG DATA
Vision - one stack to rule them all
BIG DATA
Spark program vs Spark Streaming program
Spark Streaming program on Twitter stream
val tweets = ssc.twitterStream(<Twitter username>,
<Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")
Spark program on Twitter log file
val tweets = sc.hadoopFile("hdfs://...")
val hashTags = tweets.flatMap (status => getTags(status))
hashTags.saveAsHadoopFile("hdfs://...")
BIG DATA
Vision - one stack to rule them all
▪ Explore data interactively using Spark Shell / PySpark to identify problems:

$ ./spark-shell
scala> val file = sc.hadoopFile("smallLogs")
...
scala> val filtered = file.filter(_.contains("ERROR"))
...
scala> val mapped = filtered.map(...)
...

▪ Use the same code in Spark stand-alone programs to identify problems in production logs:

object ProcessProductionData {
  def main(args: Array[String]) {
    val sc = new SparkContext(...)
    val file = sc.hadoopFile("productionLogs")
    val filtered = file.filter(_.contains("ERROR"))
    val mapped = filtered.map(...)
    ...
  }
}

▪ Use similar code in Spark Streaming to identify problems in live log streams:

object ProcessLiveStream {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(...)
    val stream = ssc.kafkaStream(...)
    val filtered = stream.filter(_.contains("ERROR"))
    val mapped = filtered.map(...)
    ...
  }
}
BIG DATA
Alpha Release with Spark 0.7
• Integrated with Spark 0.7
• Import spark.streaming to get all the functionality
• Both Java and Scala API
• Give it a spin!
• Run locally or in a cluster
• Try it out in the hands-on
https://spark.apache.org/docs/latest/streaming-programming-guide.html
BIG DATA
Limitations
• Streaming Spark processes data in batches
• Near Real Time
• Not necessarily acceptable for certain scenarios
BIG DATA
Summary
• Stream processing framework that is ...
• Scalable to large clusters
• Achieves second-scale latencies
• Has simple programming model
• Integrates with batch & interactive workloads
• Ensures efficient fault-tolerance in stateful computations
• For more information, checkout the paper:
https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf
BIG DATA
References
• https://spark.apache.org/docs/latest/streaming-programming-guide.html
• https://spark.apache.org/streaming/
• Mining of Massive Datasets, Anand Rajaraman, Jure Leskovec, Jeffrey D. Ullman
• Big Data Analytics Beyond Hadoop: Real-Time Applications with Storm, Spark,
and More Hadoop Alternatives, Vijay Srinivasa Agneeswaran
THANK YOU
Prafullata Kiran Auradkar
Department of Computer Science and Engineering
prafullatak@pes.edu