Iceberg8:Iceberg+Flink

Iceberg8:Iceberg+Flink

前言

版本信息
Hadoop
3.3.2
Zookeeper
3.7.0
Flink
1.14.3

Flink使用Hadoop

Flink1.11开始就不再提供flink-shaded-hadoop-2-uber的支持,如果需要flink支持hadoop,需要配置环境变量HADOOP_CLASSPATH。
我这里flink1.14.3直接jar包放入/usrl/local/flink-standalone/lib目录下后直接使用,没有进行配置(可能是我配置了Hadoop的系统环境变量),参考:
Flink6:Standalone HA 模式
Flink6:Standalone HA 模式
root@redis01:/usr/local/flink-local/bin# vim config.sh
 

Iceberg使用Flink

下载jar包

notion image
jar包复制到flink集群的lib目录下
notion image

启动集群

我这里使用的Flink on Yarn模式
  • 启动Hadoop
  • 启动Zookeeper
  • 启动Flink
# 启动Hadoop集群 root@redis01:/usr/local/hadoop/sbin# start-all.sh # 启动Zookeeper集群,这里使用脚本启动 root@redis01:/home/bigdata/bin# ./zk1.sh start # 启动Flink集群 root@redis01:/usr/local/flink-standalone# ./bin/start-cluster.sh

测试

启动Flink SQL

root@redis01:/usr/local/flink-standalone# ./bin/sql-client.sh embedded shell
notion image

创建Hadoop Catalog

CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://redis01:8020/flink/tmp/iceberg_test', 'property-version'='1' );
notion image
下次使用
Flink SQL> use catalog hadoop_catalog;

使用

# 查看数据库 Flink SQL> show databases; # 创建数据库 Flink SQL> CREATE DATABASE iceberg_db; # 使用数据库 Flink SQL> USE iceberg_db;
notion image

创建表Demo1(失败)

  • 需要指定存储格式,否则会报错如下
  1. 创建表
# 创建表 create table testA( id bigint, name string, age int, dt string) PARTITIONED by(dt);
notion image
  1. insert into插入数据
insert into iceberg_db.testA values(1001,'henggao',18,'2022-08-29'),(1002,'james',23,'2022-08-30');
  1. 报错如下,Flink不知道数据源关联
notion image
 

创建表Demo2(成功)

  1. 创建表
CREATE TABLE hadoop_catalog.iceberg_db.sample_test ( id BIGINT COMMENT 'unique id', data STRING, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'format-version'= '2', 'write.upsert.enable'='true' );
notion image
  1. insert into 插入数据
INSERT INTO hadoop_catalog.iceberg_db.sample_test VALUES (10, 'test10_U'), (11, 'test11'), (12, 'test12');
notion image
  1. 查看insert结果
select * from hadoop_catalog.iceberg_db.sample_test;
notion image
 
记录会在Flink Dashboard中有记录:http://redis01:8081/#/overview
notion image
 
 

Overwrite操作

flink默认是流处理overwrite需要切换位批处理
Flink SQL> SET execution.type = batch;