pip install kafka-python
producer.py
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['xx.xx.xx.xx:port'],  # address
    api_version=(1, 1),
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="username",  # username
    sasl_plain_password="password",  # password
)
message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value=msg)  # topic name
print("produce message " + message + " success.")
producer.close()
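The producer example prints a success message right after `send()`, but in kafka-python `send()` is asynchronous and returns a future. The following is a minimal sketch of waiting for the broker's acknowledgement before reporting success. The helper name `send_and_confirm` and the 10-second default timeout are assumptions made for illustration and are not part of the original example.

```python
import json


def send_and_confirm(producer, topic, message, timeout=10):
    """Send one JSON-encoded message and wait for the broker to acknowledge it.

    `producer` is expected to behave like kafka-python's KafkaProducer.
    The helper name and default timeout are illustrative assumptions.
    """
    payload = json.dumps(message).encode("utf-8")
    future = producer.send(topic, value=payload)
    # get() blocks until the send completes and raises an exception
    # if delivery fails or the timeout expires.
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset
```

With the producer created above, a call such as `send_and_confirm(producer, 'topic_name', message)` would return the topic, partition, and offset of the written record, or raise if the message could not be delivered.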
Parameter | Description |
bootstrap_servers | The connection address. It can be obtained from the basic information page of an elastic Topic in the console. |
sasl_plain_username | The username. It can be obtained from the basic information page of an elastic Topic in the console. |
sasl_plain_password | The user password. It can be obtained from the basic information page of an elastic Topic in the console. |
topic_name | The topic name. It can be obtained from the basic information page of an elastic Topic in the console. |
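Since the address, username, and password all come from the console, one option is to read them from the environment instead of hard-coding them in the script. The sketch below assumes three hypothetical environment variables (`CKAFKA_BOOTSTRAP_SERVERS`, `CKAFKA_USERNAME`, `CKAFKA_PASSWORD`) and a hypothetical helper name `build_sasl_config`. None of these come from the original page.

```python
import os


def build_sasl_config(env=None):
    """Build the SASL_PLAINTEXT connection settings used in the examples.

    The environment variable names are assumptions for illustration only.
    """
    env = os.environ if env is None else env
    return {
        # Comma-separated list of host:port pairs from the console.
        "bootstrap_servers": env["CKAFKA_BOOTSTRAP_SERVERS"].split(","),
        "api_version": (1, 1),
        "security_protocol": "SASL_PLAINTEXT",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": env["CKAFKA_USERNAME"],
        "sasl_plain_password": env["CKAFKA_PASSWORD"],
    }
```

The resulting dictionary can be unpacked into either client, for example `KafkaProducer(**build_sasl_config())`.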
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic_name',  # topic name
    group_id="group_id",  # consumer group
    bootstrap_servers=['xx.xx.xx.xx:port'],  # address
    api_version=(1, 1),
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism='PLAIN',
    sasl_plain_username="username",  # username
    sasl_plain_password="password",  # password
)
for message in consumer:
    print("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
          (message.topic, message.partition, message.offset, message.value))
Parameter | Description |
bootstrap_servers | The connection address. It can be obtained from the basic information page of an elastic Topic in the console. |
sasl_plain_username | The username. It can be obtained from the basic information page of an elastic Topic in the console. |
sasl_plain_password | The user password. It can be obtained from the basic information page of an elastic Topic in the console. |
topic_name | The topic name. It can be obtained from the basic information page of an elastic Topic in the console. |
group_id | The consumer group name. It can be obtained from the subscription relationship list of an elastic Topic in the console. |
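The consumer example prints `message.value` as raw bytes, while the producer example writes JSON-encoded text. A minimal sketch of decoding the value before printing might look like the following. The helper names `decode_value` and `format_record` are hypothetical, and the sketch assumes every message on the topic was produced as UTF-8 JSON.

```python
import json


def decode_value(raw):
    """Reverse the producer-side json.dumps(...).encode() step."""
    return json.loads(raw.decode("utf-8"))


def format_record(topic, partition, offset, raw_value):
    """Format one consumed record with its value decoded from JSON bytes."""
    return "Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (
        topic, partition, offset, decode_value(raw_value))
```

Inside the consumer loop this could be called as `print(format_record(message.topic, message.partition, message.offset, message.value))`. Alternatively, kafka-python accepts a `value_deserializer` callable on `KafkaConsumer`, so `decode_value` could be passed there and `message.value` would then arrive already decoded.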