SELECT FROM
The SELECT statement must be used together with CREATE VIEW … AS or INSERT INTO; otherwise, a no operator error will occur.
Syntax
SELECT <fields to select, separated by commas>
FROM <data source or view>
[WHERE <filter condition>]
[other subqueries]
Example
SELECT s1.time_, s1.client_ip, s1.uri, s1.protocol_version, s2.status_code, s2.date_
FROM KafkaSource1 AS s1, KafkaSource2 AS s2
WHERE s1.time_ = s2.time_ AND s1.client_ip = s2.client_ip;
WHERE
You can use WHERE to filter the data to select from. Combine multiple filter conditions with AND or OR. When a TencentDB table is joined, only AND is supported. To use OR, see UNION ALL below.
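Rewriting an OR filter as a UNION ALL needs some care, because UNION ALL keeps every row from both branches: a row that satisfies both conditions comes out twice. The plain-Python sketch below only models that behavior; it is not how Stream Compute Service evaluates queries. Rows are dicts, `p` and `q` are hypothetical stand-ins for two filter conditions, and NULL handling is ignored. Making the second branch exclusive (`q AND NOT p`) yields the same rows as `WHERE p OR q`.

```python
from typing import Callable, Dict, List

Row = Dict[str, int]
Predicate = Callable[[Row], bool]


def where_or(rows: List[Row], p: Predicate, q: Predicate) -> List[Row]:
    """Reference result of WHERE p OR q."""
    return [r for r in rows if p(r) or q(r)]


def union_all(left: List[Row], right: List[Row]) -> List[Row]:
    """UNION ALL: concatenate both inputs and keep duplicates."""
    return left + right


def or_as_union_all(rows: List[Row], p: Predicate, q: Predicate) -> List[Row]:
    """WHERE p UNION ALL WHERE q AND NOT p: the two branches never overlap."""
    return union_all([r for r in rows if p(r)],
                     [r for r in rows if q(r) and not p(r)])


def p(r: Row) -> bool:
    return r["a"] % 2 == 0  # first condition, like a % 2 = 0


def q(r: Row) -> bool:
    return r["b"] == 0  # second condition, like b = 0


orders = [{"a": 2, "b": 0}, {"a": 3, "b": 0}, {"a": 4, "b": 1}, {"a": 5, "b": 1}]
naive = union_all([r for r in orders if p(r)], [r for r in orders if q(r)])
print(len(naive))                          # 4: {"a": 2, "b": 0} is counted twice
print(len(or_as_union_all(orders, p, q)))  # 3, the same rows as WHERE p OR q
```

Only the row order may differ from the single-query version, since UNION ALL makes no ordering promise.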
HAVING
You can use HAVING to filter the result of GROUP BY. WHERE filters data before GROUP BY, while HAVING filters data after GROUP BY.
SELECT SUM(amount)
FROM Orders
WHERE price > 10
GROUP BY users
HAVING SUM(amount) > 50
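The order of the two filters matters. The plain-Python model below applies WHERE to single rows before grouping and HAVING to the aggregated groups afterwards. The column names come from the example; the function name and sample data are hypothetical, and the result is keyed by `users` only for readability, since the query itself returns just SUM(amount).

```python
from collections import defaultdict
from typing import Dict, List


def sum_amount_having(orders: List[dict], min_price: int, min_total: int) -> Dict[str, int]:
    """Model of WHERE price > min_price GROUP BY users HAVING SUM(amount) > min_total."""
    # WHERE: filter individual rows before grouping
    filtered = [o for o in orders if o["price"] > min_price]
    # GROUP BY users: compute SUM(amount) per group
    totals: Dict[str, int] = defaultdict(int)
    for o in filtered:
        totals[o["users"]] += o["amount"]
    # HAVING: filter whole groups after aggregation
    return {user: total for user, total in totals.items() if total > min_total}


orders = [
    {"users": "alice", "price": 12, "amount": 30},
    {"users": "alice", "price": 15, "amount": 25},
    {"users": "bob", "price": 20, "amount": 40},
    {"users": "bob", "price": 5, "amount": 100},
]
print(sum_amount_having(orders, min_price=10, min_total=50))  # {'alice': 55}
```

Here bob's order with price 5 is removed by WHERE, so his group total is only 40 and then fails the HAVING filter; without the WHERE filter his total would be 140.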
GROUP BY
In Stream Compute Service, GROUP BY arranges query results into groups. It supports GROUP BY with a time window and GROUP BY without a time window (continuous query).
GROUP BY with a time window does not update previous results and therefore generates append (tuple) data streams. Such data can only be written to MySQL, PostgreSQL, Kafka, and Elasticsearch sinks that do not have a primary key.
GROUP BY without a time window updates previously sent records and therefore generates upsert data streams. Such data can be written to MySQL, PostgreSQL, and Elasticsearch sinks with a primary key (the primary key must be identical to the GROUP BY fields, which serve as the upsert key).
GROUP BY with a time window
The example below defines a GROUP BY query with a time window. For details about time window functions, see Time Window Functions.
SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
In Event Time mode (the WATERMARK FOR statement is used to define the timestamp field), the first parameter of the TUMBLE function must be the timestamp field. The same is true for HOP and SESSION.
In Processing Time mode, the first parameter of the TUMBLE function must be the field declared by proctime(). The same is true for HOP and SESSION.
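To make the append-only behavior concrete, the sketch below models GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user in plain Python. It assumes timestamps are epoch seconds and that windows are aligned to epoch 0; the function names and sample data are hypothetical. It computes the final row for each (window, user) pair, which a streaming engine would emit once, after the window closes, and never revise.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

DAY = 24 * 60 * 60  # INTERVAL '1' DAY in seconds


def tumble_start(ts: int, size: int = DAY) -> int:
    """Start of the tumbling window containing ts (windows aligned to epoch 0)."""
    return ts - ts % size


def tumble_sum(orders: List[Tuple[int, str, int]], size: int = DAY) -> List[Tuple[int, str, int]]:
    """Final SUM(amount) per (window start, user) for GROUP BY TUMBLE(rowtime, size), user.

    In a stream, each of these rows is emitted once, after its window has
    closed, and is never revised, which is why the output is append-only.
    """
    sums: Dict[Tuple[int, str], int] = defaultdict(int)
    for rowtime, user, amount in orders:
        sums[(tumble_start(rowtime, size), user)] += amount
    return sorted((start, user, total) for (start, user), total in sums.items())


orders = [(0, "alice", 10), (3600, "alice", 5), (DAY + 10, "alice", 7), (100, "bob", 3)]
print(tumble_sum(orders))  # [(0, 'alice', 15), (0, 'bob', 3), (86400, 'alice', 7)]
```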
GROUP BY without a time window (continuous query)
The example below defines a GROUP BY query without a time window, known as a continuous query. Each arriving record may update a previously sent result, so the query generates an upsert stream.
SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
Note
Such queries may cause out-of-memory errors when there are too many keys or too much data. Take this into account when setting the timeout period for inactive state, and do not set it too long.
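The sketch below models the continuous query SELECT a, SUM(b) AS d FROM Orders GROUP BY a in plain Python, to show why it produces an upsert stream and why its state needs a timeout. It is a simplified model, not the service's implementation: `idle_timeout` is a hypothetical parameter standing in for the timeout period, and timestamps are plain integers.

```python
from typing import Dict, Optional, Tuple


class ContinuousSum:
    """Model of SELECT a, SUM(b) AS d FROM Orders GROUP BY a (no time window).

    Every input row updates the running sum of its key and emits (a, d),
    which overwrites the previous result for that key in a primary-key sink.
    Keys idle for longer than idle_timeout (hypothetical) are dropped from
    state, so a later row for such a key starts again from zero.
    """

    def __init__(self, idle_timeout: Optional[int] = None) -> None:
        self.idle_timeout = idle_timeout
        self.state: Dict[str, Tuple[int, int]] = {}  # key -> (running sum, last seen)

    def process(self, now: int, a: str, b: int) -> Tuple[str, int]:
        self._expire(now)
        total = self.state.get(a, (0, now))[0] + b
        self.state[a] = (total, now)
        return (a, total)  # upsert keyed by the GROUP BY field

    def _expire(self, now: int) -> None:
        if self.idle_timeout is None:
            return
        idle = [k for k, (_, seen) in self.state.items() if now - seen > self.idle_timeout]
        for key in idle:
            del self.state[key]


agg = ContinuousSum(idle_timeout=100)
events = [(0, "x", 1), (10, "y", 2), (20, "x", 3), (200, "x", 5)]
upserts = [agg.process(now, a, b) for now, a, b in events]
sink: Dict[str, int] = {}
for key, value in upserts:
    sink[key] = value  # a primary-key sink keeps only the latest value per key
print(upserts)  # [('x', 1), ('y', 2), ('x', 4), ('x', 5)]
```

The last row for x arrives after x has been idle for 180 time units, so its state has already been cleared and the emitted sum is 5 rather than 9. This is the trade-off to weigh when choosing the timeout: a longer one costs memory, while a shorter one can produce results computed from partial history.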
JOIN
Currently, Stream Compute Service only supports Equi-JOIN. That is, a JOIN query must include at least one condition that equates a field of the left table with a field of the right table.
Inner Equi-JOIN (stream join)
There are two types of stream join: JOIN with a time range and JOIN without a time range. The former generates append (tuple) streams, while the latter generates upsert streams.
Inner JOIN with a time range
A JOIN query with a time range is also known as an interval join. The WHERE clause of such a query must contain at least one equality join condition and a time range condition. The time range can be specified with <, <=, >=, >, BETWEEN … AND, or an equality between the two time fields. Valid time range conditions include:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
Example:
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
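The bounded time range is what keeps an interval join's state small. The plain-Python sketch below models the example's condition on finite lists; the tuple layouts, the function name, and the use of seconds are assumptions for illustration. A streaming engine evaluates the same predicate incrementally and can discard rows once the watermark has moved past the range in which they could still match.

```python
from typing import List, Tuple

HOUR = 3600


def interval_join(orders: List[Tuple[int, int]], shipments: List[Tuple[int, int]],
                  before: int = 4 * HOUR) -> List[Tuple[int, int, int]]:
    """Rows satisfying o.id = s.orderId AND
    o.ordertime BETWEEN s.shiptime - before AND s.shiptime.

    orders are (id, ordertime) and shipments are (orderId, shiptime), in seconds.
    """
    return [
        (oid, ordertime, shiptime)
        for oid, ordertime in orders
        for order_id, shiptime in shipments
        if oid == order_id and shiptime - before <= ordertime <= shiptime
    ]


orders = [(1, 0), (2, 0)]
shipments = [(1, 3 * HOUR), (2, 5 * HOUR)]
print(interval_join(orders, shipments))  # [(1, 0, 10800)]
```

Order 2 is shipped 5 hours after it was placed, outside the 4-hour range, so it produces no result.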
Inner JOIN without a time range
An inner join without a time range must have at least one equality join condition but does not need a time range. As a result, all historical data from both inputs is kept and used for the join (you can specify a timeout period to exclude inactive elements).
Note
Such queries may significantly drive up memory usage. We recommend you set an appropriate timeout period to exclude inactive objects.
Such queries generate upsert streams and therefore can only use MySQL, PostgreSQL, and Elasticsearch sinks that have a primary key.
Example:
SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id
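Without a time range, nothing ever tells the join that an old row can no longer match, so both inputs stay in state. The simplified Python model below (class and method names are hypothetical) keeps every order and product it has seen and joins each new row against the other side, which shows why memory keeps growing unless a timeout clears inactive entries.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class RegularJoin:
    """Orders INNER JOIN Product ON Orders.productId = Product.id, no time bound."""

    def __init__(self) -> None:
        self.orders: Dict[int, List[str]] = defaultdict(list)    # productId -> order ids
        self.products: Dict[int, List[str]] = defaultdict(list)  # id -> product names

    def on_order(self, order_id: str, product_id: int) -> List[Tuple[str, str]]:
        self.orders[product_id].append(order_id)  # kept in case more products arrive
        return [(order_id, name) for name in self.products[product_id]]

    def on_product(self, product_id: int, name: str) -> List[Tuple[str, str]]:
        self.products[product_id].append(name)  # kept in case more orders arrive
        return [(order_id, name) for order_id in self.orders[product_id]]

    def state_size(self) -> int:
        """Number of rows held in state across both inputs."""
        return sum(map(len, self.orders.values())) + sum(map(len, self.products.values()))
```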
Outer Equi-JOIN
Outer Equi-JOIN generates upsert streams and therefore can only use MySQL, PostgreSQL, and Elasticsearch sinks that accept such streams.
Note
Because the join order is not optimized, the JOIN query follows the order of the tables in the FROM clause. This may result in high state pressure and cause the query to fail.
SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
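Outer joins cannot produce append-only output because a NULL-padded row may have to be replaced later. The simplified Python model of the LEFT JOIN example below marks each change as "+" (add) or "-" (retract); it assumes each product id arrives at most once, and the class name is hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Change = Tuple[str, str, Optional[str]]  # (op, order id, product name or None)


class LeftJoin:
    """Orders LEFT JOIN Product ON Orders.productId = Product.id.

    Assumes each product id arrives at most once.
    """

    def __init__(self) -> None:
        self.orders: Dict[int, List[str]] = {}  # productId -> order ids
        self.products: Dict[int, str] = {}      # id -> product name

    def on_order(self, order_id: str, product_id: int) -> List[Change]:
        self.orders.setdefault(product_id, []).append(order_id)
        # emitted right away; the product name is None (NULL) if not known yet
        return [("+", order_id, self.products.get(product_id))]

    def on_product(self, product_id: int, name: str) -> List[Change]:
        self.products[product_id] = name
        changes: List[Change] = []
        for order_id in self.orders.get(product_id, []):
            changes.append(("-", order_id, None))  # retract the NULL-padded row
            changes.append(("+", order_id, name))  # replace it with the joined row
        return changes
```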
JOIN with temporal tables
Stream Compute Service also supports joining streams and temporal tables (tables that change constantly over time). The syntax is the same, except that the temporal table must be used as the right table.
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
Note
Make sure you include the FOR SYSTEM_TIME AS OF clause. Without it, the JOIN query is still executed, but the database is read in its entirety only once, and the result may not meet expectations.
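The difference the FOR SYSTEM_TIME AS OF clause makes can be modeled in plain Python. In the sketch below (a simplification; the event tuples and the function name are hypothetical), `as_of_proctime=True` looks each rate up when the order is processed, while `as_of_proctime=False` uses a copy of the table taken once at start-up, which is the behavior the note warns about.

```python
from typing import Dict, List, Tuple


def temporal_join(initial_rates: Dict[str, float], events: List[tuple],
                  as_of_proctime: bool = True) -> List[Tuple[int, str, float, float]]:
    """Joins orders with a LatestRates table that changes over time.

    events holds ("rate", currency, rate) updates to the table and
    ("order", amount, currency) rows of the stream, in processing order.
    """
    live = dict(initial_rates)      # the table as it is right now
    snapshot = dict(initial_rates)  # the table as it was read once, at start-up
    out = []
    for event in events:
        if event[0] == "rate":
            _, currency, rate = event
            live[currency] = rate
        else:
            _, amount, currency = event
            table = live if as_of_proctime else snapshot
            if currency in table:  # inner join: orders without a rate are dropped
                rate = table[currency]
                out.append((amount, currency, rate, amount * rate))
    return out


rates = {"EUR": 2.0}
events = [("order", 10, "EUR"), ("rate", "EUR", 3.0), ("order", 10, "EUR")]
print(temporal_join(rates, events))                        # second order uses rate 3.0
print(temporal_join(rates, events, as_of_proctime=False))  # both orders use rate 2.0
```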
JOIN with user-defined table functions
You can use a user-defined table function (UDTF) as the right table in a JOIN query. The syntax is similar to that of other JOIN queries; you only need to put the UDTF in LATERAL TABLE( ).
Inner UDTF JOIN
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
Left Outer UDTF JOIN
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
Note
Currently, Left Outer JOIN with UDTFs only supports ON TRUE, which is similar to CROSS JOIN.
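The row semantics of the two UDTF joins can be modeled in plain Python. In the sketch below, `unnest_udtf` is a hypothetical stand-in that splits a comma-separated string into one row per tag. The inner variant drops orders for which the function returns no rows, while the left outer variant (ON TRUE) keeps them with a NULL (None) tag.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def unnest_udtf(tags: str) -> List[str]:
    """Hypothetical UDTF: one output row per comma-separated tag."""
    return [tag for tag in tags.split(",") if tag]


def lateral_join(orders: List[Tuple[str, str]], udtf: Callable[[str], Iterable[str]],
                 left_outer: bool = False) -> List[Tuple[str, Optional[str]]]:
    """Joins each (users, tags) row with the rows its UDTF call returns."""
    out: List[Tuple[str, Optional[str]]] = []
    for users, tags in orders:
        produced = list(udtf(tags))
        if produced:
            out.extend((users, tag) for tag in produced)
        elif left_outer:
            out.append((users, None))  # LEFT JOIN ... ON TRUE keeps the row with a NULL tag
    return out


orders = [("alice", "a,b"), ("bob", "")]
print(lateral_join(orders, unnest_udtf))                   # [('alice', 'a'), ('alice', 'b')]
print(lateral_join(orders, unnest_udtf, left_outer=True))  # [('alice', 'a'), ('alice', 'b'), ('bob', None)]
```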
JOIN with arrays
Stream Compute Service supports join operations with a defined array object (you can use value construction functions to construct an array object). For example, assuming that tags is a defined array:
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
UNION ALL
UNION ALL combines the results of two queries.
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
UNION ALL
(SELECT user FROM Orders WHERE b = 0)
)
Currently, Stream Compute Service only supports UNION ALL and not UNION. That is, it does not remove duplicate rows.
To remove duplicates, you can use DISTINCT together with UNION ALL. DISTINCT converts append (tuple) streams into upsert streams and therefore can only use MySQL, PostgreSQL, and Elasticsearch sinks that have a primary key.
OVER
You can use OVER to compute an aggregate for each input row over a sliding range of neighboring rows, without using GROUP BY. In the OVER clause, you can specify the partition, order, and window frame.
The example below defines an OVER aggregate query. For each row, it calculates the total transaction amount (amount) over a window of 3 rows: the current row and the 2 preceding rows of the same user. Use PRECEDING to specify the number of preceding rows; FOLLOWING is currently not supported.
You can specify only one timestamp field for ORDER BY. The examples below use the proctime field declared in the data source.
SELECT SUM(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
You can also define the window once in a WINDOW clause and reference it from several aggregates:
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
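The frame ROWS BETWEEN 2 PRECEDING AND CURRENT ROW can be modeled with a bounded queue per partition. The plain-Python sketch below is an illustration under simplifying assumptions (input already in proctime order, rows as hypothetical (user, amount) tuples). Unlike GROUP BY, it returns one result for every input row.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple


def over_rows_sum(orders: List[Tuple[str, int]], preceding: int = 2) -> List[int]:
    """SUM(amount) OVER (PARTITION BY user ORDER BY proctime
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).

    orders are (user, amount) pairs already in proctime order.
    """
    frames: Dict[str, Deque[int]] = defaultdict(lambda: deque(maxlen=preceding + 1))
    results = []
    for user, amount in orders:
        frame = frames[user]  # each user (partition) has its own frame
        frame.append(amount)  # a full deque drops its oldest element automatically
        results.append(sum(frame))
    return results


print(over_rows_sum([("a", 1), ("b", 10), ("a", 2), ("a", 3), ("a", 4)]))  # [1, 10, 3, 6, 9]
```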
ORDER BY
ORDER BY sorts query results. By default, it sorts the data in ascending order (ASC). You can also use DESC to sort the data in descending order.
Note
The first field specified for sorting must be the time column (Event Time or Processing Time, i.e., PROCTIME), in ascending order. After that, you can specify your own fields to sort by.
SELECT *
FROM Orders
ORDER BY `orderTime`, `username` DESC, `userId` ASC
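The order described by the example's ORDER BY clause can be reproduced in plain Python with successive stable sorts, applied from the last sort key to the first. This models only the ordering of a finite set of rows (tuples of orderTime, username, userId, an assumed layout), not how the service sorts a stream.

```python
from typing import List, Tuple

Row = Tuple[int, str, int]  # (orderTime, username, userId)


def order_by(rows: List[Row]) -> List[Row]:
    """Sorts like ORDER BY orderTime, username DESC, userId ASC.

    sorted() is stable (also with reverse=True), so sorting by the least
    significant key first and the time column last gives the combined order.
    """
    result = sorted(rows, key=lambda r: r[2])                  # userId ASC
    result = sorted(result, key=lambda r: r[1], reverse=True)  # username DESC
    return sorted(result, key=lambda r: r[0])                  # orderTime ASC, the primary key


rows = [(2, "bob", 5), (1, "amy", 9), (1, "zoe", 3), (1, "zoe", 1)]
print(order_by(rows))  # [(1, 'zoe', 1), (1, 'zoe', 3), (1, 'amy', 9), (2, 'bob', 5)]
```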
DISTINCT
DISTINCT removes duplicates from query results. It should be added after SELECT.
SELECT DISTINCT users FROM Orders
DISTINCT generates upsert streams, so you need to use sinks that accept upsert streams. Note that long-running DISTINCT queries may result in high memory usage. We recommend you set an appropriate timeout period to exclude inactive objects and reduce memory usage.
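The memory cost of DISTINCT comes from having to remember every value it has already seen. The plain-Python model of SELECT DISTINCT users FROM Orders below makes that state explicit. `idle_timeout` is a hypothetical parameter standing in for the timeout period, and, as the usage shows, a value that has been forgotten is emitted again if it reappears.

```python
from typing import Dict, List, Optional, Tuple


def stream_distinct(events: List[Tuple[int, str]], idle_timeout: Optional[int] = None) -> List[str]:
    """SELECT DISTINCT users over (time, users) events in arrival order.

    A value is emitted only the first time it is seen. Every distinct value
    stays in state; with idle_timeout (hypothetical), values not seen for
    longer than that are forgotten and would be emitted again.
    """
    last_seen: Dict[str, int] = {}
    out: List[str] = []
    for now, users in events:
        if idle_timeout is not None:
            expired = [k for k, t in last_seen.items() if now - t > idle_timeout]
            for key in expired:
                del last_seen[key]
        if users not in last_seen:
            out.append(users)
        last_seen[users] = now
    return out


events = [(0, "a"), (1, "b"), (2, "a"), (100, "a")]
print(stream_distinct(events))                   # ['a', 'b']
print(stream_distinct(events, idle_timeout=50))  # ['a', 'b', 'a']
```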
IN
You can use the IN keyword to determine whether an element exists in a specified set, such as the result of a subquery.
Note
This operation is demanding on memory.
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
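One way to see why IN is memory-intensive on a stream is to model it as a semi-join. In the simplified Python sketch below (class and method names are hypothetical, and products are assumed never to be deleted), each order is emitted at most once, as soon as a matching product is known. Orders without a match must wait in state, and every product ever seen is kept.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Result = Tuple[str, int]  # (user, amount)


class InSubquery:
    """WHERE product IN (SELECT product FROM NewProducts) as a streaming semi-join."""

    def __init__(self) -> None:
        self.products: Set[str] = set()                            # every product seen
        self.waiting: Dict[str, List[Result]] = defaultdict(list)  # orders without a match yet

    def on_order(self, user: str, amount: int, product: str) -> List[Result]:
        if product in self.products:
            return [(user, amount)]
        self.waiting[product].append((user, amount))  # may still match later
        return []

    def on_new_product(self, product: str) -> List[Result]:
        if product in self.products:
            return []  # a duplicate in the subquery does not emit orders twice
        self.products.add(product)
        return self.waiting.pop(product, [])
```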
EXISTS
If the result of the subquery following EXISTS has one or more rows (data exists), true is returned.
Note
This operation is demanding on memory.
SELECT user, amount
FROM Orders
WHERE EXISTS (
SELECT product FROM NewProducts
WHERE NewProducts.product = Orders.product
)
ORDER BY
ORDER BY sorts data by a specified field.
Note
This operation is demanding on memory.
SELECT *
FROM Orders
ORDER BY orderTime
Grouping Sets, Rollup, Cube
Grouping Sets, Rollup, and Cube generate upsert streams. Therefore, you need to use data sinks that accept upsert streams.
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))
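GROUPING SETS ((user), (product)) behaves like running one aggregation per grouping set and combining the results, with the columns outside the current set set to NULL. The plain-Python sketch below models that for the example. The row dicts, function name, and output tuple layout are assumptions; the grouping columns are kept in the output for readability, although the example query selects only SUM(amount).

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

COLUMNS = ("user", "product")


def grouping_sets_sum(orders: List[Dict[str, Any]],
                      sets: List[Tuple[str, ...]]) -> List[Tuple[Optional[str], Optional[str], int]]:
    """SUM(amount) for each grouping set; columns outside the set become None (NULL)."""
    out = []
    for grouping in sets:
        totals: Dict[Tuple[Optional[str], ...], int] = defaultdict(int)
        for o in orders:
            key = tuple(o[c] if c in grouping else None for c in COLUMNS)
            totals[key] += o["amount"]
        out.extend((*key, total) for key, total in totals.items())
    return out


orders = [
    {"user": "a", "product": "pen", "amount": 1},
    {"user": "a", "product": "ink", "amount": 2},
    {"user": "b", "product": "pen", "amount": 4},
]
for row in grouping_sets_sum(orders, [("user",), ("product",)]):
    print(row)
```

ROLLUP (user, product) is shorthand for GROUPING SETS ((user, product), (user), ()), and CUBE (user, product) covers all four subsets of the two columns; in this model, the empty set () produces a single (None, None, total) row.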
MATCH_RECOGNIZE
MATCH_RECOGNIZE performs pattern recognition on an input stream, allowing you to use a SQL query to describe complex event processing (CEP) logic.
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
PARTITION BY userid
ORDER BY proctime
MEASURES
A.id AS aid,
B.id AS bid,
C.id AS cid
PATTERN (A B C)
DEFINE
A AS name = 'a',
B AS name = 'b',
C AS name = 'c'
) AS T
The example above defines three events, A, B, and C, whose name field equals a, b, and c, respectively. PATTERN specifies the trigger rule: in this example, a match is triggered when A, B, and C occur consecutively within the same userid partition, in proctime order. MEASURES specifies the output fields.
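The matching rule in this example can be modeled in plain Python: per userid partition, keep the last three rows and report a match when their names are a, b, c in that order. This is a simplification that only covers this fixed, strictly consecutive pattern (the row layout and function name are assumed), not a general MATCH_RECOGNIZE engine.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple


def match_abc(rows: List[Tuple[str, int, str]]) -> List[Tuple[int, ...]]:
    """PATTERN (A B C) with A AS name = 'a', B AS name = 'b', C AS name = 'c'.

    rows are (userid, id, name) in proctime order. Matching runs separately
    per userid (PARTITION BY userid), and A, B, C must be consecutive rows
    of that partition. Each match is returned as (aid, bid, cid), as in MEASURES.
    """
    last_rows: Dict[str, Deque[Tuple[int, str]]] = defaultdict(lambda: deque(maxlen=3))
    matches = []
    for userid, row_id, name in rows:
        window = last_rows[userid]
        window.append((row_id, name))
        if [n for _, n in window] == ["a", "b", "c"]:
            matches.append(tuple(i for i, _ in window))
    return matches


rows = [("u1", 1, "a"), ("u2", 2, "a"), ("u1", 3, "b"), ("u1", 4, "c"), ("u2", 5, "c"),
        ("u3", 6, "a"), ("u3", 7, "x"), ("u3", 8, "b"), ("u3", 9, "c")]
print(match_abc(rows))  # [(1, 3, 4)]
```

Rows of other users in between do not break the match for u1, whereas the unrelated row x inside u3's own partition does.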
Top-N
Top-N gets the N smallest or largest values from a data stream. It generates upsert streams, so you need to use data sinks that accept upsert streams.
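Why Top-N needs upsert sinks can be seen in a small model: when a new row enters the top N, another row drops out, so a result that was already emitted changes. The plain-Python sketch below keeps the N largest sales per category (the column names and function name are hypothetical) and records an update each time a category's list changes. In Flink SQL, Top-N is typically written with ROW_NUMBER() OVER (PARTITION BY … ORDER BY …) and a filter on the row number; the Flink document has the exact syntax.

```python
import heapq
from collections import defaultdict
from typing import Dict, List, Tuple


def top_n_updates(rows: List[Tuple[str, str, int]], n: int) -> List[Tuple[str, List[Tuple[int, str]]]]:
    """Keeps the n highest-selling products per category.

    rows are (category, product, sales) in arrival order. Each time a
    category's top-n list changes, the new list is recorded as an update
    that replaces the previously emitted one.
    """
    top: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    updates = []
    for category, product, sales in rows:
        current = top[category]
        candidate = heapq.nlargest(n, current + [(sales, product)])
        if candidate != current:
            top[category] = candidate
            updates.append((category, candidate))
    return updates


rows = [("fruit", "apple", 5), ("fruit", "pear", 9), ("fruit", "fig", 1), ("fruit", "kiwi", 7)]
for update in top_n_updates(rows, n=2):
    print(update)
```

The row for fig never enters the top 2 and causes no update, while kiwi pushes apple out of a list that had already been emitted.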
To learn more about Top-N syntax, see the Flink document Top-N.
Deduplication
When the input data contains duplicate records, you can use deduplication to remove them. To learn more about the syntax, see the Flink document Deduplication.
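Deduplication keeps one row per key, either the first or the last one in processing order (in Flink SQL, via a ROW_NUMBER() … = 1 filter; see the Flink document for the syntax). The plain-Python model below (row layout and function name are hypothetical) shows the practical difference: keeping the first row only ever adds rows, whereas keeping the last row must retract ("-") the previous row for a key whenever a newer one arrives.

```python
from typing import Dict, List, Tuple

Change = Tuple[str, str, int]  # ("+" or "-", key, value)


def deduplicate(rows: List[Tuple[str, int]], keep: str = "first") -> List[Change]:
    """Keeps one row per key; rows are (key, value) in processing order.

    keep="first" drops every later duplicate, so only additions are emitted.
    keep="last" retracts the previous row for the key and emits the new one.
    """
    kept: Dict[str, int] = {}
    changes: List[Change] = []
    for key, value in rows:
        if key not in kept:
            kept[key] = value
            changes.append(("+", key, value))
        elif keep == "last":
            changes.append(("-", key, kept[key]))  # withdraw the older row
            kept[key] = value
            changes.append(("+", key, value))
    return changes


rows = [("o1", 1), ("o2", 2), ("o1", 3)]
print(deduplicate(rows))               # [('+', 'o1', 1), ('+', 'o2', 2)]
print(deduplicate(rows, keep="last"))  # [('+', 'o1', 1), ('+', 'o2', 2), ('-', 'o1', 1), ('+', 'o1', 3)]
```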