checkpoint参数设置
//设置精准一次性保证(默认) 每5000ms 开始一次 checkpoint
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
//Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
- 设置最小时间间隔
// 两次Checkpoint的间隔为60秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)
文件持久化
// 使用HDFS作为State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))
// 使用阿里云OSS作为State Backend
env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"))
// 使用Amazon作为State Backend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"))
// 关闭Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false))
// 最多同时进行3个Checkpoint (一般情况是一个checkpoint)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)
//checkpoint过程失败,影响应用重启 , 可以关闭checkpoint
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)