1
0
mirror of https://github.com/anope/anope.git synced 2026-06-25 13:16:38 +02:00
Files
anope/modules/socketengines/m_socketengine_poll.cpp
T
Adam 4403849126 Added db_mysql_live which allows Anope to pull data
from the four main SQL tables in realtime, which
effectively gives us "live" SQL.
Changed eventfd pipe engine to not use buffered write.
Added TryLock to threading engines.
Made blocking SQL queries in our SQL API thread-safe.
2011-01-07 15:57:13 -05:00

197 lines
3.7 KiB
C++

#include "module.h"
#ifndef _WIN32
# include <ulimit.h>
# include <sys/poll.h>
# include <poll.h>
# ifndef POLLRDHUP
# define POLLRDHUP 0
# endif
#else
# define poll WSAPoll
# define POLLRDHUP POLLHUP
#endif
class SocketEnginePoll : public SocketEngineBase
{
private:
long max;
pollfd *events;
int SocketCount;
std::map<int, int> socket_positions;
public:
SocketEnginePoll()
{
SocketCount = 0;
#ifndef _WIN32
max = ulimit(4, 0);
#else
max = 1024;
#endif
if (max <= 0)
{
Log() << "Can't determine maximum number of open sockets";
throw ModuleException("Can't determine maximum number of open sockets");
}
events = new pollfd[max];
}
~SocketEnginePoll()
{
delete [] events;
}
void AddSocket(Socket *s)
{
if (SocketCount == max)
{
Log() << "Unable to add fd " << s->GetFD() << " to socketengine poll, engine is full";
return;
}
pollfd *ev = &this->events[SocketCount];
ev->fd = s->GetFD();
ev->events = POLLIN;
ev->revents = 0;
Sockets.insert(std::make_pair(ev->fd, s));
socket_positions.insert(std::make_pair(ev->fd, SocketCount));
++SocketCount;
}
void DelSocket(Socket *s)
{
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
{
Log() << "Unable to delete unknown fd " << s->GetFD() << " from socketengine poll";
return;
}
if (pos->second != SocketCount - 1)
{
pollfd *ev = &this->events[pos->second],
*last_ev = &this->events[SocketCount - 1];
ev->fd = last_ev->fd;
ev->events = last_ev->events;
ev->revents = last_ev->revents;
socket_positions[ev->fd] = pos->second;
}
Sockets.erase(s->GetFD());
this->socket_positions.erase(pos);
--SocketCount;
}
void MarkWritable(Socket *s)
{
if (s->HasFlag(SF_WRITABLE))
return;
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
{
Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable";
return;
}
pollfd *ev = &this->events[pos->second];
ev->events |= POLLOUT;
s->SetFlag(SF_WRITABLE);
}
void ClearWritable(Socket *s)
{
if (!s->HasFlag(SF_WRITABLE))
return;
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
{
Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable";
return;
}
pollfd *ev = &this->events[pos->second];
ev->events &= ~POLLOUT;
s->UnsetFlag(SF_WRITABLE);
}
void Process()
{
int total = poll(this->events, this->SocketCount, Config->ReadTimeout * 1000);
Anope::CurTime = time(NULL);
if (total == -1)
{
Log() << "SockEngine::Process(): error: " << Anope::LastError();
return;
}
for (int i = 0, processed = 0; i < SocketCount && processed != total; ++i)
{
pollfd *ev = &this->events[i];
if (ev->revents != 0)
++processed;
Socket *s = Sockets[ev->fd];
if (s->HasFlag(SF_DEAD))
continue;
if (ev->revents & (POLLERR | POLLRDHUP))
{
s->ProcessError();
s->SetFlag(SF_DEAD);
continue;
}
if ((ev->revents & POLLIN) && !s->ProcessRead())
s->SetFlag(SF_DEAD);
if ((ev->revents & POLLOUT) && !s->ProcessWrite())
s->SetFlag(SF_DEAD);
}
for (int i = 0; i < SocketCount; ++i)
{
pollfd *ev = &this->events[i];
Socket *s = Sockets[ev->fd];
if (s->HasFlag(SF_DEAD))
delete s;
}
}
};
class ModuleSocketEnginePoll : public Module
{
SocketEnginePoll engine;
public:
ModuleSocketEnginePoll(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator)
{
this->SetAuthor("Anope");
this->SetPermanent(true);
this->SetType(SOCKETENGINE);
SocketEngine = &engine;
}
~ModuleSocketEnginePoll()
{
SocketEngine = NULL;
}
};
MODULE_INIT(ModuleSocketEnginePoll)