当前位置: 首页 > news >正文

Flink系列之Flink中StateBackend深入剖析和应用


title: Flink系列


一、Flink StateBackend 深入剖析和应用

​ StateBackend 定义了状态是如何存储的,不同的 State Backend 会采用不同的方式来存储状态,核心入口是: StateBackend, Flink 提供了三种不同形式的存储后端,分别是 MemoryStateBackend, FsStateBackend 和 RocksDBStateBackend。

  • MemoryStateBackend 会将工作状态(Task State)存储在 TaskManager 的内存中,将检查点(Job State)存储在 JobManager 的内存中,速度很快,不支持持久化,通常用来存储一些 state 量小的情况下的 state。这种方式是非常不安全的,且受限于JobManager的内存大小,主要在开发调试中使用。

  • FsStateBackend 会将工作状态存储在 TaskManager 的内存中,将检查点存储在文件系统中(通常是分布式文件系统),用来存储 state 量比较大的,window 窗口很长的一些 job 的 state 比较合适。生产环境常用此方案。

  • RocksDBStateBackend 会把状态存储在 RocksDB 中,将检查点存储在文件系统中(类似 FsStateBackend),和 MemoryStateBackend 对比是速度快,GC 少,支持异步 Snapshot 持久化。用来存储 state 量比较大的,window 窗口很长的一些 job 的 state 比较合适。

​ 综上所述,MemoryStateBackend 和 FsStateBackend 都是在内存中进行状态管理,所以可以获取较低的读写延迟,但会受限于 TaskManager 的内存大小;而RocksDBStateBackend 直接将 State 存储到 RocksDB 数据库中,所以不受 JobManager 的内存限制,但会有读写延迟,同时 RocksDBStateBackend 支持增量备份,这是其他两个都不支持的特性。一般来说,如果不是对延迟有极高的要求,RocksDBStateBackend 是更好的选择。

​ 淘汰掉原来的三种实现,提供两种新的实现的目的:为了接口统一!底层原理没变。window编程也被统一了,Time编程也被统一了。

配置:
state.backend: hashmap
state.checkpoint-storage: jobmanager
state.checkpoints.dir: hdfs://hadoop10/flink/checkpoints
state.savepoints.dir: hdfs://hadoop10/flink/savepoints

实现支持MemoryStateBackend
HashMapStateBackend
FsStateBackend
HashMapStateBackend
RocksDBStateBackend
EmbeddedRocksDBStateBackend
代号jobmanager
hashmap
filesystem
hashmap
rocksdb
Task StateTaskManager 堆内存中TaskManager 堆内存中TaskManager 中的 RocksDB 实例中
Job StateJobManager 堆内存中
hashmap 的话基于 CheckpointStorage 来定
外部高可用文件系统,比如 HDFS
hashmap 的话基于 CheckpointStorage 来定
外部高可用文件系统,比如 HDFS
缺点只能保存数据量小的状态
状态数据有可能会丢失
状态大小受TaskManager内存限制(默认支持5M)状态访问速度有所下降
优点开发测试很方便
性能好
状态访问速度很快
状态信息不会丢失
可以存储超大量的状态信息
状态信息不会丢失
使用场景本地开发测试State 量比较大
分钟级 window 窗口的状态数据
生产环境使用
State 量超大
小时级 window 窗口的状态数据
生产环境使用

细粒度:Task State: 一个 Application 会运行很多的 Task, 每个 Task 运行的时候,都有自己的状态, 故障转移 = FailOverStrategy

  • 要么是 TaskManager 的堆内存

  • 要么是 RocksDB 中

粗粒度:Job State:在某个时候,通过某种手段(checkpoint)把这个 job 的所有 Task 的 state 做一个持久化,就形成了 job 的 state, 重启策略 = RestartStrategy

  • 要么是 JobManager 的堆内存

  • 要么是外部的高可用系统中,可以是HDFS

Flink StateBackend 的三种实现对比:

1.1 MemoryStateBackend

​ 默认情况下,状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到 JobManager 的堆内存中。

缺点:
	只能保存数据量小的状态
	状态数据有可能会丢失
优点:
	开发测试很方便

在这里插入图片描述

1.2 FSStateBackend

状态信息存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)

