The Apache Flink® community released Apache Flink 1.20 this week. In this blog post, we'll highlight some of the most interesting additions and improvements. You’ll find a comprehensive rundown of all of the updates in the official release announcement.
Recent Flink releases have emphasized improving Flink's usability, not only for developers but also for those operating Flink clusters, and this theme continues in the latest release. Meanwhile, Flink 1.20 is expected to be the last minor release before Flink 2.0. It therefore concludes the community's efforts to deprecate APIs that should be removed or replaced in the upcoming major release, and to clean up a number of configuration options.
With Flink 1.20, we are again seeing a number of improvements in Flink SQL. Besides the general set of performance enhancements and stability fixes, the community is proud to present two headline features that should simplify interactions with tables in Flink going forward:
FLIP-376 introduces a unified way to specify a bucketing strategy for a Flink SQL table. Bucketing is another data layout mechanism in addition to the already long-supported partitioning (FLIP-63). In contrast to partitioning, with bucketing users can control the number of created groups (number of buckets), and define the data distribution strategy (e.g., hash).
Previously, bucketing could only be configured through a table's `WITH` clause, and every connector implemented its own syntax for defining it.
Let’s take a look at a Flink SQL Kafka Table DDL.
Before FLIP-376:
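A minimal sketch of the old style, assuming a hypothetical `orders` table; the `sink.partitioner` option is the Kafka connector's own mechanism for influencing data distribution, and stands in here for whatever connector-specific bucketing options a given connector exposed:

```sql
-- Before FLIP-376 (sketch): distribution is controlled by
-- connector-specific options inside the WITH clause.
CREATE TABLE orders (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- connector-specific way to influence how records are distributed
  'sink.partitioner' = 'fixed'
);
```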
After FLIP-376:
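With FLIP-376, the same intent can be declared in a connector-agnostic way using the new `DISTRIBUTED BY` clause (the `orders` table and bucket count below are illustrative):

```sql
-- After FLIP-376: bucketing is declared uniformly in the DDL.
CREATE TABLE orders (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DECIMAL(10, 2)
)
DISTRIBUTED BY HASH(order_id) INTO 4 BUCKETS  -- hash distribution, 4 buckets
WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```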
When users start with Flink SQL, they often have difficulties understanding the freshness of their data and the related cost implications for inserting data into a table. Either users define a streaming ingestion pipeline that offers the lowest latency and runs continuously (which implies costly infrastructure running all the time), or they insert new data with a periodic Flink batch job, which may not meet data freshness requirements (e.g., SLOs).
FLIP-435 offers users the possibility to specify data freshness requirements directly in Flink, without requiring an external system to manage them. This is achieved through the new concept of a `MATERIALIZED TABLE` that is automatically refreshed in the background.
After the DDL is executed, Flink either starts a streaming job to fulfill the freshness requirement, or schedules a batch job for periodic execution. The decision is based on the new configuration option `materialized-table.refresh-mode.freshness-threshold`: if the specified freshness is below the threshold, the table is continuously updated with a streaming job; if it is larger, Flink schedules a periodic batch job.
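As a sketch of how this fits together (table and column names are hypothetical): with a freshness of three minutes and a `materialized-table.refresh-mode.freshness-threshold` of, say, 30 minutes, Flink would maintain this table with a continuous streaming job; a freshness of several hours would instead be served by a scheduled batch job:

```sql
-- Hypothetical materialized table kept at most 3 minutes stale.
CREATE MATERIALIZED TABLE order_summary
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT
  order_date,
  COUNT(*)    AS order_cnt,
  SUM(amount) AS total_amount
FROM orders
GROUP BY order_date;
```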
FLIP-380 adds support for computing aggregations on non-keyed streams in parallel. This feature can only be used in batch mode, and brings to the DataStream API a capability that until now was only readily available in the DataSet API. This was needed because the DataSet API has been deprecated and will be removed in Flink 2.0.
As part of Flink’s recovery story, Flink writes periodic metadata (checkpoints) to persistent storage (e.g., blob storage), allowing it to recover in case of failure. A checkpoint consists of many files. Over the years, multiple different state file types have been added, leading to a growing number of separate files that a Flink job has to write to blob storage for a single checkpoint.
For large jobs (those with high parallelism and many operators), the number of files created during a checkpoint can put significant load on the underlying blob storage. With a cloud service (e.g., S3, ADLS), that load directly translates to more requests and hence higher costs; with self-hosted blob storage (e.g., HDFS, MinIO), it translates into frequent overloads.
With FLIP-306, Flink now offers several configuration options to control how state is uploaded, in an attempt to minimize the number of files (e.g., `state.checkpoints.file-merging` to enable the feature, and `state.checkpoints.file-merging.max-file-size` to control the maximum checkpoint file size).
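In a configuration file, enabling the feature might look like the following (the size limit is illustrative, not a recommendation):

```yaml
# Merge small checkpoint files into larger ones.
state.checkpoints.file-merging: true
# Illustrative upper bound on the size of a merged checkpoint file.
state.checkpoints.file-merging.max-file-size: 32MB
```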
In contrast to prior solutions, the new configuration applies to all types of state files, and is applicable to Flink state in general.
The Flink community has also fixed a longstanding issue with the RocksDB state backend (FLINK-26050).
In rare situations, for example when using Flink processing-time windows that create many small non-overlapping RocksDB SST files, the continuously growing number of local SST files can eventually cause instabilities like FLINK-34430.
To avoid relying solely on RocksDB’s auto-compaction, Flink now scans the local files periodically, and triggers a manual compaction when too many small files are detected.
Flink has long supported recovering streaming jobs from JobMaster failures by restarting from checkpoints, but batch jobs had to be restarted from the beginning, losing all previous progress.
FLIP-383 addresses this by making it possible to recover the state from all tasks that had finished before the JobMaster failed.
The community is making its final preparations for Flink 2.0:
Roughly 30 other configuration options that are no longer useful have been deprecated, and some others have been reorganized, most notably the options concerning checkpoints, savepoints, state, and state recovery.
We would like to express gratitude to all of the contributors who made this release possible:
Ahmed Hamdy, Alan Sheinberg, Aleksandr Pilipenko, Alexander Fedulov, Andrey Gaskov, Antonio Vespoli, Anupam Aggarwal, Barak Ben-Nathan, Benchao Li, Brad, Cheng Pan, Chesnay Schepler, DamonXue, Danny Cranmer, David Christle, David Moravek, David Schlosnagle, Dawid Wysakowicz, Dian Fu, Dmitriy Linevich, Elphas Toringepi, Emre Kartoglu, Fang Yong, Feng Jin, Ferenc Csaky, Frank Yin, Gabor Somogyi, Gyula Fora, HCTommy, Hangxiang Yu, Hanyu Zheng, Hao Li, Hong Liang Teoh, Hong Teoh, HuangXingBo, Jacky Lau, James Hughes, Jane Chan, Jeyhun Karimov, Jiabao Sun, Jim Hughes, Jing Ge, Jinzhong Li, JunRuiLee, Juntao Hu, JustinLee, Kartikey Pant, Kumar Mallikarjuna, Leonard Xu, Lorenzo Affetti, Luke Chen, Martijn Visser, Mason Chen, Matthias Pohl, Mingliang Liu, Panagiotis Garefalakis, Peter Huang, Peter Vary, Piotr Nowojski, Puneet Duggal, Qinghui Xu, Qingsheng Ren, Ravi Dutt Singh, Robert Metzger, Robert Young, Roc Marshal, Roman, Roman Boyko, Roman Khachatryan, Ron, Rui Fan, Ryan Skraba, Samrat, Sergey Nuyanzin, Shilun Fan, Stefan Richter, SuDewei, Timo Walther, Ufuk Celebi, Vincent Woo, Wang FeiFan, Weijie Guo, Wencong Liu, Wouter Zorgdrager, Xiangyu Feng, Xintong Song, Xuyang, Yanfei Lei, Yangze Guo, Yu Chen, Yubin Li, Yuepeng Pan, Yun Tang, Yuxin Tan, Zakelly, Zhanghao Chen, Zhen Wang, Zhenqiu Huang, Zhu Zhu, Zmm, ammar-master, anupamaggarwal, bvarghese1, caicancai, caodizhou, chenzihao, drymatini, dsaisharath, eason.qin, elon-X, fengli, gongzhongqiang, hejufang, jectpro7, jiangxin, liming.1018, lincoln lee, liuyongvs, lxliyou001, oleksandr.nitavskyi, plugatarev, rmoff, slfan1989, spoon-lz, sunxia, sxnan, sychen, wforget, xiaogang, xingbo, yebukong, yunfengzhou-hub, yunhong, zhouyisha, 马越