基元是指可以在代码中使用的最简单的构造。
有两种基元构造:用户模式构造和内核模式构造。应尽量使用用户模式构造,它们的速度要比内核构造快很多,因为他们使用了特殊的CPU指令来协调线程(意味着协调是在硬件中发生,所以会快很多)。
这两种构造都有自身的特点
本文基于.NET4.5框架
用户模式构造
易变构造
volatile关键字指示一个字段可以由多个同时执行的线程修改,通过volatile关键字,可以保证变量在执行上有着原子性的读、写操作。
volatile关键字可应用于以下类型的字段:
- 引用类型。
- 指针类型(在不安全的上下文中)。 请注意,虽然指针本身可以是可变的,但是它指向的对象不能是可变的。 换句话说,不能声明“指向可变对象的指针”。
- 简单类型,如 sbyte、byte、short、ushort、int、uint、char、float 和 bool。
- 具有以下基本类型之一的 enum 类型:byte、sbyte、short、ushort、int 或 uint。
- 已知为引用类型的泛型类型参数。
- IntPtr 和 UIntPtr。
这些描述读起来比较绕口,那么我们通过一个示例来演示以下volatile的基本用法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/// <summary>
/// 定义测试类
/// </summary>
public class VolatileTest
{
private bool _isStop = false;
public void Do()
{
Console.WriteLine("任务开始");
var th = new Thread(Thread1);
th.Start();
Thread.Sleep(5000);
_isStop = true;
Console.WriteLine("等待任务完成");
th.Join();
Console.ReadLine();
}
public void Thread1()
{
int x = 0;
while (!_isStop)
{
x = x + 1;
}
Console.WriteLine($"任务执行完成,计数:{x}");
}
}
|
1
2
3
4
5
|
static void Main(string[] args)
{
//调用测试方法
new VolatileTest().Do();
}
|
先来猜猜上面这段代码的执行结果是怎样的😓😓
代码在编译时,根据不同的编译配置,可能会出现不同的结果。
-
在Debug模式下,编译器并没有对代码进行优化,所以程序会得到正常的执行结果🔻

至于这里的计数为什么会是负数,这个以后有机会在单独写文章分析
-
在Release模式下,编译器默认打开了代码优化功能。编译VolatileTest文件时,编译器发现_isStop在Thread1方法中并不会发生改变,并且对_isStop的检查只会在循环前发生一次,不会每次循环时都检查。因此会生成一段一直累加x的代码出来。因此,程序运行时,得到了以下结果🔻

接下来通过IL代码来对比一下两者的不同(左边为Debug版本,右边为Release版本)

