Flink and Hive: the run-in period

Posted May 25, 2020 · 11 min read

Many readers reported that, when following the previous article "Hive finally waited for Flink" to deploy Flink and integrate it with Hive, they ran into bugs and compatibility issues: the waiting may be over, but the setup still isn't usable. This article is therefore a companion piece. Recall that in the last article the author's environment was CDH 5.16.2, where the Hive version is 1.1.0 (no CDH 5.x release ships a Hive newer than 1.1.0, hard as that is to believe), and the Flink source code itself is not compatible with Hive 1.1.0, which causes quite a few problems. To make things work, the author modified the Flink code for the CDH 5.16.2 environment, repackaged it, and deployed it. In practice, as many open source projects (Apache Atlas, Apache Spark, and others) have shown, incompatibilities between Hive 1.2.x and Hive 1.1.x can in most cases be resolved by replacing a few Jar packages. For the author's environment, some Hive 1.2.1 Jars can be used in place of the Hive 1.1.0 Jars. This article first solves that problem and then fills in the hands-on content that was missing from the previous article. The reader feedback kept coming and was somewhat tangled, so the author groups all the questions into three categories:

  1. Apart from the API, how does Flink connect to Hive? Is there a command-line tool similar to spark-sql?
  2. The Hadoop environment cannot be recognized, or the configuration files cannot be found.
  3. Dependent packages, classes, or methods cannot be found.

1. How Flink connects to Hive

Some readers are not sure how to configure Flink to connect to Hive's catalog. Here is a complete conf/sql-client-hive.yaml example:

```yaml
catalogs:
  - name: staginghive
    type: hive
    hive-conf-dir: /etc/hive/conf
    hive-version: 1.2.1

execution:
  planner: blink
  type: batch
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 1
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: staginghive
  current-database: ssb
  restart-strategy:
    type: fallback

deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
  m: yarn-cluster
  yn: 2
  ys: 5
  yjm: 1024
  ytm: 2048
```

The sql-client-hive.yaml configuration file contains:

  1. Hive configuration: the path to the Hive configuration files and the Hive version are set in the catalogs section (a Table API equivalent is sketched right after this list).

  2. YARN configuration: the YARN deployment information is set in the deployment section.

  3. Execution engine: the execution section selects the Blink planner and batch mode. Batch mode is relatively stable, suits traditional batch jobs, and supports fault tolerance; intermediate data is spilled to disk, so enabling compression is recommended. Besides batch, Flink also supports streaming mode.
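
For readers who prefer the API, the following is a minimal sketch (not taken from the original article) of the programmatic equivalent of this configuration, using the Flink 1.10 Table API. The catalog name, default database, Hive conf directory, and Hive version simply mirror the values in sql-client-hive.yaml, and the flink-connector-hive and Hive client Jars need to be on the classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        // Blink planner in batch mode, matching the execution section of the YAML file
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Catalog name, default database, hive-conf-dir and hive-version,
        // mirroring the catalogs section of the YAML file
        HiveCatalog catalog = new HiveCatalog("staginghive", "ssb", "/etc/hive/conf", "1.2.1");
        tableEnv.registerCatalog("staginghive", catalog);
        tableEnv.useCatalog("staginghive");
        tableEnv.useDatabase("ssb");

        // Quick sanity check: list the tables visible through the Hive catalog
        for (String table : tableEnv.listTables()) {
            System.out.println(table);
        }
    }
}
```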

Flink SQL CLI tool

Similar to the spark-sql command, Flink provides a SQL CLI tool, the sql-client.sh script. In Flink 1.10 the SQL CLI gained many new capabilities, which the author explains later in this article. sql-client.sh is used as follows:

```bash
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
```

2. The Hadoop environment or configuration files cannot be recognized

As mentioned in the previous article, a CDH gateway (including the Hadoop and Hive clients) should be deployed on the host where Flink is deployed, and a few environment variables need to be configured as well:

```bash
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HIVE_CONF_DIR=/etc/hive/conf
```

3. Dependent packages, classes, or methods cannot be found

First check the lib directory under Flink's home directory:

```bash
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.1.0-cdh5.16.2.jar
├── hive-metastore-1.1.0-cdh5.16.2.jar
├── libfb303-0.9.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
```

With the first two problems solved, execute the following command:

```bash
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
```

It errors out:

```
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
```

Before running the sql-client.sh script, the classpath of the Hadoop dependencies has to be specified. Rather than adding one Jar every time a new error appears (unless some readers enjoy doing that), the author suggests a more convenient way: set the HADOOP_CLASSPATH environment variable (it can be added to ~/.bash_profile):

```bash
export HADOOP_CLASSPATH=`hadoop classpath`
```

Execute again:

```bash
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
```

Unfortunately, it still fails:

```
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
```

This is the version incompatibility between the Hive 1.1.0 Jar packages and Flink. The solution is:

  1. Download Apache Hive 1.2.1.

  2. Replace the Hive Jar packages in the Flink lib directory: delete hive-exec-1.1.0-cdh5.16.2.jar, hive-metastore-1.1.0-cdh5.16.2.jar, and libfb303-0.9.3.jar, then add hive-exec-1.2.1.jar, hive-metastore-1.2.1.jar, and libfb303-0.9.2.jar. Check the lib directory again:

```bash
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.2.1.jar
├── hive-metastore-1.2.1.jar
├── libfb303-0.9.2.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
```

Finally, execute:

```bash
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
```

At this point, the reader can see the cute little squirrel holding a chestnut (the Flink SQL CLI welcome screen).

Flink SQL CLI practice

In Flink 1.10 (at the RC1 stage at the time of writing), the community made many changes to the SQL CLI, such as support for views, more data types and DDL statements, partitioned reads and writes, INSERT OVERWRITE, and more TableEnvironment API features, making it much more convenient to use. Next, the author walks through the Flink SQL CLI in detail.

0. Help

Execute the following command to log in to the Flink SQL client:

```bash
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL>
```

Execute HELP to see the commands supported by Flink SQL. The most common ones are:

  • CREATE TABLE
  • DROP TABLE
  • CREATE VIEW
  • DESCRIBE
  • DROP VIEW
  • EXPLAIN
  • INSERT INTO
  • INSERT OVERWRITE
  • SELECT
  • SHOW FUNCTIONS
  • USE CATALOG
  • SHOW TABLES
  • SHOW DATABASES
  • SOURCE
  • USE
  • SHOW CATALOGS

1. Hive operations

1.1 Creating tables and importing data

To make the experiments easy to reproduce, the author uses ssb-dbgen to generate test data; readers can also use data already available in their test environment. For how to create the tables and insert data in one step in Hive, refer to the author's earlier project https://github.com/MLikeWater/ssb-kylin .

1.2 Hive tables

View the Hive tables created in the previous step:

```
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;
+--------------+
|   tab_name   |
+--------------+
| customer     |
| dates        |
| lineorder    |
| p_lineorder  |
| part         |
| supplier     |
+--------------+
```

Readers can run all sorts of queries against Hive and compare the results with the Flink SQL queries below.

2. Flink operations

2.1 Accessing the Hive database through HiveCatalog

Log in to the Flink SQL CLI and query the catalogs:

```
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

Flink SQL> show catalogs;
default_catalog
staginghive

Flink SQL> use catalog staginghive;
```

show catalogs lists all configured catalogs. Because staginghive is set as the default catalog in the sql-client-hive.yaml file, it is already the current one; to switch to another catalog, run use catalog xxx.

2.2 Querying Hive metadata

Query the Hive databases and tables through Flink SQL:

```
# Query databases
Flink SQL> show databases;
...
ssb
tmp
...

Flink SQL> use ssb;

# Query tables
Flink SQL> show tables;
customer
dates
lineorder
p_lineorder
part
supplier

# Query table structure
Flink SQL> DESCRIBE customer;
root
 |-- c_custkey: INT
 |-- c_name: STRING
 |-- c_address: STRING
 |-- c_city: STRING
 |-- c_nation: STRING
 |-- c_region: STRING
 |-- c_phone: STRING
 |-- c_mktsegment: STRING
```

Note that Hive metadata appears in lowercase in the Flink catalog.

2.3 Queries

Next, run some SQL statements in the Flink SQL CLI; the complete SQL is in the README of https://github.com/MLikeWater/ssb-kylin . At present, Flink SQL hits some bugs when parsing Hive view metadata, for example when executing the Q1.1 SQL:

```
Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Table ... not found; did you mean 'LINEORDER'?
```

Flink SQL cannot find the underlying table referenced by the view. The p_lineorder table is a view in Hive, created with the following statement:

```sql
CREATE VIEW P_LINEORDER AS
SELECT LO_ORDERKEY,
       LO_LINENUMBER,
       LO_CUSTKEY,
       LO_PARTKEY,
       LO_SUPPKEY,
       LO_ORDERDATE,
       LO_ORDERPRIOTITY,
       LO_SHIPPRIOTITY,
       LO_QUANTITY,
       LO_EXTENDEDPRICE,
       LO_ORDTOTALPRICE,
       LO_DISCOUNT,
       LO_REVENUE,
       LO_SUPPLYCOST,
       LO_TAX,
       LO_COMMITDATE,
       LO_SHIPMODE,
       LO_EXTENDEDPRICE * LO_DISCOUNT AS V_REVENUE
FROM ssb.LINEORDER;
```

For views defined in Hive, Flink SQL does not yet handle the metadata well.
To allow the following SQL to run smoothly, the author drops and recreates the view in Hive:

```
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as
select lo_orderkey,
       lo_linenumber,
       lo_custkey,
       lo_partkey,
       lo_suppkey,
       lo_orderdate,
       lo_orderpriotity,
       lo_shippriotity,
       lo_quantity,
       lo_extendedprice,
       lo_ordtotalprice,
       lo_discount,
       lo_revenue,
       lo_supplycost,
       lo_tax,
       lo_commitdate,
       lo_shipmode,
       lo_extendedprice * lo_discount as v_revenue
from ssb.lineorder;
```

Then continue with the Q1.1 SQL in the Flink SQL CLI:

```
Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;

revenue
894280292647
```

Continue with the Q2.1 SQL:

```
Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join part on lo_partkey = p_partkey
> left join supplier on lo_suppkey = s_suppkey
> where p_category = 'MFGR#12' and s_region = 'AMERICA'
> group by d_year, p_brand
> order by d_year, p_brand;

lo_revenue  d_year  p_brand
819634128   1998    MFGR#1206
877651232   1998    MFGR#1207
754489428   1998    MFGR#1208
816369488   1998    MFGR#1209
668482306   1998    MFGR#1210
660366608   1998    MFGR#1211
862902570   1998    MFGR#1212
...
```

Finally, query another one, the Q4.3 SQL:

```
Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join customer on lo_custkey = c_custkey
> left join supplier on lo_suppkey = s_suppkey
> left join part on lo_partkey = p_partkey
> where c_region = 'AMERICA' and s_nation = 'UNITED STATES'
> and (d_year = 1997 or d_year = 1998)
> and p_category = 'MFGR#14'
> group by d_year, s_city, p_brand
> order by d_year, s_city, p_brand;

d_year  s_city      p_brand    profit
1998    UNITED ST9  MFGR#1440  6665681
```

Interested readers can run the remaining SQL and, of course, compare the results with Spark SQL. In addition, Flink SQL supports EXPLAIN to show the execution plan of a SQL statement (a Table API sketch follows after the view examples below).

2.4 Creating views

Similarly, views can be created and dropped in the Flink SQL CLI:

```
Flink SQL> create view p_lineorder2 as
> select lo_orderkey,
> lo_linenumber,
> lo_custkey,
> lo_partkey,
> lo_suppkey,
> lo_orderdate,
> lo_orderpriotity,
> lo_shippriotity,
> lo_quantity,
> lo_extendedprice,
> lo_ordtotalprice,
> lo_discount,
> lo_revenue,
> lo_supplycost,
> lo_tax,
> lo_commitdate,
> lo_shipmode,
> lo_extendedprice * lo_discount as v_revenue
> from ssb.lineorder;
[INFO] View has been created.
```

One thing to emphasize: currently Flink cannot drop a view that lives in Hive:

```
Flink SQL> drop view p_lineorder;
[ERROR] Could not execute SQL statement. Reason:
The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.
```
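
As for the EXPLAIN support mentioned above, the execution plan can also be obtained through the Table API. The following is a minimal sketch (not from the original article, Flink 1.10 API) that prints the plan of the Q1.1 query; it assumes a TableEnvironment with the staginghive HiveCatalog already registered, as in the earlier catalog sketch:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainQ11 {
    // tableEnv is assumed to already have the staginghive HiveCatalog registered
    // and ssb selected as the current database (see the earlier catalog sketch).
    static void printPlan(TableEnvironment tableEnv) {
        Table q11 = tableEnv.sqlQuery(
                "select sum(v_revenue) as revenue "
              + "from p_lineorder "
              + "left join dates on lo_orderdate = d_datekey "
              + "where d_year = 1993 and lo_discount between 1 and 3 and lo_quantity < 25");

        // explain() returns the abstract syntax tree and the optimized plans as text
        System.out.println(tableEnv.explain(q11));
    }
}
```
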
2.5 Partition operations

Create a partitioned table in the Hive database:

```sql
CREATE TABLE IF NOT EXISTS flink_partition_test (
  id int,
  name string
) PARTITIONED BY (day string, type string)
STORED AS textfile;
```

Then insert and query data through Flink SQL:

```
# Insert into a static partition
Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';

# Query
Flink SQL> select * from flink_partition_test;
id      name      day         type
100001  Flink001  2020-02-01  Flink

# Insert into a dynamic partition
Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';

# Query
Flink SQL> select * from flink_partition_test;
id      name      day         type
100002  Spark     2020-02-02  SparkSQL
100001  FlinkSQL  2020-02-01  Flink

# Combining dynamic and static partitions works similarly; not demonstrated here

# Overwrite existing data
Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';
id      name      day         type
100002  Spark     2020-02-02  SparkSQL
100001  FlinkSQL  2020-02-01  Flink
```

The field day is a keyword in Flink and needs special handling (it has to be escaped with backticks).

2.6 Other features

  • 2.6.1 Functions

Flink SQL supports both built-in functions and user-defined functions. The built-in functions can be listed with show functions; how to create user-defined functions will be covered separately in a later article.
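
As a quick preview, and purely as a hedged sketch using the Flink 1.10 Table API (not code from the original article; the class and function names are made up for illustration), a user-defined scalar function and its registration might look like this:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfSketch {

    // A scalar UDF: Flink invokes the public eval() method via reflection.
    public static class PrefixFunction extends ScalarFunction {
        public String eval(String value) {
            return value == null ? null : "ssb_" + value;
        }
    }

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Register the function; afterwards it can be used in SQL,
        // e.g. SELECT prefix(c_name) FROM customer
        // (with the Hive catalog registered as in the earlier sketch).
        tableEnv.registerFunction("prefix", new PrefixFunction());
    }
}
```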

  • 2.6.2 Setting parameters

Flink SQL supports setting environment parameters. Use the set command to view and change them:

```
Flink SQL> set;
deployment.gateway-address=
deployment.gateway-port=0
deployment.m=yarn-cluster
deployment.response-timeout=5000
deployment.yjm=1024
deployment.yn=2
deployment.ys=5
deployment.ytm=2048
execution.current-catalog=staginghive
execution.current-database=ssb
execution.max-idle-state-retention=0
execution.max-parallelism=128
execution.max-table-result-rows=1000000
execution.min-idle-state-retention=0
execution.parallelism=1
execution.periodic-watermarks-interval=200
execution.planner=blink
execution.restart-strategy.type=fallback
execution.result-mode=table
execution.time-characteristic=event-time
execution.type=batch

Flink SQL> set deployment.yjm = 2048;
```

Summary

In this article the author used Flink SQL to operate on the Hive database in more detail and walked through some of the features Flink SQL provides. Of course, there are still some problems when operating a Hive database with Flink SQL:

  • Currently only the TextFile storage format is supported, and other storage formats cannot be specified: only Hive tables stored as TextFile, with the row format serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, work. Although storage formats such as RCFile, ORC, Parquet, and SequenceFile are implemented, the storage format of a Hive table cannot be automatically recognized; to use them, the source code has to be modified and recompiled. The community has already tested these storage formats, though, so they should become available in Flink SQL in the near future.
  • OpenCSVSerde support is incomplete. If the reader uses org.apache.hadoop.hive.serde2.OpenCSVSerde as the row format serde of a TextFile table, the field types are not correctly identified and all fields of the Hive table are mapped to the String type.
  • Bucketed tables are not yet supported.
  • ACID tables are not yet supported.
  • Flink SQL offers relatively few optimization and tuning options.
  • Permission control is similar to Spark SQL in this respect: it currently relies on HDFS ACLs, and neither Sentry nor Ranger is supported yet. Cloudera is, however, working on a Ranger-based strategy that lets Spark SQL and Hive share access permissions, to achieve row/column-level control and audit information.

The Flink community is developing rapidly; all of these issues are only temporary and will be resolved one by one as new versions are released. If Flink SQL does not yet meet your needs, it is recommended to fall back to the API for now.
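
For instance, a minimal sketch of driving the same kind of Hive insert through the Table API instead of the SQL CLI could look like the following (assuming Flink 1.10 with the Blink planner and the flink_partition_test table from section 2.5; this is not code from the original article):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveInsertSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same catalog settings as in conf/sql-client-hive.yaml
        tableEnv.registerCatalog("staginghive",
                new HiveCatalog("staginghive", "ssb", "/etc/hive/conf", "1.2.1"));
        tableEnv.useCatalog("staginghive");
        tableEnv.useDatabase("ssb");

        // Static-partition insert, mirroring the CLI example in section 2.5;
        // `day` is escaped because it is a keyword in Flink SQL.
        tableEnv.sqlUpdate(
                "INSERT INTO flink_partition_test PARTITION (`type`='Flink', `day`='2020-02-01') "
              + "SELECT 100001, 'Flink001'");

        // In Flink 1.10 the statement is only submitted when execute() is called.
        tableEnv.execute("insert-into-flink_partition_test");
    }
}
```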