在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
模块相应代码命名空间 (namespace ZPNetwork) 模块相应代码存储目录 (\ZoomPipeline_FuncSvr\network) 2.1 模块结构
重载了ZP_TcpServer::incomingConnection,不在监听线程中进行Accept操作,而是直接发出evt_NewClientArrived信号,把套接字描写叙述符(socketdescriptor)泵出。由zp_net_Engine类进行负荷均衡,选取当前负荷最小的传输线程(zp_netTransThread)接受该接入申请。
2.2 系统原理为了提供基于线程池的TCP服务。zp_net_engine类有几个重要成员。以下,依照一次client发起连接的过程,逆向的逐一来介绍这些类的合作原理. 2.2.1 监听器与监听线程1、监听器ZP_TcpServer 2、监听器线程对象zp_netListenThread m_tcpServer是一个指向ZP_TcpServe类实例的指针(參见zp_netlistenthread.h )。 该实例在zp_netListenThread::startListen()中创建。StartListen是一个关键的函数。创建了ZP_TcpServer对象。核心代码例如以下: m_tcpServer = new ZP_TcpServer(this); connect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived,Qt::QueuedConnection);上面两行代码中,第一行创建一个监听服务。第二行。把监听服务的evt_NewClientArrived事件直接和zp_netListenThread 的 同名事件连接起来。
一个进程能够拥有若干监听port,这些监听port相应了不同的zp_netListenThread对象。这些监听线程对象由zp_net_Engine类管理,存储在这个类的成员变量中。以下两个成员变量 //This map stores listenThreadObjects QMap<QString,zp_netListenThread *> m_map_netListenThreads; //Internal Threads to hold each listenThreadObjects' message Queue QMap<QString,QThread *> m_map_netInternalListenThreads;第一个存储了各个port的线程对象,第二个存储了各个port的线程。
这里须要注意的是,因为Qt的信号与槽系统是一种广播系统。意味着一个zp_net_Engine类管理多个zp_netListenThread对象时。zp_net_Engine发出的信号会被全部zp_netListenThread对象接收。因此,信号与槽中含有一个唯一标示,用于指示本次信号触发是为了操作详细哪个对象。这样的技术在类似的场合被多次使用。 void zp_net_Engine::AddListeningAddress(QString id,const QHostAddress & address , quint16 nPort,bool bSSLConn /*= true*/) { if (m_map_netListenThreads.find(id)==m_map_netListenThreads.end()) { //Start Thread QThread * pThread = new QThread(this); zp_netListenThread * pListenObj = new zp_netListenThread(id,address,nPort,bSSLConn); pThread->start(); //m_mutex_listen.lock(); m_map_netInternalListenThreads[id] = pThread; m_map_netListenThreads[id] = pListenObj; //m_mutex_listen.unlock(); //Bind Object to New thread connect(this,&zp_net_Engine::startListen,pListenObj,&zp_netListenThread::startListen,Qt::QueuedConnection); connect(this,&zp_net_Engine::stopListen,pListenObj,&zp_netListenThread::stopListen,Qt::QueuedConnection); connect(pListenObj,&zp_netListenThread::evt_Message,this,&zp_net_Engine::evt_Message,Qt::QueuedConnection); connect(pListenObj,&zp_netListenThread::evt_ListenClosed,this,&zp_net_Engine::on_ListenClosed,Qt::QueuedConnection); connect(pListenObj,&zp_netListenThread::evt_NewClientArrived,this,&zp_net_Engine::on_New_Arrived_Client,Qt::QueuedConnection); pListenObj->moveToThread(pThread); //Start Listen Immediately emit startListen(id); } else emit evt_Message(this,"Warning>"+QString(tr("This ID has been used."))); } 2.2.2 接受连接过程client发起接入请求后,首先触发了ZP_TcpServer的incomingConnection方法。 在以下这种方法中,套接字的描写叙述符作为事件的參数被泵出。 void ZP_TcpServer::incomingConnection(qintptr socketDescriptor) { emit evt_NewClientArrived(socketDescriptor); }上面的信号相应的槽为zp_net_Engine::on_New_Arrived_Client槽函数。在这个函数中,网络模块首先从当前可用的传输线程中确定最空暇的那个线程,而后把套接字描写叙述符转交给传输线程。这个部分的核心代码: void zp_net_Engine::on_New_Arrived_Client(qintptr socketDescriptor) { zp_netListenThread * pSource = qobject_cast<zp_netListenThread *>(sender()); if (!pSource) { emit evt_Message(this,"Warning>"+QString(tr("Non-zp_netListenThread type detected."))); return; } emit evt_Message(this,"Info>" + QString(tr("Incomming client arriverd."))); int nsz = m_vec_NetTransThreads.size(); int nMinPay = 0x7fffffff; int nMinIdx = -1; for (int i=0;i<nsz && nMinPay!=0;i++) { if (m_vec_NetTransThreads[i]->isActive()==false || m_vec_NetTransThreads[i]->SSLConnection()!=pSource->bSSLConn() ) continue; int nPat = m_vec_NetTransThreads[i]->CurrentClients(); if (nPat<nMinPay) { nMinPay = nPat; nMinIdx = i; } //qDebug()<<i<<" "<<nPat<<" "<<nMinIdx; } //... if (nMinIdx>=0 && nMinIdx<nsz) emit evt_EstablishConnection(m_vec_NetTransThreads[nMinIdx],socketDescriptor); else { emit evt_Message(this,"Warning>"+QString(tr("Need Trans Thread Object for clients."))); } }上面的代码中, evt_EstablishConnection 事件携带了由均衡策略确定的承接线程、socketDescriptor 描写叙述符。 这个事件广播给全部的传输线程对象。在各个对象的incomingConnection槽中,详细生成用于传输的套接字对象.注意, 这个槽函数是执行在各个传输线程的事件循环中的,因此,创建的套接字直接属于特定线程. /** * @brief This slot dealing with multi-thread client socket accept. * accepy works start from zp_netListenThread::m_tcpserver, end with this method. * the socketDescriptor is delivered from zp_netListenThread(a Listening thread) * to zp_net_Engine(Normally in main-gui thread), and then zp_netTransThread. * * @param threadid if threadid is not equal to this object, this message is just omitted. * @param socketDescriptor socketDescriptor for incomming client. */ void zp_netTransThread::incomingConnection(QObject * threadid,qintptr socketDescriptor) { if (threadid!=this) return; QTcpSocket * sock_client = 0; if (m_bSSLConnection) sock_client = new QSslSocket(this); else sock_client = new QTcpSocket(this); if (sock_client) { //Initial content if (true ==sock_client->setSocketDescriptor(socketDescriptor)) { connect(sock_client, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved,Qt::QueuedConnection); connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection); connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection); connect(sock_client, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended,Qt::QueuedConnection); m_mutex_protect.lock(); m_clientList[sock_client] = 0; m_mutex_protect.unlock(); if (m_bSSLConnection) { QSslSocket * psslsock = qobject_cast<QSslSocket *>(sock_client); assert(psslsock!=NULL); QString strCerPath = QCoreApplication::applicationDirPath() + "/svr_cert.pem"; QString strPkPath = QCoreApplication::applicationDirPath() + "/svr_privkey.pem"; psslsock->setLocalCertificate(strCerPath); psslsock->setPrivateKey(strPkPath); connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection); psslsock->startServerEncryption(); } emit evt_NewClientConnected(sock_client); emit evt_Message(sock_client,"Info>" + QString(tr("Client Accepted."))); } else sock_client->deleteLater(); } } 2.2.3 数据接收在成功创建了套接字后, 数据的收发都在传输线程中执行了.当套接字收到数据后,简单的触发事件 evt_Data_recieved void zp_netTransThread::new_data_recieved() { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender()); if (pSock) { QByteArray array = pSock->readAll(); int sz = array.size(); g_mutex_sta.lock(); g_bytesRecieved +=sz; g_secRecieved += sz; g_mutex_sta.unlock(); emit evt_Data_recieved(pSock,array); } } 2.2.4数据发送虽然Qt的套接字本身具备缓存,塞入多大的数据都会成功, 可是本实现仍旧使用额外的队列, 每次缓存一个固定长度的片段并顺序发送. 这种优点,是能够给代码使用者一个机会,来增加代码检查缓冲区的大小,并作一些持久化的工作. 比方,队列超过100MB后,就把兴许的数据缓存在磁盘上, 而不是继续放在内存中, 实现这个策略的变量是两个缓存. //sending buffer, hold byteArraies. QMap<QObject *,QList<QByteArray> > m_buffer_sending; QMap<QObject *,QList<qint64> > m_buffer_sending_offset; 第一个缓存存储各个套接字的队列.还有一个存储各个数据块的发送偏移. 这样做是有性能缺陷的, 更好的办法是从 QTcpSocket 派生自己的类,并把各个套接字的缓存直接存储在派生类实例中去. 在本实现中, 直接使用了 QTcpSocket和QSSLSocket类, 因而有一定的性能损失. 一个槽方法 SendDataToClient 负责接受发送数据的请求. void zp_netTransThread::SendDataToClient(QObject * objClient,QByteArray dtarray) { m_mutex_protect.lock(); if (m_clientList.find(objClient)==m_clientList.end()) { m_mutex_protect.unlock(); return; } m_mutex_protect.unlock(); QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient); if (pSock&&dtarray.size()) { QList<QByteArray> & list_sock_data = m_buffer_sending[pSock]; QList<qint64> & list_offset = m_buffer_sending_offset[pSock]; if (list_sock_data.empty()==true) { qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad)); if (bytesWritten < dtarray.size()) { list_sock_data.push_back(dtarray); list_offset.push_back(bytesWritten); } } else { list_sock_data.push_back(dtarray); list_offset.push_back(0); } } } 在上面的函数中,将检查队列是否为空.为空的话,将触发 QTcpSocket::write方法发出m_nPayload大小的数据块.当这些数据块发送完成,将触发QTcpSocket::bytesWritten事件,由以下的槽响应. /** * @brief this slot will be called when internal socket successfully * sent some data. in this method, zp_netTransThread object will check * the sending-queue, and send more data to buffer. * * @param wsended */ void zp_netTransThread::some_data_sended(qint64 wsended) { g_mutex_sta.lock(); g_bytesSent +=wsended; g_secSent += wsended; g_mutex_sta.unlock(); QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender()); if (pSock) { emit evt_Data_transferred(pSock,wsended); QList<QByteArray> & list_sock_data = m_buffer_sending[pSock]; QList<qint64> & list_offset = m_buffer_sending_offset[pSock]; while (list_sock_data.empty()==false) { QByteArray & arraySending = *list_sock_data.begin(); qint64 & currentOffset = *list_offset.begin(); qint64 nTotalBytes = arraySending.size(); assert(nTotalBytes>=currentOffset); qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad)); currentOffset += nBytesWritten; if (currentOffset>=nTotalBytes) { list_offset.pop_front(); list_sock_data.pop_front(); } else break; } } } 2.2.5 其它工作在传输终止后, 会进行一定的清理. 对于多线程的传输,最重要的是确保各个对象的生存期. 有兴趣的读者能够使用 sharedptr来管理动态分配的对象, 这样操作起来会非常方便. 在本范例中, 全部代码均进行了 7*24 调试. 下一章,将介绍流水线线程池的原理和实现. |
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论