tencent cloud

文档反馈

使用 Flink 消费 CLS 日志

最后更新时间:2024-01-20 17:28:40
    本文详细描述了如何使用 Flink 实时消费 CLS 日志,使用 Flink-sql 分析 Nginx 日志数据,计算 Web 端的 PV/UV 值,并将结果数据实时写入到自建的数据库 MySQL 数据库。
    文中使用的组件/应用及版本如下:
    技术组件
    版本
    Nginx
    1.22
    CLS 日志服务
    -
    Java
    openjdk version "1.8.0_232"
    Scala
    2.11.12
    Flink sql
    flink-1.14.5
    MySQL
    5.7

    操作步骤

    步骤1:安装腾讯云 Nginx 网关

    1. 购买腾讯云主机 CVM,请参考 通过购买页创建实例
    2. Nginx 安装,请参考LINUX安装nginx详细步骤。
    3. 成功通过浏览器访问 nginx,并可以下图说明安装成功:
    
    

    步骤2:采集 Nginx 日志到腾讯云 CLS 日志服务

    2. CLS 日志服务采集终端 Loglistener的安装,Loglistener 类似于开源组件 Beats,用来采集日志数据的 Agent。
    3. 日志主题开启索引后,可以正常查询到 Nginx 的日志数据,如下图所示:
    4. 最后,在 CLS 控制台 开启 kafka 消费,使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。本文就是使用流计算框架 Flink,实时消费 Nginx 日志数据,将实时计算的结果写入到 MySQL。

    步骤3:搭建 MySQL 数据库

    参考文档:创建 MySQL 实例
    1. 登录数据库:
    mysql -h 172.16.1.1 -uroot
    2. 新建需要使用的 database 和表,例子中的 database 名为 flink_nginx,表名为 mysql_dest。
    create database if not exists flink_nginx;
    create table if not exists mysql_dest(
    ts timestamp,
    pv bigint,
    uv bigint
    );
    1. 部署 Flink 时,建议使用如下版本,否则可能会安装不成功。
    2. 安装 Flink 1.14.15 ,并进入 SQL 界面,从 Apache Flink 官网 下载 Flink 二进制代码包并开始安装。
    # 解压缩 Flink 二进制包
    tar -xf flink-1.14.5-bin-scala_2.11.tgz
    cd flink-1.14.5
    
    # 下载 kafka 相关依赖
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.5/flink-connector-kafka_2.11-1.14.5.jar
    mv flink-connector-kafka_2.11-1.14.5.jar lib
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar
    mv kafka-clients-2.4.1.jar lib
    
    # 下载 MySQL 相关依赖
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.14.5/flink-connector-jdbc_2.11-1.14.5.jar
    mv flink-connector-jdbc_2.11-1.14.5.jar lib
    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar
    mv mysql-connector-java-8.0.11.jar lib
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-common/1.14.5/flink-table-common-1.14.5.jar
    mv flink-table-common-1.14.5.jar lib
    
    # 启动 Flink
    bin/start-cluster.sh
    bin/sql-client.sh
    3. 当出现以下画面则说明安装成功。注意默认的网页端口是8081。
    
    
    
    
    
    
    1. 在 SQL Client 界面中,执行如下 SQL:
    -- 建数据源表消费 kafka 数据
    CREATE TABLE `nginx_source`
    (
    `remote_user` STRING, -- 日志中字段,客户端名称
    `time_local` STRING, -- 日志中字段,服务器本地时间
    `body_bytes_sent` BIGINT, -- 日志中字段,发送给客户端的字节数
    `http_x_forwarded_for` STRING, -- 日志中字段,当前端有代理服务器时,记录客户端真实 IP 地址的配
    `remote_addr` STRING, -- 日志中字段,客户端 IP 地址
    `protocol` STRING, -- 日志中字段,协议类型
    `status` INT, -- 日志中字段,HTTP 请求状态码
    `url` STRING, -- 日志中字段,url 地址
    `http_referer` STRING, -- 日志中字段,访问来源的页面链接地址
    `http_user_agent` STRING, -- 日志中字段,客户端浏览器信息
    `method` STRING, -- 日志中字段,HTTP 请求方法
    `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka分区
    `ts` AS PROCTIME()
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'YourTopic', -- cls kafka协议消费控制台给出的的主题名称,例如out-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX
    'properties.bootstrap.servers' = 'kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096', -- cls kakfa协议消费控制台给出的服务地址,例子中是广州地域的外网消费地址,请按照您的实际情况填写
    'properties.group.id' = 'kafka_flink', -- kafka 消费组名称
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true' ,
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password";',--用户名是日志主题所属的日志集合ID,例如ca5cXXXX-dd2e-4ac0-af12-92d4b677d2c6,密码是用户的secretid#secrectkey组合的字符串,比AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN'
    
    );
    
    --- 建立目标表,写入mysql
    CREATE TABLE `mysql_dest`
    (
    `ts` TIMESTAMP,
    `pv` BIGINT,
    `uv` BIGINT
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://11.150.2.1:3306/flink_nginx?&serverTimezone=Asia/Shanghai', -- 注意这边的时区设置
    'username'= 'username', -- mysql账号
    'password'= 'password', -- mysql密码
    'table-name' = 'mysql_dest' -- mysql表名
    );
    
    --- 查询 kafka 数据源表,计算后写入 mysql 目标表
    INSERT INTO mysql_dest (ts,uv,pv)
    SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) start_ts, COUNT(DISTINCT remote_addr) uv,count(*) pv
    FROM nginx_source
    GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
    2. 在 Flink 的任务监控页,我们可以看到任务的监控数据:
    
    
    3. 进入 MySql 数据库,即可看到计算 PV、UV 的结果数据实时写入:
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持