tencent cloud


Customized Consumption

Last updated: 2024-10-17 15:55:34

    Prerequisites

    1. Cloud Log Service (CLS) is activated, a logset and a log topic have been created, and log data has been collected successfully.
    2. Sub-accounts and collaborators must be authorized by the root account. For authorization steps, see CAM-Based Permission Management. To copy an authorization policy, see CLS Access Policy Templates.

    Consumption Process within a Consumer Group

    When data is consumed through a consumer group, the server manages the consumption tasks of all consumers in the group. It automatically balances these tasks according to the number of topic partitions and the number of consumers, and it records the consumption progress of each partition so that different consumers do not consume the same data twice. Consumption within a consumer group proceeds as follows:
    1. Create a consumer group.
    2. Every consumer periodically sends heartbeats to the server.
    3. The consumer group automatically assigns topic partitions to consumers according to the load balancing situation of the topic partitions.
    4. Consumers retrieve the partition offsets and consume the data according to the list of allocated partitions.
    5. Consumers periodically update their consumption progress for each partition to the consumer group, facilitating the next round of task allocation by the group.
    6. Repeat steps 2 through 5 until consumption is completed.
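The steps above can be sketched as a small in-memory simulation. This is only an illustration of the flow, not the CLS server logic: `InMemoryGroup`, `run_round`, and the round-robin assignment are hypothetical stand-ins, and real consumers talk to the service over the network.

```python
from collections import defaultdict


class InMemoryGroup:
    """Hypothetical stand-in for the server side of a consumer group."""

    def __init__(self, partitions):
        self.partitions = partitions      # partition id -> list of records
        self.offsets = defaultdict(int)   # partition id -> next offset to read
        self.alive = []                   # consumers seen via heartbeat

    def heartbeat(self, consumer):
        # Step 2: a heartbeat registers the consumer as alive.
        if consumer not in self.alive:
            self.alive.append(consumer)

    def assigned_partitions(self, consumer):
        # Step 3: spread partitions over alive consumers (round-robin here).
        ids = sorted(self.partitions)
        index = self.alive.index(consumer)
        return ids[index::len(self.alive)]

    def fetch(self, partition, max_records):
        # Step 4: read from the recorded offset of the partition.
        start = self.offsets[partition]
        return start, self.partitions[partition][start:start + max_records]

    def commit(self, partition, offset):
        # Step 5: record the consumption progress of the partition.
        self.offsets[partition] = offset


def run_round(group, consumers, max_records=2):
    """Run one pass of steps 2-5 for every consumer and return what each consumed."""
    consumed = {name: [] for name in consumers}
    for name in consumers:
        group.heartbeat(name)
    for name in consumers:
        for partition in group.assigned_partitions(name):
            start, records = group.fetch(partition, max_records)
            consumed[name].extend(records)
            group.commit(partition, start + len(records))
    return consumed


group = InMemoryGroup({1: ["a1", "a2", "a3"], 2: ["b1"], 3: ["c1", "c2"]})
seen = []
# Step 6: repeat until no partition has unread data.
while True:
    batch = run_round(group, ["consumer-A", "consumer-B"])
    new_records = [r for records in batch.values() for r in records]
    if not new_records:
        break
    seen.extend(new_records)
```

Because progress is stored per partition rather than per consumer, every record ends up in `seen` exactly once, regardless of which consumer read it.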

    Consumption Balancing

    The consumer group will dynamically adjust the consumption tasks of each consumer according to the number of active consumers and topic partitions to ensure balanced consumption. At the same time, consumers can save the consumption progress in each topic partition to ensure that they can continue to consume data after fault recovery and avoid repeated consumption.

    Example 1: Topic Partition Change

    For example, a log topic has two consumers. Consumer A consumes data in partitions 1 and 2, and consumer B in partitions 3 and 4. After partition 5 is added through partition splitting, the consumer group will automatically allocate partition 5 to consumer B for consumption, as shown in the figure below:
    
    
    

    Example 2: Consumer Change

    For example, a log topic has two consumers. Consumer A consumes data in partitions 1, 2, and 3, and consumer B in partitions 4, 5, and 6. Consumer C is added so that consumption keeps pace with log generation. The consumer group then reallocates partitions, and partitions 3 and 6 are allocated to consumer C for consumption, as shown in the figure below:
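The two rebalancing examples can be approximated with a short function. This is a sketch under assumptions, not the algorithm CLS actually runs: `rebalance` is a hypothetical name, and the rule it applies (keep existing assignments, then move partitions from the busiest to the idlest consumer until loads differ by at most one) is simply one strategy that happens to reproduce Example 2. Which consumer receives a new partition when loads are tied depends on tie-breaking, so for Example 1 this sketch may pick a different consumer than the service does.

```python
def rebalance(assignment, partitions, consumers):
    """Return a new {consumer: [partitions]} mapping balanced within one partition."""
    # Keep existing assignments for consumers that are still active.
    result = {
        c: [p for p in assignment.get(c, []) if p in partitions]
        for c in consumers
    }
    assigned = {p for owned in result.values() for p in owned}

    # Hand each unassigned partition to the consumer with the fewest partitions.
    for p in sorted(set(partitions) - assigned):
        target = min(consumers, key=lambda c: len(result[c]))
        result[target].append(p)

    # Move partitions from the busiest to the idlest consumer
    # until their loads differ by at most one.
    while True:
        busiest = max(consumers, key=lambda c: len(result[c]))
        idlest = min(consumers, key=lambda c: len(result[c]))
        if len(result[busiest]) - len(result[idlest]) <= 1:
            break
        result[idlest].append(result[busiest].pop())

    return {c: sorted(owned) for c, owned in result.items()}


# Example 2: consumer C joins a group that already owns six partitions.
after_join = rebalance(
    {"A": [1, 2, 3], "B": [4, 5, 6]},
    partitions=[1, 2, 3, 4, 5, 6],
    consumers=["A", "B", "C"],
)

# Example 1: partition 5 appears after a split.
after_split = rebalance(
    {"A": [1, 2], "B": [3, 4]},
    partitions=[1, 2, 3, 4, 5],
    consumers=["A", "B"],
)
```

Only the partitions that must move are moved, which keeps most consumers reading from the offsets they already hold.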
    
    
    

    Consumption Demo (Python)

    Note:
    For the complete demo, see tencentcloud-cls-sdk-python. Python 3.5 or later is recommended for data consumption.
    The demo is used as follows:
    1. Install the SDK. For details, see tencentcloud-cls-sdk-python.
    pip install git+https://github.com/TencentCloud/tencentcloud-cls-sdk-python.git
    2. Process the data to be consumed by the consumer and save the user log data in the log_group struct. The log_group struct is as follows:
    log_group {
        source    // Log source, which is usually the machine's IP address.
        filename  // Log file name.
        logs {
            time                  // Log time, which is a Unix timestamp in microseconds.
            user_defined_log_kvs  // User log fields.
        }
    }
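To make the struct concrete, the following sketch flattens one log_group into plain dictionaries and converts the microsecond timestamp into a readable UTC string. The attribute names (`source`, `filename`, `logs`, `time`, `contents`, `key`, `value`) follow the demo code in this document; the `SimpleNamespace` objects and the `flatten_log_group` helper are hypothetical stand-ins used only so the example runs without the SDK.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def flatten_log_group(log_group):
    """Turn one log_group into a list of flat dicts, one per log."""
    rows = []
    for log in log_group.logs:
        # The struct describes time as a Unix timestamp in microseconds.
        seconds = log.time / 1000000
        row = {
            "source": log_group.source,
            "filename": log_group.filename,
            "time": datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat(),
        }
        # User-defined key-value fields become top-level keys.
        for content in log.contents:
            row[content.key] = content.value
        rows.append(row)
    return rows


# Hypothetical sample data shaped like the struct.
sample = SimpleNamespace(
    source="10.0.0.1",
    filename="/var/log/app.log",
    logs=[
        SimpleNamespace(
            time=1711607794000000,
            contents=[SimpleNamespace(key="level", value="INFO")],
        )
    ],
)
rows = flatten_log_group(sample)
```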
    The implementation using the SampleConsumer class is as follows:
    # Standard-library imports used below. ConsumerProcessorBase comes from the SDK;
    # see the complete demo in tencentcloud-cls-sdk-python for its import path.
    import time
    import traceback
    from threading import RLock

    class SampleConsumer(ConsumerProcessorBase):
        last_check_time = 0
        log_results = []
        lock = RLock()

        def initialize(self, topic_id):
            self.topic_id = topic_id

        # Process the data to be consumed.
        def process(self, log_groups, offset_tracker):
            for log_group in log_groups:
                for log in log_group.logs:
                    # Process a single row of data.
                    item = dict()
                    item['filename'] = log_group.filename
                    item['source'] = log_group.source
                    item['time'] = log.time
                    for content in log.contents:
                        item[content.key] = content.value

                    with SampleConsumer.lock:
                        # Aggregate data to SampleConsumer.log_results.
                        SampleConsumer.log_results.append(item)

            # Submit offset every 3 seconds.
            current_time = time.time()
            if current_time - self.last_check_time > 3:
                try:
                    self.last_check_time = current_time
                    offset_tracker.save_offset(True)
                except Exception:
                    traceback.print_exc()
            else:
                try:
                    offset_tracker.save_offset(False)
                except Exception:
                    traceback.print_exc()

            return None

        # Call this function when Worker exits for cleanup.
        def shutdown(self, offset_tracker):
            try:
                offset_tracker.save_offset(True)
            except Exception:
                traceback.print_exc()
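The "submit offset every 3 seconds" logic in `process` can be isolated into a small helper, which makes the timing easier to reason about and to test. The sketch below is an assumption-laden illustration: `ThrottledCheckpointer` is a hypothetical name, and it assumes that the boolean passed to `save_offset` means "persist the offset now" (True) versus "only record it locally" (False), which this document does not state explicitly.

```python
import time


class ThrottledCheckpointer:
    """Force a checkpoint at most once per `interval` seconds."""

    def __init__(self, save_offset, interval=3.0, clock=time.time):
        self._save_offset = save_offset   # e.g. offset_tracker.save_offset
        self._interval = interval
        self._clock = clock               # injectable so tests need not sleep
        self._last_forced = 0.0

    def checkpoint(self):
        """Call after each processed batch; returns True if the save was forced."""
        now = self._clock()
        force = now - self._last_forced > self._interval
        if force:
            self._last_forced = now
        self._save_offset(force)
        return force


# Usage with a fake clock and a recording callback instead of a real offset tracker.
calls = []
fake_now = [100.0]
checkpointer = ThrottledCheckpointer(calls.append, interval=3.0,
                                     clock=lambda: fake_now[0])
checkpointer.checkpoint()   # first call: forced
fake_now[0] = 101.0
checkpointer.checkpoint()   # 1 second later: not forced
fake_now[0] = 104.5
checkpointer.checkpoint()   # more than 3 seconds after the last forced save
```

Throttling forced saves limits how often the consumer writes progress, at the cost of re-reading up to one interval of data after a crash.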
    3. Create a consumer and start the consumer thread. The consumer then consumes data from the specified topic.
    | Parameter | Description | Default Value | Value Range |
    |---|---|---|---|
    | endpoint | Request domain name, as listed on the API log upload tab. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
    | access_key_id | Your Secret_id. To obtain it, go to CAM. | - | - |
    | access_key | Your Secret_key. To obtain it, go to CAM. | - | - |
    | region | Region of the topic, for example ap-beijing, ap-guangzhou, or ap-shanghai. For more details, see Regions and Access Domains. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
    | logset_id | Logset ID. Only one logset is supported. | - | - |
    | topic_ids | Log topic ID. Separate multiple topics with commas (,). | - | - |
    | consumer_group_name | Consumer group name. | - | - |
    | internal | Network type. TRUE: private network. FALSE: public network. Note: for private and public network read traffic costs, see Product Pricing. | FALSE | TRUE/FALSE |
    | consumer_name | Consumer name. Consumer names must be unique within the same consumer group. | - | A string consisting of 0-9, a-z, A-Z, '-', '_', and '.'. |
    | heartbeat_interval | Heartbeat interval. A consumer that fails to report a heartbeat for two intervals is considered offline. | 20 | 0-30 minutes |
    | data_fetch_interval | Interval at which the consumer pulls data. Cannot be less than 1 second. | 2 | - |
    | offset_start_time | Start time for data pulling: a Unix timestamp as a string, with second-level precision (for example, 1711607794), or "begin" or "end". begin: the earliest data within the log topic lifetime. end: the latest data within the log topic lifetime. | "end" | "begin"/"end"/Unix timestamp |
    | max_fetch_log_group_size | Amount of data a consumer pulls in a single request. Defaults to 2 M, up to 10 M. | 2097152 | 2M - 10M |
    | offset_end_time | End time for data pulling: a Unix timestamp as a string, with second-level precision (for example, 1711607794). If this field is left empty, data is pulled continuously. | - | - |
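Several constraints in the parameter list can be checked locally before a consumer is created. The sketch below is a hypothetical pre-flight helper (`check_consumer_options` is not part of the SDK) that encodes only rules stated in this document: the allowed characters of consumer_name, the 1-second minimum for data_fetch_interval, the accepted forms of offset_start_time, and the 10 M ceiling for max_fetch_log_group_size. Only the upper bound of the fetch size is checked, since the demo in this document itself passes 1048576.

```python
import re

MAX_FETCH_BYTES = 10 * 1024 * 1024  # "up to 10 M" in the parameter list
CONSUMER_NAME_RE = re.compile(r"[0-9A-Za-z._-]+")


def check_consumer_options(consumer_name, data_fetch_interval=2,
                           offset_start_time="end",
                           max_fetch_log_group_size=2097152):
    """Return a list of problems found; an empty list means the values look valid."""
    problems = []
    if not CONSUMER_NAME_RE.fullmatch(consumer_name):
        problems.append(
            "consumer_name may only contain 0-9, a-z, A-Z, '-', '_' and '.'")
    if data_fetch_interval < 1:
        problems.append("data_fetch_interval cannot be less than 1 second")
    if offset_start_time not in ("begin", "end") and not offset_start_time.isdigit():
        problems.append(
            "offset_start_time must be 'begin', 'end' or a Unix timestamp string")
    if max_fetch_log_group_size > MAX_FETCH_BYTES:
        problems.append("max_fetch_log_group_size cannot exceed 10 MB")
    return problems


ok = check_consumer_options("consumer-group-1-A", data_fetch_interval=1,
                            offset_start_time="1711607794",
                            max_fetch_log_group_size=1048576)
bad = check_consumer_options("bad name", data_fetch_interval=0,
                             offset_start_time="yesterday",
                             max_fetch_log_group_size=20 * 1024 * 1024)
```

Returning a list of messages rather than raising on the first failure lets a caller report every misconfigured value at once.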
    # Standard-library imports used below. YunApiLogClient, LogHubConfig, ConsumerWorker,
    # and sleep_until are not defined in this excerpt; see the complete demo in
    # tencentcloud-cls-sdk-python.
    import json
    import os
    import time
    import traceback

    def sample_consumer_group():
        # CLS access point. Fill in according to the actual situation.
        endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
        # Region to be accessed
        region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
        # User Secret_id
        access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
        # User Secret_key
        access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
        # ID of the logset to be consumed
        logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
        # IDs of the log topics to be consumed, separated by commas.
        topic_ids = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
        # Consumer group name. It must be unique within the same logset.
        consumer_group = 'consumer-group-1'
        # Consumer names
        consumer_name1 = "consumer-group-1-A"
        consumer_name2 = "consumer-group-1-B"

        if not (endpoint and access_key_id and access_key and logset_id):
            raise ValueError("endpoint/access_id/access_key and "
                             "logset_id cannot be empty")
        # Create a client to access the TencentCloud API.
        client = YunApiLogClient(access_key_id, access_key, region=region)
        SampleConsumer.log_results = []

        try:
            # Create two consumer configurations.
            option1 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                                   consumer_name1, heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time='end', max_fetch_log_group_size=1048576)
            option2 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                                   consumer_name2, heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time='end', max_fetch_log_group_size=1048576)

            # Create the consumers.
            print("*** start to consume data...")
            client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
            client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
            # Start the consumers.
            client_worker1.start()
            client_worker2.start()

            # Wait for up to 2 minutes, or continue as soon as data is obtained.
            sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)

            # Stop the consumers.
            print("*** stopping workers")
            client_worker1.shutdown()
            client_worker2.shutdown()

            # Print the aggregated log data.
            print("*** get content:")
            for log in SampleConsumer.log_results:
                print(json.dumps(log))

            # Print consumer group information, including the consumer group name,
            # consumed log topics, and consumer heartbeat timeout.
            print("* consumer group status *")
            ret = client.list_consumer_group(logset_id, topic_ids)
            ret.log_print()

            # Delete the consumer group.
            print("*** delete consumer group")
            time.sleep(30)
            client.delete_consumer_group(logset_id, consumer_group)
        except Exception:
            traceback.print_exc()
            raise

    if __name__ == '__main__':
        sample_consumer_group()
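The demo calls `sleep_until(120, ...)`, which is not defined in the code shown here; the complete demo presumably supplies its own helper. As a rough idea of what such a helper could look like, the sketch below waits up to a deadline and returns early once a condition becomes true. The `poll_interval` parameter and the boolean return value are assumptions added for illustration.

```python
import time


def sleep_until(seconds, exit_condition=None, poll_interval=1.0):
    """Wait up to `seconds`; return True early once exit_condition() is truthy."""
    deadline = time.monotonic() + seconds
    while True:
        if exit_condition is not None and exit_condition():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


# Usage: stop waiting as soon as at least one result has arrived.
results = ["first log"]
arrived = sleep_until(0.5, lambda: len(results) > 0, poll_interval=0.05)
```

Using `time.monotonic()` rather than `time.time()` keeps the wait unaffected by system clock adjustments.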
    
    