Overview
You can set additional custom Flink parameters in Job parameters > Advanced parameters to tune a job. For example, you can set the job restart policy, adjust the mini-batch settings of SQL, disable async checkpointing, set the minimum checkpoint interval, and adjust the cache size of the RocksDB state backend.
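As a quick illustration, several of the settings described below could be combined in the Advanced parameters box like this (the values are examples, not recommendations):

```yaml
# Illustrative combination of advanced parameters; tune each value for your job
state.backend: filesystem
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 100
execution.checkpointing.mode: AT_LEAST_ONCE
```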
Custom advanced parameters must comply with YAML syntax and be configured in the "key: value" format (note the space after the colon). After job parameters are modified, you need to re-publish and start the job for the new parameters to take effect. For details of the parameters in Flink v1.11, see Configuration.

Example
Setting the state backend of a job
The RocksDB state backend is used by default in Stream Compute Service. It can hold much larger state, but its throughput and performance are inferior to those of the memory-based FileSystem state backend.
If your job state is small, and you require low latency and high throughput, change the state backend to the FileSystem state backend using the following statement:
state.backend: filesystem
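If you keep the default RocksDB state backend, its cache size (mentioned in the overview) can be adjusted the same way. The keys below are standard Flink RocksDB options; the values are illustrative, and you should verify them against your Flink version:

```yaml
# Disable managed-memory control of RocksDB so the explicit cache size below takes effect
state.backend.rocksdb.memory.managed: false
# Illustrative block cache size for the RocksDB state backend
state.backend.rocksdb.block.cache-size: 256mb
```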
Setting job restart policy and threshold
By default, a Flink job can be internally restarted (a hot restart while the JobManager is still active, which takes about 15s) a maximum of five times after a crash. If a crash occurs again after the number of restarts reaches the threshold, the JobManager will exit, resulting in a longer cold recovery period for the job (about 3 to 5 minutes). If checkpointing hasn't been enabled for the job, much of its state and data may be lost.
To adjust the number of internal restarts allowed for the job, configure the following parameters (in this example, a maximum of 100 internal restarts are allowed; set this with caution):
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 100
restart-strategy.fixed-delay.delay: 5 s
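Flink also provides an exponential-delay restart strategy, which backs off progressively between attempts instead of retrying at a fixed interval; this can be gentler on external systems during repeated failures. The keys below come from the Flink configuration reference, with illustrative values; confirm they are available in your Flink version:

```yaml
# Alternative: back off exponentially between restart attempts (illustrative values)
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
```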
Setting the JVM overhead percentage
The JVM overhead defaults to 10% of the process memory in Flink. When the RocksDB state backend is used, more memory is required in this area, and overuse may cause the container management system to kill the JVM. To reduce the chance of this and keep jobs that use the RocksDB state backend more stable, we recommend increasing this parameter as appropriate.
Note
Increasing the value of this parameter will lower the percentage of available heap memory in JVM, making the job more prone to the out-of-memory (OOM) error in the heap. Please proceed with caution.
taskmanager.memory.jvm-overhead.fraction: 0.3
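Note that in Flink's memory model the overhead fraction only takes effect within configured minimum and maximum bounds; if the computed size falls outside them, the bounds win. These companion options exist alongside the fraction (the values below are illustrative):

```yaml
# Illustrative bounds; the fraction above only applies between these limits
taskmanager.memory.jvm-overhead.min: 192mb
taskmanager.memory.jvm-overhead.max: 2gb
```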
Setting the checkpoint policy to at-least-once
The default checkpoint policy is exactly-once in Stream Compute Service. This policy can ensure exact state consistency after a crashed job is recovered, but it may sometimes cause a high latency.
If some duplicate data is acceptable in the computation when a crashed job is recovered (causing temporarily inaccurate results), you can change the Flink checkpoint policy to at-least-once for better checkpoint performance, especially when the state is huge and multiple streams progress at different rates.
execution.checkpointing.mode: AT_LEAST_ONCE
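Checkpoint pressure can also be eased by spacing checkpoints out. The minimum checkpoint interval mentioned in the overview maps to the standard Flink `execution.checkpointing.*` options; the values below are illustrative:

```yaml
# Trigger a checkpoint every 60 s (illustrative)
execution.checkpointing.interval: 60 s
# Leave at least 30 s between the end of one checkpoint and the start of the next
execution.checkpointing.min-pause: 30 s
```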
Disabling operator chaining
By default, operators with the same parallelism are chained together if possible in the execution graph in Flink to avoid additional serialization or deserialization of data transferred between upstream and downstream operators. If you want to view the data inflow and outflow of each operator to facilitate troubleshooting, disable this operator chaining feature.
Note
Disabling this feature may significantly reduce the job's runtime efficiency. Please proceed with caution.
pipeline.operator-chaining: false
Setting the checkpoint timeout of a job
The checkpoint timeout defaults to 20 minutes (1,200s) in Stream Compute Service.
If your job state is large, you can set a longer timeout with the following parameter:
execution.checkpointing.timeout: 3000s
You can also reduce the timeout:
execution.checkpointing.timeout: 1000s
For a SQL job, you also need to add the following statement on the job editing page, with the value set to the configured timeout. For details, see Flink Configuration Options.

set CHECKPOINT_TIMEOUT= '1000 s';
Setting the checkpoint storage policy
In Stream Compute Service, three checkpoint storage policies are available to Flink jobs: DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION, and RETAIN_ON_SUCCESS. The default policy is DELETE_ON_CANCELLATION, which is used when this parameter is not set.
The following table compares these policies.
| Policy | Behavior when the job is stopped |
| --- | --- |
| DELETE_ON_CANCELLATION (default) | 1. If a new checkpoint is created on stop, the old checkpoint is deleted (the job cannot be recovered from the old one). 2. If no checkpoint is created on stop, the old checkpoint is deleted (the job cannot be recovered from a checkpoint). |
| RETAIN_ON_CANCELLATION | 1. If a new checkpoint is created on stop, the old checkpoint is deleted (the job cannot be recovered from the old one). 2. If no checkpoint is created on stop, the old checkpoint is kept (the job can be recovered from a checkpoint). |
| RETAIN_ON_SUCCESS | 1. If a new checkpoint is created on stop, the old checkpoint is kept (the job can be recovered from a checkpoint). 2. If no checkpoint is created on stop, the old checkpoint is kept (the job can be recovered from a checkpoint). |
You can set the checkpoint storage policy of a job in Job parameters > Advanced parameters. After the setting, you need to restart the job to apply the policy.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_SUCCESS
Note
For a JAR or Python Flink job, we advise against explicitly setting the checkpoint storage policy in the program code, because the settings there will override those in advanced parameters.
Setting more options
Flink provides many other options. For a complete list, see the Flink documentation.
Note
Not all options are supported in Stream Compute Service. Before making any adjustment, please carefully read the following use limits and fully understand the relevant issues and risks to avoid unstable job running, failure to start a job, or other events due to inappropriate parameter adjustments.
Use limits
The following parameters are set by the Stream Compute Service system and cannot be modified. Please do not pass them in through advanced parameters.
- kubernetes.container.image
- kubernetes.jobmanager.cpu
- kubernetes.taskmanager.cpu
- jobmanager.memory.process.size
- taskmanager.memory.process.size
- taskmanager.numberOfTaskSlots
- env.java.opts (you can instead customize the two separate parameters env.java.opts.taskmanager and env.java.opts.jobmanager)