libevent使用多執行緒時,需要注意一個執行緒對應一個event_base,我這邊用的是一個執行緒用來監聽連結和接收資料, 另外一個執行緒專門用來發送資料,也就是用兩個 event_base,接收和傳送資料使用的是 bufferevent。
同時,多個服務共用讀、寫這兩個 event_base(demo 中建立了兩個服務)。
訊息的解析以及業務執行緒都沒有加入,可以根據自己的需求修改;訊息格式可以使用 Protobuf,也可以自定義。
libevent版本:libevent-2.0.22-stable 需要自己編譯出連結庫
系統環境:windows
#pragma comment(lib,"ws2_32.lib")#pragma comment(lib,"wsock32.lib")#include <WinSock2.h>#include <windows.h>#include <WS2tcpip.h>#include "event2/event.h"#include "event2/listener.h"#include "event2/thread.h"#include "event2/bufferevent.h"#include <thread>#include <iostream>#include <stdlib.h>#include <assert.h>#include <string.h>using namespace std;void event_time_out(evutil_socket_t, short, void * p){ if (!p) { return; } cout << "event_time_out" << endl; event_base* base = (event_base*)p; struct event* read_time_out = event_new(base, -1, EV_PERSIST | EV_TIMEOUT, event_time_out, base); struct timeval tv; tv.tv_sec = 10 * 365 * 86400; //10年時間 tv.tv_usec = 0; event_add(read_time_out, &tv);}void event_base_run(void* p){ if (!p) { return; } event_base* base = (event_base*)p; event_base_dispatch(base); std::cout << "event_base_run end" << endl;}#define MAX_READ_LENGTH 10240void doRead(struct bufferevent* bev, void *ctx){ evutil_socket_t fd = bufferevent_getfd(bev); static char recvBuff[MAX_READ_LENGTH]; size_t read_length = bufferevent_read(bev, recvBuff, MAX_READ_LENGTH); evutil_socket_t serverFd = (evutil_socket_t)ctx; cout << "服務:" << serverFd << " 連結:" << fd << "接收:" << recvBuff << endl;}void doWrite(struct bufferevent* bev, void *ctx){ evutil_socket_t fd = bufferevent_getfd(bev);}void doError(struct bufferevent* bev, short error, void *ctx){ evutil_socket_t serverFd = (evutil_socket_t)ctx; evutil_socket_t fd = bufferevent_getfd(bev); cout << "服務:" << serverFd << " 連結:" << fd << endl; std::cout << fd << "連結斷開" << endl;}void doAccept(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr *, int socklen, void * p){ if (!p) { return; } event_base * readEventBase = evconnlistener_get_base(listener); if (!readEventBase) { return; } event_base * writeEventBase = (event_base*)p; if (!writeEventBase) { return; } struct bufferevent* readBuffevent = bufferevent_socket_new(readEventBase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); if (!readBuffevent) { 
return; } struct bufferevent* writeBuffevent = bufferevent_socket_new(writeEventBase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); if (!writeBuffevent) { return; } evutil_socket_t serverFd = evconnlistener_get_fd(listener); evutil_make_socket_nonblocking(fd); bufferevent_setfd(readBuffevent, fd); bufferevent_setcb(readBuffevent, doRead, NULL, doError, (void *)serverFd); bufferevent_enable(readBuffevent, EV_READ | EV_PERSIST); bufferevent_disable(readBuffevent, EV_WRITE); bufferevent_setfd(writeBuffevent, fd); bufferevent_setcb(writeBuffevent, NULL, doWrite, doError, (void *)serverFd); bufferevent_enable(writeBuffevent, EV_WRITE | EV_PERSIST); //writeBuffevent要另外儲存下,可以用來發訊息 bufferevent_write std::cout << "服務:" << serverFd << "有連結連上:" << fd << endl; }bool init_new_service(std::string ip, int port, event_base* readEventBase, event_base* writeEventBase){ struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); SOCKADDR_IN addr_in; if (inet_pton(AF_INET, ip.c_str(), &addr_in.sin_addr.S_un.S_addr) != 1) { return false; } if (!evconnlistener_new_bind(readEventBase, doAccept, writeEventBase, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE, -1, (struct sockaddr*)&sin, sizeof(sin)) ) { cout << "bind error" << endl; return false; } cout << "init_new_service" << ip.c_str() << ":" << port << " success!" 
<< endl; return true;}int main(){#ifdef WIN32 WSADATA wsa_data; WSAStartup(0x0201, &wsa_data);#endif evthread_use_windows_threads(); event_base* _readEventBase = event_base_new(); event_base* _writeEventBase = event_base_new(); evthread_make_base_notifiable(_readEventBase); evthread_make_base_notifiable(_writeEventBase); //新增定時器,因為event_base沒有事件監聽會退出迴圈的 { struct event* time_event = event_new(_readEventBase, -1, EV_PERSIST | EV_TIMEOUT, event_time_out, _readEventBase); struct timeval tv; tv.tv_sec = 10 * 365 * 86400; tv.tv_usec = 0; if (event_add(time_event, &tv) == -1) { return -1; } } { struct event* time_event = event_new(_writeEventBase, -1, EV_PERSIST | EV_TIMEOUT, event_time_out, _writeEventBase); struct timeval tv; tv.tv_sec = 10 * 365 * 86400; tv.tv_usec = 0; if (event_add(time_event, &tv) == -1) { return -1; } } std::thread _readThread(event_base_run, _readEventBase); std::thread _writeThread(event_base_run, _writeEventBase); init_new_service("127.0.0.1", 7857, _readEventBase, _writeEventBase); init_new_service("127.0.0.1", 7858, _readEventBase, _writeEventBase); cout << "輸入任意字元結束" << endl; getchar(); event_base_loopbreak(_readEventBase); _readThread.join(); event_base_loopbreak(_writeEventBase); _writeThread.join(); event_base_free(_readEventBase); event_base_free(_writeEventBase); _readEventBase = NULL; _writeEventBase = NULL; return 0;}
最新評論