用DELPHI开发网络代码已经有一段时间了! 我发现在网上用VC来实现完成端口(IOCP)的代码很多,但是使用DELPHI来实现的就比较少了。对IOCP讲的清楚的就更少了。在这里我把自己编写DELPHI下的IOCP写出来,希望对刚学完成端口的朋友有个帮助。 首先我们来了解一些在使用IOCP的时候需要使用的一些结构! (1):单IO数据结构 LPVOID = Pointer; LPPER_IO_OPERATION_DATA = ^ PER_IO_OPERATION_DATA ; PER_IO_OPERATION_DATA = packed record Overlapped: OVERLAPPED; DataBuf: TWSABUF; Buffer: array [0..1024] of CHAR; BytesSEND: DWORD; BytesRECV: DWORD; end; 上面的结构中Overlapped: OVERLAPPED;和DataBuf: TWSABUF;是固定的结构类型。Buffer: array [0..1024] of CHAR;是用来保存接受数据的缓存。BytesSEND: DWORD;用来标志发送数据的长度。BytesRECV: DWORD;用来标志接受数据的长度。因为完成端口的工作者线程可以接受到来自客户端的数据,同时还可以接受到自己发送给客户端的数据,所以我们使用BytesSEND,BytesRECV变量来说是用来区分这次的数据是来自客户端的数据还是自己发送出去的数据。详细的使用方法,我会在下面详细说明。 (2):“单句柄数据结构” LPPER_HANDLE_DATA = ^ PER_HANDLE_DATA; PER_HANDLE_DATA = packed record Socket: TSocket; end; 下来我从编写一个完成端口的为例说明。 if WSAStartUp($202, wsData) <> 0 then begin WSACleanup(); end; 加载SOCKET。我使用的是2.2版为了后面方便加入心跳。 CompletionPort:=CreateIOCompletionPort(INVALID_HANDLE_VALUE,0,0,0); 创建一个完成端口。 GetSystemInfo(LocalSI); for I:=0 to LocalSI.dwNumberOfProcessors * 2 -1 do begin hThread := CreateThread(nil, 0, @ServerWorkerThread, Pointer(CompletionPort),0, ThreadID); if (hThread = 0) then begin Exit; end; CloseHandle(hThread); end; 根据CPU的数量创建CPU*2数量的工作者线程。 Listensc:=WSASocket(AF_INET,SOCK_STREAM,0,Nil,0,WSA_FLAG_OVERLAPPED); if Listensc=SOCKET_ERROR then begin closesocket(Listensc); WSACleanup(); end; sto.sin_family:=AF_INET; sto.sin_port:=htons(5500); sto.sin_addr.s_addr:=htonl(INADDR_ANY); if bind(Listensc,sto,sizeof(sto))=SOCKET_ERROR then begin closesocket(Listensc); end; listen(Listensc,20); 创建一个套接字,将此套接字和一个端口绑定并监听此端口。 while (TRUE) do begin Acceptsc:= WSAAccept(Listensc, nil, nil, nil, 0); 当客户端有连接请求的时候,WSAAccept函数会新创建一个套接字Acceptsc。这个套接字就是和客户端通信的时候使用的套接字。 if (Acceptsc= SOCKET_ERROR) then begin closesocket(Listensc); exit; end; 判断Acceptsc套接字创建是否成功,如果不成功则退出。 PerHandleData := LPPER_HANDLE_DATA (GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA))); if (PerHandleData = nil) then begin exit; end; PerHandleData.Socket := Acceptsc; 创建一个“单句柄数据结构”将Acceptsc套接字绑定。 if (CreateIoCompletionPort(Acceptsc, CompletionPort, DWORD(PerHandleData), 0) = 0) then begin exit; end; 将套接字、完成端口和“单句柄数据结构”三者绑定在一起。 PerIoData := LPPER_IO_OPERATION_DATA(GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))); if (PerIoData = nil) then begin exit; end; ZeroMemory(@PerIoData.Overlapped, sizeof(OVERLAPPED)); PerIoData.BytesSEND := 0; PerIoData.BytesRECV := 0; PerIoData.DataBuf.len := 1024; PerIoData.DataBuf.buf := @PerIoData.Buffer; Flags := 0; 创建一个“单IO数据结构”其中将PerIoData.BytesSEND 和PerIoData.BytesRECV 均设置成0。说明此“单IO数据结构”是用来接受的。 if (WSARecv(Acceptsc, @(PerIoData.DataBuf), 1, @RecvBytes, @Flags,@(PerIoData.Overlapped), nil) = SOCKET_ERROR) then begin if (WSAGetLastError() <> ERROR_IO_PENDING) then begin //最近在检查代码的时候发现以前这里只是使用Exit来退出是不正确的。这里需要删除申请的单IO数据结构,否子会出现内存泄露。 (2008年3月24日) //Exit; closesocket(AcceptSc); if PerIoData <> nil then begin GlobalFree(DWORD(PerIoData)); end; Continue; end end; 用此“单IO数据结构”来接受Acceptsc套接字的数据。 end; 创建IOCP的工作已经完成,下一次我将写IOCP的工作者线程的处理方法,谢谢!
今天我写一下关于DELPHI编写完成端口(IOCP)的工作者线程中的东西。希望各位能提出批评意见。 上次我写了关于常见IOCP的代码,对于IOCP来说,接受到客户端发送过来和自己发送出去的数据都是从工作者线程中得到。代码和解释如下: function ServerWorkerThread(CompletionPortID:Pointer):Integer;stdcall; begin CompletionPort:=THANDLE(CompletionPortID); //得到创建线程是传递过来的IOCP while(TRUE) do begin //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接受到数据为止 if (GetQueuedCompletionStatus(CompletionPort, BytesTransferred,DWORD(PerHandleData), POverlapped(PerIoData), INFINITE) = False) then begin //当客户端连接断开或者客户端调用closesocket函数的时候,函数GetQueuedCompletionStatus会返回错误。如果我们加入心跳后,在这里就可以来判断套接字是否依然在连接。 if PerHandleData<>nil then begin closesocket(PerHandleData.Socket); GlobalFree(DWORD(PerHandleData)); end; if PerIoData<>nil then begin GlobalFree(DWORD(PerIoData)); end; continue; end; if (BytesTransferred = 0) then begin //当客户端调用shutdown函数来从容断开的时候,我们可以在这里进行处理。 if PerHandleData<>nil then begin TempSc:=PerHandleData.Socket; shutdown(PerHandleData.Socket,1); closesocket(PerHandleData.Socket); GlobalFree(DWORD(PerHandleData)); end; if PerIoData<>nil then begin GlobalFree(DWORD(PerIoData)); end; continue; end; //在上一篇中我们说到IOCP可以接受来自客户端的数据和自己发送出去的数据,两种数据的区别在于我们定义的结构成员BytesRECV和BytesSEND的值。所以下面我们来判断数据的来自方向。因为我们发送出去数据的时候我们设置了结构成员BytesSEND。所以如果BytesRECV=0同时BytesSEND=0那么此数据就是我们接受到的客户端数据。(这种区分方法不是唯一的,个人可以有自己的定义方法。只要可以区分开数据来源就可以。) if (PerIoData.BytesRECV = 0) and (PerIoData.BytesSEND = 0) then begin PerIoData.BytesRECV := BytesTransferred; PerIoData.BytesSEND := 0; end else begin PerIoData.BytesSEND := BytesTransferred; PerIoData.BytesRECV := 0; end; //当是接受来自客户端的数据是,我们进行数据的处理。 if (PerIoData.BytesRECV > PerIoData.BytesSEND) then begin PerIoData.DataBuf.buf := PerIoData.Buffer + PerIoData.BytesSEND; PerIoData.DataBuf.len := PerIoData.BytesRECV - PerIoData.BytesSEND; //这时变量PerIoData.Buffer就是接受到的客户端数据。数据的长度是PerIoData.DataBuf.len 你可以对数据进行相关的处理了。 //....... //当我们将数据处理完毕以后,应该将此套接字设置为结束状态,同时初始化和它绑定在一起的数据结构。 ZeroMemory(@(PerIoData.Overlapped), sizeof(OVERLAPPED)); PerIoData.BytesRECV := 0; Flags := 0; ZeroMemory(@(PerIoData.Overlapped), sizeof(OVERLAPPED)); PerIoData.DataBuf.len := DATA_BUFSIZE; ZeroMemory(@PerIoData.Buffer,sizeof(@PerIoData.Buffer)); PerIoData.DataBuf.buf := @PerIoData.Buffer; if (WSARecv(PerHandleData.Socket, @(PerIoData.DataBuf), 1, @RecvBytes, @Flags,@(PerIoData.Overlapped), nil) = SOCKET_ERROR) then begin if (WSAGetLastError() <> ERROR_IO_PENDING) then begin if PerHandleData<>nil then begin TempSc:=PerHandleData.Socket; closesocket(PerHandleData.Socket); GlobalFree(DWORD(PerHandleData)); end; if PerIoData<>nil then begin GlobalFree(DWORD(PerIoData)); end; continue; end; end; end //当我们判断出来接受的数据是我们发送出去的数据的时候,在这里我们清空我们申请的内存空间 else begin GlobalFree(DWORD(PerIoData)); end; end; end; 到此,工作者线程已经处理完成。接受数据已经没有问题了。下一篇中我将会写出,如何时候IOCP来发送数据代码。今天的代码中应该对PerIoData.BytesRECV > PerIoData.BytesSEND单另解说一下。其实如果按照上面的例子去编写工作者线程,会觉得编写出来的代码会很不稳定,接受到的数据总是有错位的现象。原因是TCP协议中有数据分片的概念,这个以后我会将我处理这个的代码分享给大家。
近太忙,所以没有机会来写IOCP的后续文章。今天好不容易有了时间来写IOCP的粘包处理问题。 TCP数据粘包的产生原因在于TCP是一种流协议。在以太网中一个TCP的数据包长度是1500位。其中20位的IP包头,20位的TCP包头,其余的1460都是我们可以发送的数据。在数据发送的时候,我们发送的数据长度有可能比1460短,这样在TCP来说它还是以一个数据包来发送。从而降低了网络的利用率。所以TCP在发送数据包的时候,会将下一个数据包和这个数据包合在一起发送以增加网络利用率(虽然SOCKET 中可以强制关闭这种合并发送,但是我不建议使用)。这样以来,在我们接受到一个数据包以后,就会发现在这个数据包中含有其它的数据包,从而很难处理。 处理粘包现象有多种方法。我的方法是在每发送一个数据的前面加入这次发送的数据长度(4位)。以char的方式加入。这样以来我们的数据包结构就变成了: 数据包长度(4位)+实际数据。 在接收到数据包以后,我们首先得到数据包的长度,然后根据这个数据包长度来得到实际的数据。 以下是我的粘包处理函数实现(这个函数是对于多个套接字来处理的所以在这里我使用了TList链表):
//用于处理粘包的数据结构 tagPacket = record Socket:TSocket; //处理粘包的套接字 hThread:THANDLE; //线程句柄 ThreadID:DWORD; //线程ID DataBuf:array[0..DATA_BUFSIZE-1] of char; //处理粘包的包 DataLen:Integer; //处理粘包的包长度 end; TDealPacket = tagPacket; PDealPacket = ^tagPacket;
{粘包处理函数} function TClientNet.ComminutePacket(SorucePacket:array of char;SPLen:Integer;var Destpacket:array of char; var DPLen:Integer;var SparePacket:array of char; var SpareLen:Integer;var IsEnd:Boolean;socket:Tsocket):Boolean; const MaxPacket = 1024; PacketLength = 4; var Temp:pchar; TempLen,PacketHeader:Integer; I,J:Integer; TempArray:array[0..MaxPacket-1] of char; TempCurr:Integer; CurrListI:Integer; SocketData:PDealPacket; t_Ord:Integer; begin Result:=true; try //首先根据套接字来得到上次遗留的数据 Fillchar(TempArray,sizeof(TempArray),#0); for I:=0 to DealDataList.Count-1 do begin SocketData:=DealDataList.Items[I]; if SocketData.Socket = socket then begin strmove(TempArray,SocketData.DataBuf,sizeof(SocketData.DataBuf)); TempCurr:=SocketData.DataLen; CurrListI:=I; break; end; end;
//我们将每次处理粘包以后剩余的数据保存在一个TDealPacket的链表中DealDataList。每次根据套接字先得到上次是否有剩余的数据。如果有则将这个数据拷贝到一个临时处理的缓存中。
FillChar(Destpacket,sizeof(Destpacket),#0); FillChar(SparePacket,sizeof(SparePacket),#0); IsEnd:=false;
{以下就是对数据包的整合,其算法很简单,读者可以参考我的注释来理解}
//对临时缓存进行检测 if TempCurr<>0 then //缓存中存在数据 begin if TempCurr<PacketLength then //缓存中包含的数据包长度不足一个4位的数据包长度。 begin TempLen:=PacketLength-TempCurr; if TempLen>SPLen then //数据包中含有的数量不足包头数量 begin strmove(TempArray+TempCurr,SorucePacket,SPLen); TempCurr:=TempCurr+SPLen; //分解完毕, IsEnd:=true; end else begin strmove(TempArray+TempCurr,SorucePacket,TempLen); TempCurr:=TempCurr+TempLen; GetMem(Temp,PacketLength+1); Fillchar(Temp^,PacketLength+1,#0); strmove(Temp,TempArray,PacketLength); //最近在检查代码的时候发现这里转换包头长度的时候,只是使用异常来判断是不合适的。所以这里进行了修改 (2008年3月24日) {try PacketHeader:=StrToInt(StrPas(Temp)); except Result:=false; exit; end; } for J := 1 to 4 do begin t_Ord:=Ord(StrPas(Temp)[J]); if (t_Ord<48) or (t_Ord>57) then begin Result := false; IsEnd := true; Exit; end; end; if PacketHeader>SPLen-TempLen then //此包是不全包 begin strmove(TempArray+TempCurr,SorucePacket+TempLen,SPLen-TempLen); TempCurr:=TempCurr+SPLen-TempLen; //已经将数据拷贝完成 IsEnd:=true; end else //此包是过包 begin strmove(TempArray+TempCurr,SorucePacket+TempLen,PacketHeader); strmove(Destpacket,TempArray,PacketHeader+PacketLength); DPLen:=PacketHeader+PacketLength; Strmove(SparePacket,SorucePacket+TempLen+PacketHeader,SPLen-(TempLen+PacketHeader)); SpareLen:=SPLen-(TempLen+PacketHeader); FillChar(TempArray,sizeof(TempArray),#0); TempCurr:=0; IsEnd:=false; end; FreeMem(Temp); end; end else //缓存中已经含有数据头 begin GetMem(Temp,PacketLength+1); Fillchar(Temp^,PacketLength+1,#0); strmove(Temp,TempArray,PacketLength); //最近在检查代码的时候发现这里转换包头长度的时候,只是使用异常来判断是不合适的。所以这里进行了修改 (2008年3月24日) {try PacketHeader:=StrToInt(StrPas(Temp)); except Result:=false; exit; end; } for J := 1 to 4 do begin t_Ord:=Ord(StrPas(Temp)[J]); if (t_Ord<48) or (t_Ord>57) then begin Result := false; IsEnd := true; Exit; end; end; if PacketHeader>TempCurr-PacketLength then //数据包包头 begin TempLen:=(PacketHeader+PacketLength)-TempCurr; if TempLen>SPLen then begin strmove(TempArray+TempCurr,SorucePacket,SPLen); TempCurr:=TempCurr+SPLen; IsEnd:=true; end else begin strmove(TempArray+TempCurr,SorucePacket,TempLen); strmove(Destpacket,TempArray,PacketHeader+PacketLength); DPLen:=PacketHeader+PacketLength; Strmove(SparePacket,SorucePacket+TempLen,SPLen-TempLen); SpareLen:=SPLen-TempLen; TempCurr:=0; FillChar(TempArray,sizeof(TempArray),#0); IsEnd:=false; end; end else begin strmove(TempArray+TempCurr,SorucePacket,TempLen+PacketLength); strmove(Destpacket,TempArray,TempCurr+TempLen+PacketLength); DPLen:=TempCurr+TempLen+PacketLength; Strmove(SparePacket,SorucePacket+TempLen+PacketLength,SPLen-TempLen); SpareLen:=SPLen-TempLen-PacketLength; TempCurr:=0; FillChar(TempArray,sizeof(TempArray),#0); IsEnd:=false; end; FreeMem(Temp); end; end else //缓存中不存在数据 begin Fillchar(TempArray,sizeof(TempArray),#0); if SPLen>=PacketLength then begin strmove(TempArray,SorucePacket,PacketLength); GetMem(Temp,PacketLength+1); Fillchar(Temp^,PacketLength+1,#0); strmove(Temp,TempArray,PacketLength); //最近在检查代码的时候发现这里转换包头长度的时候,只是使用异常来判断是不合适的。所以这里进行了修改 (2008年3月24日) {try PacketHeader:=StrToInt(StrPas(Temp)); except Result:=false; exit; end;} for J := 1 to 4 do begin t_Ord:=Ord(StrPas(Temp)[J]); if (t_Ord<48) or (t_Ord>57) then begin Result := false; IsEnd := true; Exit; end; end;
if PacketHeader>SPLen-PacketLength then begin strmove(TempArray+PacketLength,SorucePacket+PacketLength,SPLen-PacketLength); TempCurr:=SPLen; IsEnd:=true; end else begin strmove(TempArray+PacketLength,SorucePacket+PacketLength,PacketHeader); strmove(Destpacket,TempArray,PacketHeader+PacketLength); DPLen:=PacketHeader+PacketLength; Strmove(SparePacket,SorucePacket+PacketHeader+PacketLength,SPLen-(PacketHeader+PacketLength)); SpareLen:=SPLen-(PacketHeader+PacketLength); TempCurr:=0; FillChar(TempArray,sizeof(TempArray),#0); IsEnd:=false; end; FreeMem(Temp); end else begin strmove(TempArray,SorucePacket,SPLen); TempCurr:=SPLen; IsEnd:=true; end; end; //恢复数据 SocketData.DataLen:=TempCurr; Fillchar(SocketData.DataBuf,sizeof(SocketData.DataBuf),#0); strmove(SocketData.DataBuf,TempArray,TempCurr); except Result:=false; end; end;
上面的函数就是对TCP协议中粘包的处理DLEPHI代码,对于UDP数据来说是不存在粘包现象的。 我写的IOCP的代码已经在我编写的网络游戏中使用,运行稳定。 下次我会讲使用IOCP发送数据的方法。 同时祝大家新年快乐!
在我以前写的文章中,一直说的是如何接收数据。但是对于如何发送数据却一点也没有提到。因为从代码量上来说接收的代码要比发送多很多。今天我就来写一下如何使用IOCP发送数据。 function TNetControl.SendSpecifyData(const Socket: TSocket; Data: array of char; DataLen: Integer): Boolean; const DATA_BUFSIZE = 1024; //这里定义一个发送数据的缓存长度,只要和接收的一直就可以 var PerIoData: LPPER_IO_OPERATION_DATA ; SendBytes, RecvBytes: DWORD; Flags: DWORD ; LenStr:String; SendBuf:array [0..DATA_BUFSIZE] of char; begin try //由于粘包的关系,所以在需要发送的数据前面加入4位这次发送数据的长度。(详见我的前一篇文章) SetArrayLength(DataLen,LenStr) ; Fillchar(SendBuf,sizeof(SendBuf),#0); strmove(SendBuf,Pointer(LenStr),4); strmove(SendBuf+4,Data,DataLen); //在这里申请一个发送数据的"单IO数据结构" PerIoData := LPPER_IO_OPERATION_DATA(GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))); if (PerIoData = nil) then begin Result:=false; exit; end; ZeroMemory(@PerIoData.Overlapped, sizeof(OVERLAPPED)); //设置发送标记 PerIoData.BytesRECV := 0; PerIoData.DataBuf.len := DataLen+4; PerIoData.DataBuf.buf:=@SendBuf; PerIoData.BytesSEND := DataLen+4; Flags := 0; //使用WSASend函数将数据发送 if (WSASend(Socket, @(PerIoData.DataBuf), 1, @SendBytes, 0,@(PerIoData.Overlapped), nil) = SOCKET_ERROR) then begin if (WSAGetLastError() <> ERROR_IO_PENDING) then begin //最近在检查代码的时候发现以前这里只是使用Exit来退出是不正确的。这里需要删除申请的单IO数据结构,否子会出现内存泄露。 (2008年3月24日) //Exit; //表示发送失败,以后也不会有处理在工作者线程处出现。 if PerIoData <> nil then begin GlobalFree(DWORD(PerIoData)); end; Result:=false; Exit; end; end; Result:=true; except Result:=false; end; end; 使用IOCP发送数据的代码就这些,但是这里需要说明一些问题。 1:读者一定发送我们在申请了“单IO数据结构”以后并没有对它进行释放。这是因为我们使用的是异步函数WSASend来进行发送数据,只有当我们确定将数据发送出去以后才可以将我们申请的这个结构释放。这就引出了第二个问题。 2:如何判断我们发送的数据已经发送。向我以前的文章中所说的“IOCP可以接受来自客户端的数据和自己发送出去的数据”,而区分这个数据是来自客户端还是自己发送出去的区分就是使用PerIoData.BytesRECV 和PerIoData.BytesSEND 如果PerIoData.BytesSEND >0则表示这个数据是自己发送出去的。现在咱们来回顾一下以前的代码,找出释放“单IO数据结构”的地方。 在第二篇文章我写了这样的代码。 //当我们判断出来接受的数据是我们发送出去的数据的时候,在这里我们清空我们申请的内存空间 else begin GlobalFree(DWORD(PerIoData)); end; 这里就是我们释放“单IO数据结构”的地方。 到此我已经将整个的IOCP从创建、初始化、接收和发送简单的描述了一下。如果读者根据我写的思路或者代码就可以编写出以后稳定的基于IOCP的网络程序。
|
请发表评论