数据库路由组件设计与实现

业务背景

为何自研路由组件?
随着业务体量的增加,原先的库表存储已经不能支撑海量的并发请求。因此,可能需要考虑分库分表。
无论是业务之初就考虑分库分表,还是项目中期进行分库分表迁移,考虑自研数据库路由组件的出发点都是:现有的技术方案无法实现(不适合、不方便)个性化的业务需求,并且自研组件小而精,易于迭代维护,后续也可加入新的功能(例如事务支持)
分库、分表是两回事,可能只分库不分表,可能分表不分库,也可能分库分表

分库分表前 分库分表后
并发支撑情况 MySQL 单机部署,扛不住高并发 MySQL 从单机到多机,能承受的并发增加了多倍
磁盘使用情况 MySQL 单机磁盘容量几乎撑满 拆分为多个库,数据库服务器磁盘使用率大大降低
SQL 执行性能 单表数据量太大,SQL 越跑越慢 单表数据量减少,SQL 执行效率明显提升

水平拆分

垂直拆分

技术调研

现有的分库分表组件主要有如下两种:

基于代理

在应用和数据中间加了一个代理层。应用程序所有的数据请求都交给代理层处理,代理层负责分离读写请求,将它们路由到对应的数据库中。
提供类似功能的中间件有 MySQL Router(官方)、Atlas(基于 MySQL Proxy)、MaxScale、MyCat。

基于组件

基于组件的则直接基于独立的 jar 包就可以进行开发,不用部署,运维成本低,不需要代理层的二次转发请求,性能很高**。**比较经典的就是 Sharding-JDBC
image.png

方案设计

挑战

:::warning

  1. 实现层面:组件需要知道数据需要从哪个具体的数据库的子表中获取,并且对用户透明
  • **数据源切换:**如何在组件中实现_动态数据源切换_
  • **路由算法:**如何实现比较均匀的_路由散列算法_
  • **SQL 改写:**如何_拦截并修改 SQL_
  1. 引入数据分片带来的问题:主要考虑跨表连接查询、跨库事务问题
  • **跨表连接查询:**通常两种方案:(1)解决跨表查询;(2)规避跨表连接,采用第三方中间件汇总查询
  • **跨库事务:**这块也是痛点,通常有两种做法:(1)分布式事务;(2)规避分布式事务问题,采用最终一致性方案
    :::

Tips:
关于以上跨表连接查询、跨库事务具体解决方案,需要根据业务场景、对于数据一致性的要求、综合性能等综合考虑
例如:

  • 秒杀场景下,关于库存的处理,就不太适合使用繁重的分布式事务,采用最终一致性方案(MQ+JOB兜底)比较合适;反之,对于金融等场景,考虑分布式事务比较合适
  • 对于 B 端系统的跨表查询场景,业务访问量也不会很大,考虑适配跨表连接方案代价就比较高了,相反采用 ES 汇总查询,相对来说容易接受一点

架构图


主要包括:

  • AOP 切面拦截:拦截需要使用DB 路由的方法,这里采用自定义注解
  • 数据库连接池配置:分库分表需要按需配置数据库连接源,在这些连接池的集合中进行动态数据源切换
  • AbstractRoutingDataSource:是用于动态数据源切换的 Spring 服务类,提供了数据源切换的抽象方法 determineCurrentLookupKey
  • 路由哈希算法设计:在路由设计时,需要根据分库分表字段进行路由计算,让数据均匀地分布至各个库表之中。这里参考 HashMap 的 扰动函数设计
  • MyBatis 拦截器:实现 sql 动态拦截和修改

流程图

技术实现

工程结构

├─src
│  ├─main
│  │  ├─java
│  │  │  └─cn
│  │  │      └─mokeeqian
│  │  │          └─middleware
│  │  │              └─db
│  │  │                  └─router
│  │  │                      ├─annotation
│  │  │                      ├─config
│  │  │                      ├─dynamic
│  │  │                      ├─strategy
│  │  │                      │  └─impl
│  │  │                      └─util
│  │  └─resources
│  │      └─META-INF
│  └─test
│      └─java
│          └─cn
│              └─mokeeqian
│                  └─middleware
│                      └─test

