Kafka in Depth: A Practical Approach to the Low-Level Design and Architecture of Data Streaming Systems
Apache Kafka is a distributed streaming platform that enables you to publish, subscribe, store, and process streams of data in real time. It is widely used for building scalable, reliable, and high-performance data pipelines, microservices, and event-driven applications.
Kafka is designed to handle periodic large data loads from offline systems as well as traditional low-latency messaging use cases. It is fault-tolerant to node failures through replication and leader election, which makes it comparable to message-oriented middleware (MOM) such as IBM MQSeries, JMS-based brokers, ActiveMQ, and RabbitMQ. However, Kafka’s design is more like a distributed database transaction log than a traditional messaging system. Unlike many MOMs, Kafka replication was built into the low-level design and is not an afterthought.
Persistence
Apache Kafka relies heavily on the file system for storing and caching messages. Kafka logs are append-only structures: data is written to the end of the log, and sequential reads and writes are fast, predictable, and heavily optimized by operating systems. On hard disk drives (HDDs), sequential disk access can be faster than random memory access and random SSD access. Although JVM garbage-collection overhead can be high, Kafka leans heavily on the operating system (OS) for caching, which is big, fast, and rock-solid. Modern operating systems use all available main memory for disk caching, and this OS file cache comes almost for free, without the overhead of maintaining a cache inside the JVM. Delegating caching to the OS also keeps cache coherence in one place and reduces the number of buffer copies. Since Kafka’s disk usage tends toward sequential reads, the OS read-ahead cache works very well. Cassandra, Netty, and Varnish use similar techniques. Kafka’s documentation explains all of this well, and there is a more entertaining explanation on the Varnish site.
Kafka, like Cassandra, LevelDB, and RocksDB, uses log-structured storage and compaction rather than an on-disk mutable BTree. This approach favors long sequential disk access for reads and writes, which is efficient on modern disks where capacity is cheap and sequential throughput is high. It also lets Kafka offer features not typically found in messaging systems, such as holding onto old messages for a long time, which makes it well suited for applications that need to replay or reprocess historical data.
The log-structured storage engine used by Kafka has several advantages over traditional databases. For example, it is highly optimized for write-heavy applications, allowing for fast and efficient writes to disk.
Additionally, because each segment is closed and immutable, whole segments can be compressed more effectively, and the append-only layout reduces fragmentation and wasted space.
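Segment size, retention, and the cleanup policy are ordinary per-topic settings. The following is a rough sketch (the topic name, sizes, and broker address are illustrative assumptions, not recommendations) of creating such a topic with the Java AdminClient:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateRetainingTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            // Hypothetical topic: 3 partitions, replication factor 3.
            NewTopic topic = new NewTopic("page-views", 3, (short) 3)
                    .configs(Map.of(
                            "segment.bytes", "1073741824",  // roll a new segment at ~1 GiB
                            "retention.ms", "604800000",    // keep closed segments for 7 days
                            "cleanup.policy", "delete"));   // delete old segments rather than compacting
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```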
Producer
Load balancing
The producer client can request metadata from a Kafka broker to learn which brokers lead which topic partitions, eliminating the need for a routing layer. This leadership metadata lets the producer send records directly to the broker that leads the target partition.
Additionally, the producer client controls which partition it publishes messages to and can choose a partition based on application logic. Producers can partition records by key, round-robin them, or use custom, application-specific partitioner logic.
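For illustration, here is a minimal sketch of a custom partitioner. The “VIP traffic goes to partition 0” rule and the class name are made up, but the Partitioner interface and the murmur2 key hashing mirror what the Java client itself uses:

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical partitioner: pins "vip-" keys to partition 0, hashes everything else by key. */
public class VipPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // keyless records: spread them pseudo-randomly across partitions
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        if (key.toString().startsWith("vip-")) {
            return 0; // application-specific rule: all VIP traffic on one partition
        }
        // fall back to a key hash so records with the same key stay in the same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```

To plug it in, the producer would register it via partitioner.class, for example props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipPartitioner.class.getName()).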
Record batching
By batching records together into fewer, larger requests, the producer improves network I/O efficiency and overall throughput. Batching can be configured by size in bytes, and batches can be auto-flushed after a time interval. Buffering is configurable and lets you trade a little additional latency for better throughput. In heavily used systems, buffering can improve average throughput and even reduce overall latency. Accumulating more bytes per send reduces the number of I/O operations on the Kafka brokers and increases compression efficiency.
To achieve higher throughput, Kafka producer configuration allows for buffering based on time and size. The producer sends multiple records as a batch with fewer network requests than sending each record one by one.
However, it is important to find the right balance between batch size and buffer memory. Small batch sizes can reduce throughput, while very large batch sizes can lead to memory inefficiency.
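As a hedged example of those knobs (the topic name, broker address, and the 64 KiB / 20 ms / 64 MiB values are placeholders, not tuning advice), a Java producer configured for batching might look like this:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Size-based batching: a batch is sent once it reaches 64 KiB per partition...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
        // ...or once it has waited 20 ms, whichever comes first (the latency/throughput tradeoff).
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        // Total memory the producer may use to buffer records that have not been sent yet.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("page-views", "user-" + i, "clicked"));
            }
        } // close() flushes any batches still sitting in the buffer
    }
}
```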
Durability and acknowledgement
The producer can send without waiting for any acknowledgment (acks=0), wait for an acknowledgment from just the partition leader (acks=1), or wait for acknowledgments from all in-sync replicas (acks=all), which is the default in recent Kafka versions.
Kafka also supports exactly-once delivery from the producer, along with performance improvements and atomic writes across partitions. Idempotent delivery works by having the producer attach a sequence number to what it sends; the broker tracks which sequences it has already written for that producer, so if the producer retries a batch, the broker acknowledges the duplicate without appending it to the log again. This improvement requires no API change.
Another improvement is that Kafka producers can perform atomic writes across partitions. Atomic writes mean Kafka consumers can be configured to see only committed records. A transaction coordinator writes markers to the topic log to signify what has been successfully transacted, and the transaction coordinator and transaction log maintain the state of the atomic writes. Atomic writes do require a new producer API for transactions.
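A minimal sketch of that producer-side API, assuming a local broker and made-up topic names and transactional id; the acks, idempotence, and transaction settings are the pieces described above:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.ACKS_CONFIG, "all");                 // wait for all in-sync replicas
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);    // broker de-duplicates by producer id + sequence
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1"); // hypothetical id; enables atomic writes

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Both records commit or abort together, even though they go to different topics/partitions.
                producer.send(new ProducerRecord<>("orders", "order-42", "created"));
                producer.send(new ProducerRecord<>("payments", "order-42", "charged"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
```

Consumers that should only see committed records would additionally set isolation.level=read_committed.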
Compression
A common bottleneck for large-scale streaming platforms is not CPU or disk, but network bandwidth. This is especially true in the cloud, where containerized and virtualized environments may share a NIC (network interface controller) among multiple services. Network bandwidth can also limit data transfer between datacenters or over a WAN.
Batching can help improve compression and network IO efficiency.
Kafka leverages end-to-end batch compression to optimize streaming performance. Instead of compressing each record individually, Kafka compresses a whole batch of records at once. The compressed batch can then be sent to the Kafka broker in one shot and stored in the log partition in its compressed form. Kafka can also be configured so that records stay compressed until they reach the consumer, and it supports several compression codecs, such as gzip, snappy, and lz4.
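For example (the codec choice here is arbitrary), compression is a single producer setting; a small helper like the following would layer it on top of the batching properties shown earlier:

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class CompressionConfig {
    /** Returns a copy of the given producer properties with end-to-end batch compression enabled. */
    public static Properties withCompression(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // The producer compresses each batch as a unit; the broker stores it compressed,
        // and only the consumer decompresses it. gzip and snappy are other common choices.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        return props;
    }
}
```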
Pull vs Push
Kafka is a pull-based system that allows consumers to fetch data from brokers at their own pace. This is different from other systems where brokers push data or stream data to consumers, which are called push-based or streaming systems. Some examples of push-based systems are Scribe, Flume, Reactive Streams, RxJava, and Akka.
One of the advantages of a pull-based system is that it can batch data more efficiently. Kafka uses a long-poll mechanism: a consumer’s fetch request can stay open on the broker for a period of time, waiting for data to become available. This reduces the latency and overhead of issuing frequent empty requests.
However, a pull-based system also has some drawbacks. For instance, there is always a delay between requesting data and receiving it, which can affect the responsiveness of the system. Also, if a consumer falls behind, it has to catch up later by processing more data, which can cause performance issues.
On the other hand, a push-based system can deliver data to consumers as soon as it is produced, or in batches based on some criteria (such as back pressure). This can improve the throughput and responsiveness of the system. However, a push-based system also faces some challenges. For example, how to deal with slow or dead consumers that cannot keep up with the data rate. Some push-based systems use a back-off protocol that allows consumers to signal when they are overwhelmed and need to slow down the data flow. Another challenge is how to track message acknowledgments and ensure data delivery in case of consumer failures. This can introduce complexity and overhead in the system.
Kafka avoids these problems by using a pull-based system that gives consumers more control and flexibility over their data consumption.
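A sketch of that long-poll behaviour from the consumer side, with placeholder topic, group, and threshold values (not tuning advice):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class LongPollConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "page-view-readers");       // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Long poll: the broker holds the fetch open until at least 64 KiB is available...
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 64 * 1024);
        // ...or 500 ms have passed, whichever comes first.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("page-views"));
            while (true) {
                // poll() itself blocks for up to 1 second on the client if no data arrives
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```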
In many MOM systems, the broker has to track which messages have been consumed by which clients, a complex task that means keeping per-message state. MOM systems aim to delete data as soon as possible after consumption, because most of them were designed when disk space was scarce, costly, and slow. But this tracking (the acknowledgment feature) is not straightforward: the broker has to store a lot of information for each message, such as whether it was sent, acknowledged, deleted, or needs to be resent.
In Kafka, topics are split into ordered partitions, and each message has a specific offset within its partition. Unlike MOM, where each message is tracked individually, Kafka only needs to store one offset per consumer group and partition pair, which dramatically reduces the amount of state to track. Consumers periodically send their position (consumer group, partition, and offset) to the Kafka broker, which stores it in a special offsets topic. This offset-based acknowledgment system is much more efficient and flexible than MOM-style tracking. It also allows consumers to rewind to an earlier offset and replay the topic, which is a powerful feature, since Kafka can retain topic log data for a long time.
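Replay then amounts to moving the stored offset. A minimal, illustrative sketch (topic, partition, group name, and broker address are assumptions) that rewinds one partition to the beginning of the retained log:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReplayFromOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-tool");              // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("page-views", 0);
            consumer.assign(Collections.singleton(partition));

            // Rewind: because the broker only stores "group + partition -> offset",
            // replay is just moving that offset back and reading the retained log again.
            consumer.seek(partition, 0L);

            consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                    System.out.println(record.offset() + ": " + record.value()));
        }
    }
}
```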
Message Delivery
Message delivery semantics define how messages are handled by the producer and the consumer in a distributed system. There are three main types of message delivery semantics:
- at most once: messages can be dropped, but never duplicated
- at least once: messages are guaranteed to be delivered, but may be repeated
- exactly once: messages are delivered exactly one time, with no loss or duplication
Exactly once is the ideal scenario, but it comes with a higher cost and complexity, as it requires more coordination and tracking between the producer and the consumer.
To reason about these semantics, imagine that all replicas are identical copies of the same log partition with the same offsets, and that consumer groups track their progress in the log per topic partition.
To achieve at-most-once delivery, the consumer reads a message, commits its offset to the broker, and only then processes the message. The downside is that the consumer could fail after committing its position but before processing the message; the consumer that restarts or takes over then resumes from the saved position, and the message in question is never processed.
To achieve at-least-once delivery, the consumer reads a message, processes it, and then commits its offset to the broker. The downside is that the consumer could crash after processing a message but before committing its position; if the consumer restarts or another consumer takes over, the already-processed message is delivered again. This is the most common setup for messaging, and it is your responsibility to make processing idempotent, so that handling the same message twice does not cause a problem (for example, two debits for the same payment).
To achieve exactly-once delivery on the consumer side, the consumer would need a two-phase commit between the storage for the consumer position and the storage for the consumer’s processing output. Alternatively, the consumer could store the processing output and the last offset in the same place, so they are updated atomically.
Kafka offers the first two, and it is up to you to implement the third from the consumer perspective.
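As an illustration of the at-least-once pattern, here is a hedged sketch with auto-commit disabled and the offset committed only after processing; the topic, group, broker address, and the processing logic are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "billing");                 // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Turn off auto-commit so the offset only advances after processing succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("payments"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // must be idempotent: a crash here means this record is seen again
                }
                // Commit only after the whole batch is processed: at-least-once.
                // Committing *before* processing instead would give at-most-once.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println("debiting account for " + record.key()); // placeholder for real, idempotent work
    }
}
```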
Replication
Kafka distributes each topic’s partitions among a flexible number of Kafka brokers. Kafka’s replication model is built-in, unlike most MOMs that add it as an afterthought. Kafka was designed to handle partitions and multi-nodes from the beginning. Each topic partition has one leader and multiple followers.
Leaders and followers are called replicas. A replication factor is the number of replicas for a partition. Partition leadership is balanced among Kafka brokers. Consumers only read from the leader. Producers only write to the leader.
The topic log partitions on followers stay synchronized with the leader’s log; in-sync replicas (ISRs) hold an identical copy of the leader’s log, apart from the records currently being replicated. Followers fetch records in batches from their leader, much like a normal Kafka consumer.
Log partitions
A Kafka partition is more than a replicated log. It’s a building block for distributed data systems based on state machines. A replicated log enables different systems to agree on an ordered sequence of values.
As long as the leader is alive, the followers just replicate the values and order from it. If the leader dies, Kafka picks a new leader from the in-sync followers. If a producer gets a confirmation for a message, and then the leader fails, the new leader must have that message.
The more ISRs (in-sync replicas) you have, the more choices you have for a new leader.
Broker
Kafka tracks which brokers are alive and keeping up. To count as alive, a broker must keep a ZooKeeper session open using ZooKeeper’s heartbeat mechanism, and its follower replicas must stay synced with their leaders without falling significantly behind.
Both the ZooKeeper session and the sync status are required for a replica to be considered in-sync. An in-sync replica is abbreviated as ISR, and each leader maintains its own set of in-sync replicas.
If a follower fails or lags behind, the leader removes it from the ISR set. A follower counts as lagging when it has not caught up with the leader within the replica.lag.time.max.ms duration.
A message is deemed “committed” when all ISRs have applied the message to their log. Consumers only access committed messages. Kafka guarantee: a committed message will not be lost, as long as there is at least one ISR.
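One common way to harden that guarantee is to combine acks=all on the producer with a topic-level min.insync.replicas. A small, illustrative AdminClient sketch (topic name, broker address, and the value 2 are assumptions):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class RequireTwoInSyncReplicas {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "payments"); // hypothetical topic
            // With acks=all on the producer, a write is acknowledged only once every ISR has it;
            // min.insync.replicas=2 makes the broker reject writes when the ISR shrinks below 2.
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, Collections.singleton(op))).all().get();
        }
    }
}
```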
Quorum
To stay available, Kafka needs a quorum of replicas that agree on the leader and the log. Unlike most systems that use a simple majority vote, Kafka elects leaders based on having a complete log. For example, with a replication factor of 3, the leader only commits a message after at least two replicas (including itself) have received it. That way, if a new leader has to be elected, it is guaranteed to have all committed messages, as long as at least one replica that received them is still alive and eligible.
Kafka maintains a set of in-sync replicas (ISR) for each leader. Only the replicas in the ISR can become leaders or acknowledge writes. The ISR is updated whenever a replica joins or leaves the set, and this information is stored in ZooKeeper. This allows Kafka to tolerate failures without losing data, as long as there is at least one replica in the ISR.
However, if all the replicas in the ISR die at once, Kafka may lose data or elect an unclean leader: a replica that does not have all the committed messages but becomes leader anyway. This can happen when every replica for a partition is down and Kafka is configured to favor availability over consistency; that trade-off is controlled by the unclean.leader.election.enable setting, which is disabled by default in recent Kafka versions. Kafka’s guarantee about data loss therefore only holds while at least one replica in the ISR is alive.
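Whether that trade is allowed can be inspected per topic; an illustrative sketch (topic name and broker address assumed) that reads the setting with the AdminClient:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class CheckUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "payments"); // hypothetical topic
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            // "false" favors consistency: the partition stays offline rather than electing
            // an out-of-sync replica; "true" favors availability at the risk of losing data.
            System.out.println("unclean.leader.election.enable = "
                    + config.get("unclean.leader.election.enable").value());
        }
    }
}
```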
Conclusion
Kafka is a powerful distributed system designed for streams. It provides a universal pipeline for data that can be used to build rich real-time applications. Designing a low-level architecture of data streaming systems requires a deep understanding of the Kafka ecosystem and its components.