| Technical Component | Version |
| --- | --- |
| Nginx | 1.22 |
| CLS | - |
| Java | OpenJDK version "1.8.0_232" |
| Scala | 2.11.12 |
| Flink SQL | Flink 1.14.5 |
| MySQL | 5.7 |
Log in to MySQL with the following command and make sure the `flink_nginx` database and the `mysql_dest` table are created:

```shell
mysql -h 172.16.1.1 -uroot
```

```sql
create database if not exists flink_nginx;
create table if not exists mysql_dest(
  ts timestamp,
  pv bigint,
  uv bigint
);
```
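If you want to confirm the setup before moving on, a quick sanity check from the same MySQL session might look like this (a minimal sketch; it assumes only that the two statements above succeeded):

```sql
-- Confirm that the database and table exist
use flink_nginx;
show tables;      -- should list `mysql_dest`
desc mysql_dest;  -- should show the ts, pv, and uv columns
```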
```shell
# Decompress the Flink binary package
tar -xf flink-1.14.5-bin-scala_2.11.tgz
cd flink-1.14.5

# Download Kafka dependencies
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.5/flink-connector-kafka_2.11-1.14.5.jar
mv flink-connector-kafka_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar
mv kafka-clients-2.4.1.jar lib

# Download MySQL dependencies
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.14.5/flink-connector-jdbc_2.11-1.14.5.jar
mv flink-connector-jdbc_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar
mv mysql-connector-java-8.0.11.jar lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-common/1.14.5/flink-table-common-1.14.5.jar
mv flink-table-common-1.14.5.jar lib

# Start Flink and open the SQL client
bin/start-cluster.sh
bin/sql-client.sh
```
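Optionally, once the SQL client prompt appears, you can have query results print directly in the terminal by switching the client's result mode to `tableau` (a standard SQL-client option in Flink 1.14):

```sql
-- Print query results inline instead of opening the interactive result view
SET 'sql-client.execution.result-mode' = 'tableau';
```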
Run the following statements in the Flink SQL client:

```sql
-- Create a data source table to consume Kafka data
CREATE TABLE `nginx_source`
(
  `remote_user` STRING,          -- Field in the log, which indicates the client name
  `time_local` STRING,           -- Field in the log, which indicates the local time of the server
  `body_bytes_sent` BIGINT,      -- Field in the log, which indicates the number of bytes sent to the client
  `http_x_forwarded_for` STRING, -- Field in the log, which records the actual client IP when there is a proxy server on the frontend
  `remote_addr` STRING,          -- Field in the log, which indicates the client IP
  `protocol` STRING,             -- Field in the log, which indicates the protocol type
  `status` INT,                  -- Field in the log, which indicates the HTTP request status code
  `url` STRING,                  -- Field in the log, which indicates the URL
  `http_referer` STRING,         -- Field in the log, which indicates the URL of the referer
  `http_user_agent` STRING,      -- Field in the log, which indicates the client browser information
  `method` STRING,               -- Field in the log, which indicates the HTTP request method
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- Kafka partition
  `ts` AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  -- Topic name provided in the CLS console for consumption over Kafka, such as `out-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX`
  'topic' = 'YourTopic',
  -- Service address provided in the CLS console for consumption over Kafka. The public network consumer address
  -- in the Guangzhou region is used as an example; enter your actual address.
  'properties.bootstrap.servers' = 'kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096',
  'properties.group.id' = 'kafka_flink', -- Kafka consumer group name
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  -- The username is the logset ID of the log topic, such as `ca5cXXXX-dd2e-4ac0-af12-92d4b677d2c6`, and the
  -- password is your `secretid#secretkey`, such as `AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac`. Note that
  -- the `#` is required. We recommend that you use a sub-account key and follow the principle of least privilege
  -- when authorizing the sub-account, that is, configure the minimum permission for `action` and `resource` in its access policy.
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password";',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN'
);

-- Create the target table whose rows are written to MySQL
CREATE TABLE `mysql_dest`
(
  `ts` TIMESTAMP,
  `pv` BIGINT,
  `uv` BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://11.150.2.1:3306/flink_nginx?serverTimezone=Asia/Shanghai', -- Note the time zone setting here
  'username' = 'username',    -- MySQL account
  'password' = 'password',    -- MySQL password
  'table-name' = 'mysql_dest' -- MySQL table name
);

-- Query the Kafka data source table and write the computing results to the MySQL target table
INSERT INTO mysql_dest (ts, uv, pv)
SELECT
  TUMBLE_START(ts, INTERVAL '1' MINUTE) AS start_ts,
  COUNT(DISTINCT remote_addr) AS uv,
  COUNT(*) AS pv
FROM nginx_source
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
```
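After the INSERT job has been running for a couple of minutes, you can verify on the MySQL side that per-minute PV/UV rows are arriving. A minimal sketch, using the example database and table names from above:

```sql
-- Run against MySQL (not the Flink SQL client)
select * from flink_nginx.mysql_dest order by ts desc limit 10;
```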