在开发大量Socket并发的服务器时,完成端口加重叠I/O是迄今为止最好的一种解决方案,下面是简单的介绍: “完成端口”模型是迄今为止最为复杂的一种I/O模型,特别适合需要同时管理为数众多的套接字的场合,采用这种模型,往往可以达到最佳的系统性能。但是它只适用于Windows NT和Windows 2000及以上操作系统。因其设计的复杂性,只有在你的应用程序需要同时管理数百乃至上千个套接字,而且希望随着系统内安装的CPU数量增多,应用程序的性能也可以线性提升的时候,才考虑采用“完成端口”模型。 重叠I/O(Overlapped I/O)模型使应用程序达到更佳的系统性能。重叠模型的基本设计原理便是让应用程序使用一个重叠的数据结构,一次投递一个或多个Winsock I/O请求。针对那些提交的请求,在它们完成之后,应用程序可为它们提供服务。该模型适用于除Windows CE之外的各种Windows平台。 开发完成端口最具有挑战的是线程个数和内存管理。创建一个完成端口后,就需要创建一个或多个“工作者线程”,以便在I/O请求投递给完成端口对象后,为完成端口提供服务。但是到底应创建多少个线程,这实际正是完成端口最为复杂的一个方面,一般采用的是为每一个CPU分配一个线程(有的是CPU个数加1,有的是CPU个数*2的线程个数)。内存分配效率低是因为应用程序在分配内存的时候,系统内核需要不停地Lock/UnLock,而且在多CPU的情况下,会成为整个程序性能的瓶颈,不能随CPU个数的增加而提高性能,一种比较好的做法是一次分配多块内存。 下面是我写的一个完成端口的演示程序,在我的电脑上测试可以达到连接5100个客户端,服务器性能还很好,由于我写的客户端占用资源比较多,最后直接重启了,具体见代码。演示程序主要的瓶颈在于发消息的这一块,在实际应用中应去掉。
(配置:操作系统:Microsoft Windows XP Professional,Service Pack 版本:Service Pack 2;CPU:Intel(R) Pentium(R)4 2.40GHz 2.40GHz;内存:2G;主板:华硕P4P800)。
主要源代码:(Delphi 7编写),下载地址:http://download.csdn.net/source/818039
{*******************************************************} { } { 高性能服务器,这个是一个演示DEMO } { } { 联系邮箱:[email protected] } { } {*******************************************************}
unit IOCPSvr;
interface
uses Windows, Messages, WinSock2, Classes, SysUtils, SyncObjs;
const
  {* Size, in bytes, of the data buffer carried by every send/receive block *}
  MAX_BUFSIZE = 4096;
  {* Window message through which a client-socket close is reported *}
  WM_CLIENTSOCKET = WM_USER + $2000;
type
  {* Windows socket notification message, as received by the message handler *}
  TCMSocketMessage = packed record
    Msg: Cardinal;
    Socket: TSocket;
    SelectEvent: Word;
    SelectError: Word;
    Result: Longint;
  end;
{* Trace points of the IOCP server; also used to tag a block as a read or a write *}
TSocketEvent = (
  seInitIOPort, seUninitIOPort,
  seInitThread, seUninitThread,
  seInitSocket, seUninitSocket,
  seConnect, seDisconnect,
  seListen, seAccept,
  seWrite, seRead);

const
  {* Display name for every TSocketEvent value, indexed by the enum itself *}
  CSSocketEvent: array[TSocketEvent] of string = (
    'InitIOPort', 'UninitIOPort',
    'InitThread', 'UninitThread',
    'InitSocket', 'UninitSocket',
    'Connect', 'Disconnect',
    'Listen', 'Accept',
    'Write', 'Read');

type
  {* Kinds of error that can be produced *}
  TErrorEvent = (eeGeneral, eeSend, eeReceive, eeConnect, eeDisconnect, eeAccept);
{* Per-operation block passed through the completion port.
   Overlapped is the first field: the worker thread receives the OVERLAPPED
   pointer from the port and treats it as a pointer to the whole block. *}
TIOCPStruct = packed record
  Overlapped: OVERLAPPED;
  wsaBuffer: TWSABUF;
  Event: TSocketEvent;                        // seRead or seWrite
  Buffer: array [0..MAX_BUFSIZE - 1] of Char;
  Assigned: Boolean;                          // block has been handed out to a client
  Active: Boolean;                            // used inside the client: block is currently in use
end;
PIOCPStruct = ^TIOCPStruct;
{* Exception classes declared by this unit *}
EMemoryBuffer = class(Exception);
ESocketError = class(Exception);

{* Forward declarations; the classes below reference one another *}
TMemoryManager = class;
TServerSocket = class;
TSymmetricalSocket = class;
{* Pool of TIOCPStruct blocks shared by all clients of one server *}
TMemoryManager = class
private
  {* All blocks owned by the pool *}
  FList: TList;
  {* Lock taken while a block is handed out or returned *}
  FLock: TCriticalSection;
  {* Owning server *}
  FServerSocket: TServerSocket;
  function GetCount: Integer;
  function GetIOCPStruct(AIndex: Integer): PIOCPStruct;
