Routing Type: Public Network Domain Name Access > Access Method: SASL_SSL.

pip install kafka-python
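producer.py

from kafka import KafkaProducer
import json

# Create a producer that connects to CKafka over the public network.
producer = KafkaProducer(
    bootstrap_servers=['xx.xx.xx.xx:port'],
    api_version=(1, 1),
    # SASL_SSL access in the public network.
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="instanceId#username",
    sasl_plain_password="password",
    ssl_cafile="CARoot.pem",
    ssl_check_hostname=False,
)

message = "Hello World! Hello Ckafka!"
# Serialize the message to JSON and encode it as bytes before sending.
msg = json.dumps(message).encode()
producer.send('topic_name', value=msg)
print("produce message " + message + " success.")
# send() is asynchronous; close() waits for pending messages before shutting down.
producer.close()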
Parameter | Description |
bootstrap_servers | Access network. On the Basic Info page of the instance in the console, select the Access Mode module and copy the network information from the Network column. |
sasl_plain_username | Username, in the format of instance ID + # + username. The instance ID can be obtained from the basic information on the instance details page in the CKafka console. Choose ACL Policy Management > User Management to create a user and set the username. |
sasl_plain_password | User password. On the instance details page in the CKafka console, choose ACL Policy Management > User Management to create a user and set the password. |
topic_name | Topic name. Copy the name on the Topic List page in the console. |
CARoot.pem | The certificate path required when the SASL_SSL access method is used. |
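
The parameters in the table above can be collected in one place before the producer is created. The following is a minimal sketch, not part of the official sample: the helper name `build_sasl_ssl_config` and the sample values `ckafka-abc123` and `alice` are hypothetical, and it assumes the username is simply the instance ID and the username joined by `#`, as described above.

```python
def build_sasl_ssl_config(bootstrap_server, instance_id, username, password,
                          ca_path="CARoot.pem"):
    """Build keyword arguments for a SASL_SSL connection (hypothetical helper)."""
    return {
        "bootstrap_servers": [bootstrap_server],
        "api_version": (1, 1),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # Username format: instance ID + "#" + username.
        "sasl_plain_username": f"{instance_id}#{username}",
        "sasl_plain_password": password,
        "ssl_cafile": ca_path,
        "ssl_check_hostname": False,
    }


# Placeholder values for illustration only; replace them with the values
# copied from the console.
config = build_sasl_ssl_config("xx.xx.xx.xx:port", "ckafka-abc123", "alice", "password")
print(config["sasl_plain_username"])

# With kafka-python installed, the dictionary can be unpacked directly:
#   from kafka import KafkaProducer
#   producer = KafkaProducer(**config)
```

Keeping the settings in a dictionary makes it possible to reuse the same connection parameters for the consumer by adding `group_id` and the topic name.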

from kafka import KafkaConsumer

# Create a consumer that subscribes to the topic over the public network.
consumer = KafkaConsumer(
    'topic_name',
    group_id="group_id",
    bootstrap_servers=['xx.xx.xx.xx:port'],
    api_version=(1, 1),
    # SASL_SSL access in the public network.
    security_protocol="SASL_SSL",
    sasl_mechanism='PLAIN',
    sasl_plain_username="instanceId#username",
    sasl_plain_password="password",
    ssl_cafile="CARoot.pem",
    ssl_check_hostname=False,
)

# Iterating over the consumer blocks and yields messages as they arrive.
for message in consumer:
    print("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
          (message.topic, message.partition, message.offset, message.value))
Parameter | Description |
bootstrap_servers | Access network. On the Basic Info page of the instance in the console, select the Access Mode module and copy the network information from the Network column. |
group_id | Consumer group ID. Define the group ID according to business requirements. |
sasl_plain_username | Username, in the format of instance ID + # + username. The instance ID can be obtained from the basic information on the instance details page in the CKafka console. Choose ACL Policy Management > User Management to create a user and set the username. |
sasl_plain_password | User password. On the instance details page in the CKafka console, choose ACL Policy Management > User Management to create a user and set the password. |
topic_name | Topic name. Copy the name on the Topic List page in the console. |
CARoot.pem | The certificate path required when the SASL_SSL access method is used. |
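
The consumer receives `message.value` as raw bytes. Because the producer sample serializes the message with `json.dumps(...).encode()`, the value can be turned back into the original string by reversing those two steps. The following is a minimal sketch; the helper name `decode_value` is hypothetical and it assumes the payload is UTF-8 encoded JSON.

```python
import json


def decode_value(raw: bytes):
    """Reverse json.dumps(...).encode(): bytes -> str -> Python object."""
    return json.loads(raw.decode("utf-8"))


# Simulate the bytes the producer sample sends.
payload = json.dumps("Hello World! Hello Ckafka!").encode()
print(decode_value(payload))  # prints: Hello World! Hello Ckafka!
```

The same function can also be passed to `KafkaConsumer` through its `value_deserializer` parameter, so that `message.value` is already decoded inside the loop.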

  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/sender.py", line 160, in run_once
    self._client.poll(timeout_ms=poll_timeout_ms)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 648, in _poll
    conn.connect()
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 429, in connect
    if self._try_handshake():
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 508, in _try_handshake
    self._sock.do_handshake()
  File "/root/anaconda3/envs/py39/lib/python3.9/ssl.py", line 1343, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate signature failure (_ssl.c:1133)
WARNING:kafka.conn:SSL connection closed by server during handshake.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=ckafka-xxx.ap-beijing.ckafka.tencentcloudmq.com:50001 <handshake> [IPv4('x.x.x.x', 50001)]>: Closing connection. KafkaConnectionError: SSL connection closed by server during handshake
^CTraceback (most recent call last):
  File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 49, in <module>
    main()
  File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 43, in main
    send_message(producer, 'skdy_osr_1005', message)
  File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 32, in send_message
    future = producer.send(topic, value=msg)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 699, in _wait_on_metadata
    metadata_event.wait(max_wait - elapsed)
  File "/root/anaconda3/envs/py39/lib/python3.9/threading.py", line 581, in wait
    signaled = self._cond.wait(timeout)
  File "/root/anaconda3/envs/py39/lib/python3.9/threading.py", line 316, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
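
When the handshake fails with `CERTIFICATE_VERIFY_FAILED` as in the traceback above, one thing that can be ruled out quickly is a missing or unreadable certificate file. The following is a minimal sketch using only Python's standard `ssl` module; the helper name `ca_file_loads` is hypothetical. It only checks that the file passed as `ssl_cafile` exists and parses as a PEM CA bundle. It does not verify the broker's certificate chain, so a `True` result does not by itself explain a signature failure.

```python
import ssl


def ca_file_loads(path):
    """Return True if `path` can be loaded as a CA bundle by the ssl module."""
    try:
        # Raises FileNotFoundError if the file is missing and
        # ssl.SSLError if it does not contain a valid PEM certificate.
        ssl.create_default_context(cafile=path)
    except (OSError, ssl.SSLError) as exc:
        print("Cannot load CA file %r: %s" % (path, exc))
        return False
    return True


if __name__ == "__main__":
    # "CARoot.pem" is the file name used in the samples above.
    print(ca_file_loads("CARoot.pem"))
```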