在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
文章原始出处 http://xxinside.blogbus.com/logs/47523285.html 预备知识:C#线程同步(1)- 临界区&Lock,C#线程同步(2)- 临界区&Monitor,C#线程同步(3)- 互斥量 Mutex WaitHandle一家 在前一篇我们已经提到过Mutex和本篇的主角们直接或间接继承自WaitHandle:
WaitHandle提供了若干用于同步的方法。上一篇关于Mutex的blog中已经讲到一个WaitOne(),这是一个实例方法。除此之外,WaitHandle另有3个用于同步的静态方法:
线程相关性(Thread Affinity ) EventWaitHandle和Mutex两者虽然是派生自同一父类,但有着完全不同的线程相关性:
Mutex与Event 我们在Mutex一篇中没有具体提到Mutex是否能发送信号,只是简单说Mutex不太适合有相互消息通知的同步,它仅有的一些同步方法是来自其父类的静态方法。那么现在我们可以仔细来看看Mutex到底能不能用于关于Monitor那篇提到的生产者、消费者和糖罐的场景。 回过头来仔细查看Mutex的所有方法,除了一个我们已经提到的WaitHandle上的静态方法SingnalAndWait(toSingnal, toWaitOn),我们找不到任何“属于Mutex自己”的、用于发送信号的方法。退而求其次吧,我们就来看看这个静态方法是否可以让Mutex具有通知的能力。 如果toSignal是一个Mutex,那么收到“信号”就等效于ReleaseMutex()。而由于Mutex的线程相关性,只有拥有当前Mutex的线程才能够发送这个信号(ReleaseMutex),否则会引发异常。也就是说如果要用这个方法来通知其它线程同步,Mutex只能自己发给自己。与之相反,如果第二个参数toWaitOn也是个Mutex,那么这个Mutex不能是自己。因为前篇已经讲过,Mutex的拥有者可以多次WaitOne()而不阻塞,这里也是一样。所以如果Mutex一定要使用这个方法,准确的说是只是成为这个方法的参数,那只能是WaitHandle.SignalAndWait(它自己,另一个Mutex)。 试想,如果有人试图只使用Mutex来进行同步通知。假设生产者线程通过Mutex上的WaitOne()获得了mutexA的拥有权,并且在生产完毕后调用了SingnalAndWait(mutexA,mutexB),通知由于当前mutexA而阻塞的消费者线程,并且将自己阻塞在mutexB上。那么被唤醒的消费者线程获得MutexA的拥有权吃掉糖后,也只能调用SingnalAndWait(mutexA,mutexB)释放它获得的mutexA且阻塞于MutexB。问题来了,此时的生产者是阻塞在mutexB上……也许,我们可以设计一段“精巧”的代码,让生产者和消费者一会儿阻塞在mutexA,一会儿阻塞在mutexB上……我不想花费这个力气去想了,你可以试试看:)。不管有没有这样的可能,Mutex很明显就不适用于通知的场景。 EventWaitHandle的独门秘笈 正因为Mutex没有很好地继承父辈的衣钵,EventWaitHandle以及它的儿子/女儿们便来到了这个世界上。 EventWaitHandle、AutoResetEvent、ManualResetEvent名字里都有一个“Event”,不过这跟.net的本身的事件机制完全没有关系,它不涉及任何委托或事件处理程序。相对于我们之前碰到的Monitor和Mutex需要线程去争夺“锁”而言,我们可以把它们理解为一些需要线程等待的“事件”。线程通过等待这些事件的“发生”,把自己阻塞起来。一旦“事件”完成,被阻塞的线程在收到信号后就可以继续工作。 为了配合WaitHandle上的3个静态方法SingnalAndWait()/WailAny()/WaitAll(),EventWaitHandle提供了自己独有的,使“Event”完成和重新开始的方法:
来看看EventWaitHandle众多构造函数中最简单的一个:
好了,现在我们可以清楚的知道Set()在什么时候分别类似于Monitor.Pulse()/PulseAll()了:
来看看EventWaitHandle的其它构造函数:
好啦,都差不多了,可以写一个例子试试了。让我们回到Monitor一篇中提到的生产者和消费者场景,让我们看看EventWaitHandle能不能完成它兄弟Mutex没有能完成的事业。不过,即便有强大通信能力的EventWaitHandle出马,也避免不要使用lock/monitor或是Mutex。原因很简单,糖罐是一个互斥资源,必须被互斥地访问。而EventWaitHanldle跟Mutex相反,能通信了但却完全失去了临界区的能力。所以,这个例子其实并不太适合展示EventWaitHandle的通信机制,我只是为了想用同样的例子来比较这些同步机制间的差异。 EventWaitHandle虽然还必须借助lock/Monitor/Mutex来实现这个例子(仅仅是临界区部分),但是它终究有强于Monitor的通信能力,所以让我们来扩展一下这个例子:现在有一个生产者,有多个消费者。
using System; using System.Collections; using System.Linq; using System.Text; using System.Threading; class WaitEventHandleSample:IDisposable { private volatile bool _shouldStop = false; //用于控制线程正常结束的标志 private const int _numberOfConsumer = 5; //消费者的数目 //容器,一个只能容纳一块糖的糖盒子。PS:现在MS已经不推荐使用ArrayList, //支持泛型的List才是应该在程序中使用的,我这里偷懒,不想再去写一个Candy类了。 private ArrayList _candyBox = null; private EventWaitHandle _EvntWtHndlProduced = null; //生产完成的事件,ManualReset,用于通知所有消费者生产完成 private EventWaitHandle[] _EvntWtHndlConsumed = null; //消费完成的事件,AutoReset,每一个消费线程对应一个事件,用于通知生产者有消费动作完成 /// <summary> /// 用于结束Produce()和Consume()在辅助线程中的执行 /// </summary> public void StopThread() { _shouldStop = true; //叫醒阻塞中的消费者,让他们看到线程结束标志 if (_EvntWtHndlProduced != null) { _EvntWtHndlProduced.Set(); }; //叫醒阻塞中的生产者,让他看到线程结束标志 if (_EvntWtHndlConsumed != null) { for (int i = 0; i < _numberOfConsumer; i++) { if (_EvntWtHndlConsumed[i] != null) { _EvntWtHndlConsumed[i].Set(); }; } } } /// <summary> /// 生产者的方法 /// </summary> public void Produce() { if (_candyBox == null) { Console.WriteLine("生产者:糖罐在哪里?!"); } else if (_EvntWtHndlConsumed == null) { Console.WriteLine("生产者:消费者们在哪里?!"); } else if (_EvntWtHndlProduced == null) //这个事件用于唤醒所有消费者,因此象个喇叭 { Console.WriteLine("生产者:喇叭坏啦,没办法通知消费者!"); } else { //逐一检查消费者是否到位 for (int i = 0; i < _numberOfConsumer; ++i) { if (_EvntWtHndlConsumed[i] == null) { Console.WriteLine("生产者:消费者{0}在哪里?!", i); return; } else { //什么也不做 }; }; int numberOfSugarProduced = 0; //本次一共生产了多少颗糖 while (!_shouldStop) { lock (_candyBox) { if (_candyBox.Count < _numberOfConsumer) { numberOfSugarProduced = 0; while (_candyBox.Count < _numberOfConsumer) //一共有多少个消费者就生产多少块糖 { //生产一块糖 _candyBox.Add("A Candy"); ++numberOfSugarProduced; }; Console.WriteLine("生产者:这次生产了{0}块糖,罐里现在一共有{1}块糖!", numberOfSugarProduced, _candyBox.Count); Console.WriteLine("生产者:赶快来吃!!"); } else //容器是满的 { Console.WriteLine("生产者:糖罐是满的!"); }; }; //通知消费者生产已完成 _EvntWtHndlProduced.Set(); //只要有消费者吃完糖,就开始生产 EventWaitHandle.WaitAny(_EvntWtHndlConsumed); Thread.Sleep(2000); }; Console.WriteLine("生产者:下班啦!"); } } /// <summary> /// 消费者的方法 /// </summary> /// <param name="consumerIndex">消费者序号,用于表明使用哪个_EvntWtHndlConsumed成员</param> public void Consume(object consumerIndex) { int index = (int)consumerIndex; if (_candyBox == null) { Console.WriteLine("消费者{0}:糖罐在哪里?!",index); } else if (_EvntWtHndlProduced == null) { Console.WriteLine("消费者{0}:生产者在哪里?!",index); } else if (_EvntWtHndlConsumed == null || _EvntWtHndlConsumed[index] == null) { Console.WriteLine("消费者{0}:电话坏啦,没办法通知生产者!", index); //由于每个消费者都有一个专属事件通知生产者,因此相当于电话 } else { while (!_shouldStop || _candyBox.Count > 0) //即便看到结束标致也应该把容器中的所有资源处理完毕再退出,否则容器中的资源可能就此丢失。需要指出_candybox.Count是有可能读到脏数据的 { lock (_candyBox) { if (_candyBox.Count > 0) { if (!_shouldStop) { _candyBox.RemoveAt(0); Console.WriteLine("消费者{0}:吃了1颗糖,还剩{1}颗!!", index, _candyBox.Count); Console.WriteLine("消费者{0}:赶快生产!!",index); } else { Console.WriteLine("消费者{0}:我来把剩下的糖都吃了!",index); while (_candyBox.Count > 0) { _candyBox.RemoveAt(0); Console.WriteLine("消费者{0}:吃了1颗糖,还剩{1}颗!!", index, _candyBox.Count); } break; } } else { Console.WriteLine("消费者{0}:糖罐是空的!",index); Console.WriteLine("消费者{0}:赶快生产!!",index); } } WaitHandle.SignalAndWait(_EvntWtHndlConsumed[index], _EvntWtHndlProduced); Thread.Sleep((index+1)*1500); } } Console.WriteLine("消费者{0}:都吃光啦,下次再吃!",index); } /// <summary> /// 初始化所需的各EventWaitHandle和糖罐等 /// </summary> public void Initialize() { if (_candyBox == null) { _candyBox = new ArrayList(_numberOfConsumer); //按有多少消费者最多生产多少糖的标准初始化糖罐大小 } else { //什么也不做 } if (_EvntWtHndlProduced == null) { _EvntWtHndlProduced = new EventWaitHandle(false, EventResetMode.ManualReset); } else { //什么也不做 } if (_EvntWtHndlConsumed == null) { _EvntWtHndlConsumed = new EventWaitHandle[_numberOfConsumer]; for (int i = 0; i < _numberOfConsumer; ++i) { _EvntWtHndlConsumed[i] = new EventWaitHandle(false, EventResetMode.AutoReset); } } else { //什么也不做 } } static void Main(string[] args) { WaitEventHandleSample ss = new WaitEventHandleSample(); try { ss.Initialize(); //Start threads. Console.WriteLine("开始启动线程,输入回车终止生产者和消费者的工作……\r\n******************************************"); Thread thdProduce = new Thread(new ThreadStart(ss.Produce)); thdProduce.Start(); Thread[] thdConsume = new Thread[_numberOfConsumer]; for (int i = 0; i < _numberOfConsumer; ++i) { thdConsume[i] = new Thread(new ParameterizedThreadStart(ss.Consume)); thdConsume[i].Start(i); } Console.ReadLine(); //通过IO阻塞主线程,等待辅助线程演示直到收到一个回车 ss.StopThread(); //正常且优雅的结束生产者和消费者线程 thdProduce.Join(); for (int i = 0; i < _numberOfConsumer; ++i) { thdConsume[i].Join(); } Console.WriteLine("******************************************\r\n输入回车结束!"); Console.ReadLine(); } finally { ss.Dispose(); ss = null; }; } #region IDisposable Members public void Dispose() { if (_candyBox != null) { _candyBox.Clear(); _candyBox = null; } else { //什么也不做 } if (_EvntWtHndlProduced != null) { _EvntWtHndlProduced.Set(); _EvntWtHndlProduced.Close(); _EvntWtHndlProduced = null; } else { //什么也不做 } if (_EvntWtHndlConsumed != null) { for (int i = 0; i < _numberOfConsumer; ++i) { if (_EvntWtHndlConsumed[i] != null) { _EvntWtHndlConsumed[i].Set(); _EvntWtHndlConsumed[i].Close(); _EvntWtHndlConsumed[i] = null; }; } _EvntWtHndlConsumed = null; } else { //什么也不做 }; } #endregion } Produce()和Consum()中加入的Sleep代码仅仅是为了让线程更为随机的被调度,这样我们可以更容易观察到线程乱序执行的情况。另外,如果是一个需要跨进程同步的程序,那么你也可以用Mutext替换lock实现临界区。下面是某次执行的输出情况,你的结果当然会跟它不同(空行位置是我输入回车终止线程的时机): 开始启动线程,输入回车终止生产者和消费者的工作…… ****************************************** 生产者:这次生产了5块糖,罐里现在一共有5块糖! 生产者:赶快来吃!! 消费者0:吃了1颗糖,还剩4颗!! 消费者0:赶快生产!! 消费者1:吃了1颗糖,还剩3颗!! 消费者1:赶快生产!! 消费者2:吃了1颗糖,还剩2颗!! 消费者2:赶快生产!! 消费者3:吃了1颗糖,还剩1颗!! 消费者3:赶快生产!! 消费者4:吃了1颗糖,还剩0颗!! 消费者4:赶快生产!! 消费者0:糖罐是空的! 消费者0:赶快生产!! 生产者:这次生产了5块糖,罐里现在一共有5块糖! 生产者:赶快来吃!! 消费者1:吃了1颗糖,还剩4颗!! 消费者1:赶快生产!! 消费者0:吃了1颗糖,还剩3颗!! 消费者0:赶快生产!! 生产者:这次生产了2块糖,罐里现在一共有5块糖! 生产者:赶快来吃!! 消费者0:吃了1颗糖,还剩4颗!! 消费者0:赶快生产!! 消费者2:吃了1颗糖,还剩3颗!! 消费者2:赶快生产!! 消费者1:吃了1颗糖,还剩2颗!! 消费者1:赶快生产!! 生产者:这次生产了3块糖,罐里现在一共有5块糖! 生产者:赶快来吃!! 消费者0:吃了1颗糖,还剩4颗!! 消费者0:赶快生产!! 消费者3:吃了1颗糖,还剩3颗!! 消费者3:赶快生产!! 消费者4:吃了1颗糖,还剩2颗!! 消费者4:赶快生产!! 消费者0:吃了1颗糖,还剩1颗!! 消费者0:赶快生产!! 生产者:下班啦! 消费者1:我来把剩下的糖都吃了! 消费者1:吃了1颗糖,还剩0颗!! 消费者1:都吃光啦,下次再吃! 消费者0:都吃光啦,下次再吃! 消费者2:都吃光啦,下次再吃! 消费者3:都吃光啦,下次再吃! 消费者4:都吃光啦,下次再吃! ****************************************** 输入回车结束! AutoResetEvent & ManuResetEvent 到此为止我们还没有提到过EventWaitHandle的这两个儿子,不过这就是一两句话的事:
好了,讲这么都就够了,这两个子类无非是为了方便使用而存在的。不过请记得这两个子类永远是局部/Local的,并不能象它们的父类一样用于进程间的通信。 还是给出一个简单的例子,这个例子只跟通知有关,不再涉及临界资源。假设一个跑步比赛的场景,我们用一个ManualResetEvent表示比赛,然后为每个运动员配备一个AutoResetEvent用于通知到起跑线或者是达终点。首先运动员需要到起跑线上就位,这个过程我们让运动员到达起跑线后调用AutoResetEvent上的Reset()发出信号,同时使用ManualResetEvent上的WaitOne()阻塞自己准备起跑。另一方面,我们在比赛线程上先用WaitHandle.WaitAll(AutoResetEvent[])等待所有运动员到位。WaitAll()完成后,使用ManualResetEvent上的Reset()发令开始比赛,再使用WaitHandle.WaitAny(AutoResetEvent[])等待第一个运动员冲线。而每个运动员到终点后会再次调用AutoResetEvent.Reset()表示到达。 using System; using System.Threading; using System.Linq; using System.Text; class Runner : IDisposable { //用于让所有运动员到达起跑线准备起跑 private ManualResetEvent _mnlRstEvntStartLine = null; //用于运动员到达终点时发出信号 private static AutoResetEvent[] _mnlRstEvntRunner = null; private const int _numberOfRunner = 8; private Random _rnd = new Random(); /// <summary> /// 构造函数 /// </summary> public Runner() { _mnlRstEvntStartLine = new ManualResetEvent(false); _mnlRstEvntRunner = new AutoResetEvent[_numberOfRunner]; //请运动员就位 for (int i = 0; i < _numberOfRunner; ++i) { _mnlRstEvntRunner[i] = new AutoResetEvent(false); } } /// <summary> /// 运动员方法 /// </summary> /// <param name="id">运动员序号</param> public void Run(object id) { int index = (int)id; //等待信号准备起跑 Console.WriteLine("{0}号运动员就位。", index); _mnlRstEvntRunner[index].Set(); //等待发令 _mnlRstEvntStartLine.WaitOne(); //随机睡眠,表示不同运动员跑的快慢 Thread.Sleep(_rnd.Next(2000)); Console.WriteLine("{0}号运动员到达终点!", index); _mnlRstEvntRunner[index].Set(); } /// <summary> /// 比赛开始 /// </summary> public void Start() { Thread[] runners = new Thread[_numberOfRunner]; //请运动员就位 for (int i = 0; i < _numberOfRunner; ++i) { runners[i] = new Thread(Run); runners[i].Start(i); } //等待所有运动员就位 WaitHandle.WaitAll(_mnlRstEvntRunner); //发令起跑 Console.WriteLine("***********************起跑!!!*************************"); _mnlRstEvntStartLine.Set(); //看看谁先到达终点 int index = WaitHandle.WaitAny(_mnlRstEvntRunner); //等待所有运动员到达终点 //请运动员就位 for (int i = 0; i < _numberOfRunner; ++i) { runners[i].Join(); } Console.WriteLine("**********************************************************"); Console.WriteLine("{0}号运动员夺得冠军!", index); Console.WriteLine("***********************比赛结束***************************"); } static void Main() { Runner ss = new Runner(); try { ss.Start(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { ss.Dispose(); ss = null; Console.WriteLine("输入回车结束"); Console.ReadLine(); } } #region IDisposable Members public void Dispose() { if (_mnlRstEvntStartLine != null) { _mnlRstEvntStartLine.Set(); _mnlRstEvntStartLine.Close(); } else { //do nothing } if (_mnlRstEvntRunner != null) |
请发表评论