winsock编程IOCP模型实现代码

winsock编程IOCP模型实现代码

  话不多说,上代码。借鉴《windows核心编程》部分源码和CSDN小猪部分代码。

  stdafx.h依赖头文件:

 #include <iostream>
#include <WinSock2.h>
#include <MSWSock.h>
#include <vector>
#include "Singleton.h"
#include "IOCPWrapper.h"
#include "OverlappedIOInfo.h"
#include "TaskSvc.h" using namespace std;

  其中,TaskSvc.h、Singleton.h源码可以在我的blog里面找到。

  IOCPWrapper.h源码:

 /******************************************************************************
Module: IOCP.h
Notices: Copyright (c) 2007 Jeffrey Richter & Christophe Nasarre
Purpose: This class wraps an I/O Completion Port.
Revise: IOCP封装类,由《windows核心编程》第10章示例程序源码改编所得
******************************************************************************/
#pragma once class CIOCP
{
public:
CIOCP(int nMaxConcurrency = -)
{
m_hIOCP = NULL;
if (nMaxConcurrency != -)
Create(nMaxConcurrency);
}
~CIOCP()
{
if (m_hIOCP != NULL)
VERIFY(CloseHandle(m_hIOCP));
} //关闭IOCP
BOOL Close()
{
BOOL bResult = CloseHandle(m_hIOCP);
m_hIOCP = NULL;
return(bResult);
} //创建IOCP,nMaxConcurrency指定最大线程并发数量,0默认为cpu数量
BOOL Create(int nMaxConcurrency = )
{
m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, , nMaxConcurrency);
ASSERT(m_hIOCP != NULL);
return(m_hIOCP != NULL);
} //为设备(文件、socket、邮件槽、管道等)关联一个IOCP
BOOL AssociateDevice(HANDLE hDevice, ULONG_PTR CompKey)
{
BOOL fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, CompKey, ) == m_hIOCP);
ASSERT(fOk);
return(fOk);
} //为socket关联一个IOCP
BOOL AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey)
{
return(AssociateDevice((HANDLE) hSocket, CompKey));
} //为iocp传递事件通知
BOOL PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = ,OVERLAPPED* po = NULL)
{
BOOL fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);
ASSERT(fOk);
return(fOk);
} //从IO完成队列中获取事件通知。IO完成队列无事件时,该函数将阻塞
BOOL GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes,OVERLAPPED** ppo, DWORD dwMilliseconds = INFINITE)
{
return(GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes,pCompKey, ppo, dwMilliseconds));
} //获取IOCP对象
const HANDLE GetIOCP()
{
return m_hIOCP;
}
private:
//IOCP句柄
HANDLE m_hIOCP;
}; ///////////////////////////////// End of File /////////////////////////////////

OverlappedIOInfo.h源码

 /******************************************************************************
Module: OverlappedIOInfo.h
Notices: Copyright (c) 20161201 whg
Purpose:
IOCP网络编程模型中,需要用到GetQueuedCompletionStatus函数获取已完成事件。
但该函数的返回参数无socket或buffer的描述信息。 一个简单的解决办法,创建一个新的结构,该结构第一个参数是OVERLAPPED。
由于AcceptEx、WSASend等重叠IO操作传入的是Overlapped结构体的地址,调用AcceptEx等重叠IO操作,
在Overlapped结构体后面开辟新的空间,写入socket或buffer的信息,即可将socket或buffer的信息由
GetQueuedCompletionStatus带回。 参考《windows核心编程》和CSDN PiggyXP
******************************************************************************/ #pragma once #define MAXBUF 8*1024 enum IOOperType{
TYPE_ACP, //accept事件到达,有新连接请求
TYPE_RECV, //数据接收事件
TYPE_SEND, //数据发送事件
TYPE_CLOSE, //关闭事件
TYPE_NO_OPER
}; class COverlappedIOInfo : public OVERLAPPED
{
public:
COverlappedIOInfo(void)
{
m_sSock = INVALID_SOCKET;
ResetOverlapped();
ResetRecvBuffer();
ResetSendBuffer();
}
~COverlappedIOInfo(void)
{
if (m_sSock != INVALID_SOCKET)
{
closesocket(m_sSock);
m_sSock = INVALID_SOCKET;
}
}
void ResetOverlapped()
{
Internal = InternalHigh = ;
Offset = OffsetHigh = ;
hEvent = NULL;
}
void ResetRecvBuffer()
{
ZeroMemory(m_cRecvBuf,MAXBUF);
m_recvBuf.buf = m_cRecvBuf;
m_recvBuf.len = MAXBUF;
}
void ResetSendBuffer()
{
ZeroMemory(m_cSendBuf,MAXBUF);
m_sendBuf.buf = m_cSendBuf;
m_sendBuf.len = MAXBUF;
}
public:
//套接字
SOCKET m_sSock;
//接收缓冲区,用于AcceptEx、WSARecv操作
WSABUF m_recvBuf;
char m_cRecvBuf[MAXBUF];
//发送缓冲区,用于WSASend操作
WSABUF m_sendBuf;
char m_cSendBuf[MAXBUF];
//对端地址
sockaddr_in m_addr;
};

