引言
本文主要从线程的基础用法,CLR线程池当中工作者线程与I/O线程的开发,并行操作PLINQ等多个方面介绍多线程的开发。 其中委托的BeginInvoke方法以及回调函数最为常用。 而 I/O线程可能容易遭到大家的忽略,其实在开发多线程系统,更应该多留意I/O线程的操作。特别是在ASP.NET开发当中,可能更多人只会留意在客户端使用Ajax或者在服务器端使用UpdatePanel。其实合理使用I/O线程在通讯项目或文件下载时,能尽量降低IIS的压力。 并行编程是Framework4.0中极力推广的异步操作方式,更值得更深入地学习。 希望本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。
目录
一、线程的定义
二、线程的基础知识
三、以ThreadStart方式实现多线程
四、CLR线程池的工作者线程
五、CLR线程池的I/O线程
六、异步 SqlCommand
七、并行编程与PLINQ
八、计时器与锁
五、CLR线程池的I/O线程
在前一节所介绍的线程都属于CLR线程池的工作者线程,这一节开始为大家介绍一下CLR线程池的I/O线程
I/O 线程是.NET专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多个I/O操作都建立起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每个异步方法的使用方式都非常类似,都是以BeginXXX为开始,以EndXXX结束,下面为大家一一解说。
5.1 异步读写 FileStream
需要在 FileStream 异步调用 I/O线程,必须使用以下构造函数建立 FileStream 对象,并把useAsync设置为 true。
FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;
其中 path 是文件的相对路径或绝对路径; mode 确定如何打开或创建文件; access 确定访问文件的方式; share 确定文件如何进程共享; bufferSize 是代表缓冲区大小,一般默认最小值为8,在启动异步读取或写入时,文件大小一般大于缓冲大小; userAsync代表是否启动异步I/O线程。
注意:当使用 BeginRead 和 BeginWrite 方法在执行大量读或写时效果更好,但对于少量的读/写,这些方法速度可能比同步读取还要慢,因为进行线程间的切换需要大量时间。
5.1.1 异步写入
FileStream中包含BeginWrite、EndWrite 方法可以启动I/O线程进行异步写入。
public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject ) public override void EndWrite (IAsyncResult asyncResult )
BeginWrite 返回值为IAsyncResult, 使用方式与委托的BeginInvoke方法相似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数AsyncCallback用于绑定回调函数; 参数Object用于传递外部数据。要注意一点:AsyncCallback所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。 在例子中,把FileStream作为外部数据传递到回调函数当中,然后在回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后通过FileStream.EndWrite(IAsyncResult)结束写入。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //把线程池的最大值设置为1000 6 ThreadPool.SetMaxThreads(1000, 1000); 7 ThreadPoolMessage("Start"); 8 9 //新立文件File.sour 10 FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate, 11 FileAccess.ReadWrite,FileShare.ReadWrite,1024,true); 12 byte[] bytes = new byte[16384]; 13 string message = "An operating-system ThreadId has no fixed relationship........"; 14 bytes = Encoding.Unicode.GetBytes(message); 15 16 //启动异步写入 17 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream); 18 stream.Flush(); 19 20 Console.ReadKey(); 21 } 22 23 static void Callback(IAsyncResult result) 24 { 25 //显示线程池现状 26 Thread.Sleep(200); 27 ThreadPoolMessage("AsyncCallback"); 28 //结束异步写入 29 FileStream stream = (FileStream)result.AsyncState; 30 stream.EndWrite(result); 31 stream.Close(); 32 } 33 34 //显示线程池现状 35 static void ThreadPoolMessage(string data) 36 { 37 int a, b; 38 ThreadPool.GetAvailableThreads(out a, out b); 39 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ 40 "WorkerThreads is:{2} CompletionPortThreads is :{3}", 41 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 Console.WriteLine(message); 43 } 44 }
由输出结果可以看到,在使用FileStream.BeginWrite方法后,系统将自动启动CLR线程池中I/O线程。
5.1.2 异步读取
FileStream 中包含 BeginRead 与 EndRead 可以异步调用I/O线程进行读取。
public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject) public override int EndRead(IAsyncResult asyncResult)
其使用方式与BeginWrite和EndWrite相似,AsyncCallback用于绑定回调函数; Object用于传递外部数据。在回调函数只需要使用IAsyncResut.AsyncState就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。
首先定义 FileData 类,里面包含FileStream对象,byte[] 数组和长度。然后把FileData对象作为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为FileData,然后通过FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据长度不一至,则抛出异常。
1 class Program 2 { 3 public class FileData 4 { 5 public FileStream Stream; 6 public int Length; 7 public byte[] ByteData; 8 } 9 10 static void Main(string[] args) 11 { 12 //把线程池的最大值设置为1000 13 ThreadPool.SetMaxThreads(1000, 1000); 14 ThreadPoolMessage("Start"); 15 ReadFile(); 16 17 Console.ReadKey(); 18 } 19 20 static void ReadFile() 21 { 22 byte[] byteData=new byte[80961024]; 23 FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate, 24 FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); 25 26 //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数 27 FileData fileData = new FileData(); 28 fileData.Stream = stream; 29 fileData.Length = (int)stream.Length; 30 fileData.ByteData = byteData; 31 32 //启动异步读取 33 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData); 34 } 35 36 static void Completed(IAsyncResult result) 37 { 38 ThreadPoolMessage("Completed"); 39 40 //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取 41 FileData fileData = (FileData)result.AsyncState; 42 int length=fileData.Stream.EndRead(result); 43 fileData.Stream.Close(); 44 45 //如果读取到的长度与输入长度不一致,则抛出异常 46 if (length != fileData.Length) 47 throw new Exception("Stream is not complete!"); 48 49 string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length); 50 Console.WriteLine(data.Substring(2,22)); 51 } 52 53 //显示线程池现状 54 static void ThreadPoolMessage(string data) 55 { 56 int a, b; 57 ThreadPool.GetAvailableThreads(out a, out b); 58 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ 59 "WorkerThreads is:{2} CompletionPortThreads is :{3}", 60 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 61 Console.WriteLine(message); 62 } 63 64 }
由输出结果可以看到,在使用FileStream.BeginRead方法后,系统将自动启动CLR线程池中I/O线程。
注意:如果你看到的测试结果正好相反:工作者线程为999,I/O线程为1000,这是因为FileStream的文件容量小于缓冲值1024所致的。此时文件将会一次性读取或写入,而系统将启动工作者线程而非I/O线程来处理回调函数。
5.2 异步操作TCP/IP套接字
在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 能够实现异步操作,而且异步线程是来自于CLR线程池的I/O线程。
public override int ReadByte () public override int Read (byte[] buffer,int offset, int size)
public override void WriteByte (byte value) public override void Write (byte[] buffer,int offset, int size)
public override IAsyncResult BeginRead (byte [] buffer, int offset, int size, AsyncCallback callback, Object state ) public override int EndRead(IAsyncResult result)
public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size, AsyncCallback callback, Object state ) public override void EndWrite(IAsyncResult result)
若要创建 NetworkStream,必须提供已连接的 Socket。而在.NET中使用TCP/IP套接字不需要直接与Socket打交道,因为.NET把Socket的大部分操作都放在System.Net.TcpListener和System.Net.Sockets.TcpClient里面,这两个类大大地简化了Socket的操作。一般套接字对象Socket包含一个Accept()方法,此方法能产生阻塞来等待客户端的请求,而在TcpListener类里也包含了一个相似的方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个TcpClient 对象,通过 TcpClient 的 public NetworkStream GetStream()方法就能获取NetworkStream对象,控制套接字数据的发送与接收。
下面以一个例子说明异步调用TCP/IP套接字收发数据的过程。
首先在服务器端建立默认地址127.0.0.1用于收发信息,使用此地址与端口500新建TcpListener对象,调用TcpListener.Start 侦听传入的连接请求,再使用一个死循环来监听信息。
在ChatClient类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利用 NetworkStream.BeginRead 读取客户端信息,并在回调函数ReceiveAsyncCallback中输出信息内容,若接收到的信息的大小小于1时,它将会抛出一个异常。当信息成功接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //设置CLR线程池最大线程数 6 ThreadPool.SetMaxThreads(1000, 1000); 7 8 //默认地址为127.0.0.1 9 IPAddress ipAddress = IPAddress.Parse("127.0.0.1"); 10 TcpListener tcpListener = new TcpListener(ipAddress, 500); 11 tcpListener.Start(); 12 13 //以一个死循环来实现监听 14 while (true) 15 { //调用一个ChatClient对象来实现监听 16 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient()); 17 } 18 } 19 } 20 21 public class ChatClient 22 { 23 static TcpClient tcpClient; 24 static byte[] byteMessage; 25 static string clientEndPoint; 26 27 public ChatClient(TcpClient tcpClient1) 28 { 29 tcpClient = tcpClient1; 30 byteMessage = new byte[tcpClient.ReceiveBufferSize]; 31 32 //显示客户端信息 33 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString(); 34 Console.WriteLine("Client's endpoint is " + clientEndPoint); 35 36 //使用NetworkStream.BeginRead异步读取信息 37 NetworkStream networkStream = tcpClient.GetStream(); 38 networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize, 39 new AsyncCallback(ReceiveAsyncCallback), null); 40 } 41 42 public void ReceiveAsyncCallback(IAsyncResult iAsyncResult) 43 { 44 //显示CLR线程池状态 45 Thread.Sleep(100); 46 ThreadPoolMessage("\nMessage is receiving"); 47 48 //使用NetworkStream.EndRead结束异步读取 49 NetworkStream networkStreamRead = tcpClient.GetStream(); 50 int length=networkStreamRead.EndRead(iAsyncResult); 51 52 //如果接收到的数据长度少于1则抛出异常 53 if (length < 1) 54 { 55 tcpClient.GetStream().Close(); 56 throw new Exception("Disconnection!"); 57 } 58 59 //显示接收信息 60 string message = Encoding.UTF8.GetString(byteMessage, 0, length); 61 Console.WriteLine("Message:" + message); 62 63 //使用NetworkStream.BeginWrite异步发送信息 64 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!"); 65 NetworkStream networkStreamWrite=tcpClient.GetStream(); 66 networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length, 67 new AsyncCallback(SendAsyncCallback), null); 68 } 69 70 //把信息转换成二进制数据,然后发送到客户端 71 public void SendAsyncCallback(IAsyncResult iAsyncResult) 72 { 73 //显示CLR线程池状态 74 Thread.Sleep(100); 75 ThreadPoolMessage("\nMessage is sending"); 76 77 //使用NetworkStream.EndWrite结束异步发送 78 tcpClient.GetStream().EndWrite(iAsyncResult); 79 80 //重新监听 81 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize, 82 new AsyncCallback(ReceiveAsyncCallback), null); 83 } 84 85 //显示线程池现状 86 static void ThreadPoolMessage(string data) 87 { 88 int a, b; 89 ThreadPool.GetAvailableThreads(out a, out b); 90 string message = string.Format("{0}\n CurrentThreadId is {1}\n " + 91 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", 92 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 93 94 Console.WriteLine(message); 95 } 96 }
而在客户端只是使用简单的开发方式,利用TcpClient连接到服务器端,然后调用NetworkStream.Write方法发送信息,最后调用NetworkStream.Read方法读取回馈信息
1 static void Main(string[] args) 2 { 3 //连接服务端 4 TcpClient tcpClient = new TcpClient("127.0.0.1", 500); 5 6 //发送信息 7 NetworkStream networkStream = tcpClient.GetStream(); 8 byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!"); 9 networkStream.Write(sendMessage, 0, sendMessage.Length); 10 networkStream.Flush(); 11 12 //接收信息 13 byte[] receiveMessage=new byte[1024]; 14 int count=networkStream.Read(receiveMessage, 0,1024); 15 Console.WriteLine(Encoding.UTF8.GetString(receiveMessage)); 16 Console.ReadKey(); 17 }
注意观察运行结果,服务器端的异步操作线程都是来自于CLR线程池的I/O线程
5.3 异步WebRequest
FtpWebRequest 使用FTP文件传输协议实现文件请求/响应、HttpWebRequest 用于处理HTTP的页面请求/响应。由于使用方法相类似,下面就以常用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。
在使用ASP.NET开发网站的时候,往往会忽略了HttpWebRequest的使用,因为开发都假设客户端是使用浏览器等工具去阅读页面的。但如果你对REST开发方式有所了解,那对 HttpWebRequest 就应该非常熟悉。它可以在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,然后对回复数据进行适当处理。HttpWebRequest 包含有以下几个常用方法用于处理请求/响应:
public override Stream GetRequestStream () public override WebResponse GetResponse ()
public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state ) public override Stream EndGetRequestStream ( IAsyncResult asyncResult ) public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state ) public override WebResponse EndGetResponse ( IAsyncResult asyncResult )
其中BeginGetRequestStream、EndGetRequestStream 用于异步向HttpWebRequest对象写入请求信息; BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操作Internet的“请求/响应”,避免主线程长期处于等待状态,而操作期间异步线程是来自CLR线程池的I/O线程。
注意:请求与响应不能使用同步与异步混合开发模式,即当请求写入使用GetRequestStream同步模式,即使响应使用BeginGetResponse异步方法,操作也与GetRequestStream方法在于同一线程内。
下面以简单的例子介绍一下异步请求的用法。
首先为Person类加上可序列化特性,在服务器端建立Hanlder.ashx,通过Request.InputStream 获取到请求数据并把数据转化为String对象,此实例中数据是以 “Id:1” 的形式实现传送的。然后根据Id查找对应的Person对象,并把Person对象写入Response.OutStream 中返还到客户端。
在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式通过BeginGetRequireStream获取请求数据流,然后写入请求数据 “Id:1”。再使用异步方法BeginGetResponse 获取回复数据,最后把数据反序列化为Person对象显示出来。
注意:HttpWebRequire.Method默认为get,在写入请求前必须把HttpWebRequire.Method设置为post,否则在使用BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “无法发送具有此谓词类型的内容正文" 的异常。
Model
1 namespace Model 2 { 3 [Serializable] 4 public class Person 5 { 6 public int ID 7 { 8 get; 9 set; 10 } 11 public string Name 12 { 13 get; 14 set; 15 } 16 public int Age 17 { 18 get; 19 set; 20 } 21 } 22 }
服务器端
1 public class Handler : IHttpHandler { 2 3 public void ProcessRequest(HttpContext context) 4 { 5 //把信息转换为String,找出输入条件Id 6 byte[] bytes=new byte[1024]; 7 int length=context.Request.InputStream.Read(bytes,0,1024); 8 string condition = Encoding.Default.GetString(bytes); 9 int id = int.Parse(condition.Split(new string[] { ":" }, 10 StringSplitOptions.RemoveEmptyEntries)[1]); 11 12 //根据Id查找对应Person对象 13 var person = GetPersonList().Where(x => x.ID == id).First(); 14 15 //所Person格式化为二进制数据写入OutputStream 16 BinaryFormatter formatter = new BinaryFormatter(); 17 formatter.Serialize(context.Response.OutputStream, person); 18 } 19 20 //模拟源数据 21 private IList<Person> GetPersonList() 22 { 23 var personList = new List<Person>(); 24 25 var person1 = new Person(); 26 person1.ID = 1; 27 person1.Name = "Leslie"; 28 person1.Age = 30; 29 personList.Add(person1); 30 ........... 31 return personList; 32 } 33 34 public bool IsReusable 35 { 36 get { return true;} 37 } 38 }
客户端
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 ThreadPool.SetMaxThreads(1000, 1000); 6 Request(); 7 Console.ReadKey(); 8 } 9 10 static void Request() 11 { 12 ThreadPoolMessage("Start"); 13 //使用WebRequest.Create方法建立HttpWebRequest对象 14 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create( 15 "http://localhost:5700/Handler.ashx"); 16 webRequest.Method = "post"; 17 18 //对写入数据的RequestStream对象进行异步请求 19 IAsyncResult result=webRequest.BeginGetRequestStream( 20 new AsyncCallback(EndGetRequestStream),webRequest); 21 } 22 23 static void EndGetRequestStream(IAsyncResult result) 24 { 25 ThreadPoolMessage("RequestStream Complete"); 26 //获取RequestStream 27 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; 28 Stream stream=webRequest.EndGetRequestStream(result); 29 30 //写入请求条件 31 byte[] condition = Encoding.Default.GetBytes("Id:1"); 32 stream.Write(condition, 0, condition.Length); 33 34 //异步接收回传信息 35 IAsyncResult responseResult = webRequest.BeginGetResponse( 36 new AsyncCallback(EndGetResponse), webRequest); 37 } 38 39 static void EndGetResponse(IAsyncResult result) 40 { 41 //显出线程池现状 42 ThreadPoolMessage("GetResponse Complete"); 43 44 //结束异步请求,获取结果 45 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; 46 WebResponse webResponse = webRequest.EndGetResponse(result); 47 48 //把输出结果转化为Person对象 49 Stream stream = webResponse.GetResponseStream(); 50 BinaryFormatter formatter = new BinaryFormatter(); 51 var person=(Person)formatter.Deserialize(stream); 52 Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}", 53 person.ID, person.Name, person.Age)); 54 } 55 56 //
|
请发表评论