public
  constructor Create(AServerSocket: TServerSocket; ACount: Integer); overload;
  constructor Create(AServerSocket: TServerSocket); overload;
  destructor Destroy; override;

  {* Hands out a block for use *}
  function Allocate: PIOCPStruct;
  {* Returns a block to the pool *}
  procedure Release(AValue: PIOCPStruct);
  property Server: TServerSocket read FServerSocket;
  property Count: Integer read GetCount;
  property Item[AIndex: Integer]: PIOCPStruct read GetIOCPStruct;
end;
{* Fired when a client connects; the handler is meant to set AConnect to False
   to refuse the connection.
   NOTE(review): AConnect is declared by value, so an assignment made by the
   handler is not visible to the caller - confirm whether "var" was intended. *}
TOnBeforeConnect = procedure(ASymmIP: string; AConnect: Boolean) of object;

{* Fired after a connection has been established *}
TOnAfterConnect = procedure(ASymmetricalSocket: TSymmetricalSocket) of object;

{* Fired after a connection has been dropped *}
TOnAfterDisconnect = procedure(ASymmetricalSocket: TSymmetricalSocket) of object;

{* Fired when data has been received *}
TOnDataEvent = procedure(ASymmetricalSocket: TSymmetricalSocket;
  AData: Pointer; ACount: Integer) of object;

{* Fired on error; set AHandleError to True to suppress the exception *}
TOnErrorEvent = procedure(AError: Integer; AErrorString: string;
  AInfo: string; var AHandleError: Boolean) of object;

{* Server run-time log callback *}
TOnLog = procedure (ASocketEvent: TSocketEvent; AInfo: string) of object;
{* The server: creates the completion port, owns the memory pool and the
   client list, and runs the socket message window *}
TServerSocket = class
private
  {* Memory pool *}
  FMemory: TMemoryManager;
  {* Listening port *}
  FPort: Integer;
  {* Listening socket *}
  FSocket: TSocket;
  {* Completion port handle *}
  FIOCPHandle: THandle;
  {* Handle of the message window *}
  FHandle: THandle;
  {* Connected peers (TSymmetricalSocket) *}
  FClients: TList;
  {* Worker threads *}
  FThreads: TList;
  {* Accept (listener) thread *}
  FAcceptThread: TThread;
  {* True while the server is active *}
  FActive: Boolean;
  {* Lock *}
  FLock: TCriticalSection;
  {* Error event *}
  FOnError: TOnErrorEvent;
  {* Log event *}
  FOnLog: TOnLog;
  {* Event fired before a connection is accepted *}
  FOnBeforeConnect: TOnBeforeConnect;
  {* Event fired after a connection succeeded *}
  FOnAfterConnect: TOnAfterConnect;
  {* Event fired after a disconnect *}
  FOnAfterDisconnect: TOnAfterDisconnect;
  {* Data-received event *}
  FOnRead: TOnDataEvent;

  procedure WndProc(var AMsg: TMessage);
  {* Activates the server *}
  procedure Open;
  {* Shuts the server down *}
  procedure Close;
  {* Switches between active and closed *}
  procedure SetActive(AValue: Boolean);
  {* Reports an error *}
  function CheckError(AErrorCode: Integer = -1; AInfo: string = ''): Boolean;
  {* Emits a log entry *}
  procedure DoLog(ASocketEvent: TSocketEvent; AInfo: string = '');
  {* Sets the port *}
  procedure SetPort(AValue: Integer);
  {* Registers a client; called from another thread, hence locked *}
  procedure RegisterClient(ASocket: TSymmetricalSocket);
  {* Unregisters a client; called from another thread, hence locked *}
  procedure UnRegisterClient(ASocket: TSymmetricalSocket);
  {* Looks up the TSymmetricalSocket that owns a socket handle *}
  function FindSymmClient(ASocket: TSocket): TSymmetricalSocket;
  {* Client-closed notification message *}
  procedure WMClientClose(var AMsg: TCMSocketMessage); message WM_CLIENTSOCKET;
  {* Fires the before-connect event *}
  function DoConnect(ASocket: TSocket): Boolean;
  {* Fires the after-connect event *}
  procedure DoAfterConnect(ASymSocket: TSymmetricalSocket);
  {* Fires the disconnect event *}
  procedure DoDisConnect(ASymSocket: TSymmetricalSocket);
  {* Fires the data-received event *}
  procedure DoRead(ASymmetricalSocket: TSymmetricalSocket; AData: Pointer;
    ACount: Integer);
  {* Number of connected clients *}
  function GetClientCount: Integer;
  function GetClient(const AIndex: Integer): TSymmetricalSocket;
public
  constructor Create;
  destructor Destroy; override;
  {* Accepts one client; called by the accept thread *}
  procedure AcceptClient;

  property Port: Integer read FPort write SetPort;
  property Socket: TSocket read FSocket;
  property Handle: THandle read FHandle;
  property Active: Boolean read FActive write SetActive;
  property MemoryManager: TMemoryManager read FMemory;
  {* Events *}
  property OnError: TOnErrorEvent read FOnError write FOnError;
  property OnLog: TOnLog read FOnLog write FOnLog;
  property OnRead: TOnDataEvent read FOnRead write FOnRead;
  property OnBeforeConnect: TOnBeforeConnect read FOnBeforeConnect write FOnBeforeConnect;
  property OnAfterConnect: TOnAfterConnect read FOnAfterConnect write FOnAfterConnect;
  property OnAfterDisConnect: TOnAfterDisconnect read FOnAfterDisconnect write FOnAfterDisconnect;
  property ClientCount: Integer read GetClientCount;
  property Client[const AIndex: Integer]: TSymmetricalSocket read GetClient;