代码仓库
mokeeqian-spring-boot-starter/db-router-spring-boot-starter at main · mokeeqian/mokeeqian-spring-boot-starter

自定义路由注解

路由注解
为切面提供切点,同时获取被注解的方法入参属性中的路由字段

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DBRouter {
    /**
     * 路由字段
     * @return
     */
    String key() default "";
}
  • @Retention:告诉编译程序如何处理,也可理解为注解类的生命周期。
  • @Target:该注解的作用点,主要有:TYPE(类、接口)、METHOD(方法)、PACKAGE、FIELD、PARAMETER等。这里我们选择作用在方法上。

路由策略注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouterStrategy {
    /**
     * 是否分表
     * @return
     */
    boolean splitTable() default false;
}

🔃动态数据源切换

继承抽象类 AbstractRoutingDataSource,实现其 determineCurrentLookupKey 方法,从 DBContextHolder中获取 DB key,用以实现动态切换数据源

public class DynamicDataSource extends AbstractRoutingDataSource {
    /**
     * 返回 db 路由
     * @return aka db01, db02, ...
     */
    @Override
    protected Object determineCurrentLookupKey() {
        return "db" + DBContextHolder.getDbKey();
    }
}

AbstractRoutingDataSource的getConnection() ⽅法根据查找 lookup key 键对不同⽬标数据源的调⽤, 通常是通过(但不⼀定)某些线程绑定的事物上下⽂来实现
AbstractRoutingDataSource的多数据源动态 切换的核⼼逻辑是:在程序运⾏时,把数据源数据源通过AbstractRoutingDataSource 动态织⼊到程序 中,灵活的进⾏数据源切换
基于AbstractRoutingDataSource的多数据源动态切换,可以实现读写分离,这么做缺点也很明显,⽆法 动态的增加数据源

⚙️配置、加载、创建数据源

对于较复杂的数据源配置,一般使用 org.springframework.context.EnvironmentAware来实现:
EnvironmentAware#setEnvironment:读取 yml 配置文件中的自定义分库分表配置

@Override
public void setEnvironment(Environment environment) {
	String prefix = "mini-db-router.jdbc.datasource.";

	dbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "dbCount")));
	tbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "tbCount")));
	routerKey = environment.getProperty(prefix + "routerKey");

	// 分库列表 db01,db02
	String dataSources = environment.getProperty(prefix + "list");
	assert dataSources != null;
	for (String dbInfo : dataSources.split(",")) {
		Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
		dataSourceMap.put(dbInfo, dataSourceProps);
	}

	// 默认数据源
	String defaultData = environment.getProperty(prefix + "default");
	defaultDataSourceConfig = PropertyUtil.handle(environment, prefix + defaultData, Map.class);
}
router:
  jdbc:
    datasource:
	    # 从这里开始就是数据源的配置了
      dbCount: 2
      tbCount: 4
      default: db00
      routerKey: uId	# 路由字段
      list: db01,db02 # 分库
      db00:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/lottery?useUnicode=true
        username: root
        password: xxx
      db01:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/lottery_01?useUnicode=true
        username: root
        password: xxx
      db02:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/lottery_02?useUnicode=true
        username: root
        password: xxx

创建数据源:DynamicDataSource#setTargetDataSources,DynamicDataSource#setDefaultTargetDataSource

@Bean
public DataSource dataSource() {
	// 创建数据源
	Map<Object, Object> targetDataSources = new HashMap<>();

	for (String dbInfo : dataSourceMap.keySet()) {
		Map<String, Object> objMap = dataSourceMap.get(dbInfo);
		targetDataSources.put(dbInfo, new DriverManagerDataSource(
				objMap.get("url").toString(), objMap.get("username").toString(), objMap.get("password").toString())
		);
	}

	// 设置数据源
	DynamicDataSource dynamicDataSource = new DynamicDataSource();
	dynamicDataSource.setTargetDataSources(targetDataSources);
	dynamicDataSource.setDefaultTargetDataSource(new DriverManagerDataSource(
		defaultDataSourceConfig.get("url").toString(), 
		defaultDataSourceConfig.get("username").toString(), 
		defaultDataSourceConfig.get("password").toString()));

	return dynamicDataSource;
}

