新功能发布记录
Broker 版本升级记录
公告
路由类型:公网域名接入,接入方式:SASL_SSL。

pip install kafka-python
producer.py 中配置参数。producer = KafkaProducer(bootstrap_servers = ['xx.xx.xx.xx:port'],api_version = (1, 1),## SASL_SSL 公网接入#security_protocol = "SASL_SSL",sasl_mechanism = "PLAIN",sasl_plain_username = "instanceId#username",sasl_plain_password = "password",ssl_cafile = "CARoot.pem",ssl_check_hostname = False,)message = "Hello World! Hello Ckafka!"msg = json.dumps(message).encode()producer.send('topic_name', value = msg)print("produce message " + message + " success.")producer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。 |
sasl_plain_username | |
sasl_plain_password | 用户密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置。 |
topic_name | Topic 名称,您可以在控制台上 topic 列表页面复制。 |
CARoot.pem | 采用 SASL_SSL 方式接入时,所需的证书路径。 |


consumer = KafkaConsumer('topic_name',group_id = "group_id",bootstrap_servers = ['xx.xx.xx.xx:port'],api_version = (1,1),## SASL_SSL 公网接入#security_protocol = "SASL_SSL",sasl_mechanism = 'PLAIN',sasl_plain_username = "instanceId#username",sasl_plain_password = "password",ssl_cafile = "CARoot.pem",ssl_check_hostname = False,)for message in consumer:print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %(message.topic, message.partition, message.offset, message.value))
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。 |
group_id | 消费者的组 ID,根据业务需求自定义。 |
sasl_plain_username | 用户名,格式为 实例 ID + # + 用户名。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。 |
sasl_plain_password | 用户名密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置 |
topic_name | Topic 名称,您可以在控制台上 topic 列表页面复制。 |
CARoot.pem | 采用 SASL_SSL 方式接入时,所需的证书路径。 |


File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/sender-py", line 160, in run_onceself._client.poll(timeout_ms=poll_timeout_ms)File"/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line602, in pollself._poll(timeout / 1000)File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 648, in _pollconn.connect()File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 429, in connectif self._try_handshake():File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 508, in _try_handshakeself._sock.do_handshake()File "/root/anaconda3/envs/py39/lib/python3.9/ssl.py", line 1343, in do_handshakeself._sslobj.do_handshake()ssl.SSLCertVerificationError:[SSL:CERTIFICATE_VERIFY_FAILED]certificate verify failed:certificate signature failure(_ssl.c:1133)WARNING:kafka.conn:SSL connection closed by server during handshake.INF0:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=ckafka-xxx.ap-beijing.ckafka.tencentcloudmq.com:50001 <handshake>[IPv4('x.x.x.x', 50001)]>:Closing connection.Kafka Connection Error:SSL connection closed by server during handshake^CTraceback(most recent call last):File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py",line 49,in<module>main()File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 43, in mainsend_message(producer, 'skdy_osr_1005',message)File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 32, in send_messagefuture = producer.send(topic, value=msg)File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 576, in sendself._wait_on_metadata(topic, self.config['max_block_ms']/1000.0)File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 699, in _wait_on_metadatametadata_event.wait(max_wait-elapsed)File "/root/anaconda3/envs/py39/lib/python3.9/threading-py", line 581,in waitsignaled = self._cond.wait(timeout)File "/root/anaconda3/envs/py39/lib/python3.9/threading-py", line 316, in waitgotit =waiter.acquire(True, timeout)KeyboardInterruptINFo:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.INFo:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed with in timeout 0.
文档反馈