在前一篇讲到了服务侦听,具体构建了一个抽象类,该类是后续服务类中SOCKET通讯服务的基类,而SOCKET通讯中不论是服务端还是客户端都需要数据收发,客户端对应服务端侦听的部份则是连接,下面就讲讲连接和数据收发
1.连接
该部份比较简单,只有三个简单的函数:CreateConnectToRemoteSocket,ConnectToRemoteCore,AsyncConnectToRemoteCore,
CreateConnectToRemoteSocket:创建连接的Socket,其中CheckSockectIsConnected是检查是否已连接(该函数用了MS使用的例子来判断是否已连接),最后使用传进的参数(如果不为null)设置SOCKET(比如超时等等),TcpIPSocketSetting类是一个专用户设置SOCKET的类
ConnectToRemoteCore:同步连接到远程
AsyncConnectToRemoteCore:异步连接到远程,在该函数中用到类接口ITranseParamets,使用SocketAsyncEventArgs.UserToken存储该接口参数,对于该接口的定义我会在代码中贴出,并且属性及方法也会在该接口中说明,其实现会在后续章节中讲到,SocketAsyncEventArgs.RemoteEndPoint存储要连接到的远程终结点(这个一定要设),最后调用Socket.ConnectAsync函数异步连接,该函数使用的参数及返回值的含义等同上一章使用到的Socket.AcceptAsync,在Socket中类似这样的还有收发函数Socket.SendAsync,Socket.ReceiveAsync,这些函数都使用SocketAsyncEventArgs类,返回值都是Bool类型,含义及处理都类型上一章的Socket.AcceptAsync,只是在调用前设置SocketAsyncEventArgs类的方法不同(而所有方法中都必需设置SocketAsyncEventArgs.Completed事件),及返回后SocketAsyncEventArgs类的有效属性不同,本例中是在GetSocketArg方法中设置了SocketAsyncEventArgs类的属性,由于GetSocketArg是一个通用方法,其实在本例中它仅设置类SocketError属性及Completed事件,其它属性都是null,本例中Socket.ConnectAsync方法完成后的有效属性:它完成后仅SocketError属性有效指示是否成功
OnCompleted:Completed事件处理程序,该例程是连接及数据收发的处理例程,先讲连接的处理例程,仅仅调用OnConnectCompleted方法完成连接
OverAsyncTranse:该例程用于完成异步操作,它在连接及收发操作中都会调用,首先RecySocketAsyncEventArgs,回收SocketAsyncEventArgs,如果你的SocketAsyncEventArgs不是使用缓冲的则不必调用了,设置ITranseParamets.TranseError指示是否有错误,调用ITranseParamets.AsyncTranseOver通指异步操作完成
2.数据收发的约定
数据的收发方都遵循约定ITranseHeader,该接口是一个约定,及传输过程中的控制信息,实现该接口的类负责解读该约定的含义,而该接口在网络中传输的数据则存储在该接口的Data属性中,Data属性应该是一个接收双方都已知长度的字节数组,通常方送方事先填充Data,接收方根据双方已知的字节长度接收该长度字节数的内容,具体说命见代码中贴出的说明,
其实现会在后续章节中给出,其实这个就类似于数据传输协议的头
3.数据的收发
数据收发分为同步和异步两种,这两种都用同一规则操作,只不过途经不同而已,它们都是:先发送(接收)约定,然后根据约定发送(接收)数据体,同步接收和发送比较简单,分别是SendDataCore,ReceiveDataCore这两个例程,而它们又都各自调用SendHeaderCore和ReceiveHeaderCore来发送(接收)约定
PrepareForSend:发送前准备,将要发送的数据读到发送缓冲区,这个例程同步和异步都使用,看过程就应该知道起含义,这里就不说了
HandlerReceiver:接收数据后的处理,该例程将接收到的字节数rcvsize写进流中,并根据约定返回是否还有后续数据要接收,同样这个例程同步异步都使用
数据异步的收发:AsyncSendDataCore和AsyncRecevDataCore分别发送和接收约定,NextAsyncSendDataCore和NextAsyncReceiveDataCore分别根据约定发送和接收数据体,返回True说明还有后续数据需要发送或接收,否则说明数据发送或接收完成,HeaderIsTranseOver例程判断约定是否已发送或接收完成,在异步数据收发中最重要的是设置每次要发送或接收的数据缓冲区(该缓冲区在给出SocketAsyncEventArgs就已设为ITranseParamets.TranseBuffer)及偏移量和长度,而偏移量及长度分别在发送和接收前设置
OnCompleted:数据异步接收和发送事件处理
约定完成前调用OnHeaderCompleted,约定完成后调用OnTranseDataCompleted,这两个例程都是判断指定操作是否已完成,未完成则继续下一轮操作,OnHeaderCompleted例程中已完成则开始根据约定开始数据主体收发操作,OnTranseDataCompleted例程中完成则调用OverAsyncTranse结束异步操作
3.其它
ITranseInterface接口是为了以后扩展数据传输而定义的接口(例如不使用Socket的数据传输),在使用中仅使用该接口而不管其实现,这样方便以后扩展
本类中TranseInterfaceDirection枚举是为了区分该传输接口是客户端连接发起方还是服务端被动接受连接方,分别提供两个构造,带参数的构造一定是服务端被动接收端,不带参数的构造则是客户端发起连接方
WaitClose方法和Close方法:前一个方法是等待远程方调用Close后才关闭,其实现机理是首先开始异步接收1长度数据等待对方关闭时肯定在回调中引发错误,此时关闭.
以下为代码:
TcpIpTranseInterface
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Net.Sockets; 6 using System.IO; 7 using System.Net; 8 9 namespace YZKStdLibrary.Network.Transe 10 { 11 public class TcpIpTranseInterface : ITranseInterface 12 { 13 14 static TcpIpTranseInterface() 15 { 16 StateSendBytes = new byte[1]; 17 SocketArgPools = new RecyManager<SocketAsyncEventArgs>(50); 18 WaitCloseBytes = new byte[1]; 19 } 20 21 public TcpIpTranseInterface() 22 { 23 p_TranseDirection = TranseInterfaceDirection.Client; 24 } 25 26 public TcpIpTranseInterface( 27 Socket socket) 28 { 29 g_TcpSocket = socket; 30 p_TranseDirection = TranseInterfaceDirection.Server; 31 } 32 33 internal Socket BaseSocket 34 { 35 get { return g_TcpSocket; } 36 } 37 38 #region 私有变量 39 40 private Socket g_TcpSocket; 41 42 private TranseInterfaceDirection p_TranseDirection; 43 44 private static readonly byte[] StateSendBytes; 45 46 private static readonly byte[] WaitCloseBytes; 47 48 public static readonly RecyManager<SocketAsyncEventArgs> SocketArgPools; 49 50 #endregion 51 52 #region ITranseInterfaceInformation 成员 53 /// <summary> 54 /// 获取本地终结点信息 55 /// </summary> 56 public EndPoint LocalEndPont 57 { 58 get 59 { 60 if (g_TcpSocket != null) 61 return g_TcpSocket.LocalEndPoint; 62 else 63 return null; 64 } 65 } 66 /// <summary> 67 /// 获取远程终结点信息 68 /// </summary> 69 public EndPoint RemoteEndPoint 70 { 71 get 72 { 73 if (g_TcpSocket != null) 74 return g_TcpSocket.RemoteEndPoint; 75 else 76 return null; 77 } 78 } 79 /// <summary> 80 /// 获取当前传输接口建立连接时的方向 81 /// </summary> 82 public TranseInterfaceDirection TranseDirection 83 { 84 get { return p_TranseDirection; } 85 } 86 /// <summary> 87 /// 获取传输协义名称 88 /// </summary> 89 public string ProtocolTypeName 90 { 91 get { return TranseProtocolTypeNames.TcpIp; } 92 } 93 94 #endregion 95 96 #region 接收前和发送后流处理 97 98 /// <summary> 99 /// 返回小于等于0值指示无数据发送 100 /// 会设置paramets.BytesTransferred 101 /// </summary> 102 /// <param name="paramets"></param> 103 /// <returns></returns> 104 private int PrepareForSend(ITranseParamets paramets) 105 { 106 int rd = paramets.TranseStream.Read( 107 paramets.TranseBuffer, paramets.TranseBufferOffset, paramets.TranseBufferSize); 108 if (rd > 0) 109 { 110 paramets.BytesTransferred = 111 paramets.BytesTransferred + rd; 112 } 113 return rd; 114 } 115 116 /// <summary> 117 /// 返回True指示已接收完成 118 /// 会设置paramets.BytesTransferred 119 /// </summary> 120 /// <param name="paramets"></param> 121 /// <param name="rcvsize"></param> 122 /// <returns></returns> 123 private bool HandlerReceiver(ITranseParamets paramets,int rcvsize) 124 { 125 if (rcvsize > 0) 126 { 127 paramets.TranseStream.Write(paramets.TranseBuffer, 128 paramets.TranseBufferOffset, rcvsize); 129 paramets.BytesTransferred = 130 paramets.BytesTransferred + rcvsize; 131 } 132 return paramets.TranseHeader.TranseSize <= paramets.BytesTransferred; 133 } 134 #endregion 135 136 #region 数据同步接收和发送 137 138 private void SendHeaderCore( 139 ITranseHeader header) 140 { 141 int offset = 0; 142 int l = header.Data.Length; 143 while (l > 0) 144 { 145 int s = g_TcpSocket.Send(header.Data, offset, l, SocketFlags.None); 146 l -= s; 147 offset += s; 148 } 149 } 150 151 private void SendDataCore(ITranseParamets paramets) 152 { 153 SendHeaderCore(paramets.TranseHeader); 154 paramets.HeaderTransed(); 155 int x = PrepareForSend(paramets); 156 while (x > 0) 157 { 158 while (x > 0) 159 { 160 int offset = paramets.TranseBufferOffset; 161 int s = g_TcpSocket.Send(paramets.TranseBuffer, 162 offset, x, SocketFlags.None); 163 x -= s; 164 offset += s; 165 } 166 x = PrepareForSend(paramets); 167 } 168 } 169 170 private void ReceiveHeaderCore( 171 ITranseHeader header) 172 { 173 int l = header.Data.Length; 174 int offset = 0; 175 while (l > 0) 176 { 177 int s = g_TcpSocket.Receive(header.Data, offset, l, SocketFlags.None); 178 l -= s; 179 offset += s; 180 } 181 header.ResetParametsFromData(); 182 } 183 184 private void ReceiveDataCore(ITranseParamets paramets) 185 { 186 ReceiveHeaderCore(paramets.TranseHeader); 187 paramets.HeaderTransed(); 188 int rcvsize = 0; 189 while (!HandlerReceiver(paramets, rcvsize)) 190 { 191 long x = paramets.TranseHeader.TranseSize - paramets.BytesTransferred; 192 if (x > paramets.TranseBufferSize) 193 x = paramets.TranseBufferSize; 194 rcvsize = g_TcpSocket.Receive(paramets.TranseBuffer, 195 paramets.TranseBufferOffset, (int)x, SocketFlags.None); 196 } 197 } 198 #endregion 199 200 #region 异步处理 201 202 private void CopyHeaderDataToTranseBuffer( 203 ITranseParamets par,SocketAsyncEventArgs arg) 204 { 205 int hs = par.TranseHeader.Data.Length; 206 arg.SetBuffer(arg.Offset, hs); 207 Array.Copy(par.TranseHeader.Data, 0, arg.Buffer, 208 arg.Offset, arg.Count); 209 } 210 211 private void CopyTranseBufferToHeaderData( 212 ITranseParamets par, SocketAsyncEventArgs arg) 213 { 214 int hs = par.TranseHeader.Data.Length; 215 Array.Copy(arg.Buffer, arg.Offset, 216 par.TranseHeader.Data, 0, hs); 217 par.TranseHeader.ResetParametsFromData(); 218 } 219 220 private SocketAsyncEventArgs GetSocketArg( 221 ITranseParamets par) 222 { 223 SocketAsyncEventArgs arg = SocketArgPools.GetItem(); 224 arg.SetBuffer(par.TranseBuffer, par.TranseBufferOffset, par.TranseBufferSize); 225 arg.Completed += new EventHandler<SocketAsyncEventArgs>(OnCompleted); 226 arg.SocketError = SocketError.Success; 227 return arg; 228 } 229 230 private void RecySocketAsyncEventArgs(SocketAsyncEventArgs arg) 231 { 232 arg.Completed -= new EventHandler<SocketAsyncEventArgs>(OnCompleted); 233 arg.SetBuffer(null, 0, 0); 234 arg.UserToken = null; 235 arg.AcceptSocket = null; 236 arg.RemoteEndPoint = null; 237 SocketArgPools.RecyItem(arg); 238 } 239 240 private void OverAsyncTranse( 241 SocketAsyncEventArgs arg, 242 ITranseParamets paramets, 243 Exception error) 244 { 245 RecySocketAsyncEventArgs(arg); 246 paramets.TranseError = error; 247 paramets.AsyncTranseOver(); 248 } 249 250 private void OnCompleted(object sender, SocketAsyncEventArgs e) 251 { 252 ITranseParamets par = e.UserToken as ITranseParamets; 253 if (par.DataTranseDirection != DataTranseDirection.Connecting) 254 { 255 if (e.BytesTransferred == 0) 256 { 257 if (!CheckSockectIsConnected()) 258 e.SocketError = SocketError.ConnectionAborted; 259 } 260 } 261 if (e.SocketError != SocketError.Success) 262 { 263 OverAsyncTranse(e, par, new SocketException((int)e.SocketError)); 264 return; 265 } 266 switch (par.DataTranseDirection) 267 { 268 case DataTranseDirection.Connecting: 269 { 270 OnConnectCompleted(par,e); 271 break; 272 } 273 case DataTranseDirection.ReceivingData: 274 case DataTranseDirection.SendingData: 275 { 276 if (par.HeaderTransedOver) 277 OnTranseDataCompleted(par,e); 278 else 279 OnHeaderCompleted(par,e); 280 break; 281 } 282 } 283 } 284 285 private void OnHeaderCompleted(ITranseParamets paramets,SocketAsyncEventArgs e) 286 { 287 bool ovcall = false; 288 switch (e.LastOperation) 289 { 290 case SocketAsyncOperation.Send: 291 { 292 try 293 { 294 if (!HeaderIsTranseOver(paramets, e)) 295 { 296 if (!g_TcpSocket.SendAsync(e)) 297 OnCompleted(g_TcpSocket, e); 298 return; 299 } 300 paramets.HeaderTransed(); 301 if (!NextAsyncSendDataCore(paramets, e)) 302 { 303 paramets.AsyncNotyTransing(); 304 ovcall = true; 305 OverAsyncTranse(e, paramets, null); 306 } 307 } 308 catch (Exception err) 309 { 310 if (!ovcall) 311 OverAsyncTranse(e, paramets, err); 312 } 313 break; 314 } 315 case SocketAsyncOperation.Receive: 316 { 317 try 318 { 319 if (!HeaderIsTranseOver(paramets, e)) 320 { 321 if (!g_TcpSocket.ReceiveAsync(e)) 322 OnCompleted(g_TcpSocket, e); 323 return; 324 } 325 CopyTranseBufferToHeaderData(paramets, e); 326 paramets.HeaderTransed(); 327 if (!NextAsyncReceiveDataCore(paramets, e)) 328 { 329 ovcall = true; 330 OverAsyncTranse(e, paramets, null); 331 } 332 } 333 catch (Exception err) 334 { 335 if (!ovcall) 336 OverAsyncTranse(e, paramets, err); 337 } 338 break; 339 } 340 } 341 } 342 343 private void OnConnectCompleted(ITranseParamets paramets,SocketAsyncEventArgs e) 344 { 345 OverAsyncTranse(e, paramets, null); 346 } 347 348 private void OnTranseDataCompleted(ITranseParamets paramets,SocketAsyncEventArgs e) 349 { 350 bool ovcall = false; 351 switch (e.LastOperation) 352 { 353 case SocketAsyncOperation.Send: 354 { 355 try 356 { 357 paramets.AsyncNotyTransing(); 358 if (!NextAsyncSendDataCore(paramets, e)) 359 { 360 ovcall = true; 361 OverAsyncTranse(e, paramets, null); 362 } 363 } 364 catch (Exception err) 365 { 366 if (!ovcall) 367 OverAsyncTranse(e, paramets, err); 368 } 369 break; 370 } 371 case SocketAsyncOperation.Receive: 372 { 373 try 374 { 375 bool hd = HandlerReceiver(paramets, e.BytesTransferred); 376 paramets.AsyncNotyTransing(); 377 if (hd) 378 { 379 ovcall = true; 380 OverAsyncTranse(e, paramets, null); 381 } 382 else 383 { 384 NextAsyncReceiveDataCore(paramets, e); 385 } 386 } 387 catch (Exception err) 388 { 389 if (!ovcall) 390 OverAsyncTranse(e, paramets, err); 391 } 392 break; 393 } 394 } 395 } 396 397 #endregion 398 399 } 400 401 }
TcpIpTranseInterface
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Net.Sockets; 6 using System.IO; 7 using System.Net; 8 9 namespace YZKStdLibrary.Network.Transe 10 { 11 partial class TcpIpTranseInterface : ITranseInterface 12 { 13 #region 数据异步接收和发送 14 15 private bool NextAsyncSendDataCore(ITranseParamets paramets, SocketAsyncEventArgs e) 16 { 17 int s = PrepareForSend(paramets); 18 if (s > 0) 19 { 20 if (s > paramets.TranseBufferSize) 21 s = paramets.TranseBufferSize; 22 e.SetBuffer(e.Offset, s); 23 if (!g_TcpSocket.SendAsync(e)) 24 OnCompleted(g_TcpSocket, e); 25 return true; 26 } 27 return false; 28 } 29 30 private void AsyncSendDataCore(ITranseParamets par) 31 { 32 SocketAsyncEventArgs arg = GetSocketArg(par); 33 arg.UserToken = par; 34 CopyHeaderDataToTranseBuffer(par, arg); 35 if (!g_TcpSocket.SendAsync(arg)) 36 OnCompleted(g_TcpSocket, arg); 37 } 38 39 private void AsyncRecevDataCore(ITranseParamets par) 40 { 41 SocketAsyncEventArgs arg = GetSocketArg(par); 42 arg.UserToken = par; 43 arg.SetBuffer(arg.Offset, par.TranseHeader.Data.Length); 44 if (!g_TcpSocket.ReceiveAsync(arg)) 45 OnCompleted(g_TcpSocket, arg); 46 } 47 48 private bool NextAsyncReceiveDataCore(ITranseParamets paramets, SocketAsyncEventArgs e) 49 { 50 long dx = paramets.TranseHeader.TranseSize - paramets.BytesTransferred; 51 if (dx > 0) 52 { 53 if (dx > paramets.TranseBufferSize) 54 dx = paramets.TranseBufferSize; 55 e.SetBuffer(e.Offset, (int)dx); 56 if (!g_TcpSocket.ReceiveAsync(e)) 57 OnCompleted(g_TcpSocket, e); 58 return true; 59 } 60 return false; 61 } 62 63 private bool HeaderIsTranseOver(ITranseParamets paramets, SocketAsyncEventArgs e) 64 { 65 int dx = e.Count - e.BytesTransferred; 66 if (dx > 0) 67 { 68 int offset = e.Offset + e.BytesTransferred; 69 e.SetBuffer(offset, dx); 70 return false; 71 } 72 else 73 e.SetBuffer(paramets.TranseBufferOffset, paramets.TranseBufferSize); 74 return true; 75 } 76 #endregion 77 78 #region 异步等待对方关闭 79 80 private void WaitCloseCore(object state, WaitCloseDelegate callback) 81 { 82 lock (g_CloseAsyncLock) 83 { 84 if (g_TcpSocket != null 85 && !p_ClosedOver) 86 g_TcpSocket.BeginReceive(WaitCloseBytes, 87 0
|
请发表评论