ThreadLocal 保存路由结果

使用 ThreadLocal 保存分库、分表的路由结果,借鉴 SecurityContextHolder

public class DBContextHolder {

    private static final ThreadLocal<String> dbKey = new ThreadLocal<>();
    private static final ThreadLocal<String> tbKey = new ThreadLocal<>();
    public static String getDBKey() {
        return dbKey.get();
    }
    public static String getTBKey() {
        return tbKey.get();
    }
    public static void setDBKey(String dbKeyIdx) {
        dbKey.set(dbKeyIdx);
    }
    public static void setTBKey(String tbKeyIdx) {
        tbKey.set(tbKeyIdx);
    }
    public static void clearDBKey() {
        dbKey.remove();
    }
    public static void clearTBKey() {
        tbKey.remove();
    }
}

具体路由策略

这里采用接口 IDBRouterStrategy ,后续可以实现该接口,进行个性化的路由策略配置
image.png
基于HashMap 扰动函数思想实现路由分发

@Override
public void doRouter(String dbKeyAttr) {
	int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();

	// 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
	int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));

	// 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
	int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
	int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);

	// 设置到 ThreadLocal
	DBContextHolder.setDBKey(String.format("%02d", dbIdx));
	DBContextHolder.setTBKey(String.format("%03d", tbIdx));
	logger.debug("数据库路由 dbIdx:{} tbIdx:{}",  dbIdx, tbIdx);
}

✅动态 SQL 修改

基于以上,分库功能已经实现,但是,如何分表?即将逻辑 SQL 转化为 **物理 SQL,**例如:
逻辑SQL:SELECT * FROM tb_user WHERE id = 123;
物理SQL:SELECT * FROM tb_user_01 WHERE id = 123;
一种思路是:使用 MyBatis 的 Interceptor 进行 SQL 拦截,然后动态修改 SQL
mybatis:自定义实现拦截器插件Interceptor

  • **@**Intercepts注解:拦截器
    可以被拦截的四种类型:
    • Executor:拦截执行器的方法
    • ParameterHandler:拦截参数的处理
    • ResultHandler:拦截结果集的处理
    • StatementHandler:拦截Sql语法构建的处理
  • @Signature注解:拦截点,指定拦截哪个对象里面的哪个方法
    其参数如下:
    • type:要被拦截的类型(上述四种之一)
    • method:在类型基础上,指定被拦截的方法
    • args:在方法基础上,指定方法入参参数(Java里可能存在重载,故要注意参数顺序和类型)
  • 类型&方法一览
    | 拦截类型 | 拦截方法 |
    | — | — |
    | Executor | update、query、flushStatements、commit、rollback、getTransaction、close、isClosed |
    | ParameterHandler | getParameterObject、setParameters |
    | ResultHandler | handleResultSets、handleOutputParameters |
    | StatementHandler | prepare、parameterize、batch、update、query |

StatementHandler 的具体方法:

  • prepare: ⽤于创建⼀个具体的 Statement 对象的实现类或者是 Statement 对象
  • parametersize: ⽤于初始化 Statement 对象以及对sql的占位符进⾏赋值
  • update: ⽤于通知 Statement 对象将 insert、update、delete 操作推送到数据库
  • query: ⽤于通知 Statement 对象将 select 操作推送数据库并返回对应的查询结果

我们主要使用 StatementHandler 的 prepare 方法,拦截 sql 语句

@Intercepts(
        @Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})
)

这里的 Interceptor#intercept 方法就是我们要实现的方法,其中,invocation 就是被拦截的对象(StatementHandler#prepare方法)

/**
 * @author Clinton Begin
 */
public interface Interceptor {
  Object intercept(Invocation invocation) throws Throwable;
  default Object plugin(Object target) {
    return Plugin.wrap(target, this);
  }
  default void setProperties(Properties properties) {
    // NOP
  }
}

如何获取 MyBatis 中的 SQL 语句?
基于 StatementHandler,然后 获取其 BoundSql

