Advanced Job Parameters
Last updated: 2023-12-26 17:49:27

Overview

You can set additional custom Flink parameters in Job parameters > Advanced parameters to tune a job. For example, you can set the job restart policy, adjust the MiniBatch settings of SQL, disable asynchronous checkpointing, set a minimum checkpoint interval, and adjust the cache size of the RocksDB state backend.
Custom advanced parameters must comply with YAML syntax and be configured in the "key: value" format (note the space after the colon). After job parameters are modified, you need to re-publish and start the job for the new parameters to take effect. For details of the parameters in Flink v1.11, see Configuration.
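For example, an Advanced parameters setting that combines several of the options discussed below might look like the following sketch (the keys are covered in the examples in this document; the values are only an illustration, not recommendations):
state.backend: filesystem
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 100
taskmanager.memory.jvm-overhead.fraction: 0.3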

Examples

Setting the state backend of a job

Stream Compute Service uses the RocksDB state backend by default. It supports larger state sizes, but its throughput and performance are lower than those of the memory-based FileSystem state backend.
If your job state is small and you require low latency and high throughput, switch to the FileSystem state backend with the following parameter:
state.backend: filesystem
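If you keep the default RocksDB state backend instead, you can also tune it through advanced parameters, for example by enabling incremental checkpoints and enlarging the RocksDB block cache. A sketch with illustrative values (not recommendations):
state.backend.incremental: true
state.backend.rocksdb.block.cache-size: 64mb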

Setting job restart policy and threshold

By default, a Flink job can be restarted internally (a hot restart while the JobManager is still alive, which takes about 15 seconds) a maximum of five times after a crash. If another crash occurs after this threshold is reached, the JobManager exits, and the job goes through a longer cold recovery (about 3-5 minutes). If checkpointing is not enabled for the job, much of its state and data may be lost.
To adjust the number of internal restarts allowed for the job, configure the following parameters (this example allows a maximum of 100 internal restarts; set this value with caution):
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 100
restart-strategy.fixed-delay.delay: 5 s
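Flink also provides a failure-rate restart strategy, which only gives up when failures occur too frequently within a time window. A sketch with illustrative values:
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s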

Setting the JVM overhead percentage

The JVM overhead percentage defaults to 10% in Flink. The RocksDB state backend requires more memory in this area, which may cause overuse and lead to the JVM being killed by the container management system. To reduce the chance of this happening and keep jobs that use the RocksDB state backend more stable, we recommend increasing this value as appropriate.
Note
Increasing the value of this parameter will lower the percentage of available heap memory in JVM, making the job more prone to the out-of-memory (OOM) error in the heap. Please proceed with caution.
taskmanager.memory.jvm-overhead.fraction: 0.3
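Note that the memory derived from this fraction is also clamped by absolute lower and upper bounds. If you raise the fraction substantially, you may also need to raise the upper bound; a sketch with illustrative values:
taskmanager.memory.jvm-overhead.min: 192mb
taskmanager.memory.jvm-overhead.max: 2gb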

Setting the checkpoint policy to at-least-once

The default checkpoint policy in Stream Compute Service is exactly-once. This policy guarantees exact state consistency after a crashed job is recovered, but it may sometimes cause high latency.
If some duplicate data is acceptable in computation when a crashed job is recovered (resulting in temporarily inaccurate results), you can change the Flink checkpoint policy to at-least-once for better checkpoint performance, especially when the state is very large or multiple streams progress at different rates.
execution.checkpointing.mode: AT_LEAST_ONCE
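To further reduce checkpoint pressure, you can also enforce a minimum pause between consecutive checkpoints (the minimum checkpoint interval mentioned in the overview). A sketch with an illustrative value:
execution.checkpointing.min-pause: 60s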

Disabling operator chaining

By default, Flink chains operators with the same parallelism together in the execution graph whenever possible to avoid extra serialization and deserialization of data transferred between upstream and downstream operators. If you want to view the data inflow and outflow of each individual operator to facilitate troubleshooting, disable operator chaining.
Note
Disabling this feature may significantly reduce the job's running efficiency. Please proceed with caution.
pipeline.operator-chaining: false

Setting the checkpoint timeout of a job

The checkpoint timeout defaults to 20 minutes (1,200s) in Stream Compute Service.
If your job state is large, you can set a longer timeout with the following parameter:
execution.checkpointing.timeout: 3000s
You can also reduce the timeout:
execution.checkpointing.timeout: 1000s
For a SQL job, you also need to add the following statement on the editing page, with the value matching the configured timeout. For details, see Flink Configuration Options.
set CHECKPOINT_TIMEOUT= '1000 s';

Setting the checkpoint storage policy

In Stream Compute Service, three checkpoint storage policies are available for Flink jobs: DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION, and RETAIN_ON_SUCCESS. The default policy is DELETE_ON_CANCELLATION, which is used if this parameter is not set.
The policies differ in how checkpoints are cleared:

DELETE_ON_CANCELLATION (default)
1. If a checkpoint is created when the job is stopped, the old checkpoint is deleted (the job cannot be recovered from the old checkpoint).
2. If no checkpoint is created when the job is stopped, the old checkpoint is deleted (the job cannot be recovered from a checkpoint).

RETAIN_ON_CANCELLATION
1. If a checkpoint is created when the job is stopped, the old checkpoint is deleted (the job cannot be recovered from the old checkpoint).
2. If no checkpoint is created when the job is stopped, the old checkpoint is retained (the job can be recovered from the checkpoint).

RETAIN_ON_SUCCESS
1. If a checkpoint is created when the job is stopped, the old checkpoint is retained (the job can be recovered from the checkpoint).
2. If no checkpoint is created when the job is stopped, the old checkpoint is retained (the job can be recovered from the checkpoint).

You can set the checkpoint storage policy of a job in Job parameters > Advanced parameters. After the setting, you need to restart the job for the policy to take effect.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_SUCCESS
Note
For a JAR or Python Flink job, we recommend not explicitly setting the checkpoint storage policy in the JAR package, because the settings there will overwrite those in advanced parameters.

Setting more options

Flink provides many other options. For a complete list, see the Flink documentation.
Note
Not all options are supported in Stream Compute Service. Before making any adjustment, carefully read the following use limits and fully understand the relevant issues and risks, to avoid unstable job running, failure to start a job, or other problems caused by inappropriate parameter adjustments.

Use limits

The following parameters are set by the Stream Compute Service system and cannot be modified. Please do not pass them in through advanced parameters.
Non-customizable Parameters
kubernetes.container.image
kubernetes.jobmanager.cpu
taskmanager.cpu.cores
kubernetes.taskmanager.cpu
jobmanager.heap.size
jobmanager.heap.mb
jobmanager.memory.process.size
taskmanager.heap.size
taskmanager.heap.mb
taskmanager.memory.process.size
taskmanager.numberOfTaskSlots
env.java.opts (you can customize another two separate parameters: env.java.opts.taskmanager and env.java.opts.jobmanager)
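For example, if you need extra JVM options for troubleshooting, a sketch using the two customizable keys might look like the following (the flag shown is only an illustration, not a recommendation):
env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError
env.java.opts.jobmanager: -XX:+HeapDumpOnOutOfMemoryError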
