| Technical Component | Version |
| --- | --- |
| Nginx | 1.22 |
| CLS (Cloud Log Service) | - |
| Java | openjdk version "1.8.0_232" |
| Scala | 2.11.12 |
| Flink SQL | flink-1.14.5 |
| MySQL | 5.7 |
```bash
# Connect to MySQL (replace the host with your own instance address)
mysql -h 172.16.1.1 -uroot
```

```sql
-- Create the destination database and table
create database if not exists flink_nginx;
use flink_nginx;
create table if not exists mysql_dest(
  ts timestamp,
  pv bigint,
  uv bigint
);
```
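Before moving on, it is worth confirming that the destination table matches what the Flink job will later write. A minimal check from the same MySQL session (nothing here beyond the DDL above):

```sql
-- Run in the same MySQL session to verify the destination table
SHOW TABLES;          -- should list mysql_dest
DESCRIBE mysql_dest;  -- should show ts (timestamp), pv (bigint), uv (bigint)
```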
```bash
# Extract the Flink binary package
tar -xf flink-1.14.5-bin-scala_2.11.tgz
cd flink-1.14.5

# Download the Kafka connector dependencies
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.5/flink-connector-kafka_2.11-1.14.5.jar
mv flink-connector-kafka_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar
mv kafka-clients-2.4.1.jar lib

# Download the MySQL connector dependencies
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.14.5/flink-connector-jdbc_2.11-1.14.5.jar
mv flink-connector-jdbc_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar
mv mysql-connector-java-8.0.11.jar lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-common/1.14.5/flink-table-common-1.14.5.jar
mv flink-table-common-1.14.5.jar lib

# Start Flink and open the SQL client
bin/start-cluster.sh
bin/sql-client.sh
```
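Once the SQL client is up, printing query results directly in the terminal makes the following steps easier to debug. This is optional; a sketch assuming the Flink 1.14 SQL client, where `tableau` is one of the supported result modes:

```sql
-- Optional: print results inline in the terminal instead of the paged view
SET 'sql-client.execution.result-mode' = 'tableau';
```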
```sql
-- Source table: consume Kafka data exposed by CLS over the Kafka protocol
CREATE TABLE `nginx_source`(
  `remote_user` STRING,           -- Log field: client user name
  `time_local` STRING,            -- Log field: server local time
  `body_bytes_sent` BIGINT,       -- Log field: number of bytes sent to the client
  `http_x_forwarded_for` STRING,  -- Log field: real client IP address when a proxy sits in front
  `remote_addr` STRING,           -- Log field: client IP address
  `protocol` STRING,              -- Log field: protocol type
  `status` INT,                   -- Log field: HTTP status code
  `url` STRING,                   -- Log field: URL
  `http_referer` STRING,          -- Log field: URL of the referring page
  `http_user_agent` STRING,       -- Log field: client browser information
  `method` STRING,                -- Log field: HTTP request method
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- Kafka partition
  `ts` AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  -- Topic name shown on the CLS Kafka-protocol consumption page, e.g. out-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX
  'topic' = 'YourTopic',
  -- Service address shown on the CLS Kafka-protocol consumption page. This example uses the
  -- Guangzhou public-network endpoint; replace it with the address for your own region and network.
  'properties.bootstrap.servers' = 'kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096',
  'properties.group.id' = 'kafka_flink',  -- Kafka consumer group name
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  -- The username is the ID of the logset the log topic belongs to, e.g. ca5cXXXX-dd2e-4ac0-af12-92d4b677d2c6.
  -- The password is the string secretid#secretkey, e.g. AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac;
  -- take care not to drop the #. Sub-account keys are recommended; when authorizing the sub-account,
  -- follow the principle of least privilege: limit the action and resource in its access policy to the
  -- minimum scope that still allows the operation.
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password";',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN'
);

-- Sink table: write results to MySQL
CREATE TABLE `mysql_dest`(
  `ts` TIMESTAMP,
  `pv` BIGINT,
  `uv` BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://11.150.2.1:3306/flink_nginx?serverTimezone=Asia/Shanghai',  -- note the time zone setting
  'username' = 'username',      -- MySQL user
  'password' = 'password',      -- MySQL password
  'table-name' = 'mysql_dest'   -- MySQL table name
);

-- Query the Kafka source table and write the aggregated results to the MySQL sink table
INSERT INTO mysql_dest (ts, uv, pv)
SELECT
  TUMBLE_START(ts, INTERVAL '1' MINUTE) AS start_ts,
  COUNT(DISTINCT remote_addr) AS uv,
  COUNT(*) AS pv
FROM nginx_source
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
```
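Once the INSERT job has been running for a minute or two, the tumbling-window results should start appearing in MySQL. A quick sanity check, run from the MySQL client rather than the Flink SQL client:

```sql
-- Run in the MySQL client to verify that window results are being written
USE flink_nginx;
SELECT ts, pv, uv FROM mysql_dest ORDER BY ts DESC LIMIT 10;
```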