
Iceberg

Last updated: 2023-11-08 16:02:26

Versions

| Flink Version | Description |
|---------------|-------------|
| 1.11 | Unsupported |
| 1.13 | Supported (can be used as source and sink) |
| 1.14 | Supported (can be used as source and sink) |
| 1.16 | Unsupported |

Use cases

This connector can be used as a source or a sink. When used as a source, it does not support reading an Iceberg table that is written to with upsert operations.

Defining a table in DDL

As a sink:
CREATE TABLE `sink` (
  `id` bigint,
  `YCSB_KEY` string,
  `FIELD0` string,
  `FIELD1` string,
  `FIELD2` string,
  `database_name` string,
  `table_name` string,
  `op_ts` timestamp(3),
  `date` string
) PARTITIONED BY (`date`) WITH (
  'connector' = 'iceberg',
  'warehouse' = 'hdfs://HDFS14979/usr/hive/warehouse',
  'write.upsert.enabled' = 'false', -- Whether to enable upsert writes.
  'catalog-type' = 'hive',
  'catalog-name' = 'xxx',
  'catalog-database' = 'xxx',
  'catalog-table' = 'xxx',
  -- The thrift URI of the Hive metastore, which can be obtained from the hive-site.xml configuration file (key: hive.metastore.uris).
  'uri' = 'thrift://ip:port',
  'engine.hive.enabled' = 'true',
  'format-version' = '2'
);
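
As a usage sketch only (the `datagen_source` table below is hypothetical and merely stands in for whatever upstream table the job actually reads), data can then be written to this sink with a regular INSERT statement:

-- Hypothetical upstream table producing test rows; replace it with your real source.
CREATE TABLE `datagen_source` (
  `id` bigint,
  `YCSB_KEY` string,
  `FIELD0` string,
  `FIELD1` string,
  `FIELD2` string,
  `database_name` string,
  `table_name` string,
  `op_ts` timestamp(3),
  `date` string
) WITH (
  'connector' = 'datagen'
);

-- Write the rows into the Iceberg sink defined above.
INSERT INTO `sink` SELECT * FROM `datagen_source`;
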
As a source:
CREATE TABLE `icesource` (
  `id` bigint,
  `YCSB_KEY` string,
  `FIELD0` string,
  `FIELD1` string,
  `FIELD2` string,
  `database_name` string,
  `table_name` string,
  `op_ts` timestamp(3),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'hive_catalog',
  'catalog-type' = 'hive',
  'catalog-database' = 'database_ta',
  'catalog-table' = 't_p1_hive3_avro_3',
  'warehouse' = 'hdfs://HDFS14979/usr/hive/warehouse',
  'engine.hive.enabled' = 'true',
  'format-version' = '2',
  'streaming' = 'true',
  'monitor-interval' = '10',
  -- The thrift URI of the Hive metastore, which can be obtained from the hive-site.xml configuration file (key: hive.metastore.uris).
  'uri' = 'thrift://ip:port'
);
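
For reference, a minimal consumption sketch follows. The `print_sink` table is hypothetical and only illustrates a downstream table; because 'streaming' is set to 'true' above, the query keeps reading newly committed snapshots from `icesource`.

-- Hypothetical downstream table used only to show how the Iceberg source can be consumed.
CREATE TABLE `print_sink` (
  `id` bigint,
  `YCSB_KEY` string
) WITH (
  'connector' = 'print'
);

-- Continuously reads newly committed snapshots from the Iceberg source table.
INSERT INTO `print_sink`
SELECT `id`, `YCSB_KEY` FROM `icesource`;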

WITH parameters

Common parameters

| Option | Required | Default Value | Description |
|--------|----------|---------------|-------------|
| connector | Yes | None | Here, it should be iceberg. |
| location | Yes | None | The data storage path, in the format hdfs://... for data stored in HDFS and cosn://$bucket/$path for data stored in COS. |
| catalog-name | Yes | None | A custom catalog name. |
| catalog-type | Yes | None | The catalog type. Valid values: hadoop, hive, and custom. |
| catalog-database | Yes | None | The name of the Iceberg database. |
| catalog-table | Yes | None | The name of the Iceberg table. |
| catalog-impl | No | None | This option is required when catalog-type is set to custom. |
| uri | No | None | The thrift URI of the Hive metastore, which can be obtained from the hive-site.xml configuration file (key: hive.metastore.uris), e.g. thrift://172.28.1.149:7004. |
| format-version | No | 1 | For more Iceberg formats, see Iceberg Table Spec. |

For more options, see Configuration.
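
For reference, a table backed by a hadoop catalog (which keeps table metadata on the file system and needs no Hive metastore) might be declared roughly as follows. This is a sketch based on the option list above rather than a verified configuration; the catalog, database, table, and warehouse names are placeholders.

CREATE TABLE `iceberg_hadoop_demo` (
  `id` bigint,
  `name` string
) WITH (
  'connector' = 'iceberg',
  'catalog-type' = 'hadoop',         -- Hadoop catalog: no Hive metastore, so no 'uri' option.
  'catalog-name' = 'hadoop_catalog', -- Placeholder catalog name.
  'catalog-database' = 'demo_db',    -- Placeholder database name.
  'catalog-table' = 'demo_table',    -- Placeholder table name.
  'warehouse' = 'hdfs://HDFS14979/usr/iceberg/warehouse', -- Placeholder storage path, following the examples above.
  'format-version' = '2'
);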

COS configuration

No additional configuration is required. You just need to set the storage path to the corresponding cosn:// path.
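
For example (the bucket and path below are placeholders), the HDFS path used in the examples above would simply be replaced with a COS address:

'warehouse' = 'cosn://your-bucket/usr/hive/warehouse', -- Placeholder cosn:// path; everything else stays the same as in the HDFS examples.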

HDFS configuration

Getting the HDFS JAR package

If a Flink SQL job writes data to an Iceberg table stored in HDFS, a JAR package containing the HDFS configuration is required so that Flink can connect to the target HDFS cluster. The steps to get and use the JAR package are as follows:
1. Log in to the respective Hive cluster over SSH.
2. Get hdfs-site.xml from the following path in the EMR Hive cluster.
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
3. Package the obtained configuration files in a JAR package.
jar -cvf hdfs-xxx.jar hdfs-site.xml
4. Check the JAR structure (run a Vim command to view it). Make sure the JAR file includes the following information and has the correct structure.
vi hdfs-xxx.jar
META-INF/
META-INF/MANIFEST.MF
hdfs-site.xml

Setting the HDFS user

Note
By default, Flink jobs access HDFS as the flink user. If this user does not have permission to write to HDFS, you can use advanced job parameters to set the accessing user to a user that has write permission, or to the super-user hadoop.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

Kerberos authentication

1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, and hdfs-site.xml in the following paths.
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
2. Package the obtained configuration files in a JAR package.
jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
3. Check the JAR structure (run the Vim command vim hdfs-xxx.jar). Make sure the JAR file includes the following information and has the correct structure.
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
5. Get the Kerberos principal and configure it in advanced job parameters.
klist -kt /var/krb5kdc/emr.keytab

# The output is as follows (use the first principal): hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. Configure the principal in advanced job parameters.
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
Note
The values of security.kerberos.login.keytab and security.kerberos.login.conf are the respective file names.

