Prerequisites
Step 1. Create a SQL job
Log in to the Stream Compute Service console. Enter a workspace and click Jobs on the left menu. On the Jobs page, click Create job, select SQL job as the type, enter a job name, select a running cluster (where the job will be executed), and click Confirm.
Step 2. Grant access to related services
Select the job you created and click Development & Testing. If you haven't yet granted Stream Compute Service access to related services, click Authorize now in the pop-up window to allow Stream Compute Service to access resources such as message queues and cloud databases.
Step 3. Write SQL statements
After authorization is complete, you can enter SQL statements in the code editor to create a job quickly without preparing any additional data. The example below performs the following actions:
1. Use the built-in connector Datagen to create the data source table "Data_Input", which contains the fields "age" and "score", both of type BIGINT. Datagen is a local data source of the cluster that continuously generates random data.
2. Use the built-in connector Blackhole to create the data output table "Data_Output", which contains the fields "avg_age" and "avg_score", both of type BIGINT. Blackhole is a local data sink of the cluster that continuously accepts data.
3. Calculate the average of the "age" and "score" values in "Data_Input" and store the results in "Data_Output".
To use the data sources or sinks of other connectors, such as CKafka or Elasticsearch, see Connectors. Please note that you will need to prepare your own data; a sketch of a Kafka-based source table is included after the example below for reference.
CREATE TABLE `Data_Input` ( -- Step 1. Create the data source table "Data_Input"
age BIGINT,
score BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second'='100', -- The number of data records generated per second
'fields.age.kind'='random', -- Generate random values for "age"
'fields.age.min'='1', -- The minimum value of "age"
'fields.age.max'='100', -- The maximum value of "age"
'fields.score.kind'='random', -- Generate random values for "score"
'fields.score.min'='1', -- The minimum value of "score"
'fields.score.max'='1000' -- The maximum value of "score"
);
CREATE TABLE `Data_Output` ( -- Step 2. Create the output data table "Data_Output"
`avg_age` BIGINT,
`avg_score` BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO `Data_Output` -- Step 3. Calculate the average of the "age" and "score" values in "Data_Input" and store the results in "Data_Output"
SELECT AVG(age), AVG(score) FROM `Data_Input`;
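If you want to read from a message queue instead of Datagen, the source table definition looks roughly like the following sketch, which uses the standard Flink Kafka connector. The table name, topic, broker address, and consumer group below are placeholders, and CKafka or Elasticsearch may require additional options; see Connectors for the authoritative list.
CREATE TABLE `Kafka_Input` ( -- A sketch of a Kafka-backed source table (placeholder names and addresses)
age BIGINT,
score BIGINT
) WITH (
'connector' = 'kafka', -- Standard Flink Kafka connector
'topic' = 'your_topic', -- Topic to read from (placeholder)
'properties.bootstrap.servers' = 'your_broker:9092', -- Broker address (placeholder)
'properties.group.id' = 'your_consumer_group', -- Consumer group (placeholder)
'scan.startup.mode' = 'latest-offset', -- Start reading from the latest offset
'format' = 'json' -- Records are encoded as JSON
);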
Step 4. Configure job parameters
In Job parameters, configure settings such as the checkpoint interval and the default operator parallelism. If you use other data sources or sinks, you also need to select the corresponding connectors.
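For reference, these two settings correspond to standard Flink configuration keys. The sketch below shows them in the SET syntax of the Flink SQL client; in Stream Compute Service they are normally configured through the Job parameters panel rather than in the SQL editor, so treat this only as an illustration of the underlying options.
SET 'execution.checkpointing.interval' = '60s'; -- Take a checkpoint every 60 seconds (illustration; set via Job parameters in the console)
SET 'parallelism.default' = '2'; -- Default operator parallelism (illustration; set via Job parameters in the console)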
Step 5. Publish and run the SQL job
Click Publish draft to check the job, and then click Confirm to publish it. A version of the job is created, and a version number is generated automatically.
Under Manage version, you can view and switch to different versions of the job.
Select the version you want to run, click Run version, and then click Confirm to run the job.
Step 6. View job execution details
After you click Confirm, the job status first changes to "Operating" and then to "Running" once the job starts successfully. You can then view its execution details via Monitoring, Log, and Flink UI.