常见秒杀方案操持:
1.数据库行锁
2.分布式锁+分段锁提拔服从
3.Redis单线程机制,将库存放在Redis内里利用
set count 1000
decrby count 1 扣减库存,返回正数就可扣减库存
4.Redis+Lua脚本,查询库存和扣减库存放到Lua脚本内里去执行
这是一个原子利用,办理高并发下线程安全标题
总结:简朴利用redis的LUA脚本功能,一次性利用,实现原子性
Redis+Lua实现高并发秒杀功能
1、导入相干依靠
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.4</version> </dependency>2、RedisConfig Bean初始化配置
import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.commons.lang3.StringUtils;import org.redisson.config.SingleServerConfig;import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;@Configurationpublic class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { final Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer() .setAddress("redis://" + host + ":" + port); if (StringUtils.isNotBlank(password)) { singleServerConfig.setPassword(password); } System.out.println("------------- redisson -----------------------"); System.out.println(config.getTransportMode()); return Redisson.create(config); } /** * 重写Redis序列化方式,利用Json方式: * 数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的 * RedisTemplate默认利用的是JdkSerializationRedisSerializer * StringRedisTemplate默认利用的是StringRedisSerializer * <p> * Spring Data JPA为我们提供了下面的Serializer: * GenericToStringSerializer、Jackson2JsonRedisSerializer * JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。 * 在此我们将自己配置RedisTemplate并界说Serializer */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); // 设置值(value)的序列化接纳Jackson2JsonRedisSerializer。 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // 设置键(key)的序列化接纳StringRedisSerializer。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; }}3、Redis+Lua脚本实现秒杀扣减库存
public interface IStockCallback { /** * 扣减Redis库存 * * @param batchNo 商品唯一编号 * @param expire 逾期时间 * @param num 扣减库存数量 * @return 剩余库存数量 */ long getStock(String batchNo, long expire, int num); /** * 初始化库存 * * @param commodityId 业务 * @return 库存 */ int initStock(long commodityId);}下面是下单扣减库存业务的代码块,在扣库存的时候,不能超发,也不能扣到负数,
然后再同步到MYSQL里,初始化库存数量,这个可以从DB里取现实的量,
LUA脚本包管原子性,查询剩余库存和扣减逻辑是一个原子性利用
@Slf4j@Servicepublic class RedisStockService implements IStockCallback { /** * 库存还未初始化 */ public static final long UNINITIALIZED_STOCK = -3L; /** * 判断商品是否存在KEY标识 */ public static final long EXIST_FLAG = -2L; /** * 配置库存Redis缓存Key前缀 */ public static final String REDIS_KEY = "REDIS_KEY:STOCK:"; /** * 执行扣库存的Lua脚本 */ public static final String STOCK_LUA; /** * Redisson 客户端 */ @Resource private RedissonClient redissonClient; /** * Redis 客户端 */ @Resource private RedisTemplate<String, Integer> redisTemplate; static { /* * @desc 扣减库存Lua脚本 * 库存(stock)-1:体现不限库存 * 库存(stock) 0:体现没有库存 * 库存(stock)大于0:体现剩余库存 * * @params 库存key * @return * -3:库存未初始化 * -2:库存不敷 * -1:不限库存 * 大于便是0: 剩余库存(扣减之后剩余的库存), 直接返回-1 */ final StringBuilder strBuilder = new StringBuilder(); strBuilder.append("if (redis.call('exists', KEYS[1]) == 1) then"); strBuilder.append(" local stock = tonumber(redis.call('get', KEYS[1]));"); strBuilder.append(" local num = tonumber(ARGV[1]);"); strBuilder.append(" if (stock == -1) then"); strBuilder.append(" return -1;"); strBuilder.append(" end;"); strBuilder.append(" if (stock >= num) then"); strBuilder.append(" return redis.call('incrby', KEYS[1], 0 - num);"); strBuilder.append(" end;"); strBuilder.append(" return -2;"); strBuilder.append("end;"); strBuilder.append("return -3;"); STOCK_LUA = strBuilder.toString(); } /** * 执行扣减库存业务 * * @param batchNo 库存唯一标识 * @param expire 库存逾期时间 * @param num 扣减库存的数量 * @return 返回扣减库存后剩余库存数量 */ @Override public long getStock(String batchNo, long expire, int num) { // 商品库存唯一标识 final String key = REDIS_KEY + batchNo; /* * 从redis中获取key对应的逾期时间; * 1、假如该值有逾期时间,就返回相应的逾期时间; * 2、假如该值没有设置逾期时间,就返回-1; * 3、假如没有该值,就返回-2; * * 留意:这里为了方便模仿,现实线上。通过缓存预热的方式通过DB查询现实的库存数据 * 添加到Redis中 */ Long expire1 = redisTemplate.opsForValue().getOperations().getExpire(key); if (Objects.equals(EXIST_FLAG, expire1)) { redisTemplate.opsForValue().set(key, 100, expire, TimeUnit.SECONDS); System.out.println("Redis无初始库存,设置库存数据 = " + expire1); } // 初始化商品库存 Integer stock = redisTemplate.opsForValue().get(key); // 设置分布式锁 final RLock rLock = redissonClient.getLock(REDIS_KEY + " OCK"); try { if (rLock.tryLock(1, TimeUnit.SECONDS)) { stock = redisTemplate.opsForValue().get(key); log.info("--- 当前Key:[{}]加锁乐成,当前最新库存:{}---", key, stock); // 调一次扣库存的利用 Long stock1 = stock(key, num); System.out.println("stock1 = " + stock1); stock = redisTemplate.opsForValue().get(key); int batchNoLock = Objects.requireNonNull(stock); log.info("--- 当前剩余库存:{}", batchNoLock); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { if (rLock != null && rLock.isHeldByCurrentThread()) { rLock.unlock(); } } return stock; } /** * 扣库存这步特别留意,分布式毗连有标题,必要依靠包里,去掉lettuce组件 * 初始化库存数量,这个可以从DB里取现实的量 * * @param key 库存key * @param num 扣减库存数量 * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不敷; -1:不限库存; 大于便是0:扣减库存之后的剩余库存】 */ private Long stock(String key, int num) { // 脚本里的KEYS参数 List<String> keys = new ArrayList<>(); keys.add(key); // 脚本里的ARGV参数 List<String> argvList = new ArrayList<>(); argvList.add(Integer.toString(num)); // 执行扣减库存LUA脚本 return redisTemplate.execute((RedisCallback<Long>) connection -> { Object nativeConnection = connection.getNativeConnection(); // 集群模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, argvList); } // 单机模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, argvList); } return UNINITIALIZED_STOCK; }); } /** * 获取初始的库存 * 初始化库存数量,这个可以从DB里取现实的量 * * @param commodityId 业务ID * @return 初始库存 */ @Override public int initStock(long commodityId) { // TODO 这里做一些初始化库存的利用 return 30; }}3、调用接口并发Controller,测试分布式库存扣减
@Resource private RedisStockService redisStockService; /** * 下单扣减库存 * * @param stockDTO 下单哀求参数 * @return 扣减库存结果 */ @PostMapping("/buyProductCreateOrder") public Object buyProductCreateOrder(@RequestBody StockDTO stockDTO) { try { return redisStockService.getStock(stockDTO.getBatchNo(), stockDTO.getExpire(), stockDTO.getNum()); } catch (Exception e) { return e.getMessage(); } } |