首頁>技術>

libevent使用多執行緒時,需要注意一個執行緒對應一個event_base,我這邊用的是一個執行緒用來監聽連結和接收資料, 另外一個執行緒專門用來發送資料,也就是用兩個 event_base,接收和傳送資料使用的是 bufferevent。

同時,多個服務共用同一組讀、寫 event_base(demo 中建立了兩個服務)。

demo 中沒有加入訊息解析,也沒有加入業務執行緒,可以根據自己的需求修改;訊息格式可以使用 Protobuf,也可以自行定義。

libevent版本:libevent-2.0.22-stable 需要自己編譯出連結庫

系統環境:windows

#pragma comment(lib,"ws2_32.lib")#pragma comment(lib,"wsock32.lib")#include <WinSock2.h>#include <windows.h>#include <WS2tcpip.h>#include "event2/event.h"#include "event2/listener.h"#include "event2/thread.h"#include "event2/bufferevent.h"#include <thread>#include <iostream>#include <stdlib.h>#include <assert.h>#include <string.h>using namespace std;void event_time_out(evutil_socket_t, short, void * p){	if (!p)	{		return;	}	cout << "event_time_out" << endl;	event_base* base = (event_base*)p;	struct event* read_time_out = event_new(base, -1, EV_PERSIST | EV_TIMEOUT, event_time_out, base);	struct timeval tv;	tv.tv_sec = 10 * 365 * 86400;		//10年時間	tv.tv_usec = 0;	event_add(read_time_out, &tv);}void event_base_run(void* p){	if (!p)	{		return;	}	event_base* base = (event_base*)p;	event_base_dispatch(base);	std::cout << "event_base_run end" << endl;}#define  MAX_READ_LENGTH 10240void doRead(struct bufferevent* bev, void *ctx){	evutil_socket_t fd = bufferevent_getfd(bev);	static char recvBuff[MAX_READ_LENGTH];	size_t read_length = bufferevent_read(bev, recvBuff, MAX_READ_LENGTH);	evutil_socket_t serverFd = (evutil_socket_t)ctx;	cout << "服務:" << serverFd << " 連結:" << fd << "接收:" << recvBuff << endl;}void doWrite(struct bufferevent* bev, void *ctx){	evutil_socket_t fd = bufferevent_getfd(bev);}void doError(struct bufferevent* bev, short error, void *ctx){	evutil_socket_t serverFd = (evutil_socket_t)ctx;	evutil_socket_t fd = bufferevent_getfd(bev);	cout << "服務:" << serverFd << " 連結:" << fd << endl;	std::cout << fd << "連結斷開" << endl;}void doAccept(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr *, int socklen, void * p){	if (!p)	{		return;	}	event_base * readEventBase = evconnlistener_get_base(listener);	if (!readEventBase)	{		return;	}	event_base * writeEventBase = (event_base*)p;	if (!writeEventBase)	{		return;	}	struct bufferevent* readBuffevent = bufferevent_socket_new(readEventBase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);	if (!readBuffevent)	
{		return;	}	struct bufferevent* writeBuffevent = bufferevent_socket_new(writeEventBase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);	if (!writeBuffevent)	{		return;	}	evutil_socket_t serverFd = evconnlistener_get_fd(listener);	evutil_make_socket_nonblocking(fd);	bufferevent_setfd(readBuffevent, fd);	bufferevent_setcb(readBuffevent, doRead, NULL, doError, (void *)serverFd);	bufferevent_enable(readBuffevent, EV_READ | EV_PERSIST);	bufferevent_disable(readBuffevent, EV_WRITE);	bufferevent_setfd(writeBuffevent, fd);	bufferevent_setcb(writeBuffevent, NULL, doWrite, doError, (void *)serverFd);	bufferevent_enable(writeBuffevent, EV_WRITE | EV_PERSIST);	//writeBuffevent要另外儲存下,可以用來發訊息 bufferevent_write	std::cout << "服務:" << serverFd << "有連結連上:" << fd  << endl;	}bool init_new_service(std::string ip, int port, event_base* readEventBase, event_base* writeEventBase){	struct sockaddr_in sin;	memset(&sin, 0, sizeof(sin));	sin.sin_family = AF_INET;	sin.sin_port = htons(port);	SOCKADDR_IN addr_in;	if (inet_pton(AF_INET, ip.c_str(), &addr_in.sin_addr.S_un.S_addr) != 1)	{		return false;	}	if (!evconnlistener_new_bind(readEventBase, doAccept, writeEventBase, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE, -1, (struct sockaddr*)&sin, sizeof(sin)) )	{		cout << "bind error" << endl;		return false;	}	cout << "init_new_service" << ip.c_str() << ":" << port << " success!" 
<< endl;	return true;}int main(){#ifdef WIN32	WSADATA wsa_data;	WSAStartup(0x0201, &wsa_data);#endif	evthread_use_windows_threads();	event_base* _readEventBase  = event_base_new();	event_base* _writeEventBase = event_base_new();	evthread_make_base_notifiable(_readEventBase);	evthread_make_base_notifiable(_writeEventBase);	//新增定時器,因為event_base沒有事件監聽會退出迴圈的	{		struct event* time_event = event_new(_readEventBase, -1, EV_PERSIST | EV_TIMEOUT, event_time_out, _readEventBase);		struct timeval tv;		tv.tv_sec = 10 * 365 * 86400;				tv.tv_usec = 0;		if (event_add(time_event, &tv) == -1)		{			return -1;		}	}	{		struct event* time_event = event_new(_writeEventBase, -1, EV_PERSIST | EV_TIMEOUT, event_time_out, _writeEventBase);		struct timeval tv;		tv.tv_sec = 10 * 365 * 86400;		tv.tv_usec = 0;		if (event_add(time_event, &tv) == -1)		{			return -1;		}	}		std::thread _readThread(event_base_run, _readEventBase);	std::thread _writeThread(event_base_run, _writeEventBase);	init_new_service("127.0.0.1", 7857, _readEventBase, _writeEventBase);	init_new_service("127.0.0.1", 7858, _readEventBase, _writeEventBase);	cout << "輸入任意字元結束" << endl;	getchar();	event_base_loopbreak(_readEventBase);	_readThread.join();	event_base_loopbreak(_writeEventBase);	_writeThread.join();	event_base_free(_readEventBase);	event_base_free(_writeEventBase);	_readEventBase = NULL;	_writeEventBase = NULL;	return 0;}

8
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • PHP操作MySQL資料庫