Overview
CKafka can be accessed from Python through the following major client libraries:
kafka-python: This is a Kafka client implemented in pure Python, supporting Kafka 0.8.2 and higher versions. It provides APIs for producers, consumers, and managing the Kafka cluster. This library is easy to use, but its performance may not be as good as clients based on librdkafka. Installation method: pip install kafka-python
confluent-kafka-python: This library is based on the high-performance C library librdkafka. It supports Kafka 0.9 and later versions, and provides APIs for producers, consumers, and managing the Kafka cluster. This library has better performance, but might require the installation of additional dependencies. Installation method: pip install confluent-kafka
aiokafka: This is an asynchronous Kafka client based on kafka-python, using the asyncio library. It is suitable for scenarios that require asynchronous programming. Installation method: pip install aiokafka
pykafka: This is a Python client supporting Kafka version 0.8.x. It provides APIs for producers, consumers, and managing the Kafka cluster. This library is no longer actively maintained, but still suitable for scenarios requiring support for older Kafka versions. Installation method: pip install pykafka
When choosing a Python Kafka client, select the appropriate library based on your application requirements and Kafka version. For most scenarios, kafka-python or confluent-kafka-python is recommended, as they support newer versions of Kafka and have more comprehensive features. If your application requires asynchronous programming, consider using aiokafka.
This document mainly describes how to use kafka-python. For more details, see the official kafka-python documentation.
Producer Practices
Version Selection
To use kafka-python, first install the library by running the following command:
pip install kafka-python
Producer Parameters and Optimization
Producer Parameters
The kafka-python producer involves the following key parameters. The parameters and their default values are as follows:
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='localhost:9092', # Broker list used to initialize connection to the Kafka cluster, with the default value being 'localhost:9092'
client_id=None, # Custom client ID for identification in Kafka server logs, with the default value being None
key_serializer=None, # Callable object used for serializing message keys into bytes, with the default value being None
value_serializer=None, # Callable object used for serializing message values into bytes, with the default value being None
compression_type=None, # Message compression type, valid values are 'gzip', 'snappy', 'lz4', or None. Default value is None, indicating no compression.
retries=0, # Number of times to retry failed messages, with the default value being 0
batch_size=16384, # Maximum number of bytes batched together per partition before the batch is sent, with the default value being 16384
linger_ms=0, # Maximum waiting time for additional messages before batch processing, in milliseconds, with the default value being 0
partitioner=None, # Callable object used to determine the message partition, with the default value being None
buffer_memory=33554432, # Total memory allocated for buffering messages awaiting dispatch, in bytes, with the default value being 33554432
connections_max_idle_ms=540000, # Maximum duration to maintain idle connections, in milliseconds, with the default value being 540000
max_block_ms=60000, # Maximum duration to block the send() method when reaching buffer memory limits, in milliseconds, with the default value being 60000
max_request_size=1048576, # Maximum byte size of requests sent to the broker, with the default value being 1048576
metadata_max_age_ms=300000, # Maximum lifespan of metadata in the local cache, in milliseconds, with the default value being 300000
retry_backoff_ms=100, # Waiting time between two retry attempts, in milliseconds, with the default value being 100
request_timeout_ms=30000, # Maximum waiting time for client to receive a response, in milliseconds, with the default value being 30000
receive_buffer_bytes=32768, # Network buffer size for receiving data, in bytes, with the default value being 32768
send_buffer_bytes=131072, # Network buffer size for sending data, in bytes, with the default value being 131072
acks='all', # Message acknowledgment mechanism, optional values are 0, 1, or 'all', with the default value being 'all'
transactional_id=None, # Transaction ID, a unique identifier for producer participating in a transaction, with the default value being None
transaction_timeout_ms=60000, # Transaction timeout, in milliseconds, with the default value being 60000
enable_idempotence=False, # Whether to enable Idempotence, with the default value being False
security_protocol='PLAINTEXT', # Security protocol type, optional values are 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL', with the default value being 'PLAINTEXT'
)
Parameter Description and Optimization
acks Parameter Optimization
The acks parameter controls the acknowledgment behavior of the producer when sending messages. Its default value is -1, meaning that a send returns only after the message has been written to the Leader Broker and the associated Followers have also written it. The optional values of acks are 0, 1, and -1. In cross-availability-zone scenarios, and for Topics with a larger number of replicas, the value of acks affects both message reliability and throughput. Therefore:
In online business messaging scenarios where the required throughput is not high, you can set acks to -1. A send then returns only after the message has been received and acknowledged by all replicas, which improves message reliability.
In big data scenarios such as log collection or offline computing, where high throughput (that is, the volume of data written to Kafka per second) is required, you can set acks to 1 to improve throughput. A minimal configuration sketch for both cases follows these points.
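The following is a minimal sketch of how the two settings might be configured with kafka-python; the broker address is a placeholder and the retry count is only illustrative.
from kafka import KafkaProducer

# Reliability first: wait for all in-sync replicas to acknowledge (acks='all', equivalent to -1).
reliable_producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # placeholder broker address
    acks='all',
    retries=3  # retry transient send failures; illustrative value
)

# Throughput first: only the partition leader needs to acknowledge (acks=1).
fast_producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # placeholder broker address
    acks=1
)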
buffer_memory Parameter Optimization (Caching)
For the same volume of data, a single batched request consumes less computation and network resources than many small requests, thereby improving overall write throughput.
Therefore, the client's send throughput can be improved by tuning these parameters. In the Kafka Python client, linger_ms defaults to 0 ms, so no time is spent waiting to accumulate messages into batches; increasing it appropriately, for example to 100 ms, allows multiple messages to be aggregated and sent in a single batched request, improving throughput. If the bandwidth is high and the machine has sufficient memory, it is also recommended to increase buffer_memory to further raise throughput, as in the sketch below.
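Below is a minimal sketch of these batching settings; the concrete values (100 ms, 64 KB, 64 MB) are illustrative rather than recommendations for every workload.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # placeholder broker address
    linger_ms=100,                       # wait up to 100 ms so more messages can join a batch
    batch_size=64 * 1024,                # per-partition batch size, in bytes
    buffer_memory=64 * 1024 * 1024       # total memory for buffering unsent messages, in bytes
)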
Compression Parameter Optimization
The Kafka Python Client supports the following compression parameters: none, gzip, snappy, lz4.
none: Do not compress.
gzip: Compress by GZIP.
snappy: Compress by Snappy.
lz4: Compress by LZ4.
To send compressed messages from the Producer client, set the compression_type parameter when creating the producer; for example, to use the LZ4 compression algorithm, set compression_type to lz4. Compression is an optimization that trades computation for bandwidth, and compression and decompression take place on the client side. However, the Broker incurs additional computation to verify compressed messages; for GZIP in particular, this verification cost can be significant and may not be worthwhile. The extra computation reduces the Broker's message processing capacity and thus its bandwidth throughput. In such situations, the following compression method is recommended:
Independently compress the message data on the Producer side to produce the compressed payload messageCompression, and record the compression method in the message key, for example:
{"Compression": "lz4"}
On the Producer side, send messageCompression as a normal message.
On the Consumer side, read the message key to obtain the compression method used, and perform independent decompression, as illustrated in the sketch after these steps.
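The sketch below illustrates this pattern. It uses the standard-library gzip module instead of LZ4 so that it runs without extra dependencies; the broker address and topic name are placeholders, and the key layout follows the {"Compression": ...} convention described above.
import gzip
import json
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP_SERVERS = 'localhost:9092'  # placeholder broker address
TOPIC = 'compressed_topic'            # placeholder topic name

# Producer side: compress the payload independently and record the method in the message key.
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
payload = b'some large message body'
producer.send(
    TOPIC,
    key=json.dumps({"Compression": "gzip"}).encode('utf-8'),
    value=gzip.compress(payload)
)
producer.flush()

# Consumer side: read the compression method from the key and decompress accordingly.
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BOOTSTRAP_SERVERS,
                         auto_offset_reset='earliest', consumer_timeout_ms=5000)
for message in consumer:
    method = json.loads(message.key.decode('utf-8')).get("Compression")
    value = gzip.decompress(message.value) if method == "gzip" else message.value
    print(value)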
Creating a Producer Instance
If an application requires higher reliability, a synchronous producer can be used to ensure message delivery success. Furthermore, the ACK acknowledgement mechanism and transaction mechanism can be used to ensure the reliability and consistency of messages. For specific parameter optimization, see Producer Parameters and Optimization. If an application requires higher throughput, employ an asynchronous producer to increase the speed of message delivery. Additionally, use the batch message sending method to reduce network overhead and IO consumption. See the example below:
from kafka import KafkaProducer
import sys

BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC = 'test_topic'
SYNC = True        # True: send synchronously; False: send asynchronously
ACKS = 1           # acks must be 0, 1, or 'all'
LINGER_MS = 500
BATCH_SIZE = 16384

def create_producer(servers, acks, linger_ms, batch_size):
    return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)

def send_message_sync(producer, topic, message):
    future = producer.send(topic, message)
    result = future.get(timeout=10)
    print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")

def send_message_async(producer, topic, message):
    def on_send_success(record_metadata):
        print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")

    def on_send_error(excp):
        print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)
        print(excp, file=sys.stderr)

    future = producer.send(topic, message)
    future.add_callback(on_send_success).add_errback(on_send_error)

def main():
    producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)
    messages = ['Hello Kafka', 'Async vs Sync', 'Demo']
    if SYNC:
        for message in messages:
            send_message_sync(producer, TOPIC, message.encode('utf-8'))
    else:
        for message in messages:
            send_message_async(producer, TOPIC, message.encode('utf-8'))
    producer.flush()

if __name__ == '__main__':
    main()
Consumer Practices
Consumer Parameters and Optimization
Consumer Parameters
The kafka-python consumer involves the following key parameters:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic_name',                           # Topic to subscribe to
    bootstrap_servers=['localhost:9092'],   # Broker list used to initialize connection to the Kafka cluster
    group_id=None,                          # Consumer group ID; None means the consumer does not join a group
    client_id='kafka-python-{version}',     # Custom client ID for identification in Kafka server logs
    api_version=None,                       # Kafka API version; None means it is detected automatically
    enable_auto_commit=True,                # Whether to commit offsets automatically
    auto_commit_interval_ms=5000,           # Interval between automatic offset commits, in milliseconds
    auto_offset_reset='latest',             # Where to start consuming when no committed offset exists, 'earliest' or 'latest'
    fetch_min_bytes=1,                      # Minimum amount of data the server returns for a fetch request, in bytes
    fetch_max_wait_ms=500,                  # Maximum time the server blocks a fetch request when fetch_min_bytes is not met, in milliseconds
    fetch_max_bytes=52428800,               # Maximum amount of data returned for a fetch request, in bytes
    max_poll_interval_ms=300000,            # Maximum delay between two poll operations before the consumer is considered failed, in milliseconds
    retry_backoff_ms=100,                   # Waiting time between two retry attempts, in milliseconds
    reconnect_backoff_max_ms=1000,          # Maximum backoff time when reconnecting to a broker, in milliseconds
    request_timeout_ms=305000,              # Maximum waiting time for the client to receive a response, in milliseconds
    session_timeout_ms=10000,               # Session timeout used to detect consumer failures, in milliseconds
    heartbeat_interval_ms=3000,             # Interval between heartbeats to the group coordinator, in milliseconds
    receive_buffer_bytes=32768,             # Network buffer size for receiving data, in bytes
    send_buffer_bytes=131072                # Network buffer size for sending data, in bytes
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
Parameter Description and Optimization
1. max_poll_interval_ms is a configuration parameter of the Kafka Python consumer that specifies the maximum allowed delay between two poll operations. Its primary purpose is to control the liveness of the consumer, that is, to determine whether the consumer is still active. If the consumer does not poll within the time specified by max_poll_interval_ms, Kafka considers it failed and triggers a rebalance of the consumer group. Adjust this parameter according to the actual consumption speed: setting it too low may trigger frequent rebalances and increase Kafka's load; setting it too high may prevent Kafka from detecting consumer problems in time, affecting message consumption. Under high-throughput or slow-processing conditions, it is recommended to increase this value.
2. For auto-committed offsets, it is advised not to set auto_commit_interval_ms below 1000 ms, because overly frequent offset commits cause high CPU usage on the Broker and can affect the read/write operations of other services. A configuration sketch combining both recommendations follows.
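A minimal sketch that combines the two recommendations above; the topic name, group ID, and broker address are placeholders, and the concrete values are illustrative.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'your_topic_name',                     # placeholder topic name
    bootstrap_servers=['localhost:9092'],  # placeholder broker address
    group_id='tuned_group',                # placeholder group ID
    max_poll_interval_ms=600000,           # allow up to 10 minutes between polls for slow processing
    enable_auto_commit=True,
    auto_commit_interval_ms=5000           # keep auto-commits at 1000 ms or more to avoid overloading the Broker
)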
Creating a Consumer Instance
Kafka Python provides a subscription model for creating consumers, which supports both manual and automatic offset commits.
Auto-Commit Offsets
Auto-commit offsets: after pulling messages, the consumer commits offsets automatically without manual intervention. This method is easy to use, but it may lead to duplicate consumption or message loss. It is recommended to commit offsets every 5 seconds.
from kafka import KafkaConsumer
from time import sleep

consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='auto_commit_group',
    auto_commit_interval_ms=5000
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    sleep(1)
Manual-Commit Offsets
Manual-commit offsets: the consumer commits offsets manually after processing messages. The advantage of this method is precise control over when offsets are committed, which helps avoid duplicate consumption or message loss. Note, however, that committing too frequently drives up Broker CPU usage; as message volume grows, the extra CPU consumption can affect other functions of the Broker. It is therefore recommended to commit offsets once every certain number of messages rather than after every message.
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from time import sleep

consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    enable_auto_commit=False
)

count = 0
for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    count += 1
    if count % 10 == 0:
        try:
            consumer.commit()
        except KafkaError as e:
            print(f"Error while committing offset: {e}")
    sleep(1)