路由类型:公网域名接入
, 接入方式:SASL_PLAINTEXT
。pip install kafka-python
producer.py
中配置参数。producer = KafkaProducer(bootstrap_servers = ['xx.xx.xx.xx:port'],api_version = (1, 1),## SASL_PLAINTEXT 公网接入#security_protocol = "SASL_PLAINTEXT",sasl_mechanism = "PLAIN",sasl_plain_username = "instanceId#username",sasl_plain_password = "password",)message = "Hello World! Hello Ckafka!"msg = json.dumps(message, ensure_ascii=False).encode()producer.send('topic_name', value = msg)print("produce message " + message + " success.")producer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 |
sasl_plain_username | |
sasl_plain_password | 用户密码,在 CKafka 控制台实例详情页面ACL策略管理下的用户管理创建用户时设置。 |
topic_name | Topic 名称,您可以在控制台上topic管理页面复制。 |
producer.py
。consumer = KafkaConsumer('topic_name',group_id = "group_id",bootstrap_servers = ['xx.xx.xx.xx:port'],api_version = (1,1),## SASL_PLAINTEXT 公网接入#security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',sasl_plain_username = "instanceId#username",sasl_plain_password = "password",)for message in consumer:print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %(message.topic, message.partition, message.offset, message.value))
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 |
group_id | 消费者的组 ID,根据业务需求自定义。 |
sasl_plain_username | 用户名,格式为 实例 ID + # + 用户名 。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在 ACL 策略管理
下的用户管理创建用户时设置。 |
sasl_plain_password | 用户名密码,在 CKafka 控制台实例详情页面 ACL 策略管理下的用户管理创建用户时设置。 |
topic_name | Topic 名称,您可以在控制台上 topic 管理页面复制。 |
consumer.py
。
本页内容是否解决了您的问题?