server.h

 #pragma once

 class CServer:public CTaskSvc
{
#define ACCEPT_SOCKET_NUM 10 public:
CServer(void);
~CServer(void);
bool StartListen(unsigned short port,std::string ip); protected:
virtual void svc(); private:
//启动CPU*2个线程,返回已启动线程个数
UINT StartThreadPull();
//获取AcceptEx和GetAcceptExSockaddrs函数指针
bool GetLPFNAcceptEXAndGetAcceptSockAddrs();
//利用AcceptEx监听accept请求
bool PostAccept(COverlappedIOInfo* ol);
//处理accept请求,NumberOfBytes=0表示没有收到第一帧数据,>0表示收到第一帧数据
bool DoAccept(COverlappedIOInfo* ol,DWORD NumberOfBytes=);
//投递recv请求
bool PostRecv(COverlappedIOInfo* ol);
//处理recv请求
bool DoRecv(COverlappedIOInfo* ol);
//从已连接socket列表中移除socket及释放空间
bool DeleteLink(SOCKET s);
//释放3个部分步骤:
//1:清空IOCP线程队列,退出线程
//2: 清空等待accept的套接字m_vecAcps
//3: 清空已连接的套接字m_vecContInfo并清空缓存
void CloseServer();
private:
//winsock版本类型
WSAData m_wsaData;
//端口监听套接字
SOCKET m_sListen;
//等待accept的套接字,这些套接字是没有使用过的,数量为ACCEPT_SOCKET_NUM。同时会有10个套接字等待accept
std::vector<SOCKET> m_vecAcps;
//已建立连接的信息,每个结构含有一个套接字、发送缓冲和接收缓冲,以及对端地址
std::vector<COverlappedIOInfo*> m_vecContInfo;
//操作vector的互斥访问锁
CThreadLockCs m_lsc;
//IOCP封装类
CIOCP m_iocp;
//AcceptEx函数指针
LPFN_ACCEPTEX m_lpfnAcceptEx;
//GetAcceptSockAddrs函数指针
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptSockAddrs;
}; typedef CSingleton<CServer> SERVER;

server.cpp

 #include "StdAfx.h"
