分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

一、单进程多线程的锁--线程锁

锁住线程的锁叫线程锁,像C#中的lock,Monitor,让线程排队,同一时刻只能有一个线程进来,让线程同步排队。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

 二、多进程的锁--分布式锁

锁住进程的锁就叫分布式锁,是锁住进程的一种机制,让进程排队。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

三、电商秒杀场景

1、单体架构

并发量不够,秒杀服务只能并发1000,而客户端同时发送3000个请求。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

2、集群架构

这时候就需要多两个角色,一个角色是网关,一个角色是秒杀集群,网关把用户请求转发到3个秒杀服务,这样每个秒杀服务并发1000个请求,就能够满足客户端同时发送3000个请求。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

四、秒杀服务集群带来新的问题

第1个请求进入到秒杀服务1里面,查询数据库商品库存是10,判断有库存,扣减库存,更新数据库,当前库存是9。
第2个请求进入到秒杀服务2里面,查询数据库商品库存是10,判断有库存,扣减库存,更新数据库,当前库存是9。
第3个请求进入到秒杀服务3里面,查询数据库商品库存是10,判断有库存,扣减库存,更新数据库,当前库存是9。

实际库存只减少了1个,但是同1个商品被3个人秒杀到了,这就是超卖问题。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

五、分布式锁解决什么问题?

分布式系统中,涉及到多个进程共享资源的时候,就需要使用分布式锁。

谁持有了锁,谁才能操作数据库扣减库存。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

 六、运行效果

1、单进程发起20个线程模拟20个用户并发请求,秒杀商品,会发现20个线程,20个请求秒杀到同1个商品。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

 2、对于单进程可以加lock锁解决超卖问题

商品库存有10个,开启20个线程秒杀商品,有10个请求分别秒杀到不同的商品,另外10个线程没有秒杀到商品,因为库存只有10个。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

3、我现在把相同的代码Copy一份,新建个工程MyRedis.SecKill.MultiProcess.Other,也同样使用了lock锁,快速的启动2个进程,每个进程中开启20个线程就发现lock锁不住了,lock锁失效了,同一个商品编号10被2个不同的进程中的线程秒杀到了。

 

我们看到单进程通过加lock锁可以保证不发生超卖问题,10个线程秒杀到商品,商品编号不同,另外10个线程没有秒杀到商品。
但是因为为了提高并发量,现在是秒杀服务集群提供秒杀服务了,我们在两个秒杀服务进程中都开启20个线程去秒杀商品,就会发现如图所示控制不住了,两个进程中的线程都秒杀到同一个商品了(这里用商品库存当做商品编号),那么如何解决跨进程并发引起的商品超卖问题?这就需要分布式锁了。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

 分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

七、封装Redis分布式锁--解决跨进程并发秒杀超卖问题

1、秒杀服务端

namespace MyRedis.SecKill.MultiProcess.SecKill
{
    /// <summary>
    /// 商品秒杀服务
    /// </summary>
    public class ProductSecKill
    {
        /// <summary>
        /// 秒杀方法
        /// </summary>
        public void SecKillProduct()
        {
            RedisLock redisLock = new RedisLock();
            redisLock.Lock();
            //lock (this)//只是适合单进程
            //{
                // 1、获取商品库存
                var productStock = GetPorductStocks();

                // 2、判断商品库存是否为空
                if (productStock.Conut == 0)
                {
                    // 2.1 秒杀失败消息
                    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:不好意思,秒杀已结束,商品编号:{productStock.Conut}");
                    redisLock.UnLock();
                    return;
                }

                // 3、秒杀成功消息
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:恭喜你,秒杀成功,商品编号:{productStock.Conut}");

                // 4、扣减商品库存
                SubtracPorductStocks(productStock);
            //}
           
            redisLock.UnLock();
        }

        /// <summary>
        /// 获取商品库存
        /// </summary>
        /// <returns></returns>
        private Product_Stock GetPorductStocks()
        {
            using (ShoppingEntities shoppingEntities = new ShoppingEntities())
            {
                // 1、查询数据库获取库存,获取第一个商品的库存数
                Product_Stock productStock = shoppingEntities.Product_Stock.FirstOrDefault(s => s.Id == 1);

                // 2、返回库存
                return productStock;
            }

        }

