Redis实现高并发扣减库存,秒杀功能(可线上利用)

源码 2024-9-10 21:41:17 57 0 来自 中国
常见秒杀方案操持:
1.数据库行锁
2.分布式锁+分段锁提拔服从
3.Redis单线程机制,将库存放在Redis内里利用
set count 1000
decrby count 1 扣减库存,返回正数就可扣减库存
4.Redis+Lua脚本,查询库存和扣减库存放到Lua脚本内里去执行
这是一个原子利用,办理高并发下线程安全标题
总结:简朴利用redis的LUA脚本功能,一次性利用,实现原子性
Redis+Lua实现高并发秒杀功能

1.png 1、导入相干依靠

        <dependency>            <groupId>redis.clients</groupId>            <artifactId>jedis</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-data-redis</artifactId>            <exclusions>                <exclusion>                    <groupId>io.lettuce</groupId>                    <artifactId>lettuce-core</artifactId>                </exclusion>            </exclusions>        </dependency>        <dependency>            <groupId>org.apache.commons</groupId>            <artifactId>commons-lang3</artifactId>            <version>3.12.0</version>        </dependency>       <dependency>            <groupId>org.redisson</groupId>            <artifactId>redisson</artifactId>            <version>3.13.4</version>        </dependency>2、RedisConfig Bean初始化配置

