针对于单数据库的事务我们叫着本地事务/传统事务;在分布式环境中一个请求可能涉及到多个数据库的写操作(多数据源);要保证多数据源的一致性必须用到分布式事务。
系统微服务化后;一个看似简单的功能;内部可能需要调用多个服务并操作多个数据库实现;服务调用的分布式事务问题变的非常突出。
一个下单请求同时设计到订单库;优惠券库;库存库的写操作;需要保证三个库写操作的一致性;就要用到分布式事务 即;分布式事务就是要解决一个请求同时对多个数据库写操作的一致性
注意;微服务拆分原则;尽量让大部分操作都不要跨微服务操作;也就是跨库。 分布式事务比本地事务耗费的资源更多。
2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段( Prepare phase).提交阶段( pphase ) , 2是指两个阶段, P是指准备阶段, C是指提交阶段。
在第一阶段(准备阶段);事务管理器先事务参与者(资源)们发送准备请求;大家都返回OK状态;那么就进入第二阶段;提交事务;如果在第一阶段有任何一个参与者没有OK;那么事务协调器通知其他所有事务参与者(资源)回滚事务。2PC常见的标准是XA, JTA;Seata等。
Seata是由阿里中间件团队发起的开源项目Fescar ,后更名为Seata ,它是一个是开源的分布式事务框架。传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。
事务流程如下
具体的执行流程如下:
Seata 分布式事务;https://blog.csdn.net/u014494148/article/details/105781920
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.9</version>
</dependency>
seata:
enableAutoDataSourceProxy: false #关闭DataSource代理的自动配置;我们要手动配置
spring:
cloud:
alibaba:
seata:
tx-service-group: fsp_tx_group #这里和file.conf中事务组名一样
transport {
# tcp udt unix-domain-socket
type = ;TCP;
#NIO NATIVE
server = ;NIO;
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = ;NettyBoss;
workerThreadPrefix = ;NettyServerNIOWorker;
serverExecutorThread-prefix = ;NettyServerBizHandler;
shareBossWorker = false
clientSelectorThreadPrefix = ;NettyClientSelector;
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = ;NettyClientWorkerThread;
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = ;default;
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = ;seata;
compressor = ;none;
}
service {
#transaction service group mapping
vgroupMapping.fsp_tx_group = ;default;
#only support when registry.type=file, please don;t set multiple addresses
default.grouplist = ;127.0.0.1:8091;
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = ;jackson;
logTable = ;undo_log;
}
log {
exceptionRate = 100
}
}
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = ;file;
nacos {
serverAddr = ;localhost;
namespace = ;;
cluster = ;default;
}
eureka {
serviceUrl = ;http://localhost:8761/eureka;
application = ;default;
weight = ;1;
}
redis {
serverAddr = ;localhost:6379;
db = ;0;
password = ;;
cluster = ;default;
timeout = ;0;
}
zk {
cluster = ;default;
serverAddr = ;127.0.0.1:2181;
session.timeout = 6000
connect.timeout = 2000
username = ;;
password = ;;
}
consul {
cluster = ;default;
serverAddr = ;127.0.0.1:8500;
}
etcd3 {
cluster = ;default;
serverAddr = ;http://localhost:2379;
}
sofa {
serverAddr = ;127.0.0.1:9603;
application = ;default;
region = ;DEFAULT_ZONE;
datacenter = ;DefaultDataCenter;
cluster = ;default;
group = ;SEATA_GROUP;
addressWaitTime = ;3000;
}
file {
name = ;file.conf;
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = ;file;
nacos {
serverAddr = ;localhost;
namespace = ;;
group = ;SEATA_GROUP;
}
consul {
serverAddr = ;127.0.0.1:8500;
}
apollo {
app.id = ;seata-server;
apollo.meta = ;http://192.168.1.204:8801;
namespace = ;application;
}
zk {
serverAddr = ;127.0.0.1:2181;
session.timeout = 6000
connect.timeout = 2000
username = ;;
password = ;;
}
etcd3 {
serverAddr = ;http://localhost:2379;
}
file {
name = ;file.conf;
}
}
;SpringBootApplication(exclude = { DataSourceAutoConfiguration.class})
把DataSource交给Seata代理。
package io.coderyeah.ymcc.config;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.spring.MybatisSqlSessionFactoryBean;
import io.seata.rm.datasource.DataSourceProxy;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springFramework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
/**
* 数据源代理
*/
;Configuration
public class DataSourceConfiguration {
//mapper.xml路径
;Value(;${mybatis-plus.mapper-locations};)
private String mapperLocations;
//手动配置bean
;Bean
;ConfigurationProperties(;spring.datasource;)
public DataSource druidDataSource(){
return new DruidDataSource();
}
;Bean
public MybatisSqlSessionFactoryBean sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
//处理MybatisPlus
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSourceProxy);
factory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
//事务管理工厂
factory.setTransactionFactory(new SpringManagedTransactionFactory());
return factory;
}
;Primary
;Bean(;dataSource;)
public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
Mybatis版本
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
//使用seata对DataSource进行代理
;Configuration
public class DataSourceProxyConfig {
//mapper.xml路径
;Value(;${mybatis.mapper-locations};)
private String mapperLocations;
//手动配置bean
;Bean
;ConfigurationProperties(prefix = ;spring.datasource;)
public DataSource druidDataSource(){
return new DruidDataSource();
}
;Bean
public SqlSessionFactory sessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSourceProxy);
sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
//事务管理工厂
sessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sessionFactoryBean.getObject();
}
;Bean
public DataSourceProxy dataSource() {
return new DataSourceProxy(druidDataSource());
}
}
方法上贴 : ;GlobalTransactional(rollbackFor = Exception.class) 开启Seata全局事务
注意;不能加;EnableTransactionManagement 注解了 ; 也不需要加;Transactional
数据库中创建表;涉及到事务的表都需要添加undolog
-- 注意此处0.3.0; 增加唯一索引 ux_undo_log
CREATE TABLE ;undo_log; (
;id; bigint(20) NOT NULL AUTO_INCREMENT,
;branch_id; bigint(20) NOT NULL,
;xid; varchar(100) NOT NULL,
;context; varchar(128) NOT NULL,
;rollback_info; longblob NOT NULL,
;log_status; int(11) NOT NULL,
;log_created; datetime NOT NULL,
;log_modified; datetime NOT NULL,
;ext; varchar(100) DEFAULT NULL,
PRIMARY KEY (;id;),
UNIQUE KEY ;ux_undo_log; (;xid;,;branch_id;)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.10.0</version>
</dependency>
spring:
application:
name: service-user #服务名
redis:
host: 43.136.61.70
port: 6379
password: 123456
database: 0
lettuce:
pool:
max-active: 8
max-idle: 8
max-wait: 2000ms
我们通常以JSON格式将数据存储到Redis中;这种格式是所有编程语言通用的;所以我们可以把Redis的序列化方式配置为JSON ,这样的话我们就可以不用自己去转JSON了.
//缓存的配置
;Configuration
public class RedisConfig {
;Resource
private RedisConnectionFactory factory;
//使用JSON进行序列化
;Bean
public RedisTemplate<Object, Object> redisTemplate() {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
//JSON格式序列化
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
//key的序列化
redisTemplate.setKeySerializer(genericJackson2JsonRedisSerializer);
//value的序列化
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
//hash结构key的虚拟化
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
//hash结构value的虚拟化
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
return redisTemplate;
}
}
举例
List<CourseType> list = null;
final Object o = redisTemplate.opsForValue().get(YmccConstants.CACHE_COURSE_TYPE);
if (null != o) {
// 从redis中读取返回数据
list = (List<CourseType>) o;
System.out.println(;-------redis;);
} else {
list = getCourseTypes();
// 存入redis
redisTemplate.opsForValue().set(YmccConstants.CACHE_COURSE_TYPE, list);
System.out.println(;-------mysql;);
}
SpringCahce对缓存流程进行了简化封装;提供了一些注解;我们通过简单的打注解就能实现缓存的添加;修改;删除等,注解如下;
;Cacheable:触发缓存写入。
;CacheEvict:触发缓存清除。
;CachePut:更新缓存(不会影响到方法的运行)。
;Caching:重新组合要应用于方法的多个缓存操作。
;CacheConfig:设置类级别上共享的一些常见缓存设置。
继承 CachingConfigurerSupport 对SpringCache进行配置
package io.coderyeah.ymcc.config;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.interceptor.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import javax.annotation.Resource;
;Configuration
public class CacheConfig extends CachingConfigurerSupport {
;Resource
private RedisConnectionFactory factory;
/*
* 自定义生成redis-key ; 类名.方法名
*/
;Override
;Bean
public KeyGenerator keyGenerator() {
return (o, method, objects) -> {
StringBuilder sb = new StringBuilder();
sb.append(o.getClass().getName()).append(;.;);
sb.append(method.getName()).append(;.;);
for (Object obj : objects) {
sb.append(obj.toString());
}
System.out.println(;keyGenerator=; ; sb.toString());
return sb.toString();
};
}
;Bean
;Override
public CacheResolver cacheResolver() {
return new SimpleCacheResolver(cacheManager());
}
;Bean
;Override
public CacheErrorHandler errorHandler() {
// 用于捕获从Cache中进行CRUD时的异常的回调处理器。
return new SimpleCacheErrorHandler();
}
//缓存管理器
;Bean
;Override
public CacheManager cacheManager() {
RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.disableCachingNullValues() //不允许空值
.serializeValuesWith(RedisSerializationContext.SerializationPair
//值使用JSON序列化
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory).cacheDefaults(cacheConfiguration).build();
}
}
在启动类注解;;EnableCaching
缓存注解不能加在内部方法上;比如;方法A调用方法B;给方法B加上缓存注解会失效;因为内部方法调用代理会失效。在A方法上打注解即可。
添加缓存
;Cacheable(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;)
;Override
public List<CourseType> treeData() {
/* List<CourseType> list = null;
final Object o = redisTemplate.opsForValue().get(YmccConstants.CACHE_COURSE_TYPE);
if (null != o) {
// 从redis中读取返回数据
list = (List<CourseType>) o;
System.out.println(;-------redis;);
} else {
list = getCourseTypes();
// 存入redis
redisTemplate.opsForValue().set(YmccConstants.CACHE_COURSE_TYPE, list);
System.out.println(;-------mysql;);
}*/
log.debug(;=============查询了数据库============;);
return getCourseTypes();
}
// 从数据库中查询
private List<CourseType> getCourseTypes() {
// 查询所有分类
List<CourseType> courseTypes = selectList(null);
// 将集合转换为map
Map<Long, CourseType> map = courseTypes.stream().collect(Collectors.toMap(CourseType::getId, courseType -> courseType));
// 返回给前端的集合
List<CourseType> list = new ArrayList<>();
// 遍历
courseTypes.forEach(courseType -> {
if (courseType.getPid() == null || courseType.getPid() == 0) {
// 顶级
list.add(courseType);
} else {
// 找到父级
CourseType type = map.get(courseType.getPid());
if (type != null) {
type.getChildren().add(courseType);
}
}
});
return list;
}
// 剔除缓存
;CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;)
;Override
public boolean insert(CourseType entity) {
return super.insert(entity);
}
;CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;)
;Override
public boolean deleteById(Serializable id) {
return super.deleteById(id);
}
;CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;)
;Override
public boolean updateById(CourseType entity) {
return super.updateById(entity);
}
}
6. #### 剔除缓存 ;;;java // 剔除缓存 ;CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;) ;Override public boolean insert(CourseType entity) { return super.insert(entity); } ;CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;) ;Override public boolean deleteById(Serializable id) { return super.deleteById(id); } ;CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = ;;all;;) ;Override public boolean updateById(CourseType entity) { return super.updateById(entity); }