前言Flink使用HadoopIceberg使用Flink下载jar包启动集群测试启动Flink SQL创建Hadoop Catalog使用创建表Demo1(失败)创建表Demo2(成功)Overwrite操作
前言
版本信息
Hadoop | 3.3.2 |
Zookeeper | 3.7.0 |
Flink | 1.14.3 |
Flink使用Hadoop
Flink1.11开始就不再提供flink-shaded-hadoop-2-uber的支持,如果需要flink支持hadoop,需要配置环境变量HADOOP_CLASSPATH。
我这里flink1.14.3直接jar包放入
/usrl/local/flink-standalone/lib
目录下后直接使用,没有进行配置(可能是我配置了Hadoop的系统环境变量),参考:Flink6:Standalone HA 模式 root@redis01:/usr/local/flink-local/bin# vim config.sh
Iceberg使用Flink
下载jar包
jar包复制到flink集群的lib目录下
启动集群
我这里使用的Flink on Yarn模式
- 启动Hadoop
- 启动Zookeeper
- 启动Flink
# 启动Hadoop集群 root@redis01:/usr/local/hadoop/sbin# start-all.sh # 启动Zookeeper集群,这里使用脚本启动 root@redis01:/home/bigdata/bin# ./zk1.sh start # 启动Flink集群 root@redis01:/usr/local/flink-standalone# ./bin/start-cluster.sh
测试
启动Flink SQL
root@redis01:/usr/local/flink-standalone# ./bin/sql-client.sh embedded shell
创建Hadoop Catalog
CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://redis01:8020/flink/tmp/iceberg_test', 'property-version'='1' );
下次使用
Flink SQL> use catalog hadoop_catalog;
使用
# 查看数据库 Flink SQL> show databases; # 创建数据库 Flink SQL> CREATE DATABASE iceberg_db; # 使用数据库 Flink SQL> USE iceberg_db;
创建表Demo1(失败)
- 需要指定存储格式,否则会报错如下
- 创建表
# 创建表 create table testA( id bigint, name string, age int, dt string) PARTITIONED by(dt);
- insert into插入数据
insert into iceberg_db.testA values(1001,'henggao',18,'2022-08-29'),(1002,'james',23,'2022-08-30');
- 报错如下,Flink不知道数据源关联
创建表Demo2(成功)
- 创建表
CREATE TABLE hadoop_catalog.iceberg_db.sample_test ( id BIGINT COMMENT 'unique id', data STRING, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'format-version'= '2', 'write.upsert.enable'='true' );
- insert into 插入数据
INSERT INTO hadoop_catalog.iceberg_db.sample_test VALUES (10, 'test10_U'), (11, 'test11'), (12, 'test12');
- 查看insert结果
select * from hadoop_catalog.iceberg_db.sample_test;
记录会在Flink Dashboard中有记录:http://redis01:8081/#/overview
Overwrite操作
flink默认是流处理overwrite需要切换位批处理
Flink SQL> SET execution.type = batch;