基元是指可以在代码中使用的最简单的构造。
有两种基元构造:用户模式构造内核模式构造。应尽量使用用户模式构造,它们的速度要比内核构造快很多,因为他们使用了特殊的CPU指令来协调线程(意味着协调是在硬件中发生,所以会快很多)。

这两种构造都有自身的特点

本文基于.NET4.5框架

用户模式构造

易变构造

volatile关键字指示一个字段可以由多个同时执行的线程修改,通过volatile关键字,可以保证变量在执行上有着原子性的读、写操作。 volatile关键字可应用于以下类型的字段:

  • 引用类型。
  • 指针类型(在不安全的上下文中)。 请注意,虽然指针本身可以是可变的,但是它指向的对象不能是可变的。 换句话说,不能声明“指向可变对象的指针”。
  • 简单类型,如 sbyte、byte、short、ushort、int、uint、char、float 和 bool。
  • 具有以下基本类型之一的 enum 类型:byte、sbyte、short、ushort、int 或 uint。
  • 已知为引用类型的泛型类型参数。
  • IntPtr 和 UIntPtr。

这些描述读起来比较绕口,那么我们通过一个示例来演示以下volatile的基本用法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    /// <summary>
    /// 定义测试类
    /// </summary>
    public class VolatileTest
    {
        private bool _isStop = false;
        public void Do()
        {
            Console.WriteLine("任务开始");
            var th = new Thread(Thread1);
            th.Start();

            Thread.Sleep(5000);
            _isStop = true;
            Console.WriteLine("等待任务完成");
            th.Join();

            Console.ReadLine();
        }

        public void Thread1()
        {
            int x = 0;
            while (!_isStop)
            {
                x = x + 1;
            }
            Console.WriteLine($"任务执行完成,计数:{x}");
        }
    }
1
2
3
4
5
        static void Main(string[] args)
        {
            //调用测试方法
            new VolatileTest().Do();
        }

先来猜猜上面这段代码的执行结果是怎样的😓😓

代码在编译时,根据不同的编译配置,可能会出现不同的结果。

  • Debug模式下,编译器并没有对代码进行优化,所以程序会得到正常的执行结果🔻 VolatileDebug.jpg
    至于这里的计数为什么会是负数,这个以后有机会在单独写文章分析

  • Release模式下,编译器默认打开了代码优化功能。编译VolatileTest文件时,编译器发现_isStopThread1方法中并不会发生改变,并且_isStop的检查只会在循环前发生一次,不会每次循环时都检查。因此会生成一段一直累加x的代码出来。因此,程序运行时,得到了以下结果🔻 VolatileRelease.jpg

接下来通过IL代码来对比一下两者的不同(左边为Debug版本,右边为Release版本)

VolatileIL.jpg

在两个版本中,主要有两点不同:

  1. Debug版本中,在初始化变量时,定义了一个bool类型的变量;Release版本没有
  2. Debug版本中,在循环结束前,对定义的bool变量和_isStop进行了计算,然后判断是否需要继续循环;Release版本中直接获取并校验了_isStop的值,结果为false,则无限循环。

要解决这个问题其实很简单,就是使用前面所说的Volatile关键字定义变量即可。Volatile能保证变量的原子操作,因此用Volatile定义的变量,编译器是不会进行之前的优化操作。 所以重新修改_isStop变量的定义即可修正前面的问题。

1
private volatile bool _isStop = false;

另外,System.Threading.Volatile类还提供了以下两个静态方法来进行变量的读写🔻

1
2
            Volatile.Read(ref _isStop);
            Volatile.Write(ref _isStop,true);

因此通过如下代码也可以修正前面的问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
        public void Thread1()
        {
            int x = 0;
            //这里使用Volatile.Read来读取最新的值
            while (!Volatile.Read(ref _isStop))
            {
                x = x + 1;
            }
            Console.WriteLine($"任务执行完成,计数:{x}");
        }

如果对上面的概念还是不太理解,那么可以再参考下书中的这样一段定义🔻

当线程通过共享内存相互通信时,调用Volatile.Write来写入最后一个值,调用Volatile.Read来读取第一个值

互锁构造

System.Threading.Interlocked类中的每个方法都为多个线程共享的变量提供原子操作。

调用某个Interlocked方法之前的任何变量写入都在这个Interlocked方法调用之前执行;而这个调用之后的任何变量读取都在这个调用之后读取。

