前言
Hive 目前已经成为了数据仓库生态系统中的核心组件。它不仅仅是一个用于大数据分析和 ETL 的 SQL 引擎,同样也是一个数据管理平台,它可以用来发现,定义和演变数据。而 Flink 是当前最火的流式计算引擎,它可以在无界和有界数据流上进行状态计算。Flink 从 1.9 版本开始支持集成 Hive,不过 1.9 版本为 Beta 版,不推荐在生产环境中使用。在1.10 版本中,随着对阿里 Blink 的整合完成, Flink 对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的 Flink 对于 Hive 的集成有所差异,本文将以最新的 Flink 1.12 版本、Hive 3.1.2 版本为例,简单阐述 Flink 集成 Hive 的步骤。
集成方式
Flink 与 Hive 的集成包含以下两个层面:
-
首先,Flink 利用了 Hive 的 Metastore 作为持久化的 Catalog,我们可以通过
HiveCatalog
将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,我们可以使用HiveCatalog
将 Kafka 或 Elasticsearch 表的元数据信息存储在 Hive Metastore 中,在后续的 SQL 查询中可以重复使用它们。 -
其次,使用 Flink 来读写 Hive 表,如同使用 SparkSQL 或者 Impala 查询 Hive 中的数据一样。
HiveCatalog
的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 数仓。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。
支持的Hive版本
Flink 支持以下 Hive 版本。
大版本 | V1 | V2 | V3 | V4 | V5 | V6 | V7 |
---|---|---|---|---|---|---|---|
1.0 | 1.0.0 | 1.0.1 | |||||
1.1 | 1.1.0 | 1.1.1 | |||||
1.2 | 1.2.0 | 1.2.1 | 1.2.2 | ||||
2.0 | 2.0.0 | 2.0.1 | |||||
2.1 | 2.1.0 | 2.1.1 | |||||
2.2 | 2.2.0 | ||||||
2.3 | 2.3.0 | 2.3.1 | 2.3.2 | 2.3.3 | 2.3.4 | 2.3.5 | 2.3.6 |
3.1 | 3.1.0 | 3.1.1 | 3.1.2 |
请注意,某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:
- Hive 内置函数在使用 Hive-1.2.0 及更高版本时支持。
- 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
- 更改表的统计信息,在使用 Hive-1.2.0 及更高版本时支持。
DATE
列统计信息,在使用 Hive-1.2.0 及更高版时支持。- 使用 Hive-2.0.x 版本时不支持写入 ORC 表。
依赖项
Flink 集成 Hive 需要额外添加一些依赖 jar 包,并将其放在 Flink 安装目录下的 lib 文件夹下,这样才能通过 Table API 或 SQL Client 与 Hive 进行交互。
Flink 官网提供了两种方式添加 Hive 的依赖项。第一种是使用 Flink 提供的 Hive jar,可以根据使用的 Metastore 版本来选择对应的 Hive jar。第二种方式是分别添加每个所需的 jar ,比如说你使用的 Hive 版本与Flink 提供的 Hive jar 兼容的版本不一致。
注意:建议优先使用 Flink 提供的 Hive jar。
使用 Flink 提供的 Hive jar
下面列举了所有可用的 Hive jar,我们可以根据使用的 Hive 版本,下载对应的 jar 即可。比如本文使用的 Hive 版本为 Hive 3.1.2,所以只需要下载 flink-sql-connector-hive-3.1.2.jar 并将其放置在 Flink 安装目录的 lib 文件夹下即可。
Metastore version | Maven dependency | SQL Client JAR |
---|---|---|
1.0.0 ~ 1.2.2 | flink-sql-connector-hive-1.2.2 |
下载 |
2.0.0 ~2.2.0 | flink-sql-connector-hive-2.2.0 |
下载 |
2.3.0 ~2.3.6 | flink-sql-connector-hive-2.3.6 |
下载 |
3.0.0 ~ 3.1.2 | flink-sql-connector-hive-3.1.2 |
下载 |
用户定义的依赖项
以Hive 3.1.2为例,除了 flink-sql-connector-hive-3.1.2.jar 是我们使用 Flink SQL Cli 时所需要的 jar 外,还需要添加以下 jar:
依赖 jar | 下载地址 |
---|---|
flink-connector-hive_2.12-1.12.0.jar | 下载 |
flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar | 下载 |
hive-exec-3.1.2.jar | Hive安装目录下的lib目录下 |
libfb303-0.9.3.jar | Hive安装目录下的lib目录下 |
连接Hive
下面使用 Flink SQL Cli 来连接 Hive。
配置sql-client-defaults.yaml
sql-client-defaults.yaml 文件是 Flink SQL Cli 启动时使用的配置文件,位于 Flink 安装目录的 conf 文件夹下,具体的配置如下,主要是配置catalog:
catalogs:
- name: myhive
type: hive
default-database: default
hive-conf-dir: /opt/hive/conf/
hadoop-conf-dir: /opt/hadoop/etc/hadoop/
下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog
时所支持的参数:
参数 | 必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
type | 是 | (无) | String | Catalog 的类型。创建 HiveCatalog 时,该参数必须设置为'hive' 。 |
name | 是 | (无) | String | Catalog 的名字。仅在使用 YAML file 时需要指定。 |
hive-conf-dir | 否 | (无) | String | 指向包含 hive-site.xml 目录的 URI。该 URI 必须是 Hadoop 文件系统所支持的类型。如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找hive-site.xml。 |
default-database | 否 | default | String | 当一个catalog被设为当前catalog时,所使用的默认当前database。 |
hive-version | 否 | (无) | String | HiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。 |
hadoop-conf-dir | 否 | (无) | String | Hadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用 HADOOP_CONF_DIR 环境变量来指定 Hadoop 配置。因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。 |
操作Hive中的表
首先启动FlinkSQL Cli,命令如下:
sql-client.sh embedded
启动失败,报以下错误:
报错原因是 Flink 集成 Hive 时不支持 embedded metastore,配置 Hive 时需要启动 Hive metastore 服务并在conf/hive-site.xml 配置文件中为 hive.metastore.uris 属性设置正确的值。
启动 Hive metastore 服务
hive --service metastore
在 hive-site.xml 中配置 hive.metastore.uris
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
再次启动 FlinkSQL Cli 成功,接下来我们可以查看注册的catalog
Flink SQL> show catalogs;
default_catalog
myhive
使用注册的myhive catalog
Flink SQL> use catalog myhive;
查询数据库,proghive 是我实践 《Hive编程指南》时创建的 Hive 库
Flink SQL> show databases;
default
proghive
查看所有表
Flink SQL> use proghive;
Flink SQL> show tables;
dividends
employees
stocks
在 Hive 中查询 employees 表:
hive> select * from employees;
OK
John Doe 100000.0 ["Mary Smith","Todd Jones"] {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1} {"street":"1 Michigan Ave.","city":"Chicago","state":"IL","zip":60600}
Mary Smith 80000.0 ["Bill King"] {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1} {"street":"100 Ontario St.","city":"Chicago","state":"IL","zip":60601}
Todd Jones 70000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"200 Chicago Ave.","city":"Oak Park","state":"IL","zip":60700}
Bill King 60000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"300 Obscure Dr.","city":"Obscuria","state":"IL","zip":60100}
Boss Man 200000.0 ["John Doe","Fred Finance"] {"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05} {"street":"1 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Fred Finance 150000.0 ["Stacy Accountant"] {"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05} {"street":"2 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Stacy Accountant 60000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"300 Main St.","city":"Naperville","state":"IL","zip":60563}
Time taken: 0.21 seconds, Fetched: 7 row(s)
现在我们使用 Flink SQL 查询 Hive 中的表。
Flink SQL> select * from employees;
接下来,我们再在 FlinkSQL Cli 中创建一张 Kafka 的数据源表:
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用户id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品类id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT, -- 用户行为发生的时间戳
`proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json', -- 数据源格式为json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
查看表结构
Flink SQL> DESCRIBE user_behavior;
我们可以在 Hive 客户端中执行下面命令查看刚刚在 Flink SQL Cli 中创建的表
hive> desc formatted user_behavior;
OK
# col_name data_type comment
# Detailed Table Information
Database: proghive
OwnerType: USER
Owner: null
CreateTime: Thu Dec 24 15:52:18 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/proghive.db/user_behavior
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector kafka
flink.format json
flink.json.fail-on-missing-field true
flink.json.ignore-parse-errors false
flink.properties.bootstrap.servers localhost:9092
flink.properties.group.id group1
flink.scan.startup.mode earliest-offset
flink.schema.0.data-type BIGINT
flink.schema.0.name user_id
flink.schema.1.data-type BIGINT
flink.schema.1.name item_id
flink.schema.2.data-type BIGINT
flink.schema.2.name cat_id
flink.schema.3.data-type VARCHAR(2147483647)
flink.schema.3.name action
flink.schema.4.data-type INT
flink.schema.4.name province
flink.schema.5.data-type BIGINT
flink.schema.5.name ts
flink.schema.6.data-type TIMESTAMP(3) NOT NULL
flink.schema.6.expr PROCTIME()
flink.schema.6.name proctime
flink.schema.7.data-type TIMESTAMP(3)
flink.schema.7.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
flink.schema.7.name eventTime
flink.schema.watermark.0.rowtime eventTime
flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '5' SECOND
flink.topic user_behavior
is_generic true
transient_lastDdlTime 1608796338
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.212 seconds, Fetched: 54 row(s)
Flink SQL Cli 创建的 user_behavior 表的元数据会持久化到 Hive 的元数据信息库,本文使用的是MySQL。执行下面的命令:
SELECT
a.tbl_id, -- 表id
from_unixtime(create_time) AS create_time, -- 创建时间
a.db_id, -- 数据库id
b.name AS db_name, -- 数据库名称
a.tbl_name -- 表名称
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";
今天的文章Flink 1.12.0集成Hive 3.1.2分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/23437.html