#include "Server.h" CServer::CServer(void)
{
m_lpfnAcceptEx = NULL;
m_lpfnGetAcceptSockAddrs = NULL;
WSAStartup(MAKEWORD(,),&m_wsaData);
} CServer::~CServer(void)
{
CloseServer();
WSACleanup();
} bool CServer::StartListen(unsigned short port,std::string ip)
{
//listen socket需要将accept操作投递到完成端口,因此,listen socket属性必须有重叠IO
m_sListen = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,,WSA_FLAG_OVERLAPPED);
if(m_sListen == INVALID_SOCKET)
{
cout<<"WSASocket create socket error"<<endl;
return false;
}
//创建并设置IOCP并发线程数量
if (m_iocp.Create() == FALSE)
{
cout<<"IOCP create error,error code "<<WSAGetLastError()<<endl;
return false;
}
//将listen socket绑定至iocp
if (!m_iocp.AssociateSocket(m_sListen,TYPE_ACP))
{
cout<<"iocp Associate listen Socket error"<<endl;
return false;
}
sockaddr_in service;
service.sin_family = AF_INET;
service.sin_port = htons(port);
if (ip.empty())
{
service.sin_addr.s_addr = INADDR_ANY;
}
else
{
service.sin_addr.s_addr = inet_addr(ip.c_str());
} if (bind(m_sListen,(sockaddr*)&service,sizeof(service)) == SOCKET_ERROR)
{
cout<<"bind() error,error code "<<WSAGetLastError()<<endl;
return false;
}
cout<<"bind ok!"<<endl; if (listen(m_sListen,SOMAXCONN) == SOCKET_ERROR)
{
cout<<"listen() error,error code "<<WSAGetLastError()<<endl;
return false;
}
cout<<"listen ok!"<<endl;
//启动工作者线程
int threadnum = StartThreadPull();
cout<<"启动工作者线程,num="<<threadnum<<endl;
//获取AcceptEx和GetAcceptSockAddrs函数指针
if (!GetLPFNAcceptEXAndGetAcceptSockAddrs())
{
return false;
}
//创建10个acceptex
for (int i=;i<ACCEPT_SOCKET_NUM;i++)
{
//用accept
COverlappedIOInfo* ol = new COverlappedIOInfo;
if (!PostAccept(ol))
{
delete ol;
return false;
}
}
} bool CServer::GetLPFNAcceptEXAndGetAcceptSockAddrs()
{
DWORD BytesReturned = ;
//获取AcceptEx函数指针
GUID GuidAcceptEx = WSAID_ACCEPTEX;
if (SOCKET_ERROR == WSAIoctl(
m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&BytesReturned,
NULL,NULL))
{
cout<<"WSAIoctl get AcceptEx function error,error code "<<WSAGetLastError()<<endl;
return false;
} //获取GetAcceptexSockAddrs函数指针
GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
if (SOCKET_ERROR == WSAIoctl(
m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptexSockAddrs,
sizeof(GuidGetAcceptexSockAddrs),
&m_lpfnGetAcceptSockAddrs,
sizeof(m_lpfnGetAcceptSockAddrs),
&BytesReturned,
NULL,NULL))
{
cout<<"WSAIoctl get GetAcceptexSockAddrs function error,error code "<<WSAGetLastError()<<endl;
return false;
}
return true;
} bool CServer::PostAccept(COverlappedIOInfo* ol)
{
if (m_lpfnAcceptEx == NULL)
{
cout << "m_lpfnAcceptEx is NULL"<<endl;
return false;
}
SOCKET s = ol->m_sSock;
ol->ResetRecvBuffer();
ol->ResetOverlapped();
ol->ResetSendBuffer();
ol->m_sSock = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,,WSA_FLAG_OVERLAPPED);
if (ol->m_sSock == INVALID_SOCKET)
{
cout<<"WSASocket error ,error code "<<WSAGetLastError()<<endl;
return false;
}
//这里建立的socket用来和对端建立连接,终会加入m_vecContInfo列表
//调用acceptex将accept socket绑定至完成端口,并开始进行事件监听
//这里需要传递Overlapped,new一个COverlappedIOInfo
//AcceptEx是m_listen的监听事件,m_listen已经绑定了完成端口;虽然ol->m_sSock已经创建,
//但未使用,现在不必为ol->m_sSock绑定完成端口。在AcceptEx事件发生后,再为ol->m_sSock绑定IOCP
DWORD byteReceived = ;
if (FALSE == m_lpfnAcceptEx(
m_sListen,
ol->m_sSock,
ol->m_recvBuf.buf,
ol->m_recvBuf.len - (sizeof(SOCKADDR_IN)+)*,
sizeof(SOCKADDR_IN)+,
sizeof(SOCKADDR_IN)+,
&byteReceived,
ol))
{
DWORD res = WSAGetLastError();
if (ERROR_IO_PENDING != res)
{
cout<<"AcceptEx error , error code "<<res<<endl;
return false;
}
}
std::vector<SOCKET>::iterator iter = m_vecAcps.begin();
for (;iter != m_vecAcps.end(); iter++)
{
if (*iter == s)
{
*iter = ol->m_sSock;
}
}
if (iter == m_vecAcps.end())
{
m_vecAcps.push_back(ol->m_sSock);
}
return true;
} bool CServer::DoAccept(COverlappedIOInfo* ol,DWORD NumberOfBytes)
{
//分支用于获取远端地址。
//如果接收TYPE_ACP同时收到第一帧数据,则第一帧数据内包含远端地址。
//如果没有收到第一帧数据,则通过getpeername获取远端地址
SOCKADDR_IN* ClientAddr = NULL;
int remoteLen = sizeof(SOCKADDR_IN);
if (NumberOfBytes > )
{
//接受的数据分成3部分,第1部分是客户端发来的数据,第2部分是本地地址,第3部分是远端地址。
if (m_lpfnGetAcceptSockAddrs)
{
SOCKADDR_IN* LocalAddr = NULL;
int localLen = sizeof(SOCKADDR_IN);
m_lpfnGetAcceptSockAddrs(
ol->m_recvBuf.buf,
ol->m_recvBuf.len - (sizeof(SOCKADDR_IN)+)*,
sizeof(SOCKADDR_IN)+,
sizeof(SOCKADDR_IN)+,
(LPSOCKADDR*)&LocalAddr,
&localLen,
(LPSOCKADDR*)&ClientAddr,
&remoteLen);
cout<<"收到新的连接请求,ip="<<inet_ntoa(ClientAddr->sin_addr)<<",port="<<ClientAddr->sin_port<<
"数据为:"<<ol->m_recvBuf.buf<<endl;
}
}
else if (NumberOfBytes==)
{
//未收到第一帧数据
if (SOCKET_ERROR ==getpeername(ol->m_sSock,(sockaddr*)ClientAddr,&remoteLen))
{
cout<<"getpeername error,error code "<<WSAGetLastError()<<endl;
}
else
{
cout<<"收到新的连接请求,ip="<<inet_ntoa(ClientAddr->sin_addr)<<",port="<<ClientAddr->sin_port<<endl;
}
} COverlappedIOInfo* pol = new COverlappedIOInfo;
pol->m_sSock = ol->m_sSock;
pol->m_addr = *ClientAddr;
//服务端只收取recv,同时监听recv和send可用设计位偏移,用或运算实现
if (m_iocp.AssociateSocket(pol->m_sSock,TYPE_RECV))
{
PostRecv(pol); m_vecContInfo.push_back(pol);
}
else
{
delete pol;
return false;
} return true;
} bool CServer::DoRecv(COverlappedIOInfo* ol)
{
cout<<"收到客户端数据:ip="<<inet_ntoa(ol->m_addr.sin_addr)<<",port="<<ol->m_addr.sin_port<<
";内容="<<ol->m_recvBuf.buf<<endl;
return true;
} bool CServer::PostRecv(COverlappedIOInfo* ol)
{
DWORD BytesRecvd = ;
DWORD dwFlags = ;
ol->ResetOverlapped();
ol->ResetRecvBuffer();
int recvnum = WSARecv(ol->m_sSock,&ol->m_recvBuf,,&BytesRecvd,&dwFlags,(OVERLAPPED*)ol,NULL);
if (recvnum != )
{
int res = WSAGetLastError();
if (WSA_IO_PENDING != res)
{
cout<<"WSARecv error,error code "<<res<<endl;
}
}
return true;
} UINT CServer::StartThreadPull()
{
//获取系统cpu个数启动线程
SYSTEM_INFO si;
GetSystemInfo(&si);
//启动cpu数量*2个线程
return Activate(si.dwNumberOfProcessors * );
} bool CServer::DeleteLink(SOCKET s)
{
m_lsc.lock();
std::vector<COverlappedIOInfo*>::iterator iter = m_vecContInfo.begin();
for (;iter!=m_vecContInfo.end();iter++)
{
if (s == (*iter)->m_sSock)
{
COverlappedIOInfo* ol = *iter;
closesocket(s);
m_vecContInfo.erase(iter);
delete ol;
break;
}
}
m_lsc.unlock();
return true;
} void CServer::svc()
{
while (true)
{
DWORD NumberOfBytes = ;
unsigned long CompletionKey = ;
OVERLAPPED* ol = NULL;
if (FALSE != GetQueuedCompletionStatus(m_iocp.GetIOCP(),&NumberOfBytes,&CompletionKey,&ol,WSA_INFINITE))
{
if (CompletionKey == TYPE_CLOSE)
{
break;
}
if (NumberOfBytes == && (CompletionKey==TYPE_RECV || CompletionKey==TYPE_SEND))
{
//客户端断开连接
cout<<"客户端断开连接,ip="<<inet_ntoa(olinfo->m_addr.sin_addr)<<",port="<<olinfo->m_addr.sin_port<<endl;
DeleteLink(olinfo->m_sSock);
continue;
}
COverlappedIOInfo* olinfo = (COverlappedIOInfo*)ol;
switch (CompletionKey)
{
case TYPE_ACP:
{
DoAccept(olinfo,NumberOfBytes);
PostAccept(olinfo);
}
break;
case TYPE_RECV:
{
DoRecv(olinfo);
PostRecv(olinfo);
}
break;
case TYPE_SEND:
{
}
break;
default:
break;
}
}
else
{
int res = WSAGetLastError();
switch(res)
{
case ERROR_NETNAME_DELETED:
{
COverlappedIOInfo* olinfo = (COverlappedIOInfo*)ol;
if (olinfo)
{
cout<<"客户端异常退出,ip="<<inet_ntoa(olinfo->m_addr.sin_addr)<<",port="<<olinfo->m_addr.sin_port<<endl;
DeleteLink(olinfo->m_sSock);
}
}
break;
default:
cout<<"workthread GetQueuedCompletionStatus error,error code "<<WSAGetLastError()<<endl;
break;
}
continue;
}
}
cout<<"workthread stop"<<endl;
} void CServer::CloseServer()
{
//1:清空IOCP线程队列,退出线程,有多少个线程发送多少个PostQueuedCompletionStatus信息
int threadnum = GetThreadsNum();
for (int i=;i<threadnum;i++)
{
if (FALSE == m_iocp.PostStatus(TYPE_CLOSE))
{
cout<<"PostQueuedCompletionStatus error,error code "<<WSAGetLastError()<<endl;
}
}
//2:清空等待accept的套接字m_vecAcps
std::vector<SOCKET>::iterator iter = m_vecAcps.begin();
for (;iter != m_vecAcps.end();iter++)
{
SOCKET s = *iter;
closesocket(s);
}
m_vecAcps.clear();
//3:清空已连接的套接字m_vecContInfo并清空缓存
std::vector<COverlappedIOInfo*>::iterator iter2 = m_vecContInfo.begin();
for (;iter2 != m_vecContInfo.end();iter2++)
{
COverlappedIOInfo* ol = *iter2;
closesocket(ol->m_sSock);
iter2 = m_vecContInfo.erase(iter2);
delete ol;
}
m_vecContInfo.clear();
}

调用方法,控制台程序main函数内加入

 SERVER::Instance()->StartListen(,"127.0.0.1");
int outId;
cin>>outId;
if (outId == )
{
SERVER::Close();
}

输入0结束服务程序。

测试结果1,接收数据

winsock编程IOCP模型实现代码

测试结果2,客户端断开连接

winsock编程IOCP模型实现代码  

上一篇:MySQL中间层 Atlas


下一篇:CSS: 首字母字体变大时下划线不对齐的解决方法