// 获取 StatementHandler
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
		SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());
MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");

// 获取 MyBatis 原始 SQL
BoundSql boundSql = statementHandler.getBoundSql();
String originalSql = boundSql.getSql();

如何识别 SQL 中的表名称?
使用正则表达式匹配:from,into,update 这三个关键字,其之后就是表名称

private Pattern pattern = Pattern.compile("(from|into|update)[\\s]{1,}(\\w{1,})", Pattern.CASE_INSENSITIVE);

识别之后如何替换?
使用反射,直接修改 BoundSql#sql 字段

// 通过反射修改 sql 语句
// getDeclaredField:可以获取所有已声明字段(无视访问限定符); getField:只能获取public 字段
Field field = boundSql.getClass().getDeclaredField("sql");
field.setAccessible(true);
field.set(boundSql, replacedSql);
field.setAccessible(false);
  • 使用反射可以访问 Java 类的私有成员、私有方法。在框架开发中,用处十分广泛

🚫AOP 实现调用方法拦截

目前为止,底层逻辑已经全部实现,现在只需要使用AOP对调用方法进行拦截处理即可
定义切点
这里直接拦截先前定义的自定义注解,也可以是使用表达式匹配

/**
 * 切点
 */
@Pointcut("@annotation(cn.mokeeqian.middleware.db.router.annotation.DBRouter)")
public void aopPoint() {}

定义切面拦截具体逻辑

/**
 * 切面拦截后的具体操作
 * @param proceedingJoinPoint
 * @param dbRouter
 * @return
 * @throws Throwable
 */
@Around("aopPoint() && @annotation(dbRouter)")
public Object doRouter(ProceedingJoinPoint proceedingJoinPoint, DBRouter dbRouter) throws Throwable {
	String dbKey = dbRouter.key();
	if (StringUtils.isBlank(dbKey) && StringUtils.isBlank(dbRouterConfig.getRouterKey())) {
		throw new RuntimeException("annotation DBRouter key is null");
	}
	// 如果注解中没配置 key,则使用配置文件的 key
	dbKey = StringUtils.isNotBlank(dbKey) ? dbKey : dbRouterConfig.getRouterKey();
	// 计算路由
	String dbKeyAttr = getAttrValue(dbKey, proceedingJoinPoint.getArgs());

	// 路由分发
	dbRouterStrategy.doRouter(dbKeyAttr);

	// 返回结果
	try {
		// 放行
		return proceedingJoinPoint.proceed();
	} finally {
		// 清除 ThreadLocal
		dbRouterStrategy.clear();
	}
}
  • 几种切面环绕逻辑:
    • @Before:前置通知,在方法执行之前执行
    • @After:后置通知,在方法执行之后执行(即使出现异常,后置通知也会执行)
    • @Around:环绕通知,围绕着方法执行(可以实现其他四种通知)
    • @AfterReturning:返回通知,在方法返回结果之后执行
    • @AfterThrowing:异常通知,在方法抛出异常之后
  • AspectJ 注解的执行顺序:
    @Around 都会出现两次:@Before、@AfterReturning、@AfterReturning、@After 这四个都会在两次@Around 执行之间被执行
    • 无异常时:@Aspect、@Pointcut、@Around、@Before、@AfterReturning、@After、@Around
    • 有异常时:@Aspect、@Pointcut、@Around、@Before、@AfterThrowing、@After、@Around
  • @Around("aopPoint() && @annotation(dbRouter)")的理解

封装起步依赖

最后的最后,将项目封装成 SpringBoot 起步依赖,编写配置类,然后利用自动装配机制。

@Configuration
public class DataSourceAutoConfigure implements EnvironmentAware {
    /**
     * 数据源配置组
     */
    private Map<String, Map<String, Object>> dataSourceMap = new HashMap<>();
    /**
     * 默认数据源配置
     * 也就是:不走路由组件的数据源
     */
    private Map<String, Object> defaultDataSourceConfig;
    /**
     * 分库数目
     */
    private int dbCount;
    /**
     * 分表数目
     */
    private int tbCount;
    /**
     * 路由字段
     */
    private String routerKey;