        /// <summary>
        /// 扣减商品库存
        /// </summary>
        private void SubtracPorductStocks(Product_Stock stocks)
        {
            using (ShoppingEntities shoppingEntities = new ShoppingEntities())
            {
                // 1、扣减商品库存
                Product_Stock updateStocks = shoppingEntities.Product_Stock.FirstOrDefault(s => s.Id == stocks.Id);
                updateStocks.Conut = stocks.Conut - 1;

                // 2、更新数据库
                shoppingEntities.SaveChanges();
            }
        }
    }
}

2、秒杀客户端

 

namespace MyRedis.SecKill.MultiProcess
{
    class Program
    {
        static void Main(string[] args)
        {
            // 1、开始秒杀
            ClientRequest.SendRequest(20);
            Console.ReadKey();
        }
    }
}

 

namespace MyRedis.SecKill.MultiProcess.SecKill
{
    class ClientRequest
    {
        /// <summary>
        /// 客户端请求
        /// </summary>
        /// <param name="threadCount">线程数</param>
        public static void SendRequest(int threadCount)
        {
            // 1、商品秒杀服务
            ProductSecKill productSecKill = new ProductSecKill();
            // 2、创建20个请求来秒杀
            for (int i = 0; i < threadCount; i++)
            {
                Thread thread = new Thread(() =>
                {
                    productSecKill.SecKillProduct();
                });
                thread.Start();
            }
        }
    }
}

3、Redis分布式锁

封装分布式锁4要素

3.1、锁名
3.2、加锁操作
        锁对象,也就是谁持有这把锁,持有锁的才能解锁
3.3、解锁操作
3.4、锁超时时间

namespace MyRedis.SecKill.MultiProcess.Locks
{
    /// <summary>
    /// redis分布式锁
    /// 分布式锁四要素
    /// 1、锁名
    /// 2、加锁操作
    /// 3、解锁操作
    /// 4、锁超时时间
    /// </summary>
    class RedisLock
    {
        // 1、redis连接管理类
        private ConnectionMultiplexer connectionMultiplexer = null;

        // 2、redis数据操作类
        private IDatabase database = null;
        public RedisLock()
        {
            connectionMultiplexer = ConnectionMultiplexer.Connect("localhost:6379");

            database = connectionMultiplexer.GetDatabase(0);
        }

        /// <summary>
        /// 1、加锁
        /// </summary>
        public void Lock()
        {
            // 1、redis加锁api--LockTake
            // key--锁名--redis_lock
            // value--锁对象(谁持有这把锁)--进程Id+线程Id
            // expiry--锁超时时间,为什么?解锁死锁问题!
            // 2、如果加锁失败?循环加锁,对于未知的事情用循环
            while (true)
            {
                bool flag = database.LockTake("redis_lock", "ProcessNo1" +Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(10));

                // 3、如果加锁成功,则退出循环
                if (flag)
                {
                    break;
                }

                // 3.1 加锁失败,线程休眠下,走循环,再尝试加锁
                Thread.Sleep(200);
            }
        }

        /// <summary>
        /// 2、解锁
        /// </summary>
        public void UnLock()
        {
            //1、redis解锁api--LockRelease
            // key--锁名--redis_lock
            // value--锁对象(谁持有这把锁)--进程Id+线程Id--使加锁和解锁是同一个对象
            // 2、如果解锁失败?循环解锁,对于未知的事情用循环
            bool flag = database.LockRelease("redis_lock", "ProcessNo1" +Thread.CurrentThread.ManagedThreadId);
            while (true) 
            {
                // 3、如果解锁成功,则退出循环
                if (flag)
                {
                    break;
                }

                // 3.1 解锁失败,线程休眠下,走循环,再尝试解锁
                Thread.Sleep(200);
            }
            // 4、关闭资源
            connectionMultiplexer.Dispose();
        }
    }
}

八、再次运行效果

最后我们发现库存36个商品,2个进程,每个进程开启20个线程,都是不同的商品编号没有秒杀到同一件商品。

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

 九、项目结构

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

十、思考Redis集群环境死锁

如果在Redis Master上持有了锁,但是Redis Master宕机了,需要把Redis Slave提成Redis Master,但是原来的Redis Master的锁没有释放,造成死锁了怎么办?

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

 

分布式系统--封装Redis分布式锁解决跨进程并发秒杀引起的超卖问题

上一篇:怎样开发单页面app


下一篇:iOS学习笔记---c语言第六天