tencent cloud

文档反馈

Kafka Python SDK

最后更新时间:2024-07-04 16:00:59

    背景

    CKafka 的 Python 客户端有以下几个主要的库:
    kafka-python:这是一个纯 Python 实现的 Kafka 客户端,支持 Kafka 0.8.2及更高版本。它提供了生产者、消费者和管理 Kafka 集群的 API。这个库易于使用,但性能可能不如基于 librdkafka 的客户端。
    安装方法:pip install kafka-python
    confluent-kafka-python:这个库是基于高性能的 C 库 librdkafka 实现的。它支持Kafka 0.9及更高版本,并提供了生产者、消费者和管理 Kafka 集群的 API。这个库性能更好,但可能需要安装额外的依赖。
    安装方法:pip install confluent-kafka
    aiokafka:这是一个基于 kafka-python 的异步 Kafka 客户端,使用 asyncio 库。这个库适用于需要异步编程的场景。
    安装方法:pip install aiokafka
    pykafka:这是一个支持 Kafka 0.8.x 版本的 Python 客户端。它提供了生产者、消费者和管理 Kafka 集群的 API。这个库已经不再积极维护,但仍然适用于需要支持较旧版本的 Kafka 的场景。
    安装方法:pip install pykafka
    在选择 Python Kafka 客户端时,请根据您的应用需求和 Kafka 版本选择合适的库。对于大多数场景,推荐使用 kafka-python 或 confluent-kafka-python,因为它们支持较新的 Kafka 版本并且功能更完善。如果您的应用需要异步编程,可以考虑使用 aiokafka。
    本文重点介绍 kafka-python 的使用方式,官网文档参见 kafka-python

    生产者实践

    版本选择

    在使用 kafka-python 时,需要先安装 kafka-python 库。可以使用以下命令进行安装:
    pip install kafka-python

    生产者参数与调优

    生产者参数

    Kafka Python 涉及如下关键参数,相关的参数和默认值如下:
    from kafka import KafkaProducer
    
    producer = KafkaProducer(
    bootstrap_servers='localhost:9092', # 用于初始化连接到Kafka集群的broker列表,默认值为'localhost:9092'
    client_id=None, # 自定义客户端ID,用于在Kafka服务端日志中识别客户端,默认值为None
    key_serializer=None, # 用于将消息键序列化为字节的可调用对象,默认值为None
    value_serializer=None, # 用于将消息值序列化为字节的可调用对象,默认值为None
    compression_type=None, # 消息压缩类型,可选值为'gzip', 'snappy', 'lz4'None,表示不压缩,默认值为None
    retries=0, # 重新发送失败的消息的次数,默认值为0
    batch_size=16384, # 用于批处理消息的大小,单位为字节,默认值为16384
    linger_ms=0, # 在批处理消息之前等待更多消息的最长时间,单位为毫秒,默认值为0
    partitioner=None, # 用于确定消息分区的可调用对象,默认值为None
    buffer_memory=33554432, # 用于缓冲待发送消息的内存总量,单位为字节,默认值为33554432
    connections_max_idle_ms=540000, # 空闲连接的最长保持时间,单位为毫秒,默认值为540000
    max_block_ms=60000, # 在达到缓冲区内存限制时,send()方法阻塞的最长时间,单位为毫秒,默认值为60000
    max_request_size=1048576, # 发送到broker的请求的最大字节数,默认值为1048576
    metadata_max_age_ms=300000, # 元数据在本地缓存中的最长存活时间,单位为毫秒,默认值为300000
    retry_backoff_ms=100, # 两次重试之间的等待时间,单位为毫秒,默认值为100
    request_timeout_ms=30000, # 客户端等待请求响应的最长时间,单位为毫秒,默认值为30000
    receive_buffer_bytes=32768, # 用于接收数据的网络缓冲区大小,单位为字节,默认值为32768
    send_buffer_bytes=131072, # 用于发送数据的网络缓冲区大小,单位为字节,默认值为131072
    acks='all', # 消息确认机制,可选值为'0', '1','all',默认值为'all'
    transactional_id=None, # 事务ID,用于标识生产者参与事务的唯一标识,默认值为None
    transaction_timeout_ms=60000, # 事务超时时间,单位为毫秒,默认值为60000
    enable_idempotence=False, # 是否启用幂等性,默认值为False
    security_protocol='PLAINTEXT', # 安全协议类型,可选值为'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL',默认值为'PLAINTEXT'

    参数说明调优

    关于 acks 参数优化
    acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为-1,表示消息发送给 Leader Broker 后,Leader 确认以及相应的 Follower 消息都写入完成后才返回。acks 参数还有以下可选值:0,1,-1。在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
    在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks参 数设置为-1,则可以确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
    在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐量。
    关于 buffer_memory 参数优化(缓存)
    默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
    因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。对于 Kafka Python Client,默认提供 linger_ms 为0ms 的攒批时间积攒消息,此处可以优化,适当增加值,例如设置100ms,进行聚合多个请求批量发送消息,提高吞吐。如果带宽较高,且单机内存充足,建议调大 buffer_memory 提高吞吐。
    关于压缩参数优化
    Kafka Python Client 支持如下压缩参数:none, gzip, snappy, lz4。
    none:不使用压缩。
    gzip:使用 GZIP 压缩。
    snappy:使用 Snappy 压缩。
    lz4:使用 LZ4 压缩。
    要在 Producer 客户端中使用压缩消息,需要在创建生产者时设置 compression_type 参数。例如,要使用 LZ4 压缩算法,可以将 compression_type 设置为 lz4,虽然压缩消息的压缩和解压缩,发生客户端,是一种用计算换带宽的优化方式,但是由于 Broker 针对压缩消息存在校验行为会付出额外的计算成本,在低流量情况,不建议使用压缩,尤其是 gzip 压缩,Broker 校验计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致 Broker 处理其他请求的能力偏低,导致带宽吞吐更低。这种情况建议可以使用如下方式:
    在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
    {"Compression","lz4"}
    在 Producer 端将 messageCompression 当成正常消息发送。
    在 Consumer 端读取消息 key,获取使用的压缩方式,独立进行解压缩。

    创建生产者实例

    如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。示例如下:
    from kafka import KafkaProducer
    import sys
    
    # 参数配置
    BOOTSTRAP_SERVERS = 'localhost:9092'
    TOPIC = 'test_topic'
    SYNC = True
    ACKS = '1' # leader副本确认写入即可
    LINGER_MS = 500 # 延迟500ms发送
    BATCH_SIZE = 16384 # 消息批次大小16KB
    
    def create_producer(servers, acks, linger_ms, batch_size):
    return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)
    
    def send_message_sync(producer, topic, message):
    future = producer.send(topic, message)
    result = future.get(timeout=10)
    print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")
    
    def send_message_async(producer, topic, message):
    def on_send_success(record_metadata):
    print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")
    
    def on_send_error(excp):
    print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)
    print(excp, file=sys.stderr)
    
    future = producer.send(topic, message)
    future.add_callback(on_send_success).add_errback(on_send_error)
    
    def main():
    producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)
    messages = ['Hello Kafka', 'Async vs Sync', 'Demo']
    
    if SYNC:
    for message in messages:
    send_message_sync(producer, TOPIC, message.encode('utf-8'))
    else:
    for message in messages:
    send_message_async(producer, TOPIC, message.encode('utf-8'))
    
    producer.flush()
    
    if __name__ == '__main__':
    main()
    
    

    消费者实践

    消费者参数与调优

    消费者参数

    from kafka import KafkaConsumer
    
    # 创建一个KafkaConsumer对象,用于连接Kafka集群并消费消息
    consumer = KafkaConsumer(
    'topic_name', # 要订阅的主题列表
    bootstrap_servers=['localhost:9092'], # Kafka集群的接入点
    group_id=None, # 消费者组ID,用于将消费者分组,要加入动态分区分配(如果启用)并用于获取和提交偏移量的消费者组的名称。如果为 None,则禁用自动分区分配(通过组协调器)和偏移量提交。
    client_id='kafka-python-{version}',#客户端Id,默认是kafka-python-{version}
    api_version=None, #指定要使用的 Kafka API 版本。如果设置为 None,客户端将尝试通过API请求来启动不同版本的功能
    enable_auto_commit=True, # 是否自动提交消费位置,默认为True
    auto_commit_interval_ms=5000, # 自动提交消费位置的间隔,默认为5秒(5000毫秒)
    auto_offset_reset='latest', # 消费者在读取的分区中的消费位置的策略,默认为'latest'(从最新的位置开始消费)
    fetch_min_bytes=1, # 消费者在读取分区时的最小字节数,默认为1字节
    fetch_max_wait_ms=500, # 在没有新的消费数据,默认等待500ms
    fetch_max_bytes=52428800, # 消费者在读取分区时的最大字节数,默认为52428800字节(50MB)
    max_poll_interval_ms=300000 # 消参数默认值为300000毫秒(5分钟)。如果消费者在5分钟内没有发送心跳信号,它将被认为已经失去连接,并将被从消费者组中移除。在这种情况下,其他消费者将接管被移除的消费者的分区并发起重平衡。
    retry_backoff_ms=100, # 重试间隔时间,默认为100毫秒
    reconnect_backoff_max_ms=1000, # 重新连接到多次连接失败的Broker的间隔时间最大值(以毫秒为单位)。如果连接失败,将在每次连续连接失败时呈指数增加,直至达到此最大值。一旦达到最大值,重新连接尝试将以该固定速率定期继续。
    request_timeout_ms=305000, # 客户端请求超时(以毫秒为单位)
    session_timeout_ms=10000, # session_timeout_ms (int) – 使用 Kafka 组管理工具时用于检测故障的超时时间。消费者定期发送心跳给Broker表明其活跃度。如果在此会话超时到期之前Broker没有收到心跳,则Broker将改消费组从组中删除该消费者并启动重新平衡。
    heartbeat_interval_ms=3000, # 使用 Kafka 的组管理工具时,向消费者协调器发出心跳之间的预期时间(以毫秒为单位)。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session_timeout_ms,但通常不应高于该值的 1/3。
    receive_buffer_bytes=32768,#读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。默认值:无(依赖于系统默认值),默认为32768。
    send_buffer_bytes=131072# 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。默认值:无(依赖于系统默认值),131072。
    )
    
    for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

    参数说明与调优

    1. max_poll_interval_ms 是 Kafka Python Consumer 的一个配置参数,它用于指定 Consumer 在两次 poll 操作之间的最大延迟。这个参数的主要作用是控制 Consumer 的 存活,也就是判断 Consumer 是否还活着。如果 Consumer 在 max_poll_interval_ms 指定的时间内没有进行 poll 操作,那么 Kafka 认为这个 Consumer 已经挂掉,会触发 Consumer 的 rebalance 操作。这个参数的设置需要根据实际的消费速度来调整。如果设置得太小,可能会导致 Consumer 频繁地触发 rebalance 操作,增加了 Kafka 的负担;如果设置得太大,可能会导致 Consumer 在出现问题时不能及时被 Kafka 检测到,从而影响了消息的消费。建议在高吞吐下可以时长增加该值的设置。
    2. 针对自动提交位点请求,建议 auto_commit_interval_ms 时间不要低于1000ms,因为频率过高的位点请求会导致 Broker CPU 很高,影响其他正常服务的读写。

    创建消费者实例

    Kafka Python 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

    自动提交位点

    自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。建议间隔5s提交位点。
    
    # auto_commit_consumer_interval.py
    from kafka import KafkaConsumer
    from time import sleep
    
    consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='auto_commit_group',
    auto_commit_interval_ms=5000 # 设置自动提交位点的间隔为5000毫秒(5秒)
    )
    
    for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    sleep(1)

    手动提交位点

    手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。
    
    # manual_commit_consumer.py
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    from time import sleep
    
    consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    enable_auto_commit=False
    )
    
    count = 0
    for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    count += 1
    
    if count % 10 == 0:
    try:
    consumer.commit()
    except KafkaError as e:
    print(f"Error while committing offset: {e}")
    
    sleep(1)
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持