pip install git+https://github.com/TencentCloud/tencentcloud-cls-sdk-python.git
log_group {
source //Log source, which is usually the machine's IP address.
filename //Log file name
logs {
time //Log time, which is a Unix timestamp in microseconds.
user_defined_log_kvs //User log fields
}
}
class SampleConsumer(ConsumerProcessorBase):
    """Sample processor that flattens consumed logs into a shared list.

    Every log row is converted to a plain dict and appended to the
    class-level ``log_results`` list, so rows handled by any instance of
    this class end up in the same place.
    """

    # Value of time.time() when the 3-second check in process() last passed.
    last_check_time = 0
    # Aggregated rows; a class attribute, hence shared by all instances.
    log_results = []
    # Held while appending to log_results.
    lock = RLock()

    def initialize(self, topic_id):
        """Remember the ID of the topic this processor is bound to."""
        self.topic_id = topic_id

    @staticmethod
    def _save_offset_logged(offset_tracker, flag):
        """Call ``offset_tracker.save_offset(flag)``; print, don't raise, on error."""
        try:
            offset_tracker.save_offset(flag)
        except Exception:
            # Best-effort: a failed save must not stop consumption.
            import traceback
            traceback.print_exc()

    # Process the data to be consumed.
    def process(self, log_groups, offset_tracker):
        """Collect every log of ``log_groups`` and save the consume offset.

        Each row becomes a dict holding the log group's ``filename`` and
        ``source``, the log's ``time``, and one entry per content key/value.

        Returns:
            None.
        """
        for log_group in log_groups:
            for log in log_group.logs:
                # Process a single row of data.
                item = {
                    'filename': log_group.filename,
                    'source': log_group.source,
                    'time': log.time,
                }
                for content in log.contents:
                    item[content.key] = content.value

                with SampleConsumer.lock:
                    # Aggregate data to SampleConsumer.log_results.
                    SampleConsumer.log_results.append(item)

        # Submit offset every 3 seconds: save_offset(True) when more than
        # 3 seconds have passed since the last such call, otherwise
        # save_offset(False). (The flag's exact meaning is defined by the SDK.)
        current_time = time.time()
        interval_elapsed = current_time - self.last_check_time > 3
        if interval_elapsed:
            self.last_check_time = current_time
        self._save_offset_logged(offset_tracker, interval_elapsed)

        return None

    # Call this function when Worker exits for cleanup.
    def shutdown(self, offset_tracker):
        """Save the offset one final time with ``save_offset(True)``."""
        self._save_offset_logged(offset_tracker, True)
Parameter | Description | Default Value | Value Range |
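endpoint | CLS access point (access domain name). Fill in according to the actual situation; see Regions and Access Domains. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |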
access_key_id | The user's SecretId. | - | - |
access_key | The user's SecretKey. | - | - |
region | Topic's region. For example, ap-beijing, ap-guangzhou, ap-shanghai. For more details, see Regions and Access Domains. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
logset_id | Logset ID. Only one logset is supported. | - | - |
topic_ids | Log topic ID. Separate multiple topic IDs with commas (,). | - | - |
consumer_group_name | Consumer Group Name | - | - |
internal | Whether to access CLS over the private network. TRUE: private network; FALSE: public network. | FALSE | TRUE/FALSE |
consumer_name | Consumer name. Consumer names must be unique within the same consumer group. | - | A string consisting of 0-9, a-z, A-Z, '-', '_', and '.'. |
heartbeat_interval | The interval of heartbeats. If consumers fail to report a heartbeat for two intervals, they will be considered offline. | 20 | 0-30 minutes |
data_fetch_interval | The interval of consumer data pulling. Cannot be less than 1 second. | 2 | - |
offset_start_time | The start time for data pulling: a Unix timestamp passed as a string, with second-level precision (for example, "1711607794"). It can also be set to "begin" or "end". begin: the earliest data within the log topic lifetime. end: the latest data within the log topic lifetime. | "end" | "begin"/"end"/Unix timestamp |
max_fetch_log_group_size | The maximum amount of data, in bytes, that a consumer pulls in a single request. Defaults to 2 MB; the maximum is 10 MB. | 2097152 | 2 MB - 10 MB |
offset_end_time | The end time for data pulling: a Unix timestamp passed as a string, with second-level precision (for example, "1711607794"). If this field is left empty, data is pulled continuously. | - | - |
def sample_consumer_group():
    """Run two consumers in one consumer group and print what they consumed.

    Connection settings are read from ``TENCENTCLOUD_LOG_SAMPLE_*``
    environment variables. The function starts two workers, waits up to
    2 minutes for data, stops the workers, prints the collected logs and
    the consumer group status, then deletes the consumer group.

    Raises:
        ValueError: if the endpoint, access key ID, access key or logset ID
            is empty.
        Exception: any error raised while consuming is printed and re-raised.
    """
    # CLS Access Point. Fill in according to the actual situation.
    endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
    # Region to be accessed
    region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
    # User Secret_id
    access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
    # User Secret_key
    access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
    # ID of the logset to be consumed
    logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
    # ID list for consumed log topics, separated by English commas.
    topic_ids = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
    # Consumer group name. The consumer group name under the same logset is unique.
    consumer_group = 'consumer-group-1'
    # Consumer name
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"

    # Validate explicitly: an `assert` would be stripped under `python -O`
    # and would raise AssertionError rather than the intended ValueError.
    if not (endpoint and access_key_id and access_key and logset_id):
        raise ValueError("endpoint/access_id/access_key and "
                         "logset_id cannot be empty")

    # Create Client to access the TencentCloud API.
    client = YunApiLogClient(access_key_id, access_key, region=region)
    SampleConsumer.log_results = []

    try:
        # Create two consumer configurations.
        option1 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                               consumer_name1, heartbeat_interval=3, data_fetch_interval=1,
                               offset_start_time='end', max_fetch_log_group_size=1048576)
        option2 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                               consumer_name2, heartbeat_interval=3, data_fetch_interval=1,
                               offset_start_time='end', max_fetch_log_group_size=1048576)

        # Create a consumer.
        print("*** start to consume data...")
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)

        # Start the consumer
        client_worker1.start()
        client_worker2.start()

        # Wait for 2 minutes, or continue execution after the data is obtained.
        sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)

        # Stop the consumer.
        print("*** stopping workers")
        client_worker1.shutdown()
        client_worker2.shutdown()

        # Print the summarized log data
        print("*** get content:")
        for log in SampleConsumer.log_results:
            print(json.dumps(log))

        # Print consumer group information, including the consumer group name,
        # consumed log topic, and consumer heartbeat timeout.
        print("* consumer group status *")
        ret = client.list_consumer_group(logset_id, topic_ids)
        ret.log_print()

        # Delete consumer group
        print("*** delete consumer group")
        time.sleep(30)
        client.delete_consumer_group(logset_id, consumer_group)
    except Exception:
        import traceback
        traceback.print_exc()
        # Bare raise re-raises the active exception with its traceback intact.
        raise


if __name__ == '__main__':
    sample_consumer_group()
Was this page helpful?