Kafka and Spark Streaming
Terminology
Partitions
Architecture
[Figure: Kafka architecture - brokers coordinated through Zookeeper; consumers are organized into consumer groups (e.g., Consumer Group Y containing Consumer 1, Consumer 2 and Consumer 3).]
Replication
Leader maintains:
o ISR (In Sync Replica Set)
o HW (High watermark offset of last committed message)
A message is committed only once it has been received by all replicas in the ISR
(acknowledgement needed).
Followers fetch messages from the leader at least every
replica.fetch.wait.max.ms.
Dead Replica - If a replica fails to send a fetch request for longer than
replica.lag.time.max.ms.
Slow Replica - If a replica starts lagging behind the leader for longer than
replica.lag.time.max.ms.
Leader saves HW to disk every
replica.high.watermark.checkpoint.interval.ms
The leader continuously propagates the HW to followers.
When a failed replica is restarted, it recovers the latest HW from disk and
truncates its log to the HW.
Leader Election
When all replicas in the ISR die, there are two options:
o Wait for a replica in the ISR to come back up and elect it as the new leader (consistent, but the partition stays unavailable until then).
o Elect the first replica that comes back up as leader, even if it is not in the ISR (unclean leader election: the partition becomes available sooner, but committed messages may be lost).
Zookeeper Data
[Figure: Zookeeper znode tree used by Kafka]
o /brokers/ids/[broker id] - host and port of each broker
o /brokers/topics/[topic name] - map of partition ID to replica IDs
o /brokers/topics/[topic name]/partitions/[partition id] - leader and ISR of the partition
o /controller - broker id of the current controller
o /consumers/[group]/ids/[consumer id] - registered consumers of a group (used by consumers)
o /consumers/[group]/owners/[topic name]/[partition id] - consumer id that currently owns the partition
o /consumers/[group]/offsets/[topic name]/[partition id] - last committed offset for the partition
Essential configuration
Message Size
There is a message size restriction of 1000 KB by default. It can be configured
through the following parameters:
1. max.message.bytes
2. replica.fetch.max.bytes
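For example, both limits could be raised together (illustrative values, not defaults; max.message.bytes is the per-topic setting, message.max.bytes is its broker-wide counterpart in server.properties):
max.message.bytes=2000000
replica.fetch.max.bytes=2000000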
Message Retention
Kafka is designed to retain logs over long periods. The default duration is 1 week,
which could be a problem when storage space is limited. It can be configured
through the parameter "log.retention.hours". If a granularity finer than one hour is
required, log.retention.minutes or log.retention.ms (highest precedence) can be
used; the corresponding topic-level override is retention.ms.
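For example, to keep logs for one day instead of one week (illustrative value):
log.retention.hours=24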
Performance
If enough RAM is available, the vm.swappiness parameter in /etc/sysctl.conf on
Linux machines can be set to 1. This improves performance by reducing
page-out requests, as Kafka relies on the page cache for performance.
Also, the parameter num.io.threads should be set to at least the number of
disks available.
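For example (illustrative values):
# /etc/sysctl.conf
vm.swappiness=1
# server.properties, e.g. for a broker with 8 data disks
num.io.threads=8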
Producer
There is a message size restriction of 1000 KB by default. It can be configured
through the parameter "max.request.size". Please note that no error will be thrown
if writing a message to a Kafka partition fails.
A producer can either specify a particular partition while writing a message to a
topic or, if no partition is specified, let Kafka automatically distribute the incoming
messages across partitions.
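A minimal producer sketch with the Java client is shown below; the broker address, topic name, keys and values are placeholders, and the second send targets an explicit partition while the first leaves the choice to Kafka.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.request.size", "1048576"); // illustrative 1 MB limit

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Let Kafka choose the partition (hash of the key, or round-robin without a key).
producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
// Write explicitly to partition 0 of the topic.
producer.send(new ProducerRecord<>("my-topic", 0, "key2", "value2"));
producer.close();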
Consumer
Kafka broadcasts messages from each topic partition to the subscribed consumer
groups, where a message is finally delivered to exactly one consumer in a
consumer group. A consumer must continuously commit the offsets of messages
read, so if a consumer goes down, another consumer takes its place and starts
reading after the last committed offset. It is a pull model, i.e., consumers ask for
messages from a particular offset.
Rebalancing
Rebalancing only occurs when a consumer joins or leaves the consumer group, or when the partitions of a subscribed topic change.
API
There are two sets of APIs available:
Old API
1. High Level Consumer
Normal scenario:
[Sequence diagram: the consumer gives messages up to offset m to the application; the auto-commit timer expires; the application finishes processing the messages up to offset m and asks for more messages; the consumer then gives messages up to offset n, which the application finishes processing, and so on.]
Failure scenario:
[Sequence diagram: the consumer gives messages up to offset m to the application, after which the application restarts abruptly; on restart the KafkaConsumer asks Zookeeper for the current offset, Zookeeper returns offset m, and the consumer resumes delivering messages from offset m.]
2. Simple Consumer
This API is difficult to use but provides the added benefit of own offset
management. It is useful when an application needs to manage its own offsets in
order to recover from failures. For instance, an application can persist offsets to a
reliable storage only after messages are consumed. If the application is later
restarted after a failure, it can read the last committed offset from storage and
resume from there. This ensures at-least-once semantics. However, the application
must do the extra work of keeping track of which Kafka broker is the partition
leader, and of leader changes during repartitioning. Also, the offset from which
messages are to be read is passed in every request.
Normal scenario:
[Sequence diagram: the application asks the consumer for messages from offset m; the consumer asks Kafka for messages from offset m and receives messages from offset m up to offset n; the consumer gives the messages to the application; the application commits offset m to reliable storage and then asks for messages from the next offset.]
New API
It integrates the high-level and simple consumer APIs. These methods allow for
manual offset management: there is an option to disable automatic committing of
offsets by the consumer, so the application can decide when messages are
consumed and then trigger the offset commit. This greatly reduces the window in
which a failure could cause a violation of "exactly once" semantics with respect to
message processing. Zookeeper is not used at all by the consumer in this case;
instead, offsets are tracked by Kafka.
Also, if auto rebalancing is used, the application can register a call-back to be
invoked when rebalancing occurs. The new API supports non-blocking operations too.
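A minimal sketch of manual offset management with the new consumer API (broker address, group id and topic name are placeholders; process() stands for application-specific handling):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // application decides when to commit
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical application processing
    }
    consumer.commitSync(); // commit only after the batch has been processed
}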
Normal scenario:
[Sequence diagram: the consumer gives messages up to offset m to the application; the application finishes processing the messages up to offset m, asks the consumer to commit, and then asks for more messages.]
Message Size
There is a restriction on the maximum message size which can be read from a
partition. It is configurable through the parameters
1. "fetch.message.max.bytes" (old API)
2. "max.partition.fetch.bytes" (new API)
The default value is 1 MB.
Spark Streaming
Reading data from Kafka
Spark Streaming provides a library for fetching data from Kafka. The utility class
"KafkaUtils" [1] can be used for fetching data from Kafka and loading it into
Spark.
There are two sets of methods available to get a Spark discretized stream
(DStream) from a set of Kafka topics.
Receiver-based approach
It includes method "createStream". A receiver started on an executor uses the Kafka high-level consumer API to collect messages for the configured batch duration, with offsets committed to Zookeeper.
Use of this approach has the limitation that the receiver uses a single Spark
executor and data from all Kafka partitions is received by that same executor. The
level of parallelism is limited to different threads being assigned to different
consumers. This could cause a bottleneck. One approach to mitigate this issue is to
create a separate DStream for each partition. This has the added advantage that,
if transformations need to be performed per Kafka partition, all operations on a
DStream and all RDD-based operations apply to a single Kafka partition and
produce the same effect.
To avoid losing received messages when the receiver fails, a write-ahead log (WAL)
can be enabled that saves Kafka messages to a reliable storage. This however has
the disadvantage that messages are replicated twice, once in the Kafka cluster and
then in HDFS. Writing the logs to storage also adversely affects the receiver's
throughput.
[Figure: receiver-based approach. Executor 1 (the receiver) collects messages from the input stream for the given batch duration, saves them to the WAL on reliable storage and commits offset m to Zookeeper; it shares the metadata of the messages stored in the WAL with the driver, which stores the received metadata reliably; Executor 2 then processes the messages.]
Direct approach
It includes method "createDirectStream". This approach uses the old Kafka
low-level (simple) consumer API. Spark will create exactly one RDD partition
corresponding to each Kafka partition. Also, the consumers for these partitions are
started on different executors, so messages from Kafka can be received and used
to construct the RDD in parallel.
[Figure: direct approach - the partitions of each RDD in the DStream map one-to-one to Kafka partitions.]
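A minimal sketch of the direct approach with the spark-streaming-kafka 0.8 integration (broker list, topic name and batch duration are placeholders):
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

SparkConf conf = new SparkConf().setAppName("kafka-direct-example");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker list
Set<String> topics = Collections.singleton("my-topic");    // placeholder topic

// One RDD partition per Kafka partition; consumers run on the executors.
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topics);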
Essential Configuration
Memory
A Spark executor needs at least 450 MB of memory. This is configurable through
"spark.executor.memory" in spark-defaults.conf (default 4 GB).
Out of the total memory allocated:
1. Spark memory (reserved): the Spark executor needs 300 MB of memory for its
own functioning (even when not running a job).
2. Spark memory: the Spark executor uses (Total memory - 300 MB) * Fraction for
tasks. Fraction is configurable through the parameter "spark.memory.fraction"
and its default value is 0.5.
3. User memory: the remaining memory is available to the user.
In other words, one must allocate memory equal to 300 MB + (memory required
for Spark tasks). Out of the "memory required for Spark tasks", one can control the
distribution between Spark and user memory using the parameter "spark.memory.fraction".
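For example, these could be set in spark-defaults.conf (illustrative values, not recommendations):
spark.executor.memory 2g
spark.memory.fraction 0.5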
Disk Space
If disk space is an issue, then while running Spark in standalone mode, clean-up
of old application directories must be enabled.
This is done through the following parameters:
1. spark.worker.cleanup.enabled (default: false) - Must be set to true to
enable clean-up of application directories.
2. spark.worker.cleanup.interval (default: 30 min) - Interval after which
application directories are checked for clean-up.
3. spark.worker.cleanup.appDataTtl (default: 7 days) - Time to live of
application directories.
The above parameters must be set as part of SPARK_WORKER_OPTS in "spark-env.sh".
For example,
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true
-Dspark.worker.cleanup.interval=300 -Dspark.worker.cleanup.appDataTtl=300"
Spark Operations
Type of RDDs
KafkaRDD
This RDD stores messages received from Kafka as records.
If "createDirectStream" is used, then the partitions of the KafkaRDD are
mapped to the partitions of the Kafka topics.
If "createStream" is used, then the number of partitions is determined by the
configuration parameter "spark.streaming.blockInterval" (default value 200 ms).
For a given batch duration, the number of partitions will be (Batch Duration / Block
Interval); records received during the first block interval go into partition 0, records
received during the second block interval go into partition 1, and so forth. For
example, with a 10 second batch duration and the default 200 ms block interval,
each RDD has 10000 / 200 = 50 partitions.
[Figure: with createStream, records received during each block interval within a batch form one block; blocks 1, 2, 3 and 4 become the partitions of the resulting RDD.]
MapPartitionsRDD
All of the transformations which produce MapPartitionsRDD preserve partitioning.
[Figure: partitions P0, P1, P2 of the parent RDD map one-to-one to the partitions of the MapPartitionsRDD.]
This RDD is the result of applying any of the following transformations to an RDD:
map
Applies a one-to-one transformation to each record. The partitioner is not preserved.
[Figure: map((x) -> x + 1) applied to an RDD yields a MapPartitionsRDD in which every element is incremented, e.g. 4 and 5 become 5 and 6.]
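For instance, with the Java RDD API (assuming an existing JavaSparkContext sc and the usual java.util.Arrays import):
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
// One-to-one transformation; the partition layout is kept, the partitioner is dropped.
JavaRDD<Integer> incremented = rdd.map(x -> x + 1); // 2, 3, 4, 5, 6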
flatmap
Applies a one-to-many transformation to each record. The partitioner is not preserved.
[Figure: flatMap applied to an RDD; every input element produces multiple output records (e.g. x and x + 1), which stay in the corresponding partition of the MapPartitionsRDD.]
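A small sketch (same assumptions as the map example; in the Spark 1.x Java API the flatMap function returns an Iterable):
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3), 2);
// Each element expands into two output records, x and x + 1.
JavaRDD<Integer> expanded = rdd.flatMap(x -> Arrays.asList(x, x + 1)); // 1, 2, 2, 3, 3, 4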
mapPartitions
Applies a transformation to each partition, given as a list of records. The result of
the transformation is again a list of records, which is placed in the corresponding
partition of the resultant RDD. The partitioner is preserved only if the optional
argument "preservesPartitioning" of the methods belonging to the "mapPartitions"
family is set to true.
[Figure: mapPartitions(InputList -> distinct(InputList)); the distinct operation runs within each partition, e.g. a partition containing 2, 3, 3, 5 becomes 2, 3, 5 in the MapPartitionsRDD.]
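A small sketch removing duplicates within each partition (same assumptions as the earlier examples; Spark 1.x Java API, where the function returns an Iterable):
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3, 5, 2, 3), 2);
JavaRDD<Integer> perPartitionDistinct = rdd.mapPartitions(it -> {
    Set<Integer> seen = new LinkedHashSet<>(); // distinct records of this partition
    while (it.hasNext()) { seen.add(it.next()); }
    return new ArrayList<>(seen);
});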
filter
A predicate is applied to each record and those which satisfy it are put
into the resultant RDD. filter always preserves the partitioner.
[Figure: filter(isOdd) on an RDD containing 1..5; isOdd(1) = true, isOdd(2) = false, isOdd(3) = true, isOdd(4) = false, isOdd(5) = true, so the MapPartitionsRDD keeps 1, 3 and 5.]
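For instance (same assumptions as the earlier examples):
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
// Keep only the records satisfying the predicate; the partitioner, if any, is preserved.
JavaRDD<Integer> odds = rdd.filter(x -> x % 2 == 1); // 1, 3, 5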
glom
Puts all records of a partition into an array. This array is then placed as the only
record in the corresponding partition of the resultant RDD. The partitioner is not
preserved.
[Figure: glom() collapses a partition containing 1, 2, 3 and a partition containing 4, 5 into partitions holding the single records [1, 2, 3] and [4, 5].]
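For instance (same assumptions as the earlier examples):
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
// Each partition becomes a single List record, e.g. [1, 2] and [3, 4, 5].
JavaRDD<List<Integer>> arrays = rdd.glom();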
CoGroupedRDD
This RDD is the result of the cogroup operation applied on pair RDDs (RDDs
consisting of records of the form (key, value)). For each key in the source RDDs,
the resultant RDD contains a tuple of lists, where the two lists contain the values
for that key sourced from the two RDDs.
[Figure: cogroup(RDD2) on RDD 1 containing (1, a), (1, b), (2, c), (3, d), (2, e) and RDD 2 containing (1, f), (3, g), (2, h) gives a CoGroupedRDD with (1, ([a, b], [f])), (2, ([c, e], [h])), (3, ([d], [g])).]
[Figure: partition-level view of cogroup - the partitions of RDD 1 and RDD 2 are combined into the partitions of the CoGroupedRDD.]
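A small sketch reproducing the example above (assuming an existing JavaSparkContext sc and scala.Tuple2):
JavaPairRDD<Integer, String> rdd1 = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>(1, "a"), new Tuple2<>(1, "b"), new Tuple2<>(2, "c"),
    new Tuple2<>(3, "d"), new Tuple2<>(2, "e")));
JavaPairRDD<Integer, String> rdd2 = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>(1, "f"), new Tuple2<>(3, "g"), new Tuple2<>(2, "h")));
// For each key, a pair of Iterables: values from rdd1 and values from rdd2,
// e.g. 1 -> ([a, b], [f]), 2 -> ([c, e], [h]), 3 -> ([d], [g]).
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> grouped = rdd1.cogroup(rdd2);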
UnionRDD
This RDD results from taking a union of two RDDs, where the partitions of the
source RDDs are carried over as-is to the destination RDD.
[Figure: UnionRDD of RDD 1 (partitions P0, P1, P2) and RDD 2 (partitions P0, P1); the five source partitions are carried over as-is.]
There is one exception: if both source RDDs use the same custom partitioner and
the number of partitions is also the same, then partitioning is preserved in the
resultant RDD, i.e., records from partition 0 of the source RDDs are moved to
partition 0 of the resultant RDD, and so forth.
[Figure: union(RDD2) when both RDDs share the same custom partitioner and number of partitions; records from partition i of RDD 1 and partition i of RDD 2 end up together in partition i of the unionRDD.]
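For instance (same assumptions as the earlier examples):
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3), 2);
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(4, 5, 6), 2);
// No shuffle: the result simply carries over the 2 + 2 = 4 source partitions.
JavaRDD<Integer> all = rdd1.union(rdd2);
System.out.println(all.partitions().size()); // 4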
CoalescedRDD
This RDD results from the coalesce operation with shuffle set to false. It takes the
number of partitions as an argument. It tries to partition in a manner such that
each new partition gets roughly the same number of parent partitions and that
the preferred location of each new partition overlaps with as many preferred
locations of its parent partitions as possible. If shuffle is set to true, then a random
partitioner is used; this results in a ShuffledRDD with each partition containing
roughly the same number of records, which is then used to create the CoalescedRDD.
With Shuffle=false
[Figure: coalesce(2, false) - the five parent partitions P0..P4 are grouped locally into the two partitions of the CoalescedRDD.]
With Shuffle=true
[Figure: coalesce(2, true) - records are first redistributed into a ShuffledRDD whose partitions contain roughly the same number of records, which is then used to build the CoalescedRDD.]
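For instance (same assumptions as the earlier examples):
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 5);
// shuffle = false: the five parent partitions are merged locally into two.
JavaRDD<Integer> narrow = rdd.coalesce(2, false);
// shuffle = true: records are redistributed into two roughly equal partitions.
JavaRDD<Integer> balanced = rdd.coalesce(2, true);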
ZippedPartitionsRDD
This RDD results from the zipPartitions operation. The optional argument
"preservesPartitioning" controls whether the partitioner of the first parent RDD is
preserved; the partition layout itself is always preserved. Both participating RDDs
must have the same number of partitions, otherwise the operation fails. The final
partitions may not contain the same records as the source partitions, because a
flat-map function is applied to the records from the parent RDDs and the result is
placed in the resultant RDD.
Let flatMapFunction = (List1, List2) => List1.addAll(List2)
[Figure: zipPartitions(RDD2, flatMapFunc) - partition i of the ZippedPartitionsRDD contains the records of partition i of RDD 1 followed by the records of partition i of RDD 2, e.g. 1, 2 and 6, 7, 8 combine into one partition and 3, 4, 5 and 9 into the other.]
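A small sketch of the concatenating flat-map function above (same assumptions as the earlier examples; Spark 1.x Java API, where the function returns an Iterable):
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(6, 7, 8, 9), 2);
// Partition i of the result holds the records of partition i of rdd1 followed by rdd2's.
JavaRDD<Integer> zipped = rdd1.zipPartitions(rdd2, (it1, it2) -> {
    List<Integer> out = new ArrayList<>();
    while (it1.hasNext()) { out.add(it1.next()); }
    while (it2.hasNext()) { out.add(it2.next()); }
    return out;
});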
ShuffledRDD
This RDD is created as an intermediate step of many transformations. It results
from repartitioning where records are shuffled and an optional reduce operation
is performed. It takes the following arguments:
Partitioner (mandatory)
createCombiner: (V => C)
mergeValue: (C, V) => C
mergeCombiners: (C, C) => C
mapSideCombine: boolean
The source or parent RDD contains values of type V, while the resultant RDD
contains values of type C. If mapSideCombine is set to true, then the reduce
operation is first performed within each partition and then repartitioning is done.
This is followed by another reduce operation.
Following are some of the operations which return a ShuffledRDD:
reduceByKey
This operation is used to aggregate the values of every key using a given reduce
function. The ShuffledRDD arguments are set as follows:
Partitioner: if a partitioner is not provided by the user, a hash-based partitioner is used.
createCombiner: (value) => value
mergeValue: reduce operator provided by the user
mergeCombiners: reduce operator provided by the user
mapSideCombine: true (records within a partition are reduced first, then
repartitioning and key aggregation are performed, followed by a final reduce operation).
If default hash based partitioner is used, then number of partitions is set to be (in
order of preference based on availability):
a. Argument provided by user
b. Value of spark.default.parallelism
c. Number of partitions in parent RDD
[Figure: reduceByKey with 2 output partitions; input records such as (a, 2), (d, 4), (b, 3), (a, 1), (b, 2), (c, 1) are reduced per key, e.g. key a yields 4 = reduce(reduce(1, 2), 1).]
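For instance (same assumptions as the earlier pair-RDD example):
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 2), new Tuple2<>("d", 4), new Tuple2<>("b", 3),
    new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("c", 1)), 2);
// Values are combined per key, first within each partition (map-side combine)
// and again after the shuffle; 2 output partitions via the hash partitioner.
JavaPairRDD<String, Integer> sums = pairs.reduceByKey((x, y) -> x + y, 2);
// e.g. a -> 3, b -> 5, c -> 1, d -> 4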
groupByKey
This operation is used to aggregate the values of every key, where the aggregation
is an iterable list. The ShuffledRDD arguments are set as follows:
Partitioner: if a partitioner is not provided by the user, a hash-based partitioner is used.
createCombiner: (value) => CompactBuffer(value)
mergeValue: (CompactBuffer, value) => CompactBuffer.add(value)
mergeCombiners: (CompactBuffer1, CompactBuffer2) => CompactBuffer1.addAll(CompactBuffer2)
mapSideCombine: false (the reduce operation, which here only adds records to an
iterable container, is performed only after repartitioning; there is no advantage to
a map-side combine since the data size is not reduced before it is moved around
during the shuffle).
[Figure: groupByKey(2) on records such as (a, 1), (b, 1), (c, 1), (a, 2), (d, 4), (b, 2); the ShuffledRDD has 2 partitions and one iterable of values per key, e.g. a -> [1, 2].]
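For instance (same assumptions as the earlier pair-RDD examples):
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 2),
    new Tuple2<>("d", 4), new Tuple2<>("c", 1)), 2);
// No map-side combine: values are only gathered into an Iterable after the shuffle.
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey(2);
// e.g. a -> [1, 2], b -> [1], c -> [1], d -> [4]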
aggregateByKey
This operation is used to aggregate the values of every key using given reduce
functions and a neutral zero value. Let U be the type of the aggregated values.
The arguments provided by the user are:
zeroValue : U
SequenceFunction : (U, value) => U
CombineFunction : (U, U) => U
noOfPartitions (optional) : integer
or
partitioner : Partitioner
The ShuffledRDD arguments are set as follows:
Partitioner: if a partitioner is not provided by the user, a hash-based partitioner is used.
createCombiner: (value) => SequenceFunction(zeroValue, value)
mergeValue: SequenceFunction
mergeCombiners: CombineFunction
mapSideCombine: true
For example, to aggregate the values of each key into a HashSet:
noOfPartitions: 2
zeroValue: new HashSet<value type>()
SequenceFunction: (set, value) => set.add(value), where set is of type HashSet
CombineFunction: (set1, set2) => set1.addAll(set2), where set1 and set2 are of type HashSet
[Figure: aggregateByKey with the HashSet accumulator on records such as (a, 1), (b, 1), (a, 2), (c, 1), (d, 4); the resulting ShuffledRDD has 2 partitions and one set of values per key.]
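A sketch of the HashSet example above (same assumptions as the earlier pair-RDD examples):
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 2),
    new Tuple2<>("c", 1), new Tuple2<>("a", 1)), 2);
// zeroValue is an empty HashSet; the sequence function adds a value to the set,
// the combine function merges the per-partition sets after the shuffle.
JavaPairRDD<String, HashSet<Integer>> sets = pairs.aggregateByKey(
    new HashSet<Integer>(),
    (set, v) -> { set.add(v); return set; },
    (s1, s2) -> { s1.addAll(s2); return s1; });
// e.g. a -> {1, 2}, b -> {1}, c -> {1}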
Use case
Problem
There exists a Kafka topic 'DeviceData' with 'n' partitions. A large number of
devices generate log files which are published to the Kafka topic 'DeviceData'.
Devices are organized in groups, so that each group publishes log files to a
specific partition. For instance, devices p to q publish their log files to partition 0,
devices q+1 to r publish their log files to partition 1, and so on.
The format of each line in a log file is as follows:
<Timestamp>: <Feature name of form F[number]>: (start|stop)
It is assumed that no two consecutive starts or stops can occur for the same
feature and device. Also, a log file may end with a feature start, and the
subsequent feature stop may follow later in another log file from the same device.
For each device and each feature, we need to publish the maximum interval for
which the feature was used, along with the start and stop times of this interval
and the total duration of usage. This information must be refreshed every 'T' minutes.
Constraints
Since the required output statistics are per device and the device-to-Kafka-partition
mapping is constant, there is no need to repartition a Spark RDD once it is
created. As a consequence, shuffling must be avoided during transformations to
maximize performance.
Solution
Approach
Spark Streaming is configured with the interval set to 'T' minutes. For every RDD
created from the messages received from Kafka during an interval of 'T' minutes,
there will be some log parts which cannot be processed yet. These parts contain
feature start statements for which the corresponding stop statements have not
been received in this interval; the stop statements may arrive in a subsequent
interval. These unprocessed log parts are stored in a shared storage (HDFS).
When the RDD constructed from the next batch is being processed, the
unprocessed portions of log files stored on disk are retrieved first and combined
with the current RDD for processing.
Also, the maximum interval and the usage duration will be stored on shared
storage. For each batch, this data is read from HDFS and merged with the results
obtained from the current batch. The final results are then written back to disk,
overwriting the previous results.
[Figure: in each processing phase, previously written results and unprocessed log parts are read from HDFS, merged with the current batch, and the new results and unprocessed parts are written back for the next phase.]
Implementation
Step 1
A direct stream is obtained using KafkaUtils.createDirectStream. Each RDD of the
stream will have n partitions corresponding to the n Kafka partitions. Also, these
partitions will be constructed on separate executors.
JavaPairInputDStream<String, String> logs = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topics); // jssc, kafkaParams and topics set up as in the earlier direct-stream sketch
Every record in the RDD corresponds to a single log file from a device, where the
device ID is the key and the content of the log file is the value.
Step 2
A transformation is applied on the stream so that each record of the RDD is now
mapped to records of the following form:
[Figure: mapToPair - a record (Device ID, logs) is split per feature into records keyed by (Device ID, Feature) with the feature's list of actions, e.g. (Device ID, F1) with actions (T1, start), (T2, stop), (T3, start) and (Device ID, F2) with actions (T2, start), (T4, stop).]
Step 4
Now the unprocessed records from the last batch need to be added to the RDD of
the current batch. Since this operation is to be applied on the current RDD only
(subsequent RDDs will be merged with a different set of unprocessed records), the
operation foreachRDD is used. It must be noted that since foreachRDD is an
output operation, no DStream is returned and all further processing must be
done inside foreachRDD.
First, the unprocessed records from HDFS are read into a new RDD.
JavaRDD<Tuple2<Tuple2<String, String>, List<Tuple2<Date, String>>>>
    pastUnprocessedRecordsTuple = sparkContext.objectFile(unprocessedRecordsPath); // HDFS path (placeholder)
Since the method objectFile cannot be used to read directly into a paired RDD
(i.e., an RDD containing records with key-value pairs), the RDD just read must be
converted into a paired RDD.
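A minimal sketch of this conversion (the record is already a Tuple2, so an identity mapToPair suffices; variable names follow the snippet above):
JavaPairRDD<Tuple2<String, String>, List<Tuple2<Date, String>>> pastUnprocessedRecords =
    pastUnprocessedRecordsTuple.mapToPair(record -> record);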
[Figure: mapToPair converts each Tuple2 read from HDFS into a paired-RDD record keyed by (Device ID, Feature) with its list of (timestamp, action) entries, e.g. actions (T1, start), (T2, stop), (T3, start).]
The paired RDD of past unprocessed records is then combined with the paired RDD
of the current batch, so the merged RDD has the same structure as mentioned in
step 4, but there will be only one record for each feature-device pair.
[Figure: merged record for (Device 1, F2) with the combined action list (T1, start), (T2, stop), (T3, start), (T4, stop), (T5, start); mapPartitionsToPair then extracts the trailing unmatched (T5, start) as the part that could not be processed.]
[Figure: why the merged RDD is cached - without caching, RDD1 and RDD2 each re-evaluate the parent RDD and may read different content from storage; with caching, both are computed from the same cached content.]
Each record of the merged RDD is now split so that trailing feature starts without a
matching stop go into the could-not-be-processed part. Since not every record will
result in an unprocessed part, mapToPair could not be used. Instead,
mapPartitionsToPair is used to process a partition as a whole (alternatively, filter
followed by mapToPair could have been used, but that would have resulted in
processing the action list twice).
[Figure: mapToPair computes usage statistics for each (Device ID, Feature) record from the processed actions, e.g. for (Device 1, F2) with actions (T1, start), (T2, stop), (T3, start), (T4, stop): duration = (T2 - T1) + (T4 - T3), maxInterval = T4 - T3, maxIntervalStartTime = T3, maxIntervalStopTime = T4.]
[Figure: mapPartitionsToPair merges the statistics of the current batch with the results previously stored on HDFS, e.g. for (Device 1, F2): durations Ta and Tb are added (Ta + Tb) and the larger of the two maximum intervals, Max(T2 - T1, T4 - T3), is kept together with its start and stop times.]
Known Limitations
1. Since, in this solution, it is mandatory to cache some intermediate RDDs,
this could be problematic if their partitions are too large to fit into
memory.
2. Operations of the mapPartitions family are used. These operations require a
complete partition to be loaded into memory at once, which could cause
issues with large partitions.
3. Intermediate steps involve reading HDFS files and overwriting them later.
If a Spark executor fails and a task is restarted, the results obtained could
be incorrect, as the files being read from HDFS could already have been
overwritten by the time the executor failed. In other words, the tasks are no
longer idempotent.
4. The RDD obtained by merging the unprocessed records from the past with the
records of the current batch is processed twice: first to extract the parts of
records which cannot be processed in this batch, and then to process the
remaining parts. Algorithmically, this is possible in a single step.