tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表
文档消息队列 CKafka 版SDK 参考Java SDK公网 SASL_PLAINTEXT 方式接入

公网 SASL_PLAINTEXT 方式接入

PDF
聚焦模式
字号
最后更新时间: 2026-01-05 15:20:01

操作背景

本文以 Java 客户端为例介绍在公网环境下,使用 SASL_PLAINTEXT 方式接入消息队列 CKafka 版并收发消息的过程。

前提条件

操作步骤

步骤1:控制台配置

1. 创建接入点。
1.1 实例列表 页面,单击目标实例 ID,进入实例详情页。
1.2 基本信息 > 接入方式 中,单击添加路由策略,在弹窗中选择:路由类型:公网域名接入, 接入方式:SASL_PLAINTEXT

2. 创建角色。
ACL 策略管理下的用户管理页面新建角色,设置密码。

3. 创建 Topic。 在控制台 Topic 列表页面新建 Topic(参见 创建 Topic)。
4. 配置 ACL 策略。
参考配置 Topic 读写权限为创建好的角色配置 Topic 的读写权限。

步骤2:添加配置文件

1. 在 pom.xml 文件中添加以下依赖。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
</dependencies>
2. 创建 JAAS 配置文件 ckafka_client_jaas.conf,使用ACL策略管理下的用户管理界面创建的用户进行修改。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="yourinstance#yourusername"
password="yourpassword";
};
说明
username 是实例 ID + # + 配置的用户名,password 是配置的用户密码。
3. 创建消息队列 CKafka 配置文件 kafka.properties。
## 配置接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
bootstrap.servers=ckafka-xxxxxxx
## 配置 Topic,在控制台上 topic 管理页面复制。
topic=XXX
## 配置 consumer group,您可以自定义设置
group.id=XXX
## SASL 配置
java.security.auth.login.config.plain=/xxxx/ckafka_client_jaas.conf
参数
说明
bootstrap.servers
接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。
topic
Topic 名称,您可以在控制台上 topic 列表页面复制。
group.id
您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。
java.security.auth.login.config.plain
填写 JAAS 配置文件 ckafka_client_jaas.conf 的路径。
4. 创建配置文件加载程序 CKafkaConfigurer.java。
public class CKafkaConfigurer {

private static Properties properties;

public static void configureSaslPlain() {
//如果用 -D 或者其它方式设置过,这里不再设置。
if (null == System.getProperty("java.security.auth.login.config")) {
//请注意将 XXX 修改为自己的路径。
System.setProperty("java.security.auth.login.config",
getCKafkaProperties().getProperty("java.security.auth.login.config.plain"));
}
}

public synchronized static Properties getCKafkaProperties() {
if (null != properties) {
return properties;
}
//获取配置文件 kafka.properties 的内容。
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(CKafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
System.out.println("getCKafkaProperties error");
}
properties = kafkaProperties;
return kafkaProperties;
}
}

步骤3:发送消息

1. 创建发送消息程序 KafkaSaslProducerDemo.java。
public class KafkaSaslProducerDemo {

public static void main(String[] args) {
//设置 JAAS 配置文件的路径。
CKafkaConfigurer.configureSaslPlain();

//加载 kafka.properties。
Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

Properties props = new Properties();
//设置接入点,请通过控制台获取对应 Topic 的接入点。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));

//
// SASL_PLAINTEXT 公网接入
//
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
// SASL 采用 Plain 方式。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

//消息队列 Kafka 版消息的序列化方式。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//请求的最长等待时间。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
//设置客户端内部重试次数。
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//设置客户端内部重试间隔。
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
//ack=0 producer 将不会等待来自 broker 的确认,重试配置不会生效。注意如果被限流了后,就会被关闭连接。
//ack=1 broker leader 将不会等待所有 broker follower 的确认,就返回 ack。
//ack=all broker leader 将等待所有 broker follower 的确认,才返回 ack。
props.put(ProducerConfig.ACKS_CONFIG, "all");
//构造 Producer 对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

//构造一个消息队列 Kafka 版消息。
String topic = kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里。
String value = "this is ckafka msg value"; //消息的内容。

try {
//批量获取 Future 对象可以加快速度。但注意,批量不要太大。
List<Future<RecordMetadata>> futures = new ArrayList<>(128);
for (int i = 0; i < 100; i++) {
//发送消息,并获得一个Future对象。
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic,
value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);

}
producer.flush();
for (Future<RecordMetadata> future : futures) {
//同步获得 Future 对象的结果。
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
}
} catch (Exception e) {
//客户端内部重试之后,仍然发送失败,业务要应对此类错误。
System.out.println("error occurred");
}
}
}
2. 编译并运行 KafkaSaslProducerDemo.java 发送消息。
3. 运行结果(输出)。
Produce ok:ckafka-topic-demo-0@198
Produce ok:ckafka-topic-demo-0@199
4. 在 CKafka 控制台 Topic 列表页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。



步骤4:消费消息

1. 创建 Consumer 订阅消息程序 KafkaSaslConsumerDemo.java
public class KafkaSaslConsumerDemo {

public static void main(String[] args) {
//设置JAAS配置文件的路径。
CKafkaConfigurer.configureSaslPlain();

//加载kafka.properties。
Properties kafkaProperties = CKafkaConfigurer.getCKafkaProperties();

Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));

//
// SASL_PLAINTEXT 公网接入
//
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
// SASL 采用 Plain 方式。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

//消费者超时时长
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//两次poll的最长时间间隔
//0.10.1.0 版本前这2个概念是混合的,都用session.timeout.ms表示
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
//每次 Poll 的最大数量。
//注意该值不要改得太大,如果 Poll 太多数据,而不能在下次 Poll 之前消费完,则会触发一次负载均衡,产生卡顿。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化方式。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//当前消费实例所属的消费组,请在控制台申请之后填写。
//属于同一个组的消费实例,会负载消费消息。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
//构造消费对象,也即生成一个消费实例。
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//设置消费组订阅的 Topic,可以订阅多个。
//如果 GROUP_ID_CONFIG 是一样,则订阅的 Topic 也建议设置成一样。
List<String> subscribedTopics = new ArrayList<String>();
//如果需要订阅多个 Topic,则在这里添加进去即可。
//每个 Topic 需要先在控制台进行创建。
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic : topics) {
subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);

//循环消费消息。
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
//必须在下次 Poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG。
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Consume partition:%d offset:%d", record.partition(),
record.offset()));
}
} catch (Exception e) {
System.out.println("consumer error!");
}
}
}
}
2. 编译并运行 KafkaSaslConsumerDemo.java 消费消息。
3. 运行结果。
Consume partition:0 offset:298
Consume partition:0 offset:299

4. 在 CKafka 控制台 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查看详情,查看消费详情。



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