end;
{* Server-side peer of one client: receives data, sends data and keeps
   track of the memory blocks assigned to it *}
TSymmetricalSocket = class
private
  FSocket: TSocket;
  FServer: TServerSocket;
  FAssignMemory: TList;
  FRemoteAddress, FRemoteHost: string;
  FRemotePort: Integer;

  {* Posts a receive request *}
  function PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
  {* Obtains a completion-port memory block for use *}
  function Allocate: PIOCPStruct;
  {* Processes a completed block *}
  function WorkBlock(AIOCPStruct: PIOCPStruct; ACount: DWORD): Integer;
  {* Remote IP address *}
  function GetRemoteIP: string;
  {* Remote host name *}
  function GetRemoteHost: string;
  {* Remote port *}
  function GetRemotePort: Integer;
public
  constructor Create(ASvrSocket: TServerSocket; ASocket: TSocket);
  destructor Destroy; override;
  {* Sends data *}
  function Write(var ABuf; ACount: Integer): Integer;
  function WriteString(const AValue: string): Integer;

  property Socket: TSocket read FSocket;
  property RemoteAddress: string read GetRemoteIP;
  property RemoteHost: string read GetRemoteHost;
  property RemotePort: Integer read GetRemotePort;
end;
{* Base class of the server threads; holds the owning server *}
TSocketThread = class(TThread)
private
  FServer: TServerSocket;
public
  constructor Create(AServer: TServerSocket);
