tencent cloud

Feedback

Kafka Python SDK

Last updated: 2024-07-04 16:00:59

    Overview

    CKafka's Python client has the following major libraries:
    kafka-python: This is a Kafka client implemented in pure Python, supporting Kafka 0.8.2 and higher versions. It provides APIs for producers, consumers, and managing the Kafka cluster. This library is easy to use, but its performance may not be as good as clients based on librdkafka.
    Installation method: pip install kafka-python
    confluent-kafka-python: This library is based on the high-performance C library librdkafka. It supports Kafka 0.9 and later versions, and provides APIs for producers, consumers, and managing the Kafka cluster. This library has better performance, but might require the installation of additional dependencies.
    Installation method: pip install confluent-kafka
    aiokafka: This is an asynchronous Kafka client based on kafka-python, using the asyncio library. It is suitable for scenarios that require asynchronous programming.
    Installation method: pip install aiokafka
    pykafka: This is a Python client supporting Kafka version 0.8.x. It provides APIs for producers, consumers, and managing the Kafka cluster. This library is no longer actively maintained, but still suitable for scenarios requiring support for older Kafka versions.
    Installation method: pip install pykafka
    When choosing a Python Kafka client, select the appropriate library based on your application requirements and Kafka version. For most scenarios, kafka-python or confluent-kafka-python is recommended, as they support newer versions of Kafka and have more comprehensive features. If your application requires asynchronous programming, consider using aiokafka.
    This document mainly describes how to use kafka-python. See the official documentation at kafka-python.

    Producer Practices

    Version Selection

    To use kafka-python, you should install the kafka-python library first. Run the following command to install it:
    pip install kafka-python

    Producer Parameters and Optimization

    Producer Parameters

    Kafka Python involves the following key parameters. The parameters and their default values are as follows:
    from kafka import KafkaProducer
    
    producer = KafkaProducer(
    bootstrap_servers='localhost:9092', # Broker list used to initialize connection to the Kafka cluster, with the default value being 'localhost:9092'
    client_id=None, # Custom client ID for identification in Kafka server logs, with the default value being None
    key_serializer=None, # Callable object used for serializing message keys into bytes, with the default value being None
    value_serializer=None, # Callable object used for serializing message values into bytes, with the default value being None
    compression_type=None, # Message compression type, valid values are 'gzip', 'snappy', 'lz4', or None. Default value is None, indicating no compression.
    retries=0, # Number of times to retry failed messages, with the default value being 0
    batch_size=16384, # The size of messages used for batch processing, measured in bytes, with the default value being 16384
    linger_ms=0, # Maximum waiting time for additional messages before batch processing, in milliseconds, with the default value being 0
    partitioner=None, # Callable object used to determine the message partition, with the default value being None
    buffer_memory=33554432, # Total memory allocated for buffering messages awaiting dispatch, in bytes, with the default value being 33554432
    connections_max_idle_ms=540000, # Maximum duration to maintain idle connections, in milliseconds, with the default value being 540000
    max_block_ms=60000, # Maximum duration to block the send() method when reaching buffer memory limits, in milliseconds, with the default value being 60000
    max_request_size=1048576, # Maximum byte size of requests sent to the broker, with the default value being 1048576
    metadata_max_age_ms=300000, # Maximum lifespan of metadata in the local cache, in milliseconds, with the default value being 300000
    retry_backoff_ms=100, # Waiting time between two retry attempts, in milliseconds, with the default value being 100
    request_timeout_ms=30000, # Maximum waiting time for client to receive a response, in milliseconds, with the default value being 30000
    receive_buffer_bytes=32768, # Network buffer size for receiving data, in bytes, with the default value being 32768
    send_buffer_bytes=131072, # Network buffer size for sending data, in bytes, with the default value being 131072
    acks='all', # Message acknowledgment mechanism, optional values are '0', '1', or 'all', with the default value being 'all'
    transactional_id=None, # Transaction ID, a unique identifier for producer participating in a transaction, with the default value being None
    transaction_timeout_ms=60000, # Transaction timeout, in milliseconds, with the default value being 60000
    enable_idempotence=False, # Whether to enable Idempotence, with the default value being False
    security_protocol='PLAINTEXT', # Security protocol type, optional values are 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL', with the default value being 'PLAINTEXT'

    Parameter Description and Optimization

    acks Parameter Optimization
    The acks parameter controls the confirmation mechanism for producers when sending messages. The default value of this parameter is -1, indicating that the message will only be returned after being sent to the Leader Broker and confirmed by the Leader, along with the associated Follower messages being written. The acks parameter has the following optional values: 0, 1, -1. In cross-availability zone scenarios, as well as Topic with a higher number of replicas, the value of the acks parameter will affect the message's reliability and throughput. Therefore:
    In some online business messaging scenarios, where required throughput is not high, you can set the acks parameter to -1. This ensures that the message is received and acknowledged by all replicas before returning, thereby increasing the message's reliability.
    In scenarios involving big data, such as log collection or offline computing, where high throughput (i.e., the volume of data written to Kafka per second) is required, you can set the acks to 1 to improve throughput.
    buffer_memory Parameter Optimization (Caching)
    By default, for transmitting the same volume of data, a single request's can effectively reduce computation and network resources compared to multiple requests, thereby improving the overall write throughput.
    Therefore, the message sending throughput of the client can be optimized by setting this parameter. For Kafka Python Client, the default linger_ms is set to 0 ms for batching time to accumulate messages, which can be optimized by appropriately increasing the value, for example, setting it to 100 ms, to aggregate and send multiple requests in batches, thus improving throughput. If bandwidth is high and memory on the machine is sufficient, it is recommended to increase buffer_memory to enhance throughput.
    Compression Parameter Optimization
    The Kafka Python Client supports the following compression parameters: none, gzip, snappy, lz4.
    none: Do not compress.
    gzip: Compress by GZIP.
    snappy: Compress by Snappy.
    lz4: Compress by LZ4.
    To use compressed messages on the Producer client, you need to set the compression_type parameter when creating the producer. For example, to use the LZ4 compression algorithm, you can set the compression_type to lz4. As an optimization method that exchanges computation for bandwidth, message compression and decompression occurs on the client side. However, due to additional computation cost of the Broker's behavior in verifying compressed messages, compression is not recommended in low traffic situations. For GZIP compression, especially, the verification computation cost for the Broker can be significant, which is not worthwhile in some cases. Due to increased computation, message processing capabilities of the Broker may be lower, resulting in lower bandwidth throughput. In these situations, the following compression method is recommended:
    Independently compress message data on the Producer side to generate packed data compression: messageCompression, and store the compression method in the message's key:
    {"Compression","lz4"}
    On the Producer side, send messageCompression as a normal message.
    On the Consumer side, read the message key, access the compression method used, and perform independent decompression.

    Creating Producer Instance

    If an application requires higher reliability, a synchronous producer can be used to ensure message delivery success. Furthermore, the ACK acknowledgement mechanism and transaction mechanism can be used to ensure the reliability and consistency of messages. For specific parameter optimization, see Producer Parameters and Optimization. If an application requires higher throughput, employ an asynchronous producer to increase the speed of message delivery. Additionally, use the batch message sending method to reduce network overhead and IO consumption. See the example below:
    from kafka import KafkaProducer
    import sys
    
    # Parameter Configuration
    BOOTSTRAP_SERVERS = 'localhost:9092'
    TOPIC = 'test_topic'
    SYNC = True
    ACKS = '1' # leader replica acknowledgement suffices
    LINGER_MS = 500 # Delay sending for 500 ms
    BATCH_SIZE = 16384 # Message batch size 16 KB
    
    def create_producer(servers, acks, linger_ms, batch_size):
    return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)
    
    def send_message_sync(producer, topic, message):
    future = producer.send(topic, message)
    result = future.get(timeout=10)
    print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")
    
    def send_message_async(producer, topic, message):
    def on_send_success(record_metadata):
    print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")
    
    def on_send_error(excp):
    print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)
    print(excp, file=sys.stderr)
    
    future = producer.send(topic, message)
    future.add_callback(on_send_success).add_errback(on_send_error)
    
    def main():
    producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)
    messages = ['Hello Kafka', 'Async vs Sync', 'Demo']
    
    if SYNC:
    for message in messages:
    send_message_sync(producer, TOPIC, message.encode('utf-8'))
    else:
    for message in messages:
    send_message_async(producer, TOPIC, message.encode('utf-8'))
    
    producer.flush()
    
    if __name__ == '__main__':
    main()
    
    

    Consumer Practice

    Consumer Parameters and Optimization

    Consumer Parameters

    from kafka import KafkaConsumer
    
    # Create a KafkaConsumer object for connecting to the Kafka cluster and consuming messages.
    consumer = KafkaConsumer(
    'topic_name', # List of Topics to subscribe to
    bootstrap_servers=['localhost:9092'], # Access point for the Kafka cluster
    group_id=None, # Consumer Group ID used for grouping consumers, required by dynamic partition allocation (if enabled) and used to access and submit the consumer group name of offset. If it is set to None, auto partition allocation (through the group coordinator) and offset submission are disabled.
    client_id='kafka-python-{version}', # The default client ID, with the default value being kafka-python-{version}
    api_version=None, # Specify the Kafka API version to use. If it is set to None, the client will try to activate different version features through API requests
    enable_auto_commit=True, # Whether to automatically commit the consumer offset, with the default value being True
    auto_commit_interval_ms=5000, # Interval for auto-committing consumer offset, with the default value being 5 seconds (5000 milliseconds)
    auto_offset_reset='latest', # Policy for consumer's consumption position in the partition being read, with the default value being 'latest' (starting consuming from the latest position)
    fetch_min_bytes=1, # Minimum bytes for consumer to read from a partition, with the default value being 1 byte
    fetch_max_wait_ms=500, # Waiting time when there is no more new consumption data, with the default value being 500 ms
    fetch_max_bytes=52428800, # Maximum bytes for consumer to read from a partition, with the default value being 52,428,800 bytes (50 MB)
    max_poll_interval_ms=300000 # Default interval is 300,000 milliseconds (5 minutes). If the consumer does not send a heartbeat signal within 5 minutes, it will be considered to have lost connection and will be removed from the consumer group. In this situation, other consumers will take over the partitions of the removed consumer and rebalancing will be triggered.
    retry_backoff_ms=100, # Retry interval, with the default value being 100 milliseconds
    reconnect_backoff_max_ms=1000, # Maximum interval for reconnection attempts to a broker after multiple failures, measured in milliseconds. If the connection fails, the value will exponentially increase after each consecutive failure until reaching this maximum value. Once the maximum value is reached, the reconnection attempts will continue at this fixed rate regularly.
    request_timeout_ms=305000, # Client request timeout, in milliseconds
    session_timeout_ms=10000, # session_timeout_ms (int) – Timeout period for detecting failures when the Kafka group management tool is used. Consumers regularly send heartbeats to the broker to indicate their activity. If the broker does not receive a heartbeat before this session timeout expires, it will remove that consumer from the group and initiate rebalancing.
    heartbeat_interval_ms=3000, # The expected time interval between heartbeats to the consumer coordinator when Kafka's group management tool is used, in milliseconds. Heartbeats are used to ensure the consumer's session remains active and facilitate rebalancing when new consumers join or leave the group. This value must be set to less than session_timeout_ms, but it typically should not exceed 1/3 of that value.
    receive_buffer_bytes=32768, # Size of the TCP receive buffer (SO_RCVBUF) used when reading data. It has no default value, or default value depends on system default, usually 32768.
    send_buffer_bytes=131072 # Size of the TCP send buffer (SO_SNDBUF) used when sending data. It has no default value, or default value depends on system default, usually 131072.
    )
    
    for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

    Parameter Description and Optimization

    1. max_poll_interval_ms is a configuration parameter for Kafka Python Consumer, specifying the maximum delay between two poll operations. Its primary function is to control the liveness of the Consumer, i.e., to determine if the Consumer is still active. If the Consumer does not conduct a poll operation within the time specified by max_poll_interval_ms, Kafka considers this Consumer as failed, triggering a rebalance operation for the Consumer. The setting of this parameter should be adjusted according to the actual consumption speed. Setting it too low may lead to frequent triggering of rebalance operations, increasing Kafka's load; setting it too high might prevent Kafka from timely detecting issues with the Consumer, thereby affecting message consumption. It is recommended to increase the setting of this value under high throughput conditions.
    2. For auto-commit offset requests, it is advised not to set auto_commit_interval_ms lower than 1000 ms, as too frequent offset requests can cause high CPU usage on the Broker, impacting the read/write operations of other services.

    Creating Consumer Instance

    Kafka Python provides a subscription model for creating consumers, which supports both manual and automatic offset commits.

    Auto-Commit Offsets

    Committing automatic offsets: After pulling messages, consumers automatically commit offsets without manual intervention. This method is easy to use, but may lead to duplicate message consumption or loss. It is recommended to commit offsets every 5 s.
    
    # auto_commit_consumer_interval.py
    from kafka import KafkaConsumer
    from time import sleep
    
    consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='auto_commit_group',
    auto_commit_interval_ms=5000 # Set the interval for automatic offset commits to 5000 milliseconds (5 seconds)
    )
    
    for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    sleep(1)

    Manual-Commit Offsets

    Manual-Commit Offsets: After processing messages, consumers need to manually commit their offsets. The advantage of this method is that it allows for precise control over offset committing, avoiding duplicate message consumption or loss. However, note that manual committing can lead to high Broker CPU usage, affecting performance. As message volume increases, CPU consumption will be significantly high, affecting other features of the Broker. Therefore, it is recommended to commit offsets after a certain number of messages.
    
    # manual_commit_consumer.py
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    from time import sleep
    
    consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    enable_auto_commit=False
    )
    
    count = 0
    for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    count += 1
    
    if count % 10 == 0:
    try:
    consumer.commit()
    except KafkaError as e:
    print(f"Error while committing offset: {e}")
    
    sleep(1)
    
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support