import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.commons.lang3.StringUtils;import org.redisson.config.SingleServerConfig;import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;@Configurationpublic class RedisConfig {    @Value("${spring.redis.host}")    private String host;    @Value("${spring.redis.port}")    private String port;    @Value("${spring.redis.password}")    private String password;    @Bean(destroyMethod = "shutdown")    public RedissonClient redissonClient() {        final Config config = new Config();        SingleServerConfig singleServerConfig = config.useSingleServer()                .setAddress("redis://" + host + ":" + port);        if (StringUtils.isNotBlank(password)) {            singleServerConfig.setPassword(password);        }        System.out.println("------------- redisson -----------------------");        System.out.println(config.getTransportMode());        return Redisson.create(config);    }    /**     * 重写Redis序列化方式,利用Json方式:     * 数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的     * RedisTemplate默认利用的是JdkSerializationRedisSerializer     * StringRedisTemplate默认利用的是StringRedisSerializer     * <p>     * Spring Data JPA为我们提供了下面的Serializer:     * GenericToStringSerializer、Jackson2JsonRedisSerializer     * JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。     * 在此我们将自己配置RedisTemplate并界说Serializer     */    @Bean    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();        redisTemplate.setConnectionFactory(redisConnectionFactory);        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =        new Jackson2JsonRedisSerializer<Object>(Object.class);        ObjectMapper om = new ObjectMapper();        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        jackson2JsonRedisSerializer.setObjectMapper(om);        // 设置值(value)的序列化接纳Jackson2JsonRedisSerializer。        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);        // 设置键(key)的序列化接纳StringRedisSerializer。        redisTemplate.setKeySerializer(new StringRedisSerializer());        redisTemplate.setHashKeySerializer(new StringRedisSerializer());        redisTemplate.afterPropertiesSet();        return redisTemplate;    }}3、Redis+Lua脚本实现秒杀扣减库存

public interface IStockCallback {    /**     * 扣减Redis库存     *     * @param batchNo 商品唯一编号     * @param expire  逾期时间     * @param num     扣减库存数量     * @return 剩余库存数量     */    long getStock(String batchNo, long expire, int num);    /**     * 初始化库存     *     * @param commodityId 业务     * @return 库存     */    int initStock(long commodityId);}
下面是下单扣减库存业务的代码块,在扣库存的时候,不能超发,也不能扣到负数,
然后再同步到MYSQL里,初始化库存数量,这个可以从DB里取现实的量,
LUA脚本包管原子性,查询剩余库存和扣减逻辑是一个原子性利用
@Slf4j@Servicepublic class RedisStockService implements IStockCallback {    /**     * 库存还未初始化     */    public static final long UNINITIALIZED_STOCK = -3L;    /**     * 判断商品是否存在KEY标识     */    public static final long EXIST_FLAG = -2L;    /**     * 配置库存Redis缓存Key前缀     */    public static final String REDIS_KEY = "REDIS_KEY:STOCK:";    /**     * 执行扣库存的Lua脚本     */    public static final String STOCK_LUA;    /**     * Redisson 客户端     */    @Resource    private RedissonClient redissonClient;    /**     * Redis 客户端     */    @Resource    private RedisTemplate<String, Integer> redisTemplate;    static {        /*         * @desc 扣减库存Lua脚本         * 库存(stock)-1:体现不限库存         * 库存(stock) 0:体现没有库存         * 库存(stock)大于0:体现剩余库存         *         * @params 库存key         * @return         *      -3:库存未初始化         *      -2:库存不敷         *      -1:不限库存         *      大于便是0: 剩余库存(扣减之后剩余的库存), 直接返回-1         */        final StringBuilder strBuilder = new StringBuilder();        strBuilder.append("if (redis.call('exists', KEYS[1]) == 1) then");        strBuilder.append("    local stock = tonumber(redis.call('get', KEYS[1]));");        strBuilder.append("    local num = tonumber(ARGV[1]);");        strBuilder.append("    if (stock == -1) then");        strBuilder.append("        return -1;");        strBuilder.append("    end;");        strBuilder.append("    if (stock >= num) then");        strBuilder.append("        return redis.call('incrby', KEYS[1], 0 - num);");        strBuilder.append("    end;");        strBuilder.append("    return -2;");        strBuilder.append("end;");        strBuilder.append("return -3;");        STOCK_LUA = strBuilder.toString();    }    /**     * 执行扣减库存业务     *     * @param batchNo 库存唯一标识     * @param expire  库存逾期时间     * @param num     扣减库存的数量     * @return 返回扣减库存后剩余库存数量     */    @Override    public long getStock(String batchNo, long expire, int num) {        // 商品库存唯一标识        final String key = REDIS_KEY + batchNo;        /*         * 从redis中获取key对应的逾期时间;         * 1、假如该值有逾期时间,就返回相应的逾期时间;         * 2、假如该值没有设置逾期时间,就返回-1;         * 3、假如没有该值,就返回-2;         *         * 留意:这里为了方便模仿,现实线上。通过缓存预热的方式通过DB查询现实的库存数据         * 添加到Redis中         */        Long expire1 = redisTemplate.opsForValue().getOperations().getExpire(key);        if (Objects.equals(EXIST_FLAG, expire1)) {            redisTemplate.opsForValue().set(key, 100, expire, TimeUnit.SECONDS);            System.out.println("Redis无初始库存,设置库存数据 = " + expire1);        }        // 初始化商品库存        Integer stock = redisTemplate.opsForValue().get(key);        // 设置分布式锁        final RLock rLock = redissonClient.getLock(REDIS_KEY + "OCK");        try {            if (rLock.tryLock(1, TimeUnit.SECONDS)) {                stock = redisTemplate.opsForValue().get(key);                log.info("--- 当前Key:[{}]加锁乐成,当前最新库存:{}---", key, stock);                // 调一次扣库存的利用                Long stock1 = stock(key, num);                System.out.println("stock1 = " + stock1);                stock = redisTemplate.opsForValue().get(key);                int batchNoLock = Objects.requireNonNull(stock);                log.info("--- 当前剩余库存:{}", batchNoLock);            }        } catch (Exception e) {            log.error(e.getMessage(), e);        } finally {            if (rLock != null && rLock.isHeldByCurrentThread()) {                rLock.unlock();            }        }        return stock;    }    /**     * 扣库存这步特别留意,分布式毗连有标题,必要依靠包里,去掉lettuce组件     * 初始化库存数量,这个可以从DB里取现实的量     *     * @param key 库存key     * @param num 扣减库存数量     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不敷; -1:不限库存; 大于便是0:扣减库存之后的剩余库存】     */    private Long stock(String key, int num) {        // 脚本里的KEYS参数        List<String> keys = new ArrayList<>();        keys.add(key);        // 脚本里的ARGV参数        List<String> argvList = new ArrayList<>();        argvList.add(Integer.toString(num));        // 执行扣减库存LUA脚本        return redisTemplate.execute((RedisCallback<Long>) connection -> {            Object nativeConnection = connection.getNativeConnection();            // 集群模式            if (nativeConnection instanceof JedisCluster) {                return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, argvList);            }            // 单机模式            else if (nativeConnection instanceof Jedis) {                return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, argvList);            }            return UNINITIALIZED_STOCK;        });    }    /**     * 获取初始的库存     * 初始化库存数量,这个可以从DB里取现实的量     *     * @param commodityId 业务ID     * @return 初始库存     */    @Override    public int initStock(long commodityId) {        // TODO 这里做一些初始化库存的利用        return 30;    }}3、调用接口并发Controller,测试分布式库存扣减

   @Resource    private RedisStockService redisStockService;    /**     * 下单扣减库存     *     * @param stockDTO 下单哀求参数     * @return 扣减库存结果     */    @PostMapping("/buyProductCreateOrder")    public Object buyProductCreateOrder(@RequestBody StockDTO stockDTO) {        try {            return redisStockService.getStock(stockDTO.getBatchNo(), stockDTO.getExpire(), stockDTO.getNum());        } catch (Exception e) {            return e.getMessage();        }    }
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-10-19 02:26, Processed in 0.149477 second(s), 35 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表