-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathEventLoopThread.cpp
More file actions
99 lines (83 loc) · 2.32 KB
/
EventLoopThread.cpp
File metadata and controls
99 lines (83 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include "EventLoopThread.h"

#include <arpa/inet.h>

#include "Connection.h"
#include "Server.h"
void NotifyDataArrived(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
//std::cout<<"NotifyDataArrived"<<std::endl;
EventLoopThread* eventLoop = (EventLoopThread*)watcher->data;
eventLoop->NotifyEventArrived();
}
/// Records the listening descriptor, the notification read descriptor and the
/// owning server. The event loop pointer starts out NULL and the descriptor
/// counter starts at zero.
///
/// @param listenfd  Descriptor later passed to accept().
/// @param readfd    Descriptor from which notification messages are read.
/// @param server    Server that receives newly created connections.
EventLoopThread::EventLoopThread(int listenfd, int readfd, Server* server)
    : m_listenfd(listenfd)
    , m_readfd(readfd)
    , m_server(server)
    , m_eventLoop(NULL)
    , m_fdCount(0)
{
}
/// Waits for the worker thread (when one was created), closes the
/// notification read descriptor, and resets the descriptor counter under the
/// mutex before logging the teardown.
EventLoopThread::~EventLoopThread()
{
    // Only join when Initialize() actually created a thread object.
    if (m_thread)
    {
        m_thread->join();
    }

    close(m_readfd);

    // boost::mutex::scoped_lock is the mutex's own typedef for
    // boost::unique_lock<boost::mutex>; it stays held until the end of scope.
    boost::mutex::scoped_lock guard(m_mutex);
    m_fdCount = 0;
    std::cout << "EventLoopThread::~EventLoopThread" << std::endl;
}
void EventLoopThread::Initialize()
{
m_thread = ThreadPtr(new boost::thread(boost::bind(EventLoopThread::WorkThread, this)));
}
void EventLoopThread::WorkThread(void* arg)
{
EventLoopThread* pThis = (EventLoopThread*)arg;
struct ev_loop* eventLoop = ev_loop_new(EVBACKEND_EPOLL);
pThis->m_eventLoop = eventLoop;
struct ev_io notify_watcher;
notify_watcher.data = pThis;
ev_io_init (¬ify_watcher, NotifyDataArrived, pThis->m_readfd, EV_READ);
ev_io_start (eventLoop, ¬ify_watcher);
ev_loop(eventLoop,0);
}
/// Handles one notification message read from m_readfd.
///
/// Recognised messages:
///   - "New Connection": accept a client on m_listenfd, make the socket
///     non-blocking, wrap it in a Connection, register it with the event loop
///     and the server, and bump the descriptor counter.
///   - "Close": break out of the event loop.
/// Any other message, a failed read, or a failed accept is ignored.
void EventLoopThread::NotifyEventArrived()
{
    // One spare byte beyond the 32 bytes read keeps the buffer NUL-terminated
    // for strcmp() even when read() returns a full 32 bytes.
    char buf[33] = {0};
    if (read(m_readfd, buf, sizeof(buf) - 1) <= 0)
    {
        return;
    }

    if (strcmp(buf, "New Connection") == 0)
    {
        int connfd;
        struct sockaddr_in cliaddr;
        socklen_t cliaddrlen = sizeof(cliaddr);
        if ((connfd = accept(m_listenfd, (struct sockaddr*)&cliaddr, &cliaddrlen)) == -1)
        {
            //std::cout<<"Accept Error"<<std::endl;
            return;
        }
        Server::MakeNonBlocking(connfd);

        // sin_port is stored in network byte order; convert it for display.
        std::cout << fm::Time::Now().FormatString() << ":accept a new client:"
                  << inet_ntoa(cliaddr.sin_addr) << ":" << ntohs(cliaddr.sin_port)
                  << std::endl;

        ConnectionPtr conn = ConnectionPtr(new Connection(connfd, cliaddr, m_server, this));
        conn->InitConnectin(m_eventLoop);
        m_server->AddConnection(conn);
        IncreaseFdCount();
    }
    else if (strcmp(buf, "Close") == 0)
    {
        ev_break(m_eventLoop, EVBREAK_ALL);
    }
}
/// Returns the current value of the descriptor counter, read while holding
/// m_mutex.
int EventLoopThread::GetListenFdCount()
{
    boost::mutex::scoped_lock guard(m_mutex);
    const int current = m_fdCount;
    return current;
}
/// Adds one to the descriptor counter while holding m_mutex.
void EventLoopThread::IncreaseFdCount()
{
    boost::mutex::scoped_lock guard(m_mutex);
    ++m_fdCount;
}
/// Subtracts one from the descriptor counter while holding m_mutex.
void EventLoopThread::DecreaseFdCount()
{
    boost::mutex::scoped_lock guard(m_mutex);
    --m_fdCount;
}