Kafka's transaction feature is designed to support atomic operations in a distributed environment. It allows producers to guarantee the integrity and consistency of the messages they send, especially in scenarios where multiple messages must be processed as a single unit. The following sections introduce the main concepts and features of Kafka transactions.
Transaction-Related Concepts
Basic Concepts of a Transaction
Atomicity: All operations in a transaction either succeed together or fail together. Kafka ensures that the messages sent in a transaction are either all successfully written to the topic or none of them are written.
Consistency: The state of data should remain consistent before and after a transaction is executed.
Isolation: Transactions are independent of one another; the execution of one transaction does not affect the execution of other transactions.
Durability: Once a transaction is committed, its result is permanent and will not be lost even if the system crashes.
Workflow of a Transaction
The workflow of a Kafka transaction mainly includes the following steps (a condensed call-sequence fragment follows the list):
1. Start the transaction: The producer calls the initTransactions() method once to initialize its transaction state, and then calls beginTransaction() before sending messages to start a new transaction.
2. Send Messages: The producer can send multiple messages to one or more topics. These messages will be marked as transactional messages.
3. Commit or abort the transaction:
Commit: If all messages are sent successfully, the producer calls the commitTransaction() method to commit the transaction, and all of the messages are written to Kafka.
Abort: If an error occurs while sending, the producer calls the abortTransaction() method to abort the transaction, and none of the messages become visible to consumers.
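The call sequence above can be summarized as the following fragment. It is a condensed view of the full, runnable producer example later in this article; record1 and record2 stand in for any ProducerRecord instances.
producer.initTransactions();          // initialize once per producer instance
try {
    producer.beginTransaction();      // start a new transaction
    producer.send(record1);           // send one or more transactional messages
    producer.send(record2);
    producer.commitTransaction();     // commit: the messages become visible
} catch (Exception e) {
    producer.abortTransaction();      // abort: the messages never become visible to read_committed consumers
}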
Configuration of Transactions
To use Kafka's transaction feature, you need to set the following parameters in the producer configuration (a minimal configuration fragment follows the list):
transactional.id: Each transactional producer needs a unique identifier. This ID is used to identify all messages that belong to its transactions.
acks: Set to all so that all in-sync replicas acknowledge each message.
enable.idempotence: Set to true to enable idempotence and prevent duplicate messages; this is required whenever transactional.id is set.
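As a rough illustration, such a producer configuration fragment might look like the following; the broker address and transactional ID are placeholders, and the serializer settings are omitted here (see the full example below).
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder broker address
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");  // unique per producer instance
props.put(ProducerConfig.ACKS_CONFIG, "all");                              // wait for all in-sync replicas
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");               // required for transactional producers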
Limitations of Transactions
Performance overhead: Using transactions introduces additional performance overhead, because extra coordination and acknowledgment are required.
Transaction timeout: Kafka imposes a timeout on transactions (60 seconds by default). If a transaction is not committed or aborted within this time, the broker aborts it automatically.
Consumer handling: Consumers need to be configured appropriately for transactional messages; only consumers using read_committed isolation are guaranteed to see messages exclusively from committed transactions (see the fragment after this list).
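Both the transaction timeout and the consumer's visibility behavior are controlled by client configuration. The values below are only illustrative, not recommendations:
// Producer side: raise the transaction timeout (transaction.timeout.ms, default 60000 ms).
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "120000");
// Consumer side: read only messages from committed transactions.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");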
Usage Example of Transactions
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // A unique transactional.id identifies this producer's transactions across restarts.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        // Idempotence is required for transactional producers.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Register the transactional.id with the transaction coordinator (called once per producer).
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
                // Blocking on each send is only for demonstration; in production, send asynchronously.
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset());
            }
            producer.commitTransaction();
            System.out.println("Transaction committed successfully.");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // These errors are fatal for the producer; it cannot continue, so it is simply closed (in finally).
            System.err.println("Fatal producer error: " + e.getMessage());
        } catch (Exception e) {
            // For other errors, abort the transaction so none of the messages become visible.
            producer.abortTransaction();
            System.err.println("Transaction aborted due to an error: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TransactionalConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed ensures only messages from committed transactions are returned.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Kafka Transaction Management
In Kafka, transaction management involves multiple components and data structures that together ensure the atomicity and consistency of transactions. The memory usage of transaction information is mainly related to the following aspects:
Transaction ID and Producer ID
Transaction ID: Each transaction has a unique transaction ID that identifies it. The transaction ID is specified in the producer configuration (transactional.id) and is usually a string.
Producer ID: Each producer is assigned a unique Producer ID (PID) by the broker when it connects. This ID is used to identify the producer's messages and to guarantee their ordering and idempotence.
Transaction State Management
Kafka manages transaction state with an internal topic known as the transaction state log (__transaction_state). This log records the state of each transaction (such as ongoing, committed, or aborted) as well as the topic partitions involved in the transaction. Managing the transaction state log involves the following aspects:
In-memory data structures: Kafka maintains data structures in memory (such as hash tables or maps) that store information about currently active transactions, including the transaction ID, Producer ID, transaction state, timestamps, and so on.
Persistent storage: The transaction state log is persisted to disk so that transaction state can be recovered when a broker restarts or recovers from a failure.
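As a rough way to observe this state, newer Kafka clients (3.0+, which include the KIP-664 admin APIs) can list the transactions known to the transaction coordinators. The sketch below assumes such a client version and a broker at localhost:9092.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TransactionListing;

import java.util.Collection;
import java.util.Properties;

public class ListTransactionsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // List transactions known to the transaction coordinators, with their current state.
            Collection<TransactionListing> listings = admin.listTransactions().all().get();
            for (TransactionListing listing : listings) {
                System.out.printf("transactionalId=%s, producerId=%d, state=%s%n",
                        listing.transactionalId(), listing.producerId(), listing.state());
            }
        }
    }
}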
Memory Usage of Transaction Information
Memory usage of transaction information mainly depends on the following two factors:
The number of active transactions: The current number of in-progress transactions directly affects memory usage. Each active transaction occupies a certain amount of space in memory.
Metadata of a transaction: The metadata of each transaction (for example, transaction ID, Producer ID, status, etc.) also occupies memory. The specific memory usage depends on the size of this metadata.
Cleanup of Transactions
To prevent excessive memory usage, Kafka periodically checks for and cleans up expired transaction information based on the configured expiration time (transactional.id.expiration.ms). By default, an idle transactional ID is retained for 7 days and removed after it expires.
Common FullGC/OOM Issues of Transactions
As the description of transaction management shows, transaction information can occupy a significant amount of memory. The two factors that most directly affect this memory usage are the number of transaction IDs and the number of Producer IDs.
The number of transaction IDs is the number of transactions that clients initialize and commit to the broker; it is closely tied to how frequently clients create and commit new transactional IDs.
Producer ID refers to the producer state stored per topic partition on the broker; the number of Producer ID entries is therefore closely related to the number of partitions on the broker.
In transactional scenarios, transaction IDs and Producer IDs are tightly bound. If the Producer ID bound to a transaction ID sends messages to every partition on a broker, the number of Producer ID entries on that broker can, in theory, reach the product of the number of transaction IDs and the number of partitions. If the number of transaction IDs on an instance is t and the number of partitions on a broker is p, the number of Producer ID entries can reach up to t * p.
Notes:
Therefore, if the number of transaction IDs on a broker is t, the average memory footprint of a transaction is tb, the number of partitions on the broker is p, and the average memory footprint of a Producer ID entry is pb, then the memory occupied by transaction information on that broker is roughly: t * tb + t * p * pb.
From this formula, two scenarios can lead to a surge in memory usage (a rough worked estimate follows the list):
The client frequently creates and commits new transaction IDs, for example during instance initialization.
A single transaction ID sends data to many partitions, so the cross-product number of Producer ID entries grows dramatically and can easily exhaust memory.
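To make the formula concrete, the following sketch plugs purely hypothetical numbers (10,000 transaction IDs, 1,000 partitions, and assumed per-entry sizes) into t * tb + t * p * pb; the result is dominated by the cross-product term.
public class TransactionMemoryEstimate {
    public static void main(String[] args) {
        long t = 10_000;      // hypothetical number of transaction IDs on the broker
        long p = 1_000;       // hypothetical number of partitions on the broker
        long tb = 200;        // assumed average bytes per transaction entry
        long pb = 100;        // assumed average bytes per Producer ID entry

        // Memory estimate from the formula above: t * tb + t * p * pb
        long bytes = t * tb + t * p * pb;   // ~1.0e9 bytes, i.e. close to 1 GiB
        System.out.printf("Estimated transaction-state memory: %.2f GiB%n", bytes / (1024.0 * 1024 * 1024));
    }
}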
Notes:
Therefore, for both the Flink client and self-implemented transactional producers, try to avoid these two scenarios. For Flink, for example, you can reduce the checkpoint frequency, which in turn reduces how often new transaction IDs (derived from the transaction ID prefix plus a generated suffix) are created. In addition, try to ensure that the same transaction ID always sends data to the same partition.
Precautions for Using Transactions with Flink
For Flink, the following optimizations help prevent transaction information from expanding rapidly (a minimal configuration sketch follows the list):
Client parameters: increase the checkpoint interval of the Flink job, which reduces how often new transaction IDs are created.
Sink partitioning: set sink.partitioner of the Flink Kafka sink to fixed mode, so that each producer subtask writes to a fixed partition.
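The sketch below shows these two settings, assuming the Flink DataStream API and the Flink Kafka SQL connector; the topic name, checkpoint interval, and connector options are placeholders for illustration, not recommendations.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkKafkaSinkTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A larger checkpoint interval (here 5 minutes) means fewer new transaction IDs per unit of time.
        env.enableCheckpointing(5 * 60 * 1000L);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // 'sink.partitioner' = 'fixed' makes each subtask write to a single Kafka partition.
        tEnv.executeSql(
                "CREATE TABLE kafka_sink (\n" +
                "  k STRING,\n" +
                "  v STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'my-topic',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'json',\n" +
                "  'sink.delivery-guarantee' = 'exactly-once',\n" +
                "  'sink.transactional-id-prefix' = 'my-prefix',\n" +
                "  'sink.partitioner' = 'fixed'\n" +
                ")");
    }
}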