Rescalable State in Apache Flink: The Key to Unlocking High-Performance Stream Processing
Apache Flink is a distributed stream processing framework that supports stateful computations over unbounded and bounded data streams. Flink provides a rich set of APIs and libraries for various use cases, such as event-driven applications, complex event processing, machine learning, analytics and more.
One of the key features of Flink is its support for rescalable state, which means that Flink can dynamically adjust the parallelism of a stateful operator without losing any state information. This allows Flink to handle varying workloads and resource availability, as well as to perform upgrades and maintenance without downtime.
State in stream processing can be thought of as the memory of operators that retains information about previous inputs and can influence the processing of future inputs. On the other hand, stateless stream processing operators only take into account their current inputs, without any additional context or knowledge of past events.
Apache Flink is a distributed system that enables large-scale, stateful stream processing through massive parallelism. To achieve scalability, a Flink job is broken down into a graph of operators, with each operator’s execution being further divided into multiple parallel instances. Each parallel instance of an operator in Flink is considered an independent task that can be assigned to its own machine within a cluster of network-connected, shared-nothing machines.
To ensure high throughput and low latency, network communication between tasks must be kept to a minimum. In Flink, network communication for stream processing only occurs along the logical edges of the job’s operator graph (vertically) to transfer stream data between upstream and downstream operators. There is no communication between parallel instances of an operator (horizontally).
Data locality is a key principle in Flink to avoid network communication and strongly influences how state data is stored and accessed. All state data in Flink is bound to the task running the corresponding parallel operator instance and is located on the same machine as the task.
This design ensures that all state data for a task is local and no network communication between tasks is required for state access. This is crucial for the scalability of a massively parallel distributed system like Flink.
Flink distinguishes between two types of state for stateful stream processing: operator state and keyed state. Operator state is specific to each parallel instance of an operator (sub-task), while keyed state can be thought of as “operator state that has been partitioned or sharded, with one state-partition per key”.
Rescaling state in Flink
Flink manages the state of each operator in a distributed way: each parallel instance of an operator, called a subtask, keeps its own slice of the state in a local state backend. Depending on the configuration, a state backend holds state either in memory on the JVM heap or on local disk (for example, in RocksDB).
Flink also periodically checkpoints the state backends to a durable storage system, such as HDFS or S3. Checkpoints are consistent snapshots of the state at a given point in time, which can be used to restore the state in case of failures.
Rescalable state is based on two main concepts: state handles and key groups. A state handle is a lightweight reference to state data written to durable storage during a checkpoint; it is used to locate and transfer that data on restore. A key group is a logical partition of the key space of a keyed operator (such as a keyed map or reduce function). Each key group is assigned to exactly one subtask, and each subtask can handle multiple key groups.
When Flink needs to rescale a stateful operator, it performs the following steps:
- triggers a checkpoint of the current state
- computes a new parallelism for the operator, based on the workload and resource availability
- reassigns the key groups to the new subtasks, giving each subtask a contiguous range of key groups
- transfers the state handles from the old subtasks to the new subtasks, using the checkpoint coordinator
- restores the state from the state handles to the new subtasks, using the state backends
- resumes the processing from the checkpointed position
Rescaling stateful stream processing jobs
In stateless streaming, altering the parallelism (the number of parallel subtasks that perform work for an operator) is simple: it only involves starting or stopping parallel instances of stateless operators and connecting or disconnecting them from their upstream and downstream operators.
In contrast, altering the parallelism of stateful operators is more complex because it also involves redistributing the previous operator state in a consistent and meaningful manner. In Flink’s shared-nothing architecture, all state is local to the task that runs the parallel operator instance that owns it, and there is no communication between parallel operator instances during job runtime.
Flink’s checkpointing mechanism allows for the exchange of operator state between tasks in a consistent manner with exactly-once guarantees. Flink’s checkpoints are detailed in the documentation. Essentially, a checkpoint is initiated when a checkpoint coordinator introduces a special event, known as a checkpoint barrier, into a stream.
Checkpoint barriers travel downstream with the event stream from sources to sinks. When an operator instance has received the barrier on all of its inputs, it snapshots its current state to a distributed storage system such as HDFS.
Upon restoration, the new tasks for the job, which may now be running on different machines, can retrieve the state data from the distributed storage system.
Stateful jobs can be rescaled through checkpointing. The process involves triggering a checkpoint and sending it to distributed storage. The job is then restarted with altered parallelism and can access a consistent snapshot of all prior state from distributed storage.
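In practice, this checkpoint-and-restart cycle is usually driven through savepoints, Flink's user-triggered checkpoints. A hedged sketch of the command sequence (the job ID, storage paths, and jar name below are placeholders):

```shell
# Take a savepoint of the running job (a user-triggered, self-contained checkpoint)
flink savepoint <job-id> s3://my-bucket/savepoints/

# Stop the job
flink cancel <job-id>

# Restart from the savepoint with a new parallelism (-p)
flink run -s s3://my-bucket/savepoints/savepoint-xxxx -p 8 my-job.jar
```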
Reassigning Operator State
A typical use case for operator state in Flink is to keep track of the current offsets for Kafka partitions in Kafka sources. Each instance of a Kafka source maintains pairs of <PartitionID, Offset> as operator state, with one pair for each Kafka partition being read. So, how is this operator state redistributed during rescaling? Ideally, all <PartitionID, Offset> pairs from the checkpoint would be reassigned in a round-robin fashion across all parallel operator instances after rescaling.
In earlier versions of Flink, each operator instance returned a single object representing its entire state during snapshotting. For a Kafka source, this object was the list of partition offsets.
This snapshot object was then stored in a distributed store. Upon restore, the object was retrieved from distributed storage and passed to the operator instance as an argument to the restore function.
This approach had issues when it came to rescaling: how could Flink decompose the operator state into meaningful, redistributable partitions? Even though the Kafka source’s state was always a list of partition offsets, Flink saw the previously returned state object as a black box and therefore couldn’t redistribute it.
To address this black-box issue, Flink made a slight modification to the checkpointing interface, introducing ListCheckpointed. Instead of a single object, the new interface returns and accepts a list of state partitions. Introducing a list makes the meaningful partitioning of state explicit: each item in the list is still a black box to Flink, but it is an atomic part of the operator state that can be independently redistributed.
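The round-robin reassignment of such list-style operator state can be sketched in plain Java. This is a toy model with hypothetical names, not Flink's internal code; it only illustrates how a flattened list of <PartitionID, Offset> pairs could be dealt out across a new number of subtasks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RoundRobinRedistribute {

    // Deal the atomic state items out like cards: item i goes to
    // subtask i % newParallelism.
    static List<List<Map.Entry<Integer, Long>>> redistribute(
            List<Map.Entry<Integer, Long>> allEntries, int newParallelism) {
        List<List<Map.Entry<Integer, Long>>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < allEntries.size(); i++) {
            assignment.get(i % newParallelism).add(allEntries.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // <PartitionID, Offset> pairs checkpointed by the old subtasks,
        // flattened into one list.
        List<Map.Entry<Integer, Long>> state = List.of(
                Map.entry(0, 42L), Map.entry(1, 17L),
                Map.entry(2, 8L), Map.entry(3, 99L));
        // Rescale to 3 subtasks.
        System.out.println(redistribute(state, 3));
        // [[0=42, 3=99], [1=17], [2=8]]
    }
}
```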
Reassigning Keyed State
Flink's second type of state is keyed state. Unlike operator state, keyed state is scoped to a key that is extracted from each event in the stream.
Keyed state is only available for keyed streams, created through Flink’s keyBy() operation. This operation specifies how to extract a key from each event and ensures that events with the same key are processed by the same parallel operator instance. As a result, all keyed state is also bound to one parallel operator instance.
Keyed state has an advantage over operator state when it comes to rescaling. We can easily split and redistribute the state across parallel operator instances by following the partitioning of the keyed stream. After rescaling, the state for each key must be assigned to the operator instance responsible for that key, as determined by the hash partitioning of the keyed stream.
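The invariant that makes this work, that equal keys always land on the same parallel instance, comes from deterministic hash partitioning. A minimal sketch with hypothetical names (Flink's real partitioner additionally routes through key groups and applies a murmur hash rather than hashCode directly):

```java
public class KeyPartitioner {

    // Deterministically map a key to one of `parallelism` instances.
    // Math.floorMod guards against negative hashCode values.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        System.out.println(instanceFor("user-42", 4));
        System.out.println(instanceFor("user-42", 4)); // same key -> same instance
    }
}
```

Because the mapping depends only on the key and the parallelism, both the events and the state for a key can be routed by the same rule.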
However, there is still a practical problem: how do we efficiently transfer the state to the subtasks’ local backends? When not rescaling, each subtask can simply read the whole state from the checkpoint in one sequential read. When rescaling, this is no longer possible.
One approach is to read all previous subtask state from the checkpoint and filter out matching keys for each sub-task. This approach benefits from a sequential read pattern but can result in reading irrelevant data and a large number of parallel read requests.
Another approach is to build an index that tracks the location of each key’s state in the checkpoint. This approach avoids reading irrelevant data but has two major downsides: a materialized index can grow very large and it can introduce a large amount of random I/O.
Flink’s approach is in between these two extremes by introducing key-groups as the atomic unit of state assignment. The number of key-groups must be determined before starting the job and cannot be changed afterwards. As key-groups are the atomic unit of state assignment, this also means that the number of key-groups is the upper limit for rescaling.
Benefits of rescalable state
- Scalability: Rescalable state allows Flink to dynamically adjust the parallelism of a stateful operator, without losing any state information. This enables Flink to handle varying workloads and resource availability, by scaling up or down as needed
- Reliability: Rescalable state allows Flink to perform upgrades and maintenance without downtime, by rescaling the operators to a different set of machines or containers. This also reduces the risk of data loss or corruption, by minimizing the exposure to failures
- Efficiency: Rescalable state allows Flink to optimize the resource utilization and performance of a stateful operator, by balancing the load across the subtasks and minimizing the network overhead
Common rescalable state challenges for stream processing applications
- Complexity: Rescalable state requires careful design and implementation of the operators and their states, to ensure that they are compatible with rescaling. For example, some operators may have dependencies or assumptions on their parallelism or key distribution, which may need to be revised or relaxed for rescaling
- Overhead: Rescalable state introduces some overhead for checkpointing and transferring the states, which may affect the latency and throughput of the application. For example, some states may be large or complex, which may take longer to checkpoint or transfer
- Consistency: Rescalable state requires coordination and synchronization among the subtasks and the checkpoint coordinator, to ensure that they agree on the checkpoint position and the key group assignment. This may introduce some consistency trade-offs or challenges for some applications
How to leverage rescalable state
- State backend: You need to choose a state backend, such as the RocksDB state backend or the heap-based HashMapStateBackend; both support rescalable state. You also need to configure the state backend parameters, such as the memory size, the checkpoint interval, the checkpoint directory and the checkpointing mode
- Keyed operator: You need to implement a keyed operator that can handle rescalable state, such as a map or reduce function on a keyed stream. You also need to define a key selector for keyBy(), and serializers for your keys and state (often inferred by Flink's type system), so that Flink can partition and serialize them
- Rescaling strategy: You need to decide how your job's parallelism will change: manually, via a savepoint-and-restart cycle, or automatically, for example with Flink's adaptive scheduler. In either case, set an appropriate maximum parallelism up front, since it bounds how far the job can ever be rescaled
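Putting these pieces together, a hedged flink-conf.yaml sketch (the keys are real Flink options; the values and paths are placeholders to adapt to your deployment):

```yaml
state.backend: rocksdb
state.checkpoints.dir: s3://my-bucket/checkpoints
state.savepoints.dir: s3://my-bucket/savepoints
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
# Upper bound for rescaling: the number of key groups.
pipeline.max-parallelism: 128
```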
Conclusion
Rescalable state is a powerful feature of Apache Flink that can enable you to build scalable, reliable and efficient stream processing applications. However, it also requires careful design and implementation of your operators and their states, as well as a clear understanding of the benefits and challenges of rescaling.