在两个版本中,主要有两点不同:
Debug版本中,在初始化变量时,定义了一个bool类型的变量;Release版本没有
Debug版本中,在循环结束前,对定义的bool变量和_isStop进行了计算,然后判断是否需要继续循环;Release版本中直接获取并校验了_isStop的值,结果为false,则无限循环。
要解决这个问题其实很简单,就是使用前面所说的Volatile关键字定义变量即可。Volatile能保证变量的原子操作,因此用Volatile定义的变量,编译器是不会进行之前的优化操作。
所以重新修改_isStop变量的定义即可修正前面的问题。
1
|
private volatile bool _isStop = false;
|
另外,System.Threading.Volatile类还提供了以下两个静态方法来进行变量的读写🔻
1
2
|
Volatile.Read(ref _isStop);
Volatile.Write(ref _isStop,true);
|
因此通过如下代码也可以修正前面的问题:
1
2
3
4
5
6
7
8
9
10
|
public void Thread1()
{
int x = 0;
//这里使用Volatile.Read来读取最新的值
while (!Volatile.Read(ref _isStop))
{
x = x + 1;
}
Console.WriteLine($"任务执行完成,计数:{x}");
}
|
如果对上面的概念还是不太理解,那么可以再参考下书中的这样一段定义🔻
当线程通过共享内存相互通信时,调用Volatile.Write来写入最后一个值,调用Volatile.Read来读取第一个值
互锁构造
System.Threading.Interlocked类中的每个方法都为多个线程共享的变量提供原子操作。
调用某个Interlocked方法之前的任何变量写入都在这个Interlocked方法调用之前执行;而这个调用之后的任何变量读取都在这个调用之后读取。
volatile只有Read、Write两个方法提供原子性的操作,而Interlocked类中的所有方法都执行一次原子性的读取和写入
该类提供的方法比较多,具体可以参考MSDN文档
依然很绕口吧🙇♂️🙇♂️那继续用代码来理解。
该示例使用HttpClient对指定的服务器请求,并显示请求返回的字节数🔻
1
2
3
4
5
6
7
8
9
|
/// <summary>
/// 服务器请求状态
/// </summary>
public enum CoordinationStatus
{
AllDone,
Timeout,
Cancel
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
/// <summary>
/// 定义测试类
/// </summary>
public class InterlockedTest
{
//这个辅助类用于协调所有异步操作
private readonly AsyncCoordinator _ac = new AsyncCoordinator();
//这是想要查询的web服务器及其响应(异常或int)的集合
//注意:多个线程访问该字典不需要以同步方式进行
//因为构造后键就是只读的
private readonly Dictionary<String, Object> _remoteServers = new Dictionary<string, object> {
{"https://www.microsoft.com" ,null},
{"https://www.bing.com",null},
{"https://www.google.com",null}
};
public void Do(int timeout = Timeout.Infinite)
{
var httpClient = new HttpClient();
foreach (var server in _remoteServers.Keys)
{
_ac.AboutToBegin(1);//这里也可以修改为下面注释中的方式
httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
}
// //一次性向AboutToBegin方法传递所有的请求数量
// _ac.AboutToBegin(_remoteServers.Count);
// foreach (var server in _remoteServers.Keys)
// {
// httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
// }
//告诉AsyncCoordinator所有操作都已发起,并在所有操作完成
//调用Cancel或者发生超时的时候调用AllDone
_ac.AllBegun(AllDone, timeout);
}
private void ComputeResult(string server, Task<Byte[]> task)
{
object result;
if (task.Exception != null)
{
result = task.Exception.InnerException;
}
else
{
//线程池线程处理I/O完成
//在此添加自己的计算密集型算法
//本例只是返回长度
result = task.Result.Length;
}
//保存结果(exception/sum),指出1个操作完成
_remoteServers[server] = result;
_ac.JustEnded();
}
//调用这个方法指出结果已无关紧要
public void Cancel()
{
_ac.Cancel();
}
//所有Web服务器都响应、调用了Cancel或者发生超时,就调用该方法
private void AllDone(CoordinationStatus status)
{
switch (status)
{
case CoordinationStatus.Cancel:
Console.WriteLine("请求取消");
break;
case CoordinationStatus.Timeout:
Console.WriteLine("请求超时");
break;
case CoordinationStatus.AllDone:
Console.WriteLine("请求完成,结果:");
foreach (var server in _remoteServers)
{
Console.WriteLine($"{server.Key}");
object result = server.Value;
if (result is Exception)
{
Console.WriteLine($"请求发生异常:{result.GetType().Name}");
}
else
{
Console.WriteLine($"返回字节数:{result}");
}
}
break;
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
/// <summary>
/// 这个辅助类用于协调所有异步操作
/// </summary>
public class AsyncCoordinator
{
//AllBegun内部调用JustEnded来递减它
private int _opCount = 1;
//0 = false 1= true
private int _statusReported = 0;
private Action<CoordinationStatus> _callback;
private Timer _timer;
//该方法必须在发起一个操作之前调用
public void AboutToBegin(int opsToAdd = 1)
{
Interlocked.Add(ref _opCount, opsToAdd);
}
//该方法必须在处理好一个操作的结果之后调用
public void JustEnded()
{
if (Interlocked.Decrement(ref _opCount) == 0)
{
ReportStatus(CoordinationStatus.AllDone);
}
}
//该方法必须在发起所有操作之后调用
public void AllBegun(Action<CoordinationStatus> callback, int timeout = Timeout.Infinite)
{
_callback = callback;
if (timeout != Timeout.Infinite)
{
_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
}
JustEnded();
}
private void TimeExpired(object o)
{
ReportStatus(CoordinationStatus.Timeout);
}
public void Cancel()
{
ReportStatus(CoordinationStatus.Cancel);
}
private void ReportStatus(CoordinationStatus status)
{
//如果状态从未报告过,就报告它;否则忽略它
if (Interlocked.Exchange(ref _statusReported, 1) == 0)
{
_callback(status);
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
static void Main(string[] args)
{
//调用测试方法
new InterlockedTest().Do();
Console.ReadLine();
}
//↓↓↓代码运行结果如下↓↓↓
// 请求完成,结果:
// https://www.microsoft.com
// 返回字节数:185796
// https://www.bing.com
// 返回字节数:114613
// https://www.google.com
// 请求发生异常:HttpRequestException
|
接下来说明一下这个示例🔻
InterlockedTest类主要负责定义一个多线程访问网站的字典,并且对字典中的网站进行访问。任务开始前会向AsyncCoordinator类发送本次需要请求的操作数量,所有任务创建完成后调用AllBegun方法向辅助类报告任务已创建完成,并且绑定一个回调方法AllDone(该方法的作用是打印所有请求的结果)。
AsyncCoordinator类用于协调所有异步操作,之所以单独提取出来,是因为这个类的功能可以复用。该类的两个公共方法AboutToBegin和JustEnded分别用于对内部维护的异步任务数(_opCount)进行原子的加、减操作(通过Interlocked.Add、Interlocked.Decrement来实现)。当任务数为0时,调用ReportStatus方法向InterlockedTest报告执行结果,说明所有异步任务已完成。
这里特别说明以下几个点🔻
_opCount变量的初始值为1(而非0),这一点很重要。假定初始值为0,当某一个请求到完成又很迅速时(下一个任务还没调用AboutToBegin的时候当前任务就已经完成并且调用了JustEnded方法),那么这时就会导致AllDone被提前执行。因此初始化时将_opCount的值设置为1,等所有任务添加完成后,调用AllBegun方法时再通过JustEnded将那个1减掉。
_statusReported变量实际上是作为一个Boolean来使用的,因为Interlocked类不提供Boolean类型的原子操作,所以这里我们使用int类型的变量来代替。
- 示例中的
Cancel方法没有做展示,想要看效果的同学可以修改代码自己调用。
- 示例中的
Timeout方法没有做展示,因为默认传递的参数是不超时,想要看效果的同学可以修改代码自己调用。
实现简单的自旋锁
前面所说的Interlocked的方法很好用,但是主要用于操作Int32类型,如果需要原子性的操作类中的一组字段,Interlocked很明显是做不到的。所以这时我们可以构造一个让线程自旋模块,来实现代码同步。
所谓自旋锁(spin lock),其实就是在多线程的处理中,一个线程暂时“原地打转”,以免跑去跟另外一个线程竞争资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
/// <summary>
/// 定义一个简单的自旋锁
/// </summary>
public class SpinLockTest
{
private int _isInUse = 0;
public void Enter()
{
while (true)
{
if (Interlocked.Exchange(ref _isInUse, 1) == 0)
{
return;
}
//如果有线程正在调用,那么后来的线程就一直在这里“自旋”
}
}
public void Leave()
{
Interlocked.Exchange(ref _isInUse, 0);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/// <summary>
/// 使用自旋锁
/// </summary>
public class MySpinLock
{
private int _count = 0;
private readonly SpinLockTest _spinLockTest = new SpinLockTest();
public void Do()
{
Task.Run(() =>
{
//定义100个线程,同时对_count++
for (int i = 0; i < 100; i++)
{
var th = new Thread(() =>
{
_spinLockTest.Enter();
//一次只有一个线程能访问当前代码块
_count++;
_spinLockTest.Leave();
});
th.Start();
}
});
//等待所有线程调用完成
Thread.Sleep(10 * 1000);
Console.WriteLine($"_count={_count}");
}
}
|
这个自旋锁实际上很简单
当两个线程同时调用SpinLockTest.Enter()时,那么Interlocked.Exchange(ref _isInUse, 1)会确保一个线程将_isInUse由0变成1,并发现其初始值为0,所以跳出循环并继续执行后续逻辑;另一个线程将_isInUse由1变成1,初始值为1,所以会一直进行“自旋”,等待直到_isInUse的值再次变为0。
当第一个线程执行完需要同步的代码块后调用_spinLockTest.Leave()方法,将_isInUse的值再次变为0,此时“自旋”的线程进入要同步的代码块。
“自旋锁”虽然简单,但是也有以下的缺点:
- 因为一直“自旋”,所以会浪费大量的CPU时间,因此该方法仅适用于执行的非常快的代码块。
- 如果占用锁的线程优先级低于想要获取锁的线程,那么这里会出现活锁的情况(因为占用锁的线程根本没有机会执行)这部分为线程优先级的知识,这里不过多的解释。
Interlocked Anything模式
我们在使用Interlocked方法的时候,会发现它有好多常用的操作都不支持,比如Min、Max、And、Or、Xor等,但是Interlocked提供了一个方法Interlocked.CompareExchange,可以进行两个数的比较。我们所说的 Interlocked Anything模式 就是运用了该方法扩展出来的一种写法。
Interlocked Anything模式有点类似数据库的乐观并发模式,下面我们使用该模式来实现一个原子的Max方法。
乐观锁的特点:在提交数据更新之前,每个事务会先检查在该事务读取数据之后,有没有其他事务又修改了该数据。如果其他事务更新了该数据,正在提交的事务将会进行回滚。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/// <summary>
/// 获取两个数的最大值
/// </summary>
public static int Max(ref int target, int value)
{
int currentVal = target;
int startVal;
int desiredVal;
do
{
//记录本次循环开始的起始值
startVal = currentVal;
//可以用startVal执行你希望的任何操作,本例以获取最大值为例
desiredVal = Math.Max(startVal, value);
//返回target的原始值(修改前的值,因为执行代码期间该值可能会被其他线程修改)
currentVal = Interlocked.CompareExchange(ref target, desiredVal, startVal);
//如果target的值被修改,那么currentVal的值就不会等于startVal值
//重新循环一次用最新值进行计算
} while (startVal != currentVal);
return desiredVal;
}
|
结合前面所说的乐观锁的特点,现在来分析以下这段代码。
- 进入方法后
currentVal值被初始化为target的值
- 循环开始后,初始化本次操作的起始值
startVal
- 执行需要的代码操作(本例以获取两个数的最大值为例)
- 操作完成后,调用
Interlocked.CompareExchange方法比较target和startVal是否相等。如果相等,则将desiredVal赋值给target(也就是上一步的操作结果);如果不相等,则说明在方法执行期间,target发生了改变,这时将最新的值赋值给currentVal
- 如果上一步的
target和startVal相等,那么Interlocked.CompareExchange方法的返回值currentVal,也将和前面两个值相等,此时跳出循环,方法执行完成。如果target和startVal不相等,则用最新的target值(即currentVal)重新进行循环。
《CLR via C#》的作者在书中写道,自己在大量的代码中都运用了这个模式,还专门写了一个泛型的Morph来封装了这个模式。如下🔻
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public delegate int Morpher<TResult, TArgument>(int startValue, TArgument argument, out TResult morphResult);
public static TResult Morph<TResult, TArgument>(ref int target, TArgument argument, Morpher<TResult, TArgument> morpher)
{
TResult morphResult;
int currentVal = target, startVal, desiredVal;
do
{
startVal = currentVal;
desiredVal = morpher(startVal, argument, out morphResult);
currentVal = Interlocked.CompareExchange(ref target, desiredVal, startVal);
} while (startVal != currentVal);
return morphResult;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
static void Main(string[] args)
{
//调用测试方法,同样已Max为例
int a = 2;
int b = 3;
int a1 = a;
int c = InterlockedAnythingTest.Morph<int, int>(ref a, b, (int x, int y, out int z) =>
{
z = 999;
return Math.Max(a1, b); ;
});
Console.WriteLine($"max value={a},return value={c}");
Console.ReadLine();
}
//↓↓↓代码运行结果如下↓↓↓
//max value=3,return value=999
|
参考资料:
《CLR via C#》
volatile(C# 参考)(MSDN)
Interlocked 类(MSDN)
本文未完,后面还会持续更新,学一点总结一点🐶🐶