数据库路由组件设计与实现
业务背景
为何自研路由组件?
随着业务体量的增加,原先的库表存储已经不能支撑海量的并发请求。因此,可能需要考虑分库分表。
无论是业务之初就考虑分库分表,还是项目中期进行分库分表迁移,考虑自研数据库路由组件的出发点都是:现有的技术方案无法实现(不适合、不方便)个性化的业务需求,并且自研组件小而精,易于迭代维护,后续也可加入新的功能(例如事务支持)
分库、分表是两回事,可能只分库不分表,可能分表不分库,也可能分库分表
分库分表前 | 分库分表后 | |
---|---|---|
并发支撑情况 | MySQL 单机部署,扛不住高并发 | MySQL 从单机到多机,能承受的并发增加了多倍 |
磁盘使用情况 | MySQL 单机磁盘容量几乎撑满 | 拆分为多个库,数据库服务器磁盘使用率大大降低 |
SQL 执行性能 | 单表数据量太大,SQL 越跑越慢 | 单表数据量减少,SQL 执行效率明显提升 |
水平拆分
垂直拆分
技术调研
现有的分库分表组件主要有如下两种:
基于代理
在应用和数据中间加了一个代理层。应用程序所有的数据请求都交给代理层处理,代理层负责分离读写请求,将它们路由到对应的数据库中。
提供类似功能的中间件有 MySQL Router(官方)、Atlas(基于 MySQL Proxy)、MaxScale、MyCat。
基于组件
基于组件的则直接基于独立的 jar 包就可以进行开发,不用部署,运维成本低,不需要代理层的二次转发请求,性能很高**。**比较经典的就是 Sharding-JDBC
方案设计
挑战
:::warning
- 实现层面:组件需要知道数据需要从哪个具体的数据库的子表中获取,并且对用户透明
- **数据源切换:**如何在组件中实现_动态数据源切换_
- **路由算法:**如何实现比较均匀的_路由散列算法_
- **SQL 改写:**如何_拦截并修改 SQL_
- 引入数据分片带来的问题:主要考虑跨表连接查询、跨库事务问题
- **跨表连接查询:**通常两种方案:(1)解决跨表查询;(2)规避跨表连接,采用第三方中间件汇总查询
- **跨库事务:**这块也是痛点,通常有两种做法:(1)分布式事务;(2)规避分布式事务问题,采用最终一致性方案
:::
Tips:
关于以上跨表连接查询、跨库事务具体解决方案,需要根据业务场景、对于数据一致性的要求、综合性能等综合考虑。
例如:
- 秒杀场景下,关于库存的处理,就不太适合使用繁重的分布式事务,采用最终一致性方案(MQ+JOB兜底)比较合适;反之,对于金融等场景,考虑分布式事务比较合适
- 对于 B 端系统的跨表查询场景,业务访问量也不会很大,考虑适配跨表连接方案代价就比较高了,相反采用 ES 汇总查询,相对来说容易接受一点
架构图
主要包括:
- AOP 切面拦截:拦截需要使用DB 路由的方法,这里采用自定义注解
- 数据库连接池配置:分库分表需要按需配置数据库连接源,在这些连接池的集合中进行动态数据源切换
AbstractRoutingDataSource
:是用于动态数据源切换的 Spring 服务类,提供了数据源切换的抽象方法determineCurrentLookupKey
- 路由哈希算法设计:在路由设计时,需要根据分库分表字段进行路由计算,让数据均匀地分布至各个库表之中。这里参考 HashMap 的 扰动函数设计
- MyBatis 拦截器:实现 sql 动态拦截和修改
流程图
技术实现
工程结构
├─src
│ ├─main
│ │ ├─java
│ │ │ └─cn
│ │ │ └─mokeeqian
│ │ │ └─middleware
│ │ │ └─db
│ │ │ └─router
│ │ │ ├─annotation
│ │ │ ├─config
│ │ │ ├─dynamic
│ │ │ ├─strategy
│ │ │ │ └─impl
│ │ │ └─util
│ │ └─resources
│ │ └─META-INF
│ └─test
│ └─java
│ └─cn
│ └─mokeeqian
│ └─middleware
│ └─test
自定义路由注解
路由注解
为切面提供切点,同时获取被注解的方法入参属性中的路由字段
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DBRouter {
/**
* 路由字段
* @return
*/
String key() default "";
}
- @Retention:告诉编译程序如何处理,也可理解为注解类的生命周期。
- @Target:该注解的作用点,主要有:TYPE(类、接口)、METHOD(方法)、PACKAGE、FIELD、PARAMETER等。这里我们选择作用在方法上。
路由策略注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouterStrategy {
/**
* 是否分表
* @return
*/
boolean splitTable() default false;
}
🔃动态数据源切换
继承抽象类 AbstractRoutingDataSource
,实现其 determineCurrentLookupKey
方法,从 DBContextHolder中获取 DB key,用以实现动态切换数据源
public class DynamicDataSource extends AbstractRoutingDataSource {
/**
* 返回 db 路由
* @return aka db01, db02, ...
*/
@Override
protected Object determineCurrentLookupKey() {
return "db" + DBContextHolder.getDbKey();
}
}
AbstractRoutingDataSource的getConnection() ⽅法根据查找 lookup key 键对不同⽬标数据源的调⽤, 通常是通过(但不⼀定)某些线程绑定的事物上下⽂来实现
AbstractRoutingDataSource的多数据源动态 切换的核⼼逻辑是:在程序运⾏时,把数据源数据源通过AbstractRoutingDataSource 动态织⼊到程序 中,灵活的进⾏数据源切换
基于AbstractRoutingDataSource的多数据源动态切换,可以实现读写分离,这么做缺点也很明显,⽆法 动态的增加数据源
⚙️配置、加载、创建数据源
对于较复杂的数据源配置,一般使用 org.springframework.context.EnvironmentAware
来实现:
EnvironmentAware#setEnvironment:读取 yml 配置文件中的自定义分库分表配置
@Override
public void setEnvironment(Environment environment) {
String prefix = "mini-db-router.jdbc.datasource.";
dbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "dbCount")));
tbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "tbCount")));
routerKey = environment.getProperty(prefix + "routerKey");
// 分库列表 db01,db02
String dataSources = environment.getProperty(prefix + "list");
assert dataSources != null;
for (String dbInfo : dataSources.split(",")) {
Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
dataSourceMap.put(dbInfo, dataSourceProps);
}
// 默认数据源
String defaultData = environment.getProperty(prefix + "default");
defaultDataSourceConfig = PropertyUtil.handle(environment, prefix + defaultData, Map.class);
}
router:
jdbc:
datasource:
# 从这里开始就是数据源的配置了
dbCount: 2
tbCount: 4
default: db00
routerKey: uId # 路由字段
list: db01,db02 # 分库
db00:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery?useUnicode=true
username: root
password: xxx
db01:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery_01?useUnicode=true
username: root
password: xxx
db02:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery_02?useUnicode=true
username: root
password: xxx
创建数据源:DynamicDataSource#setTargetDataSources,DynamicDataSource#setDefaultTargetDataSource
@Bean
public DataSource dataSource() {
// 创建数据源
Map<Object, Object> targetDataSources = new HashMap<>();
for (String dbInfo : dataSourceMap.keySet()) {
Map<String, Object> objMap = dataSourceMap.get(dbInfo);
targetDataSources.put(dbInfo, new DriverManagerDataSource(
objMap.get("url").toString(), objMap.get("username").toString(), objMap.get("password").toString())
);
}
// 设置数据源
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(new DriverManagerDataSource(
defaultDataSourceConfig.get("url").toString(),
defaultDataSourceConfig.get("username").toString(),
defaultDataSourceConfig.get("password").toString()));
return dynamicDataSource;
}
ThreadLocal 保存路由结果
使用 ThreadLocal 保存分库、分表的路由结果,借鉴 SecurityContextHolder
public class DBContextHolder {
private static final ThreadLocal<String> dbKey = new ThreadLocal<>();
private static final ThreadLocal<String> tbKey = new ThreadLocal<>();
public static String getDBKey() {
return dbKey.get();
}
public static String getTBKey() {
return tbKey.get();
}
public static void setDBKey(String dbKeyIdx) {
dbKey.set(dbKeyIdx);
}
public static void setTBKey(String tbKeyIdx) {
tbKey.set(tbKeyIdx);
}
public static void clearDBKey() {
dbKey.remove();
}
public static void clearTBKey() {
tbKey.remove();
}
}
具体路由策略
这里采用接口 IDBRouterStrategy
,后续可以实现该接口,进行个性化的路由策略配置
基于HashMap 扰动函数思想实现路由分发
@Override
public void doRouter(String dbKeyAttr) {
int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();
// 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));
// 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);
// 设置到 ThreadLocal
DBContextHolder.setDBKey(String.format("%02d", dbIdx));
DBContextHolder.setTBKey(String.format("%03d", tbIdx));
logger.debug("数据库路由 dbIdx:{} tbIdx:{}", dbIdx, tbIdx);
}
✅动态 SQL 修改
基于以上,分库功能已经实现,但是,如何分表?即将逻辑 SQL 转化为 **物理 SQL,**例如:
逻辑SQL:SELECT * FROM tb_user WHERE id = 123;
物理SQL:SELECT * FROM tb_user_01 WHERE id = 123;
一种思路是:使用 MyBatis 的 Interceptor 进行 SQL 拦截,然后动态修改 SQL
mybatis:自定义实现拦截器插件Interceptor
- **@**Intercepts注解:拦截器
可以被拦截的四种类型:- Executor:拦截执行器的方法
- ParameterHandler:拦截参数的处理
- ResultHandler:拦截结果集的处理
- StatementHandler:拦截Sql语法构建的处理
- @Signature注解:拦截点,指定拦截哪个对象里面的哪个方法
其参数如下:- type:要被拦截的类型(上述四种之一)
- method:在类型基础上,指定被拦截的方法
- args:在方法基础上,指定方法入参参数(Java里可能存在重载,故要注意参数顺序和类型)
- 类型&方法一览
| 拦截类型 | 拦截方法 |
| — | — |
| Executor | update、query、flushStatements、commit、rollback、getTransaction、close、isClosed |
| ParameterHandler | getParameterObject、setParameters |
| ResultHandler | handleResultSets、handleOutputParameters |
| StatementHandler | prepare、parameterize、batch、update、query |
StatementHandler 的具体方法:
- prepare: ⽤于创建⼀个具体的 Statement 对象的实现类或者是 Statement 对象
- parametersize: ⽤于初始化 Statement 对象以及对sql的占位符进⾏赋值
- update: ⽤于通知 Statement 对象将 insert、update、delete 操作推送到数据库
- query: ⽤于通知 Statement 对象将 select 操作推送数据库并返回对应的查询结果
我们主要使用 StatementHandler 的 prepare 方法,拦截 sql 语句
@Intercepts(
@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})
)
这里的 Interceptor#intercept 方法就是我们要实现的方法,其中,invocation 就是被拦截的对象(StatementHandler#prepare方法)
/**
* @author Clinton Begin
*/
public interface Interceptor {
Object intercept(Invocation invocation) throws Throwable;
default Object plugin(Object target) {
return Plugin.wrap(target, this);
}
default void setProperties(Properties properties) {
// NOP
}
}
如何获取 MyBatis 中的 SQL 语句?
基于 StatementHandler
,然后 获取其 BoundSql
// 获取 StatementHandler
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());
MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
// 获取 MyBatis 原始 SQL
BoundSql boundSql = statementHandler.getBoundSql();
String originalSql = boundSql.getSql();
如何识别 SQL 中的表名称?
使用正则表达式匹配:from,into,update 这三个关键字,其之后就是表名称
private Pattern pattern = Pattern.compile("(from|into|update)[\\s]{1,}(\\w{1,})", Pattern.CASE_INSENSITIVE);
识别之后如何替换?
使用反射,直接修改 BoundSql#sql
字段
// 通过反射修改 sql 语句
// getDeclaredField:可以获取所有已声明字段(无视访问限定符); getField:只能获取public 字段
Field field = boundSql.getClass().getDeclaredField("sql");
field.setAccessible(true);
field.set(boundSql, replacedSql);
field.setAccessible(false);
- 使用反射可以访问 Java 类的私有成员、私有方法。在框架开发中,用处十分广泛
🚫AOP 实现调用方法拦截
目前为止,底层逻辑已经全部实现,现在只需要使用AOP对调用方法进行拦截处理即可
定义切点
这里直接拦截先前定义的自定义注解,也可以是使用表达式匹配
/**
* 切点
*/
@Pointcut("@annotation(cn.mokeeqian.middleware.db.router.annotation.DBRouter)")
public void aopPoint() {}
定义切面拦截具体逻辑
/**
* 切面拦截后的具体操作
* @param proceedingJoinPoint
* @param dbRouter
* @return
* @throws Throwable
*/
@Around("aopPoint() && @annotation(dbRouter)")
public Object doRouter(ProceedingJoinPoint proceedingJoinPoint, DBRouter dbRouter) throws Throwable {
String dbKey = dbRouter.key();
if (StringUtils.isBlank(dbKey) && StringUtils.isBlank(dbRouterConfig.getRouterKey())) {
throw new RuntimeException("annotation DBRouter key is null");
}
// 如果注解中没配置 key,则使用配置文件的 key
dbKey = StringUtils.isNotBlank(dbKey) ? dbKey : dbRouterConfig.getRouterKey();
// 计算路由
String dbKeyAttr = getAttrValue(dbKey, proceedingJoinPoint.getArgs());
// 路由分发
dbRouterStrategy.doRouter(dbKeyAttr);
// 返回结果
try {
// 放行
return proceedingJoinPoint.proceed();
} finally {
// 清除 ThreadLocal
dbRouterStrategy.clear();
}
}
- 几种切面环绕逻辑:
- @Before:前置通知,在方法执行之前执行
- @After:后置通知,在方法执行之后执行(即使出现异常,后置通知也会执行)
- @Around:环绕通知,围绕着方法执行(可以实现其他四种通知)
- @AfterReturning:返回通知,在方法返回结果之后执行
- @AfterThrowing:异常通知,在方法抛出异常之后
- AspectJ 注解的执行顺序:
@Around 都会出现两次:@Before、@AfterReturning、@AfterReturning、@After 这四个都会在两次@Around 执行之间被执行- 无异常时:@Aspect、@Pointcut、@Around、@Before、@AfterReturning、@After、@Around
- 有异常时:@Aspect、@Pointcut、@Around、@Before、@AfterThrowing、@After、@Around
@Around("aopPoint() && @annotation(dbRouter)")
的理解
封装起步依赖
最后的最后,将项目封装成 SpringBoot 起步依赖,编写配置类,然后利用自动装配机制。
@Configuration
public class DataSourceAutoConfigure implements EnvironmentAware {
/**
* 数据源配置组
*/
private Map<String, Map<String, Object>> dataSourceMap = new HashMap<>();
/**
* 默认数据源配置
* 也就是:不走路由组件的数据源
*/
private Map<String, Object> defaultDataSourceConfig;
/**
* 分库数目
*/
private int dbCount;
/**
* 分表数目
*/
private int tbCount;
/**
* 路由字段
*/
private String routerKey;
@Bean
public IDBRouterStrategy doRouterStrategy(DBRouterConfig dbRouterConfig) {
return new DBRouterStrategyHashCode(dbRouterConfig);
}
@Bean
public DBRouterConfig dbRouterConfig() {
return new DBRouterConfig(this.dbCount, this.tbCount, this.routerKey);
}
@Bean
public Interceptor plugin() {
return new DynamicMybatisPlugin();
}
@Bean(name = "db-router-point")
@ConditionalOnMissingBean
public DBRouterJoinPoint dbRouterJoinPoint(DBRouterConfig dbRouterConfig, IDBRouterStrategy dbRouterStrategy) {
return new DBRouterJoinPoint(dbRouterConfig, dbRouterStrategy);
}
/**
* 创建动态数据源
* 这个数据源就会被 MyBatis SpringBoot Starter 中 SqlSessionFactory sqlSessionFactory(DataSource dataSource) 注入使用
* @return
*/
@Bean
public DataSource dataSource() {
// 创建数据源
Map<Object, Object> targetDataSources = new HashMap<>();
for (String dbInfo : dataSourceMap.keySet()) {
Map<String, Object> objMap = dataSourceMap.get(dbInfo);
targetDataSources.put(dbInfo, new DriverManagerDataSource(
objMap.get("url").toString(), objMap.get("username").toString(), objMap.get("password").toString())
);
}
// 设置数据源
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(new DriverManagerDataSource(defaultDataSourceConfig.get("url").toString(), defaultDataSourceConfig.get("username").toString(), defaultDataSourceConfig.get("password").toString()));
return dynamicDataSource;
}
@Bean
public TransactionTemplate transactionTemplate(DataSource dataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dataSource);
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager(dataSourceTransactionManager);
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return transactionTemplate;
}
/**
* 读取 yml 中自定义的配置
* @param environment
*/
@Override
public void setEnvironment(Environment environment) {
String prefix = "mini-db-router.jdbc.datasource.";
dbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "dbCount")));
tbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "tbCount")));
routerKey = environment.getProperty(prefix + "routerKey");
// 分库列表 db01,db02
String dataSources = environment.getProperty(prefix + "list");
assert dataSources != null;
for (String dbInfo : dataSources.split(",")) {
Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
dataSourceMap.put(dbInfo, dataSourceProps);
}
// 默认数据源
String defaultData = environment.getProperty(prefix + "default");
defaultDataSourceConfig = PropertyUtil.handle(environment, prefix + defaultData, Map.class);
}
}
随后,需要编写 resources/META_INF/spring.factories
文件,配置数据源配置类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.mokeeqian.middleware.db.router.config.DataSourceAutoConfigure
最后,将项目打包到 maven 仓库,即可使用啦
【番外】分库分表平滑过渡?
基于原先的单库单表,现在如何做迁移?
停机迁移
最简单的就是直接停机迁移,停止一切写入。然后将旧库数据迁移至新库。
双写迁移
如果线上业务不能停机怎么办?
- 我们对老库的更新操作(增删改),同时也要写入新库(双写)。如果操作的数据不存在于新库的话,需要插入到新库中。 这样就能保证,咱们新库里的数据是最新的。
- 在迁移过程,双写只会让被更新操作过的老库中的数据同步到新库,我们_还需要自己写脚本将老库中的数据和新库的数据做比对_。如果新库中没有,那咱们就把数据插入到新库。如果新库有,旧库没有,就把新库对应的数据删除(冗余数据清理)。
- 重复上一步的操作,直到老库和新库的数据一致为止
这块其实可以借助 Canal 等中间件(binlog主从同步原理)来实现
参考文章
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!