tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

公网 SASL_SSL 方式接入

PDF
聚焦模式
字号
最后更新时间: 2026-01-05 15:20:01

操作场景

本文以 Python 客户端为例介绍在公网环境下,使用 SASL_SSL 方式接入消息队列 CKafka 版并收发消息的过程。

前提条件

操作步骤

步骤1:准备工作

1. 创建接入点。
1.1 实例列表 页面,单击目标实例 ID,进入实例详情页。
1.2 基本信息 > 接入方式 中,单击添加路由策略,在打开窗口中选择:路由类型:公网域名接入,接入方式:SASL_SSL

2. 创建角色。 在 ACL 策略管理下的用户管理页面新建角色,设置密码。

3. 创建 Topic。 在控制台 Topic 列表页面新建 Topic(参见 创建 Topic)。
4. 配置 ACL 策略。
参考配置 Topic 读写权限为创建好的角色配置 Topic 的读写权限。
5. 添加 Python 依赖库。
执行以下命令安装:
pip install kafka-python

步骤2:生产消息

1. 修改生产消息程序 producer.py 中配置参数。
producer = KafkaProducer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),

#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)

message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value = msg)
print("produce message " + message + " success.")
producer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。
sasl_plain_password
用户密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置。
topic_name
Topic 名称,您可以在控制台上 topic 列表页面复制。
CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
2. 编译并运行 producer.py。
3. 查看运行结果。


4. CKafka 控制台Topic 列表页面,选择对应的 Topic , 单击更多 > 消息查询,查看刚刚发送的消息。


步骤3:消费消息

1. 修改消费消息程序 consumer.py 中配置参数。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),

#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,

)

for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
参数
描述
bootstrap_servers
接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。
group_id
消费者的组 ID,根据业务需求自定义。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。
sasl_plain_password
用户名密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置
topic_name
Topic 名称,您可以在控制台上 topic 列表页面复制。
CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
2. 编译并运行 consumer.py。
3. 查看运行结果。


4. CKafka 控制台Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查看详情,查看消费详情。


问题排查

SSL 证书错误

如您使用以上 Demo 报如下 SSL CERTIFICATE_VERIFY_FAILED 错误,请先检查下载的证书文件(SSL 证书)是否正确,如依然报错,请 提交工单 ,联系后端工程师排查。
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/sender-py", line 160, in run_once
self._client.poll(timeout_ms=poll_timeout_ms)
File"/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line602, in poll
self._poll(timeout / 1000)
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 648, in _poll
conn.connect()
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 429, in connect
if self._try_handshake():
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 508, in _try_handshake
self._sock.do_handshake()
File "/root/anaconda3/envs/py39/lib/python3.9/ssl.py", line 1343, in do_handshake
self._sslobj.do_handshake()
ssl.SSLCertVerificationError:[SSL:CERTIFICATE_VERIFY_FAILED]certificate verify failed:certificate signature failure(_ssl.c:1133)
WARNING:kafka.conn:SSL connection closed by server during handshake.
INF0:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=ckafka-xxx.ap-beijing.ckafka.tencentcloudmq.com:50001 <handshake>[IPv4('x.x.x.x', 50001)]>:Closing connection.Kafka Connection Error:SSL connection closed by server during handshake
^CTraceback(most recent call last):
File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py",line 49,in<module>
main()
File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 43, in main
send_message(producer, 'skdy_osr_1005',message)
File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 32, in send_message
future = producer.send(topic, value=msg)
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 576, in send
self._wait_on_metadata(topic, self.config['max_block_ms']/1000.0)
File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 699, in _wait_on_metadata
metadata_event.wait(max_wait-elapsed)
File "/root/anaconda3/envs/py39/lib/python3.9/threading-py", line 581,in wait
signaled = self._cond.wait(timeout)
File "/root/anaconda3/envs/py39/lib/python3.9/threading-py", line 316, in wait
gotit =waiter.acquire(True, timeout)
KeyboardInterrupt
INFo:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFo:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed with in timeout 0.


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