pip install kafka-python
producer.py
中配置参数。producer = KafkaProducer(bootstrap_servers = ['xx.xx.xx.xx:port'],#地址api_version = (1, 1),security_protocol = "SASL_PLAINTEXT",sasl_mechanism = "PLAIN",sasl_plain_username = "username",#用户名sasl_plain_password = "password",#密码)message = "Hello World! Hello Ckafka!"msg = json.dumps(message).encode()producer.send('topic_name', value = msg)#topic名称print("produce message " + message + " success.")producer.close()
参数 | 描述 |
bootstrapServers | 接入地址,在控制台的弹性 Topic 基本信息页面获取。 |
sasl_plain_username | 用户名,在控制台的弹性 Topic 基本信息页面获取。 |
sasl_plain_password | 用户密码,在控制台的弹性 Topic 基本信息页面获取。 |
topic_name | Topic 名称,在控制台的弹性 Topic 基本信息页面获取。 |
consumer = KafkaConsumer('topic_name',#topic名称group_id = "group_id",#消费组bootstrap_servers = ['xx.xx.xx.xx:port'],#地址api_version = (1,1),security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',sasl_plain_username = "username",#用户名sasl_plain_password = "password",#密码)for message in consumer:print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %(message.topic, message.partition, message.offset, message.value))
参数 | 描述 |
bootstrapServers | 接入地址,在控制台的弹性 Topic 基本信息页面获取。 |
sasl_plain_username | 用户名,在控制台的弹性 Topic 基本信息页面获取。 |
sasl_plain_password | 用户密码,在控制台的弹性 Topic 基本信息页面获取。 |
topic_name | Topic 名称,在控制台的弹性 Topic 基本信息页面获取。 |
group.id | 消费组名称,在控制台的弹性 Topic 的订阅关系列表获取。 |
本页内容是否解决了您的问题?