嘿有StackOverflow的人!
我正在制作一个IOCP服务器,到目前为止我已经解决了大部分问题,但仍然存在,我不知道从哪里开始看。 当我在我的机器上运行客户端/服务器时,一切都很好。 它匹配的Windows SDK样本的速度可能会快一点,肯定使用较less的CPU周期。 但是,当我从一台单独的计算机运行客户端时,传输速度上限为37 KB / s,往返延迟为200毫秒(而不是0)。 现在,如果我将客户端连接到SDK示例服务器,我没有这个问题,所以我的代码有问题。 据我所知,sockets初始化完全相同的方式与相同的选项。 我也运行我的服务器在探查器检查瓶颈,但我找不到任何。 另外,我试过的计算机连接到同一个千兆位交换机(使用千兆位适配器)。 我知道这有点含糊,但是那是因为我现在还不能确定这个问题,如果你们中的任何一个人都能指出我的正确方向,我将永远感激不尽。
干杯,
-Roxy
编辑2:按照迈克的build议,我做了一些代码研究,发现当远程客户端连接到服务器的大部分时间的代码正在GetQueuedCompletionStatus等待。 这表明IO请求只是需要很长时间才能完成,但我仍然不明白为什么。 这只有当客户端在远程计算机上才会发生。 我想这与如何设置套接字或如何发布请求有关,但我没有看到与示例代码的任何区别。
有任何想法吗?
编辑(新增示例代码):
好的,在这里! 虽然不是很漂亮!
如果您安装了Windows SDK,则可以使用iocpclient示例(Program Files \ Microsoft SDKs \ Windows \ v7.1 \ Samples \ netds \ winsock \ iocp \ client)连接到它,并将其在第73行的默认端口更改为5000 。
奇怪的是,我自己尝试的时候注意到iocpclient在37KB / s的时候并没有导致相同的上限……但是看起来样例代码的限制设置为800KB / s左右。 如果有任何帮助,我会发布一个客户端。
#pragma comment(lib, "Ws2_32.lib") #include <WinSock2.h> #include <stdio.h> unsigned int connection = 0; unsigned int upload = 0; unsigned int download = 0; #define IO_CONTEXT_COUNT 5 class NetClientHost { friend class gNetProtocolHost; public: enum Operation { kOperationUnknown, kOperationRead, kOperationWrite, }; struct ClientData { SOCKET socket; }; struct IOContext { WSAOVERLAPPED overlapped; WSABUF wsaReceiveBuf; WSABUF wsaSendBuf; char *buf; char *TESTbuf; unsigned long bytesReceived; unsigned long bytesSent; unsigned long flags; unsigned int bytesToSendTotal; unsigned int remainingBytesToSend; unsigned int chunk; Operation operation; }; NetClientHost() { memset((void *) &m_clientData, 0, sizeof(m_clientData)); } NetClientHost::IOContext *NetClientHost::AcquireContext() { while (true) { for (int i = 0; i < IO_CONTEXT_COUNT; ++i) { if (!(m_ioContexts + i)->inUse) { InterlockedIncrement(&(m_ioContexts + i)->inUse); //ResetEvent(*(m_hContextEvents + i)); if ((m_ioContexts + i)->ioContext.TESTbuf == 0) Sleep(1); return &(m_ioContexts + i)->ioContext; } } //++g_blockOnPool; //WaitForMultipleObjects(IO_CONTEXT_COUNT, m_hContextEvents, FALSE, INFINITE); } } const ClientData *NetClientHost::GetClientData() const { return &m_clientData; }; void NetClientHost::Init(unsigned int bufferSize) { _InitializeIOContexts(bufferSize ? bufferSize : 1024); } void NetClientHost::ReleaseContext(IOContext *ioContext) { int i = sizeof(_IOContextData), j = sizeof(IOContext); _IOContextData *contextData = (_IOContextData *) (((char *) ioContext) - (i - j)); InterlockedDecrement(&contextData->inUse); //SetEvent(*(m_hContextEvents + contextData->index)); } struct _IOContextData { unsigned int index; volatile long inUse; IOContext ioContext; }; ClientData m_clientData; _IOContextData *m_ioContexts; HANDLE *m_hContextEvents; void _InitializeIOContexts(unsigned int bufferSize) { m_ioContexts = new _IOContextData[IO_CONTEXT_COUNT]; m_hContextEvents = new HANDLE[IO_CONTEXT_COUNT]; memset((void *) m_ioContexts, 0, sizeof(_IOContextData) * IO_CONTEXT_COUNT); for (int i = 0; i < IO_CONTEXT_COUNT; ++i) { (m_ioContexts + i)->index = i; (m_ioContexts + i)->ioContext.buf = new char[bufferSize]; (m_ioContexts + i)->ioContext.wsaReceiveBuf.len = bufferSize; (m_ioContexts + i)->ioContext.wsaReceiveBuf.buf = (m_ioContexts + i)->ioContext.buf; (m_ioContexts + i)->ioContext.TESTbuf = new char[10000]; (m_ioContexts + i)->ioContext.wsaSendBuf.buf = (m_ioContexts + i)->ioContext.TESTbuf; *(m_hContextEvents + i) = CreateEvent(0, TRUE, FALSE, 0); } } void _SetSocket(SOCKET socket) { m_clientData.socket = socket; } }; bool WriteChunk(const NetClientHost *clientHost, NetClientHost::IOContext *ioContext) { int status; status = WSASend(clientHost->GetClientData()->socket, &ioContext->wsaSendBuf, 1, &ioContext->bytesSent, ioContext->flags, &ioContext->overlapped, 0); if (status == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { // ... return false; } return true; } bool Write(NetClientHost *clientHost, void *buffer, unsigned int size, unsigned int chunk) { //__ASSERT(m_clientHost); //__ASSERT(m_clientHost->GetClientData()->remainingBytesToSend == 0); NetClientHost::IOContext *ioContext = clientHost->AcquireContext(); if (!chunk) chunk = size; ioContext->wsaSendBuf.buf = ioContext->TESTbuf; ioContext->operation = NetClientHost::kOperationWrite; ioContext->flags = 0; ioContext->wsaSendBuf.buf = new char[size]; memcpy((void *) ioContext->wsaSendBuf.buf, buffer, chunk); ioContext->wsaSendBuf.len = chunk; ioContext->chunk = chunk; ioContext->bytesToSendTotal = size; ioContext->remainingBytesToSend = size; return WriteChunk(clientHost, ioContext); } void Read(NetClientHost *clientHost) { NetClientHost::IOContext *ioContext = clientHost->AcquireContext(); int status; memset((void *) ioContext, 0, sizeof(NetClientHost::IOContext)); ioContext->buf = new char[1024]; ioContext->wsaReceiveBuf.len = 1024; ioContext->wsaReceiveBuf.buf = ioContext->buf; ioContext->flags = 0; ioContext->operation = NetClientHost::kOperationRead; status = WSARecv(clientHost->GetClientData()->socket, &ioContext->wsaReceiveBuf, 1, &ioContext->bytesReceived, &ioContext->flags, &ioContext->overlapped, 0); int i = WSAGetLastError(); if (status == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { // ... } } bool AddSocket(HANDLE hIOCP, SOCKET socket) { ++connection; int bufSize = 0; LINGER lingerStruct; lingerStruct.l_onoff = 1; lingerStruct.l_linger = 0; setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *) &bufSize, sizeof(int)); setsockopt(socket, SOL_SOCKET, SO_RCVBUF, (char *) &bufSize, sizeof(int)); setsockopt(socket, SOL_SOCKET, SO_LINGER, (char *) &lingerStruct, sizeof(lingerStruct) ); NetClientHost *clientHost = new NetClientHost; clientHost->_InitializeIOContexts(1024); clientHost->Init(0); clientHost->_SetSocket(socket); // Add this socket to the IO Completion Port CreateIoCompletionPort((HANDLE) socket, hIOCP, (DWORD_PTR) clientHost, 0); Read(clientHost); return true; } int read = 0, write = 0; DWORD WINAPI WorkerThread(LPVOID param) { LPOVERLAPPED overlapped; NetClientHost *clientHost; HANDLE hIOCP = (HANDLE) param; DWORD ioSize; BOOL status; while (true) { status = GetQueuedCompletionStatus(hIOCP, &ioSize, (PULONG_PTR) &clientHost, (LPOVERLAPPED *) &overlapped, INFINITE); if (!(status || ioSize)) { --connection; //_CloseConnection(clientHost); continue; } NetClientHost::IOContext *ioContext = (NetClientHost::IOContext *) overlapped; switch (ioContext->operation) { case NetClientHost::kOperationRead: download += ioSize; Write(clientHost, ioContext->wsaReceiveBuf.buf, ioSize, 0); write++; clientHost->ReleaseContext(ioContext); break; case NetClientHost::kOperationWrite: upload += ioSize; if (ioContext->remainingBytesToSend) { ioContext->remainingBytesToSend -= ioSize; ioContext->wsaSendBuf.len = ioContext->chunk <= ioContext->remainingBytesToSend ? ioContext->chunk : ioContext->remainingBytesToSend; // equivalent to min(clientData->chunk, clientData->remainingBytesToSend); ioContext->wsaSendBuf.buf += ioContext->wsaSendBuf.len; } if (ioContext->remainingBytesToSend) { WriteChunk(clientHost, ioContext); } else { clientHost->ReleaseContext(ioContext); Read(clientHost); read++; } break; } } return 0; } DWORD WINAPI ListenThread(LPVOID param) { SOCKET sdListen = (SOCKET) param; HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); CreateThread(0, 0, WorkerThread, hIOCP, 0, 0); CreateThread(0, 0, WorkerThread, hIOCP, 0, 0); CreateThread(0, 0, WorkerThread, hIOCP, 0, 0); CreateThread(0, 0, WorkerThread, hIOCP, 0, 0); while (true) { SOCKET as = WSAAccept(sdListen, 0, 0, 0, 0); if (as != INVALID_SOCKET) AddSocket(hIOCP, as); } } int main() { SOCKET sdListen; SOCKADDR_IN si_addrlocal; int nRet; int nZero = 0; LINGER lingerStruct; WSADATA wsaData; WSAStartup(0x202, &wsaData); sdListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); si_addrlocal.sin_family = AF_INET; si_addrlocal.sin_port = htons(5000); si_addrlocal.sin_addr.s_addr = htonl(INADDR_ANY); nRet = bind(sdListen, (struct sockaddr *)&si_addrlocal, sizeof(si_addrlocal)); nRet = listen(sdListen, 5); nZero = 0; nRet = setsockopt(sdListen, SOL_SOCKET, SO_SNDBUF, (char *) &nZero, sizeof(nZero)); nZero = 0; nRet = setsockopt(sdListen, SOL_SOCKET, SO_RCVBUF, (char *)&nZero, sizeof(nZero)); lingerStruct.l_onoff = 1; lingerStruct.l_linger = 0; nRet = setsockopt(sdListen, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct) ); CreateThread(0, 0, ListenThread, (LPVOID) sdListen, 0, 0); HANDLE console = GetStdHandle(STD_OUTPUT_HANDLE); while (true) { COORD c = {0}; SetConsoleCursorPosition(console, c); printf("Connections: %i \nUpload: %iKB/s \nDownload: %iKB/s ", connection, upload * 2 / 1024, download * 2 / 1024); upload = 0; download = 0; Sleep(500); } return 0; }
这种异步系统应该能够以全数据链路速度运行。 我发现错误的问题如下:
有一个叫wireshark的东西可以给你一些消息流量的可见性。 我曾经以困难的方式做到这一点,带有时间戳的消息日志。
顺便说一下:在进行异步分析之前,我会首先在单个进程上使用这种方法来清除任何瓶颈。 如果你还没有这样做,你可以打赌他们在那里。 任何旧的分析器都不可靠。 有很好的,包括Zoom 。