Kafka and Spark Streaming

This document explains Kafka and Spark Streaming. It also explains the preservation of partitions and partitioning for different Spark operations. Also included is a case study for using Kafka with Spark Streaming.

Kafka

Terminology

- Topics: feeds of messages
- Producer: writes data to brokers
- Consumer: reads data from brokers
- Brokers: Kafka servers
- Partitions: topics are split into partitions
- Controller: the broker responsible for managing the states of partitions and replicas and for performing administrative tasks like reassigning partitions

Partitions

- Each topic is divided into partitions.
- Each partition contains an ordered sequence of messages (the sequence number is called the offset).
- Messages are removed only on timeout, not on consumption.
- Consumers keep track of their current offset so they know where to read the next messages from.
- The number of partitions can be increased but never decreased.
- Each individual partition must fit on the servers that host it.
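The retention and offset-tracking behaviour above can be sketched as a toy model in Python (names like `PartitionLog` and `ToyConsumer` are illustrative, not Kafka's actual classes):

```python
class PartitionLog:
    """Toy model of a Kafka partition: an append-only, ordered message log.
    Messages are never removed on consumption; each consumer just remembers
    its own offset (the sequence number of the next message to read)."""
    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1  # offset of the appended message

    def read_from(self, offset, max_count=10):
        return self.messages[offset:offset + max_count]


class ToyConsumer:
    def __init__(self, log):
        self.log = log
        self.offset = 0  # where to read next

    def poll(self, max_count=10):
        batch = self.log.read_from(self.offset, max_count)
        self.offset += len(batch)
        return batch


log = PartitionLog()
for m in ["m0", "m1", "m2"]:
    log.append(m)

c1, c2 = ToyConsumer(log), ToyConsumer(log)
print(c1.poll(2))   # ['m0', 'm1']
print(c1.poll(2))   # ['m2']
print(c2.poll(10))  # ['m0', 'm1', 'm2'] - c1's consumption removed nothing
```

Note how the second consumer still sees every message: the log only shrinks on timeout, never on reads.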

Architecture

[Diagram: a Kafka cluster coordinated by Zookeeper, serving a consumer group "Y" containing consumers 1, 2 and 3.]

- A Kafka cluster consists of one or more brokers, where each broker is identified by broker.id.
- Each partition in the Kafka cluster has a leader and a set of replicas among the brokers.
- The replication factor can be configured separately for each topic.
- All writes to a partition must go through the partition leader.
- Reads are always done from the partition leader.
- The replicas are kept in sync by fetching from the leader.
- When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas.
- Messages are stored on disk. Kafka relies on the page cache for performance, so storage on disk does not bring down the memory requirement.
- Zookeeper is used for storing configuration and offsets.

Replication

The leader maintains:
  o ISR (in-sync replica set)
  o HW (high watermark, the offset of the last committed message)

- A message is committed only once it reaches all replicas in the ISR (acknowledgement needed).
- Replicas fetch messages from the leader at least every replica.fetch.wait.max.ms.
- Dead replica: a replica that fails to send a fetch request for longer than replica.lag.time.max.ms.
- Slow replica: a replica that lags behind the leader for longer than replica.lag.time.max.ms.
- The leader saves the HW to disk every replica.high.watermark.checkpoint.interval.ms.
- The leader continuously propagates the HW to followers.
- When a failed replica is restarted, it recovers the latest HW from disk and truncates its log to the HW.
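The recovery rule in the last bullet can be sketched as a small simulation (a toy model of a replica, not Kafka's implementation; all names are illustrative):

```python
class ToyReplica:
    """Toy model of a replica recovering after a failure. The high watermark
    (HW) is the offset just past the last committed message. On restart, the
    replica truncates its log to the last HW it checkpointed to disk,
    discarding uncommitted entries, before re-fetching from the leader."""
    def __init__(self):
        self.log = []
        self.saved_hw = 0  # HW persisted to "disk"

    def append(self, msg):
        self.log.append(msg)

    def checkpoint_hw(self, hw):
        # In Kafka this runs every replica.high.watermark.checkpoint.interval.ms.
        self.saved_hw = hw

    def restart(self):
        # Recover the latest HW from disk and truncate the log to it.
        self.log = self.log[:self.saved_hw]


replica = ToyReplica()
for m in ["a", "b", "c", "d"]:
    replica.append(m)
replica.checkpoint_hw(2)   # only offsets 0 and 1 are committed
replica.restart()
print(replica.log)         # ['a', 'b'] - uncommitted 'c' and 'd' are discarded
```

The discarded entries are safe to drop because, not being committed, they were never acknowledged to any producer requiring full-ISR acknowledgement.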

Leader Election
When all in-sync replicas die, there are two options:
  o Wait for a replica in the ISR to come back to life.
  o Choose the first replica that comes back to life (not necessarily in the ISR) as leader.

Kafka chooses the second approach. The advantage is that downtime is reduced, since it is not known how long it will take for a replica in the ISR to come up, or whether it ever will.
The disadvantage is that consistency is violated: Kafka starts working with an old copy of the log which does not contain all the messages. If a replica in the ISR comes up later, it will have messages that are not available on the leader; however, those messages cannot be used. This leads to loss of messages.


Zookeeper Data

[Diagram: the Zookeeper znode tree. Under "brokers", the "topics" subtree maps each topic name (A, B) to its number of partitions, and each partition ID to its leader, its ISR, and a map of partition ID to replica IDs; the "ids" subtree maps each broker ID to its host and port. Under "consumers", each consumer group ID (e.g. P) has "ids" (consumer IDs), "owners" (topic and partition ownership with status) and "offsets" per topic partition, used by consumers. A "controller" node stores the broker ID of the current controller.]

Essential configuration
Message Size
There is a message size restriction of 1000 KB by default. It can be configured through the following parameters:
1. max.message.bytes
2. replica.fetch.max.bytes

Message Retention
Kafka is designed to retain logs over long periods. The default duration is 1 week. This can be a problem when little storage space is available. It can be configured through the parameter "log.retention.hours". If a duration of less than 1 hour is required, then log.retention.minutes or log.retention.ms (highest precedence) can be used at the topic level.

Performance
If enough RAM is available, the vm.swappiness parameter in /etc/sysctl.conf on Linux machines can be set to 1. This improves performance by reducing page-out requests, as Kafka relies on the page cache for performance.
Also, the parameter num.io.threads should be set to at least the number of disks available.

Producer
There is a message size restriction of 1000 KB by default, configurable through the parameter "max.request.size". Please note that no error is thrown if writing a message to a Kafka partition fails.
A producer can specify a particular partition while writing a message to a topic. Alternatively, if no partition is specified, Kafka automatically distributes the incoming messages across partitions.

Consumer
Kafka broadcasts messages from each topic partition to subscribed consumer groups, where each message is finally delivered to exactly one consumer in a consumer group. A consumer must continuously commit the offsets of messages read, so that if a consumer goes down, another consumer can take its place and start reading after the last committed offset. It is a pull model, i.e., consumers ask for messages from a particular offset.

Rebalancing
Rebalancing occurs only when
- the number of consumers in a consumer group subscribed to a topic changes, or
- the number of partitions changes (partitions can only be increased, never decreased).

Rebalancing is not done according to the current load on Kafka partitions. The following tests demonstrate this.
Test 1
1. Create a topic with 4 partitions. Start 2 consumers.
2. Start a producer which puts messages in partitions 1 and 2.
   Observation: messages from partitions 1 and 2 went to consumer 1; consumer 2 did not get any message. On querying the consumer group, it could be seen that the owner of partitions 1 and 2 is consumer 1 and the owner of partitions 3 and 4 is consumer 2.
3. Start one more producer which puts messages in partitions 3 and 4.
   Observation: messages from partitions 1 and 2 went to consumer 1; messages from partitions 3 and 4 went to consumer 2. The partition distribution did not change.
4. Stop the first producer.
   Observation: consumer 1 stopped receiving any messages; messages from partitions 3 and 4 still went to consumer 2. The partition distribution did not change.

Test 2
1. Create a topic with 4 partitions. Start 2 consumers.
2. Start producers 1 and 3, which put messages in partitions 1 and 3.
   Observation: messages from partition 1 went to consumer 1 and messages from partition 3 went to consumer 2. On querying the consumer group, it could be seen that the owner of partitions 1 and 2 is consumer 1 and the owner of partitions 3 and 4 is consumer 2.
3. Start two more producers (2 and 4), which put messages in partitions 2 and 4 respectively.
   Observation: messages from partitions 1 and 2 went to consumer 1; messages from partitions 3 and 4 went to consumer 2. The partition distribution did not change.
4. Stop producer 1 and producer 2.
   Observation: consumer 1 stopped receiving any messages; messages from partitions 3 and 4 still went to consumer 2. The partition distribution did not change.

API
There are two sets of APIs available.

Old API
1. High Level Consumer

This API is simpler to use. The consumer regularly commits offsets to Kafka. However, it requires no indication from the consumer application of whether a message was actually consumed, i.e., offsets are committed based on delivery of messages only. This can be a problem if the consumer application receives a message but crashes before processing it: processing of those messages will be skipped. Another (though rarer) scenario is when messages are processed but the application goes down before the consumer commits the offsets; in this case, some messages will be processed again when the application comes back up. For instance, if the parameter "auto.commit.interval.ms" for the high-level consumer is set higher than the time it takes to process messages, it can lead to a situation where offsets are not yet committed to Zookeeper but the corresponding messages are already processed. If the application now restarts abruptly, these messages will be processed again.
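The skipped-message race can be made concrete with a minimal simulation (a toy model of commit-on-delivery, not the real consumer API; all names are illustrative):

```python
class AutoCommitConsumer:
    """Simulates the old high-level consumer: offsets are committed on
    delivery, independently of whether the application processed them."""
    def __init__(self, log):
        self.log = log
        self.committed = 0  # offset stored in "Zookeeper"

    def poll(self, n):
        batch = self.log[self.committed:self.committed + n]
        self.committed += len(batch)  # auto-commit on delivery
        return batch


log = ["m0", "m1", "m2", "m3"]
consumer = AutoCommitConsumer(log)

delivered = consumer.poll(2)   # ['m0', 'm1'] delivered, offset 2 committed
processed = []                 # ...but the application crashes before processing!

# After restart, a fresh consumer resumes from the committed offset.
consumer2 = AutoCommitConsumer(log)
consumer2.committed = consumer.committed
processed += consumer2.poll(10)
print(processed)               # ['m2', 'm3'] - 'm0' and 'm1' were never processed
```

Because the offset moved on delivery rather than on processing, the crash permanently skips the first two messages.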

Normal scenario:
1. The consumer receives messages from Kafka up to offset m.
2. The consumer gives the messages up to m to the application.
3. The auto-commit timer expires; the consumer commits offset m (offset m is written to Zookeeper).
4. The application finishes processing messages up to offset m and asks for more messages.
5. The consumer asks Kafka for messages from offset m and receives messages from offset m to offset n.

Message processing missed:
1. The consumer receives messages from Kafka up to offset m.
2. The auto-commit timer expires; the consumer commits offset m (offset m is written to Zookeeper).
3. The consumer gives the messages up to m to the application.
4. The application restarts abruptly.
5. The consumer asks for the current offset; the offset is fetched from Zookeeper and Kafka returns offset m.
6. The consumer asks Kafka for messages from offset m and receives messages from offset m to offset n.
7. At this moment, processing of the messages up to offset m has been missed, as these will not be fetched again from Kafka.

Message processed more than once:
1. The consumer receives messages from Kafka up to offset m.
2. The consumer gives the messages up to m to the application.
3. The auto-commit timer expires; the consumer commits offset m (offset m is written to Zookeeper).
4. The consumer asks Kafka for messages from offset m.
5. The application finishes processing messages up to offset m and asks for more messages.
6. The consumer gives the messages up to n to the application; the application finishes processing messages up to offset n.
7. The application restarts abruptly.
8. The consumer asks for the current offset; the offset is fetched from Zookeeper and Kafka returns offset m, since offset n was never committed.
9. The consumer receives messages from Kafka from offset m to offset n, and the application processes them again up to offset n.
10. At this moment, the messages up to offset n have been processed twice.

2. Simple Consumer
This API is more difficult to use but provides the added benefit of own offset management. It is useful for an application to manage its own offsets in order to recover from failures. For instance, an application can persist offsets to reliable storage only after messages are processed. If the application is later restarted after a failure, it can read the last committed offset from storage to avoid reprocessing. This ensures at-least-once semantics. However, the application must do the extra work of keeping track of which Kafka broker is the partition leader, and must track leader changes during repartitioning. Also, the offset from which messages are to be read is passed in every request.
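The commit-after-processing pattern can be sketched as follows (a toy model under the assumption that "reliable storage" is a simple key-value store; all names are illustrative):

```python
class AppManagedOffsets:
    """Simulates the simple-consumer pattern: the application persists the
    offset to its own reliable storage only AFTER processing a batch, which
    yields at-least-once semantics."""
    def __init__(self, log, storage):
        self.log = log
        self.storage = storage  # dict standing in for reliable storage

    def run_once(self, n, crash_before_commit=False):
        offset = self.storage.get("offset", 0)
        batch = self.log[offset:offset + n]
        processed = list(batch)                       # process first...
        if crash_before_commit:
            return processed                          # crash: offset not persisted
        self.storage["offset"] = offset + len(batch)  # ...then commit
        return processed


log = ["m0", "m1", "m2"]
storage = {}
app = AppManagedOffsets(log, storage)
print(app.run_once(2, crash_before_commit=True))  # ['m0', 'm1'] processed
print(app.run_once(2))  # ['m0', 'm1'] again: reprocessed, never skipped
print(app.run_once(2))  # ['m2']
```

A crash before the commit causes reprocessing (at-least-once), the opposite trade-off to the high-level consumer's auto-commit, which can skip messages.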
Normal scenario:
1. The consumer receives messages from Kafka up to offset m and gives them to the application.
2. The application processes the messages and commits offset m to reliable storage.
3. The application asks for messages from offset m; the consumer asks Kafka for messages from offset m and receives messages from offset m to offset n.

Application restarts before processing messages (message processed exactly once):
1. The consumer receives messages from Kafka up to offset m and gives them to the application.
2. The application commits offset m to reliable storage.
3. The application asks for messages from offset m; the consumer asks Kafka for messages from offset m and receives messages from offset m to offset n.
4. The application restarts abruptly.
5. The application reads offset m from reliable storage; the consumer asks Kafka for messages from offset m and receives messages from offset m to offset n. No message is processed twice.

Application restarts before committing offset (message processed at least once):
1. The consumer receives messages from Kafka up to offset m and gives them to the application.
2. The application commits offset m to reliable storage.
3. The application asks for messages from offset m; the consumer asks Kafka for messages from offset m, receives messages from offset m to offset n, and gives them to the application.
4. The application restarts abruptly before committing offset n.
5. The application reads offset m from reliable storage; the consumer asks Kafka for messages from offset m and receives messages from offset m to offset n.
6. At this moment, the messages from offset m are received again even though they were already processed.

New API
It integrates the high-level and simple consumer APIs. These methods allow manual offset management: there is an option to disable automatic committing of offsets by the consumer, so the application can decide when messages are consumed and then trigger the committing of offsets. This greatly reduces the window in which a failure can cause a violation of "exactly once" semantics with respect to message processing. Zookeeper is not used by the consumer at all in this case; instead, offsets are tracked by Kafka.
Also, if auto-rebalancing is used, the application can register a callback to be invoked when rebalancing occurs. The new API supports non-blocking operations too.
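The manual-commit loop can be sketched as a toy model (illustrative names, not the real consumer API; offsets live in a simple counter standing in for Kafka's offset storage):

```python
class ManualCommitConsumer:
    """Simulates the new consumer API with auto-commit disabled: the
    application triggers the commit only after it has processed a batch,
    shrinking the window in which a crash causes skipped messages."""
    def __init__(self, log):
        self.log = log
        self.committed = 0  # offset tracked by "Kafka", not Zookeeper

    def poll(self, n):
        # Polling does NOT move the committed offset.
        return self.log[self.committed:self.committed + n]

    def commit(self, n):
        self.committed += n


log = ["m0", "m1", "m2"]
consumer = ManualCommitConsumer(log)

batch = consumer.poll(2)    # ['m0', 'm1']
# ...application processes the batch, then explicitly commits...
consumer.commit(len(batch))
print(consumer.poll(2))     # ['m2'] - a crash before commit() would replay, not skip
```

Unlike the auto-commit model, polling alone never advances the committed offset, so a crash between poll and commit leads to reprocessing rather than message loss.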
Normal scenario:
1. The consumer receives messages from Kafka up to offset m and gives them to the application.
2. The application finishes processing messages up to offset m and asks the consumer to commit.
3. The consumer commits offset m (offset m is tracked by Kafka).
4. The application asks for more messages; the consumer asks Kafka for messages from offset m and receives messages from offset m to offset n.

Message Size
There is a restriction on the message size that can be read from a partition. It is configurable through the parameter
1. "fetch.message.max.bytes" (old API)
2. "max.partition.fetch.bytes" (new API)
The default value is 1 MB.

Spark Streaming
Reading data from Kafka
Spark Streaming provides a library for fetching data from Kafka. The utility class "KafkaUtils" [1] can be used for fetching data from Kafka and loading it into Spark.
There are two sets of methods available to get a Spark discretized stream (DStream) from a set of Kafka topics.

Receiver-based approach
It includes the method "createStream". This approach uses the old Kafka high-level consumer API. It should be noted that the argument "topics" is incorrectly described as a "Map of (topic_name -> numPartitions) to consume" in the javadocs [1]. Here "topics" is actually a map of (topic_name -> number of consumers). So, if the number of consumers is less than the number of partitions in Kafka, a single consumer will be assigned to multiple partitions.
[Verification: a Spark Streaming instance was created, and a DStream for getting data from a Kafka topic was created using the "createStream" method. The number of consumers was set to less than the number of Kafka partitions. Messages were posted across all Kafka partitions; each message was marked with a message ID so as to identify the partition to which it was posted. Messages received by the Spark instances were printed. It was found that messages were received from all partitions of Kafka.]
Test 1
1. Create a topic with 4 partitions.
2. Start Spark Streaming instance 1 with the number of consumers set to 4.
   Observation: the owners of all 4 partitions are different.
3. Start a producer which puts messages in all partitions.
   Observation: messages from all partitions went to Spark instance 1. Ownership of the partitions did not change.
4. Start Spark Streaming instance 2 with the number of consumers set to 4.
   Observation: messages from all partitions still went to Spark instance 1; Spark instance 2 did not receive any message. Ownership of the partitions did not change.
5. Kill Spark Streaming instance 1.
   Observation: messages from all partitions went to Spark instance 2. Ownership of all partitions changed, but the owner name for each partition is still different.

Test 2
1. Create a topic with 4 partitions.
2. Start Spark Streaming instance 1 with the number of consumers set to 2.
   Observation: the owner of two partitions is owner 1 and the owner of the remaining two is owner 2.
3. Start a producer which puts messages in all partitions.
   Observation: messages from all partitions went to Spark instance 1. Ownership of the partitions did not change.
4. Start Spark Streaming instance 2 with the number of consumers set to 4.
   Observation: messages from partitions 1 and 2 went to Spark instance 1; messages from partitions 3 and 4 went to Spark instance 2. Ownership of partition 1 did not change; the owners of partitions 2, 3 and 4 changed to owner 2, owner 3 and owner 4 respectively.
5. Kill Spark Streaming instance 1.
   Observation: messages from all partitions went to Spark instance 2. Ownership of all partitions changed; the owners of partitions 1, 2, 3 and 4 changed to owner 3, owner 4, owner 5 and owner 6 respectively.

Use of this approach has the limitation that the receiver uses a single Spark executor, and data from all Kafka partitions is received by it. The level of parallelism is limited to different threads being assigned to different consumers. This can cause a bottleneck. One approach to mitigate this issue is to create a separate DStream for each partition. This has the added advantage that if we need to perform transformation operations per Kafka partition, all DStream and RDD-based operations will produce the same effect.

When an operation needs to be performed across Kafka partitions, a union operation must be done on the multiple streams. Since this approach uses the old Kafka high-level API, it inherits the problems of that API. When a driver fails (not an executor, though when a driver fails all executors are killed too), the following problems can occur:
a. Some messages could be missed.
This problem can be mitigated by setting the parameter "spark.streaming.receiver.writeAheadLog.enable" to true. This enables writing of all received Kafka messages to reliable storage. It has the disadvantage that messages are replicated twice, once in the Kafka cluster and then in HDFS. Writing logs to storage also adversely affects the receiver's throughput.

[Diagram: Executor 1 (the receiver) collects messages from the input stream for the given batch duration, commits offset m (written to Zookeeper), and saves the messages to a write-ahead log (WAL) on reliable storage. It shares the metadata of the messages stored in the WAL with the driver, which stores the received metadata in reliable storage. When the Spark driver restarts, it reads the metadata from reliable storage and shares it with an executor, which reads the messages from the WAL and processes them.]

b. Some messages could be processed twice.
There is no workaround available for this, as the problem occurs at the Kafka end. Since the offsets were not committed to Zookeeper, Kafka assumes the data was not consumed and sends the messages again. This results in "at least once" semantics instead of "exactly once" semantics.

[Diagram: the consumer commits offset m (written to Zookeeper) and asks for messages after offset m. Executor 1 (the receiver) collects the messages for the given batch duration, saves them to the WAL on reliable storage, and shares the metadata of the stored messages with the driver, which stores it in reliable storage. Executor 2 finishes processing the messages and reports the processing status to the driver. The Spark driver then restarts; the consumer asks for the current offset, Zookeeper still holds offset m, so Kafka returns offset m and sends the messages after offset m again, and they are processed a second time.]

Direct approach
It includes the method "createDirectStream". This approach uses the old Kafka low-level consumer API. Spark creates exactly one RDD partition corresponding to each Kafka partition. Also, the consumers for these partitions are started on different executors, so messages from Kafka can be received and used to construct the RDD in parallel.

a. Messages are processed at least once.
Since messages remain available in Kafka, they can always be re-fetched in case of an application restart.

b. Messages are processed exactly once.
To achieve this, the transaction that saves results and offsets must be atomic (where offsets are maintained by the Spark driver). Alternatively, it may be acceptable for messages to be processed twice as long as the same output is always written, in which case transactions must be idempotent.
Please note that both of the above approaches use the old Kafka consumer API. As a consequence, neither approach allows us to manually assign a Spark consumer to a Kafka partition. Also, if the new Kafka API were used, all the advantages of the direct approach could still be realized, with the added advantage of offloading offset management to Kafka without the need for reliable storage for checkpointing.

Essential Configuration
Memory
A Spark executor needs at least 450 MB of memory. This is configurable through "spark.executor.memory" in spark-defaults.conf (default 4 GB).
Out of the total memory allocated:
1. Reserved memory: the Spark executor needs 300 MB of memory for its own functioning (even when not running a job).
2. Spark memory: the Spark executor uses (total memory - 300 MB) * fraction for tasks. The fraction is configurable through the parameter "spark.memory.fraction" and its default value is 0.5.
3. User memory: the remaining memory is available to the user.
In other words, one must allocate memory equal to 300 MB + (memory required for Spark tasks). Out of the memory beyond the reserved 300 MB, one can control the distribution between user and Spark using the parameter "spark.memory.fraction".
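The split described above is simple arithmetic; the helper below computes it using the document's stated defaults (300 MB reserved, fraction 0.5):

```python
def spark_memory_split(total_mb, fraction=0.5, reserved_mb=300):
    """Splits executor memory per the rules above: 300 MB reserved,
    (total - 300 MB) * spark.memory.fraction for Spark tasks, and the
    remainder for user memory."""
    usable = total_mb - reserved_mb
    spark_mb = usable * fraction
    user_mb = usable - spark_mb
    return {"reserved": reserved_mb, "spark": spark_mb, "user": user_mb}


print(spark_memory_split(1024))
# {'reserved': 300, 'spark': 362.0, 'user': 362.0}
```

So with a 1 GB executor, only 362 MB is actually available for Spark task execution under these defaults.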

Disk Space
If disk space is an issue while running Spark in standalone mode, clean-up of old application directories must be enabled.
This is done through the following parameters:
1. spark.worker.cleanup.enabled (default: false) - must be set to true to enable deletion of application directories.
2. spark.worker.cleanup.interval (default: 30 min) - interval after which application directories are checked for cleanup.
3. spark.worker.cleanup.appDataTtl (default: 7 days) - time to live of application directories.
The above parameters must be set as part of SPARK_WORKER_OPTS in "spark-env.sh". For example,
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=300 -Dspark.worker.cleanup.appDataTtl=300"

Spark Operations
Types of RDDs
KafkaRDD
This RDD stores messages received from Kafka as records.
If "createDirectStream" is used, the partitions in the KafkaRDD are mapped to the partitions of the Kafka topics.
If "createStream" is used, the number of partitions is determined by the configuration parameter "spark.streaming.blockInterval" (default value 200 ms). For a given batch duration, the number of partitions will be (batch duration / block interval), where records received during the first block interval go into partition 0, records received during the second block interval go into partition 1, and so forth.

[Diagram: within one batch, the records from each block interval form one block; blocks 1 to 4 become the partitions of the resulting RDD.]
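The partition count for the receiver-based case follows directly from the formula above:

```python
def num_rdd_partitions(batch_duration_ms, block_interval_ms=200):
    """Number of partitions in a receiver-based KafkaRDD: one block (and
    hence one RDD partition) per block interval within the batch duration."""
    return batch_duration_ms // block_interval_ms


# A 2-second batch with the default 200 ms block interval yields 10 partitions.
print(num_rdd_partitions(2000))       # 10
print(num_rdd_partitions(1000, 250))  # 4
```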

Alternatively, one can repartition the data using inputStream.repartition(<number of partitions>).

MapPartitionsRDD
All of the transformations which produce a MapPartitionsRDD preserve partitions: each partition of the resulting RDD corresponds to one partition of the parent RDD.

[Diagram: partitions P0, P1 and P2 of the parent RDD map one-to-one to partitions of the MapPartitionsRDD.]

This RDD is the result of applying any of the following transformations to an RDD:

map
Applies a one-to-one transformation to each record. The partitioner is not preserved.

[Diagram: map((x) -> x + 1) transforms each record in place; results stay in the same partition as their source records.]

flatMap
Applies a one-to-many transformation to each record. The partitioner is not preserved.

[Diagram: flatMap((x) -> [x + 1, x + 2]) turns each record into two records, which stay in the originating partition.]

mapPartitions
Applies a transformation to each partition, given as a list of records. The result of the transformation is again a list of records, which is placed in the corresponding partition of the resultant RDD. The partitioner is preserved only if the optional argument "preservesPartitioning" to the methods of the "mapPartitions" family is set to true.

[Diagram: mapPartitions(inputList -> distinct(inputList)) deduplicates the records within each partition.]

filter
A predicate is applied to each record, and the records which satisfy it are put into the resultant RDD. filter always preserves the partitioner.

[Diagram: filter((x) -> isOdd(x)) keeps 1, 3 and 5 and drops 2 and 4, leaving each surviving record in its original partition.]

glom
Puts all records in a partition into an array. This array is then placed as the only record in the corresponding partition of the resultant RDD. The partitioner is not preserved.

[Diagram: glom() turns partitions {1, 2, 3} and {4, 5} into the single-record partitions {[1, 2, 3]} and {[4, 5]}.]
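The partition-preserving behaviour of these transformations can be sketched with a toy RDD model (plain Python, illustrative names, not Spark's actual API):

```python
class ToyRDD:
    """Toy RDD: a list of partitions, each a list of records. The
    transformations below all act partition-by-partition, which is why
    they preserve the partition structure."""
    def __init__(self, partitions):
        self.partitions = partitions

    def map(self, f):
        return ToyRDD([[f(x) for x in p] for p in self.partitions])

    def filter(self, pred):
        return ToyRDD([[x for x in p if pred(x)] for p in self.partitions])

    def glom(self):
        return ToyRDD([[list(p)] for p in self.partitions])


rdd = ToyRDD([[1, 2, 3], [4, 5]])
print(rdd.map(lambda x: x + 1).partitions)          # [[2, 3, 4], [5, 6]]
print(rdd.filter(lambda x: x % 2 == 1).partitions)  # [[1, 3], [5]]
print(rdd.glom().partitions)                        # [[[1, 2, 3]], [[4, 5]]]
```

In every case the number of partitions and the assignment of records to partitions is unchanged; only the records themselves are transformed.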

CoGroupedRDD
This RDD is the result of a cogroup operation applied to pair RDDs (RDDs consisting of records of the form (key, value)). For each key in the source RDDs, the resultant RDD contains a tuple of lists, where the two lists contain the values for that key sourced from the two RDDs.

[Diagram: RDD1 = {(1, a), (1, b), (2, c), (3, d), (2, e)} cogrouped with RDD2 = {(1, f), (3, g), (2, h)} yields (1, ([a, b], [f])), (2, ([c, e], [h])) and (3, ([d], [g])).]
Also, the cogroup operation takes a partitioner as an optional argument. If the partitioner of both source RDDs and the partitioner passed to the cogroup operation are the same, and the number of partitions in the source RDDs is the same, then partitioning is preserved.

[Diagram: with matching partitioners, partition i of each source RDD maps to partition i of the CoGroupedRDD.]

If no partitioner is specified and none of the participating input RDDs has a partitioner, then a default partitioner is used, with the number of partitions set to the maximum of the number of partitions in RDD1 and the number of partitions in RDD2.

[Diagram: RDD1 with 3 partitions cogrouped with RDD2 with 2 partitions yields a CoGroupedRDD with 3 partitions.]

UnionRDD
This RDD results from taking the union of two RDDs, where the partitions of the source RDDs are moved as-is to the destination RDD.

[Diagram: RDD1 with 3 partitions unioned with RDD2 with 2 partitions yields a UnionRDD with 5 partitions.]

There is one exception: if both source RDDs use the same custom partitioner and the number of partitions is also the same, then partitioning is preserved in the resultant RDD, i.e., records from partition 0 of the source RDDs are moved to partition 0 of the resultant RDD, and so forth.

[Diagram: union(RDD2) with a shared partitioner merges partition 0 of both RDDs into partition 0 of the UnionRDD, and likewise for the remaining partitions.]

CoalescedRDD
This RDD results from a coalesce operation with shuffle set to false. It takes the number of partitions as an argument. It tries to partition in a manner such that each new partition has roughly the same number of parent partitions, and such that the preferred location of each new partition overlaps with as many preferred locations of its parent partitions as possible. If shuffle is set to true, then a random partitioner is used; this results in a ShuffledRDD with each partition containing roughly the same number of records, which is then used to create the CoalescedRDD.
With shuffle=false:
[Diagram: coalesce(2, false) groups parent partitions P0 to P4 into 2 new partitions without moving individual records.]

With shuffle=true:
[Diagram: coalesce with shuffle first redistributes the records randomly into a ShuffledRDD with evenly sized partitions, which then backs the CoalescedRDD.]

ZippedPartitionsRDD
This RDD results from the zipPartitions operation. The optional argument "preservesPartitioning" controls whether the partitioner of the first parent RDD is preserved; the partition structure itself is always preserved. Both participating RDDs must have the same number of partitions, or the operation fails. The final partitions may not contain the same records as the source partitions, as a flat-map function is applied to the records from the parent RDDs and the result is placed in the resultant RDD.
Let flatMapFunction = (List1, List2) => List1.addAll(List2)

[Diagram: zipPartitions(RDD2, flatMapFunction) with RDD1 partitions {1, 2, 3} and {4, 5}, and RDD2 partitions {6, 7, 8} and {9}, concatenates the corresponding partitions: {1, 2, 3, 6, 7, 8} and {4, 5, 9}.]

ShuffledRDD
This RDD is created as an intermediate step of many transformations. It results from repartitioning, where records are shuffled and an optional reduce operation is performed. It takes the following arguments:

- partitioner (mandatory)
- createCombiner: (V => C)
- mergeValue: (C, V) => C
- mergeCombiners: (C, C) => C
- mapSideCombine: boolean

The source or parent RDD contains values of type V, while the resultant RDD contains values of type C. If mapSideCombine is set to true, then the reduce operation is first performed within each partition, then repartitioning is done, followed by another reduce operation.
Following are some of the operations which return a ShuffledRDD:

reduceByKey
This operation aggregates the values of every key using a given reduce function. The arguments provided by the user are:

- reduceOperator: (value, value) => value
- noOfPartitions (optional): integer, or
- partitioner: Partitioner

This operation returns a ShuffledRDD created using the following arguments:

Argument        | Value
----------------|------
partitioner     | If a partitioner is not provided by the user, a hash-based partitioner.
createCombiner  | (value) => value
mergeValue      | the reduceOperator provided by the user
mergeCombiners  | the reduceOperator provided by the user
mapSideCombine  | true (records in a partition are first reduced, then repartitioning and key aggregation are performed, followed by a final reduce operation)

If the default hash-based partitioner is used, the number of partitions is set to (in order of preference based on availability):
a. the argument provided by the user
b. the value of spark.default.parallelism
c. the number of partitions in the parent RDD

[Diagram: reduceByKey((v1, v2) -> v1 + v2, 2) sums the values for each key into a 2-partition ShuffledRDD; for example, the value 4 for key a is reduce(reduce(1, 2), 1).]
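The map-side-combine behaviour can be sketched in plain Python (a toy model of the ShuffledRDD machinery, not Spark's implementation; all names are illustrative):

```python
def reduce_by_key(partitions, reduce_op, num_partitions):
    """Sketch of reduceByKey with mapSideCombine=true: values are first
    reduced within each input partition, then shuffled by hash of the key,
    then reduced once more per output partition."""
    # 1. Map-side combine: reduce within each source partition.
    combined = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = reduce_op(local[k], v) if k in local else v
        combined.append(local)
    # 2. Shuffle by hash partitioner, with a final reduce per key.
    out = [dict() for _ in range(num_partitions)]
    for local in combined:
        for k, v in local.items():
            dest = out[hash(k) % num_partitions]
            dest[k] = reduce_op(dest[k], v) if k in dest else v
    return out


parts = [[("a", 1), ("b", 1)], [("a", 2), ("c", 1)], [("a", 1), ("b", 2)]]
result = reduce_by_key(parts, lambda v1, v2: v1 + v2, 2)
merged = {k: v for d in result for k, v in d.items()}
print(merged)  # values: a -> 4, b -> 3, c -> 1
```

The map-side combine shrinks each partition to at most one record per key before the shuffle, which is exactly why mapSideCombine=true pays off for reduceByKey.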

groupByKey
This operation aggregates the values of every key, where the aggregation is an iterable list. The arguments provided by the user are:

- noOfPartitions (optional): integer, or
- partitioner: Partitioner

This operation returns a ShuffledRDD created using the following arguments:

Argument        | Value
----------------|------
partitioner     | If a partitioner is not provided by the user, a hash-based partitioner.
createCombiner  | (value) => CompactBuffer(value)
mergeValue      | (CompactBuffer, value) => CompactBuffer.add(value)
mergeCombiners  | (CompactBuffer1, CompactBuffer2) => CompactBuffer1.addAll(CompactBuffer2)
mapSideCombine  | false (the reduce operation, which here just adds records to an iterable container, is performed only after repartitioning; there is no advantage to a map-side combine, since the data size is not reduced before it is moved around during the shuffle)

CompactBuffer is an iterable container similar to ArrayBuffer. The method of determining the number of partitions is the same as that of reduceByKey.

[Diagram: groupByKey(2) applied to {(a, 1), (b, 1), (c, 1), (a, 2), (d, 4), (a, 1), (b, 2), (c, 1)} yields (a, [1, 2, 1]), (c, [1, 1]), (b, [1, 2]) and (d, [4]) across 2 partitions.]
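The no-map-side-combine path can be sketched as follows (a toy model; plain Python lists stand in for CompactBuffer, and all names are illustrative):

```python
def group_by_key(partitions, num_partitions):
    """Sketch of groupByKey with mapSideCombine=false: records are shuffled
    by hash of the key first, and only then collected into per-key lists
    (the analogue of CompactBuffer). Combining on the map side would not
    shrink the data, so it is skipped."""
    out = [dict() for _ in range(num_partitions)]
    for part in partitions:
        for k, v in part:
            dest = out[hash(k) % num_partitions]
            dest.setdefault(k, []).append(v)  # createCombiner / mergeValue
    return out


parts = [[("a", 1), ("b", 1)], [("a", 2), ("b", 2), ("a", 1)]]
result = group_by_key(parts, 2)
merged = {k: v for d in result for k, v in d.items()}
print(merged)  # a -> [1, 2, 1], b -> [1, 2]
```

Unlike the reduceByKey sketch, every individual record travels through the shuffle here, which is why groupByKey is generally more expensive than reduceByKey for aggregations that could be reduced early.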

aggregateByKey
This operation is used to aggregate the values of every key using the given reduce
functions and a neutral zero value. Let U be the type of aggregated values.
Arguments provided by the user are:

zeroValue : U
SequenceFunction : (U, value) => U
CombineFunction : (U, U) => U
noOfPartitions (optional) : integer
or
partitioner : Partitioner

This operation returns a ShuffledRDD created using the following arguments:


Argument | Value
Partitioner | If no partitioner is provided by the user, a hash-based partitioner is used.
createCombiner | (value) => SequenceFunction(zeroValue, value)
mergeValue | SequenceFunction
mergeCombiners | CombineFunction
mapSideCombine | True

The method to determine the number of partitions is the same as that of reduceByKey.


For example, if we need to aggregate all unique values corresponding to each
key, with the number of partitions in the resultant RDD as two, then aggregateByKey
can be invoked using the following arguments:

Argument | Value
noOfPartitions | 2
zeroValue | new HashSet<value type>()
SequenceFunction | (set, value) => set.add(value), where set is of type HashSet
CombineFunction | (set1, set2) => set1.addAll(set2), where set1 and set2 are of type HashSet

aggregateByKey(zeroValue, SeqFunc, CombineFunc, 2)

Parent RDD records:
(a, 1), (b, 1), (c, 1), (a, 2), (d, 4), (a, 1), (b, 2), (c, 1)

Resulting ShuffledRDD (2 partitions):
Partition 0: (a, {1, 2}), (c, {1})
Partition 1: (b, {1, 2}), (d, {4})
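The combiner semantics above can be simulated locally in plain Java. This is a sketch of what aggregateByKey does with the HashSet arguments (per-partition folding via createCombiner/mergeValue, then a cross-partition merge via mergeCombiners); the class and method names are illustrative, not Spark API:

```java
import java.util.*;

// Local simulation of aggregateByKey's combiner semantics with a HashSet:
// within each partition, values are folded into a per-key set; across
// partitions, the per-key sets are unioned.
public class AggregateByKeySketch {

    // One partition: fold (key, value) pairs into per-key HashSets.
    static Map<String, Set<Integer>> aggregatePartition(List<Map.Entry<String, Integer>> records) {
        Map<String, Set<Integer>> combiners = new HashMap<>();
        for (Map.Entry<String, Integer> record : records) {
            combiners
                .computeIfAbsent(record.getKey(), k -> new HashSet<>()) // zeroValue
                .add(record.getValue());                                // SequenceFunction
        }
        return combiners;
    }

    // Merge per-partition results: union sets for the same key (CombineFunction).
    static Map<String, Set<Integer>> mergePartitions(List<Map<String, Set<Integer>>> parts) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (Map<String, Set<Integer>> part : parts) {
            part.forEach((key, set) ->
                result.computeIfAbsent(key, k -> new HashSet<>()).addAll(set));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> p0 = List.of(
            Map.entry("a", 1), Map.entry("b", 1), Map.entry("c", 1));
        List<Map.Entry<String, Integer>> p1 = List.of(
            Map.entry("a", 2), Map.entry("d", 4), Map.entry("a", 1),
            Map.entry("b", 2), Map.entry("c", 1));
        // Unique values per key: a={1, 2}, b={1, 2}, c={1}, d={4}
        System.out.println(mergePartitions(List.of(aggregatePartition(p0), aggregatePartition(p1))));
    }
}
```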

Use case
Problem
There exists a Kafka topic 'DeviceData' with 'n' partitions. A large number of
devices generate log files, which are published on the Kafka topic
'DeviceData'. Devices are organized in groups, so that each group publishes log
files to a specific partition. For instance, devices p to q publish their log
files to partition 0, devices q+1 to r publish their log files to partition 1,
and so on.
The format of each line in a log file is as follows:
<Timestamp>: <Feature name of form F[number]>: (start|stop)
It is assumed that no two consecutive starts or stops can occur for the same feature
and device. Also, a log file may end with a feature start, and the subsequent feature
stop may follow later in a log file from the same device.

For each device and for each feature, we need to publish the maximum interval
for which the feature is used, along with the start and stop times of this interval
and the total duration of usage. This information must be refreshed every 'T'
minutes.

Constraints
Since the output statistics required are per device, and the device-to-Kafka-partition
mapping is constant, there is no need to repartition a Spark RDD once it is
created. As a consequence, shuffling must be avoided during transformations to
maximize performance.

Solution
Approach
Spark Streaming is configured with the interval set to 'T' minutes. For every RDD
created with messages received from Kafka during an interval of 'T' minutes,
there will be some log parts which could not be processed. These parts contain
feature start statements for which stop statements have not been received in
this interval; the stop statements may arrive in a subsequent interval. These
unprocessed log parts will be stored in shared storage (HDFS). When the RDD
constructed from the next batch is being processed, the unprocessed portions of
log files stored on disk will be retrieved first and combined with the current
RDD for processing.
Also, the maximum interval and the number of usages will be stored on shared
storage. For each batch, this data will be read from HDFS and merged with the
results obtained from the current batch. The final results will then again be
written to disk, overwriting the last results.

Processing Phase 1: read the unprocessed log parts from the last batch, merge
them with the current batch, and write the new unprocessed parts back.
Processing Phase 2: read the final result from the last batch, merge it with the
result from the current batch, and write the final result back.

Implementation
Step 1
A Direct Stream is obtained using KafkaUtils.createDirectStream. Each RDD of the
stream will have n partitions corresponding to the n Kafka partitions. Also, these
partitions will be constructed on separate executors.
JavaPairInputDStream<String, String> logs =
KafkaUtils.createDirectStream();
Every record in the RDD corresponds to a single log file from a device, where the
device ID is the key and the content of the log file is the value.
Step 2
A transformation is applied on the stream so that each record in the RDD is now
mapped to the following record:

(Device ID, content of log file)
    --mapToPair-->
(Device ID, { F1: [(T1, start), (T2, stop), (T3, start)],
              F2: [(T2, start), (T4, stop)] })

JavaPairDStream<String, Map<String, List<Tuple2<Date, String>>>> processedLogs =
logs.mapToPair()
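The body of this mapToPair can be sketched in plain Java. The helper below parses a log file's content into the per-feature action map from the figure above; the class name is illustrative, and timestamps are kept as raw strings rather than java.util.Date for brevity:

```java
import java.util.*;
import java.util.regex.*;

// Parses the content of one device log file into a map from feature name to
// the ordered list of (timestamp, action) pairs, matching the record layout
// produced in Step 2.
public class LogParser {

    // One log line has the form: <Timestamp>: <Feature of form F[number]>: (start|stop)
    private static final Pattern LINE =
        Pattern.compile("^(.+?): (F\\d+): (start|stop)$");

    public static Map<String, List<String[]>> parse(String logContent) {
        Map<String, List<String[]>> actionsByFeature = new LinkedHashMap<>();
        for (String line : logContent.split("\n")) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.matches()) continue; // skip malformed lines
            String timestamp = m.group(1), feature = m.group(2), action = m.group(3);
            actionsByFeature
                .computeIfAbsent(feature, f -> new ArrayList<>())
                .add(new String[] {timestamp, action});
        }
        return actionsByFeature;
    }

    public static void main(String[] args) {
        String log = "T1: F1: start\nT2: F2: start\nT2: F1: stop\nT3: F1: start\nT4: F2: stop";
        // F1 -> [(T1, start), (T2, stop), (T3, start)]; F2 -> [(T2, start), (T4, stop)]
        parse(log).forEach((f, actions) -> System.out.println(f + ": " + actions.size() + " actions"));
    }
}
```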
Step 3
Each record is now broken into multiple records so that each device-feature pair
has its own record. Please note that since we started with multiple log files
from the same device, we can have multiple records for the same device-feature pair.

(Device ID, { F1: [(T1, start), (T2, stop), (T3, start)],
              F2: [(T2, start), (T4, stop)] })
    --flatMapToPair-->
((Device ID, F1), [(T1, start), (T2, stop), (T3, start)])
((Device ID, F2), [(T2, start), (T4, stop)])

JavaPairDStream<Tuple2<String, String>, List<Tuple2<Date, String>>> DeviceFeatureLogs =
processedLogs.flatMapToPair()

Step 4
Now unprocessed records from the last batch need to be added to the RDD of the
current batch. Since this operation is to be applied on the current RDD only (as
subsequent RDDs will be merged with different sets of unprocessed records), the
operation foreachRDD is used. It must be noted that since foreachRDD is an
output operation, no DStream is returned and all further processing must be
done inside foreachRDD.
First, unprocessed records from HDFS are read into a new RDD.
JavaRDD<Tuple2<Tuple2<String, String>, List<Tuple2<Date, String>>>>
pastUnprocessedRecordsTuple = sparkContext.objectFile();
Since the method objectFile cannot be used to read directly into a paired RDD (i.e.,
an RDD containing records with key-value pairs), the RDD read must now be
converted into a paired RDD.

Tuple2((Device ID, Feature), [(T1, start), (T2, stop), (T3, start)])
    --mapToPair-->
key: (Device ID, Feature), value: [(T1, start), (T2, stop), (T3, start)]

JavaPairRDD<Tuple2<String, String>, List<Tuple2<Date, String>>> pastUnprocessedRecords =
pastUnprocessedRecordsTuple.mapToPair()
Step 5
Next, zipPartitions is used to merge the RDD containing unprocessed records from
the past with the current RDD. Each partition in the resultant RDD now contains
records from the corresponding partitions of both parent RDDs. Care has been
taken so that records from the past RDD are placed first. Note that the resultant
RDD is not paired, and hence again a mapToPair operation (similar to the one in
step 4) is required to break the tuple into key-value pairs.

JavaRDD<Tuple2<Tuple2<String, String>, List<Tuple2<Date, String>>>> recordsToBeProcessedTuple =
currentRecords.zipPartitions(pastUnprocessedRecords, )

JavaPairRDD<Tuple2<String, String>, List<Tuple2<Date, String>>> recordsToBeProcessed =
recordsToBeProcessedTuple.mapToPair()
Step 6
Next, a reduce operation needs to be applied so as to join the records for each
device-feature pair. However, the reduce methods provided by Spark trigger a
shuffle, which would violate the stated constraint. Moreover, using an inbuilt
reduce method would not allow us to preserve the sequence of records. The
sequence must be preserved, as we wish to join the time-related data received in
log files in the sequence in which it was received.
In order to overcome the above limitations, mapPartitionsToPair is used to
implement the reduce operation. At the end of the operation, the record structure
is the same as that in step 4, but there will be only one record for each
device-feature pair.
((Device 1, F2), [(T1, start), (T2, stop), (T3, start)])
((Device 1, F2), [(T4, stop), (T5, start)])
    --mapPartitionsToPair-->
((Device 1, F2), [(T1, start), (T2, stop), (T3, start), (T4, stop), (T5, start)])

JavaPairRDD<Tuple2<String, String>, List<Tuple2<Date, String>>> recordsToBeProcessed =


recordsToBeProcessedTuple.mapPartitionsToPair().cache()
The result is also cached, as it will be used twice: once to determine which part
of the records could not be processed in this batch, and then to generate from
the processable part the statistics asked for by the problem statement. If the
resultant RDD is not cached, it will be computed twice at the time it is used.
This would not only be inefficient but would also give an incorrect result, as
the computation of this RDD relies on a file stored in HDFS. The first time this
RDD is used, i.e., when the part of the records which could not be processed is
computed, that same file is overwritten. This means that the second time this
RDD is used, the input file will be different, which would give an incorrect
result.
Without cache: the RDD is recomputed on each use. The first use reads Content 1
and overwrites the file with Content 2; the second use then recomputes the RDD
from Content 2, yielding a different (incorrect) result.
With cache: the RDD is computed once from Content 1 and reused, so both uses see
the same data even after the file is overwritten.

Also, the statement to cache must be followed by an output operation so as to
force RDD computation, as we wish to avoid lazy evaluation on first usage. The
count method could be used for the same; its result can be ignored.
Step 7
Next, each record is split into a part which can be processed and a part which
cannot. If a start is the last action for a device-feature pair, then it goes
into the part which cannot be processed. Since not every record will yield an
unprocessed part, mapToPair cannot be used. Instead, mapPartitionsToPair is used
to process a partition as a whole. (Alternatively, filter followed by mapToPair
could have been used, but that would have resulted in processing the action list
twice.)

((Device 1, F2), [(T1, start), (T2, stop), (T3, start), (T4, stop), (T5, start)])
    --mapPartitionsToPair-->
((Device 1, F2), [(T5, start)])

JavaPairRDD<Tuple2<String, String>, List<Tuple2<Date, String>>> unprocessedRecords =
recordsToBeProcessed.mapPartitionsToPair()
The RDD thus obtained contains records which will not be processed further in
this batch. This RDD is written to the HDFS unprocessed-log store, to be
consumed when the next batch is processed.
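The per-record split logic can be sketched in plain Java (illustrative names; timestamps kept as strings, actions as (timestamp, action) pairs). Because starts and stops alternate for a given device-feature pair, at most the trailing action can be an unmatched start:

```java
import java.util.*;

// Splits a device-feature action list into the part that can be processed in
// this batch and the trailing part that cannot (a final 'start' with no
// matching 'stop' yet).
public class ActionSplitter {

    // Returns {processed, unprocessed}; unprocessed is empty or one 'start'.
    public static List<List<String[]>> split(List<String[]> actions) {
        List<String[]> processed = new ArrayList<>(actions);
        List<String[]> unprocessed = new ArrayList<>();
        if (!actions.isEmpty() && "start".equals(actions.get(actions.size() - 1)[1])) {
            unprocessed.add(processed.remove(processed.size() - 1));
        }
        return List.of(processed, unprocessed);
    }

    public static void main(String[] args) {
        List<String[]> actions = List.of(
            new String[] {"T1", "start"}, new String[] {"T2", "stop"},
            new String[] {"T3", "start"}, new String[] {"T4", "stop"},
            new String[] {"T5", "start"});
        List<List<String[]>> parts = split(actions);
        System.out.println("processed: " + parts.get(0).size()
            + ", unprocessed: " + parts.get(1).size()); // prints: processed: 4, unprocessed: 1
    }
}
```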
Step 8
Next, each record from the output of Step 6 is processed to get the maximum
duration, its start and end times, and the number of usages for this batch. The
mapToPair operation is used for the same, and the results are stored in a
serializable structure, FeatureUsageData. The structure must be serializable as
we are going to read/write it to HDFS later. mapToPair can be used because even
if a record contains no actions which can be processed, we can always write a
corresponding instance of the FeatureUsageData structure with the duration and
number of usages set to 0 and the start and end times set to null.

((Device 1, F2), [(T1, start), (T2, stop), (T3, start), (T4, stop), (T5, start)])
    --mapToPair-->
((Device 1, F2), FeatureUsageData {
    duration:             (T2-T1) + (T4-T3)
    maxInterval:          T4-T3
    maxIntervalStartTime: T3
    maxIntervalStopTime:  T4
})
JavaPairRDD<Tuple2<String, String>, FeatureUsageData> processedRecords =


recordsToBeProcessed.mapToPair()
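The statistics computation for one record can be sketched in plain Java. The class below is a minimal illustrative version of FeatureUsageData with numeric timestamps (e.g., epoch millis) rather than java.util.Date, not the actual class from the solution:

```java
// Minimal illustrative version of FeatureUsageData, computed from an ordered
// list of alternating (timestamp, action) pairs with numeric timestamps.
// A trailing unmatched 'start' is assumed to have been split off already.
public class FeatureUsage {
    long duration;              // total usage time across all intervals
    long maxInterval;           // longest single start..stop interval
    Long maxIntervalStartTime;  // null when there is no complete interval
    Long maxIntervalStopTime;
    int usages;                 // number of complete start..stop intervals

    // timestamps[i] pairs with actions[i]; actions alternate start/stop,
    // so actions[i] is 'start' and actions[i + 1] is the matching 'stop'.
    public static FeatureUsage compute(long[] timestamps, String[] actions) {
        FeatureUsage u = new FeatureUsage();
        for (int i = 0; i + 1 < actions.length; i += 2) {
            long interval = timestamps[i + 1] - timestamps[i];
            u.duration += interval;
            u.usages++;
            if (interval > u.maxInterval) {
                u.maxInterval = interval;
                u.maxIntervalStartTime = timestamps[i];
                u.maxIntervalStopTime = timestamps[i + 1];
            }
        }
        return u;
    }

    public static void main(String[] args) {
        // Actions: (1, start), (3, stop), (4, start), (9, stop)
        FeatureUsage u = compute(new long[] {1, 3, 4, 9},
                                 new String[] {"start", "stop", "start", "stop"});
        System.out.println(u.duration + " " + u.maxInterval + " "
            + u.maxIntervalStartTime + " " + u.maxIntervalStopTime + " " + u.usages);
        // prints: 7 5 4 9 2
    }
}
```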
Step 9
Now, the results up to the last batch need to be merged with the statistics from
the current batch. First, the results are read from HDFS into a new RDD.
JavaRDD<Tuple2<Tuple2<String, String>, FeatureUsageData>>
pastResultsTuple = sparkContext.objectFile();
Next, the RDD read is converted into a paired RDD.

Tuple2((Device 1, F2), FeatureUsageData { duration: (T2-T1) + (T4-T3),
    maxInterval: T4-T3, maxIntervalStartTime: T3, maxIntervalStopTime: T4 })
    --mapToPair-->
key: (Device 1, F2), value: FeatureUsageData { duration: (T2-T1) + (T4-T3),
    maxInterval: T4-T3, maxIntervalStartTime: T3, maxIntervalStopTime: T4 }

JavaPairRDD<Tuple2<String, String>, FeatureUsageData> pastResults =
pastResultsTuple.mapToPair()
Step 10
Next, zipPartitions is used to merge the RDD containing the old results with the
statistics from the current RDD. Each partition in the resultant RDD now
contains records from the corresponding partitions of both parent RDDs. Since
the resultant RDD is not paired, again a mapToPair operation (similar to step 9)
is required to break the tuple into key-value pairs.

JavaRDD<Tuple2<Tuple2<String, String>, FeatureUsageData>> pastAndCurrentResultsTuple =
processedRecords.zipPartitions(pastResults, )
JavaPairRDD<Tuple2<String, String>, FeatureUsageData> pastAndCurrentResults =
pastAndCurrentResultsTuple.mapToPair()
Step 11
Next, a reduce operation is applied to merge the old results with the statistics
from the current RDD for each device-feature pair. In order to avoid a shuffle,
mapPartitionsToPair is used.

((Device 1, F2), FeatureUsageData { duration: Ta, maxInterval: T2-T1,
    maxIntervalStartTime: T1, maxIntervalStopTime: T2 })
((Device 1, F2), FeatureUsageData { duration: Tb, maxInterval: T4-T3,
    maxIntervalStartTime: T3, maxIntervalStopTime: T4 })
    --mapPartitionsToPair-->
((Device 1, F2), FeatureUsageData {
    duration:             Ta + Tb
    maxInterval:          max(T2-T1, T4-T3)
    maxIntervalStartTime: (T2-T1) > (T4-T3) ? T1 : T3
    maxIntervalStopTime:  (T2-T1) > (T4-T3) ? T2 : T4
})

JavaPairRDD<Tuple2<String, String>, FeatureUsageData> finalResults =


pastAndCurrentResultsTuple.mapPartitionsToPair()
The result is also cached, as it will be used twice. First, the RDD is stored
as-is, to be used by the next batch: it is written to the HDFS final-result
store, to be consumed when the next batch is processed. Next, FeatureUsageData
will be translated to a form readable by other applications, and the same will
then be written to shared storage.
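The per-key merge of old and new statistics can be sketched in plain Java. The nested class below is a minimal illustrative FeatureUsageData with numeric timestamps, not the actual class from the solution:

```java
// Minimal illustrative merge of two FeatureUsageData values for the same
// device-feature pair: durations add up, and the larger maximum interval
// wins together with its start and stop times.
public class UsageMerger {

    public static class Usage {
        long duration, maxInterval, maxIntervalStartTime, maxIntervalStopTime;
        Usage(long d, long m, long start, long stop) {
            duration = d; maxInterval = m;
            maxIntervalStartTime = start; maxIntervalStopTime = stop;
        }
    }

    public static Usage merge(Usage past, Usage current) {
        // The record with the larger single interval contributes the
        // maxInterval fields; durations are simply summed.
        Usage winner = past.maxInterval >= current.maxInterval ? past : current;
        return new Usage(past.duration + current.duration,
                         winner.maxInterval,
                         winner.maxIntervalStartTime,
                         winner.maxIntervalStopTime);
    }

    public static void main(String[] args) {
        Usage past = new Usage(10, 4, 100, 104);    // duration Ta, maxInterval T2-T1
        Usage current = new Usage(7, 6, 200, 206);  // duration Tb, maxInterval T4-T3
        Usage merged = merge(past, current);
        System.out.println(merged.duration + " " + merged.maxInterval + " "
            + merged.maxIntervalStartTime + " " + merged.maxIntervalStopTime);
        // prints: 17 6 200 206
    }
}
```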
Step 12
Next, the values in the records (of type FeatureUsageData) in the finalResults
RDD are converted to a form readable by other applications. mapToPair is used
for the same.
For instance, FeatureUsageData could be converted to a string:
JavaPairRDD<Tuple2<String, String>, String> finalResultsReadable =
finalResults.mapToPair()
This RDD is then saved to reliable storage. For example, it could be saved to
HDFS:
finalResultsReadable.saveAsTextFile(<path>);

Known Limitations
1. Since, in this solution, it is mandatory to cache some intermediate RDDs,
this could be problematic if the partitions of those RDDs are too large to
fit into memory.
2. Operations of the MapPartitions class are used. These operations require a
complete partition to be loaded into memory at once, which could cause
issues with large partitions.
3. Intermediate steps involve reading HDFS files and overwriting the same
files later. If a Spark executor fails and the task is restarted, the
results obtained could be incorrect, as the files being read from HDFS could
have been overwritten before the executor failed. In other words, the tasks
are no longer idempotent.
4. The RDD obtained by merging unprocessed records from the past with records
in the current batch is processed twice: first to get the parts of records
which could not be processed in this batch, and then to process the
remaining parts. Algorithmically, this operation is possible in a single step.