    @Bean
    public IDBRouterStrategy doRouterStrategy(DBRouterConfig dbRouterConfig) {
        return new DBRouterStrategyHashCode(dbRouterConfig);
    }

    @Bean
    public DBRouterConfig dbRouterConfig() {
        return new DBRouterConfig(this.dbCount, this.tbCount, this.routerKey);
    }

    @Bean
    public Interceptor plugin() {
        return new DynamicMybatisPlugin();
    }

    @Bean(name = "db-router-point")
    @ConditionalOnMissingBean
    public DBRouterJoinPoint dbRouterJoinPoint(DBRouterConfig dbRouterConfig, IDBRouterStrategy dbRouterStrategy) {
        return new DBRouterJoinPoint(dbRouterConfig, dbRouterStrategy);
    }

    /**
     * 创建动态数据源
     * 这个数据源就会被 MyBatis SpringBoot Starter 中 SqlSessionFactory sqlSessionFactory(DataSource dataSource) 注入使用
     * @return
     */
    @Bean
    public DataSource dataSource() {
        // 创建数据源
        Map<Object, Object> targetDataSources = new HashMap<>();

        for (String dbInfo : dataSourceMap.keySet()) {
            Map<String, Object> objMap = dataSourceMap.get(dbInfo);
            targetDataSources.put(dbInfo, new DriverManagerDataSource(
                    objMap.get("url").toString(), objMap.get("username").toString(), objMap.get("password").toString())
            );
        }

        // 设置数据源
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        dynamicDataSource.setTargetDataSources(targetDataSources);
        dynamicDataSource.setDefaultTargetDataSource(new DriverManagerDataSource(defaultDataSourceConfig.get("url").toString(), defaultDataSourceConfig.get("username").toString(), defaultDataSourceConfig.get("password").toString()));

        return dynamicDataSource;
    }

    @Bean
    public TransactionTemplate transactionTemplate(DataSource dataSource) {
        DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
        dataSourceTransactionManager.setDataSource(dataSource);

        TransactionTemplate transactionTemplate = new TransactionTemplate();
        transactionTemplate.setTransactionManager(dataSourceTransactionManager);
        transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return transactionTemplate;
    }

    /**
     * 读取 yml 中自定义的配置
     * @param environment
     */
    @Override
    public void setEnvironment(Environment environment) {
        String prefix = "mini-db-router.jdbc.datasource.";

        dbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "dbCount")));
        tbCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "tbCount")));
        routerKey = environment.getProperty(prefix + "routerKey");

        // 分库列表 db01,db02
        String dataSources = environment.getProperty(prefix + "list");
        assert dataSources != null;
        for (String dbInfo : dataSources.split(",")) {
            Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
            dataSourceMap.put(dbInfo, dataSourceProps);
        }

        // 默认数据源
        String defaultData = environment.getProperty(prefix + "default");
        defaultDataSourceConfig = PropertyUtil.handle(environment, prefix + defaultData, Map.class);
    }
}

随后,需要编写 resources/META_INF/spring.factories 文件,配置数据源配置类

org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.mokeeqian.middleware.db.router.config.DataSourceAutoConfigure

最后,将项目打包到 maven 仓库,即可使用啦

【番外】分库分表平滑过渡?

基于原先的单库单表,现在如何做迁移?

停机迁移

最简单的就是直接停机迁移,停止一切写入。然后将旧库数据迁移至新库。

双写迁移

如果线上业务不能停机怎么办?

  • 我们对老库的更新操作(增删改),同时也要写入新库(双写)。如果操作的数据不存在于新库的话,需要插入到新库中。 这样就能保证,咱们新库里的数据是最新的。
  • 在迁移过程,双写只会让被更新操作过的老库中的数据同步到新库,我们_还需要自己写脚本将老库中的数据和新库的数据做比对_。如果新库中没有,那咱们就把数据插入到新库。如果新库有,旧库没有,就把新库对应的数据删除(冗余数据清理)。
  • 重复上一步的操作,直到老库和新库的数据一致为止

这块其实可以借助 Canal 等中间件(binlog主从同步原理)来实现

参考文章


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!