Kafka
Introduction to Apache Kafka
1. What is Apache Kafka?
Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and
event-driven applications. It was initially developed by LinkedIn and later open-sourced as part of the
Apache Software Foundation. Kafka is designed to handle large volumes of data, ensuring fault
tolerance, scalability, and durability.
Kafka is widely used for:
• Log aggregation (e.g., centralizing logs from multiple applications).
• Real-time analytics (e.g., fraud detection, stock trading systems).
• Event-driven microservices (e.g., asynchronous communication between services).
• Message brokering (e.g., replacing traditional message queues like RabbitMQ).
1.1 Key Features of Kafka
• Scalability: Kafka can scale horizontally by adding more brokers.
• Durability: Kafka uses a log-based storage model with persistent message retention.
• High Throughput: Kafka efficiently processes millions of messages per second.
• Fault Tolerance: Replication ensures data is not lost even if a broker fails.
• Decoupling: Kafka enables loose coupling between producers and consumers, reducing
dependency.
2. Kafka Architecture Overview
Kafka follows a publish-subscribe model where producers send data to Kafka topics, and consumers
read from these topics.
2.1 Components of Kafka Architecture
1. Producers:
o Applications that publish messages to Kafka topics.
2. Topics:
o Categories to which producers send messages. Each topic is split into partitions.
3. Partitions:
o Messages are distributed across partitions for load balancing and parallelism.
4. Consumers:
o Applications that read messages from Kafka topics.
5. Brokers:
o Kafka servers that store and manage messages.
6. Consumer Groups:
o Consumers can form groups to balance message consumption across instances.
7. ZooKeeper:
o A coordination service for managing Kafka metadata and leader election.
2.2 Kafka Architecture Diagram
                 +----------------------------+
                 |        Kafka Cluster       |
                 +----------------------------+
                                |
    +-----------------+-----------------+-----------------+
    | Kafka Broker 1  | Kafka Broker 2  | Kafka Broker 3  |
    +-----------------+-----------------+-----------------+
            |                                   |
    +---------------+                   +---------------+
    |  Partition 1  |                   |  Partition 2  |
    +---------------+                   +---------------+
            |                                   |
    +---------------+                   +---------------+
    |  Consumer 1   |                   |  Consumer 2   |
    +---------------+                   +---------------+
3. Setting Up Kafka Locally
3.1 Download and Install Kafka
1. Download Apache Kafka from the official site:
https://kafka.apache.org/downloads
2. Extract the downloaded file:
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
3. Start ZooKeeper (required by Kafka):
bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start Kafka Broker:
bin/kafka-server-start.sh config/server.properties
5. Create a Kafka topic:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
4. Implementing Kafka Producer in C++
To integrate C++ with Kafka, we use the librdkafka library. Install it using:
sudo apt-get install librdkafka-dev
4.1 C++ Kafka Producer Code
#include <iostream>
#include <librdkafka/rdkafka.h>
void delivery_report_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err)
std::cerr << "Message delivery failed: " << rd_kafka_err2str(rkmessage->err) << std::endl;
else
std::cout << "Message delivered successfully to topic " << rd_kafka_topic_name(rkmessage->rkt) <<
std::endl;
}
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test-topic";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_conf_set_dr_msg_cb(conf, delivery_report_callback);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string message = "Hello, Kafka!";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message.c_str(), message.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
Steps Explained:
1. Initialize Kafka producer.
2. Set Kafka broker (localhost:9092).
3. Send message to Kafka topic (test-topic).
4. Handle delivery reports.
Compile and run:
g++ -o producer producer.cpp -lrdkafka
./producer
5. Implementing Kafka Consumer in C++
5.1 C++ Kafka Consumer Code
#include <iostream>
#include <librdkafka/rdkafka.h>
void msg_consume(rd_kafka_message_t *message, void *opaque) {
if (message->err) {
std::cerr << "Message consumption failed: " << rd_kafka_err2str(message->err) << std::endl;
} else {
std::cout << "Received message: " << (char *)message->payload << std::endl;
}
}
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test-topic";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_conf_set(conf, "group.id", "test-group", NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
if (!consumer) {
std::cerr << "Failed to create Kafka consumer" << std::endl;
return 1;
}
rd_kafka_poll_set_consumer(consumer);
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic.c_str(), RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(consumer, topics);
rd_kafka_topic_partition_list_destroy(topics);
while (true) {
rd_kafka_message_t *message = rd_kafka_consumer_poll(consumer, 1000);
if (message) {
msg_consume(message, NULL);
rd_kafka_message_destroy(message);
}
}
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);
return 0;
}
Steps Explained:
1. Initialize Kafka consumer.
2. Subscribe to Kafka topic (test-topic).
3. Continuously poll messages from the topic.
Compile and run:
g++ -o consumer consumer.cpp -lrdkafka
./consumer
What is Apache Kafka?
Apache Kafka is a high-performance, distributed event streaming platform designed to handle large
volumes of real-time data. It is an open-source system that allows applications to publish, process,
store, and subscribe to streams of events in a scalable, fault-tolerant, and durable manner.
Kafka is used by organizations worldwide for various real-time data processing tasks, including log
aggregation, event-driven microservices, stream processing, and real-time analytics.
1. Core Concept of Kafka
Kafka operates based on a publish-subscribe model, where producers send data to Kafka topics, and
consumers read from these topics. Kafka ensures high throughput, low latency, durability, and
scalability, making it ideal for big data processing.
1.1 Key Features of Kafka
• Scalability: Kafka can scale horizontally by adding more brokers to the cluster.
• High Throughput: Kafka handles millions of messages per second with low latency.
• Durability: Messages are stored on disk and replicated across multiple brokers.
• Fault Tolerance: Kafka ensures message availability even if a broker crashes.
• Distributed Architecture: Kafka is designed to work in a multi-node environment, ensuring load balancing.
• Decoupling of Producers & Consumers: Kafka enables loose coupling between data producers and consumers.
2. How Kafka Works?
Kafka follows a log-based storage system and asynchronous messaging model. Here’s a high-level
workflow of how Kafka processes messages:
2.1 Kafka Message Flow
1. Producers send messages to a Kafka topic.
2. Kafka stores messages in partitions within a topic.
3. Kafka brokers distribute messages and ensure replication for fault tolerance.
4. Consumers subscribe to topics and process messages at their own pace.
2.2 Kafka Architecture Components
1. Producer:
o The producer sends messages to Kafka topics.
o Messages are sent asynchronously, ensuring low latency.
2. Topic:
o A topic is a logical category where messages are stored.
o Topics are split into partitions for scalability.
3. Partition:
o Each topic is divided into multiple partitions for parallel processing.
o Kafka ensures load balancing by distributing partitions across brokers.
4. Broker:
o A Kafka broker stores and manages topic partitions.
o Multiple brokers form a Kafka cluster for scalability.
5. Consumer:
o The consumer reads messages from a Kafka topic.
o Consumers can be grouped into Consumer Groups for parallel processing.
6. ZooKeeper:
o Kafka uses Apache ZooKeeper to manage metadata, leader election, and broker
coordination.
3. Kafka Message Structure
Kafka messages are stored as binary data and contain the following fields:
• Key: Used for partitioning messages.
• Value: The actual message data (payload).
• Offset: A unique identifier assigned to each message in a partition.
• Timestamp: The time the message was produced.
Example Kafka Message Format
{
"key": "order_123",
"value": "{ \"customer\": \"John\", \"amount\": 250 }",
"timestamp": "2025-02-12T12:30:00Z"
}
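For illustration, these fields map directly onto a librdkafka produce call. The fragment below is a hedged sketch that extends the C producer shown later in this section; the topic name "orders" and the key "order_123" are taken from the example message above.
// Sketch: producing a keyed message with the librdkafka C API.
// Assumes `producer` was created as in the producer example below.
std::string key = "order_123";
std::string value = "{ \"customer\": \"John\", \"amount\": 250 }";
rd_kafka_producev(producer,
    RD_KAFKA_V_TOPIC("orders"),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_KEY(key.c_str(), key.size()),          // key: drives partition selection
    RD_KAFKA_V_VALUE(value.c_str(), value.size()),    // value: the message payload
    RD_KAFKA_V_END);
// The offset is assigned by the broker; the timestamp defaults to the produce
// time unless set explicitly with RD_KAFKA_V_TIMESTAMP(...).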
4. Implementing Kafka Producer & Consumer in C++
Kafka provides a C++ client library (librdkafka) for integrating Kafka with C++ applications. We will
implement:
1. A Kafka Producer to send messages.
2. A Kafka Consumer to read messages.
4.1 Kafka Producer in C++
Installing librdkafka Library
To work with Kafka in C++, install librdkafka:
sudo apt-get install librdkafka-dev
C++ Kafka Producer Code
#include <iostream>
#include <librdkafka/rdkafka.h>
void delivery_report_callback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err)
std::cerr << "Message delivery failed: " << rd_kafka_err2str(rkmessage->err) << std::endl;
else
std::cout << "Message delivered successfully to topic " << rd_kafka_topic_name(rkmessage->rkt) <<
std::endl;
}
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_conf_set_dr_msg_cb(conf, delivery_report_callback);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string message = "{\"order_id\": \"12345\", \"customer\": \"John Doe\", \"amount\": 100}";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message.c_str(), message.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
Steps Explained:
1. Initialize Kafka producer.
2. Set Kafka broker (localhost:9092).
3. Send JSON message to Kafka topic (orders).
4. Handle message delivery status.
4.2 Kafka Consumer in C++
C++ Kafka Consumer Code
#include <iostream>
#include <librdkafka/rdkafka.h>
void msg_consume(rd_kafka_message_t *message, void *opaque) {
if (message->err) {
std::cerr << "Message consumption failed: " << rd_kafka_err2str(message->err) << std::endl;
} else {
std::cout << "Received message: " << (char *)message->payload << std::endl;
}
}
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_conf_set(conf, "group.id", "test-group", NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
if (!consumer) {
std::cerr << "Failed to create Kafka consumer" << std::endl;
return 1;
}
rd_kafka_poll_set_consumer(consumer);
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic.c_str(), RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(consumer, topics);
rd_kafka_topic_partition_list_destroy(topics);
while (true) {
rd_kafka_message_t *message = rd_kafka_consumer_poll(consumer, 1000);
if (message) {
msg_consume(message, NULL);
rd_kafka_message_destroy(message);
}
}
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);
return 0;
}
Steps Explained:
1. Initialize Kafka consumer.
2. Subscribe to Kafka topic (orders).
3. Continuously poll messages from the topic.
Kafka vs Traditional Messaging Systems
Apache Kafka is often compared to traditional messaging systems like ActiveMQ, RabbitMQ, and IBM
MQ. While they all serve the purpose of message exchange between applications, Kafka fundamentally
differs in its architecture, scalability, durability, and performance characteristics.
In this section, we will analyze how Kafka differs from traditional message queues and when to use
Kafka over other messaging systems.
1. Traditional Messaging Systems Overview
Traditional messaging systems fall into two main categories:
1. Message Queues (Point-to-Point Model)
o Examples: RabbitMQ, ActiveMQ, IBM MQ
o How it works: Messages are sent by producers to a queue. Consumers pick up messages, and each message is processed by a single consumer.
2. Publish-Subscribe Systems
o Examples: JMS (Java Message Service), Google Cloud Pub/Sub
o How it works: Producers send messages to a topic, and multiple consumers receive a
copy of the message.
1.1 Key Characteristics of Traditional Message Brokers
• Message Acknowledgment: Ensures reliable delivery by requiring consumers to acknowledge message reception.
• Transactional Support: Supports message rollback in case of failures.
• Durability: Messages persist until acknowledged by consumers.
• Routing Mechanisms: Supports complex message filtering and routing.
2. Apache Kafka vs Traditional Messaging Systems
Kafka is not a traditional message queue; instead, it is a distributed log-based event streaming
platform that provides high scalability, durability, and low latency.
2.1 Key Differences Between Kafka and Traditional Message Brokers
• Architecture: Kafka is distributed and log-based; traditional MQs (RabbitMQ, ActiveMQ) use a centralized queue.
• Message Retention: Kafka stores messages for a configurable time; traditional MQs delete messages after consumption.
• Throughput: Kafka is very high (millions of messages/sec); traditional MQs are moderate.
• Scalability: Kafka scales horizontally across multiple brokers; traditional MQs have limited scalability.
• Fault Tolerance: Kafka replicates data across brokers; traditional MQs have less robust failover mechanisms.
• Ordering Guarantees: Kafka maintains strict partition-based ordering; traditional MQs may not guarantee strict order.
• Consumer Model: Kafka is pull-based (consumers fetch data); traditional MQs are push-based (messages are sent to consumers).
• Latency: Kafka has low latency (milliseconds); traditional MQs have higher latency.
3. Kafka’s Advantages Over Traditional Messaging Systems
1. Message Persistence and Replay
o Kafka stores messages for a configured period, even after they have been read (see the replay sketch after this list).
o In traditional MQs, messages disappear once consumed.
2. High Throughput & Low Latency
o Kafka achieves low latency (<10ms) with millions of messages per second.
o Traditional message queues have higher overhead due to acknowledgments.
3. Horizontal Scalability
o Kafka partitions topics across multiple brokers, allowing parallel processing.
o Traditional MQs often struggle with load balancing and scaling.
4. Fault Tolerance
o Kafka replicates messages across brokers to prevent data loss.
o Traditional MQs rely on backup queues, which may not be as robust.
5. Decoupling of Producers & Consumers
o Kafka’s log-based storage allows multiple consumers to process messages
independently.
o Traditional MQs tightly couple consumers with message delivery, limiting reprocessing.
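As referenced in point 1, replay is possible because consumed messages remain in the log. A minimal sketch of replaying a partition from the beginning with librdkafka, assuming a consumer created as in the consumer examples earlier in this document; the topic name "orders" and partition 0 are illustrative:
// Sketch: re-read a partition from offset 0 (replay) instead of subscribing.
rd_kafka_topic_partition_list_t *parts = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(parts, "orders", 0)->offset = RD_KAFKA_OFFSET_BEGINNING;
rd_kafka_assign(consumer, parts);   // manual assignment bypasses the group rebalance
rd_kafka_topic_partition_list_destroy(parts);
// Subsequent rd_kafka_consumer_poll() calls deliver the partition's messages again.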
4. Kafka vs RabbitMQ - Detailed Comparison
RabbitMQ is a widely used message broker that follows the AMQP (Advanced Message Queuing
Protocol), while Kafka follows a log-based event streaming model.
• Processing Model: Kafka is log-based; RabbitMQ is a message queue.
• Message Retention: Kafka retains messages for days; RabbitMQ deletes messages after acknowledgment.
• Consumer Model: Kafka consumers pull messages; the RabbitMQ broker pushes messages to consumers.
• Use Case: Kafka suits event streaming, log processing, and real-time analytics; RabbitMQ suits transactional processing and short-lived messages.
• Ordering Guarantee: Kafka guarantees ordering within partitions; RabbitMQ does not always preserve ordering.
When to use RabbitMQ?
• When you need low-latency, transactional messaging.
• When message acknowledgment and routing are critical.
When to use Kafka?
• When handling real-time event streaming.
• When you need to process large volumes of data at scale.
5. Implementing Kafka vs Traditional MQ in C++
5.1 Implementing Kafka Producer in C++
Here’s an example of a Kafka producer that sends messages to a Kafka topic:
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "event_stream";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string message = "Kafka vs RabbitMQ!";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message.c_str(), message.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
5.2 Implementing RabbitMQ Producer in C++
In C++, RabbitMQ is used through the rabbitmq-c library for message publishing.
Installing rabbitmq-c Library
sudo apt-get install librabbitmq-dev
C++ RabbitMQ Producer Code
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
amqp_socket_open(socket, "localhost", 5672);
amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
amqp_channel_open(conn, 1);
amqp_get_rpc_reply(conn);
amqp_bytes_t queue_name = amqp_cstring_bytes("test_queue");
amqp_queue_declare(conn, 1, queue_name, 0, 0, 0, 1, amqp_empty_table);  // ensure the queue exists
amqp_basic_publish(conn, 1, amqp_empty_bytes, queue_name, 0, 0, NULL,
                   amqp_cstring_bytes("Hello, RabbitMQ!"));
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
6. Conclusion: When to Use Kafka vs Traditional MQ?
• Event streaming, real-time processing: Kafka
• Transactional messaging: RabbitMQ
• Message durability and replayability: Kafka
• Low-latency messaging: RabbitMQ
• Large-scale log collection: Kafka
• Complex routing and priority messages: RabbitMQ
Kafka Use Cases
Apache Kafka is widely used across multiple industries for real-time event processing, data streaming,
messaging, and analytics. Its high scalability, durability, and low latency make it an ideal choice for
handling large amounts of data.
In this section, we will explore Kafka’s use cases in different domains, supported by C++
implementations where applicable.
1. Real-Time Data Streaming
Kafka is extensively used for real-time data streaming, where applications need to process and analyze
data continuously.
1.1 Example Use Cases
• Stock Market Data Processing – Real-time analysis of stock prices.
• Website Clickstream Analysis – Analyzing user behavior on websites.
• Sensor Data Processing – IoT devices continuously sending sensor data.
1.2 C++ Implementation of Kafka Streaming
Producer: Sending Stock Price Updates
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "stock_prices";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string message = "AAPL: 185.32, TSLA: 720.15, AMZN: 3123.76";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message.c_str(), message.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
Consumer: Receiving Stock Price Updates
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "stock_prices";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
if (!consumer) {
std::cerr << "Failed to create Kafka consumer" << std::endl;
return 1;
}
rd_kafka_subscribe(consumer, rd_kafka_topic_partition_list_new(1));
while (true) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg) {
if (!msg->err)
std::cout << "Received stock update: " << std::string((char *)msg->payload, msg->len) << std::endl;
rd_kafka_message_destroy(msg);
}
}
rd_kafka_destroy(consumer);
return 0;
}
2. Log Aggregation
Kafka is widely used for log aggregation, where logs from multiple servers are collected, stored, and
analyzed in real time.
2.1 Example Use Cases
• Centralized Logging System – Collect logs from multiple microservices.
• Security Log Analysis – Detect anomalies in security logs.
• Real-time Monitoring – Track server performance using logs.
2.2 Kafka-Based Log Aggregation Architecture
1. Servers generate logs.
2. Kafka Producers send logs to a Kafka log topic.
3. Kafka Consumers process and store logs in Elasticsearch or HDFS.
Kafka Producer for Log Collection in C++
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "server_logs";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string log_message = "ERROR: Memory overflow on server 192.168.1.1";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(log_message.c_str(), log_message.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
3. Real-Time Fraud Detection
Banks and financial institutions use Kafka for fraud detection by analyzing transaction patterns in real-
time.
3.1 Example Use Cases
• Credit Card Fraud Detection – Detects fraudulent transactions.
• Anomalous Login Detection – Identifies unusual login attempts.
• Insurance Fraud – Detects fraudulent claims.
3.2 Kafka for Fraud Detection
• Kafka collects transaction data.
• A Kafka Stream application applies fraud detection algorithms.
• Alerts are generated in real-time.
Kafka Producer for Real-Time Fraud Detection in C++
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "fraud_detection";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string transaction_data = "User:12345, Amount:$10000, Location:Russia";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(transaction_data.c_str(), transaction_data.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
4. IoT Data Processing
Kafka is widely used in IoT applications for real-time processing of sensor data.
4.1 Example Use Cases
• Smart Home Devices – Real-time temperature and motion detection.
• Industrial IoT – Monitoring machine health in factories.
• Connected Vehicles – Processing GPS and speed data.
4.2 Kafka-Based IoT Data Processing Architecture
• IoT devices send data to Kafka Producers.
• Kafka stores the data in topics.
• Stream processing applications analyze sensor data.
Kafka Producer for IoT Sensor Data in C++
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "iot_sensors";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string sensor_data = "Temperature: 25.3°C, Humidity: 60%";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(sensor_data.c_str(), sensor_data.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
1. Kafka Brokers
A Kafka Broker is a server that stores, manages, and distributes messages in Kafka. Brokers receive
data from Producers and serve data to Consumers.
1.1 Kafka Broker Responsibilities
• Stores messages in topics.
• Handles partitions of topics.
• Manages producer and consumer connections.
• Distributes load among multiple brokers in a Kafka cluster.
1.2 Example: Kafka Broker Setup
To start a Kafka broker, we need to configure the server.properties file:
broker.id=1
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
Then, start the Kafka broker using:
bin/kafka-server-start.sh config/server.properties
2. Kafka Producers
Kafka Producers send messages to Kafka topics. They decide which partition a message should go to.
2.1 Producer Features
• Sends messages asynchronously.
• Supports message partitioning for parallel processing.
• Uses acks (acknowledgments) for message durability.
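The acks behaviour mentioned above is an ordinary configuration property. A hedged fragment showing how it could be set on the producer configuration used in the example below; the property names are standard Kafka/librdkafka names and the values are illustrative:
// Sketch: stronger delivery guarantees on the producer configuration.
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "acks", "all", errstr, sizeof(errstr));   // "0", "1" or "all"
rd_kafka_conf_set(conf, "retries", "3", errstr, sizeof(errstr));  // retry transient failures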
2.2 C++ Kafka Producer Example
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test_topic";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!producer) {
std::cerr << "Failed to create Kafka producer" << std::endl;
return 1;
}
std::string message = "Hello Kafka from C++!";
rd_kafka_producev(producer,
RD_KAFKA_V_TOPIC(topic.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message.c_str(), message.size()),
RD_KAFKA_V_END);
rd_kafka_flush(producer, 1000);
rd_kafka_destroy(producer);
return 0;
}
3. Kafka Consumers
Kafka Consumers read messages from Kafka topics and process them.
3.1 Consumer Features
• Automatically balances load in consumer groups.
• Supports message offset tracking.
• Can process messages in real-time or batch mode.
3.2 C++ Kafka Consumer Example
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test_topic";
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
if (!consumer) {
std::cerr << "Failed to create Kafka consumer" << std::endl;
return 1;
}
rd_kafka_subscribe(consumer, rd_kafka_topic_partition_list_new(1));
while (true) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg) {
if (!msg->err)
std::cout << "Received message: " << std::string((char *)msg->payload, msg->len) << std::endl;
rd_kafka_message_destroy(msg);
}
}
rd_kafka_destroy(consumer);
return 0;
}
4. Kafka Topics and Partitions
A Kafka Topic is a logical category where messages are published. Each topic is divided into partitions
to enable scalability and parallel processing.
4.1 Example: Creating a Topic with 3 Partitions
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
4.2 Checking Topic Details
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
5. ZooKeeper (Cluster Coordination)
Kafka uses Apache ZooKeeper to manage metadata and leader election.
5.1 ZooKeeper Responsibilities
• Stores Kafka broker metadata.
• Elects leaders for partitions.
• Handles cluster coordination.
5.2 Starting ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
6. Kafka Streams
Kafka Streams is a real-time stream processing library that processes Kafka data without requiring a
separate cluster.
6.1 Kafka Streams Features
• Supports stateful and stateless processing.
• Can filter, transform, and aggregate Kafka messages.
7. Kafka Connect
Kafka Connect is used for integrating Kafka with external data sources like databases, cloud storage,
and message queues.
7.1 Example Connectors
• JDBC Connector – Reads from MySQL/PostgreSQL.
• HDFS Connector – Writes to Hadoop HDFS.
• Elasticsearch Connector – Sends data to Elasticsearch.
8. Schema Registry
Kafka Schema Registry is used for managing Avro schemas in Kafka topics.
8.1 Schema Registry Benefits
• Ensures schema evolution compatibility.
• Supports versioning of message formats.
8.2 Example: Registering an Avro Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\":
\"int\"}, {\"name\": \"name\", \"type\": \"string\"}]}"}' \
http://localhost:8081/subjects/my_topic-value/versions
9. KSQL (Kafka SQL)
KSQL allows users to query and process Kafka data using SQL-like syntax.
9.1 Example: Querying Kafka Data
SELECT * FROM pageviews WHERE userid = 'user123';
Apache Kafka Architecture – A Deep Dive
1. Introduction
Apache Kafka is a distributed event streaming platform used for high-throughput, fault-tolerant data
streaming applications. Originally developed at LinkedIn, Kafka is now a key part of real-time data
pipelines and event-driven architectures across various industries.
Kafka follows a publish-subscribe messaging model, allowing multiple producers to send data to topics
and multiple consumers to process that data independently.
1.1 Key Features of Kafka Architecture
• Scalability – Kafka can handle millions of messages per second by distributing data across
multiple brokers and partitions.
• Durability – Data is persisted on disk and replicated across brokers to ensure fault tolerance.
• High Throughput – Kafka provides efficient data handling with minimal latency.
• Decoupling – Producers and consumers are loosely coupled, improving system resilience.
2. Core Components of Kafka Architecture
Kafka’s architecture consists of the following essential components:
1. Producers – Applications that publish messages to Kafka topics.
2. Brokers – Kafka servers that store and manage messages.
3. Topics and Partitions – Logical divisions of data streams to enable scalability.
4. Consumers – Applications that subscribe to topics and process messages.
5. Zookeeper – A service that manages Kafka metadata, leader elections, and configurations.
6. Clusters – A group of brokers working together to ensure high availability and fault tolerance.
Let's explore each component in detail.
3. Kafka Producers – Writing Data to Kafka
3.1 What is a Kafka Producer?
Kafka producers are responsible for publishing messages to Kafka topics. They push data to Kafka
asynchronously, which ensures high throughput.
3.2 How Producers Work?
• Producers connect to Kafka brokers and send messages to topics.
• Kafka appends messages to the respective topic partitions.
• Messages are stored in order within partitions.
• Producers can choose which partition to send a message to (default is round-robin).
3.3 Producer Acknowledgment Mechanism
Kafka producers can be configured with different acknowledgment levels:
• acks = 0 → Fire-and-forget (no guarantee of message persistence).
• acks = 1 → Acknowledged by leader only (moderate reliability).
• acks = all → Acknowledged by all replicas (highest reliability).
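With the librdkafka C++ API used in the next subsection, these levels are set on the configuration object; a hedged sketch:
// Sketch: choosing an acknowledgment level on the C++ API configuration.
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("acks", "all", errstr);   // "0" = fire-and-forget, "1" = leader only, "all" = all replicas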
3.4 C++ Implementation of a Kafka Producer
To send messages to Kafka using C++, we use librdkafka.
C++ Code: Kafka Producer
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
void produceMessages() {
std::string brokers = "localhost:9092";
std::string topicName = "test_topic";
std::string errstr;
// Create producer configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
// Create producer instance
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
for (int i = 0; i < 10; i++) {
std::string message = "Message " + std::to_string(i);
producer->produce(topicName, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(), NULL, NULL);
producer->flush(1000);
}
delete producer;
delete conf;
}
int main() {
produceMessages();
return 0;
}
3.5 Producer Best Practices
• Use batching and compression to reduce network overhead.
• Set retries and timeouts to handle transient failures.
• Partition wisely to ensure even distribution of messages.
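These practices map onto ordinary configuration properties; a hedged sketch with illustrative (not tuned) values:
// Sketch: batching, compression and retry settings on the producer conf.
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("batch.size", "65536", errstr);          // batch up to 64 KB per partition
conf->set("linger.ms", "5", errstr);               // wait briefly to fill a batch
conf->set("compression.type", "snappy", errstr);   // compress batches on the wire
conf->set("retries", "3", errstr);                 // retry transient broker errors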
4. Kafka Brokers – Message Storage & Distribution
4.1 What is a Kafka Broker?
A Kafka broker is a server that stores messages and serves client requests.
4.2 How Kafka Brokers Work?
• A Kafka cluster consists of multiple brokers (distributed architecture).
• Brokers store partitions of topics, and each partition has a leader broker.
• Kafka automatically balances partitions across brokers.
4.3 Example: Configuring Kafka Brokers
Kafka brokers are configured using server.properties:
broker.id=1
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
To start a Kafka broker:
kafka-server-start.sh config/server-1.properties
5. Kafka Topics and Partitions
5.1 What is a Kafka Topic?
A topic is a logical grouping of messages. Each topic consists of partitions to enable parallelism.
5.2 How Kafka Partitions Work?
• Partitions enable load balancing by distributing messages across multiple brokers.
• Each partition has a leader and multiple replicas for fault tolerance.
• Consumers can read messages in order from a single partition.
5.3 Creating Kafka Topics with Partitions
To create a topic with 3 partitions and a replication factor of 2:
kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
6. Kafka Consumers – Reading Data from Kafka
6.1 What is a Kafka Consumer?
A consumer subscribes to Kafka topics and processes messages asynchronously.
6.2 Consumer Groups
• Multiple consumers can form a consumer group to share partitions.
• Kafka ensures that each partition is assigned to only one consumer in a group.
6.3 C++ Implementation of a Kafka Consumer
void consumeMessages() {
std::string brokers = "localhost:9092";
std::string topicName = "test_topic";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", "test_group", errstr);
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
consumer->subscribe({topicName});
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received: " << std::string(static_cast<char *>(msg->payload()), msg->len()) <<
std::endl;
}
delete msg;
}
}
7. Kafka Zookeeper – Managing Metadata
Kafka relies on Zookeeper for:
• Broker discovery – Keeps track of active brokers.
• Leader election – Assigns partition leaders dynamically.
• Consumer group coordination – Tracks which consumer is reading which partition.
Starting Zookeeper
zookeeper-server-start.sh config/zookeeper.properties
8. Kafka Cluster and High Availability
Kafka ensures fault tolerance using replication and partitioning.
8.1 Replication in Kafka
• Each partition has a leader and multiple replicas.
• If the leader fails, Zookeeper elects a new leader.
8.2 Handling Broker Failures
Kafka automatically reassigns partitions when a broker fails, ensuring high availability.
1. Introduction
In an Apache Kafka cluster, a broker is a server that acts as an intermediary between producers and
consumers. Brokers store messages, manage partitions, handle client connections, and ensure data
availability through replication.
A Kafka cluster typically consists of multiple brokers to distribute workload and achieve high
availability. Kafka brokers work together in a leader-follower model to ensure fault tolerance and load
balancing across partitions.
2. What is a Kafka Broker?
A Kafka Broker is a server process in a Kafka cluster that:
1. Receives messages from Kafka producers.
2. Stores messages in topic partitions.
3. Serves messages to Kafka consumers.
4. Replicates partitions across multiple brokers to ensure fault tolerance.
5. Coordinates with Zookeeper for partition leadership and metadata management.
Each Kafka broker is identified by a unique broker ID and communicates with other brokers, producers,
and consumers using the Kafka protocol.
3. Role of Brokers in Kafka Architecture
Kafka brokers serve as the backbone of Kafka's distributed architecture. Their main roles include:
3.1 Message Storage and Distribution
• Messages sent by producers are stored in partitions within brokers.
• Kafka writes messages sequentially on disk to optimize performance.
• Consumers fetch messages from specific brokers based on partition assignments.
3.2 Partition Leadership and Replication
• Each partition in Kafka has a leader broker that handles all read/write requests.
• Other brokers maintain replica partitions to ensure high availability.
• If a leader broker fails, a new leader is elected automatically.
3.3 Consumer Load Balancing
• Kafka ensures even distribution of partitions across consumers.
• Rebalancing occurs when a consumer joins or leaves a consumer group.
3.4 Fault Tolerance and Data Durability
• Kafka replicates data across brokers using a configurable replication factor.
• Producers can set acks=all to ensure messages are written to all in-sync replicas before acknowledgment.
4. How Kafka Brokers Store and Manage Messages
Kafka stores messages efficiently using log-based storage.
4.1 Message Storage Format
• Each Kafka topic has multiple partitions spread across brokers.
• Partitions are stored as log files on disk in the following format:
/kafka-logs/
└── topic1-0/
    ├── 00000000000000000000.log
    ├── 00000000000000000000.index
    └── 00000000000000000000.timeindex
• Kafka appends messages to log segments, making sequential writes highly efficient.
4.2 Retention Policy and Log Cleanup
Kafka allows configurable retention policies:
• Time-based retention → Messages older than a threshold (e.g., 7 days) are deleted.
• Size-based retention → Deletes logs once they exceed a certain size (e.g., 10GB).
• Compaction-based retention → Retains only the latest messages with a specific key.
Example Configuration in server.properties:
log.retention.hours=168 # Retain logs for 7 days
log.segment.bytes=1073741824 # Rotate log file after 1GB
log.cleanup.policy=delete # Delete old logs
5. Partition Leadership and Replication
5.1 How Leadership Works?
• Each partition has a leader broker that handles read/write requests.
• Other brokers hold follower replicas to maintain copies of the partition data.
• If a broker fails, Zookeeper elects a new leader from available replicas.
5.2 Replication Factor and ISR (In-Sync Replicas)
Kafka ensures fault tolerance through replication.
• Replication factor = 3 → Data is stored on 3 different brokers.
• Kafka tracks in-sync replicas (ISR) to ensure data consistency.
• If ISR falls below min.insync.replicas, Kafka rejects new writes to avoid data loss.
Example Configuration:
kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
6. Handling Broker Failures and Recovery
Kafka is designed for automatic failure recovery:
6.1 How Kafka Handles Broker Failures?
• If a broker crashes, its partitions become unavailable until a new leader is elected.
• Zookeeper detects failures and assigns a new leader broker.
• If no replicas are available, consumers must wait until recovery.
6.2 Recovering a Failed Broker
• Restart the broker process:
kafka-server-start.sh config/server.properties
• If necessary, rebuild partitions using:
kafka-replica-verification.sh --broker-list <broker-id>
• Ensure new brokers join the cluster by updating server.properties.
7. Configuring Kafka Brokers
Kafka brokers are configured using the server.properties file.
7.1 Essential Kafka Broker Configurations
broker.id=1 # Unique broker identifier
log.dirs=/var/lib/kafka/logs # Location to store Kafka logs
zookeeper.connect=localhost:2181 # Zookeeper connection string
num.network.threads=3 # Number of network threads
num.io.threads=8 # Number of I/O threads
log.retention.hours=168 # Retain logs for 7 days
log.segment.bytes=1073741824 # Log segment size (1GB)
log.cleanup.policy=delete # Delete old logs
8. Kafka Broker Performance Optimization
Kafka brokers can be optimized for better performance and scalability.
8.1 Increasing Throughput
• Enable batching in producers:
cppCopyEditconf->set("batch.size", "1000000", errstr);
• Use compression (e.g., Snappy, Gzip) to reduce network load:
cppCopyEditconf->set("compression.type", "snappy", errstr);
8.2 Reducing Latency
• Increase log flush intervals:
log.flush.interval.messages=10000
8.3 Handling Large Data Volumes
• Increase num.partitions for parallelism.
• Enable log.segment.bytes=1073741824 for large log segments.
9. Real-world Use Cases of Kafka Brokers
Kafka brokers are used in real-time streaming applications such as:
• Financial services → Processing stock trades in real-time.
• E-commerce → Tracking orders, payments, and customer activity.
• IoT (Internet of Things) → Handling sensor data at scale.
• Log Aggregation → Collecting logs from distributed applications.
10. C++ Implementation – Producer and Consumer with Brokers
10.1 C++ Kafka Producer Code
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
for (int i = 0; i < 10; i++) {
std::string message = "Message " + std::to_string(i);
producer->produce("my_topic", RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(), NULL, NULL);
}
producer->flush(1000);
10.2 C++ Kafka Consumer Code
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
consumer->subscribe({"my_topic"});
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR)
std::cout << "Received: " << std::string(static_cast<char *>(msg->payload()), msg->len()) << std::endl;
delete msg;
}
1. Introduction to Kafka Topics and Partitions
In Apache Kafka, topics and partitions form the core of data distribution.
• A topic is a logical grouping of messages that acts like a category or feed name.
• Each topic is split into partitions, enabling parallelism and scalability.
• Kafka distributes partitions across multiple brokers to handle high throughput and ensure fault
tolerance.
2. How Kafka Topics Work
Kafka topics function as message streams, allowing producers to write and consumers to read data.
2.1 Key Characteristics of Kafka Topics
• Topics store records sequentially in log files.
• Kafka topics never push data; consumers pull messages at their own pace.
• Topics can be partitioned for parallel processing.
• Kafka topics have retention policies that define how long data is stored.
2.2 Example Kafka Topics
• logs → Stores application logs.
• orders → Stores e-commerce order data.
• payments → Stores real-time payment transactions.
Command to create a Kafka topic with 3 partitions and replication factor 2:
kafka-topics.sh --create --topic orders --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
3. Understanding Kafka Partitions
Kafka splits topics into multiple partitions, where each partition:
• Is an ordered sequence of messages.
• Can be replicated across brokers for fault tolerance.
• Allows parallel reads and writes, improving throughput.
Example: A topic orders with 3 partitions (P0, P1, P2) distributed across brokers
Partition Leader Broker Replica Brokers
orders-0 Broker 1 Broker 2, 3
orders-1 Broker 2 Broker 1, 3
orders-2 Broker 3 Broker 1, 2
Kafka automatically assigns partitions across brokers to balance load.
4. Partitioning Strategies in Kafka
Kafka uses partitioning strategies to determine which partition a message is sent to.
4.1 Default Kafka Partitioning Strategy
By default, Kafka uses a round-robin approach when no key is provided.
4.2 Key-Based Partitioning
Kafka allows partitioning based on keys to ensure ordering for specific records.
Example: Orders with the same customer_id should go to the same partition:
std::string key = "customer_123";
std::string message = "Order event for customer_123";
producer->produce("orders", RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  &key, NULL);
This ensures consistent ordering per customer.
4.3 Custom Partitioning Strategy
Kafka allows defining custom partitioners for advanced partitioning logic.
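With the librdkafka C++ API, a custom partitioner is an object implementing RdKafka::PartitionerCb that is registered on the topic configuration. The sketch below is illustrative; the class name and routing rule are assumptions, not part of Kafka itself.
#include <functional>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Sketch: route keys beginning with "vip_" to partition 0, hash the rest.
class CustomerPartitioner : public RdKafka::PartitionerCb {
public:
    int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                           int32_t partition_cnt, void *msg_opaque) override {
        if (key && key->rfind("vip_", 0) == 0)
            return 0;                                                   // pin VIP customers
        return key ? static_cast<int32_t>(std::hash<std::string>{}(*key) % partition_cnt)
                   : 0;                                                 // hash everything else
    }
};

// Registration on the topic configuration used when producing:
//   std::string errstr;
//   RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
//   CustomerPartitioner partitioner;
//   tconf->set("partitioner_cb", &partitioner, errstr);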
5. Data Distribution and Parallel Processing
Kafka partitions allow multiple consumers to process data in parallel, improving performance.
• Each consumer in a consumer group reads data from a subset of partitions.
• Kafka dynamically rebalances consumers when a new consumer joins or leaves.
Example: A Kafka topic with 6 partitions and 3 consumers:
• Consumer 1 reads from Partition 0 and 1
• Consumer 2 reads from Partition 2 and 3
• Consumer 3 reads from Partition 4 and 5
This ensures scalability and load balancing in real-time data processing.
6. Message Ordering in Kafka Partitions
6.1 Partition-Level Ordering
Kafka guarantees order only within a partition.
6.2 How Ordering is Maintained?
• If a single producer writes to a partition, message order is preserved.
• If multiple producers write to the same partition, ordering is not guaranteed.
• Consumers should process messages sequentially within a partition.
Example:
• Producer 1 → Writes msg1, msg2, msg3 to Partition 0 (order is preserved).
• Producer 2 → Writes msg4, msg5 to Partition 1 (separate order is maintained).
7. Partition Replication and Fault Tolerance
Kafka ensures high availability through partition replication.
7.1 Replication Factor
• A topic with replication factor = 3 has 3 copies of each partition.
• One replica is the leader, while others are followers.
7.2 In-Sync Replicas (ISR)
• Kafka tracks ISR to ensure data consistency.
• If a broker fails, a new leader is elected from ISR.
Example Command: Creating a topic with 3 partitions and replication factor 2:
kafka-topics.sh --create --topic transactions --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
8. Managing Kafka Topics and Partitions
Kafka provides commands for topic administration.
8.1 Listing Kafka Topics
kafka-topics.sh --list --bootstrap-server localhost:9092
8.2 Describing a Kafka Topic
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
8.3 Increasing Partitions for a Topic
kafka-topics.sh --alter --topic orders --partitions 6 --bootstrap-server localhost:9092
9. Kafka Topic Retention Policies
Kafka automatically deletes old messages based on time-based, size-based, or compaction-based
retention.
Example: Retain messages for 7 days (time-based retention)
log.retention.hours=168
Example: Retain only latest messages with same key (compaction)
log.cleanup.policy=compact
10. Real-world Use Cases of Kafka Topics and Partitions
Kafka topics and partitions are widely used in:
• Financial Services – Processing stock market data in real time.
• E-commerce – Managing order, shipment, and payment events.
• IoT Systems – Collecting sensor data from millions of devices.
• Log Aggregation – Centralizing logs from distributed applications.
11. C++ Implementation – Producer and Consumer with Partitions
11.1 C++ Kafka Producer Code (Partitioned Messages)
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
std::string key = "customer_123";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
std::string message = "Order processed";
producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(),
&key, NULL);
producer->flush(1000);
return 0;
}
1. Introduction to Kafka Producers and Consumers
Kafka follows a publish-subscribe model, where:
• Producers publish (write) messages to Kafka topics.
• Consumers read messages from topics and process them.
Kafka ensures high throughput, fault tolerance, and message durability through its distributed
architecture.
1.1 How Kafka Works?
1. Producers send messages to a Kafka topic.
2. Kafka distributes messages across partitions.
3. Consumers read messages from partitions.
Kafka producers and consumers work asynchronously, allowing real-time data streaming.
2. Kafka Producer Architecture
A Kafka producer is responsible for:
• Sending messages to Kafka topics.
• Choosing partitions for messages.
• Handling retries, acknowledgments, and batching.
2.1 Producer Workflow
1. The producer receives a message from the application.
2. It assigns a key (optional) to determine the partition.
3. The message is serialized and sent to Kafka.
4. Kafka stores the message in a partition.
5. The producer handles failures using retries and acknowledgments.
2.2 Producer Message Flow Diagram
Application → Kafka Producer → Kafka Broker (Partition) → Kafka Topic
3. Producer Configuration and Key Features
Kafka producers offer configurable properties to optimize performance.
3.1 Key Producer Configurations
Configuration Description
acks Controls acknowledgment behavior (0, 1, all)
retries Number of retries before failure
batch.size Message batch size for efficiency
linger.ms Time to wait before sending a batch
compression.type Compress messages to reduce bandwidth
3.2 Example Producer Configuration
bootstrap.servers=localhost:9092
acks=all
retries=3
batch.size=16384
linger.ms=1
compression.type=gzip
3.3 Producer Acknowledgments (acks Setting)
• acks=0 → No acknowledgment (fast but unsafe).
• acks=1 → Acknowledged by leader only.
• acks=all → Acknowledged by leader and replicas (strong durability).
4. Kafka Consumer Architecture
A Kafka consumer is responsible for:
• Reading messages from Kafka topics.
• Committing message offsets.
• Handling parallel processing via consumer groups.
4.1 Consumer Workflow
1. The consumer subscribes to a Kafka topic.
2. Kafka assigns partitions to the consumer.
3. Messages are pulled (not pushed) from the partition.
4. The consumer processes messages and commits offsets.
4.2 Consumer Message Flow Diagram
Kafka Topic → Kafka Broker (Partition) → Kafka Consumer → Application
5. Consumer Groups and Parallel Processing
Kafka consumer groups allow multiple consumers to process data in parallel.
• Each consumer in a group reads from a subset of partitions.
• Kafka ensures one partition is read by only one consumer per group.
• If a consumer fails, Kafka reassigns partitions to remaining consumers.
5.1 Example: Consumer Group Processing a Topic with 4 Partitions
Partition Consumer
orders-0 C1
orders-1 C2
orders-2 C3
orders-3 C4
If C3 fails, its partitions are rebalanced among C1, C2, and C4.
Command to start a consumer in a group:
kafka-console-consumer.sh --topic orders --group order-consumers --bootstrap-server localhost:9092
6. Consumer Offsets and Commit Strategies
Kafka uses offsets to track the last read message for each partition.
6.1 Types of Offset Management
• Automatic Offset Commit (enable.auto.commit=true) → Kafka auto-commits offsets (risk of
data loss).
• Manual Offset Commit (enable.auto.commit=false) → Consumers explicitly commit offsets
(safer).
6.2 Example: Manual Offset Commit in Java
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
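The same pattern with the librdkafka C++ API used in this document is sketched below; processOrder() is a hypothetical application handler, and conf/consumer are the objects from the consumer examples:
// Sketch: manual offset commits with the C++ API.
std::string errstr;
conf->set("enable.auto.commit", "false", errstr);   // disable automatic commits

RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
    processOrder(msg);             // hypothetical: handle the record
    consumer->commitSync(msg);     // commit this offset only after successful processing
}
delete msg;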
7. Fault Tolerance and Scalability in Producers and Consumers
Kafka ensures fault tolerance through:
• Partition replication – Avoids message loss.
• Consumer rebalancing – Handles failures dynamically.
• Producer retries – Ensures message delivery.
8. Real-World Use Cases of Kafka Producers and Consumers
• Event-Driven Architectures → Microservices publish and consume events.
• Streaming Analytics → Real-time data processing from IoT sensors.
• Log Aggregation → Centralized collection of logs from distributed systems.
9. C++ Implementation – Kafka Producer and Consumer
9.1 C++ Kafka Producer Code
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
std::string message = "New Order Placed";
producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(),
NULL, NULL);
producer->flush(1000);
return 0;
}
9.2 C++ Kafka Consumer Code
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("group.id", "order-consumers", errstr);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
consumer->subscribe({topic});
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received: " << std::string((char*)msg->payload()) << std::endl;
}
delete msg;
}
return 0;
}
1. Introduction to Zookeeper in Kafka
Apache Zookeeper is a distributed coordination service used by Kafka for:
• Managing broker metadata
• Handling leader election
• Maintaining cluster state
• Synchronizing distributed components
Kafka cannot function without Zookeeper, as it manages essential coordination tasks.
2. Role of Zookeeper in Kafka Architecture
Zookeeper plays a crucial role in Kafka by:
• Tracking broker information (which brokers are alive)
• Managing topic partitions and leader election
• Handling consumer group metadata
• Ensuring fault tolerance
2.1 Kafka and Zookeeper Interaction
Kafka Brokers → Zookeeper (Stores Metadata) → Consumers & Producers
• Producers query Zookeeper for partition leaders.
• Consumers check Zookeeper for offset metadata.
• Brokers register with Zookeeper to form a cluster.
3. Leader Election in Kafka using Zookeeper
Kafka partitions have a leader broker, which is responsible for handling producer and consumer
requests.
3.1 Leader Election Process
1. When a Kafka broker starts, it registers itself in Zookeeper.
2. Zookeeper selects a leader broker for each partition.
3. If the leader fails, Zookeeper assigns a new leader from the followers.
3.2 Example: Partition Leader Election
Partition Leader Broker Follower Brokers
topic-0 Broker 1 Broker 2, 3
topic-1 Broker 2 Broker 1, 3
4. Zookeeper’s Data Model and Nodes in Kafka
Zookeeper stores data as a hierarchical tree of znodes (Zookeeper nodes).
4.1 Important Zookeeper znodes in Kafka
Znode Path Description
/brokers/ids Stores broker information
/brokers/topics Stores topic metadata
/consumers Stores consumer group info
/controller Stores current Kafka controller broker
5. How Kafka Uses Zookeeper for Cluster Management
Kafka uses Zookeeper for:
• Broker Discovery → Tracks active brokers.
• Controller Election → Selects a broker to manage partition leadership.
• Partition Metadata → Stores topic partition assignments.
5.1 Example: Broker Registration
When a Kafka broker starts, it creates a znode in /brokers/ids/ with its metadata.
Example:
/brokers/ids/1 → {"host": "192.168.1.1", "port": 9092}
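This registration data can also be read programmatically with the Zookeeper C client that is installed in section 9 below. A hedged sketch, assuming a local ensemble on port 2181 and broker id 1:
#include <iostream>
#include <string>
#include <zookeeper/zookeeper.h>

int main() {
    // Connect to the local Zookeeper ensemble (no watcher callback needed here).
    zhandle_t *zh = zookeeper_init("localhost:2181", nullptr, 30000, nullptr, nullptr, 0);
    if (!zh) return 1;

    char buffer[512];
    int buffer_len = sizeof(buffer);
    struct Stat stat;
    // Read the JSON metadata that broker 1 stored under /brokers/ids/1.
    if (zoo_get(zh, "/brokers/ids/1", 0, buffer, &buffer_len, &stat) == ZOK)
        std::cout << "Broker 1 metadata: " << std::string(buffer, buffer_len) << std::endl;

    zookeeper_close(zh);
    return 0;
}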
6. Fault Tolerance and High Availability in Zookeeper
6.1 How Zookeeper Ensures High Availability?
• Runs on multiple nodes (Zookeeper ensemble).
• Uses leader-follower replication for consistency.
• Quorum-based decision-making (majority must agree).
6.2 Example: Zookeeper Ensemble with 5 Nodes
Node Role
ZK1 Leader
ZK2 Follower
ZK3 Follower
ZK4 Follower
ZK5 Follower
If ZK1 fails, a new leader is elected from ZK2, ZK3, ZK4, or ZK5.
7. Setting Up and Configuring Zookeeper for Kafka
7.1 Install Zookeeper
sudo apt update
sudo apt install zookeeper
7.2 Configure zoo.cfg (Zookeeper Configuration File)
Edit /etc/zookeeper/conf/zoo.cfg:
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
7.3 Start Zookeeper
zookeeper-server-start.sh /etc/kafka/zookeeper.properties
8. Zookeeper Commands for Kafka Administration
The following commands are issued inside the Zookeeper CLI (for example, bin/zookeeper-shell.sh localhost:2181).
8.1 Check Active Brokers
ls /brokers/ids
8.2 Get Partition Leader Details
get /brokers/topics/orders/partitions/0/state
8.3 Check Consumer Group Offsets
ls /consumers
9. C++ Implementation: Interacting with Zookeeper
9.1 Install C++ Zookeeper Client (zookeeper_mt)
shCopyEditsudo apt install libzookeeper-mt-dev
9.2 C++ Code: Connect to Zookeeper
cppCopyEdit#include <iostream>
#include <zookeeper/zookeeper.h>
void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
std::cout << "Zookeeper event: " << path << std::endl;
}
int main() {
const char *zookeeper_host = "localhost:2181";
zhandle_t *zh = zookeeper_init(zookeeper_host, watcher, 30000, 0, NULL, 0);
if (zh == nullptr) {
std::cerr << "Failed to connect to Zookeeper!" << std::endl;
return 1;
}
std::cout << "Connected to Zookeeper!" << std::endl;
zookeeper_close(zh);
return 0;
}
9.3 C++ Code: Create a Zookeeper Node
cppCopyEdit#include <iostream>
#include <zookeeper/zookeeper.h>
int main() {
zhandle_t *zh = zookeeper_init("localhost:2181", NULL, 30000, 0, NULL, 0);
if (!zh) {
std::cerr << "Zookeeper connection failed!" << std::endl;
return 1;
}
int rc = zoo_create(zh, "/my_kafka_node", "broker_data", 11, &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, nullptr, 0);
if (rc == ZOK) {
    std::cout << "Node created successfully!" << std::endl;
} else {
    std::cerr << "zoo_create failed: " << zerror(rc) << std::endl;
}
zookeeper_close(zh);
return 0;
}
1. Introduction to Kafka Cluster
A Kafka cluster is a distributed system consisting of multiple Kafka brokers that work together to handle
high-throughput message streaming. Kafka clusters ensure:
Scalability – More brokers can be added as traffic grows.
Fault Tolerance – If a broker fails, others take over.
High Availability – Messages are replicated across brokers.
2. Components of a Kafka Cluster
A Kafka cluster consists of the following components:
Component Description
Kafka Brokers Store and serve messages.
Zookeeper Manages metadata and leader election.
Producers Send messages to Kafka topics.
Consumers Read messages from Kafka topics.
Partitions Kafka topics are split into multiple partitions for scalability.
2.1 Example Kafka Cluster Architecture
Producers → Kafka Brokers → Consumers
↘ Zookeeper (for metadata management)
3. How Kafka Ensures High Availability
Kafka achieves high availability through:
• Replication: Messages are copied across multiple brokers.
• Leader Election: If a broker fails, a new leader is chosen.
• Partitioning: Data is distributed across brokers.
• Consumer Groups: Workload is distributed among consumers.
4. Kafka Replication and Data Redundancy
Kafka replicates partitions across brokers to ensure data redundancy.
4.1 Example of Replication Factor
If Replication Factor = 3, each partition is stored on 3 brokers.
Partition Leader Broker Replica Brokers
topic-0 Broker 1 Broker 2, 3
topic-1 Broker 2 Broker 1, 3
5. Kafka Controller and Failover Handling
Kafka Controller is a special broker responsible for:
Leader election for partitions
Detecting and handling broker failures
Managing cluster metadata
5.1 Failover Process
1. If a broker fails, Zookeeper detects it.
2. The Controller assigns a new leader for the affected partitions.
3. Consumers and producers reconnect to the new leader.
6. Load Balancing in a Kafka Cluster
Kafka balances load across brokers by:
• Evenly distributing partitions among brokers.
• Reassigning partitions when brokers are added/removed.
• Using partitioning strategies to ensure efficient message routing.
6.1 Example: Distributing Partitions Across Brokers
Partition 0 → Broker 1
Partition 1 → Broker 2
Partition 2 → Broker 3
7. Fault Tolerance Mechanisms in Kafka
Kafka ensures fault tolerance through:
Data Replication – Multiple copies prevent data loss.
Leader Election – If a leader broker fails, another broker takes over.
Acknowledgment Mechanism – Producers can require that messages are stored in multiple replicas before a write is confirmed.
7.1 Acknowledgment Levels in Kafka
Acks Mode Description
acks=0 No acknowledgment (fastest but risky).
acks=1 Leader broker acknowledges (balanced approach).
acks=all All replicas acknowledge (strongest durability).
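As a sketch of how a producer selects one of these modes with librdkafka's C++ API (recent librdkafka versions accept the acks property on the global configuration; the broker address is illustrative):
#include <iostream>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    // acks=all: the delivery report fires only after all in-sync replicas acknowledge the write
    if (conf->set("acks", "all", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "Failed to set acks: " << errstr << std::endl;
        return 1;
    }
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) { std::cerr << "Failed to create producer: " << errstr << std::endl; return 1; }
    std::cout << "Producer created with acks=all" << std::endl;
    delete producer;
    delete conf;
    return 0;
}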
8. Setting Up a Multi-Broker Kafka Cluster
8.1 Install Kafka on Multiple Brokers
Kafka is not available as a stock Ubuntu package, so download the binary distribution from kafka.apache.org and extract it on every machine that will run a broker:
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xvzf kafka_2.13-3.5.0.tgz
8.2 Configure Multiple Kafka Brokers (server.properties)
Broker 1 (server-1.properties)
propertiesCopyEditbroker.id=1
log.dirs=/var/lib/kafka-logs-1
zookeeper.connect=localhost:2181
Broker 2 (server-2.properties)
propertiesCopyEditbroker.id=2
log.dirs=/var/lib/kafka-logs-2
zookeeper.connect=localhost:2181
Broker 3 (server-3.properties)
propertiesCopyEditbroker.id=3
log.dirs=/var/lib/kafka-logs-3
zookeeper.connect=localhost:2181
8.3 Start Kafka Brokers
shCopyEditkafka-server-start.sh config/server-1.properties &
kafka-server-start.sh config/server-2.properties &
kafka-server-start.sh config/server-3.properties &
8.4 Create a Replicated Topic
kafka-topics.sh --create --topic my-replicated-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
9. C++ Code for Kafka Cluster Interaction
9.1 Install Kafka C++ Client (librdkafka)
shCopyEditsudo apt install librdkafka-dev
9.2 C++ Code: Kafka Producer
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafka.h>
void produce_message() {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();
char errstr[512];
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
    std::cerr << "Failed to create producer: " << errstr << std::endl;
    return;
}
rd_kafka_brokers_add(rk, "localhost:9092");
rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("my-replicated-topic"),
RD_KAFKA_V_KEY("key", 3),
RD_KAFKA_V_VALUE("Hello Kafka!", 12),
RD_KAFKA_V_END);
rd_kafka_poll(rk, 0);
rd_kafka_flush(rk, 1000);
rd_kafka_destroy(rk);
}
int main() {
produce_message();
return 0;
}
9.3 C++ Code: Kafka Consumer
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafka.h>
void consume_message() {
    rd_kafka_t *rk;
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    char errstr[512];
    // bootstrap.servers and group.id are required for a subscribe()-based consumer;
    // the group name here is arbitrary
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", "my-replicated-group", errstr, sizeof(errstr));
    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        return;
    }
    rd_kafka_poll_set_consumer(rk); // route messages to the consumer queue
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "my-replicated-topic", RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(rk, topics);
    rd_kafka_topic_partition_list_destroy(topics);
    while (true) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
        if (msg) {
            if (!msg->err)
                std::cout << "Received: " << std::string((char *)msg->payload, msg->len) << std::endl;
            rd_kafka_message_destroy(msg);
        }
    }
    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
}
int main() {
consume_message();
return 0;
}
Kafka Installation and Setup
1. Introduction to Apache Kafka
1.1 What is Apache Kafka?
Apache Kafka is an open-source distributed event streaming platform used for building real-time data
pipelines and streaming applications. It is designed to handle high-throughput, fault-tolerant, and
scalable messaging between systems.
1.2 Why Use Kafka?
Kafka is used in various industries, including finance, e-commerce, IoT, and analytics, because of its
ability to:
• Process large volumes of data in real time.
• Store data persistently for replay.
• Scale horizontally with multiple brokers.
• Ensure fault tolerance with replication.
1.3 Kafka Architecture Overview
Kafka consists of several key components:
• Producers: Send messages to Kafka topics.
• Brokers: Kafka servers that store and distribute messages.
• Consumers: Read messages from topics.
• Zookeeper: Manages metadata and leader election.
• Topics: Logical categories where messages are published.
2. Installing Kafka on Windows, Linux, and Mac
Kafka can be installed on various operating systems. Let's go through the detailed installation steps for
each.
2.1 Installing Kafka on Windows
Step 1: Install Java
Kafka requires Java (JDK 8 or higher). Install it by:
1. Downloading the JDK from Oracle.
2. Running the installer and adding Java to the system PATH.
Verify the installation:
shCopyEditjava -version
Expected output:
openjdk version "17.0.1"
Step 2: Download Kafka
1. Visit the official Kafka website.
2. Download the latest binary (e.g., kafka_2.13-3.5.0.tgz).
3. Extract it using WinRAR or 7-Zip.
Step 3: Set Up Environment Variables
1. Open Control Panel → System → Advanced System Settings → Environment Variables.
2. Add Java to PATH:
C:\Program Files\Java\jdk-17\bin
3. Set KAFKA_HOME:
C:\kafka_2.13-3.5.0
4. Add %KAFKA_HOME%\bin\windows to the PATH.
Step 4: Start Zookeeper
1. Open Command Prompt and navigate to the Kafka folder:
cd C:\kafka_2.13-3.5.0
2. Start Zookeeper:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
Step 5: Start Kafka Broker
1. Open a new Command Prompt and run:
bin\windows\kafka-server-start.bat config\server.properties
2. Verify Kafka is running using:
jps
Expected output:
12345 QuorumPeerMain
67890 Kafka
11111 Jps
Step 6: Create and Test a Kafka Topic
Create a topic called test-topic:
bin\windows\kafka-topics.bat --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
List available topics:
shCopyEditbin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Produce a message:
shCopyEditbin\windows\kafka-console-producer.bat --topic test-topic --bootstrap-server localhost:9092
Type a message, e.g., Hello Kafka!, and press Enter.
Consume messages:
shCopyEditbin\windows\kafka-console-consumer.bat --topic test-topic --bootstrap-server
localhost:9092 --from-beginning
2.2 Installing Kafka on Linux (Ubuntu)
Step 1: Install Java
shCopyEditsudo apt update
sudo apt install openjdk-11-jdk
java -version
Step 2: Download Kafka
shCopyEditwget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xvzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
Step 3: Start Zookeeper
shCopyEditbin/zookeeper-server-start.sh config/zookeeper.properties
Step 4: Start Kafka
shCopyEditbin/kafka-server-start.sh config/server.properties
Step 5: Verify Kafka Installation
shCopyEditbin/kafka-topics.sh --list --bootstrap-server localhost:9092
2.3 Installing Kafka on Mac
Step 1: Install Homebrew
shCopyEdit/bin/bash -c "$(curl -fsSL
https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
Step 2: Install Kafka
shCopyEditbrew install kafka
Step 3: Start Kafka and Zookeeper
shCopyEditbrew services start zookeeper
brew services start kafka
Step 4: Verify Installation
shCopyEditkafka-topics.sh --list --bootstrap-server localhost:9092
3. Configuring Kafka Brokers
Kafka brokers manage message storage and delivery.
3.1 Change Broker ID
Each broker must have a unique ID:
shCopyEditbroker.id=1
3.2 Set Log Directories
shCopyEditlog.dirs=/var/lib/kafka/logs
3.3 Configure Listeners
shCopyEditlisteners=PLAINTEXT://localhost:9092
3.4 Adjust Retention Policies
shCopyEditlog.retention.hours=168
log.segment.bytes=1073741824
4. Running Kafka with Docker
4.1 Install Docker
shCopyEditsudo apt install docker.io
4.2 Run Kafka with Docker Compose
Create a docker-compose.yml:
yamlCopyEditversion: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
Start Kafka:
shCopyEditdocker-compose up -d
Check running containers:
shCopyEditdocker ps
5. Starting Zookeeper and Kafka
5.1 Start Zookeeper
shCopyEditbin/zookeeper-server-start.sh config/zookeeper.properties
5.2 Start Kafka
shCopyEditbin/kafka-server-start.sh config/server.properties
1. Understanding Kafka Broker Configuration
Kafka brokers are configured using the server.properties file, located in Kafka’s config/ directory. The
main configurations include:
Configuration Description
broker.id Unique ID for each Kafka broker
log.dirs Directory where Kafka stores data
zookeeper.connect Zookeeper connection string
listeners Network interface for brokers
num.network.threads Number of threads handling network requests
num.io.threads Threads handling disk I/O
log.retention.hours Time before logs are deleted
message.max.bytes Maximum size of a single message
2. Setting Up Kafka Brokers
2.1 Configuring a Single Kafka Broker
1. Navigate to the Kafka Configuration Directory
shCopyEditcd /path/to/kafka/config/
2. Edit the server.properties File. Open server.properties in a text editor:
shCopyEditnano server.properties
3. Modify the Broker Configuration
propertiesCopyEditbroker.id=1
log.dirs=/var/lib/kafka/data
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
log.retention.hours=168
4. Start the Kafka Broker
shCopyEditbin/kafka-server-start.sh config/server.properties
2.2 Configuring Multiple Kafka Brokers
Kafka supports multi-broker setups, where each broker in the cluster has a unique ID and a separate log
directory.
1. Copy and Modify Configuration Files
shCopyEditcp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
2. Edit server-1.properties
propertiesCopyEditbroker.id=1
log.dirs=/var/lib/kafka/data1
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9093
3. Edit server-2.properties
propertiesCopyEditbroker.id=2
log.dirs=/var/lib/kafka/data2
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9094
4. Start Multiple Brokers
shCopyEditbin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
5. Verify the Brokers
shCopyEditbin/kafka-topics.sh --list --bootstrap-server localhost:9092
3. Important Kafka Broker Configurations
3.1 Storage and Log Management
Kafka persists messages in logs. Configure log retention to manage disk space.
Modify Log Retention Settings
propertiesCopyEditlog.retention.hours=48 # Keep logs for 48 hours
log.segment.bytes=1073741824 # Max log segment size: 1GB
log.retention.bytes=10737418240 # Retain logs up to 10GB
Verify Log Directory
shCopyEditls -lh /var/lib/kafka/data
3.2 Broker Networking and Security
Modify listeners for Remote Access
propertiesCopyEditlisteners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your-server-ip:9092
Enable SSL Encryption
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/etc/kafka/server.keystore.jks
ssl.keystore.password=your_password
ssl.truststore.location=/etc/kafka/server.truststore.jks
ssl.truststore.password=your_password
Enable Authentication with SASL
propertiesCopyEditsecurity.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
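The settings above are broker-side. For completeness, here is a minimal client-side sketch showing how a librdkafka C++ producer would connect to an SSL listener; the port and certificate paths are placeholders.
#include <iostream>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "your-server-ip:9093", errstr);
    conf->set("security.protocol", "ssl", errstr);                               // talk to the SSL listener
    conf->set("ssl.ca.location", "/etc/kafka/ca-cert.pem", errstr);              // CA that signed the broker cert
    conf->set("ssl.certificate.location", "/etc/kafka/client-cert.pem", errstr); // client cert (if the broker requires it)
    conf->set("ssl.key.location", "/etc/kafka/client-key.pem", errstr);          // client private key
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) { std::cerr << "SSL producer creation failed: " << errstr << std::endl; return 1; }
    std::cout << "Connected over SSL" << std::endl;
    delete producer;
    delete conf;
    return 0;
}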
4. C++ Code to Monitor Kafka Brokers
Using librdkafka, we can monitor the broker status.
C++ Kafka Broker Monitoring (monitor_broker.cpp)
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
class KafkaMonitor {
public:
KafkaMonitor(const std::string& brokers) : brokers_(brokers) {
connect();
}
void get_metadata() {
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = producer_->metadata(true, nullptr, &metadata, 5000);
if (err == RdKafka::ERR_NO_ERROR) {
std::cout << "Connected to Kafka Broker: " << brokers_ << std::endl;
std::cout << "Number of Brokers: " << metadata->brokers()->size() << std::endl;
for (const auto& broker : *(metadata->brokers())) {
std::cout << "Broker ID: " << broker->id() << " - Host: " << broker->host() << std::endl;
}
} else {
std::cerr << "Error fetching metadata: " << RdKafka::err2str(err) << std::endl;
}
delete metadata;
}
~KafkaMonitor() {
delete producer_;
}
private:
std::string brokers_;
RdKafka::Producer* producer_;
void connect() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers_, errstr);
producer_ = RdKafka::Producer::create(conf, errstr);
if (!producer_) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
delete conf;
}
};
int main() {
KafkaMonitor monitor("localhost:9092");
monitor.get_metadata();
return 0;
}
Compile and Run
shCopyEditg++ -std=c++11 monitor_broker.cpp -o monitor -lrdkafka++
./monitor
Expected Output
Connected to Kafka Broker: localhost:9092
Number of Brokers: 3
Broker ID: 1 - Host: localhost
Broker ID: 2 - Host: localhost
Broker ID: 3 - Host: localhost
5. Testing Kafka Broker Setup
5.1 Create a Test Topic
shCopyEditbin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3
--replication-factor 2
5.2 Check Topic Details
shCopyEditbin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
5.3 Send a Test Message
shCopyEditbin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
> Hello, Kafka Broker!
5.4 Read the Message
shCopyEditbin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server
localhost:9092
Output:
Hello, Kafka Broker!
6. Monitoring Kafka Brokers
Check Broker Status
shCopyEditbin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Check Active Brokers in Zookeeper
shCopyEditecho "ls /brokers/ids" | zookeeper-shell.sh localhost:2181
Running Kafka with Docker
Running Kafka with Docker simplifies deployment, scalability, and management. This guide provides an
in-depth explanation of setting up Kafka in Docker with configurations, Docker Compose, and C++
examples for interacting with Kafka.
1. Why Use Docker for Kafka?
1.1 Benefits of Running Kafka in Docker
• Easy Deployment: No need for manual installations.
• Consistency: Works identically on all machines.
• Scalability: Easily create multi-broker clusters.
• Isolation: Kafka runs in a separate environment, avoiding conflicts.
1.2 Prerequisites
• Docker (Installed and running)
• Docker Compose (For multi-container orchestration)
2. Running a Single Kafka Broker with Docker
2.1 Pulling the Kafka Docker Image
Kafka's official image is available on Docker Hub.
Run the following command to pull the image:
shCopyEditdocker pull confluentinc/cp-kafka
2.2 Running Kafka and Zookeeper in Docker
Kafka requires Zookeeper, so we need to run both together.
shCopyEditdocker network create kafka-network
docker run -d --name zookeeper \
--network kafka-network \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:latest
docker run -d --name kafka \
--network kafka-network \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:latest
2.3 Verifying Kafka is Running
Check if the Kafka container is running:
shCopyEditdocker ps
3. Running Kafka with Docker Compose
3.1 Creating docker-compose.yml
Docker Compose simplifies Kafka setup by defining all configurations in a YAML file.
Create a docker-compose.yml file:
yamlCopyEditversion: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
networks:
- kafka-network
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka-network
networks:
kafka-network:
3.2 Running Kafka with Docker Compose
Start Kafka and Zookeeper:
shCopyEditdocker-compose up -d
Check running containers:
shCopyEditdocker ps
To stop Kafka:
shCopyEditdocker-compose down
4. Configuring Multiple Kafka Brokers in Docker
Kafka can run in multi-broker mode for high availability.
4.1 Modify docker-compose.yml
yamlCopyEditversion: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
networks:
- kafka-network
kafka-1:
image: confluentinc/cp-kafka:latest
container_name: kafka-1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
networks:
- kafka-network
kafka-2:
image: confluentinc/cp-kafka:latest
container_name: kafka-2
depends_on:
- zookeeper
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
networks:
- kafka-network
networks:
kafka-network:
4.2 Running the Multi-Broker Cluster
shCopyEditdocker-compose up -d
List brokers:
shCopyEditdocker exec kafka-1 kafka-topics --list --bootstrap-server localhost:9092
5. Interacting with Kafka Using C++
Kafka clients like librdkafka allow C++ applications to communicate with Kafka.
5.1 Install librdkafka
shCopyEditsudo apt-get install librdkafka-dev
5.2 Writing a C++ Kafka Producer
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
void produceMessage(const std::string& topic, const std::string& message) {
std::string brokers = "localhost:9092";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return;
}
producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(), nullptr, 0, 0, nullptr);
producer->flush(1000);
delete producer;
delete conf;
}
int main() {
produceMessage("test-topic", "Hello Kafka from C++!");
return 0;
}
5.3 Compile and Run
shCopyEditg++ -std=c++11 kafka_producer.cpp -o producer -lrdkafka++
./producer
5.4 Writing a C++ Kafka Consumer
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
void consumeMessages(const std::string& topic) {
std::string brokers = "localhost:9092";
std::string group_id = "cpp-group";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
consumer->subscribe({topic});
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received: " << std::string(static_cast<char *>(msg->payload()), msg->len()) << std::endl;
}
delete msg;
}
delete consumer;
delete conf;
}
int main() {
consumeMessages("test-topic");
return 0;
}
5.5 Compile and Run
shCopyEditg++ -std=c++11 kafka_consumer.cpp -o consumer -lrdkafka++
./consumer
6. Testing Kafka in Docker
6.1 Create a Topic
docker exec kafka kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
6.2 Produce a Message
shCopyEditdocker exec -it kafka kafka-console-producer --topic test-topic --bootstrap-server
localhost:9092
> Hello from Docker Kafka!
6.3 Consume Messages
docker exec -it kafka kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092
Starting Zookeeper and Kafka
Apache Kafka requires Zookeeper for leader election, configuration management, and cluster
coordination. This guide provides a detailed explanation of starting Zookeeper and Kafka on different
environments, configuring them properly, and interacting with Kafka using C++ examples.
1. Understanding Zookeeper’s Role in Kafka
1.1 Why Does Kafka Need Zookeeper?
• Manages Metadata: Stores broker details, topics, and partitions.
• Leader Election: Ensures a single broker acts as a leader for partitions.
• Failure Detection: Detects broker failures and assigns leadership.
• Configuration Management: Stores configurations for Kafka clusters.
1.2 Kafka without Zookeeper (KRaft Mode)
Kafka now supports KRaft mode, eliminating the need for Zookeeper. However, Zookeeper-based Kafka
is still widely used, especially in production environments.
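For reference, a Zookeeper-free (KRaft) broker can be started roughly as follows using the default KRaft configuration shipped with Kafka 3.x (paths and flags may differ slightly between versions):
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
The rest of this guide uses the Zookeeper-based setup.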
2. Starting Zookeeper and Kafka on Windows/Linux/Mac
2.1 Download Kafka
Kafka includes a built-in Zookeeper instance.
Download Kafka (latest version):
shCopyEditwget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
Extract:
shCopyEdittar -xvzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
2.2 Start Zookeeper
Run Zookeeper with the default configuration:
shCopyEditbin/zookeeper-server-start.sh config/zookeeper.properties
To run in the background:
shCopyEditbin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2.3 Start Kafka Broker
Once Zookeeper is running, start Kafka:
shCopyEditbin/kafka-server-start.sh config/server.properties
To run in the background:
shCopyEditbin/kafka-server-start.sh -daemon config/server.properties
3. Starting Zookeeper and Kafka Using Docker
3.1 Using Docker for Kafka & Zookeeper
Run Zookeeper:
shCopyEditdocker run -d --name zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:latest
Run Kafka:
shCopyEditdocker run -d --name kafka \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
confluentinc/cp-kafka:latest
3.2 Using Docker Compose
Create a docker-compose.yml file:
yamlCopyEditversion: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
ports:
- "9092:9092"
Start Kafka & Zookeeper:
shCopyEditdocker-compose up -d
4. Verifying Kafka is Running
4.1 Check Zookeeper and Kafka Status
Check running processes:
shCopyEditps aux | grep zookeeper
ps aux | grep kafka
List Kafka topics:
shCopyEditbin/kafka-topics.sh --list --bootstrap-server localhost:9092
4.2 Stopping Kafka and Zookeeper
Stop Kafka:
shCopyEditbin/kafka-server-stop.sh
Stop Zookeeper:
shCopyEditbin/zookeeper-server-stop.sh
For Docker:
shCopyEditdocker stop kafka zookeeper
docker-compose down
5. Creating Kafka Topics
5.1 Create a Topic
bin/kafka-topics.sh --create --topic test-topic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
5.2 List Topics
shCopyEditbin/kafka-topics.sh --list --bootstrap-server localhost:9092
6. Producing and Consuming Messages Using C++
6.1 Install Kafka C++ Client (librdkafka)
Install on Ubuntu:
shCopyEditsudo apt-get install librdkafka-dev
6.2 Kafka Producer in C++
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
void produceMessage(const std::string& topic, const std::string& message) {
std::string brokers = "localhost:9092";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return;
}
producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(), nullptr, 0, 0, nullptr);
producer->flush(1000);
delete producer;
delete conf;
}
int main() {
produceMessage("test-topic", "Hello Kafka from C++!");
return 0;
}
6.3 Compile and Run
shCopyEditg++ -std=c++11 kafka_producer.cpp -o producer -lrdkafka++
./producer
7. Kafka Consumer in C++
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
void consumeMessages(const std::string& topic) {
std::string brokers = "localhost:9092";
std::string group_id = "cpp-group";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
consumer->subscribe({topic});
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received: " << std::string(static_cast<char *>(msg->payload()), msg->len()) << std::endl;
}
delete msg;
}
delete consumer;
delete conf;
}
int main() {
consumeMessages("test-topic");
return 0;
}
7.1 Compile and Run
shCopyEditg++ -std=c++11 kafka_consumer.cpp -o consumer -lrdkafka++
./consumer
8. Kafka CLI Producer & Consumer
8.1 Kafka Producer
shCopyEditbin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
Type messages and press Enter.
8.2 Kafka Consumer
shCopyEditbin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server
localhost:9092
Kafka Topics and Partitions
Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming
applications. It is widely used for high-throughput, fault-tolerant message processing. At the core of
Kafka’s architecture are topics and partitions, which enable efficient data distribution and scalability.
1. Kafka Topics
A topic in Kafka is a logical channel or category where records (messages) are published and consumed.
It acts like a queue where producers send data and consumers read it.
1.1 Characteristics of Kafka Topics
• Multi-Producer Support: Multiple producers can write to a single topic concurrently.
• Multi-Consumer Support: Multiple consumers can read messages from a topic.
• Immutable Messages: Messages are stored in the order they are received and cannot be
modified.
• Retention Policy: Kafka can retain messages for a configured duration, even after consumption.
• Partitioning: Topics are divided into partitions to enable scalability and parallel processing.
1.2 Naming Kafka Topics
Kafka topic names should be meaningful and follow best practices:
• Use lowercase with dashes or underscores (user-events, order_processing).
• Avoid reserved keywords or special characters.
• Keep names concise but descriptive.
1.3 Creating a Kafka Topic
A Kafka topic can be created in different ways:
1. Automatically (if auto.create.topics.enable is set to true).
2. Manually using Kafka's CLI or Admin API.
Command to Create a Topic
shCopyEditkafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 5
--topic user-events
This creates a topic named user-events with 5 partitions and a replication factor of 3.
2. Kafka Partitions
A partition is a subset of a topic that stores a portion of the messages. Kafka uses partitions to
parallelize data processing and improve scalability.
2.1 Why Use Partitions?
• Parallelism: Allows multiple consumers to read data in parallel.
• Scalability: Increases Kafka’s throughput by distributing data across multiple brokers.
• Fault Tolerance: With replication, partitions ensure data availability.
2.2 How Partitions Work
• A topic is split into multiple partitions.
• Each partition is stored on different Kafka brokers.
• Messages are appended sequentially within a partition.
• Kafka assigns an offset (unique identifier) to each message.
Example of a Topic with Partitions
If we have a topic order-events with 3 partitions, the messages will be distributed as follows:
Partition 0: [M1, M4, M7, M10]
Partition 1: [M2, M5, M8, M11]
Partition 2: [M3, M6, M9, M12]
Each partition is independent, and consumers can read from them in parallel.
2.3 Partition Key
Kafka uses a partitioning strategy to determine which partition a message should go to:
• Keyed Messages: Messages with the same key always go to the same partition.
• Round-Robin: If no key is provided, Kafka distributes messages evenly across partitions.
Example in C++ Using Kafka C++ Client (librdkafka)
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
    std::string brokers = "localhost:9092";
    std::string topic = "order-events";
    std::string errstr;
    RdKafka::Conf *config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    config->set("bootstrap.servers", brokers, errstr);
    RdKafka::Producer *producer = RdKafka::Producer::create(config, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return 1;
    }
    std::string message = "New Order Placed: ID 123";
    RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                                                RdKafka::Producer::RK_MSG_COPY,
                                                const_cast<char *>(message.c_str()),
                                                message.size(), nullptr, 0, 0, nullptr);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cout << "Message sent to Kafka topic: " << topic << std::endl;
    producer->flush(1000);
    delete producer;
    delete config;
    return 0;
}
This C++ program sends a message to the order-events topic, and Kafka will determine the partition
dynamically.
3. Partitioning Strategy
Partitioning plays a crucial role in scalability and message ordering. The choice of partitioning strategy
depends on the application's use case.
3.1 Types of Partitioning Strategies
1. Random (Round-Robin): Messages are evenly distributed across partitions.
2. Key-Based (Consistent Hashing): Messages with the same key are routed to the same partition.
3. Custom Partitioning: Developers can implement their own logic for partitioning.
Example: Key-Based Partitioning in C++
cppCopyEditstd::string key = "user123";
RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()),
message.size(), &key, nullptr);
Here, all messages with user123 as the key will go to the same partition.
4. Topic Replication
Kafka ensures high availability using replication. Each partition is replicated across multiple brokers.
4.1 Replication Factor
• Replication factor = 1 → No redundancy. If the broker fails, data is lost.
• Replication factor > 1 → Multiple copies of data exist.
4.2 Leader and Followers
• Leader: Handles all read and write requests.
• Followers: Replicate data from the leader. If the leader fails, a follower takes over.
Example: Creating a Replicated Topic
shCopyEditkafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 5
--topic replicated-topic
This creates a topic with 3 replicas for each partition.
5. Log Segments and Retention
Kafka stores messages in log segments, which are managed based on retention policies.
5.1 Log Segments
• Kafka appends messages to a log file.
• When the file reaches a certain size, Kafka rolls over to a new segment.
5.2 Retention Policy
Kafka provides different ways to retain messages:
• Time-Based Retention: Messages are deleted after a certain period.
• Size-Based Retention: Old messages are deleted when storage reaches a threshold.
Configuring Retention Policy
shCopyEditkafka-configs.sh --alter --bootstrap-server localhost:9092 --entity-type topics --entity-name
user-events --add-config retention.ms=86400000
This keeps messages for one day (24 hours = 86400000ms) before deleting them.
Creating and Deleting Topics in Kafka
Kafka provides a flexible mechanism for managing topics, allowing users to create, modify, and delete
them efficiently. This section explores the creation, deletion, and management of topics using Kafka’s
CLI, Kafka Admin API, and C++.
1. Creating Topics in Kafka
Kafka topics can be created in different ways:
• Automatic Creation: Kafka can auto-create topics when a producer or consumer interacts with a
non-existent topic (if auto.create.topics.enable=true).
• Manual Creation: Using Kafka CLI commands or Kafka Admin API.
1.1 Kafka CLI Method
Kafka provides the kafka-topics.sh script for creating topics manually.
Basic Topic Creation
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic orders --partitions 3 --replication-factor 2
• --bootstrap-server localhost:9092: Kafka broker address.
• --topic orders: The topic name.
• --partitions 3: The topic will have 3 partitions.
• --replication-factor 2: Each partition will have 2 replicas for fault tolerance.
Verifying Topic Creation
After creating a topic, verify it using:
shCopyEditkafka-topics.sh --list --bootstrap-server localhost:9092
1.2 Creating Topics Using Kafka Admin API (Java Example)
Kafka’s Admin API allows dynamic topic creation.
Java Code to Create a Kafka Topic
javaCopyEditimport org.apache.kafka.clients.admin.*;
import java.util.*;
public class KafkaTopicCreator {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);
NewTopic topic = new NewTopic("user-logs", 3, (short) 2);
admin.createTopics(Collections.singletonList(topic)).all().get();
System.out.println("Topic Created Successfully!");
admin.close();
}
}
• NewTopic("user-logs", 3, (short) 2) → Creates a topic with 3 partitions and 2 replicas.
1.3 Creating Topics in C++ Using librdkafka
Kafka does not ship a separate C++ Admin API, but librdkafka's Admin API can be called from C++ to create topics programmatically.
C++ Code to Create a Kafka Topic
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << "Failed to create client: " << errstr << std::endl;
        return 1;
    }
    // Describe the new topic: 3 partitions, replication factor 2
    rd_kafka_NewTopic_t *new_topic =
        rd_kafka_NewTopic_new("user-signups", 3, 2, errstr, sizeof(errstr));
    rd_kafka_queue_t *queue = rd_kafka_queue_new(rk);
    rd_kafka_CreateTopics(rk, &new_topic, 1, nullptr, queue);
    // Wait for the admin request to complete (only the request-level error is checked here;
    // per-topic results are available via rd_kafka_event_CreateTopics_result())
    rd_kafka_event_t *event = rd_kafka_queue_poll(queue, 10000);
    if (event && !rd_kafka_event_error(event))
        std::cout << "Topic Created Successfully!" << std::endl;
    else
        std::cerr << "Error Creating Topic: "
                  << (event ? rd_kafka_event_error_string(event) : "timed out") << std::endl;
    if (event) rd_kafka_event_destroy(event);
    rd_kafka_NewTopic_destroy(new_topic);
    rd_kafka_queue_destroy(queue);
    rd_kafka_destroy(rk);
    return 0;
}
This program creates a Kafka topic programmatically from C++.
2. Deleting Topics in Kafka
Kafka allows topic deletion to free up storage, but it must be explicitly enabled in the broker settings.
2.1 Enable Topic Deletion
To allow topic deletion, set the following in server.properties:
shCopyEditdelete.topic.enable=true
Restart the Kafka broker for changes to take effect.
2.2 Deleting Topics Using Kafka CLI
Use the following command to delete a topic:
shCopyEditkafka-topics.sh --delete --bootstrap-server localhost:9092 --topic orders
If deletion is enabled, Kafka will remove the topic and its data.
Check if the Topic is Deleted
shCopyEditkafka-topics.sh --list --bootstrap-server localhost:9092
If the topic does not appear, it has been deleted.
2.3 Deleting Topics Using Kafka Admin API (Java Example)
Kafka’s Admin API allows programmatic deletion of topics.
Java Code to Delete a Kafka Topic
javaCopyEditimport org.apache.kafka.clients.admin.*;
import java.util.*;
public class KafkaTopicDeleter {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);
admin.deleteTopics(Collections.singletonList("user-logs")).all().get();
System.out.println("Topic Deleted Successfully!");
admin.close();
}
}
This deletes the user-logs topic programmatically.
2.4 Deleting Topics in C++ Using librdkafka
librdkafka exposes topic deletion through its Admin API (rd_kafka_DeleteTopics), which can be called from C++.
C++ Code for Deleting a Kafka Topic
#include <iostream>
#include <librdkafka/rdkafka.h>
int main() {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << "Failed to create client: " << errstr << std::endl;
        return 1;
    }
    rd_kafka_DeleteTopic_t *del_topic = rd_kafka_DeleteTopic_new("user-signups");
    rd_kafka_queue_t *queue = rd_kafka_queue_new(rk);
    rd_kafka_DeleteTopics(rk, &del_topic, 1, nullptr, queue);
    // Wait for the admin request to complete
    rd_kafka_event_t *event = rd_kafka_queue_poll(queue, 10000);
    if (event && !rd_kafka_event_error(event))
        std::cout << "Topic Deleted Successfully!" << std::endl;
    else
        std::cerr << "Error Deleting Topic: "
                  << (event ? rd_kafka_event_error_string(event) : "timed out") << std::endl;
    if (event) rd_kafka_event_destroy(event);
    rd_kafka_DeleteTopic_destroy(del_topic);
    rd_kafka_queue_destroy(queue);
    rd_kafka_destroy(rk);
    return 0;
}
This deletes the user-signups topic in Kafka using C++.
3. Modifying Kafka Topics
Kafka also allows modification of topic configurations.
3.1 Changing Partition Count
Increasing partitions helps scale the system.
shCopyEditkafka-topics.sh --alter --bootstrap-server localhost:9092 --topic orders --partitions 6
This increases the partition count of orders to 6.
3.2 Changing Retention Policy
shCopyEditkafka-configs.sh --alter --bootstrap-server localhost:9092 --entity-type topics --entity-name
orders --add-config retention.ms=86400000
This sets the retention period to 1 day.
4. Best Practices for Managing Topics
1. Choose an appropriate number of partitions for scalability.
2. Use replication for fault tolerance.
3. Enable topic deletion cautiously, as it permanently removes data.
4. Avoid auto-topic creation in production to prevent accidental topic creation (see the sketch after this list).
5. Set retention policies to control storage usage.
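A minimal server.properties sketch reflecting points 3–5 above (values are illustrative, not recommendations):
auto.create.topics.enable=false   # avoid accidental topic creation in production
delete.topic.enable=true          # allow explicit deletion; use with care
log.retention.hours=168           # default retention to keep storage bounded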
Partitioning Strategy in Kafka
Partitioning is a fundamental concept in Kafka that allows topics to scale horizontally by distributing
messages across multiple brokers. This section covers how partitioning works, partitioning strategies,
custom partitioners, and examples in C++.
1. Understanding Kafka Partitioning
Each Kafka topic is split into partitions, which act as independent logs stored across different brokers.
Partitions allow parallelism and fault tolerance.
1.1 How Kafka Partitioning Works
• When a producer sends a message to a Kafka topic, it must determine which partition to send
the message to.
• Kafka assigns messages to partitions based on:
a. A specified partition (if provided).
b. A partitioning strategy (hashing or custom logic).
c. A random partition (if no key is provided).
1.2 Example: Partition Distribution
Consider a topic orders with 3 partitions (P0, P1, P2) and 2 brokers:
Partition Leader Broker Replica Broker
P0 Broker 1 Broker 2
P1 Broker 2 Broker 1
P2 Broker 1 Broker 2
When producing messages, Kafka distributes them among the partitions using its partitioning strategy.
2. Partitioning Strategies in Kafka
Kafka provides multiple strategies to determine how messages are assigned to partitions:
2.1 Round-Robin Partitioning (Default for No Key)
• If a key is not provided, Kafka distributes messages across partitions in a round-robin fashion.
• Ensures an even distribution of messages.
Example (No Key)
shCopyEditkafka-console-producer.sh --broker-list localhost:9092 --topic orders
Each message is sent to a different partition in round-robin order.
2.2 Hash-Based Partitioning (Default for Keyed Messages)
• If a key is provided, Kafka hashes the key using Murmur2 hashing and assigns it to a partition.
• Ensures messages with the same key go to the same partition.
Example (Keyed Messages)
shCopyEditkafka-console-producer.sh --broker-list localhost:9092 --topic orders --property
"parse.key=true" --property "key.separator=:"
Sample Input:
order123:Item1
order123:Item2
order456:Item3
• Messages with order123 go to the same partition.
• Messages with order456 may go to a different partition.
2.3 Custom Partitioning Strategy
Kafka allows developers to implement custom partitioning logic based on business requirements.
Custom Partitioning Example (Java)
javaCopyEditimport org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster
cluster) {
int partitions = cluster.partitionsForTopic(topic).size();
return (key.hashCode() & Integer.MAX_VALUE) % partitions; // Custom hash logic
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
• This ensures that messages with the same key always go to the same partition.
• Can be plugged into Kafka Producer Configs using:
javaCopyEditprops.put("partitioner.class", "CustomPartitioner");
2.4 Custom Partitioning in C++ (librdkafka)
Kafka’s librdkafka provides an interface for implementing custom partitioners.
C++ Custom Partitioner Example
#include <iostream>
#include <functional>
#include <cstdlib>
#include <librdkafka/rdkafkacpp.h>
class MyPartitionerCb : public RdKafka::PartitionerCb {
public:
int32_t partitioner_cb(const RdKafka::Topic* topic, const std::string* key,
int32_t partition_count, void* msg_opaque) override {
if (key) {
return std::hash<std::string>{}(*key) % partition_count;
}
return rand() % partition_count; // Default to random partition
}
};
int main() {
    std::string brokers = "localhost:9092";
    std::string topic = "custom-orders";
    std::string errstr;
    RdKafka::Conf *config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    config->set("bootstrap.servers", brokers, errstr);
    // The partitioner is a topic-level setting, so attach it to a topic configuration
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    MyPartitionerCb partitioner;
    tconf->set("partitioner_cb", &partitioner, errstr);
    RdKafka::Producer *producer = RdKafka::Producer::create(config, errstr);
    RdKafka::Topic *kafkaTopic = RdKafka::Topic::create(producer, topic, tconf, errstr);
    std::string message = "Order123";
    std::string key = "OrderKey";
    producer->produce(kafkaTopic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
                      const_cast<char *>(message.c_str()), message.size(),
                      &key, nullptr);
    producer->flush(1000);
    delete kafkaTopic;
    delete producer;
    delete tconf;
    delete config;
    return 0;
}
• This partitioner hashes the key and assigns the partition accordingly.
3. Choosing the Right Partitioning Strategy
Strategy When to Use?
Round-Robin When ordering doesn't matter (e.g., logging).
Key-Based (Hashing) When ordering per key is required (e.g., orders, user data).
Custom Partitioner When a specific logic is needed (e.g., priority-based partitions).
4. Best Practices for Partitioning
1. Balance the number of partitions with broker capacity. Too many partitions can lead to high overhead.
2. Use key-based partitioning for ordering guarantees within the same partition.
3. Monitor partition skew (some partitions getting more messages); see the command sketch after this list.
4. Plan for future scalability (increasing partitions later may cause data rebalancing issues).
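One way to check for partition skew (point 3 above) is to compare per-partition end offsets. A rough sketch using the GetOffsetShell tool (older Kafka versions use --broker-list instead of --bootstrap-server):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server localhost:9092 --topic orders --time -1
Partitions whose end offsets grow much faster than the others are receiving a disproportionate share of the traffic.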
Topic Replication in Kafka
Kafka topic replication ensures fault tolerance, high availability, and durability by storing multiple
copies of topic partitions across different brokers. This section covers replication fundamentals, leader
and follower roles, ISR (In-Sync Replicas), failover handling, and C++ examples using librdkafka.
1. What is Topic Replication?
Kafka replicates partitions across brokers to ensure data availability and reliability even if some brokers
fail.
• Each partition has multiple replicas stored on different brokers.
• One replica is the leader, while the others are followers.
• Only the leader serves read and write requests.
• Followers synchronize with the leader by fetching data.
1.1 Example: Replication in a 3-Broker Cluster
Consider a topic "orders" with 3 partitions and a replication factor of 3.
Partition Leader (Serving Requests) Follower 1 Follower 2
P0 Broker 1 Broker 2 Broker 3
P1 Broker 2 Broker 3 Broker 1
P2 Broker 3 Broker 1 Broker 2
If Broker 1 fails, Kafka automatically elects a new leader (from followers).
2. Configuring Topic Replication
2.1 Setting Replication Factor
• The replication factor defines how many copies of each partition exist.
• It should never exceed the number of brokers.
Example: Creating a Topic with Replication
kafka-topics.sh --create --topic orders --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3
• This creates an "orders" topic with:
o 3 partitions
o 3 replicas per partition
2.2 Checking Replication Status
To see partition leaders and replicas:
shCopyEditkafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
Example output:
Topic: orders Partition: 0 Leader: 1 Replicas: 1,2,3 ISR: 1,2,3
Topic: orders Partition: 1 Leader: 2 Replicas: 2,3,1 ISR: 2,3,1
Topic: orders Partition: 2 Leader: 3 Replicas: 3,1,2 ISR: 3,1,2
• Leader: The active broker handling reads/writes.
• Replicas: Other brokers storing a copy.
• ISR (In-Sync Replicas): Followers that are up-to-date.
3. Leader and Follower Replication Process
3.1 Leader and Follower Roles
• The Leader partition handles all client requests.
• The Follower partitions replicate data from the leader.
• Kafka automatically elects a new leader if the current leader fails.
Replication Process
1. The producer sends messages → the leader stores them first.
2. Followers fetch data from the leader.
3. Once the followers are in sync, the message is committed.
3.2 C++ Implementation of a Kafka Producer with Replication
Using librdkafka, we can produce messages to a replicated Kafka topic.
C++ Producer Example with Replication
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
std::string errstr;
RdKafka::Conf *config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
config->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(config, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
std::string message = "Order123";
producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  nullptr, 0, 0, nullptr);
producer->flush(1000);
delete producer;
delete config;
return 0;
}
• The message is replicated across brokers.
• If the leader fails, a follower becomes the new leader.
4. In-Sync Replicas (ISR) and Unclean Leader Elections
4.1 In-Sync Replicas (ISR)
• ISR includes all replicas that are up to date.
• Kafka only commits messages if at least one ISR confirms the write.
• If a follower lags too much, it is removed from the ISR (the lag threshold is configurable; see the sketch below).
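The lag window that causes a follower to be dropped from the ISR is a broker setting; a server.properties sketch (the value shown is illustrative):
replica.lag.time.max.ms=30000   # a follower that has not caught up within this window leaves the ISR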
Checking ISR Status
shCopyEditkafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
Example output:
Topic: orders Partition: 0 Leader: 1 Replicas: 1,2,3 ISR: 1,2
• ISR = {1,2} → Broker 3 is out of sync.
4.2 Unclean Leader Elections (Data Loss Risks)
• If all ISR nodes fail, Kafka can elect a non-ISR follower.
• This may lead to data loss.
Enable/Disable Unclean Leader Elections
• Disable unclean leader election (default, recommended)
shCopyEditkafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name orders
--alter --add-config unclean.leader.election.enable=false
• Enable unclean leader election (may cause data loss)
shCopyEditkafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name orders
--alter --add-config unclean.leader.election.enable=true
5. Monitoring and Handling Replication Failures
5.1 Detecting Partition Replication Issues
Use Kafka metrics to monitor partition health:
shCopyEditkafka-consumer-groups.sh --describe --group my-consumer-group --bootstrap-server
localhost:9092
If you see "Lag" increasing, some replicas may be falling behind.
5.2 Recovering from Broker Failures
1. Check ISR status → If the ISR is low, increase broker resources.
2. Restart failed brokers → Kafka should automatically re-sync them.
3. Manually reassign partitions (if necessary):
kafka-reassign-partitions.sh --execute --reassignment-json-file reassignment.json
4. Expand the cluster → Add more brokers if needed.
6. Best Practices for Replication
1. Use at least 3 replicas to ensure fault tolerance.
2. Disable unclean leader elections to prevent data loss.
3. Monitor ISR status to detect follower lag early.
4. Distribute leaders across brokers to balance load.
5. Enable rack-aware replication in multi-datacenter setups (see the sketch after this list).
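Rack-aware replication (point 5 above) relies on each broker advertising its rack in server.properties; a sketch with placeholder rack names:
# on broker 1
broker.rack=rack-a
# on broker 2
broker.rack=rack-b
With racks declared, Kafka spreads a partition's replicas across racks when the topic is created.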
Log Segments and Retention in Kafka
Kafka log segments and retention policies control how messages are stored, retained, and deleted over
time. This ensures efficient disk space usage while allowing consumers to replay messages within a
configured retention period.
This section covers:
1. Log Segments: Structure & Internals
2. Retention Policies & Expiry Mechanisms
3. Log Cleanup Strategies
4. C++ Example: Fetching Messages Before Expiry
1. Log Segments in Kafka
Kafka stores messages in log segments within each partition.
1.1 Structure of a Log Segment
Each partition consists of multiple log segments, where:
• Each segment contains a batch of messages.
• Segments are immutable (new messages are appended only).
• Kafka deletes old segments based on retention settings.
Example: Partition with Log Segments
Topic: orders (Partition 0)
-----------------------------------------------------
| Segment 1 | Segment 2 | Segment 3 | Segment 4 |
| Offset 0-99 | Offset 100-199 | Offset 200-299 | Offset 300-399 |
-----------------------------------------------------
• Each segment stores messages sequentially.
• Older segments are deleted when retention expires.
1.2 Log Segment Files
Each log segment consists of multiple files:
File Type Description
.log Stores actual Kafka messages
.index Index file for quick message lookups
.timeindex Timestamp-based index for log retention
.txnindex Tracks ongoing transactions
Example Log Files for Partition 0
/kafka-logs/orders-0/
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
├── 00000000000000010000.log
├── 00000000000000010000.index
├── 00000000000000010000.timeindex
• Each segment starts with an offset (00000000000000000000.log).
• New segments are created when a segment reaches a size limit.
2. Retention Policies in Kafka
Kafka retains messages for a configurable period before deleting old segments.
2.1 Types of Retention Policies
Retention Type Description
Time-based Deletes segments after a fixed time (log.retention.hours)
Size-based Deletes old logs when the total log size exceeds (log.retention.bytes)
Compaction-based Retains only the latest value per key
2.2 Configuring Retention Policies
Time-Based Retention (Default: 168 hours = 7 days)
shCopyEditkafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name
orders --add-config retention.ms=604800000
• Messages will be deleted after 7 days.
Size-Based Retention
shCopyEditkafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name
orders --add-config retention.bytes=1073741824
• Deletes older messages when total log size exceeds 1GB.
3. Log Cleanup Strategies
Kafka offers two cleanup strategies:
Cleanup Policy Description
delete (default) Removes old segments after retention expiry
compact Keeps only the latest message for each key
3.1 Enabling Log Compaction
shCopyEditkafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name
orders --add-config cleanup.policy=compact
• Keeps only the latest version of messages per key.
3.2 Fetching Messages Before Retention Expiry (C++ Example)
Before Kafka deletes messages, consumers must process them in time.
C++ Consumer: Fetching Messages Before Expiry
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
void rebalance_cb(RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition *> &partitions) override {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
} else {
consumer->unassign();
}
}
};
int main() {
std::string brokers = "localhost:9092";
std::string topic = "orders";
std::string errstr;
RdKafka::Conf *config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
config->set("bootstrap.servers", brokers, errstr);
config->set("group.id", "consumer-group-1", errstr);
config->set("auto.offset.reset", "earliest", errstr);
ConsumerRebalanceCb rebalance_cb;
config->set("rebalance_cb", &rebalance_cb, errstr);
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(config, errstr);
consumer->subscribe({topic});
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received: " << std::string(static_cast<char *>(msg->payload()), msg->len()) <<
std::endl;
}
delete msg;
}
consumer->close();
delete consumer;
delete config;
return 0;
}
• This fetches messages before Kafka deletes old logs.
• Consumers should read messages within retention time to prevent data loss.
4. Monitoring and Troubleshooting Log Retention
4.1 Checking Log Segment Status
shCopyEditkafka-log-dirs.sh --describe --bootstrap-server localhost:9092
• Shows log segment sizes, retention settings, and disk usage.
4.2 Monitoring Log Retention with Metrics
Kafka exposes metrics like log segment age, disk usage, etc.
shCopyEditkafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
Example output:
Topic: orders   Partition: 0   Leader: 1   Replicas: 1,2,3   ISR: 1,2,3
Configs: retention.ms=604800000, segment.bytes=1073741824, cleanup.policy=delete
5. Best Practices for Log Segments and Retention
1. Choose retention settings carefully
o Shorter retention for high-throughput topics.
o Longer retention for topics requiring audit logs.
2. Monitor disk usage
o Use retention.bytes to prevent excessive disk space consumption.
3. Use compaction for key-based storage
o Retains only the latest version per key, saving space.
4. Ensure consumers process data before expiry
o Use consumer lag monitoring to detect slow consumers.
1. Introduction to Kafka Producers
Apache Kafka is a high-throughput, distributed event streaming platform used in real-time data pipelines
and event-driven architectures. At its core, Kafka Producers are responsible for sending data to Kafka
topics. They play a crucial role in ensuring efficient data ingestion into Kafka clusters.
1.1 What is a Kafka Producer?
A Kafka Producer is a client application that:
• Connects to a Kafka cluster.
• Creates messages and assigns them to topics.
• Serializes messages for efficient storage and retrieval.
• Distributes messages among partitions based on a defined strategy.
• Handles acknowledgment and retry mechanisms for message delivery.
1.2 Key Responsibilities of a Kafka Producer
A Kafka Producer must:
Serialize data: Convert messages into a storable format (JSON, Avro, Protobuf).
Partition data: Decide which partition a message should go to.
Ensure delivery reliability: Handle acknowledgments and retries.
Optimize performance: Control batch sizes and delay settings.
2. How Kafka Producers Work
A Kafka Producer sends messages to a Kafka cluster, where they are stored in partitions of a topic. The
following steps outline how it works:
2.1 Kafka Producer Workflow
1. Initialize the producer → Configure broker connection, serialization, and acknowledgments.
2. Create records → Each message consists of a key, value, and timestamp.
3. Send messages → Messages are sent asynchronously or synchronously.
4. Partitioning logic → Messages are distributed based on keys or a round-robin strategy.
5. Acknowledge messages → Kafka ensures message durability based on the acknowledgment settings.
2.2 Kafka Producer Architecture
pgsqlCopyEdit+---------------------+
| Producer Application|
+---------------------+
|
| Message (Key-Value)
v
+----------------------+
| Kafka Producer API |
+----------------------+
|
| Partitioner (Determines which partition)
v
+----------------------+
| Kafka Broker (Cluster)|
+----------------------+
3. Writing a Kafka Producer in C++
Kafka has no official C++ client, but librdkafka (a widely used C library with C++ bindings) provides full producer support.
3.1 Installing librdkafka
shCopyEditsudo apt update
sudo apt install librdkafka-dev
3.2 Basic Kafka Producer in C++
This C++ program creates a producer and sends a message to a Kafka topic.
C++ Kafka Producer Example
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
// Delivery report callback
class DeliveryReportCallback : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) override {
if (message.err()) {
std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
} else {
std::cout << "Message delivered to " << message.topic_name()
<< " [" << message.partition() << "] at offset "
<< message.offset() << std::endl;
}
}
};
int main() {
std::string brokers = "localhost:9092";
std::string topic = "my_topic";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
DeliveryReportCallback dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
std::string message = "Hello, Kafka!";
RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                                            RdKafka::Producer::RK_MSG_COPY,
                                            const_cast<char *>(message.c_str()), message.size(),
                                            NULL, 0,  // no key
                                            0,        // 0 = use current timestamp
                                            NULL);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Error producing message: " << RdKafka::err2str(resp) << std::endl;
}
producer->flush(1000); // Ensure message is delivered before exiting
delete producer;
return 0;
}
3.3 Explanation of Code
• Producer Configuration: Sets Kafka broker details.
• Message Production: Sends a message to my_topic.
• Delivery Report Callback: Confirms successful delivery.
4. Message Serialization in Kafka Producers
Serialization is required to convert structured data into a format Kafka can store and transmit. Common
serialization formats:
1. String Serialization – Simple text-based messages.
2. JSON Serialization – Popular for structured data exchange.
3. Avro Serialization – Efficient for large-scale streaming.
4. Protobuf Serialization – Compact, fast, and cross-platform.
4.1 JSON Serialization in C++
Using nlohmann/json library:
cppCopyEdit#include <iostream>
#include <nlohmann/json.hpp>
using json = nlohmann::json;
int main() {
json message;
message["id"] = 1001;
message["name"] = "Kafka JSON Message";
message["timestamp"] = 1712569200;
std::string serializedMessage = message.dump();
std::cout << "Serialized JSON: " << serializedMessage << std::endl;
}
5. Key-Based Partitioning
Kafka allows key-based partitioning, ensuring messages with the same key always go to the same
partition.
5.1 Custom Partitioning in C++
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
#include <functional> // std::hash
#include <string>
int custom_partitioner(const RdKafka::Topic *topic, const std::string &key, int partition_count) {
return std::hash<std::string>{}(key) % partition_count;
}
int main() {
std::string key = "user123";
int partitions = 4;
int chosen_partition = custom_partitioner(nullptr, key, partitions);
std::cout << "Message with key " << key << " will go to partition " << chosen_partition << std::endl;
}
6. Acknowledgment Mechanisms in Kafka Producers
Kafka offers three acknowledgment modes:
• acks=0 → No acknowledgment (fastest, but risky).
• acks=1 → Leader acknowledgment (default).
• acks=all → Full acknowledgment (strong durability).
6.1 Configuring Acknowledgments in C++
cppCopyEditconf->set("acks", "all", errstr);
This ensures messages are replicated across all replicas before confirmation.
7. Producer Configurations
Producers can be tuned for performance and reliability.
Config Description
acks Defines acknowledgment level.
retries Specifies retry attempts if failure occurs.
batch.size Controls batch size.
linger.ms Adds a delay to batch messages.
7.1 Optimizing Producer Configuration in C++
cppCopyEditconf->set("retries", "5", errstr); // Retry 5 times
conf->set("batch.size", "16384", errstr); // Batch messages
conf->set("linger.ms", "5", errstr); // Add 5ms delay
1. Introduction to Kafka Producer API
Kafka provides a Producer API that allows applications to send messages to Kafka topics. This API
provides essential functionalities such as:
Asynchronous message sending – High performance with minimal latency.
Synchronous message sending – Ensuring delivery before proceeding.
Custom partitioning – Placing messages in specific partitions.
Error handling & retries – Ensuring reliable message delivery.
Batch processing & compression – Optimizing resource utilization.
1.1 What is Kafka Producer API?
Kafka's Producer API is a high-level abstraction that provides methods for message creation,
serialization, and delivery. It interacts with the Kafka broker and ensures that messages are stored
correctly.
1.2 Architecture of Kafka Producer API
pgsqlCopyEdit+------------------------------------------------+
| Producer Application |
+------------------------------------------------+
|
v
+------------------------------------------------+
| Kafka Producer API |
| - Send Messages |
| - Handle Partitions |
| - Manage Acknowledgments |
+------------------------------------------------+
|
v
+------------------------------------------------+
| Kafka Broker Cluster |
| - Stores Messages in Topics |
| - Ensures Message Durability |
+------------------------------------------------+
2. Setting Up Kafka Producer API in C++
Kafka does not have an official C++ client, but we can use librdkafka to interact with Kafka.
2.1 Installing librdkafka (C++ Kafka Library)
shCopyEditsudo apt update
sudo apt install librdkafka-dev
3. Kafka Producer API Methods
Kafka's Producer API exposes several key methods:
Method Description
produce() Sends a message to Kafka.
flush() Waits for all messages to be sent before closing.
poll() Handles events and delivery reports.
close() Gracefully closes the producer.
3.1 Producing Messages in Kafka (Basic Example in C++)
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test_topic";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
std::string message = "Hello, Kafka!";
RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                                            RdKafka::Producer::RK_MSG_COPY,
                                            const_cast<char *>(message.c_str()), message.size(),
                                            NULL, 0, 0, NULL);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Error producing message: " << RdKafka::err2str(resp) << std::endl;
}
producer->flush(1000);
delete producer;
return 0;
}
Explanation
Initialize the Producer → Connects to Kafka brokers.
Send a Message → Uses produce() to publish a message.
Flush Messages → Ensures all messages are delivered.
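produce() only enqueues the message; delivery reports from a configured dr_cb (as in section 3.1 of the previous chapter) are fired when the producer is polled. A minimal loop sketch, with an illustrative topic name and payloads:
// Send several messages, servicing delivery reports as we go.
for (int i = 0; i < 10; ++i) {
    std::string payload = "message-" + std::to_string(i);
    producer->produce("test_topic", RdKafka::Topic::PARTITION_UA,
                      RdKafka::Producer::RK_MSG_COPY,
                      const_cast<char *>(payload.c_str()), payload.size(),
                      NULL, 0,   // no key
                      0,         // 0 = use current timestamp
                      NULL);
    producer->poll(0);           // non-blocking: fires dr_cb for completed sends
}
producer->flush(10000);          // wait for remaining deliveries before shutdown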
4. Advanced Producer Features
4.1 Handling Acknowledgments in Producer API
Kafka provides different acknowledgment levels:
acks Value Behavior
0 No acknowledgment (fastest, but risky).
1 Leader acknowledgment only (default).
all All replicas must acknowledge (strong durability).
Setting Acknowledgments in C++
cppCopyEditconf->set("acks", "all", errstr); // Ensures message durability
4.2 Handling Errors & Retries
Kafka Producers must handle network failures, leader changes, and broker unavailability.
Setting Retries in C++
cppCopyEditconf->set("retries", "5", errstr); // Retry up to 5 times
Handling Error Codes
cppCopyEditif (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Kafka error: " << RdKafka::err2str(resp) << std::endl;
}
5. Asynchronous vs Synchronous Message Sending
Kafka Producers can send messages synchronously or asynchronously.
5.1 Synchronous Producer Example
Waits for confirmation before sending the next message.
cppCopyEditRdKafka::ErrorCode resp = producer->produce(topic, partition,
                                            RdKafka::Producer::RK_MSG_COPY,
                                            message, strlen(message),
                                            NULL, 0, 0, NULL);
producer->flush(1000); // Block until the message is delivered (or the timeout expires)
5.2 Asynchronous Producer Example
Does not wait for acknowledgment, improving performance.
cppCopyEditproducer->produce(topic, partition,
                  RdKafka::Producer::RK_MSG_COPY,
                  message, strlen(message),
                  NULL, 0, 0, NULL);
producer->poll(0); // Non-blocking: serves delivery reports without waiting
6. Custom Partitioning in Kafka Producer API
By default, Kafka assigns messages round-robin across partitions. We can override this with key-based
partitioning.
6.1 Custom Partitioning Example in C++
cppCopyEditint partitionFunction(const std::string &key, int partition_count) {
return std::hash<std::string>{}(key) % partition_count;
}
int chosen_partition = partitionFunction("user123", 4);
7. Producer Performance Optimization
Kafka allows fine-tuning for latency vs throughput trade-offs.
7.1 Key Configurations
Config Description
batch.size Number of messages in a batch.
linger.ms Wait time before sending a batch.
compression.type Compress messages for faster transmission.
7.2 Optimizing Performance in C++
cppCopyEditconf->set("batch.size", "16384", errstr); // Increase batch size
conf->set("linger.ms", "5", errstr); // Delay sending to batch more messages
conf->set("compression.type", "snappy", errstr); // Use Snappy compression
8. Security in Kafka Producer API
Kafka supports SSL and SASL authentication to secure producer communication.
8.1 Enabling SSL in C++ Producer
cppCopyEditconf->set("security.protocol", "SSL", errstr);
conf->set("ssl.ca.location", "/etc/kafka/ca-cert.pem", errstr);
conf->set("ssl.certificate.location", "/etc/kafka/client-cert.pem", errstr);
conf->set("ssl.key.location", "/etc/kafka/client-key.pem", errstr);
9. Logging and Monitoring Kafka Producers
Kafka provides metrics for monitoring producer performance.
9.1 Enable Logging in C++
cppCopyEditconf->set("log_level", "3", errstr);
conf->set("debug", "msg,protocol", errstr);
10. Complete C++ Kafka Producer with Advanced Configuration
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "advanced_topic";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("acks", "all", errstr);
conf->set("retries", "5", errstr);
conf->set("batch.size", "16384", errstr);
conf->set("linger.ms", "5", errstr);
conf->set("compression.type", "snappy", errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
std::string message = "Advanced Kafka Producer!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  NULL, 0, 0, NULL);
producer->flush(1000);
delete producer;
return 0;
}
1. Introduction to Message Serialization in Kafka
Kafka is a distributed event streaming system that processes and stores data in the form of messages.
These messages need to be serialized before transmission and deserialized at the consumer side.
Why Serialization?
Serialization converts complex data structures into byte streams that can be transmitted over a
network and stored efficiently.
Kafka Serialization Use Cases
• Transmitting JSON, Avro, Protobuf, or custom binary data.
• Maintaining data consistency across microservices.
• Reducing message size for better performance.
2. Types of Serialization in Kafka
Kafka supports different serialization formats:
• String: Converts data into plain text. Pros: simple and human-readable. Cons: not efficient for complex data.
• JSON: Stores structured data in text format. Pros: widely used and flexible. Cons: larger size than binary formats.
• Avro: Compact binary format with schema evolution. Pros: schema support, efficient storage. Cons: requires a schema registry.
• Protobuf: Google's compact and fast binary format. Pros: efficient, strong typing. Cons: requires a .proto definition.
• Custom Binary: Raw binary serialization using C++ structures. Pros: high performance. Cons: no built-in schema support.
3. Configuring Kafka Producer for Serialization
Serialization in Kafka is configured through Producer properties:
Property Description
key.serializer Converts message keys into bytes.
value.serializer Converts message values into bytes.
schema.registry.url Specifies the Avro/Protobuf schema registry.
Example in C++ (note: these property names come from the Java client; librdkafka has no pluggable serializers, so the application converts keys and values to bytes itself before calling produce(), as in the examples below):
cppCopyEditconf->set("key.serializer", "org.apache.kafka.common.serialization.StringSerializer", errstr);
conf->set("value.serializer", "org.apache.kafka.common.serialization.StringSerializer", errstr);
4. Implementing Different Serialization Techniques in C++
4.1 String Serialization (Basic Example)
cppCopyEditstd::string message = "Hello, Kafka!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  NULL, 0, 0, NULL);
Pros: Simple and easy to debug.
Cons: Not suitable for large or structured data.
4.2 JSON Serialization
JSON is a widely used text format that supports nested structures.
C++ Producer with JSON Serialization
cppCopyEdit#include <nlohmann/json.hpp>
std::string serializeToJson(std::string id, std::string name, int age) {
nlohmann::json jsonData;
jsonData["id"] = id;
jsonData["name"] = name;
jsonData["age"] = age;
return jsonData.dump(); // Serialize to JSON string
}
std::string jsonMessage = serializeToJson("1001", "Alice", 25);
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(jsonMessage.c_str()), jsonMessage.size(),
                  NULL, 0, 0, NULL);
Pros: Human-readable, flexible.
Cons: Larger size compared to binary formats.
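On the consumer side the JSON bytes have to be parsed back into fields. A minimal sketch with nlohmann/json; the handlePayload helper is illustrative and the field names mirror the producer example above:
#include <nlohmann/json.hpp>
#include <iostream>
#include <string>
// Deserialize a JSON payload received from Kafka back into its fields.
void handlePayload(const char *data, size_t len) {
    nlohmann::json doc = nlohmann::json::parse(std::string(data, len));
    std::cout << "id="    << doc["id"].get<std::string>()
              << " name=" << doc["name"].get<std::string>()
              << " age="  << doc["age"].get<int>() << std::endl;
}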
4.3 Avro Serialization
Apache Avro is a compact binary format with schema evolution support.
Avro Schema Example (User.avsc)
jsonCopyEdit{
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "age", "type": "int" }
]
}
C++ Producer with Avro Serialization
cppCopyEdit#include <avro/Compiler.hh>
#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/Specific.hh>
#include <avro/Stream.hh>
std::vector<uint8_t> serializeToAvro(std::string id, std::string name, int age) {
    // Compile the schema (in practice load and compile it once, not per message)
    avro::ValidSchema schema = avro::compileJsonSchemaFromString(R"(
    {
      "type": "record",
      "name": "User",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "name", "type": "string" },
        { "name": "age", "type": "int" }
      ]
    })");
    avro::GenericDatum datum(schema);
    avro::GenericRecord &record = datum.value<avro::GenericRecord>();
    record.field("id").value<std::string>() = id;
    record.field("name").value<std::string>() = name;
    record.field("age").value<int>() = age;
    // Encode into an in-memory output stream and copy the bytes out
    std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream();
    avro::EncoderPtr encoder = avro::binaryEncoder();
    encoder->init(*out);
    avro::encode(*encoder, datum);
    encoder->flush();
    return *avro::snapshot(*out);
}
std::vector<uint8_t> avroMessage = serializeToAvro("1001", "Alice", 25);
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  avroMessage.data(), avroMessage.size(),
                  NULL, 0, 0, NULL);
Pros: Compact, supports schema evolution.
Cons: Requires Avro schema registry.
4.4 Protocol Buffers (Protobuf) Serialization
Protobuf is a highly efficient binary serialization format developed by Google.
Protobuf Schema Definition (user.proto)
protoCopyEditsyntax = "proto3";
message User {
string id = 1;
string name = 2;
int32 age = 3;
}
C++ Producer with Protobuf Serialization
cppCopyEdit#include "user.pb.h" // Generated from user.proto
#include <google/protobuf/util/json_util.h>
std::string serializeToProtobuf(std::string id, std::string name, int age) {
User user;
user.set_id(id);
user.set_name(name);
user.set_age(age);
return user.SerializeAsString(); // Serialize to binary
}
std::string protobufMessage = serializeToProtobuf("1001", "Alice", 25);
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(protobufMessage.c_str()), protobufMessage.size(),
                  NULL, 0, 0, NULL);
Pros: Fast, compact, strongly typed.
Cons: Requires .proto definitions.
4.5 Custom Binary Serialization in C++
Raw binary serialization provides maximum efficiency but lacks built-in schema validation.
C++ Struct for Binary Serialization
cppCopyEdit#include <cstring>
struct User {
char id[10];
char name[50];
int age;
};
std::vector<char> serializeToBinary(const User& user) {
std::vector<char> buffer(sizeof(User));
std::memcpy(buffer.data(), &user, sizeof(User));
return buffer;
}
User user = {"1001", "Alice", 25};
std::vector<char> binaryMessage = serializeToBinary(user);
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  binaryMessage.data(), binaryMessage.size(),
                  NULL, 0, 0, NULL);
Pros: Extremely efficient, minimal overhead.
Cons: No schema validation; the byte layout depends on compiler padding and endianness, so producer and consumer must be built compatibly.
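For completeness, the consumer side simply copies the bytes back into the struct. A minimal sketch; deserializeFromBinary is an illustrative helper and only works when producer and consumer share the exact same struct layout:
#include <cstring>
// Rebuild a User from raw Kafka bytes; valid only if both sides share the struct layout.
User deserializeFromBinary(const char *data, size_t len) {
    User user{};
    if (len >= sizeof(User))
        std::memcpy(&user, data, sizeof(User));
    return user;
}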
5. Choosing the Right Serialization Format
Format Use Case
String Simple text messages, debugging.
JSON Microservices, RESTful APIs, logs.
Avro Schema evolution, large-scale data processing.
Protobuf High-performance data exchange, low-latency systems.
Binary Embedded systems, real-time processing.
6. Full Kafka Producer with Serialization in C++
cppCopyEditstd::string jsonMessage = serializeToJson("1001", "Alice", 25);
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(jsonMessage.c_str()), jsonMessage.size(),
                  NULL, 0, 0, NULL);
producer->flush(1000);
1. Introduction to Key-Based Partitioning in Kafka
Kafka is a distributed message broker that organizes messages into topics divided into partitions. Key-
based partitioning ensures that messages with the same key always go to the same partition, which
guarantees message ordering and load distribution across brokers.
Why Partitioning Matters?
• Scalability: Increases throughput by distributing messages across multiple partitions.
• Parallelism: Consumers can process messages in parallel.
• Message Ordering: Ensures order for messages with the same key.
2. How Key-Based Partitioning Works?
When a Kafka producer sends a message, it can specify a key along with the message. Kafka uses a
partitioning strategy to determine which partition the message should be written to.
Partition Selection Logic
If a key is provided, Kafka applies:
partition = hash(key) mod (number of partitions)
If no key is provided, Kafka uses round-robin partitioning.
Example with 3 partitions
Key Hash(Key) Partition (hash % 3)
"Alice" hash("Alice") = 1345 1345 % 3 = 2
"Bob" hash("Bob") = 2789 2789 % 3 = 1
"Alice" hash("Alice") = 1345 1345 % 3 = 2
Messages with key "Alice" always go to partition 2.
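The hash-mod rule can be demonstrated in a few lines of C++. This is illustrative only: the real default partitioners use their own hash functions (murmur2 in the Java client, a consistent CRC32-based hash in librdkafka), not std::hash.
#include <functional>
#include <iostream>
#include <string>
// Illustrative partition selection: hash(key) mod partition_count.
int32_t pickPartition(const std::string &key, int32_t partitionCount) {
    return static_cast<int32_t>(std::hash<std::string>{}(key) % partitionCount);
}
int main() {
    for (const std::string key : {"Alice", "Bob", "Alice"})
        std::cout << key << " -> partition " << pickPartition(key, 3) << std::endl;
    // Both "Alice" messages always land in the same partition.
    return 0;
}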
3. Configuring Kafka Producer for Key-Based Partitioning
The producer needs to set the key.serializer and partitioner.class properties.
Property Description
key.serializer Converts the message key into bytes.
value.serializer Converts the message value into bytes.
partitioner.class Defines a custom partitioning strategy.
C++ Kafka Producer Configuration
Note: key.serializer, value.serializer, and partitioner.class are Java producer properties. With librdkafka, keys and values are passed to produce() as raw bytes and a custom partitioner is registered through a PartitionerCb (see 4.2); the Java-style settings are shown here only for comparison:
cppCopyEditconf->set("key.serializer", "org.apache.kafka.common.serialization.StringSerializer", errstr);
conf->set("value.serializer", "org.apache.kafka.common.serialization.StringSerializer", errstr);
conf->set("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner", errstr);
4. Implementing Key-Based Partitioning in C++
4.1 Using Default Kafka Partitioner
Kafka’s default partitioner applies hash-based partitioning when a key is provided.
cppCopyEditstd::string key = "Alice";
std::string message = "Hello, Kafka!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  key.c_str(), key.size(), 0, NULL);
Ensures all messages with key "Alice" go to the same partition.
May cause partition imbalance if some keys are more frequent.
4.2 Custom Kafka Partitioner in C++
A custom partitioner allows better control over partition assignment.
Custom Hash-Based Partitioner
cppCopyEditclass CustomPartitioner : public RdKafka::PartitionerCb {
public:
int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
int32_t partition_cnt, void *msg_opaque) override {
return std::hash<std::string>{}(*key) % partition_cnt; // Hash-based partitioning
}
};
Using Custom Partitioner in Producer
cppCopyEditstd::unique_ptr<RdKafka::Conf>
conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
CustomPartitioner partitioner;
conf->set("partitioner_cb", &partitioner, errstr);
Pros: More control over partitioning strategy.
Cons: Needs extra implementation effort.
4.3 Round-Robin Partitioning (Without Key)
If no key is provided, Kafka uses round-robin partitioning.
cppCopyEditstd::string message = "Hello, Kafka!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  NULL, 0, 0, NULL);
Ensures even load distribution across partitions.
Does not guarantee message ordering.
5. When to Use Key-Based Partitioning?
Use Case Key-Based Partitioning?
Ensuring message order Yes, ensures messages with the same key go to the same partition.
Parallel processing Yes, allows parallel consumer processing.
Load balancing No, round-robin works better.
Event correlation Yes, ensures related events stay together.
6. Full Kafka Producer Example with Key-Based Partitioning in C++
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test-topic";
std::string errstr;
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
conf->set("bootstrap.servers", brokers, errstr);
// Note: key/value serializer properties belong to the Java client; with librdkafka the
// key and value are passed to produce() as raw bytes, so nothing needs to be set here.
std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
std::string key = "Alice";
std::string message = "Hello, Kafka with Key-Based Partitioning!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  key.c_str(), key.size(), 0, NULL);
producer->flush(1000);
std::cout << "Message sent successfully!" << std::endl;
return 0;
}
Ensures messages with the same key always go to the same partition.
Uses Kafka’s default partitioner to distribute messages efficiently.
7. Key-Based Partitioning Performance Considerations
7.1 Avoiding Partition Imbalance
Some keys may be more frequent than others, leading to hot partitions.
Solution: spread very hot keys across several partitions, for example by salting the hash (note that this sacrifices strict per-key ordering):
cppCopyEditint partition = (std::hash<std::string>{}(key) + rand()) % partition_count;
7.2 Handling Partition Failures
If a partition leader fails, Kafka automatically reassigns partitions.
Solution: Configure replication and enable acks in producer settings:
cppCopyEditconf->set("acks", "all", errstr); // Ensures durability
7.3 Optimizing for High Throughput
Batching can improve performance for high-throughput producers.
Solution: Increase batch.size and linger.ms settings:
cppCopyEditconf->set("batch.size", "65536", errstr);
conf->set("linger.ms", "5", errstr);
8. Key Takeaways
Key-based partitioning guarantees message order within a partition.
Kafka’s default partitioner uses a hash function to assign keys to partitions.
Custom partitioners can provide better load balancing and fault tolerance.
Partitioning strategy affects performance, ordering, and scalability in Kafka applications.
1. Introduction to Kafka Acknowledgments
Kafka is a distributed message streaming platform that ensures data reliability and durability using
acknowledgment mechanisms (acks). These mechanisms control how producers receive confirmation
from brokers after sending messages.
Why Acknowledgments Matter?
• Data Durability: Prevents message loss.
• Performance Trade-offs: Higher reliability often means lower throughput.
• Fault Tolerance: Helps handle broker failures.
Kafka allows configuring the acknowledgment level via the acks parameter.
2. Types of Kafka Acknowledgment Mechanisms
Kafka provides three acknowledgment levels:
Acks Setting Description Reliability Performance
acks=0 Producer does not wait for broker acknowledgment. Low High
acks=1 Producer waits for acknowledgment from the leader broker only. Medium Medium
acks=all (acks=-1) Producer waits for acknowledgment from all in-sync replicas (ISR). High Low
3. Acknowledgment Levels in Detail
3.1 acks=0 (Fire-and-Forget Mode)
• The producer does not wait for any acknowledgment from Kafka.
• Messages are sent to the broker, but loss is possible if the broker crashes before persisting
them.
• Best for high-speed, low-latency applications (e.g., logging, monitoring).
Pros:
• Fastest message production.
• No delay caused by broker acknowledgment.
Cons:
• Risk of message loss if the broker fails.
C++ Producer Example with acks=0
cppCopyEditconf->set("acks", "0", errstr);
3.2 acks=1 (Leader Acknowledgment Mode)
• The producer waits until the leader broker writes the message to its local log.
• If the leader fails before replication, data loss is possible.
• Good balance between performance and durability.
Pros:
• Faster than acks=all, while ensuring the leader receives the message.
• Prevents message loss in most cases.
Cons:
• Risk of data loss if the leader crashes before replication.
C++ Producer Example with acks=1
cppCopyEditconf->set("acks", "1", errstr);
3.3 acks=all (Full Replication Mode)
• The producer waits until all in-sync replicas (ISR) acknowledge the message.
• Ensures no message loss as long as at least one in-sync replica is available.
• Most reliable but can be slow.
Pros:
• Guaranteed durability (message won't be lost).
• Handles leader failures without data loss.
Cons:
• Higher latency due to waiting for all replicas.
• More CPU and network usage.
C++ Producer Example with acks=all
cppCopyEditconf->set("acks", "all", errstr);
4. Impact of Acknowledgment Mechanisms on Performance
Acks Setting Latency Throughput Durability
acks=0 Low High No guarantee
acks=1 Medium Medium Leader-only
acks=all High Low Fully replicated
Trade-Off: acks=all ensures durability but affects throughput.
5. Implementing Acknowledgment Mechanisms in C++ Kafka Producer
5.1 Full C++ Implementation for a Kafka Producer
The following example demonstrates how to configure acks=all in a Kafka producer.
C++ Code: Kafka Producer with Acknowledgments
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test-topic";
std::string errstr;
// Create a Kafka configuration object
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
// Set broker address
conf->set("bootstrap.servers", brokers, errstr);
// Set acknowledgment mode (change to "0", "1", or "all" as needed)
conf->set("acks", "all", errstr);
// Create a Kafka producer
std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
// Sending message
std::string key = "myKey";
std::string message = "Hello, Kafka with Acknowledgments!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  key.c_str(), key.size(), 0, NULL);
// Wait for message delivery
producer->flush(1000);
std::cout << "Message sent successfully!" << std::endl;
return 0;
}
Ensures full durability (acks=all)
Can be adjusted for performance (acks=0 or acks=1)
6. Optimizing Acknowledgment Mechanisms
6.1 Configuring Retries
If a message is not acknowledged, Kafka allows automatic retries.
cppCopyEditconf->set("retries", "5", errstr);
Ensures delivery if a temporary failure occurs.
6.2 Configuring Delivery Timeout
Controls how long the producer waits for acknowledgment.
cppCopyEditconf->set("delivery.timeout.ms", "30000", errstr);
Ensures messages are either delivered or fail within the timeout.
7. Handling Acknowledgment Failures in C++
If Kafka does not acknowledge a message, it can be logged or retried.
cppCopyEditclass MyDeliveryReportCallback : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) override {
if (message.err()) {
std::cerr << "Delivery failed: " << message.errstr() << std::endl;
} else {
std::cout << "Message delivered to partition " << message.partition() << std::endl;
}
}
};
Logs failed messages for debugging.
Can trigger retry logic.
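For the callback to fire, it has to be registered on the configuration before the producer is created, and the producer has to be polled. A minimal sketch reusing conf, topic, message, and errstr from the example in section 5.1:
// Register the delivery report callback, then poll so it gets invoked.
MyDeliveryReportCallback dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  NULL, 0, 0, NULL);
producer->poll(0);      // non-blocking: dispatches dr_cb for completed deliveries
producer->flush(1000);  // keeps servicing callbacks until all messages are delivered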
8. When to Use Different Acks Settings?
Use Case Recommended Acks
High-speed logging acks=0
General messaging acks=1
Mission-critical data acks=all
Best Practice: Use acks=all for critical data, acks=1 for balanced performance, and acks=0 for high-
speed, non-critical applications.
9. Key Takeaways
acks=0 is fastest but risks message loss.
acks=1 balances performance and reliability.
acks=all ensures no message loss but is slower.
C++ implementation allows fine-tuning acknowledgment settings for specific needs.
1. Introduction to Kafka Producer Configurations
Kafka producers have various configuration parameters that control message delivery, performance,
reliability, and fault tolerance. Some of the most important ones are:
1. acks - Controls message acknowledgment behavior.
2. retries - Defines the number of retry attempts if message delivery fails.
3. batch.size - Determines the maximum batch size for sending messages.
4. linger.ms - Introduces a delay before sending messages to allow batching.
5. buffer.memory - Controls the total memory allocated for buffering unsent messages.
Configuring these correctly is crucial for ensuring high throughput and reliability.
2. Key Producer Configurations and Their Impact
2.1 acks (Message Acknowledgment Mode)
• Defines how many brokers must acknowledge a message before it is considered successful.
• Possible values:
o acks=0 → No acknowledgment, highest speed, risk of loss.
o acks=1 → Acknowledged by the leader broker, balance of speed and safety.
o acks=all (acks=-1) → Acknowledged by all in-sync replicas (ISR), safest but slowest.
Example C++ Configuration
cppCopyEditconf->set("acks", "all", errstr);
Best for durability → Ensures no data loss.
Tradeoff → Higher latency.
2.2 retries (Message Retries on Failure)
• Defines the number of retry attempts before a message is dropped.
• Helps recover from temporary broker failures or network issues.
Example C++ Configuration
cppCopyEditconf->set("retries", "5", errstr);
Ensures delivery even during transient failures.
If set too high, can cause duplicate messages if not handled with idempotence.
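One way to make retries safe is librdkafka's idempotent producer, which deduplicates retried messages; a minimal sketch (enabling it also implies acks=all):
// Enable the idempotent producer so retries cannot introduce duplicates.
conf->set("enable.idempotence", "true", errstr);
conf->set("retries", "5", errstr);  // retries are now safe with respect to duplication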
2.3 batch.size (Batching Messages for Efficiency)
• Kafka groups messages into batches before sending them to brokers.
• Controls the maximum size (bytes) of a batch.
Example C++ Configuration
cppCopyEditconf->set("batch.size", "16384", errstr); // 16 KB batch size
Larger batches improve throughput.
Too large → Higher memory usage, possible delays if the batch isn’t filled.
2.4 linger.ms (Adding Delay for Batching)
• Introduces a delay before sending messages to allow batch accumulation.
• Good for increasing throughput at the cost of slight latency.
Example C++ Configuration
cppCopyEditconf->set("linger.ms", "5", errstr);
Reduces network calls, improves batching efficiency.
Increases message latency.
2.5 buffer.memory (Producer Buffer Memory)
• Defines the maximum amount of memory available for buffering unsent messages.
• If full, new messages block or result in an exception.
Example C++ Configuration (buffer.memory is the Java client's property name; the closest librdkafka setting is queue.buffering.max.kbytes)
cppCopyEditconf->set("queue.buffering.max.kbytes", "32768", errstr); // ~32 MB of buffered messages
Larger buffer allows handling higher message rates.
Too large → Higher memory usage.
2.6 key.serializer & value.serializer (Message Serialization)
• Controls how Kafka serializes messages before sending.
• Common formats: JSON, Avro, Protocol Buffers.
Example C++ Configuration (these serializer property names come from the Java client; librdkafka applications serialize keys and values to bytes themselves before calling produce())
cppCopyEditconf->set("key.serializer", "org.apache.kafka.common.serialization.StringSerializer", errstr);
conf->set("value.serializer", "org.apache.kafka.common.serialization.StringSerializer", errstr);
Ensures correct encoding of message data.
Wrong serialization can cause deserialization errors.
2.7 compression.type (Message Compression)
• Kafka compresses messages before sending to brokers.
• Available options: gzip, snappy, lz4, zstd.
Example C++ Configuration
cppCopyEditconf->set("compression.type", "gzip", errstr);
Reduces network bandwidth usage.
Increases CPU overhead for compression/decompression.
3. Full C++ Implementation for a Kafka Producer with Configurations
The following C++ program demonstrates a Kafka producer with various configurations using
librdkafka.
C++ Code: Kafka Producer with Configurations
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test-topic";
std::string errstr;
// Create Kafka configuration
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
// Set broker address
conf->set("bootstrap.servers", brokers, errstr);
// Configure message acknowledgment level
conf->set("acks", "all", errstr);
// Enable retries for temporary failures
conf->set("retries", "5", errstr);
// Optimize batch size and latency
conf->set("batch.size", "16384", errstr);
conf->set("linger.ms", "5", errstr);
// Allocate sufficient buffering for unsent messages (librdkafka's equivalent of buffer.memory)
conf->set("queue.buffering.max.kbytes", "32768", errstr);
// Set compression type for efficiency
conf->set("compression.type", "gzip", errstr);
// Create Kafka producer
std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
// Send message
std::string key = "myKey";
std::string message = "Hello, Kafka with Advanced Configurations!";
producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                  RdKafka::Producer::RK_MSG_COPY,
                  const_cast<char *>(message.c_str()), message.size(),
                  key.c_str(), key.size(), 0, NULL);
// Wait for message delivery
producer->flush(1000);
std::cout << "Message sent successfully!" << std::endl;
return 0;
}
Uses advanced configurations (acks=all, retries=5, batch.size=16384, linger.ms=5, etc.).
Ensures durability, performance tuning, and fault tolerance.
4. Optimizing Producer Performance
Setting Optimization
batch.size Larger values reduce network calls.
linger.ms Small delays allow more efficient batching.
compression.type Reduces network usage but increases CPU load.
acks=1 Good balance between speed and durability.
retries=5 Ensures messages are retried before failing.
For high-speed applications, use:
acks=1
batch.size=65536
linger.ms=10
compression.type=snappy
For maximum reliability, use:
acks=all
retries=10
buffer.memory=67108864
compression.type=gzip
5. Key Takeaways
acks, retries, and batch.size significantly impact Kafka performance.
Higher linger.ms improves batching but adds latency.
Compression saves bandwidth but requires more CPU.
Using the right configurations ensures high throughput and reliability.
1. Kafka Consumers
Introduction
Apache Kafka is a distributed event streaming platform used for high-throughput messaging between
services. Kafka follows a publish-subscribe model, where producers send messages to topics, and
consumers read from them.
A Kafka consumer is responsible for reading records from Kafka topics. Consumers typically run in
applications that process real-time data, such as:
• Log processing
• Streaming analytics
• Event-driven architectures
Kafka consumers can operate independently or within consumer groups, distributing the workload
among multiple instances.
How Kafka Consumers Work
1. Consumer subscribes to one or more topics.
2. It requests data by polling the Kafka broker.
3. Kafka delivers messages from assigned partitions.
4. Consumer processes the data and optionally commits offsets.
Kafka Consumer Workflow
The key operations of a Kafka consumer are:
1. Create a Kafka consumer instance.
2. Subscribe to one or more topics.
3. Poll messages from the broker periodically.
4. Process received messages.
5. Commit offsets (manually or automatically).
2. Consumer API
Kafka provides a Consumer API that allows applications to consume messages from topics.
Key Methods in Kafka Consumer API
• subscribe(): Subscribes the consumer to a topic.
• poll(): Retrieves messages from Kafka.
• commitSync(): Manually commits offsets to Kafka.
• close(): Gracefully shuts down the consumer.
Kafka Consumer API Example in C++ (librdkafka)
Kafka does not have an official C++ client, but librdkafka provides a C++ interface.
C++ Example: Creating a Kafka Consumer
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafka.h>
// Callback for error handling
void errorCallback(rd_kafka_t* rk, int err, const char* reason, void* opaque) {
std::cerr << "Kafka Error: " << reason << std::endl;
}
void consumeMessages() {
rd_kafka_conf_t* conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "group.id", "test_consumer_group", NULL, 0); // subscribe() requires a consumer group id (any name works)
rd_kafka_conf_set_error_cb(conf, errorCallback);
// Create Kafka consumer
rd_kafka_t* consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
rd_kafka_poll_set_consumer(consumer);
// Subscribe to topic
rd_kafka_topic_partition_list_t* topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, "test_topic", 0);
rd_kafka_subscribe(consumer, topics);
// Consume messages
while (true) {
rd_kafka_message_t* msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg) {
if (msg->err) {
std::cerr << "Error receiving message: " << rd_kafka_err2str(msg->err) << std::endl;
} else {
std::cout << "Received message: " << static_cast<char*>(msg->payload) << std::endl;
}
rd_kafka_message_destroy(msg);
}
}
// Cleanup
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);
}
int main() {
consumeMessages();
return 0;
}
Explanation
1. The consumer is configured using rd_kafka_conf_new().
2. We subscribe to a Kafka topic using rd_kafka_subscribe().
3. The consumer polls messages from Kafka using rd_kafka_consumer_poll().
4. Received messages are processed and printed to the console.
5. The consumer commits offsets manually if needed.
3. Consumer Groups and Offsets
Consumer Groups
A consumer group is a collection of consumers that read from a Kafka topic together.
• Each partition is assigned to only one consumer within a group.
• If a consumer fails, its partitions are reassigned to another consumer.
• Multiple consumer groups can consume the same topic independently.
Example: Multiple Consumers in a Group
sqlCopyEditTopic: orders
Partitions: P0, P1, P2, P3
Consumer Group: order_processing
- Consumer A reads from P0, P1
- Consumer B reads from P2, P3
If Consumer A fails, Consumer B will take over partitions P0 and P1.
Offsets in Kafka
Offsets track the last consumed message in a partition. Kafka stores offsets in an internal topic called
__consumer_offsets.
There are two ways to manage offsets:
1. Automatic offset commit (enable.auto.commit = true)
2. Manual offset commit (commitSync() or commitAsync())
C++ Example: Manual Offset Commit
cppCopyEditrd_kafka_commit(consumer, NULL, 0);
This commits the latest processed offset, ensuring no duplicate messages are processed in case of
failures.
4. Auto Offset Reset Policy
When a consumer starts for the first time, it may not have a previous offset. The auto.offset.reset policy
determines what happens:
Value Behavior
earliest Read from the beginning of the topic.
latest Read only new messages.
none Fail if no offsets are found.
C++ Configuration
cppCopyEditrd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
This ensures that when no offset is found, the consumer reads from the beginning of the topic.
5. Polling vs Push Mechanism
Kafka uses polling instead of push to handle message consumption.
Polling (Pull-Based)
• Consumers request data at their own pace.
• Prevents overwhelming slow consumers.
• Allows better control over resource usage.
Push Model (Not Used in Kafka)
• Messages are pushed to consumers immediately.
• Can lead to overload if consumers are slow.
C++ Example: Polling for Messages
cppCopyEditrd_kafka_message_t* msg = rd_kafka_consumer_poll(consumer, 1000);
This fetches messages on demand instead of receiving them automatically.
6. Consumer Configurations
Kafka consumers have several configuration options that affect behavior.
Configuration Description
group.id Defines the consumer group.
enable.auto.commit Controls automatic offset commits.
auto.offset.reset Defines behavior when no offset is found.
session.timeout.ms Sets the consumer failure detection time.
C++ Example: Configuring Kafka Consumer
cppCopyEditrd_kafka_conf_t* conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "group.id", "my_consumer_group", NULL, 0);
rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);
rd_kafka_conf_set(conf, "session.timeout.ms", "60000", NULL, 0);
2. Consumer API in Kafka
Kafka provides a Consumer API that enables applications to consume records from Kafka topics
efficiently. This API is used in real-time applications such as event-driven systems, log aggregation,
monitoring, and analytics.
Overview of the Consumer API
The Kafka Consumer API allows consumers to:
• Subscribe to one or more topics
• Poll for new records
• Commit offsets manually or automatically
• Handle consumer group rebalances
• Process messages in batches or one-by-one
Kafka does not push messages to consumers. Instead, consumers must poll Kafka periodically to
receive new data.
Key Components of the Kafka Consumer API
1. KafkaConsumer Class
This class is responsible for consuming messages from Kafka. It provides essential functions such as:
• subscribe(): Subscribe to topics.
• poll(): Fetch new records.
• commitSync(): Manually commit offsets.
• close(): Gracefully close the consumer.
2. Consumer Records
A batch of records retrieved from Kafka in a poll operation.
3. Offsets and Commit Strategies
Consumers can manage offsets:
• Automatic Commit: Kafka commits offsets periodically.
• Manual Commit: Consumers explicitly commit offsets.
Kafka Consumer API in C++ Using librdkafka
Kafka does not provide an official C++ API, but we can use librdkafka to build a Kafka consumer.
Setting Up librdkafka for C++
First, install librdkafka:
bashCopyEditsudo apt-get install librdkafka-dev
Then link against the library when compiling (-lrdkafka for the C API used below, -lrdkafka++ for the C++ API):
bashCopyEditg++ -o kafka_consumer consumer.cpp -lrdkafka
C++ Implementation of Kafka Consumer
1. Basic Kafka Consumer in C++
This example shows how to consume messages from a topic.
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafka.h>
// Error handling callback
void errorCallback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
std::cerr << "Kafka Error: " << reason << std::endl;
}
void consumeMessages() {
// Configure Kafka consumer
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "group.id", "test_consumer_group", NULL, 0); // subscribe() requires a consumer group id
rd_kafka_conf_set_error_cb(conf, errorCallback);
// Create consumer instance
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
rd_kafka_poll_set_consumer(consumer);
// Subscribe to topic
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, "test_topic", 0);
rd_kafka_subscribe(consumer, topics);
// Consume messages
while (true) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg) {
if (msg->err) {
std::cerr << "Error: " << rd_kafka_err2str(msg->err) << std::endl;
} else {
std::cout << "Received: " << static_cast<char *>(msg->payload) << std::endl;
}
rd_kafka_message_destroy(msg);
}
}
// Cleanup
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);
}
int main() {
consumeMessages();
return 0;
}
2. Explanation
• Create a Kafka Consumer (rd_kafka_new)
• Subscribe to a Topic (rd_kafka_subscribe)
• Poll Messages (rd_kafka_consumer_poll)
• Process Messages & Handle Errors
• Clean Up (rd_kafka_destroy)
Manual Offset Management in C++
Kafka supports manual offset commits, which help in ensuring that messages are processed reliably.
cppCopyEditrd_kafka_commit(consumer, NULL, 0);
This ensures that the consumer does not reprocess messages after a restart.
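A common variant is to commit the offset of each message only after it has been processed; a minimal sketch with the C API, assuming msg is the record just handled inside the consume loop shown earlier:
// Commit the offset of this specific message after successful processing.
// Third argument: 0 = synchronous commit, 1 = asynchronous commit.
rd_kafka_resp_err_t commit_err = rd_kafka_commit_message(consumer, msg, 0);
if (commit_err != RD_KAFKA_RESP_ERR_NO_ERROR)
    std::cerr << "Commit failed: " << rd_kafka_err2str(commit_err) << std::endl;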
Handling Consumer Rebalancing
Kafka dynamically assigns partitions to consumers in a consumer group. When a consumer joins or
leaves, Kafka rebalances partitions.
Rebalance Callback Example
cppCopyEditvoid rebalanceCallback(rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
std::cout << "Assigned Partitions" << std::endl;
rd_kafka_assign(rk, partitions);
} else {
std::cout << "Revoking Partitions" << std::endl;
rd_kafka_assign(rk, NULL);
}
}
Advanced Consumer Features
Batch Processing
Consumers can process multiple messages at once for efficiency.
cppCopyEditstd::vector<rd_kafka_message_t *> messages;
for (int i = 0; i < 10; i++) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg) messages.push_back(msg);
}
for (auto &msg : messages) {
std::cout << "Processing: " << static_cast<char *>(msg->payload) << std::endl;
rd_kafka_message_destroy(msg);
}
Asynchronous Offset Commits
Instead of blocking on each commit, pass a non-zero async flag (librdkafka has no separate commit_async function):
cppCopyEditrd_kafka_commit(consumer, NULL, 1); // 1 = asynchronous commit
This improves performance but requires proper error handling, since the result is reported later via the offset-commit callback.
Error Handling in Consumers
Common Kafka consumer errors include:
• Broker connection failures
• Offset not found
• Rebalancing conflicts
Handling Errors in C++
cppCopyEditif (msg->err) {
std::cerr << "Error: " << rd_kafka_err2str(msg->err) << std::endl;
}
Performance Considerations
• Batch fetches (fetch.min.bytes) so each broker request returns more data.
• Allow enough time between polls (max.poll.interval.ms) so slow processing does not trigger an unwanted rebalance.
• Compression (compression.type) is set on the producer or topic; consumers decompress transparently, saving network bandwidth.
The sketch below shows the corresponding settings.
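A minimal example with the librdkafka C API; the values are illustrative and should be tuned to the workload:
// Illustrative consumer tuning; adjust values to your workload.
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "fetch.min.bytes", "1024", NULL, 0);        // return at least 1 KB per fetch
rd_kafka_conf_set(conf, "fetch.wait.max.ms", "100", NULL, 0);       // but wait at most 100 ms for it
rd_kafka_conf_set(conf, "max.poll.interval.ms", "300000", NULL, 0); // allowed gap between poll() calls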
3. Consumer Groups and Offsets in Kafka
Kafka Consumer Groups and Offsets are essential components that help manage message consumption
efficiently in a fault-tolerant and scalable manner.
This section will cover:
1. What are Consumer Groups?
2. How Consumer Groups Work?
3. Kafka Offsets and Their Role
4. Manual vs. Automatic Offset Management
5. Consumer Rebalancing in Groups
6. Implementing Consumer Groups in C++
7. Performance Optimization and Best Practices
1. What are Consumer Groups?
A Consumer Group is a collection of one or more consumers that work together to consume data from
a topic. Kafka ensures that:
• Each partition in a topic is consumed by only one consumer within a group.
• If a new consumer joins, Kafka redistributes partitions automatically.
• If a consumer fails, Kafka reassigns its partitions to another consumer in the group.
Why Use Consumer Groups?
• Parallel Processing: Multiple consumers can consume messages in parallel, increasing
throughput.
• Scalability: Adding more consumers to a group allows faster data consumption.
• Fault Tolerance: If a consumer fails, Kafka automatically reassigns its partitions.
2. How Consumer Groups Work?
Each consumer group has a unique group ID, which helps Kafka track which consumers belong to the
group.
Example: Consumer Group with 3 Consumers
Scenario 1: Equal Partitions and Consumers
Consider a Kafka topic Orders with 3 partitions (P0, P1, P2).
• If there are 3 consumers (C1, C2, C3) in a group, the partitions will be assigned as follows:
o C1 → P0
o C2 → P1
o C3 → P2
Each consumer processes only the messages from its assigned partition.
Scenario 2: More Partitions than Consumers
If the Orders topic has 4 partitions (P0, P1, P2, P3) but only 2 consumers (C1, C2), Kafka will distribute
partitions as:
• C1 → P0, P1
• C2 → P2, P3
Kafka balances the load automatically.
Scenario 3: More Consumers than Partitions
If there are 4 consumers (C1, C2, C3, C4) but only 3 partitions (P0, P1, P2), one consumer will be idle:
• C1 → P0
• C2 → P1
• C3 → P2
• C4 → No assigned partition
3. Kafka Offsets and Their Role
Kafka stores offsets to keep track of which messages have been consumed.
How Offsets Work?
Each partition maintains an incremental offset value, which is updated when:
• A consumer reads a message.
• The consumer commits an offset manually or automatically.
Where are Offsets Stored?
Offsets are stored in the Kafka internal topic __consumer_offsets.
Example of Offsets in a Partition
pgsqlCopyEditPartition 0:
Offset 0 → Msg 1
Offset 1 → Msg 2
Offset 2 → Msg 3 ← (Consumer last committed offset)
Offset 3 → Msg 4 ← (Next message to consume)
Offset 4 → Msg 5
If a consumer crashes, Kafka resumes from the last committed offset.
4. Manual vs. Automatic Offset Management
Kafka provides two ways to manage offsets:
• Automatic Offset Commit
• Manual Offset Commit
A. Automatic Offset Commit
Kafka automatically commits offsets at regular intervals.
Configuration:
cppCopyEditrd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);
This commits offsets every 5 seconds.
Advantages:
Simple implementation
Reduces offset management complexity
Disadvantages:
If a consumer crashes before committing, it may reprocess messages
Less control over offset management
B. Manual Offset Commit
Consumers manually commit offsets only after successful processing.
cppCopyEditrd_kafka_commit(consumer, NULL, 0);
This ensures that only processed messages are acknowledged.
Advantages:
Ensures reliability (messages are not lost)
Allows handling retries
Disadvantages:
More code complexity
Can lead to lower performance
5. Consumer Rebalancing in Groups
Kafka performs automatic rebalancing when:
• A new consumer joins the group
• A consumer leaves the group
• A consumer fails
Rebalancing Process:
1. Kafka pauses message consumption.
2. It redistributes partitions among available consumers.
3. Consumers resume consuming from the last committed offset.
Rebalance Callback Example in C++
cppCopyEditvoid rebalanceCallback(rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
std::cout << "Assigned new partitions." << std::endl;
rd_kafka_assign(rk, partitions);
} else {
std::cout << "Revoking partitions." << std::endl;
rd_kafka_assign(rk, NULL);
}
}
• When a rebalance happens, Kafka assigns or revokes partitions.
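The callback only takes effect if it is registered on the configuration before the consumer is created; a minimal sketch:
// Register the rebalance callback before creating the consumer.
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set_rebalance_cb(conf, rebalanceCallback);
rd_kafka_conf_set(conf, "group.id", "order-consumers", NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);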
6. Implementing Consumer Groups in C++
To create a consumer with group ID:
C++ Code: Consumer in a Group
cppCopyEdit#include <iostream>
#include <librdkafka/rdkafka.h>
void consumeMessages() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "group.id", "order-consumers", NULL, 0);
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
rd_kafka_poll_set_consumer(consumer);
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, "Orders", -1);
rd_kafka_subscribe(consumer, topics);
while (true) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg) {
if (msg->err) {
std::cerr << "Error: " << rd_kafka_err2str(msg->err) << std::endl;
} else {
std::cout << "Received: " << static_cast<char *>(msg->payload) << std::endl;
}
rd_kafka_message_destroy(msg);
}
}
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);
}
int main() {
consumeMessages();
return 0;
}
Key Points
Group ID (group.id) ensures that multiple consumers share load
Automatic offset commit (enable.auto.commit = true) simplifies offset management
Topics can be subscribed dynamically (partition -1 is RD_KAFKA_PARTITION_UA; the group protocol assigns the actual partitions)
7. Performance Optimization and Best Practices
• Use Multiple Consumers in a Group → Allows parallel processing
• Increase max.poll.records → Fetch more messages in each poll
• Use enable.auto.commit = false for reliability → Ensures no message loss
• Handle Consumer Failures → Implement exception handling
4. Auto Offset Reset Policy in Kafka
The Auto Offset Reset Policy determines how a Kafka consumer should behave when no previous offset
is found for a partition. This can happen in scenarios such as:
• A new consumer group starts consuming the topic for the first time.
• The stored offset is deleted due to retention policies.
• The offset is beyond the retention period in Kafka.
This section will cover:
1. What is Auto Offset Reset Policy?
2. Available Auto Offset Reset Strategies
3. How to Configure Auto Offset Reset in Kafka?
4. Use Cases for Different Policies
5. Implementing Auto Offset Reset in C++
6. Best Practices and Performance Considerations
1. What is Auto Offset Reset Policy?
Kafka’s auto.offset.reset setting tells the consumer where to start reading messages when no previous
offset is available.
Without this setting, Kafka does not know whether the consumer should start from the beginning of the
topic or from the latest available message.
2. Available Auto Offset Reset Strategies
Kafka provides three main strategies:
Policy Description
earliest Reads messages from the beginning of the topic.
latest Reads only new messages (skips old messages).
none Throws an error if no offset is found.
A. earliest Policy (Start from Beginning)
• If no offset is found, consumer starts from the first available message in the partition.
• Useful for data reprocessing, log processing, analytics applications.
• If a consumer crashes, it resumes from the last committed offset.
Example:
A consumer joins a group for the first time on a topic with 5 messages:
mathematicaCopyEditOffset 0 → Msg A
Offset 1 → Msg B
Offset 2 → Msg C
Offset 3 → Msg D
Offset 4 → Msg E
• With earliest, the consumer starts from Msg A.
• If offset 2 is committed, the consumer resumes from Msg C.
Use Case
Good for batch processing, analytics, and event replay.
B. latest Policy (Start from New Messages Only)
• If no offset is found, the consumer ignores old messages and starts from the latest message.
• Ideal for real-time streaming applications where only new data matters.
Example:
If the consumer joins a topic with messages:
mathematicaCopyEditOffset 0 → Msg A
Offset 1 → Msg B
Offset 2 → Msg C
Offset 3 → Msg D
Offset 4 → Msg E
• With latest, the consumer will start consuming from any new messages arriving after joining.
Use Case
Suitable for real-time event processing, stock market data, live feeds.
C. none Policy (Throw an Error if No Offset is Found)
• If no offset is found, the consumer fails with an exception.
• This prevents unexpected behavior when offset information is missing.
Example Error:
pgsqlCopyEdit[ERROR] No previous offset found and auto.offset.reset is set to none.
• The consumer does not start consuming unless an offset is explicitly set.
Use Case
Ideal for mission-critical applications where missing data must not be ignored.
3. How to Configure Auto Offset Reset in Kafka?
The policy is set using the auto.offset.reset property.
Kafka Consumer Configuration Example in C++
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "group.id", "order-consumers", NULL, 0);
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
The possible values:
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0); // Start from the beginning
rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0); // Start from the latest message
rd_kafka_conf_set(conf, "auto.offset.reset", "none", NULL, 0); // Fail if no offset is found
4. Use Cases for Different Policies
Use Case Recommended Policy
Log Processing, Analytics earliest
Real-Time Streaming (e.g., Stock Market, IoT Data) latest
Mission-Critical Applications (e.g., Banking, Security) none
Scenario Example: Fraud Detection System
• If a fraud detection service needs to analyze all past transactions, it should use earliest.
• If a real-time fraud alert system only needs new transactions, it should use latest.
• If the system must not run without historical offsets, use none.
5. Implementing Auto Offset Reset in C++
C++ Code: Consumer with Auto Offset Reset
#include <iostream>
#include <rdkafka.h>

void consumeMessages() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    // The consumer needs a broker list in addition to the group and reset policy.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_conf_set(conf, "group.id", "consumer-group-1", NULL, 0);
    rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);

    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
    rd_kafka_poll_set_consumer(consumer);

    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "Orders", RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(consumer, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    while (true) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
        if (msg) {
            if (msg->err) {
                std::cerr << "Error: " << rd_kafka_err2str(msg->err) << std::endl;
            } else {
                std::cout << "Received: " << static_cast<char *>(msg->payload) << std::endl;
            }
            rd_kafka_message_destroy(msg);
        }
    }

    rd_kafka_consumer_close(consumer);
    rd_kafka_destroy(consumer);
}

int main() {
    consumeMessages();
    return 0;
}
Key Features:
Uses "earliest" to read all messages from the beginning.
If a consumer joins a group with no previous offset, it starts from offset 0.
Uses librdkafka for Kafka operations in C++.
6. Best Practices and Performance Considerations
A. When to Use Each Policy?
Scenario Best Policy
Data Processing (Logs, Events, Analytics) earliest
Live Streaming (IoT, Stocks, Notifications) latest
Strict Offset Management Required none
B. Performance Considerations
Use earliest cautiously for large topics (can cause high latency).
Use latest when real-time processing is required.
Always monitor __consumer_offsets to track offset retention.
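As a small illustration of the last point, committed offsets can also be inspected from the consumer itself. The sketch below is a hedged example; it assumes a subscribed librdkafka consumer handle and simply asks for the current assignment and prints each partition's committed offset:
#include <iostream>
#include <rdkafka.h>
// Sketch: print committed offsets for the partitions assigned to this consumer.
void printCommittedOffsets(rd_kafka_t *consumer) {
    rd_kafka_topic_partition_list_t *assignment = NULL;
    if (rd_kafka_assignment(consumer, &assignment) != RD_KAFKA_RESP_ERR_NO_ERROR || !assignment)
        return;
    // rd_kafka_committed() fills in the committed offset for each listed partition.
    if (rd_kafka_committed(consumer, assignment, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        for (int i = 0; i < assignment->cnt; ++i) {
            const rd_kafka_topic_partition_t &tp = assignment->elems[i];
            std::cout << tp.topic << " [" << tp.partition
                      << "] committed offset: " << tp.offset << std::endl;
        }
    }
    rd_kafka_topic_partition_list_destroy(assignment);
}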
1. Introduction to Polling vs Push Mechanisms
Mechanism Description Example
Polling (Pull-Based) The consumer actively requests messages from Kafka brokers when ready. Kafka
Push-Based The broker/server sends messages automatically to consumers as they arrive. RabbitMQ, MQTT
• In a pull-based (polling) model, consumers request messages when they are ready.
• In a push-based model, messages are pushed to consumers as soon as they are available.
2. Why Kafka Uses Polling Instead of Push?
Kafka intentionally uses a pull-based (polling) model instead of push-based for several reasons:
✅ 1. Consumer Control Over Data Rate
• Polling allows consumers to fetch messages at their own speed rather than being
overwhelmed by message floods.
• Avoids backpressure issues where slow consumers can be overwhelmed with high message
rates.
✅ 2. Better Load Balancing
• Consumers control when and how much data they fetch, preventing memory overload.
• Unlike push-based systems where messages are sent at the broker’s rate, consumers in Kafka
regulate their own pace.
✅ 3. Efficient Batch Processing
• Consumers can batch process messages, improving throughput and reducing network
overhead.
✅ 4. Message Ordering Guarantees
• Pull-based consumption ensures each partition is consumed sequentially, preserving ordering
guarantees.
3. How Polling Works in Kafka?
1. Consumer sends a poll request to the Kafka broker.
2. Broker returns a batch of messages from assigned partitions.
3. Consumer processes the messages and commits offsets.
4. Consumer sends another poll request for the next batch.
Kafka Consumer Polling Process
[Consumer] ---> [Broker] : Poll request (fetch messages)
[Broker] ---> [Consumer] : Returns messages
[Consumer] ---> [Broker] : Commits offsets
[Consumer] ---> [Broker] : Poll request (next batch)
4. Polling in Kafka Consumer API (C++ Code Example)
Kafka’s polling mechanism is implemented in C++ using librdkafka.
C++ Consumer Polling Example
#include <iostream>
#include <rdkafka.h>

void consumeMessages() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    // Broker list is required so the consumer can reach the cluster.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_conf_set(conf, "group.id", "polling-consumer-group", NULL, 0);
    rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);

    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
    rd_kafka_poll_set_consumer(consumer);

    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "TestTopic", RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(consumer, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    while (true) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
        if (msg) {
            if (msg->err) {
                std::cerr << "Error: " << rd_kafka_err2str(msg->err) << std::endl;
            } else {
                std::cout << "Received: " << static_cast<char *>(msg->payload) << std::endl;
            }
            rd_kafka_message_destroy(msg);
        }
    }

    rd_kafka_consumer_close(consumer);
    rd_kafka_destroy(consumer);
}

int main() {
    consumeMessages();
    return 0;
}
Explanation
Each call to rd_kafka_consumer_poll() waits up to 1000 ms (1 second) for a message.
If messages are available, they are processed and offsets are committed.
The consumer requests the next batch, ensuring a controlled flow of data.
5. Push Mechanism in Other Messaging Systems
Some messaging systems use a push-based model, including:
System Mechanism Use Case
RabbitMQ Push Low-latency message delivery
MQTT Push IoT & real-time applications
Apache Pulsar Push/Pull Hybrid model
Push-Based Flow (Example in RabbitMQ)
[Broker] ---> [Consumer] : Message (pushed automatically)
[Consumer] ---> [Broker] : Acknowledges receipt
Push-Based Drawbacks in High-Throughput Systems
Overloads slow consumers if messages are sent too fast.
Loses fine-grained control over message processing rate.
Difficult to handle backpressure efficiently.
6. Performance Comparison: Polling vs Push
Feature Polling (Pull-Based) Push-Based
Flow Control Consumer controls rate Broker controls rate
Backpressure Handling Efficient Difficult
Batch Processing Yes No
Message Order Guarantee Strong Weaker
Latency Slightly higher Lower
Scalability High Moderate
Key Takeaways:
Polling ensures scalability & efficiency in high-throughput systems.
Push is good for low-latency, real-time applications but can cause backpressure.
7. Best Practices for Efficient Polling
1. Adjust Poll Interval for Performance
• Too frequent polling → Increases CPU usage.
• Too slow polling → Can trigger a consumer group rebalance.
• Optimal setting: Poll every 500ms – 2 seconds.
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 500); // Wait up to 500ms per poll
2. Use Batch Polling for High Throughput
• librdkafka already fetches messages from the broker in batches in the background (tuned with the fetch.* settings); each call to rd_kafka_consumer_poll() then returns one message from that prefetched batch.
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000); // Returns the next prefetched message
3. Avoid Consumer Timeouts
• Heartbeats are sent by a background thread; if the application does not call poll within max.poll.interval.ms, Kafka removes the consumer from the group and rebalances.
• Set session.timeout.ms = 30s so crashed consumers are detected quickly.
rd_kafka_conf_set(conf, "session.timeout.ms", "30000", NULL, 0);
4. Use Manual Offset Commit for More Control
rd_kafka_commit_message(consumer, msg, 0); // Commit manually
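A slightly fuller sketch of this pattern, assuming enable.auto.commit has been set to "false" and the consumer is already subscribed: the offset is committed only after the message has been handled, so a crash never acknowledges unprocessed data.
#include <iostream>
#include <rdkafka.h>
// Sketch: manual-commit poll loop (names are illustrative).
void consumeWithManualCommit(rd_kafka_t *consumer) {
    while (true) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 500);
        if (!msg)
            continue;
        if (!msg->err) {
            std::cout << "Processing: "
                      << std::string(static_cast<char *>(msg->payload), msg->len)
                      << std::endl;
            rd_kafka_commit_message(consumer, msg, 0); // 0 = synchronous commit
        }
        rd_kafka_message_destroy(msg);
    }
}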
1. Introduction to Kafka Consumer Configurations
Kafka consumers can be fine-tuned using configuration settings provided via:
Consumer properties file (consumer.properties)
API methods (rd_kafka_conf_set in C++)
These configurations directly impact consumer behavior regarding offsets, parallelism, security, and
fault tolerance.
2. Essential Kafka Consumer Configurations
Configuration Description Recommended Value
bootstrap.servers List of Kafka brokers to connect to "localhost:9092"
group.id Consumer group ID "my-consumer-group"
auto.offset.reset Where to start reading if no offset exists "earliest" (for new consumers)
enable.auto.commit Automatically commit offsets "false" (use manual commit)
session.timeout.ms Consumer failure detection timeout 30000 (30 sec)
max.poll.records Max messages per poll request 500
fetch.min.bytes Min data per poll request 1KB
fetch.max.wait.ms Max wait time for batch fetch 500ms
1. bootstrap.servers (Kafka Broker Address)
• Specifies the list of Kafka brokers the consumer connects to.
• Example:
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
2. group.id (Consumer Group ID)
• Defines the group to which the consumer belongs.
• Consumers in the same group share the load for partitions.
rd_kafka_conf_set(conf, "group.id", "my-consumer-group", NULL, 0);
3. auto.offset.reset (Starting Offset Behavior)
• Determines where to start consuming messages if no previous offset is stored.
o "earliest" → Read from the beginning.
o "latest" → Read only new messages.
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
4. enable.auto.commit (Automatic Offset Commit)
• If true, Kafka automatically commits offsets periodically.
• Recommended: false (use manual commit for better control).
rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);
5. session.timeout.ms (Consumer Heartbeat Timeout)
• If a consumer fails to send a heartbeat within this time, Kafka removes it from the group.
• Default: 45000ms (45 sec), Recommended: 30000ms (30 sec).
rd_kafka_conf_set(conf, "session.timeout.ms", "30000", NULL, 0);
3. Performance Tuning Parameters
Parameter Description Recommended Value
max.poll.records Max messages returned per poll request 500
fetch.min.bytes Min data size per fetch request 1KB
fetch.max.bytes Max data size per fetch request 10MB
fetch.max.wait.ms Max wait time for batching messages 500ms
max.partition.fetch.bytes Max fetch size per partition 1MB
1. max.poll.records (Batch Size per Poll)
• Controls the number of messages returned per poll request.
• Higher values improve throughput but increase processing time.
• Note: max.poll.records is a Java-client property; librdkafka's rd_kafka_consumer_poll() returns one message per call, with batching governed by the fetch.* settings below.
rd_kafka_conf_set(conf, "max.poll.records", "500", NULL, 0); // Java consumer property (not recognized by librdkafka)
2. fetch.min.bytes (Minimum Fetch Size)
• Minimum amount of data the broker should send.
• Higher values improve efficiency but increase latency.
rd_kafka_conf_set(conf, "fetch.min.bytes", "1024", NULL, 0);
3. fetch.max.wait.ms (Max Wait Time for Fetching Messages)
• The max time Kafka waits before sending messages to the consumer.
• Increasing it helps batching but increases latency.
rd_kafka_conf_set(conf, "fetch.max.wait.ms", "500", NULL, 0);
4. Security Configurations
Kafka supports SSL and SASL authentication for security.
Configuration Purpose
security.protocol "SSL", "SASL_SSL", "SASL_PLAINTEXT"
ssl.keystore.location SSL keystore location
ssl.truststore.location SSL truststore location
sasl.mechanism Authentication mechanism (PLAIN, SCRAM-SHA-256)
rd_kafka_conf_set(conf, "security.protocol", "SSL", NULL, 0);
// Note: JKS keystore/truststore files are Java-client settings; librdkafka
// expects PEM/PKCS#12 files via ssl.ca.location, ssl.certificate.location and ssl.key.location.
rd_kafka_conf_set(conf, "ssl.ca.location", "/etc/kafka/ca-cert.pem", NULL, 0);
rd_kafka_conf_set(conf, "ssl.certificate.location", "/etc/kafka/client-cert.pem", NULL, 0);
rd_kafka_conf_set(conf, "ssl.key.location", "/etc/kafka/client-key.pem", NULL, 0);
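If SASL authentication is used instead of (or in addition to) plain SSL, a hedged librdkafka sketch could look like the following; the mechanism, username, password, and certificate path are placeholders, not values from this guide:
// Sketch: SASL_SSL client configuration (all values are placeholders).
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", NULL, 0);
rd_kafka_conf_set(conf, "sasl.mechanism", "SCRAM-SHA-256", NULL, 0);
rd_kafka_conf_set(conf, "sasl.username", "app-user", NULL, 0);
rd_kafka_conf_set(conf, "sasl.password", "app-secret", NULL, 0);
rd_kafka_conf_set(conf, "ssl.ca.location", "/etc/kafka/ca-cert.pem", NULL, 0); // CA used to verify the brokers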
5. C++ Code Example: Configuring a Kafka Consumer
#include <iostream>
#include <rdkafka.h>
void configureConsumer() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();
// Kafka Broker
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
// Consumer Group ID
rd_kafka_conf_set(conf, "group.id", "my-consumer-group", NULL, 0);
// Offset Reset Policy
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
// Auto Commit Disabled (Manual Commit)
rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);
// Security configurations (if using SSL; librdkafka expects PEM/PKCS#12 files, not Java JKS keystores)
rd_kafka_conf_set(conf, "security.protocol", "SSL", NULL, 0);
rd_kafka_conf_set(conf, "ssl.ca.location", "/etc/kafka/ca-cert.pem", NULL, 0);
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
rd_kafka_poll_set_consumer(consumer);
std::cout << "Kafka Consumer Configured Successfully!" << std::endl;
rd_kafka_destroy(consumer);
}
int main() {
configureConsumer();
return 0;
}
Explanation
Sets up consumer configurations (bootstrap.servers, group.id, auto.offset.reset).
Configures security settings (SSL authentication).
Disables auto commit for manual offset management.
6. Best Practices for Consumer Configurations
Use enable.auto.commit = false to avoid message loss.
Set session.timeout.ms = 30s to detect failures faster.
Use batch processing (max.poll.records = 500) for efficiency.
Enable SSL (security.protocol = SSL) in production.
Use multiple partitions to scale consumer throughput.
1. Introduction to Kafka Brokers and Clustering
Apache Kafka is a distributed event-streaming platform designed to handle large volumes of real-time
data. It is widely used for log aggregation, stream processing, and real-time analytics. Kafka follows a
publish-subscribe model where producers send data to topics, and consumers read from them.
1.1 Why Clustering?
A single Kafka broker is not sufficient to handle large-scale workloads due to:
• Scalability: A single broker has resource limits.
• Fault Tolerance: A broker can fail, leading to data loss.
• High Availability: A distributed system ensures continued operation.
By clustering multiple brokers together, Kafka achieves high availability, scalability, and fault tolerance.
1.2 Kafka Cluster Overview
A Kafka cluster consists of:
• Multiple brokers (instances of Kafka servers)
• Partitions to distribute data across brokers
• Replication for redundancy and fault tolerance
• ZooKeeper to manage metadata and leader elections
1.3 Kafka Architecture
A Kafka cluster contains:
• Producers: Applications that publish messages.
• Brokers: Kafka servers storing and managing messages.
• Consumers: Applications that read messages.
• ZooKeeper: A service that keeps track of broker metadata.
2. Kafka Brokers
A Kafka broker is a single instance of a Kafka server that is responsible for:
• Managing topic partitions.
• Handling producer and consumer requests.
• Storing and replicating data.
Each broker has a unique ID and is connected to other brokers in the cluster.
2.1 Broker Responsibilities
1. Message Storage: Brokers store topic partitions on disk.
2. Load Balancing: Kafka distributes partitions across brokers.
3. Message Routing: Determines which partition a message belongs to.
4. Replication Management: Ensures data redundancy.
5. Failure Recovery: Ensures continuity in case of failures.
2.2 Setting Up a Kafka Broker
To configure a broker, update server.properties:
broker.id=1
log.dirs=/var/lib/kafka/data
zookeeper.connect=localhost:2181
Start a Kafka broker:
bin/kafka-server-start.sh config/server.properties
2.3 Connecting a Client to Kafka Broker
Example: Connecting a C++ producer to Kafka using librdkafka:
#include <iostream>
#include <string>
#include <rdkafka.h>

int main() {
    std::string brokers = "localhost:9092";

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    // Point the producer at the broker list; without this it cannot connect.
    rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);

    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, nullptr, 0);
    if (!producer) {
        std::cerr << "Failed to create Kafka producer" << std::endl;
        return 1;
    }
    std::cout << "Kafka producer connected to broker at " << brokers << std::endl;
    rd_kafka_destroy(producer);
    return 0;
}
3. Kafka Clustering
Kafka achieves high availability and scalability through clustering. A Kafka cluster consists of multiple
brokers that work together.
3.1 Benefits of Kafka Clustering
• Horizontal Scaling: More brokers can be added to handle load.
• Fault Tolerance: If a broker fails, another broker takes over.
• Data Replication: Ensures redundancy.
• Parallel Processing: Distributes partitions across multiple nodes.
3.2 How Kafka Clustering Works
1. Producers send messages to Kafka.
2. Kafka distributes messages across partitions.
3. Messages are replicated to multiple brokers.
4. Consumers read messages from brokers.
3.3 Setting Up a Multi-Broker Cluster
Modify configuration files for multiple brokers:
Broker 1 (server-1.properties):
broker.id=1
log.dirs=/var/kafka/data1
zookeeper.connect=localhost:2181
Broker 2 (server-2.properties):
broker.id=2
log.dirs=/var/kafka/data2
zookeeper.connect=localhost:2181
Start multiple brokers:
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
Verify brokers:
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.4 Example: Listing Brokers in a Cluster
#include <iostream>
#include <rdkafka.h>

void list_brokers(rd_kafka_t *rk) {
    const rd_kafka_metadata_t *metadata;
    if (rd_kafka_metadata(rk, 0, NULL, &metadata, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        for (int i = 0; i < metadata->broker_cnt; ++i) {
            std::cout << "Broker: " << metadata->brokers[i].host << ":"
                      << metadata->brokers[i].port << std::endl;
        }
        // Only destroy the metadata object if the call above succeeded.
        rd_kafka_metadata_destroy(metadata);
    }
}
4. Partitioning and Replication in Clustering
4.1 Partitioning
Kafka splits data into partitions for scalability.
• Messages are stored in ordered partitions.
• Each partition is stored on one broker.
• Partitioning allows parallel processing.
Example: Producing Messages to Partitions
#include <iostream>
#include <string>
#include <rdkafka.h>

int main() {
    std::string brokers = "localhost:9092";
    std::string topic = "partitioned_topic";

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), NULL, 0);
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, nullptr, 0);

    int partition = 1;
    std::string message = "Message to partition 1";

    rd_kafka_producev(
        producer,
        RD_KAFKA_V_TOPIC(topic.c_str()),
        RD_KAFKA_V_PARTITION(partition),
        RD_KAFKA_V_VALUE(message.c_str(), message.size()),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // copy payload so the string may go out of scope
        RD_KAFKA_V_END);

    rd_kafka_flush(producer, 5000);
    rd_kafka_destroy(producer);
    return 0;
}
4.2 Replication
Kafka ensures fault tolerance through replication.
• Each partition has one leader and multiple followers.
• Followers replicate data from the leader.
• If a leader fails, a follower becomes the new leader.
Example: Configuring Replication in Kafka
bin/kafka-topics.sh --create --topic test_topic --partitions 3 --replication-factor 2 --zookeeper localhost:2181
5. Failover and Controller Management
5.1 Broker Failover Handling
Kafka automatically handles broker failures.
• If a leader fails, Kafka promotes a follower.
• Consumers automatically switch to the new leader.
C++ Example: Checking Kafka Broker Health
#include <iostream>
#include <rdkafka.h>
bool is_broker_alive(rd_kafka_t *rk, int broker_id) {
const rd_kafka_metadata_t *metadata;
if (rd_kafka_metadata(rk, 0, NULL, &metadata, 5000) != RD_KAFKA_RESP_ERR_NO_ERROR) {
return false;
}
for (int i = 0; i < metadata->broker_cnt; ++i) {
if (metadata->brokers[i].id == broker_id) {
rd_kafka_metadata_destroy(metadata);
return true;
}
}
rd_kafka_metadata_destroy(metadata);
return false;
}
5.2 Controller Election
• The Kafka controller manages leader elections.
• If the controller fails, ZooKeeper elects a new one.
Example: Electing a Kafka Controller
#include <iostream>
#include <zookeeper/zookeeper.h>
void create_controller_node(zhandle_t *zh) {
int ret = zoo_create(zh, "/controller", "broker_1", 8, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, nullptr, 0);
if (ret == ZOK) {
std::cout << "Broker elected as controller" << std::endl;
} else {
std::cout << "Another broker is already the controller" << std::endl;
}
}
1. Introduction to Kafka Broker Roles
A Kafka broker is a server that stores and manages data in a Kafka cluster. Each broker is responsible for
handling producer requests, storing topic partitions, replicating data, and serving consumer requests.
1.1 Importance of Broker Roles
In a Kafka cluster, brokers have different roles to ensure efficient message distribution and fault
tolerance:
• Leader Broker: Manages read and write requests for a partition.
• Follower Broker: Replicates data from the leader and takes over if the leader fails.
• Controller Broker: Manages leader elections and cluster metadata.
2. Leader and Follower Brokers
Kafka ensures data availability by distributing topic partitions across multiple brokers. Each partition has:
1. One Leader Broker: Handles producer and consumer requests.
2. One or More Follower Brokers: Replicate data from the leader and take over in case of failure.
2.1 Leader Broker
• A leader broker is responsible for handling all read and write operations for a partition.
• It ensures the messages are committed and properly replicated across the cluster.
• Producers send data to the leader of a partition, not directly to followers.
2.2 Follower Broker
• A follower broker replicates data from the leader broker.
• It does not handle producer requests directly.
• If a leader broker fails, a follower is elected as the new leader.
2.3 How Kafka Assigns Leader and Follower Roles
• When a topic is created, Kafka automatically assigns partitions across brokers.
• The broker that initially holds a partition becomes the leader.
• Other brokers holding replicas of the partition become followers.
Example: Creating a topic with partitions and replication factor
bin/kafka-topics.sh --create --topic test_topic --partitions 3 --replication-factor 2 --zookeeper localhost:2181
This command creates a topic test_topic with 3 partitions and a replication factor of 2.
Example: Checking partition leadership
bin/kafka-topics.sh --describe --topic test_topic --zookeeper localhost:2181
Output:
Topic: test_topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
Here, brokers 1, 2, and 3 act as leaders for different partitions.
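The same leader information can be read programmatically. The sketch below uses librdkafka's metadata API (the topic name test_topic comes from the example above; the rest is illustrative) to print the leader, replica list, and ISR of every partition:
#include <cstring>
#include <iostream>
#include <rdkafka.h>
// Sketch: print leader, replicas and ISR for each partition of a topic.
void describe_topic(rd_kafka_t *rk, const char *topic_name) {
    const rd_kafka_metadata_t *metadata;
    // 1 = request metadata for all topics, not just locally known ones.
    if (rd_kafka_metadata(rk, 1, NULL, &metadata, 5000) != RD_KAFKA_RESP_ERR_NO_ERROR)
        return;
    for (int i = 0; i < metadata->topic_cnt; ++i) {
        const rd_kafka_metadata_topic_t &topic = metadata->topics[i];
        if (strcmp(topic.topic, topic_name) != 0)
            continue;
        for (int j = 0; j < topic.partition_cnt; ++j) {
            const rd_kafka_metadata_partition_t &p = topic.partitions[j];
            std::cout << "Partition " << p.id << " Leader: " << p.leader << " Replicas:";
            for (int r = 0; r < p.replica_cnt; ++r)
                std::cout << " " << p.replicas[r];
            std::cout << " Isr:";
            for (int s = 0; s < p.isr_cnt; ++s)
                std::cout << " " << p.isrs[s];
            std::cout << std::endl;
        }
    }
    rd_kafka_metadata_destroy(metadata);
}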
3. Controller Broker in Kafka
A controller broker is a special role in Kafka that manages:
• Leader election when a broker fails.
• Metadata management of topics, partitions, and brokers.
3.1 How Controller Broker is Elected
1. ZooKeeper selects a broker to act as the controller.
2. The elected controller monitors broker status and performs leader elections.
3. If the controller fails, ZooKeeper assigns a new one.
Checking the controller broker in Kafka
zookeeper-shell.sh localhost:2181 get /controller
The returned JSON contains the broker ID of the current controller.
4. Leader and Follower Coordination (ISR - In-Sync Replicas)
4.1 In-Sync Replicas (ISR)
• ISR includes the leader and all followers that have fully replicated the leader’s data.
• A message is considered committed only when all ISR replicas have acknowledged it.
• If a follower falls behind, it is removed from the ISR.
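On the producer side, this commit rule is what the acks setting controls. As a hedged sketch (property names are librdkafka's; min.insync.replicas itself is a broker/topic setting, not a client one), a producer can require acknowledgement from the full ISR before a write is considered successful:
// Sketch: producer that waits for the in-sync replicas to acknowledge each write.
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
rd_kafka_conf_set(conf, "acks", "-1", NULL, 0);                 // -1 = "all": wait for the ISR
rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0); // retries cannot create duplicates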
Checking ISR in Kafka
bin/kafka-topics.sh --describe --topic test_topic --zookeeper localhost:2181
Output:
Topic: test_topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Here, brokers 1 and 2 are in the ISR for partition 0.
5. Leader Election Process
Kafka ensures high availability by automatically electing a new leader when the current leader fails.
5.1 Steps in Leader Election
1. Leader broker fails.
2. Controller broker detects the failure using ZooKeeper.
3. A new leader is elected from the ISR list.
4. Consumers and producers reconnect to the new leader (a client-side configuration sketch follows this list).
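Clients normally need no custom code for step 4: librdkafka refreshes its metadata and retries automatically. A hedged configuration sketch for a producer that rides out a leader change (values are illustrative):
// Sketch: producer settings that tolerate a leader election.
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
rd_kafka_conf_set(conf, "message.send.max.retries", "5", NULL, 0); // retry e.g. NOT_LEADER errors
rd_kafka_conf_set(conf, "retry.backoff.ms", "200", NULL, 0);       // wait between retries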
5.2 Example: Simulating Leader Failure
1. Start three brokers:
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-server-start.sh config/server-3.properties
2. List partitions and leaders:
bin/kafka-topics.sh --describe --topic test_topic --zookeeper localhost:2181
3. Stop the leader broker:
bin/kafka-server-stop.sh
4. Check the new leader:
bin/kafka-topics.sh --describe --topic test_topic --zookeeper localhost:2181
6. Implementing Broker Role Detection in C++
6.1 Detecting Kafka Brokers and Their Roles
Using librdkafka, we can detect brokers and their roles in the cluster.
#include <iostream>
#include <rdkafka.h>

void list_brokers(rd_kafka_t *rk) {
    const rd_kafka_metadata_t *metadata;
    if (rd_kafka_metadata(rk, 0, NULL, &metadata, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        for (int i = 0; i < metadata->broker_cnt; ++i) {
            std::cout << "Broker: " << metadata->brokers[i].host << ":"
                      << metadata->brokers[i].port << std::endl;
        }
        rd_kafka_metadata_destroy(metadata); // only valid when the call succeeded
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    // The metadata request needs at least one reachable broker.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, nullptr, 0);
    list_brokers(rk);
    rd_kafka_destroy(rk);
    return 0;
}
This program lists the brokers available in the Kafka cluster.
7. Controller Election in C++ (Using ZooKeeper)
To manage controller elections, we can use ZooKeeper in C++.
#include <iostream>
#include <zookeeper/zookeeper.h>
void create_controller_node(zhandle_t *zh) {
int ret = zoo_create(zh, "/controller", "broker_1", 8, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, nullptr, 0);
if (ret == ZOK) {
std::cout << "Broker elected as controller" << std::endl;
} else {
std::cout << "Another broker is already the controller" << std::endl;
}
}
int main() {
zhandle_t *zh = zookeeper_init("localhost:2181", nullptr, 2000, 0, 0, 0);
if (!zh) {
std::cerr << "Failed to connect to ZooKeeper" << std::endl;
return 1;
}
create_controller_node(zh);
zookeeper_close(zh);
return 0;
}
This program attempts to create a controller node, which succeeds only if the broker is elected as a
Kafka controller.
1. Introduction to Kafka Partitions
Kafka is a distributed messaging system that enables high throughput and fault tolerance by dividing
topics into partitions.
Each partition is stored on multiple Kafka brokers for redundancy. Within these partitions:
• One broker is designated as the Leader.
• The other brokers replicate data as Followers.
This partitioning mechanism ensures parallelism, load balancing, and fault tolerance in Kafka.
2. Understanding Partitioning in Kafka
2.1 What is a Partition?
A partition is a subset of messages within a topic. Each partition acts as an independent log stored on
different brokers.
• Producers write messages to topic partitions.
• Consumers read messages from partitions in order.
• Partitions enable horizontal scaling, as multiple partitions can be processed in parallel.
2.2 How Partitions Work?
• When a Kafka topic is created, it is split into multiple partitions.
• Each partition is assigned a leader broker and one or more follower brokers.
• Producers always send messages to the leader partition.
• Followers replicate the leader's data to maintain consistency.
2.3 Example of a Partitioned Topic
Imagine a topic "orders" with three partitions:
Partition Leader Broker Follower Brokers
Partition 0 Broker 1 Broker 2, Broker 3
Partition 1 Broker 2 Broker 1, Broker 3
Partition 2 Broker 3 Broker 1, Broker 2
• If Broker 1 fails, Broker 2 or 3 will take over as the new leader.
• Kafka ensures that partitions are evenly distributed across brokers.
2.4 Creating a Topic with Partitions
You can create a Kafka topic with multiple partitions using the following command:
bin/kafka-topics.sh --create --topic orders --partitions 3 --replication-factor 2 --zookeeper localhost:2181
This creates a topic "orders" with:
• 3 partitions
• 2 replicas per partition
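Producers do not have to choose a partition number explicitly. If a message key is supplied, librdkafka's default partitioner hashes the key, so every message with the same key lands in the same partition of orders. A hedged sketch (the order ID is made up for illustration):
#include <iostream>
#include <string>
#include <rdkafka.h>
int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, nullptr, 0);

    std::string key = "order-42";        // all events for this order share one partition
    std::string value = "order created";

    rd_kafka_producev(
        producer,
        RD_KAFKA_V_TOPIC("orders"),
        RD_KAFKA_V_KEY(key.c_str(), key.size()),
        RD_KAFKA_V_VALUE(value.c_str(), value.size()),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // copy payload so the strings may go out of scope
        RD_KAFKA_V_END);

    rd_kafka_flush(producer, 5000);
    rd_kafka_destroy(producer);
    return 0;
}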
3. Leader Partition in Kafka
3.1 Role of a Leader Partition
A leader partition is the primary broker that:
• Handles all producer writes.
• Serves consumer reads.
• Synchronizes data with follower partitions.
3.2 How Kafka Assigns Leaders?
When a topic is created, Kafka assigns one broker as the leader for each partition.
• The leader is chosen from available brokers.
• The leader remains responsible until failover occurs.
3.3 Checking Partition Leadership
To view partition leadership details:
bin/kafka-topics.sh --describe --topic orders --zookeeper localhost:2181
Example output:
Topic: orders Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: orders Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: orders Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
• Leader column shows which broker is leading each partition.
• ISR (In-Sync Replicas) lists brokers that are in sync with the leader.
4. Follower Partition in Kafka
4.1 Role of Follower Partitions
A follower partition is a replica of the leader partition that:
• Passively replicates data from the leader.
• Steps in as the new leader if the current leader fails.
• Does not serve producer requests directly.
4.2 Follower Synchronization with Leader
• Followers fetch data from the leader at regular intervals.
• If a follower lags behind, it is removed from the ISR.
• Followers must stay in sync with the leader to ensure fault tolerance.
4.3 Checking In-Sync Replicas (ISR)
To check which brokers are in sync with the leader:
bin/kafka-topics.sh --describe --topic orders --zookeeper localhost:2181
Example output:
Topic: orders Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
• Brokers 1 and 2 are in the ISR for partition 0.
• If broker 2 falls behind, it will be removed from the ISR.
5. Leader Election Process
5.1 What Happens When a Leader Fails?
• If a leader broker crashes, Kafka elects a new leader.
• The controller broker coordinates the election.
• A new leader is chosen from the ISR list.
5.2 Steps in Leader Election
1. Leader broker fails unexpectedly.
2. Controller broker detects failure using ZooKeeper.
3. A new leader is selected from in-sync replicas (ISR).
4. Producers and consumers reconnect to the new leader.
5.3 Simulating Leader Failure
To simulate a leader failure:
1. Start three Kafka brokers:
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-server-start.sh config/server-3.properties
2. Check partition leadership:
bin/kafka-topics.sh --describe --topic orders --zookeeper localhost:2181
3. Stop the leader broker:
bin/kafka-server-stop.sh
4. Check the new leader:
bin/kafka-topics.sh --describe --topic orders --zookeeper localhost:2181
Kafka will automatically assign a new leader from the ISR list.
6. Implementing Partition Leadership Detection in C++
Using librdkafka, we can programmatically check Kafka partitions and their leaders.
6.1 Detecting Partition Leaders in Kafka (C++ Code)
#include <cstring>
#include <iostream>
#include <rdkafka.h>

void get_partition_leader(rd_kafka_t *rk, const char *topic_name) {
    const rd_kafka_metadata_t *metadata;
    // 1 = request metadata for all topics so "orders" is included even without a subscription.
    if (rd_kafka_metadata(rk, 1, nullptr, &metadata, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        for (int i = 0; i < metadata->topic_cnt; ++i) {
            const rd_kafka_metadata_topic_t &topic = metadata->topics[i];
            if (strcmp(topic.topic, topic_name) == 0) {
                for (int j = 0; j < topic.partition_cnt; ++j) {
                    std::cout << "Partition: " << topic.partitions[j].id
                              << " Leader: " << topic.partitions[j].leader << std::endl;
                }
            }
        }
        rd_kafka_metadata_destroy(metadata); // only valid when the call succeeded
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, nullptr, 0);
    get_partition_leader(rk, "orders");
    rd_kafka_destroy(rk);
    return 0;
}
This program:
• Connects to Kafka.
• Fetches metadata for the "orders" topic.
• Displays partition leaders.
1. Introduction to Broker Failover Handling
Kafka is a distributed, fault-tolerant messaging system designed to handle broker failures without
impacting system availability. Broker failover handling ensures that when a broker crashes, Kafka can
quickly recover and continue operations with minimal disruption.
In this section, we will cover:
• What happens when a broker fails?
• How Kafka handles broker failover?
• Leader election mechanism during failover
• Replication and data consistency during failure
• Monitoring broker health and recovery
• Implementation of broker failover detection in C++
2. Understanding Broker Failover in Kafka
2.1 What is a Kafka Broker?
A Kafka broker is a server that stores partitions and processes client requests.
Kafka brokers:
• Store partition data for different topics.
• Serve as leaders or followers for partitions.
• Replicate data for fault tolerance.
2.2 How Does Kafka Detect a Broker Failure?
Kafka brokers communicate with ZooKeeper to report their health.
• Each broker sends a heartbeat to ZooKeeper.
• If a broker fails to send heartbeats, it is marked as dead.
• The controller broker initiates a failover process.
3. What Happens When a Broker Fails?
3.1 Producer and Consumer Impact
• Producers: Can no longer send data to partitions hosted by the failed broker.
• Consumers: Cannot fetch messages from affected partitions.
3.2 Partition Leadership Transfer
• If the broker was a leader, a new leader is chosen from the in-sync replicas (ISR).
• If the broker was a follower, no immediate action is needed unless it was the only follower.
3.3 Data Replication and Consistency
• If the failed broker had partitions in ISR, data loss is avoided because other brokers have copies.
• If it was the only replica, data for those partitions becomes unavailable until recovery.
4. Kafka’s Failover Handling Mechanism
Kafka automatically handles broker failover in four key steps:
4.1 Step 1: Failure Detection
Kafka detects a broker failure using ZooKeeper:
• ZooKeeper tracks broker heartbeats.
• If a broker stops responding, it is marked as down.
4.2 Step 2: Leader Election for Affected Partitions
• The Kafka controller broker assigns a new leader from the ISR list.
• If no ISR exists, the partition remains unavailable until the broker recovers.
4.3 Step 3: Producer and Consumer Rebalancing
• Producers reconnect to the new leader broker.
• Consumers update their metadata to fetch from the new leader.
4.4 Step 4: Broker Recovery and Reassignment
• The failed broker restarts and rejoins the cluster.
• It replicates missing data from the leader before rejoining ISR.
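For librdkafka clients, steps 3 and 4 happen largely on their own: a NOT_LEADER error triggers a metadata refresh and the client reconnects to the new leader. As a hedged tuning sketch, the background metadata refresh interval can also be lowered so stale leadership information ages out sooner (the value below is illustrative):
// Sketch: refresh cluster metadata more often than the default (300000 ms).
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000", NULL, 0);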
5. Simulating Broker Failure and Recovery
5.1 Checking Current Partition Leadership
To view which broker is leading each partition:
bin/kafka-topics.sh --describe --topic orders --zookeeper localhost:2181
Example output:
Topic: orders Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: orders Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: orders Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
5.2 Stopping a Broker to Simulate Failure
Stop Broker 1, which is the leader of Partition 0:
bin/kafka-server-stop.sh config/server-1.properties
5.3 Checking the New Leader
After broker failure, run:
bin/kafka-topics.sh --describe --topic orders --zookeeper localhost:2181
Output:
Topic: orders Partition: 0 Leader: 2 Replicas: 1,2 Isr: 2
• Kafka has automatically elected Broker 2 as the new leader.
5.4 Restarting the Failed Broker
bin/kafka-server-start.sh config/server-1.properties
Once the broker is restarted, it will sync data and rejoin the ISR.
6. Monitoring Broker Failover in Kafka
6.1 Using Kafka Metrics
Kafka provides metrics for broker health monitoring:
jps | grep Kafka
This checks if Kafka brokers are running.
bin/kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
This shows which brokers are handling consumer requests.
6.2 Enabling Broker Monitoring with Prometheus & Grafana
Kafka can be monitored using Prometheus and Grafana dashboards for real-time failover detection.
7. Implementing Broker Failover Detection in C++
We can use librdkafka to monitor Kafka broker health in C++.
7.1 Detecting Broker Failure Using C++
#include <iostream>
#include <unistd.h>
#include <rdkafka.h>

void monitor_broker_failover(rd_kafka_t *rk) {
    const rd_kafka_metadata_t *metadata;
    if (rd_kafka_metadata(rk, 0, nullptr, &metadata, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        std::cout << "Kafka Cluster Metadata:\n";
        for (int i = 0; i < metadata->broker_cnt; ++i) {
            std::cout << "Broker ID: " << metadata->brokers[i].id
                      << " Host: " << metadata->brokers[i].host
                      << " Port: " << metadata->brokers[i].port << std::endl;
        }
        rd_kafka_metadata_destroy(metadata); // only valid when the call succeeded
    } else {
        std::cerr << "Error fetching metadata. Possible broker failure!" << std::endl;
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, nullptr, 0);
    while (true) {
        monitor_broker_failover(rk);
        sleep(5); // Check broker health every 5 seconds
    }
    rd_kafka_destroy(rk);
    return 0;
}
7.2 How It Works?
• The program fetches Kafka metadata and prints broker details.
• If metadata retrieval fails, it indicates a possible broker failure.
• The loop runs every 5 seconds to monitor broker status.
8. Best Practices for Handling Broker Failures
Replication and ISR Management
• Ensure replication factor ≥ 2 to avoid data loss.
• Use min.insync.replicas to enforce ISR quorum.
Automatic Leader Election
• Kafka automatically elects a new leader using the controller broker.
• Tune leader election parameters to optimize failover speed.
Monitoring and Alerting
• Use Prometheus, Grafana, and Kafka metrics to monitor broker health.
• Set up alerts for broker downtime and ISR shrinkage.
Graceful Shutdown and Restart
• Always shutdown brokers gracefully using:
bashCopyEditbin/kafka-server-stop.sh
• Restart brokers and allow time for them to sync with leaders.
1. Introduction to Controller Election
Kafka operates as a distributed system that requires a dedicated controller broker to manage cluster-
wide administrative tasks. Controller election is the process by which a single Kafka broker is chosen to
act as the controller and oversee the cluster’s metadata, leader election, and broker failovers.
1.1 Why is Controller Election Important?
• Ensures only one broker manages metadata updates.
• Manages leader election for partitions.
• Handles broker failover and recovery.
• Maintains cluster stability and fault tolerance.
1.2 Overview of Kafka Controller Responsibilities
The controller broker is responsible for:
Leader Election: Assigning a new leader when a broker fails.
Broker Registration & Deregistration: Tracking available brokers.
Partition Reassignment: Balancing partitions across brokers.
Cluster State Management: Ensuring data consistency.
2. How Controller Election Works in Kafka
2.1 Controller Election Process
Kafka controller election happens through ZooKeeper, a distributed coordination service.
Step 1: Broker Registration in ZooKeeper
When a Kafka broker starts, it registers itself in ZooKeeper:
zookeeper-shell.sh localhost:2181 create /brokers/ids/1 ""
Each broker gets a unique ID in /brokers/ids/.
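For illustration, the same ephemeral-node mechanism can be exercised from C++ with the ZooKeeper C client; the path and data below are placeholders. Because the node is ephemeral, it disappears when the broker's session ends, which is exactly how ZooKeeper notices a dead broker:
#include <cstring>
#include <iostream>
#include <zookeeper/zookeeper.h>
// Sketch: register a broker ID as an ephemeral znode under /brokers/ids.
void register_broker(zhandle_t *zh, const char *broker_id_path, const char *data) {
    int ret = zoo_create(zh, broker_id_path, data, strlen(data),
                         &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, nullptr, 0);
    if (ret == ZOK)
        std::cout << "Registered " << broker_id_path << std::endl;
    else
        std::cerr << "Registration failed: " << zerror(ret) << std::endl;
}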
Step 2: Brokers Compete for the Controller Role
• Each broker attempts to create the /controller node in ZooKeeper.
• The first broker to succeed becomes the controller.
• Other brokers monitor this node to detect changes.
Step 3: Controller Assignment
The winning broker stores its broker ID in /controller:
zookeeper-shell.sh localhost:2181 get /controller
Example output:
{"version":1,"brokerid":1,"timestamp":"1700000000"}
Step 4: Controller Failure Handling
• If the controller crashes, ZooKeeper removes the /controller node.
• Remaining brokers compete to create a new /controller node.
• A new controller is elected automatically.
3. Simulating Controller Election in Kafka
3.1 Checking the Current Controller
To find the current controller broker, query the /controller node in ZooKeeper:
zookeeper-shell.sh localhost:2181 get /controller
Example output:
{"version":1,"brokerid":1,"timestamp":"1700000000"}
Here, Broker 1 is the active controller.
3.2 Forcing a Controller Election
To trigger controller election, stop the current controller broker:
bin/kafka-server-stop.sh config/server-1.properties
Then, check the new controller:
zookeeper-shell.sh localhost:2181 get /controller
Kafka will automatically assign a new controller from the available brokers.
4. Monitoring Controller Changes
4.1 Using ZooKeeper to Monitor Controller Events
Check the current controller ID in ZooKeeper:
zookeeper-shell.sh localhost:2181 get /controller
4.2 Enabling Kafka Metrics for Controller Status
Kafka exposes controller metrics through JMX:
jconsole
Look for:
kafka.controller:type=KafkaController,name=ActiveControllerCount
A value of 1 means a controller is active.
5. Implementing Controller Election Detection in C++
We can use ZooKeeper API in C++ to detect controller election.
5.1 Installing ZooKeeper C++ Client
First, install the ZooKeeper C client development package:
sudo apt install libzookeeper-mt-dev
5.2 C++ Code to Detect Kafka Controller Election
#include <iostream>
#include <unistd.h>
#include <zookeeper/zookeeper.h>

void controller_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    if (type == ZOO_CHANGED_EVENT || type == ZOO_CREATED_EVENT) {
        std::cout << "Controller election event detected!" << std::endl;
    }
}

void get_controller_id(zhandle_t *zh) {
    char buffer[64];
    int buffer_len = sizeof(buffer) - 1; // leave room for the terminating '\0'
    // watch = 1 re-arms the default watcher so controller changes are reported.
    if (zoo_get(zh, "/controller", 1, buffer, &buffer_len, nullptr) == ZOK) {
        if (buffer_len < 0)
            buffer_len = 0; // node exists but carries no data
        buffer[buffer_len] = '\0';
        std::cout << "Current Kafka Controller: " << buffer << std::endl;
    } else {
        std::cout << "No active controller found!" << std::endl;
    }
}

int main() {
    zhandle_t *zh = zookeeper_init("localhost:2181", controller_watcher, 30000, 0, nullptr, 0);
    if (!zh) {
        std::cerr << "Failed to connect to ZooKeeper!" << std::endl;
        return -1;
    }
    while (true) {
        get_controller_id(zh);
        sleep(5); // Check every 5 seconds
    }
    zookeeper_close(zh);
    return 0;
}
5.3 How It Works?
Connects to ZooKeeper and watches the /controller node.
Prints the active controller broker ID every 5 seconds.
Detects controller changes in real-time.
Compile and run the program:
g++ controller_watcher.cpp -o controller_watcher -lzookeeper_mt
./controller_watcher
6. Best Practices for Kafka Controller Election
Distribute Brokers Evenly Across Nodes
• Deploy Kafka brokers on separate physical machines to avoid single points of failure.
Tune ZooKeeper Timeout Settings
Modify zookeeper.session.timeout.ms in server.properties to reduce failover time:
zookeeper.session.timeout.ms=18000
Enable Automatic Controller Detection
Use Prometheus and Grafana dashboards to monitor controller status in real-time.
Enable Unclean Leader Election Carefully
Set unclean.leader.election.enable=false to prevent data loss.