volatile只有Read、Write两个方法提供原子性的操作,而Interlocked类中的所有方法都执行一次原子性的读取和写入 该类提供的方法比较多,具体可以参考MSDN文档

依然很绕口吧🙇‍♂️🙇‍♂️那继续用代码来理解。 该示例使用HttpClient对指定的服务器请求,并显示请求返回的字节数🔻

1
2
3
4
5
6
7
8
9
    /// <summary>
    /// 服务器请求状态
    /// </summary>
    public enum CoordinationStatus
    {
        AllDone,
        Timeout,
        Cancel
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
    /// <summary>
    /// 定义测试类
    /// </summary>
    public class InterlockedTest
    {
        //这个辅助类用于协调所有异步操作
        private readonly AsyncCoordinator _ac = new AsyncCoordinator();

        //这是想要查询的web服务器及其响应(异常或int)的集合
        //注意:多个线程访问该字典不需要以同步方式进行
        //因为构造后键就是只读的
        private readonly Dictionary<String, Object> _remoteServers = new Dictionary<string, object> {
        {"https://www.microsoft.com" ,null},
        {"https://www.bing.com",null},
        {"https://www.google.com",null}
        };
        public void Do(int timeout = Timeout.Infinite)
        {
            var httpClient = new HttpClient();
            foreach (var server in _remoteServers.Keys)
            {
                _ac.AboutToBegin(1);//这里也可以修改为下面注释中的方式
                httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
            }

            // //一次性向AboutToBegin方法传递所有的请求数量
            // _ac.AboutToBegin(_remoteServers.Count);
            // foreach (var server in _remoteServers.Keys)
            // {
            //     httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
            // }

            //告诉AsyncCoordinator所有操作都已发起,并在所有操作完成
            //调用Cancel或者发生超时的时候调用AllDone
            _ac.AllBegun(AllDone, timeout);
        }
        private void ComputeResult(string server, Task<Byte[]> task)
        {
            object result;
            if (task.Exception != null)
            {
                result = task.Exception.InnerException;
            }
            else
            {
                //线程池线程处理I/O完成
                //在此添加自己的计算密集型算法
                //本例只是返回长度
                result = task.Result.Length;
            }
            //保存结果(exception/sum),指出1个操作完成
            _remoteServers[server] = result;
            _ac.JustEnded();
        }

        //调用这个方法指出结果已无关紧要
        public void Cancel()
        {
            _ac.Cancel();
        }

        //所有Web服务器都响应、调用了Cancel或者发生超时,就调用该方法
        private void AllDone(CoordinationStatus status)
        {
            switch (status)
            {
                case CoordinationStatus.Cancel:
                    Console.WriteLine("请求取消");
                    break;
                case CoordinationStatus.Timeout:
                    Console.WriteLine("请求超时");
                    break;
                case CoordinationStatus.AllDone:
                    Console.WriteLine("请求完成,结果:");
                    foreach (var server in _remoteServers)
                    {
                        Console.WriteLine($"{server.Key}");
                        object result = server.Value;
                        if (result is Exception)
                        {
                            Console.WriteLine($"请求发生异常:{result.GetType().Name}");
                        }
                        else
                        {
                            Console.WriteLine($"返回字节数:{result}");
                        }
                    }
                    break;
            }
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    /// <summary>
    /// 这个辅助类用于协调所有异步操作
    /// </summary>
    public class AsyncCoordinator
    {
        //AllBegun内部调用JustEnded来递减它
        private int _opCount = 1;
        //0 = false  1= true
        private int _statusReported = 0;
        private Action<CoordinationStatus> _callback;
        private Timer _timer;

        //该方法必须在发起一个操作之前调用
        public void AboutToBegin(int opsToAdd = 1)
        {
            Interlocked.Add(ref _opCount, opsToAdd);
        }
        //该方法必须在处理好一个操作的结果之后调用
        public void JustEnded()
        {
            if (Interlocked.Decrement(ref _opCount) == 0)
            {
                ReportStatus(CoordinationStatus.AllDone);
            }
        }

        //该方法必须在发起所有操作之后调用
        public void AllBegun(Action<CoordinationStatus> callback, int timeout = Timeout.Infinite)
        {
            _callback = callback;
            if (timeout != Timeout.Infinite)
            {
                _timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
            }
            JustEnded();
        }

        private void TimeExpired(object o)
        {
            ReportStatus(CoordinationStatus.Timeout);
        }

        public void Cancel()
        {
            ReportStatus(CoordinationStatus.Cancel);
        }

        private void ReportStatus(CoordinationStatus status)
        {
            //如果状态从未报告过,就报告它;否则忽略它
            if (Interlocked.Exchange(ref _statusReported, 1) == 0)
            {
                _callback(status);
            }
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
        static void Main(string[] args)
        {
            //调用测试方法
            new InterlockedTest().Do();
            Console.ReadLine();
        }

        //↓↓↓代码运行结果如下↓↓↓

        // 请求完成,结果:
        // https://www.microsoft.com
        // 返回字节数:185796
        // https://www.bing.com
        // 返回字节数:114613
        // https://www.google.com
        // 请求发生异常:HttpRequestException

接下来说明一下这个示例🔻

  1. InterlockedTest类主要负责定义一个多线程访问网站的字典,并且对字典中的网站进行访问。任务开始前会向AsyncCoordinator类发送本次需要请求的操作数量,所有任务创建完成后调用AllBegun方法向辅助类报告任务已创建完成,并且绑定一个回调方法AllDone(该方法的作用是打印所有请求的结果)。
  2. AsyncCoordinator类用于协调所有异步操作,之所以单独提取出来,是因为这个类的功能可以复用。该类的两个公共方法AboutToBeginJustEnded分别用于对内部维护的异步任务数(_opCount)进行原子的加、减操作(通过Interlocked.AddInterlocked.Decrement来实现)。当任务数为0时,调用ReportStatus方法向InterlockedTest报告执行结果,说明所有异步任务已完成。

这里特别说明以下几个点🔻

  • _opCount变量的初始值为1(而非0),这一点很重要。假定初始值为0,当某一个请求到完成又很迅速时(下一个任务还没调用AboutToBegin的时候当前任务就已经完成并且调用了JustEnded方法),那么这时就会导致AllDone被提前执行。因此初始化时将_opCount的值设置为1,等所有任务添加完成后,调用AllBegun方法时再通过JustEnded将那个1减掉。
  • _statusReported变量实际上是作为一个Boolean来使用的,因为Interlocked类不提供Boolean类型的原子操作,所以这里我们使用int类型的变量来代替。
  • 示例中的Cancel方法没有做展示,想要看效果的同学可以修改代码自己调用。
  • 示例中的Timeout方法没有做展示,因为默认传递的参数是不超时,想要看效果的同学可以修改代码自己调用。

实现简单的自旋锁

前面所说的Interlocked的方法很好用,但是主要用于操作Int32类型,如果需要原子性的操作类中的一组字段,Interlocked很明显是做不到的。所以这时我们可以构造一个让线程自旋模块,来实现代码同步。
所谓自旋锁(spin lock),其实就是在多线程的处理中,一个线程暂时“原地打转”,以免跑去跟另外一个线程竞争资源。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /// <summary>
    /// 定义一个简单的自旋锁
    /// </summary>
    public class SpinLockTest
    {
        private int _isInUse = 0;

        public void Enter()
        {
            while (true)
            {
                if (Interlocked.Exchange(ref _isInUse, 1) == 0)
                {
                    return;
                }
                //如果有线程正在调用,那么后来的线程就一直在这里“自旋”
            }
        }

        public void Leave()
        {
            Interlocked.Exchange(ref _isInUse, 0);
        }
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    /// <summary>
    /// 使用自旋锁
    /// </summary>
    public class MySpinLock
    {
        private int _count = 0;
        private readonly SpinLockTest _spinLockTest = new SpinLockTest();
        public void Do()
        {
            Task.Run(() =>
            {
                //定义100个线程,同时对_count++
                for (int i = 0; i < 100; i++)
                {
                    var th = new Thread(() =>
                    {
                        _spinLockTest.Enter();
                        //一次只有一个线程能访问当前代码块
                        _count++;                        
                        _spinLockTest.Leave();
                    });
                    th.Start();
                }
            });
            //等待所有线程调用完成
            Thread.Sleep(10 * 1000);
            Console.WriteLine($"_count={_count}");
        }
    }

这个自旋锁实际上很简单
当两个线程同时调用SpinLockTest.Enter()时,那么Interlocked.Exchange(ref _isInUse, 1)会确保一个线程将_isInUse由0变成1,并发现其初始值为0,所以跳出循环并继续执行后续逻辑;另一个线程将_isInUse由1变成1,初始值为1,所以会一直进行“自旋”,等待直到_isInUse的值再次变为0。
当第一个线程执行完需要同步的代码块后调用_spinLockTest.Leave()方法,将_isInUse的值再次变为0,此时“自旋”的线程进入要同步的代码块。

“自旋锁”虽然简单,但是也有以下的缺点:

  • 因为一直“自旋”,所以会浪费大量的CPU时间,因此该方法仅适用于执行的非常快的代码块
  • 如果占用锁的线程优先级低于想要获取锁的线程,那么这里会出现活锁的情况(因为占用锁的线程根本没有机会执行)这部分为线程优先级的知识,这里不过多的解释

Interlocked Anything模式

我们在使用Interlocked方法的时候,会发现它有好多常用的操作都不支持,比如MinMaxAndOrXor等,但是Interlocked提供了一个方法Interlocked.CompareExchange,可以进行两个数的比较。我们所说的 Interlocked Anything模式 就是运用了该方法扩展出来的一种写法。

Interlocked Anything模式有点类似数据库的乐观并发模式,下面我们使用该模式来实现一个原子的Max方法。

乐观锁的特点:在提交数据更新之前,每个事务会先检查在该事务读取数据之后,有没有其他事务又修改了该数据。如果其他事务更新了该数据,正在提交的事务将会进行回滚。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
        /// <summary>
        /// 获取两个数的最大值
        /// </summary>
        public static int Max(ref int target, int value)
        {
            int currentVal = target;
            int startVal;
            int desiredVal;
            do
            {
                //记录本次循环开始的起始值
                startVal = currentVal;

                //可以用startVal执行你希望的任何操作,本例以获取最大值为例
                desiredVal = Math.Max(startVal, value);

                //返回target的原始值(修改前的值,因为执行代码期间该值可能会被其他线程修改)
                currentVal = Interlocked.CompareExchange(ref target, desiredVal, startVal);

                //如果target的值被修改,那么currentVal的值就不会等于startVal值
                //重新循环一次用最新值进行计算
            } while (startVal != currentVal);

            return desiredVal;
        }

结合前面所说的乐观锁的特点,现在来分析以下这段代码。

  1. 进入方法后currentVal值被初始化为target的值
  2. 循环开始后,初始化本次操作的起始值startVal
  3. 执行需要的代码操作(本例以获取两个数的最大值为例)
  4. 操作完成后,调用Interlocked.CompareExchange方法比较targetstartVal是否相等。如果相等,则将desiredVal赋值给target(也就是上一步的操作结果);如果不相等,则说明在方法执行期间,target发生了改变,这时将最新的值赋值给currentVal
  5. 如果上一步的targetstartVal相等,那么Interlocked.CompareExchange方法的返回值currentVal,也将和前面两个值相等,此时跳出循环,方法执行完成。如果targetstartVal不相等,则用最新的target值(即currentVal)重新进行循环。

《CLR via C#》的作者在书中写道,自己在大量的代码中都运用了这个模式,还专门写了一个泛型的Morph来封装了这个模式。如下🔻

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
        public delegate int Morpher<TResult, TArgument>(int startValue, TArgument argument, out TResult morphResult);
        public static TResult Morph<TResult, TArgument>(ref int target, TArgument argument, Morpher<TResult, TArgument> morpher)
        {
            TResult morphResult;
            int currentVal = target, startVal, desiredVal;
            do
            {
                startVal = currentVal;
                desiredVal = morpher(startVal, argument, out morphResult);
                currentVal = Interlocked.CompareExchange(ref target, desiredVal, startVal);
            } while (startVal != currentVal);

            return morphResult;
        }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
        static void Main(string[] args)
        {
            //调用测试方法,同样已Max为例
            int a = 2;
            int b = 3;
            int a1 = a;
            int c = InterlockedAnythingTest.Morph<int, int>(ref a, b, (int x, int y, out int z) =>
            {
                z = 999;
                return Math.Max(a1, b); ;
            });
            Console.WriteLine($"max value={a},return value={c}");
            Console.ReadLine();
        }

            //↓↓↓代码运行结果如下↓↓↓            

            //max value=3,return value=999

参考资料:
《CLR via C#》
volatile(C# 参考)(MSDN)
Interlocked 类(MSDN)

本文未完,后面还会持续更新,学一点总结一点🐶🐶