INSERT INTO
INSERT INTO is used together with SELECT to write the selected data to the specified sink.
Syntax
INSERT INTO <sink_name>
<SELECT subclause>;
Example
The example below inserts the result of the SELECT query into the sink named KafkaSink1.
INSERT INTO KafkaSink1
SELECT s1.time_, s1.client_ip, s1.uri, s1.protocol_version, s2.status_code, s2.date_
FROM KafkaSource1 AS s1, KafkaSource2 AS s2
WHERE s1.time_ = s2.time_ AND s1.client_ip = s2.client_ip;
About sinks
Connector package
If a sink is specified using WITH parameters, make sure you select the corresponding built-in connector or upload the connector package.
Without a matching connector package, the error org.apache.flink.table.api.ValidationException: Could not find any factory occurs when you run the job.
If data is read from or written to Kafka, instead of selecting an old-version package such as flink-connector-kafka-0.11, we recommend that you use flink-connector-kafka (without a version number) and set connector.version to universal to support the latest features.
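The following is a minimal sketch of a Kafka sink DDL that sets connector.version to universal. The topic name, broker address, and format are placeholder assumptions, and the exact property keys may vary with your Flink version.
CREATE TABLE KafkaSink1 (
  time_ VARCHAR,
  client_ip VARCHAR,
  uri VARCHAR,
  protocol_version VARCHAR,
  status_code VARCHAR,
  date_ VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',                            -- universal instead of a fixed old version such as 0.11
  'connector.topic' = 'example_topic',                          -- placeholder topic name
  'connector.properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
  'format.type' = 'json'                                        -- placeholder format
);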
Calculated columns
INSERT INTO ignores calculated columns in the sink. Assume that a sink is defined as follows. The SELECT clause following INSERT INTO MySink must include a (VARCHAR) and b (BIGINT) and cannot include the calculated column c.
CREATE TABLE MySink (
  a VARCHAR,
  b BIGINT,
  c AS PROCTIME()  -- calculated column; not written by INSERT INTO
) WITH ( ... );
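For example, the following statement writes to MySink. Only a and b appear in the SELECT clause; SourceTable is a hypothetical source table used for illustration.
INSERT INTO MySink
SELECT a, b        -- the calculated column c is omitted
FROM SourceTable;  -- hypothetical source table with columns a and b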
Difference between tuple and upsert streams
Make sure you specify the correct WITH parameters for the sink. For example, some connectors can be used only as sources, not as sinks, and some support only tuple streams, not upsert streams.
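As a sketch, assuming hypothetical sinks TupleSink and UpsertSink: a plain projection produces a tuple (append-only) stream, while an aggregation produces an upsert stream, so the chosen sink connector must support the corresponding stream type.
-- Append-only result: suitable for a sink that supports tuple streams.
INSERT INTO TupleSink
SELECT client_ip, uri
FROM KafkaSource1;

-- Aggregated result is updated as new rows arrive, so the sink must support upsert streams.
INSERT INTO UpsertSink
SELECT client_ip, COUNT(*) AS request_count
FROM KafkaSource1
GROUP BY client_ip;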