路由类型:公网域名接入
,接入方式:SASL_SSL
。pip install kafka-python
producer.py
中配置参数。producer = KafkaProducer(bootstrap_servers = ['xx.xx.xx.xx:port'],api_version = (1, 1),## SASL_SSL 公网接入#security_protocol = "SASL_SSL",sasl_mechanism = "PLAIN",sasl_plain_username = "instanceId#username",sasl_plain_password = "password",ssl_cafile = "CARoot.pem",ssl_check_hostname = False,)message = "Hello World! Hello Ckafka!"msg = json.dumps(message).encode()producer.send('topic_name', value = msg)print("produce message " + message + " success.")producer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 |
sasl_plain_username | |
sasl_plain_password | 用户密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置。 |
topic_name | Topic 名称,您可以在控制台上 topic 管理页面复制。 |
CARoot.pem | 采用 SASL_SSL 方式接入时,所需的证书路径。 |
producer.py
。consumer = KafkaConsumer('topic_name',group_id = "group_id",bootstrap_servers = ['xx.xx.xx.xx:port'],api_version = (1,1),## SASL_SSL 公网接入#security_protocol = "SASL_SSL",sasl_mechanism = 'PLAIN',sasl_plain_username = "instanceId#username",sasl_plain_password = "password",ssl_cafile = "CARoot.pem",ssl_check_hostname = False,)for message in consumer:print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %(message.topic, message.partition, message.offset, message.value))
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 |
group_id | 消费者的组 ID,根据业务需求自定义。 |
sasl_plain_username | 用户名,格式为 实例 ID + # + 用户名 。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在ACL策略管理下的用户管理创建用户时设置。 |
sasl_plain_password | 用户名密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置 |
topic_name | Topic 名称,您可以在控制台上 topic 管理页面复制。 |
CARoot.pem | 采用 SASL_SSL 方式接入时,所需的证书路径。 |
本页内容是否解决了您的问题?