end;
{* Thread that keeps calling the server's AcceptClient *}
TAcceptThread = class(TSocketThread)
protected
  procedure Execute; override;
end;
{* Thread that services completions taken from the completion port *}
TWorkThread = class(TSocketThread)
protected
  procedure Execute; override;
end;
implementation
uses RTLConsts;
const
  {* Value posted in place of the OVERLAPPED pointer to tell a worker thread to exit *}
  SHUTDOWN_FLAG = $FFFFFFFF;
{ TMemoryManager }
{* Creates the pool and pre-allocates ACount zero-filled, unassigned blocks.
   AServerSocket - owning server, exposed through the Server property.
   ACount        - number of blocks to allocate up front. *}
constructor TMemoryManager.Create(AServerSocket: TServerSocket; ACount: Integer);
var
  i: Integer;
  pIOCPData: PIOCPStruct;
begin
  inherited Create;
  // Remember the owner; the Server property reads this field.
  FServerSocket := AServerSocket;
  FList := TList.Create;
  FLock := TCriticalSection.Create;
  for i := 1 to ACount do
  begin
    New(pIOCPData);
    // Clear the whole record. SizeOf(PIOCPStruct) is only the size of a
    // pointer and would leave most of the block uninitialised.
    FillChar(pIOCPData^, SizeOf(TIOCPStruct), 0);
    // Already cleared by FillChar; repeated for emphasis only.
    pIOCPData.Assigned := False;
    pIOCPData.Active := False;
    FList.Add(pIOCPData);
  end;
end;
{* Hands out a block: the first block in the pool that is not assigned,
   or a newly allocated one (added to the pool) when all are taken.
   The returned block is zero-filled, marked Assigned and not Active. *}
function TMemoryManager.Allocate: PIOCPStruct;
var
  idx: Integer;
  Candidate: PIOCPStruct;
begin
  FLock.Enter;
  try
    Result := nil;
    idx := 0;
    // Stop at the first free block.
    while (Result = nil) and (idx < FList.Count) do
    begin
      Candidate := FList[idx];
      if not Candidate.Assigned then
        Result := Candidate;
      Inc(idx);
    end;

    // Pool exhausted (or empty): grow it by one block.
    if Result = nil then
    begin
      New(Result);
      FList.Add(Result);
    end;

    FillChar(Result^, SizeOf(TIOCPStruct), 0);
    Result.Assigned := True;
    Result.Active := False;
  finally
    FLock.Leave;
  end;
end;
{* Creates the pool with the default number of pre-allocated blocks. *}
constructor TMemoryManager.Create(AServerSocket: TServerSocket);
const
  DEFAULT_BLOCK_COUNT = 200;
begin
  Create(AServerSocket, DEFAULT_BLOCK_COUNT);
end;
{* Frees every block owned by the pool, then the list and the lock. *}
destructor TMemoryManager.Destroy;
var
  i: Integer;
begin
  // Blocks are allocated with New, so they are released with the matching
  // typed Dispose rather than FreeMem.
  for i := 0 to FList.Count - 1 do
    Dispose(PIOCPStruct(FList[i]));
  FList.Clear;
  FList.Free;
  FLock.Free;
  inherited;
end;
{* Number of blocks currently owned by the pool (assigned or not). *}
function TMemoryManager.GetCount: Integer;
begin
  Result := FList.Count;
end;
{* Returns the block at position AIndex.
   Raises EMemoryBuffer when AIndex is outside 0..Count-1. *}
function TMemoryManager.GetIOCPStruct(AIndex: Integer): PIOCPStruct;
begin
  // The exception must actually be raised; merely constructing it leaks the
  // object and silently returns nil to the caller.
  if (AIndex < 0) or (AIndex >= FList.Count) then
    raise EMemoryBuffer.CreateFmt(SListIndexError, [AIndex]);
  Result := FList[AIndex];
end;
{* Returns a block to the pool by clearing its Assigned and Active flags.
   The block itself stays allocated and can be handed out again. *}
procedure TMemoryManager.Release(AValue: PIOCPStruct);
begin
  FLock.Enter;
  try
    AValue^.Assigned := False;
    AValue^.Active := False;
  finally
    FLock.Leave;
  end;
end;
{ TServerSocket }
{* Creates an inactive server on the default port 6666 with its memory pool,
   client and thread lists, lock and hidden message window. *}
constructor TServerSocket.Create;
begin
  // Owned helper objects
  FMemory := TMemoryManager.Create(Self);
  FClients := TList.Create;
  FThreads := TList.Create;
  FSocket := INVALID_SOCKET;
  FLock := TCriticalSection.Create;

  // Initial state: nothing is listening yet
  FPort := 6666;
  FAcceptThread := nil;
  FIOCPHandle := 0;

  // Window that receives the WM_CLIENTSOCKET notifications
  FHandle := AllocateHWnd(WndProc);
end;
{* Deactivates the server (which closes the completion port) and releases
   everything the constructor created. *}
destructor TServerSocket.Destroy;
begin
  // Shut the completion port down first
  Active := False;
  FThreads.Free;
  FClients.Free;
  DeallocateHWnd(FHandle);
  FMemory.Free;
  FLock.Free;
  inherited;
end;
{* Starts the server: initialises Winsock, creates the completion port,
   spawns two worker threads per processor, then creates, binds and listens
   on the server socket and starts the accept thread.
   On any failure everything is torn down again through Close, the server is
   marked inactive, and the error is reported through CheckError (which fires
   OnError and may raise ESocketError). *}
procedure TServerSocket.Open;
var
  SystemInfo: TSystemInfo;
  i: Integer;
  Thread: TThread;
  Addr: TSockAddr;
  WSData: TWSAData;
  LastErr: Integer;
begin
  try
    if WSAStartup($0202, WSData) <> 0 then
      raise ESocketError.Create('WSAStartup');

    // Create the completion port
    DoLog(seInitIOPort);
    FIOCPHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    if FIOCPHandle = 0 then
      CheckError;

    // Create the worker threads (2 per processor)
    DoLog(seInitThread);
    GetSystemInfo(SystemInfo);
    for i := 0 to SystemInfo.dwNumberOfProcessors * 2 - 1 do
    begin
      Thread := TWorkThread.Create(Self);
      FThreads.Add(Thread);
    end;

    // Create the listening socket
    DoLog(seInitSocket);
    FSocket := WSASocket(PF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
    if FSocket = INVALID_SOCKET then
      CheckError;

    FillChar(Addr, SizeOf(TSockAddr), 0);
    Addr.sin_family := AF_INET;
    Addr.sin_port := htons(FPort);
    Addr.sin_addr.S_addr := htonl(INADDR_ANY);
    CheckError(bind(FSocket, @Addr, SizeOf(TSockAddr)), 'bind');

    // Start listening
    DoLog(seListen);
    CheckError(listen(FSocket, 5), 'listen');
    FAcceptThread := TAcceptThread.Create(Self);
  except
    on E: Exception do
    begin
      // Capture the error code before the cleanup calls overwrite it.
      LastErr := GetLastError;
      Close;
      // The start-up failed, so the server must not stay flagged as active;
      // otherwise a later Active := False would run Close a second time.
      FActive := False;
      CheckError(LastErr, E.Message);
    end;
  end;
end;
{* Stops the server: terminates the accept thread, closes the listening
   socket, asks every worker thread to exit, frees all clients, closes the
   completion port and finally releases Winsock.
   Errors raised on the way are reported through CheckError. *}
procedure TServerSocket.Close;
var
  i: Integer;
  Thread: TThread;
begin
  try
    try
      DoLog(seUninitSocket);
      // The accept thread does not exist when Open failed early.
      if Assigned(FAcceptThread) then
      begin
        FAcceptThread.Terminate;
        // The thread frees itself (FreeOnTerminate); drop the reference.
        FAcceptThread := nil;
      end;
      if FSocket <> INVALID_SOCKET then
      begin
        closesocket(FSocket);
        FSocket := INVALID_SOCKET;
      end;

      DoLog(seUninitThread);
      for i := FThreads.Count - 1 downto 0 do
      begin
        Thread := FThreads[i];
        Thread.Terminate;
        // Wake the worker with the shutdown marker; skipped when the port
        // was never created.
        if FIOCPHandle <> 0 then
          PostQueuedCompletionStatus(FIOCPHandle, 0, 0, Pointer(SHUTDOWN_FLAG));
      end;
      FThreads.Clear;

      // Each client removes itself from FClients in its destructor.
      for i := FClients.Count - 1 downto 0 do
        TSymmetricalSocket(FClients[i]).Free;
      FClients.Clear;

      DoLog(seUninitIOPort);
      if FIOCPHandle <> 0 then
      begin
        CloseHandle(FIOCPHandle);
        FIOCPHandle := 0;
      end;
    finally
      // Release Winsock last: the socket calls above need it still loaded.
      WSACleanup;
    end;
  except
    on E: Exception do
      // Report only. Calling Close again from here would recurse without
      // bound whenever the failure is repeatable.
      CheckError(-1, E.Message);
  end;
end;
{* Setter of the Active property: records the new state and then opens or
   closes the server. Does nothing when the state is unchanged. *}
procedure TServerSocket.SetActive(AValue: Boolean);
begin
  if FActive <> AValue then
  begin
    FActive := AValue;
    if FActive then
      Open
    else
      Close;
  end;
end;
{* Window procedure of the hidden notification window.
   Application-defined messages (WM_USER and above, which includes
   WM_CLIENTSOCKET) are dispatched to the message methods of this object;
   every other message gets the default system processing.
   Exceptions are routed to ApplicationHandleException when it is assigned. *}
procedure TServerSocket.WndProc(var AMsg: TMessage);
begin
  try
    if AMsg.Msg >= WM_USER then
      Dispatch(AMsg)
    else
      // System messages must reach DefWindowProc; otherwise the window
      // answers 0 to queries such as WM_QUERYENDSESSION.
      AMsg.Result := DefWindowProc(FHandle, AMsg.Msg, AMsg.WParam, AMsg.LParam);
  except
    if Assigned(ApplicationHandleException) then
      ApplicationHandleException(Self);
  end;
end;
{* Evaluates an error code and reports it.
   AErrorCode - 0 means success; -1 means "use WSAGetLastError".
   AInfo      - context text forwarded to OnError and to the exception.
   WSAEWOULDBLOCK and ERROR_IO_PENDING are not treated as errors.
   For any other code OnError is fired; unless the handler sets its
   AHandleError argument to True, ESocketError is raised.
   Returns True whenever it returns at all. *}
function TServerSocket.CheckError(AErrorCode: Integer; AInfo: string): Boolean;
var
  Handled: Boolean;
begin
  Result := True;
  if AErrorCode = 0 then
    Exit;

  // Resolve the "look it up yourself" marker
  if AErrorCode = -1 then
  begin
    AErrorCode := WSAGetLastError;
    if AErrorCode = -1 then
      AErrorCode := GetLastError;
  end;

  // Codes that are part of normal asynchronous operation
  if (AErrorCode = 0) or (AErrorCode = WSAEWOULDBLOCK) or
     (AErrorCode = ERROR_IO_PENDING) then
    Exit;

  Handled := False;
  if Assigned(FOnError) then
    FOnError(AErrorCode, SysErrorMessage(AErrorCode), AInfo, Handled);
  if not Handled then
    raise ESocketError.CreateFmt(SWindowsSocketError,
      [SysErrorMessage(AErrorCode), AErrorCode, AInfo]);
end;
{* Forwards a trace point to the OnLog handler, if one is assigned. *}
procedure TServerSocket.DoLog(ASocketEvent: TSocketEvent; AInfo: string);
begin
  if not Assigned(FOnLog) then
    Exit;
  FOnLog(ASocketEvent, AInfo);
end;
{* Forwards received data to the OnRead handler, if one is assigned. *}
procedure TServerSocket.DoRead(ASymmetricalSocket: TSymmetricalSocket;
  AData: Pointer; ACount: Integer);
begin
  if not Assigned(FOnRead) then
    Exit;
  FOnRead(ASymmetricalSocket, AData, ACount);
end;
{* Setter of the Port property.
   Raises ESocketError while the server is active. *}
procedure TServerSocket.SetPort(AValue: Integer);
begin
  if not FActive then
    FPort := AValue
  else
    raise ESocketError.Create('IOCP is acitve, cann''t change port');
end;
{* Adds a client to the client list (under the lock), fires the
   after-connect event and subscribes to its close notification.
   A client that is already registered is ignored. *}
procedure TServerSocket.RegisterClient(ASocket: TSymmetricalSocket);
begin
  FLock.Enter;
  try
    if FClients.IndexOf(ASocket) <> -1 then
      Exit;
    FClients.Add(ASocket);
    DoAfterConnect(ASocket);
    // Ask for a WM_CLIENTSOCKET message when the socket closes
    WSAAsyncSelect(ASocket.Socket, FHandle, WM_CLIENTSOCKET, FD_CLOSE);
  finally
    FLock.Leave;
  end;
end;
{* Removes a client from the client list (under the lock) and fires the
   disconnect event. An unknown client is ignored. *}
procedure TServerSocket.UnRegisterClient(ASocket: TSymmetricalSocket);
var
  Position: Integer;
begin
  FLock.Enter;
  try
    Position := FClients.IndexOf(ASocket);
    if Position = -1 then
      Exit;
    FClients.Delete(Position);
    DoDisConnect(ASocket);
  finally
    FLock.Leave;
  end;
end;
{* Accepts one incoming connection; called in a loop by the accept thread.
   The connection is dropped when the server is inactive or DoConnect returns
   False. Otherwise a TSymmetricalSocket is created for it, the socket is
   associated with the completion port (the client object is the completion
   key) and the first receive is posted.
   Failures are reported through CheckError, which may raise ESocketError. *}
procedure TServerSocket.AcceptClient;
var
  ClientWinSocket: TSocket;
  SymmSocket: TSymmetricalSocket;
  ErrCode: Integer;
begin
  ClientWinSocket := WinSock2.WSAAccept(Socket, nil, nil, nil, 0);
  if ClientWinSocket = INVALID_SOCKET then
    Exit;

  if (not Active) or (not DoConnect(ClientWinSocket)) then
  begin
    closesocket(ClientWinSocket);
    Exit;
  end;

  SymmSocket := nil;
  try
    DoLog(seAccept);
    SymmSocket := TSymmetricalSocket.Create(Self, ClientWinSocket);
    DoLog(seConnect);
  except
    // If the client object already exists, its destructor closes the socket
    // and unregisters it; otherwise only the raw socket must be closed.
    if Assigned(SymmSocket) then
      SymmSocket.Free
    else
      closesocket(ClientWinSocket);
    CheckError;
    Exit;
  end;

  if CreateIoCompletionPort(ClientWinSocket, FIOCPHandle, DWORD(SymmSocket), 0) = 0 then
  begin
    // Read the error code and free the client BEFORE reporting: CheckError
    // may raise, which would otherwise leak the client object.
    ErrCode := GetLastError;
    SymmSocket.Free;
    CheckError(ErrCode, 'CreateIoCompletionPort');
  end
  else
    SymmSocket.PrepareRecv;
end;
{* Fires OnAfterConnect for the given client, if a handler is assigned. *}
procedure TServerSocket.DoAfterConnect(ASymSocket: TSymmetricalSocket);
begin
  if not Assigned(FOnAfterConnect) then
    Exit;
  FOnAfterConnect(ASymSocket);
end;
{* Fires OnBeforeConnect with the peer's dotted IP address.
   Returns True when no handler is assigned. With a handler the local result
   is passed as the AConnect argument and then returned.
   A getpeername failure is reported through CheckError. *}
function TServerSocket.DoConnect(ASocket: TSocket): Boolean;
var
  PeerAddr: TSockAddrIn;
  AddrLen: Integer;
begin
  Result := True;
  if not Assigned(FOnBeforeConnect) then
    Exit;

  AddrLen := SizeOf(TSockAddrIn);
  CheckError(getpeername(ASocket, PeerAddr, AddrLen), 'getpeername');
  FOnBeforeConnect(inet_ntoa(PeerAddr.sin_addr), Result);
end;
{* Fires OnAfterDisConnect for the given client, if a handler is assigned. *}
procedure TServerSocket.DoDisConnect(ASymSocket: TSymmetricalSocket);
begin
  if not Assigned(FOnAfterDisconnect) then
    Exit;
  FOnAfterDisconnect(ASymSocket);
end;
{* Searches the client list (under the lock) for the client that owns the
   socket handle ASocket. Returns nil when there is none. *}
function TServerSocket.FindSymmClient(ASocket: TSocket): TSymmetricalSocket;
var
  idx: Integer;
  Candidate: TSymmetricalSocket;
begin
  Result := nil;
  FLock.Enter;
  try
    idx := 0;
    while (Result = nil) and (idx < FClients.Count) do
    begin
      Candidate := FClients[idx];
      if Candidate.Socket = ASocket then
        Result := Candidate;
      Inc(idx);
    end;
  finally
    FLock.Leave;
  end;
end;
{* Getter of the Client[] property: the client stored at position AIndex. *}
function TServerSocket.GetClient(const AIndex: Integer): TSymmetricalSocket;
begin
  Result := TSymmetricalSocket(FClients[AIndex]);
end;
{* Getter of the ClientCount property: number of registered clients. *}
function TServerSocket.GetClientCount: Integer;
begin
  Result := FClients.Count;
end;
{* Handler of WM_CLIENTSOCKET: on an FD_CLOSE notification the client that
   owns the reported socket is looked up and freed. *}
procedure TServerSocket.WMClientClose(var AMsg: TCMSocketMessage);
var
  Closed: TSymmetricalSocket;
begin
  if AMsg.SelectEvent <> FD_CLOSE then
    Exit;
  Closed := FindSymmClient(AMsg.Socket);
  // Free is a no-op on nil, so no separate check is needed
  Closed.Free;
end;
{ TSocketThread }
{* Creates a non-suspended thread bound to AServer that frees itself when
   it terminates. *}
constructor TSocketThread.Create(AServer: TServerSocket);
begin
  // Store the server before the thread is created so Execute can rely on it
  FServer := AServer;
  inherited Create(False);
  FreeOnTerminate := True;
end;
{ TAcceptThread }
{* Thread body: keeps accepting clients until the thread is terminated or
   the server is no longer active. *}
procedure TAcceptThread.Execute;
begin
  inherited;
  while True do
  begin
    if Terminated or (not FServer.Active) then
      Break;
    FServer.AcceptClient;
  end;
end;
{ TWorkThread }
{* Thread body of a worker: waits on the completion port and hands every
   completed block to the client it belongs to.
   The loop ends when the thread is terminated, the server becomes inactive
   or the shutdown marker is dequeued. *}
procedure TWorkThread.Execute;
var
  Client: TSymmetricalSocket;
  Block: PIOCPStruct;
  Transferred: Cardinal;
begin
  inherited;
  while (not Terminated) and FServer.Active do
  begin
    Block := nil;
    Transferred := 0;
    Client := nil;

    if GetQueuedCompletionStatus(FServer.FIOCPHandle, Transferred,
      DWORD(Client), POVerlapped(Block), INFINITE) then
    begin
      // Exit marker posted by the server
      if Cardinal(Block) = SHUTDOWN_FLAG then
        Break;
      // Server was shut down meanwhile
      if not FServer.Active then
        Break;

      // Zero bytes: the client may have timed out or disconnected. It is
      // not freed here; that happens when the close message is received.
      if Transferred <> 0 then
      begin
        FServer.DoLog(Block.Event);
        if Client.WorkBlock(Block, Transferred) = -1 then
          FreeAndNil(Client);
      end;
    end
    else if Assigned(Client) then
      // Failed completion that still carries a completion key
      FreeAndNil(Client);
  end;
end;
{ TSymmetricalSocket }
{* Creates the server-side peer for an accepted socket and registers it
   with the server. No receive is posted here; AcceptClient calls
   PrepareRecv after the socket has been associated with the completion port. *}
constructor TSymmetricalSocket.Create(ASvrSocket: TServerSocket; ASocket: TSocket);
begin
  FServer := ASvrSocket;
  FSocket := ASocket;
  FAssignMemory := TList.Create;
  FServer.RegisterClient(Self);
end;
{* Unregisters the client from the server, closes its socket and returns
   all of its memory blocks to the server's pool. *}
destructor TSymmetricalSocket.Destroy;
var
  idx: Integer;
  LingerOpt: TLinger;
begin
  FServer.UnRegisterClient(Self);

  // Graceful close: apply a zeroed linger structure, then close
  FillChar(LingerOpt, SizeOf(TLinger), 0);
  setsockopt(FSocket, SOL_SOCKET, SO_LINGER, @LingerOpt, Sizeof(LingerOpt));
  closesocket(FSocket);

  // Hand the blocks back, last one first
  idx := FAssignMemory.Count - 1;
  while idx >= 0 do
  begin
    FServer.MemoryManager.Release(FAssignMemory[idx]);
    Dec(idx);
  end;
  FAssignMemory.Free;
  inherited;
end;
{* Returns a block for the next I/O operation: the first block already owned
   by this client that is not Active, or a new one taken from the server's
   pool (and remembered in FAssignMemory). The block is marked Active. *}
function TSymmetricalSocket.Allocate: PIOCPStruct;
var
  idx: Integer;
  Candidate: PIOCPStruct;
begin
  Result := nil;
  idx := 0;
  // Prefer an idle block this client already owns
  while (Result = nil) and (idx < FAssignMemory.Count) do
  begin
    Candidate := FAssignMemory[idx];
    if not Candidate.Active then
      Result := Candidate;
    Inc(idx);
  end;

  // None idle: take another one from the pool
  if Result = nil then
  begin
    Result := FServer.MemoryManager.Allocate;
    FAssignMemory.Add(Result);
  end;

  Result.Active := True;
end;
{* Posts an overlapped receive on the client socket.
   AIOCPStruct - block to receive into; when nil a block is allocated.
   Returns True when the receive completed or is pending. Any other error
   is passed to the server's CheckError and False is returned if that does
   not raise. *}
function TSymmetricalSocket.PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
var
  RecvFlags, Received: Cardinal;
  LastErr: Integer;
begin
  if AIOCPStruct = nil then
    AIOCPStruct := Allocate;

  // Reset the block for a fresh read
  AIOCPStruct.Event := seRead;
  FillChar(AIOCPStruct.Buffer, SizeOf(AIOCPStruct.Buffer), 0);
  FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
  AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer;
  AIOCPStruct.wsaBuffer.len := MAX_BUFSIZE;
  RecvFlags := 0;

  if WSARecv(FSocket, @AIOCPStruct.wsaBuffer, 1, @Received, @RecvFlags,
    @AIOCPStruct.Overlapped, nil) <> SOCKET_ERROR then
    Result := True
  else
  begin
    LastErr := WSAGetLastError;
    Result := LastErr = ERROR_IO_PENDING;
    if not Result then
      FServer.CheckError(LastErr, 'WSARecv');
  end;
end;
{* Processes a block whose I/O has completed.
   AIOCPStruct - the completed block; its Event tells read from write.
   ACount      - number of bytes transferred.
   seRead : the data is passed to the server's read event and the same block
            is re-posted for the next receive; returns ACount when the
            re-post succeeded.
   seWrite: when everything has been sent the block is marked inactive;
            otherwise the unsent remainder is sent again from the same block.
   Returns 0 in all other cases; exceptions are swallowed and yield 0. *}
function TSymmetricalSocket.WorkBlock(AIOCPStruct: PIOCPStruct; ACount: DWORD): Integer;
var
  ErrCode: Integer;
  iSend, iFlag: Cardinal;
begin
  Result := 0;
  try
    case AIOCPStruct.Event of
      seRead: // data received
        begin
          FServer.DoRead(Self, @AIOCPStruct.Buffer[0], ACount);
          if PrepareRecv(AIOCPStruct) then
            Result := ACount;
        end;
      seWrite: // data sent
        begin
          Dec(AIOCPStruct.wsaBuffer.len, ACount);
          if AIOCPStruct.wsaBuffer.len = 0 then
            AIOCPStruct.Active := False
          else
          begin
            // Partial send: skip the bytes that already went out. Without
            // this the start of the buffer would be transmitted again.
            Inc(AIOCPStruct.wsaBuffer.buf, ACount);
            FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
            iFlag := 0;
            if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend,
              iFlag, @AIOCPStruct.Overlapped, nil) then
            begin
              ErrCode := WSAGetLastError;
              if ErrCode <> ERROR_IO_PENDING then
                FServer.CheckError(ErrCode, 'WSASend');
            end
            else
              Result := iSend;
          end;
        end;
    end;
  except
    Result := 0;
  end;
end;
{* Queues ACount bytes starting at ABuf for overlapped sending.
   The data is copied into pool blocks of at most MAX_BUFSIZE bytes each, so
   a request larger than one block is split into several sends instead of
   overrunning the block buffer.
   Returns ACount when all chunks were queued, 0 when ACount is 0, and
   SOCKET_ERROR when ACount is negative or a send failed (the failure is also
   passed to the server's CheckError, which may raise ESocketError). *}
function TSymmetricalSocket.Write(var ABuf; ACount: Integer): Integer;
var
  AIOCPStruct: PIOCPStruct;
  ErrCode, Remaining, Chunk: Integer;
  Src: PChar;
  iFlag, iSend: Cardinal;
begin
  Result := ACount;
  if ACount = 0 then
    Exit;
  if ACount < 0 then
  begin
    Result := SOCKET_ERROR;
    Exit;
  end;

  Src := @ABuf;
  Remaining := ACount;
  while Remaining > 0 do
  begin
    // Never copy more than one block can hold
    Chunk := Remaining;
    if Chunk > MAX_BUFSIZE then
      Chunk := MAX_BUFSIZE;

    AIOCPStruct := Allocate;
    iFlag := 0;
    AIOCPStruct.Event := seWrite;
    // A block reused from this client's list may still hold the OVERLAPPED
    // of its previous operation; it must be clean for a new request.
    FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
    FillChar(AIOCPStruct.Buffer[0], SizeOf(AIOCPStruct.Buffer), 0);
    CopyMemory(@AIOCPStruct.Buffer[0], Src, Chunk);
    AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer[0];
    AIOCPStruct.wsaBuffer.len := Chunk;

    if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend, iFlag,
      @AIOCPStruct.Overlapped, nil) then
    begin
      ErrCode := WSAGetLastError;
      if ErrCode <> ERROR_IO_PENDING then
      begin
        Result := SOCKET_ERROR;
        // No completion will arrive for this block; make it reusable.
        AIOCPStruct.Active := False;
        FServer.CheckError(ErrCode, 'WSASend');
        Exit;
      end;
    end;

    Inc(Src, Chunk);
    Dec(Remaining, Chunk);
  end;
end;
{* Sends the characters of AValue through Write and returns Write's result. *}
function TSymmetricalSocket.WriteString(const AValue: string): Integer;
var
  Len: Integer;
begin
  Len := Length(AValue);
  Result := Write(Pointer(AValue)^, Len);
end;
{* Getter of RemoteAddress: the peer's dotted IP address. The value is
   queried with getpeername on first use and cached in FRemoteAddress. *}
function TSymmetricalSocket.GetRemoteIP: string;
var
  PeerAddr: TSockAddrIn;
  AddrLen: Integer;
begin
  if FRemoteAddress = '' then
  begin
    AddrLen := SizeOf(PeerAddr);
    FServer.CheckError(getpeername(FSocket, PeerAddr, AddrLen), 'getpeername');
    FRemoteAddress := inet_ntoa(PeerAddr.sin_addr);
  end;
  Result := FRemoteAddress;
end;
{* Getter of RemotePort: the peer's port in host byte order. The value is
   queried with getpeername on first use and cached in FRemotePort.
   A getpeername failure is reported through the server's CheckError. *}
function TSymmetricalSocket.GetRemotePort: Integer;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
begin
  // Test the port's own cache field. Testing FRemoteAddress made the port
  // stay 0 forever once RemoteAddress had been read first.
  if FRemotePort = 0 then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    FRemotePort := ntohs(SockAddrIn.sin_port);
  end;
  Result := FRemotePort;
end;
{* Getter of RemoteHost: the peer's host name from a reverse lookup of its
   address. The name is cached in FRemoteHost once a lookup succeeds; an
   empty string is returned while no name could be resolved.
   A getpeername failure is reported through the server's CheckError. *}
function TSymmetricalSocket.GetRemoteHost: string;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
  HostEnt: PHostEnt;
begin
  // Test the host's own cache field. Testing FRemoteAddress meant the host
  // was never resolved once RemoteAddress had been read first.
  if FRemoteHost = '' then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    HostEnt := gethostbyaddr(@SockAddrIn.sin_addr.S_addr, 4, PF_INET);
    if HostEnt <> nil then
      FRemoteHost := HostEnt.h_name;
  end;
  Result := FRemoteHost;
end;
end.
http://blog.sina.com.cn/s/blog_562349090100zufs.html
|
请发表评论