pip install kafka-python
pip install confluent-kafka
pip install aiokafka
pip install pykafka
pip install kafka-python
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092', # 用于初始化连接到Kafka集群的broker列表,默认值为'localhost:9092'client_id=None, # 自定义客户端ID,用于在Kafka服务端日志中识别客户端,默认值为Nonekey_serializer=None, # 用于将消息键序列化为字节的可调用对象,默认值为Nonevalue_serializer=None, # 用于将消息值序列化为字节的可调用对象,默认值为Nonecompression_type=None, # 消息压缩类型,可选值为'gzip', 'snappy', 'lz4'或None,表示不压缩,默认值为Noneretries=0, # 重新发送失败的消息的次数,默认值为0batch_size=16384, # 用于批处理消息的大小,单位为字节,默认值为16384linger_ms=0, # 在批处理消息之前等待更多消息的最长时间,单位为毫秒,默认值为0partitioner=None, # 用于确定消息分区的可调用对象,默认值为Nonebuffer_memory=33554432, # 用于缓冲待发送消息的内存总量,单位为字节,默认值为33554432connections_max_idle_ms=540000, # 空闲连接的最长保持时间,单位为毫秒,默认值为540000max_block_ms=60000, # 在达到缓冲区内存限制时,send()方法阻塞的最长时间,单位为毫秒,默认值为60000max_request_size=1048576, # 发送到broker的请求的最大字节数,默认值为1048576metadata_max_age_ms=300000, # 元数据在本地缓存中的最长存活时间,单位为毫秒,默认值为300000retry_backoff_ms=100, # 两次重试之间的等待时间,单位为毫秒,默认值为100request_timeout_ms=30000, # 客户端等待请求响应的最长时间,单位为毫秒,默认值为30000receive_buffer_bytes=32768, # 用于接收数据的网络缓冲区大小,单位为字节,默认值为32768send_buffer_bytes=131072, # 用于发送数据的网络缓冲区大小,单位为字节,默认值为131072acks='all', # 消息确认机制,可选值为'0', '1', 或'all',默认值为'all'transactional_id=None, # 事务ID,用于标识生产者参与事务的唯一标识,默认值为Nonetransaction_timeout_ms=60000, # 事务超时时间,单位为毫秒,默认值为60000enable_idempotence=False, # 是否启用幂等性,默认值为Falsesecurity_protocol='PLAINTEXT', # 安全协议类型,可选值为'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL',默认值为'PLAINTEXT'
{"Compression","lz4"}
from kafka import KafkaProducerimport sys# 参数配置BOOTSTRAP_SERVERS = 'localhost:9092'TOPIC = 'test_topic'SYNC = TrueACKS = '1' # leader副本确认写入即可LINGER_MS = 500 # 延迟500ms发送BATCH_SIZE = 16384 # 消息批次大小16KBdef create_producer(servers, acks, linger_ms, batch_size):return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)def send_message_sync(producer, topic, message):future = producer.send(topic, message)result = future.get(timeout=10)print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")def send_message_async(producer, topic, message):def on_send_success(record_metadata):print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")def on_send_error(excp):print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)print(excp, file=sys.stderr)future = producer.send(topic, message)future.add_callback(on_send_success).add_errback(on_send_error)def main():producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)messages = ['Hello Kafka', 'Async vs Sync', 'Demo']if SYNC:for message in messages:send_message_sync(producer, TOPIC, message.encode('utf-8'))else:for message in messages:send_message_async(producer, TOPIC, message.encode('utf-8'))producer.flush()if __name__ == '__main__':main()
from kafka import KafkaConsumer# 创建一个KafkaConsumer对象,用于连接Kafka集群并消费消息consumer = KafkaConsumer('topic_name', # 要订阅的主题列表bootstrap_servers=['localhost:9092'], # Kafka集群的接入点group_id=None, # 消费者组ID,用于将消费者分组,要加入动态分区分配(如果启用)并用于获取和提交偏移量的消费者组的名称。如果为 None,则禁用自动分区分配(通过组协调器)和偏移量提交。client_id='kafka-python-{version}',#客户端Id,默认是kafka-python-{version}api_version=None, #指定要使用的 Kafka API 版本。如果设置为 None,客户端将尝试通过API请求来启动不同版本的功能enable_auto_commit=True, # 是否自动提交消费位置,默认为Trueauto_commit_interval_ms=5000, # 自动提交消费位置的间隔,默认为5秒(5000毫秒)auto_offset_reset='latest', # 消费者在读取的分区中的消费位置的策略,默认为'latest'(从最新的位置开始消费)fetch_min_bytes=1, # 消费者在读取分区时的最小字节数,默认为1字节fetch_max_wait_ms=500, # 在没有新的消费数据,默认等待500msfetch_max_bytes=52428800, # 消费者在读取分区时的最大字节数,默认为52428800字节(50MB)max_poll_interval_ms=300000 # 消参数默认值为300000毫秒(5分钟)。如果消费者在5分钟内没有发送心跳信号,它将被认为已经失去连接,并将被从消费者组中移除。在这种情况下,其他消费者将接管被移除的消费者的分区并发起重平衡。retry_backoff_ms=100, # 重试间隔时间,默认为100毫秒reconnect_backoff_max_ms=1000, # 重新连接到多次连接失败的Broker的间隔时间最大值(以毫秒为单位)。如果连接失败,将在每次连续连接失败时呈指数增加,直至达到此最大值。一旦达到最大值,重新连接尝试将以该固定速率定期继续。request_timeout_ms=305000, # 客户端请求超时(以毫秒为单位)session_timeout_ms=10000, # session_timeout_ms (int) – 使用 Kafka 组管理工具时用于检测故障的超时时间。消费者定期发送心跳给Broker表明其活跃度。如果在此会话超时到期之前Broker没有收到心跳,则Broker将改消费组从组中删除该消费者并启动重新平衡。heartbeat_interval_ms=3000, # 使用 Kafka 的组管理工具时,向消费者协调器发出心跳之间的预期时间(以毫秒为单位)。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session_timeout_ms,但通常不应高于该值的 1/3。receive_buffer_bytes=32768,#读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。默认值:无(依赖于系统默认值),默认为32768。send_buffer_bytes=131072# 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。默认值:无(依赖于系统默认值),131072。)for message in consumer:print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
# auto_commit_consumer_interval.pyfrom kafka import KafkaConsumerfrom time import sleepconsumer = KafkaConsumer('your_topic_name',bootstrap_servers=['localhost:9092'],group_id='auto_commit_group',auto_commit_interval_ms=5000 # 设置自动提交位点的间隔为5000毫秒(5秒))for message in consumer:print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")sleep(1)
# manual_commit_consumer.pyfrom kafka import KafkaConsumerfrom kafka.errors import KafkaErrorfrom time import sleepconsumer = KafkaConsumer('your_topic_name',bootstrap_servers=['localhost:9092'],group_id='manual_commit_group',enable_auto_commit=False)count = 0for message in consumer:print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")count += 1if count % 10 == 0:try:consumer.commit()except KafkaError as e:print(f"Error while committing offset: {e}")sleep(1)
本页内容是否解决了您的问题?