In-depth analysis of Flink SQL working mechanism

Posted Jun 27, 2020 · 16 min read

Author | Wu Chong (Yun Xie), Alibaba technical expert
Compiled by | Chen Jingmin (Qing Yue), Alibaba technical expert

Abstract: This article is compiled from a talk at the Chinese track of the Flink Forward 2020 Global Online Conference, given by Apache Flink PMC member Wu Chong (Yun Xie) and organized by community volunteer Chen Jingmin (Qing Yue). It is intended to help readers better understand how the Flink SQL engine works. The article is divided into the following four parts:

  1. Flink SQL Architecture
  2. How Flink SQL Works?
  3. Flink SQL Optimizations
  4. Summary and Futures

Tip: Click the link below to view the original video of the talk.
https://ververica.cn/developers/flink-forward-virtual-conference/

In the two most recent versions (1.9 & 1.10), the Apache Flink community has made many architecture-level improvements towards its goal of unified stream and batch processing. One of the major changes is the introduction of the Blink Planner, which makes it possible to compile SQL & Table API programs with different SQL planners (the planner is pluggable).

This article first introduces the thinking behind these changes and shows how the unified architecture can better handle streaming and batch queries; it then analyzes the Flink SQL compilation and optimization process in depth, including:

  1. Flink SQL uses Apache Calcite to translate SQL into relational algebra expressions, applies optimization techniques such as expression reduction (Expression Reduce) and pushdown (Predicate/Projection Pushdown) to generate a physical execution plan (Physical Plan), and uses code generation (Codegen) to produce efficient executable code.
  2. Flink SQL uses an efficient binary storage structure, BinaryRow, to accelerate computation; uses mini-batching to improve throughput and reduce the data jitter caused by Retraction in two-level aggregations; and applies optimizations for data-skew handling and Top-N sorting in aggregation scenarios.

## Flink SQL Architecture & Blink Planner (1.9+)

1.1 Limitations of Old Planner

To understand the motivation for Flink SQL to introduce a new architecture in version 1.9, we first look at the architecture design before version 1.9.

1 old-arch.gif

As the figure shows, although the Table API & SQL exposed to users is unified, streaming and batch jobs correspond to the DataStream API and the DataSet API at the translation layer, and the execution plan has to be obtained through these different APIs at the runtime level. This layered design limits how many modules can be reused across the architecture and makes it hard to extend.

From the very beginning, Flink has followed the idea that batch is a special case of streaming, and unifying stream and batch at the architecture level is the general trend. With the joint efforts of the community and Alibaba, version 1.9 introduced the new Blink Planner, which treats batch SQL processing as a special case of streaming SQL processing, tries to abstract and reuse common processing and optimization logic, and realizes unified handling of streams and batches through Flink's internal Stream Transformation API, replacing the old Flink Planner's approach of handling streams and batches separately.

In addition, the new architecture remains compatible with the old planner through a flexible plug-in mechanism, so users can choose between the two. From version 1.11 on, however, the Blink Planner replaces the old planner as the default to support further stream & batch unification (the old planner will gradually be phased out afterwards). A minimal example of selecting a planner is sketched after the figure below.

2 future-arch.gif
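
To give a concrete picture of the plug-in mechanism, here is a minimal sketch of how a planner is selected with the 1.9/1.10-style API (class and package names as in Flink 1.10; the rest of the program is omitted):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PlannerSelection {
    public static void main(String[] args) {
        // Select the Blink planner for a streaming job. useOldPlanner() would pick the
        // legacy planner instead, and inBatchMode() (together with
        // TableEnvironment.create(settings)) compiles batch queries on the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // ... register tables and run SQL / Table API queries on tEnv ...
    }
}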

## How Flink SQL Works

The overall workflow of the Flink SQL engine is shown in the figure below.

3 sql-engine-internal.png

As shown in the figure, a SQL query or a program written with the Table API (hereinafter referred to as Table API code) goes through the following stages from input to an executable JobGraph:

  1. The SQL text / Table API code is converted into a logical execution plan (Logical Plan).
  2. The Logical Plan is optimized by the optimizer into a physical execution plan (Physical Plan).
  3. Transformations are generated through code generation, then further compiled into an executable JobGraph and submitted for execution.

This section focuses on the common optimization techniques used by the Flink SQL optimizer and on how code generation produces the Transformations.

2.1 Logical Planning

The Flink SQL engine uses the Apache Calcite SQL parser to parse the SQL text into an abstract syntax tree. The SQL validator then fetches metadata from the Catalog for semantic analysis and validation and converts the tree into a relational algebra expression (RelNode), which the optimizer takes as the initial logical execution plan.

Note: Table API code goes through the Table API validator, which also connects to the Catalog, to produce the logical execution plan.

E.g.1 Consider the following SQL, which expresses a JOIN operation.

SELECT
  t1.id, 1 + 2 + t1.value AS v
FROM t1, t2
WHERE
  t1.id = t2.id AND
  t2.id < 1000

After the above steps, a tree-shaped logical execution plan is obtained. The root node corresponds to the top-level SELECT statement, the leaf nodes correspond to the TableScan operations on the input tables t1 and t2, and the join and the WHERE condition correspond to the Join and Filter nodes, respectively.

LogicalProject(id=[$0], v=[+(+(1, 2), $1)])
+- LogicalFilter(condition=[AND(=($0, $3), <($3, 1000))])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalTableScan(table=[[default_catalog, default, t1]])
      +- LogicalTableScan(table=[[default_catalog, default, t2]])

Visualized, the plan looks as shown in the figure; this is the initial state from which the optimizer starts its work.

4 join-example.png
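
You can inspect these plans yourself with TableEnvironment#explain. Below is a minimal sketch, continuing inside the program from the previous snippet and assuming tables t1 and t2 are already registered in the catalog; the output typically contains the abstract syntax tree (the initial logical plan shown above), the optimized plan, and the physical execution plan:

// Continues the earlier sketch: tEnv is the StreamTableEnvironment created there,
// and t1/t2 are assumed to be registered tables.
// (`value` is quoted with backticks since it may collide with a reserved word.)
Table joined = tEnv.sqlQuery(
        "SELECT t1.id, 1 + 2 + t1.`value` AS v " +
        "FROM t1, t2 " +
        "WHERE t1.id = t2.id AND t2.id < 1000");

// Prints the abstract syntax tree, the optimized plan and the physical execution plan.
System.out.println(tEnv.explain(joined));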

The following sections introduce several common optimization techniques used by the Flink SQL optimizer.

■ 2.1.1 Expression Reduce

Expressions are the most common syntactic elements in SQL: t1.id is an expression, and so is 1 + 2 + t1.value. During optimization, the optimizer recursively traverses the nodes of the tree and pre-computes the value of every expression that it can, a process known as expression folding (constant folding). The transformation is logically equivalent; after optimization, 1 + 2 no longer has to be computed for every record at execution time, because the projection has already become 3 + t1.value.

5 expression-reduce.png

■ 2.1.2 PushDown Optimization

Pushdown optimization means moving operations in a SQL statement as close to the data source as possible, while keeping the relational-algebra semantics unchanged, in order to obtain better performance. Common pushdown optimizations include predicate pushdown (Predicate Pushdown) and projection pushdown (Projection Pushdown, often also called column pruning).

  • Predicate Pushdown

Looking back at E.g.1, the filter condition t2.id < 1000 in the WHERE clause only constrains table t2 and has nothing to do with table t1, so it can be pushed down and evaluated before the JOIN. Assuming table t2 contains one million rows but only 1,000 of them satisfy id < 1000, predicate pushdown reduces the amount of data reaching the Join node by a factor of 1,000, which greatly reduces I/O and improves JOIN performance.

Predicate pushdown is a fundamental technique for optimizing SQL queries. The term predicate comes from mathematics and refers to a function or expression that yields a Boolean value (TRUE/FALSE); evaluating that Boolean value is what filters the data. Predicate pushdown moves the Filter as close to the data source as possible (for example into the SCAN phase that reads the data), reducing the amount of data (number of records) that has to be read and transferred while keeping the relational-algebra semantics unchanged.

6 filter-push-down.png

  • Projection Pushdown

Column pruning is a more intuitive name for projection pushdown. It removes columns that the query does not use during optimization, which reduces I/O overhead and improves performance. Unlike predicate pushdown, which only moves an existing node, projection pushdown may increase the number of nodes: if the final combination of required columns should be applied directly on top of the TableScan but there is no Projection node there yet, the optimizer explicitly adds one to complete the optimization. Furthermore, if the input table uses columnar storage (such as Parquet or ORC), the projection can be pushed all the way down into the Scan operation itself.

Looking back at E.g.1, only the id and value fields of table t1 and the id field of table t2 are used in the entire query, so a Projection node is added above each TableScan node to drop the unused fields, which greatly reduces I/O overhead.

7 projection-push-down.png

To summarize briefly, predicate pushdown and projection pushdown reduce I/O overhead and improve performance by avoiding processing unnecessary records and fields, respectively.
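
To make the combined effect concrete, after these two pushdowns the optimized plan for E.g.1 behaves roughly as if the query had been written as follows. This is only an illustration of what the optimizer derives automatically, not something the user has to write:

// Illustration only: predicate pushdown filters t2 before the join, and projection
// pushdown keeps just the columns the query actually uses (t1.id, t1.value, t2.id).
Table equivalent = tEnv.sqlQuery(
        "SELECT s1.id, 1 + 2 + s1.`value` AS v " +
        "FROM (SELECT id, `value` FROM t1) s1 " +
        "JOIN (SELECT id FROM t2 WHERE id < 1000) s2 ON s1.id = s2.id");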

2.2 Physical Planning on Batch

After the above series of operations, we obtain the optimized logical execution plan. The logical plan describes the execution steps and the operation to be performed at each step, but not how each operation is concretely implemented. The physical execution plan takes the characteristics of the physical implementation into account and generates a concrete implementation for every operation, for example choosing SortMergeJoin, HashJoin or BroadcastHashJoin for a Join. When generating the plan, the optimizer computes the cost of every node in the whole tree; for nodes with multiple possible implementations (such as a Join node) it expands all candidates and computes their costs separately, and finally selects the implementation with the lowest cost over the entire path as the final Physical Plan.

Recall E.g.1: when it is executed in batch mode, we also have access to statistics about the input tables. After the aforementioned optimizations, only 1,000 rows of table t2 reach the Join node, so the cost of a BroadcastJoin is relatively low and it is chosen. The final Physical Plan is shown in the following figure.

8 batch-physical-plan.png
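
For reference, the same query can be compiled in batch mode with the Blink planner as sketched below. The broadcast-join decision is cost based, and the threshold it uses can be tuned through configuration (key name as in the Flink 1.10 Blink planner):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchMode {
    public static void main(String[] args) {
        // Blink planner in batch mode: statistics of the input tables feed the
        // cost-based choice between SortMergeJoin, HashJoin and BroadcastHashJoin.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Tables whose estimated size falls below this threshold are broadcast to the join.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.join.broadcast-threshold", String.valueOf(1024 * 1024));
    }
}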

2.3 Translation & Code Generation

Code generation is a widely used technique in computing. In the Flink SQL engine, code generation is used when going from the Physical Plan to the Transformation tree.

Recalling E.g.1, take the Calc node carrying the t2.id < 1000 expression on table t2 as an example: code generation produces a piece of Java code describing the corresponding Transformation operator, which forwards only the received rows whose id is less than 1000 to the next operator.

9 code-gen.png
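
The generated code is an internal implementation detail and its exact shape depends on the planner version; the following is only a simplified, hypothetical sketch (class and method names invented here) of what the generated Calc operator conceptually does for this predicate:

// Hypothetical, heavily simplified sketch of the generated filter logic -- the real
// generated code implements Flink's internal operator interfaces and works on binary rows.
public final class GeneratedCalcSketch {
    /** Returns true if the incoming row should be forwarded to the next operator. */
    public boolean filter(long t2Id) {
        // the predicate t2.id < 1000 compiled into straight-line Java
        return t2Id < 1000L;
    }
}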

The Flink SQL engine translates the Physical Plan into Transformations through Code Generation, and then further compiles it into an executable JobGraph.

2.4 Physical Planning on Stream

The above describes the overall workflow of the Flink SQL engine, with the example assumed to be compiled in batch mode. Let us now introduce an important mechanism involved in generating the physical plan when compiling in streaming mode: the Retraction mechanism (a.k.a. the Changelog mechanism).

10 physical-planning-stream.png

■ 2.4.1 Retraction Mechanism

Retraction is a mechanism in stream processing for withdrawing data that was emitted early (early firing), similar to an Update operation in a traditional database. Without a Retraction mechanism, complex SQL such as cascaded aggregations would produce final results that differ from those of batch processing, a defect that many stream computing engines in the industry share.

E.g.2 Consider the following SQL, which computes the distribution of word frequencies.

SELECT cnt, COUNT(cnt) as freq
FROM (
  SELECT word, COUNT(*) as cnt
  FROM words
  GROUP BY word)
GROUP BY cnt

Suppose the input data is:

| word  |
| ----- |
| Hello |
| World |
| Hello |

After the above calculation, the expected output should be:

| cnt | freq |
| --- | ---- |
| 1   | 1    |
| 2   | 1    |

But unlike in batch processing, streaming data arrives record by record, and in principle every record triggers a computation. After the first Hello and the first World have been processed, the count of words with frequency 1 has become 2. When the second Hello arrives, if the previous result cannot be corrected, Hello will be counted under both frequency 1 and frequency 2 at the same time. This result is obviously wrong, and it is exactly the problem caused by the absence of a Retraction mechanism.

11 physical-planning-retraction-0.gif

A major contribution of Flink SQL to the field of stream computing is that it was the first to provide a concrete implementation of this mechanism. The Retraction mechanism is also known as the Changelog mechanism, because to some extent Flink treats the input stream as a database changelog: every input record can be regarded as a change operation on the database, such as an Insert, a Delete, or an Update. Taking MySQL as an example, binlog information is stored in binary form, where an Update_rows_log_event corresponds to two entries, Before Image (BI) and After Image (AI), representing a row before and after the update respectively.

When generating the physical plan of a streaming job, the Flink SQL optimizer determines whether the current node performs update operations. If it does, the node sends two messages to the downstream node, update_before and update_after: update_before marks the previously sent "wrong" data that needs to be retracted, while update_after carries the current "correct" data. When the downstream node receives them, it first subtracts update_before from its result and then adds update_after.

Recalling E.g.2, the following animation demonstrates how the correct result is computed once the Retraction mechanism is in place.

12 physical-planning-retraction-1.gif

update_before is a critical piece of information: it effectively marks the "culprit" that made the current result incorrect. However, the extra messages bring extra overhead, and in some cases the correct result can be obtained without sending update_before, for example when the downstream node is an UpsertSink (with MySQL or HBase, the database can simply overwrite the result by primary key using the update_after message). Whether update_before is sent is decided by the optimizer; users do not need to care about it.
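
At the API level, the retraction messages become visible when a result table is converted into a retract stream. Here is a minimal sketch for the E.g.2 query, assuming a table named words is registered in the streaming tEnv from the earlier snippets (API as in Flink 1.10):

// Each element of the retract stream is a Tuple2<Boolean, Row>: true marks an add
// (accumulate) message, false marks a retract message withdrawing an earlier result.
// Requires org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.types.Row and
// org.apache.flink.streaming.api.datastream.DataStream.
Table freq = tEnv.sqlQuery(
        "SELECT cnt, COUNT(cnt) AS freq " +
        "FROM (SELECT word, COUNT(*) AS cnt FROM words GROUP BY word) " +
        "GROUP BY cnt");

DataStream<Tuple2<Boolean, Row>> changelog = tEnv.toRetractStream(freq, Row.class);
changelog.print();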

■ 2.4.2 Update_before Decision

The Retraction mechanism and update_before were introduced above. How does the optimizer decide whether update_before needs to be sent? This section describes that part of its work.

Step 1: Determine the Changelog type corresponding to each node

The three most common types of database operations are Insert (denoted [I]), Delete (denoted [D]), and Update (denoted [U]). The optimizer first examines each node from bottom to top, determines which type(s) of changes it produces, and labels it accordingly.

Recalling E.g.2: the first node, the Source, only produces new data, so it corresponds to Insert and is labeled [I]; the second node computes the inner aggregation and will therefore emit update messages, so it is labeled [I, U]; the third node is a Calc that drops the word field, a simple computation that just passes on the upstream change types, so it is also labeled [I, U]; the fourth node computes the outer aggregation, and since it receives Update messages from upstream it additionally needs Delete operations to apply the updates correctly, so it is labeled [I, U, D].

13 optimizer-step-0.gif

Step 2: Determine the type of message each node sends

Before introducing Step 2, let us look at how Update messages are represented in Flink. An Update is represented by two messages, update_before (abbreviated UB) and update_after (abbreviated UA); in some cases the UB message can be omitted, which improves performance.

In Step 1 the optimizer derived, from bottom to top, the Changelog operations each node produces. In this step it first works in the opposite direction, inferring for each node which message types it requires from its upstream node, until it reaches the first node that does not require any particular message types from upstream; it then walks back up, fixing the final implementation of each node and the message types it will send.

Recalling E.g.2: the topmost node is an UpsertSink, which only requires its upstream node to provide [UA]. Next comes the outer Aggregate node; because its input contains Update operations, it requires its upstream node to provide [UB, UA] so that it can correctly update its own accumulated state.

Continuing downwards, the Calc node passes the [UB, UA] requirement on to its upstream node, the inner Aggregate node. The inner Aggregate node's upstream is the Source node, which produces no Update operations, so the inner Aggregate does not require the Source to send any extra [UB/UA] messages. When the optimizer reaches the Source node, it starts to backtrack: if the current node can satisfy the requirements of its downstream node, the corresponding label is attached to it; otherwise no plan can be generated. First, the inner Aggregate can generate UB, so it can satisfy the downstream requirement, and the optimizer labels it with [UB, UA]; the requirement is then passed up to the Calc node, which is also labeled [UB, UA]; finally, the outer Aggregate node, whose downstream only needs update messages, is labeled [UA], meaning it only needs to send update_after downstream.

These labels ultimately affect the physical implementation of the operators. For example, because the outer Aggregate node receives [UB] from upstream, its physical implementation uses a count with retraction support ("count with retract"), and it only sends update_after to the Sink. The inner Aggregate node receives no [UB] from upstream, so it can be implemented as a plain count ("count without retract"), but because of its own [UB] label it has to send update_before downstream.

14 optimizer-step-1.gif

## Flink SQL Optimizations

The previous sections described how the Flink SQL engine works. Next, we briefly summarize some of the optimizations inside Flink SQL; more details can be found in the Flink Forward Asia 2019 talks.

3.1 BinaryRow

Before Flink 1.9, the data structure passed between operators at the Flink runtime layer was Row, whose internal implementation is an Object[]. This data structure has two problems: it needs extra space to store object metadata, and computation involves a great deal of serialization/deserialization (in particular, the whole Row has to be deserialized even when only a few of its fields are needed) as well as primitive boxing, all of which adds significant performance overhead.

15 row.png

Flink 1.9 introduced the Blink Planner, which uses a binary data structure, BinaryRow, to represent records. A BinaryRow is backed by MemorySegments (32 KB by default) and maps directly onto memory. It consists of a header, a fixed-length part and a variable-length part. The header stores the flag that identifies Retraction messages; the fixed-length part uses 8 bytes to record the nullability of the fields and stores all primitives and other types that can be represented within 8 bytes; the remaining types are stored in the variable-length part and referenced by an offset from the start of the row.

As the basic data structure of the Blink Planner, the advantages of BinaryRow are obvious: first, storage is much more compact, eliminating the extra overhead; second, serialization and deserialization performance improves significantly, since individual fields can be deserialized on demand via their offsets, and with object reuse enabled, serialization can be done directly through memory copies.

16 binary-row.png
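
As a small illustration of the object-reuse point above, the switch lives on the execution config; a minimal sketch (whether it is safe to enable depends on whether user functions hold on to incoming records):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableObjectReuse {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With object reuse enabled, BinaryRow records can be copied/serialized through
        // plain memory copies instead of per-field work.
        env.getConfig().enableObjectReuse();
    }
}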

3.2 Mini-batch Processing

Flink is a pure stream processing framework: in principle, every new record triggers a computation. At the implementation level, however, this means that in aggregation scenarios each processed record causes a State read/write and serialization/deserialization. If a certain amount of data can be buffered in memory and pre-aggregated before the State is updated, this not only reduces the cost of accessing State but also effectively reduces the amount of data sent downstream, improving throughput and reducing the data jitter caused by Retraction in two-level aggregations. This is the core idea of the mini-batch optimization.

17 mini-batch.gif
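
Mini-batch is off by default and is switched on through configuration. A minimal sketch, applied to the TableConfig of the streaming tEnv from the earlier snippets (key names as in the Flink 1.10 Blink planner):

// Enable mini-batch aggregation: buffer records and flush either on a timer or
// once enough records have accumulated.
org.apache.flink.configuration.Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "2 s");  // flush at least every 2 seconds
conf.setString("table.exec.mini-batch.size", "5000");          // ...or once 5000 records are buffered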

3.3 Skew Processing

Data-skew handling is divided into two cases, depending on whether the aggregation carries DISTINCT deduplication semantics. For ordinary aggregations, Flink introduces a two-phase Local-Global optimization, similar to adding a local Combiner in MapReduce. For aggregations with DISTINCT, Flink rewrites the user's SQL into a two-level aggregation whose inner level groups by the original aggregation keys combined with a hash-modulo bucket of the DISTINCT key, which scatters the skewed keys.

18 skew-process.gif
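
Both skew-related optimizations are also switched on through configuration; a minimal sketch (key names as in the Flink 1.10 Blink planner; local-global aggregation additionally relies on mini-batch being enabled as shown above):

org.apache.flink.configuration.Configuration conf = tEnv.getConfig().getConfiguration();
// Two-phase (local-global) aggregation for ordinary aggregates.
conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// Automatically split skewed DISTINCT aggregates into a two-level aggregation.
conf.setString("table.optimizer.distinct-agg.split.enabled", "true");
conf.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");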

3.4 Top-N Rewrite

Global sorting is difficult to achieve in a streaming scenario, but if only the current Top-N extreme values are required, the problem becomes solvable. The traditional SQL way of limiting output, ORDER BY plus LIMIT, is implemented by scanning and sorting the full table and then returning the first LIMIT records; moreover, plain ORDER BY cannot express a per-group Top-N over certain fields. Flink SQL therefore borrows the windowed Top-N syntax used in batch scenarios and uses ROW_NUMBER to express Top-N sorting in streaming scenarios.

E.g.3 The following SQL computes the top 3 stores by sales in each category.

SELECT *
FROM (
  SELECT *, -- you can get shopId or other information from here
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum
  FROM shop_sales)
WHERE rowNum <= 3

In terms of plan generation, the semantics of ROW_NUMBER correspond to an OverAggregate window node plus a Calc node that filters on the row number. At the implementation level, however, this window node would have to re-sort the historical data kept in State for every arriving record, which is clearly not the optimal solution.

We know that the optimal way to maintain the current maximum/minimum values in a streaming scenario is to keep a min-heap/max-heap of size N. In terms of implementation, a new rule is added to the optimizer: when it encounters the logical node generated by ROW_NUMBER, it rewrites it into a special Rank node corresponding to this optimal implementation (of course, this is only one of the possible implementations of the Rank node). This is the core idea of the Top-N rewrite.

19 top-n.png

## Summary & Futures

A brief review of the content of this article:

  1. It briefly introduced the new SQL & Table API architecture in Flink 1.9+, which unifies the technology stack and takes a big step towards stream & batch integration.
  2. It took an in-depth look at the internal working mechanism of the Flink SQL engine and at the many optimizations Flink SQL performs transparently to the user.


Future work plan

  1. In Flink 1.11+, the Blink Planner becomes the default planner and provides production-level support.
  2. FLIP-95: Redesign the TableSource & TableSink interfaces for stream & batch unification and support changelog message streams on the source side, which in turn enables the CDC data sources of FLIP-105.
  3. FLIP-105: Support for CDC (Change Data Capture) in the Flink Table API & SQL.
  4. FLIP-115: Extend the FileSystem connector, which currently only supports CSV, into a unified, generalized FileSystem connector that also works in streaming.
  5. FLIP-123: Hive DDL and DML compatibility, allowing users to run Hive DDL directly in Flink.