SELECT FROM
SELECT 不能单独使用,必须配合CREATE VIEW … AS
或 INSERT INTO 使用,否则系统会提示没有合适的 Operator。
语法
SELECT 以逗号分隔的需要选中的字段
FROM 数据源或视图
WHERE 过滤条件
其他子查询
示例
SELECT s1.time_, s1.client_ip, s1.uri, s1.protocol_version, s2.status_code, s2.date_
FROM KafkaSource1 AS s1, KafkaSource2 AS s2
WHERE s1.time_ = s2.time_ AND s1.client_ip = s2.client_ip;
WHERE
WHERE 用来过滤查询条件(谓词),多个并列的条件可以用 AND、OR 来连接。在与外部数据库 TencentDB 的表 JOIN 时,条件的连接只支持 AND。如需使用 OR 的功能,请参见 UNION ALL。
HAVING
HAVING 用于过滤 GROUP BY 之后的结果。WHERE 在 GROUP BY 之前过滤,而 HAVING 在 GROUP BY 分组之后过滤。
SELECT SUM(amount)
FROM Orders
WHERE price > 10
GROUP BY users
HAVING SUM(amount) > 50
GROUP BY
在流计算 Oceanus 中,GROUP BY 用于对结果进行分组聚合。目前有含时间窗口(Window)类型的 GROUP BY 和不含窗口的 GROUP BY(即持续查询)。
含时间窗口(Window)类型的 GROUP BY 不会更新之前的结果,因而会产生 Append(Tuple)类型的数据流,只允许写入不带主键的 MySQL、PostgreSQL、Kafka、Elasticsearch 等数据目的。
不含窗口的 GROUP BY 会更新之前发出的记录,因而会产生 Upsert 类型的数据流,只允许写入含主键的云数据库 MySQL、PostgreSQL 数据目的表(Sink)、Elasticsearch 等,且主键必须与 GROUP BY 语句里 Upsert 字段一致。
含时间窗口的 GROUP BY
本示例定义了一个包含时间窗口的 GROUP BY 查询语句。关于时间窗口函数的使用方法,请参见 时间窗口函数。 SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
在 Event Time 时间模式下,使用 WATERMARK FOR
定义时间戳字段,那么 TUMBLE 窗口函数的第一个参数必须为该字段。HOP 和 SESSION 窗口同理。
在 Processing Time 时间模式下,TUMBLE 窗口函数的第一个参数必须为表proctime()
声明的字段。HOP 和 SESSION 窗口同理。
不含时间窗口的 GROUP BY(持续查询)
本示例定义了一个不包含时间窗口的 GROUP BY 查询语句,这种查询叫做持续查询,因为它会根据每条新到的数据来计算并决定是否更新之前发出的结果,因而会产生一个 Upsert 流。
SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
注意
这种方式可能会因为 key 的数量过大或数据过多而发生内存溢出。因而请谨慎设置对象超时时间,不要过长。
JOIN
目前流计算 Oceanus 系统只支持等值连接 Equi-JOIN,即 JOIN 条件内包含至少一条令左右表某字段相等的过滤条件。
流和流的 Inner Equi-JOIN
目前流和流的连接也分为两种:含时间范围的和不含时间范围的。前者会生成 Append(Tuple)类型的流,而后者会生成 Upsert 类型的流。
含时间范围的 Inner JOIN
含时间范围的 JOIN 也称为 Interval Join,它的 WHERE 条件中需要至少一个等值连接的 JOIN 条件和一个指定的时间范围。这个时间范围可以用 <、<=、>=、> 或 BETWEEN … AND 等来表示。
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
示例:
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
不含时间范围的 Inner JOIN
不含时间范围的流-流 JOIN 的特点是只要求有至少一个等值连接,而不要求指定时间范围。也就是说,它会将历史以来所有的活跃数据参与计算(可以通过指定超时时间来去除不活跃的元素)。
注意
可能会导致非常大的内存占用,需要谨慎使用。通常需要设置合适的对象超时时间,以及时清除失活的对象。
这种查询会产生一个 Upsert 流,只能使用含主键的 MySQL、PostgreSQL、Elasticsearch 等数据目的(Sink)来接收数据。
示例:
SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id
Outer Equi-JOIN
Outer Equi-JOIN 会产生 Upsert 流,因此需要注意数据目的(Sink)必须可以接受 Upsert 数据流,例如 MySQL、PostgreSQL、Elasticsearch 等。
注意
由于不对 JOIN 的顺序做优化,JOIN 操作会依次按照 FROM 语句后定义的各表进行,因此可能会产生非常大的状态压力,甚至可能执行失败。
SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
维表 JOIN
流计算 Oceanus 也支持流与 MySQL、PostgreSQL 数据库中维表(Temporal table,即随时间不断变化的表)的 JOIN,语法同上面介绍的完全一致,只是要求维表必须放在 JOIN 条件的右表。
SELECT
o.amout, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
注意
一定要加入 FOR SYSTEM_TIME AS OF
语句,否则虽然仍然可以执行 JOIN,但是只会全量读取一次数据库,结果可能不符合预期。
与 UDTF 的 JOIN
如果用户自定义了表函数(UDTF,即 User-Defined Table Function),则可以将表函数作为 JOIN 的右表,语法与普通 JOIN 类似,只是需要加上 LATERAL TABLE( )
关键字,将 UDTF 包围起来。
Inner UDTF JOIN
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
Left Outer UDTF JOIN
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
注意
目前 Left Outer UDTF JOIN 只支持ON TRUE
语法,类似于 CROSS JOIN。
与数组进行 JOIN
流计算 Oceanus 系统支持和一个已定义的数组对象(可通过 值构造函数 构造数组对象 ARRAY)做 JOIN 操作。 示例:假设 tags 是一个已定义的数组。
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
UNION ALL
UNION ALL 用来合并两个查询的结果。
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
UNION ALL
(SELECT user FROM Orders WHERE b = 0)
)
目前流计算 Oceanus 只支持 UNION ALL 而暂不支持 UNION,即不会对相同的行进行去重操作。
如果需要实现去重以达到 UNION 的效果,请配合 DISTINCT 使用。DISTINCT 会让结果从 Append(Tuple)流变为 Upsert 流,因而只能使用含主键的 MySQL、PostgreSQL、Elasticsearch 数据目的(Sink)来接收数据。
OVER Window 聚合
如果需要对数据流做基于滑动窗口的聚合(不使用 GROUP BY 的聚合),那么可以使用 OVER 来进行滑动窗口的聚合操作。在 OVER 中可以指定 PARTITION、ORDER、窗口范围等。
下面的示例定义了一个滑动窗口聚合查询,统计一个大小为3的滑动窗口的交易总额(amount)。其中对之前的行使用 PRECEDING,而目前还不支持 FOLLOWING。
另外 ORDER BY 后面只允许一个时间戳字段,本例使用数据源中声明的 proctime
字段。
SELECT SUM(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
ORDER BY
ORDER BY 用来对查询的结果做排序,默认是 ASC(升序排列),也可以显式指定 DESC(降序排列)。
注意
要求第一个排序项必须是升序的时间列(Event Time 时间戳或 Processing Time 时间戳,即 PROCTIME),之后的排序项可以自由指定。
SELECT *
FROM Orders
ORDER BY `orderTime`, `username` DESC, `userId` ASC
DISTINCT
DISTINCT 用来对查询结果进行去重,它必须放在 SELECT 后面。
SELECT DISTINCT users FROM Orders
DISTINCT 会产生一个 Upsert 流,因而只有 Upsert 类型的数据目的(Sink)才可以接收其结果。而且长时间查询可能会导致内存占用过大,请谨慎使用。
通过设置合适的对象过期时间,可以及时清除失活对象来节省内存。
IN
可以使用 IN 关键字对判断指定集合(例如子查询)中是否存在某元素。
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
EXISTS
如果 EXISTS 后面子查询的结果大于或等于一行(存在数据),则返回true
。
SELECT user, amount
FROM Orders
WHERE product EXISTS (
SELECT product FROM NewProducts
)
ORDER BY
ORDER BY 可以对某个字段进行排序后输出。
SELECT *
FROM Orders
ORDER BY orderTime
Grouping Sets、Rollup、Cube
对于 Grouping Sets、Rollup、Cube 操作,产生的是一个 Upsert 流,因此只有 Upsert 类型的数据目的才可以接收其结果。
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))
模式匹配
目前流计算 Oceanus 支持 MATCH_RECOGNIZE
语句对一条输入流进行模式匹配,用户可以用 SQL 语句来描述 CEP(复杂事件处理)的逻辑。
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
PARTITION BY userid
ORDER BY proctime
MEASURES
A.id AS aid,
B.id AS bid,
C.id AS cid
PATTERN (A B C)
DEFINE
A AS name = 'a',
B AS name = 'b',
C AS name = 'c'
) AS T
上述例子定义了 A、B、C 三个事件,分别表示 name 字段等于 a、b、c 时的事件。PATTERN 表示事件触发的规则,即 A、B、C 三个事件连续出现时触发。MEASURES 指定了事件触发后的输出格式。
Top-N
Top-N 查询可以在一批流数据中,不断输出当前最新的前 N 大或者前 N 小的记录,因此输出类型为 Upsert 流,需要写入支持 Upsert 数据流的数据目的(Sink)。
相邻数据去重
有时候上游输入的数据可能会包含连续的重复值,该查询语句可以将重复的数据删除,只保留一个。去重的语法,可参见 Flink 官方文档的 去重。
本页内容是否解决了您的问题?