tencent cloud

腾讯云数据仓库 TCHouse-C

产品动态
产品简介
产品概述
基本概念
集群架构
产品优势
应用场景
购买指南
计费概述
到期与欠费说明
退费说明
配置变更计费说明
快速入门
操作指南
管理集群
参数配置
监控告警
冷热分层
账户与授权
查询管理
日志检索
数据字典
备份与恢复
多 Zookeeper
外部数据入仓
配置 DDL on Cluster 功能
数据重分布
缩容迁移
开发指南
数据库引擎
表引擎
ClickHouse SQL 语法参考
ClickHouse 客户端介绍
ClickHouse 自建迁移方案
服务等级协议
CDWCH 政策
隐私政策
数据隐私和安全协议
常见问题
联系我们
词汇表

Kafka 数据导入

PDF
聚焦模式
字号
最后更新时间: 2025-03-31 14:55:26
本文介绍如何从 Kafka 中实时消费数据到腾讯云数据仓库 TCHouse-C 。

前提条件

数据源 Kafka 集群和目的端 TCHouse-C 集群必须在同一个 VPC 下。

操作步骤

1. 登录腾讯云数据仓库 TCHouse-C 集群,创建 Kafka 消费表。
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1,
kafka_max_block_size = 65536,
kafka_skip_broken_messages = 0,
kafka_auto_offset_reset = 'latest';
常用参数说明如下:
名称
是否必选
说明
kafka_broker_list
Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。
kafka_topic_list
Kafka topic,多个 topic 用逗号分隔。
kafka_group_name
Kafka 的消费组名称。
kafka_format
Kafka 数据格式, ClickHouse 支持的 Format, 详见
文档
可选参数。
kafka_row_delimiter
行分隔符,用于分割不同的数据行。默认为“\\n”,您也可以根据数据写入的实际分割格式进行设置。
kafka_num_consumers
单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。
kafka_max_block_size
Kafka 数据写入目标表的 Block 大小,超过该数值后,就将数据刷盘;单位:Byte,默认值为65536 Byte。
kafka_skip_broken_messages
表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。
kafka_commit_every_batch
执行 Kafka commit 的频率,取值如下: 0:完全写入一整个Block数据块的数据后才执行commit; 1:每写完一个Batch批次的数据就执行一次commit。
kafka_auto_offset_reset
从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。
2. 创建 ClickHouse 本地表(目标表)。
如果您的集群是单副本版:
CREATE TABLE daily on cluster default_cluster
(
  day Date,
   level String,
  total UInt64
)
engine = SummingMergeTree()
order by int_id;
如果您的集群是双副本版:
create table daily on cluster default_cluster
(
  day Date,
   level String,
  total UInt64
)
engine = ReplicatedSummingMergeTree('/clickhouse/tables/test/test/{shard}', '{replica}')
order by int_id;`
创建分布式表:
create table daily_dis on cluster default_cluster
AS test.test
engine = Distributed('default_cluster', 'default', 'daily', rand());
3. 创建物化视图,把 Kafka 消费表消费到的数据同步到 ClickHouse 目的表。
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
4. 查询。
SELECT level, sum(total) FROM daily GROUP BY level;

其他

如果要停止接收主题数据或更改转换逻辑,可以进行 detach 和 attach 视图操作。
DETACH TABLE consumer;
ATTACH TABLE consumer;


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