缺点:
	状态大小受TaskManager内存限制(默认支持5M)
优点:
	状态访问速度很快
	状态信息不会丢失
用于:
	生产,也可存储状态数据量大的情况

在这里插入图片描述

1.3 RocksDBStateBackend

​ 状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中。checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)

缺点:
	状态访问速度有所下降
优点:
	可以存储超大量的状态信息
	状态信息不会丢失
用于:
	生产,可以存储超大量的状态信息

在这里插入图片描述

二、Flink StateBackend 原理剖析与实践

第一种:单任务调整

修改当前任务代码
env.setStateBackend(new FsStateBackend("hdfs://hadoop10/flink/checkpoints"));
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new RocksDBStateBackend(filebackend, true));

第二种:全局调整

修改 flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop10/flink/checkpoints

注意:state.backend的值可以是下面几种:
1、jobmanager(MemoryStateBackend)
2、filesystem(FsStateBackend)
3、rocksdb(RocksDBStateBackend)

MemoryStateBackend(老版本的默认实现) 和 FsStateBackend 的代码写法,其实它们已经被废弃,建议使用:HashMapStateBackend(新版本的默认实现)

用的是HashMapStateBackend,但是给job级别的数据保存到 Job Manager 的堆内内存中:

// HashMapStateBackend 替代 MemoryStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1、设置使用 HashMapStateBackend,Task State 存储在 TaskManager 的堆内存中
env.setStateBackend(new HashMapStateBackend());
// 2、这样设置 checkpoint 的 state 存储方式:把 job State 存储在 JobManager 的堆内存中
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

用的是HashMapStateBackend,但是给job级别的数据保存到 Job Manager 的外面的高可用系统HDFS中:

// HashMapStateBackend 替代 FsStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1、设置使用 HashMapStateBackend,Task State 存储于 TaskManager 堆内存中
env.setStateBackend(new HashMapStateBackend());
// 2、需要设置外部高可用文件系统存储路径用来保存 Job State
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop10/flink/checkpoints");

RocksDBStateBackend 代码写法,其实 RocksDBStateBackend 也已经被废弃,建议使用 EmbeddedRocksDBStateBackend

// EmbeddedRocksDBStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1、设置 EmbeddedRocksDBStateBackend,Task State 存储在 RocksDB 中(内存+磁盘)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 2、设置外部高可用文件系统存储路径用来保存 Job State
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop10/flink/checkpoints");

如果使用 RocksDB 的方式,需要引入依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
	<version>1.14.3</version>
	<scope>test</scope>
</dependency>


声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

相关文章:

  • 2024做安全测试必须要知道的几种方法!
  • Flutter 中的 Divider 小部件:全面指南
  • LeetCode657.机器人能否返回原点
  • 2024年,诺基亚手机发售仅一天就售罄
  • 【代码随想录】day55
  • Lua 协程池
  • iOS App冷启动优化:Before Main阶段
  • QT Mingw编译ffmpeg源码以及测试
  • 个性化打造电子相册
  • SRIO--IP讲解及环回测试
  • Spring中的事务和事务的传播机制
  • java线程池原理源码解析,程序员如何技术划水
  • Java可变参数和集合工具类Collections的详细介绍
  • 网站构建初级教程
  • 项目管理逻辑:老板为什么赔钱的项目也做?为什么害怕你闲着?
  • Spring Boot 框架整合 MyBatis 连接数据库,详细说明
  • MySQL主从同步
  • 微服务自动化【Docker-Compose】
  • 在Postgres中分页的五种方法,从基本到异国情调
  • 工作经历分享
  • 堆(二叉堆)-优先队列-数据结构和算法(Java)
  • awk命令的使用
  • 初学Nodejs(5):npm包管理器与包的发布
  • mysql高阶语句
  • [附源码]Python计算机毕业设计Django勤工俭学管理小程序
  • 0115 查找算法Day4
  • HTML5期末大作业:基于HTML+CSS+JavaScript实现中国风文化传媒企业官网源码
  • 计算机导论第十一周课后作业
  • [附源码]计算机毕业设计线上社区管理系统Springboot程序
  • GIT分布式版本控制系统 | 命令讲解入门
  • CentOS下将 /home 目录合并到 / 目录
  • 「Redis数据结构」RedisObject