1
0
mirror of https://github.com/anope/anope.git synced 2026-06-26 10:16:40 +02:00

Added an epoll socket engine

This commit is contained in:
Adam
2010-07-08 22:19:13 -04:00
parent 8f8b1e46d6
commit 1cf4ebb231
17 changed files with 864 additions and 463 deletions
+337 -344
View File
@@ -1,11 +1,11 @@
#include "services.h"
SocketEngine socketEngine;
SocketEngineBase *SocketEngine;
int32 TotalRead = 0;
int32 TotalWritten = 0;
/** Trims all the \r and \ns from the begining and end of a string
* @return A string without trailing \r and \ns
* @param buffer The buffer to trim
*/
static void TrimBuf(std::string &buffer)
{
@@ -15,148 +15,54 @@ static void TrimBuf(std::string &buffer)
buffer.erase(buffer.length() - 1);
}
/** Default constructor
* @param nTargetHost Hostname to connect to
* @param nPort Port to connect to
* @param nBindHos Host to bind to when connecting
* @param nIPv6 true to use IPv6
/** Constructor
* @param nsock The socket
* @param nIPv6 IPv6?
*/
Socket::Socket(const std::string &nTargetHost, int nPort, const std::string &nBindHost, bool nIPv6) : TargetHost(nTargetHost), Port(nPort), BindHost(nBindHost), IPv6(nIPv6)
Socket::Socket(int nsock, bool nIPv6)
{
if (!IPv6 && (TargetHost.find(':') != std::string::npos || BindHost.find(':') != std::string::npos))
IPv6 = true;
Sock = socket(IPv6 ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
addrinfo hints;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_family = IPv6 ? AF_INET6 : AF_INET;
if (!BindHost.empty())
{
addrinfo *bindar;
sockaddr_in bindaddr;
sockaddr_in6 bindaddr6;
int Bound = -1;
if (!getaddrinfo(BindHost.c_str(), NULL, &hints, &bindar))
{
if (IPv6)
memcpy(&bindaddr6, bindar->ai_addr, bindar->ai_addrlen);
else
memcpy(&bindaddr, bindar->ai_addr, bindar->ai_addrlen);
freeaddrinfo(bindar);
Bound = bind(Sock, reinterpret_cast<sockaddr *>(&bindaddr), sizeof(bindaddr));
}
if (Bound < 0)
{
if (IPv6)
{
bindaddr6.sin6_family = AF_INET6;
if (inet_pton(AF_INET6, BindHost.c_str(), &bindaddr6.sin6_addr) < 1)
throw SocketException("Invalid bind host");
if (bind(Sock, reinterpret_cast<sockaddr *>(&bindaddr6), sizeof(bindaddr6)) == -1)
throw SocketException("Unable to bind to address");
}
else
{
bindaddr.sin_family = AF_INET;
if (inet_pton(bindaddr.sin_family, BindHost.c_str(), &bindaddr.sin_addr) < 1)
throw SocketException("Invalid bind host");
if (bind(Sock, reinterpret_cast<sockaddr *>(&bindaddr), sizeof(bindaddr)) == -1)
throw SocketException("Unable to bind to address");
}
}
}
addrinfo *conar;
sockaddr_in conaddr;
sockaddr_in6 conaddr6;
if (!getaddrinfo(TargetHost.c_str(), NULL, &hints, &conar))
{
if (IPv6)
memcpy(&conaddr6, conar->ai_addr, conar->ai_addrlen);
else
memcpy(&conaddr, conar->ai_addr, conar->ai_addrlen);
freeaddrinfo(conar);
}
Type = SOCKTYPE_CLIENT;
IPv6 = nIPv6;
if (nsock == 0)
sock = socket(IPv6 ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
else
{
if (IPv6)
{
if (inet_pton(AF_INET6, TargetHost.c_str(), &conaddr6.sin6_addr) < 1)
throw SocketException("Invalid server address");
}
else
{
if (inet_pton(AF_INET, TargetHost.c_str(), &conaddr.sin_addr) < 1)
throw SocketException("Invalid server address");
}
}
if (IPv6)
{
conaddr6.sin6_family = AF_INET6;
conaddr6.sin6_port = htons(Port);
if (connect(Sock, reinterpret_cast<sockaddr *>(&conaddr6), sizeof(conaddr6)) < 0)
throw SocketException("Error connecting to server");
}
else
{
conaddr.sin_family = AF_INET;
conaddr.sin_port = htons(Port);
if (connect(Sock, reinterpret_cast<sockaddr *>(&conaddr), sizeof(conaddr)) < 0)
throw SocketException("Error connecting to server");
}
socketEngine.AddSocket(this);
sock = nsock;
SocketEngine->AddSocket(this);
}
/** Default destructor
*/
*/
Socket::~Socket()
{
CloseSocket(Sock);
socketEngine.DelSocket(this);
SocketEngine->DelSocket(this);
CloseSocket(sock);
}
/** Read from the socket
* @param buf Buffer to read to
/** Really recieve something from the buffer
* @param buf The buf to read to
* @param sz How much to read
* @return Number of bytes recieved
*/
int Socket::RecvInternal(char *buf, size_t sz) const
const int Socket::RecvInternal(char *buf, size_t sz) const
{
return recv(GetSock(), buf, sz, 0);
}
/** Write to the socket
/** Really write something to the socket
* @param buf What to write
* @return Number of bytes sent, -1 on error
* @return Number of bytes written
*/
int Socket::SendInternal(const std::string &buf) const
const int Socket::SendInternal(const std::string &buf) const
{
return send(GetSock(), buf.c_str(), buf.length(), 0);
}
/** Get the socket FD for this socket
* @return The fd
* @return the fd
*/
int Socket::GetSock() const
{
return Sock;
return sock;
}
/** Check if this socket is IPv6
@@ -167,98 +73,6 @@ bool Socket::IsIPv6() const
return IPv6;
}
/** Called when there is something to be read from thie socket
* @return true on success, false to kill this socket
*/
bool Socket::ProcessRead()
{
char buffer[NET_BUFSIZE];
memset(&buffer, 0, sizeof(buffer));
RecvLen = RecvInternal(buffer, sizeof(buffer) - 1);
if (RecvLen <= 0)
return false;
TotalRead += RecvLen;
std::string sbuffer = extrabuf;
sbuffer.append(buffer);
extrabuf.clear();
size_t lastnewline = sbuffer.find_last_of('\n');
if (lastnewline < sbuffer.size() - 1)
{
extrabuf = sbuffer.substr(lastnewline);
TrimBuf(extrabuf);
sbuffer = sbuffer.substr(0, lastnewline);
}
sepstream stream(sbuffer, '\n');
std::string buf;
while (stream.GetToken(buf))
{
TrimBuf(buf);
if (!buf.empty())
if (!Read(buf))
return false;
}
return true;
}
/** Called when this socket becomes writeable
* @return true on success, false to drop this socket
*/
bool Socket::ProcessWrite()
{
int Written = SendInternal(WriteBuffer);
if (Written == -1)
return false;
TotalWritten += Written;
WriteBuffer.clear();
return true;
}
/** Called when there is an error on this socket
*/
void Socket::ProcessError()
{
}
/** Called with a message recieved from the socket
* @param buf The message
* @return true on success, false to kill this socket
*/
bool Socket::Read(const std::string &buf)
{
return true;
}
/** Write to the socket
* @param message The message to write
*/
void Socket::Write(const char *message, ...)
{
char buf[BUFSIZE];
va_list vi;
va_start(vi, message);
vsnprintf(buf, sizeof(buf), message, vi);
va_end(vi);
std::string sbuf = buf;
Write(sbuf);
}
/** Write to the socket
* @param message The message to write
*/
void Socket::Write(std::string &message)
{
WriteBuffer.append(message + "\r\n");
socketEngine.MarkWriteable(this);
}
/** Get the length of the read buffer
* @return The length of the read buffer
*/
@@ -272,159 +86,338 @@ size_t Socket::ReadBufferLen() const
*/
size_t Socket::WriteBufferLen() const
{
return WriteBuffer.size();
return WriteBuffer.length();
}
/** Called when there is something to be recieved for this socket
* @return true on success, false to drop this socket
*/
bool Socket::ProcessRead()
{
char tbuffer[NET_BUFSIZE];
memset(&tbuffer, '\0', sizeof(tbuffer));
RecvLen = RecvInternal(tbuffer, sizeof(tbuffer) - 1);
if (RecvLen <= 0)
return false;
std::string sbuffer = extrabuf;
sbuffer.append(tbuffer);
extrabuf.clear();
size_t lastnewline = sbuffer.find_last_of('\n');
if (lastnewline < sbuffer.size() - 1)
{
extrabuf = sbuffer.substr(lastnewline);
TrimBuf(extrabuf);
sbuffer = sbuffer.substr(0, lastnewline);
}
sepstream stream(sbuffer, '\n');
std::string tbuf;
while (stream.GetToken(tbuf))
{
TrimBuf(tbuf);
if (!tbuf.empty())
if (!Read(tbuf))
return false;
}
return true;
}
/** Called when there is something to be written to this socket
* @return true on success, false to drop this socket
*/
bool Socket::ProcessWrite()
{
if (WriteBuffer.empty())
{
return true;
}
if (SendInternal(WriteBuffer) == -1)
{
return false;
}
WriteBuffer.clear();
SocketEngine->ClearWriteable(this);
return true;
}
/** Called when there is an error for this socket
* @return true on success, false to drop this socket
*/
void Socket::ProcessError()
{
}
/** Called with a line recieved from the socket
* @param buf The line
* @return true to continue reading, false to drop the socket
*/
bool Socket::Read(const std::string &buf)
{
return false;
}
/** Write to the socket
* @param message The message
*/
void Socket::Write(const char *message, ...)
{
va_list vi;
char tbuffer[BUFSIZE];
std::string sbuf;
if (!message)
return;
va_start(vi, message);
vsnprintf(tbuffer, sizeof(tbuffer), message, vi);
va_end(vi);
sbuf = tbuffer;
Write(sbuf);
}
/** Write to the socket
* @param message The message
*/
void Socket::Write(const std::string &message)
{
WriteBuffer.append(message + "\r\n");
SocketEngine->MarkWriteable(this);
}
/** Constructor
* @param nLS The listen socket this connection came from
* @param nu The user using this socket
* @param nsock The socket
* @param nIPv6 IPv6
*/
SocketEngine::SocketEngine()
ClientSocket::ClientSocket(const std::string &nTargetHost, int nPort, const std::string &nBindHost, bool nIPv6) : Socket(0, nIPv6), TargetHost(nTargetHost), Port(nPort), BindHost(nBindHost)
{
FD_ZERO(&ReadFDs);
FD_ZERO(&WriteFDs);
MaxFD = 0;
if (!IPv6 && (TargetHost.find(':') != std::string::npos || BindHost.find(':') != std::string::npos))
IPv6 = true;
addrinfo hints;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_family = IPv6 ? AF_INET6 : AF_INET;
#ifdef _WIN32
WSADATA wsa;
if (WSAStartup(MAKEWORD(2, 0), &wsa))
Alog() << "Failed to initialize WinSock library";
#endif
if (!BindHost.empty())
{
addrinfo *bindar;
sockaddr_in bindaddr;
sockaddr_in6 bindaddr6;
if (getaddrinfo(BindHost.c_str(), NULL, &hints, &bindar) == 0)
{
if (IPv6)
memcpy(&bindaddr6, bindar->ai_addr, bindar->ai_addrlen);
else
memcpy(&bindaddr, bindar->ai_addr, bindar->ai_addrlen);
freeaddrinfo(bindar);
}
else
{
if (IPv6)
{
bindaddr6.sin6_family = AF_INET6;
if (inet_pton(AF_INET6, BindHost.c_str(), &bindaddr6.sin6_addr) < 1)
throw SocketException("Invalid bind host: " + std::string(strerror(errno)));
}
else
{
bindaddr.sin_family = AF_INET;
if (inet_pton(AF_INET, BindHost.c_str(), &bindaddr.sin_addr) < 1)
throw SocketException("Invalid bind host: " + std::string(strerror(errno)));
}
}
if (IPv6)
{
if (bind(sock, reinterpret_cast<sockaddr *>(&bindaddr6), sizeof(bindaddr6)) == -1)
throw SocketException("Unable to bind to address: " + std::string(strerror(errno)));
}
else
{
if (bind(sock, reinterpret_cast<sockaddr *>(&bindaddr), sizeof(bindaddr)) == -1)
throw SocketException("Unable to bind to address: " + std::string(strerror(errno)));
}
}
addrinfo *conar;
sockaddr_in6 addr6;
sockaddr_in addr;
if (getaddrinfo(TargetHost.c_str(), NULL, &hints, &conar) == 0)
{
if (IPv6)
memcpy(&addr6, conar->ai_addr, conar->ai_addrlen);
else
memcpy(&addr, conar->ai_addr, conar->ai_addrlen);
freeaddrinfo(conar);
}
else
{
if (IPv6)
{
if (inet_pton(AF_INET6, TargetHost.c_str(), &addr6.sin6_addr) < 1)
throw SocketException("Invalid server host: " + std::string(strerror(errno)));
}
else
{
if (inet_pton(AF_INET, TargetHost.c_str(), &addr.sin_addr) < 1)
throw SocketException("Invalid server host: " + std::string(strerror(errno)));
}
}
if (IPv6)
{
addr6.sin6_family = AF_INET6;
addr6.sin6_port = htons(nPort);
if (connect(sock, reinterpret_cast<sockaddr *>(&addr6), sizeof(addr6)) == -1)
{
throw SocketException("Error connecting to server: " + std::string(strerror(errno)));
}
}
else
{
addr.sin_family = AF_INET;
addr.sin_port = htons(nPort);
if (connect(sock, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) == -1)
{
throw SocketException("Error connecting to server: " + std::string(strerror(errno)));
}
}
}
/** Default destructor
*/
ClientSocket::~ClientSocket()
{
}
/** Called with a line recieved from the socket
* @param buf The line
* @return true to continue reading, false to drop the socket
*/
bool ClientSocket::Read(const std::string &buf)
{
return true;
}
/** Constructor
* @param bind The IP to bind to
* @param port The port to listen on
*/
ListenSocket::ListenSocket(const std::string &bindip, int port) : Socket(0, (bindip.find(':') != std::string::npos ? true : false))
{
Type = SOCKTYPE_LISTEN;
BindIP = bindip;
Port = port;
sockaddr_in sock_addr;
sockaddr_in6 sock_addr6;
if (IPv6)
{
sock_addr6.sin6_family = AF_INET6;
sock_addr6.sin6_port = htons(port);
if (inet_pton(AF_INET6, bindip.c_str(), &sock_addr6.sin6_addr) < 1)
{
throw SocketException("Invalid bind host: " + std::string(strerror(errno)));
}
}
else
{
sock_addr.sin_family = AF_INET;
sock_addr.sin_port = htons(port);
if (inet_pton(AF_INET, bindip.c_str(), &sock_addr.sin_addr) < 1)
{
throw SocketException("Invalid bind host: " + std::string(strerror(errno)));
}
}
if (IPv6)
{
if (bind(sock, reinterpret_cast<sockaddr *>(&sock_addr6), sizeof(sock_addr6)) == -1)
{
throw SocketException("Unable to bind to address: " + std::string(strerror(errno)));
}
}
else
{
if (bind(sock, reinterpret_cast<sockaddr *>(&sock_addr), sizeof(sock_addr)) == -1)
{
throw SocketException("Unable to bind to address: " + std::string(strerror(errno)));
}
}
if (listen(sock, 5) == -1)
{
throw SocketException("Unable to listen: " + std::string(strerror(errno)));
}
}
/** Destructor
*/
SocketEngine::~SocketEngine()
ListenSocket::~ListenSocket()
{
#ifdef _WIN32
WSACleanup();
}
/** Accept a connection in this sockets queue
*/
bool ListenSocket::ProcessRead()
{
int newsock = accept(sock, NULL, NULL);
#ifndef _WIN32
# define INVALID_SOCKET 0
#endif
}
/** Add a socket to the socket engine
* @param s The socket
*/
void SocketEngine::AddSocket(Socket *s)
{
if (s->GetSock() > MaxFD)
MaxFD = s->GetSock();
FD_SET(s->GetSock(), &ReadFDs);
Sockets.insert(s);
}
/** Delete a socket from the socket engine
* @param s The socket
*/
void SocketEngine::DelSocket(Socket *s)
{
if (s->GetSock() == MaxFD)
--MaxFD;
FD_CLR(s->GetSock(), &ReadFDs);
FD_CLR(s->GetSock(), &WriteFDs);
Sockets.erase(s);
}
/** Mark a socket as wanting to be written to
* @param s The socket
*/
void SocketEngine::MarkWriteable(Socket *s)
{
FD_SET(s->GetSock(), &WriteFDs);
}
/** Unmark a socket as writeable
* @param s The socket
*/
void SocketEngine::ClearWriteable(Socket *s)
{
FD_CLR(s->GetSock(), &WriteFDs);
}
/** Called to iterate through each socket and check for activity
*/
void SocketEngine::Process()
{
fd_set rfdset = ReadFDs, wfdset = WriteFDs, efdset = ReadFDs;
timeval tval;
tval.tv_sec = Config.ReadTimeout;
tval.tv_usec = 0;
int sresult = select(MaxFD + 1, &rfdset, &wfdset, &efdset, &tval);
if (sresult == -1)
Alog() << "SocketEngine::Process error, " << GetError();
else if (sresult)
if (newsock > 0 && newsock != INVALID_SOCKET)
{
for (std::set<Socket *>::iterator it = Sockets.begin(); it != Sockets.end(); ++it)
{
Socket *s = *it;
if (FD_ISSET(s->GetSock(), &efdset))
{
s->ProcessError();
OldSockets.insert(s);
continue;
}
if (FD_ISSET(s->GetSock(), &rfdset))
{
if (!s->ProcessRead())
OldSockets.insert(s);
}
if (FD_ISSET(s->GetSock(), &wfdset))
{
ClearWriteable(s);
if (!s->ProcessWrite())
OldSockets.insert(s);
}
}
return this->OnAccept(new Socket(newsock, IPv6));
}
while (!OldSockets.empty())
{
delete (*OldSockets.begin());
OldSockets.erase(OldSockets.begin());
}
return true;
}
/** Get the last socket error
* @return The error
/** Called when a connection is accepted
* @param s The socket for the new connection
* @return true if the listen socket should remain alive
*/
const std::string SocketEngine::GetError() const
bool ListenSocket::OnAccept(Socket *s)
{
#ifdef _WIN32
errno = WSAGetLastError();
#endif
switch (errno)
{
case EBADF:
return "Socket error, invalid file descriptor given to select()";
break;
case EINTR:
return "Socket engine caught signal";
break;
#ifdef WIN32
case WSANOTINITIALISED:
return "A successful WSAStartup call must occur before using this function.";
break;
case WSAEFAULT:
return "The Windows Sockets implementation was unable to allocate needed resources for its internal operations, or the readfds, writefds, exceptfds, or timeval parameters are not part of the user address space.";
break;
case WSAENETDOWN:
return "The network subsystem has failed.";
break;
case WSAEINVAL:
return "The time-out value is not valid, or all three descriptor parameters were null.";
break;
case WSAEINTR:
return "A blocking Windows Socket 1.1 call was canceled through WSACancelBlockingCall.";
break;
case WSAEINPROGRESS:
return "A blocking Windows Sockets 1.1 call is in progress, or the service provider is still processing a callback function.";
break;
case WSAENOTSOCK:
return "One of the descriptor sets contains an entry that is not a socket.";
break;
#endif
default:
return "Socket engine caught unknown error";
}
return true;
}
/** Get the bind IP for this socket
* @return the bind ip
*/
const std::string &ListenSocket::GetBindIP() const
{
return BindIP;
}
/** Get the port this socket is bound to
* @return The port
*/
const int ListenSocket::GetPort() const
{
return Port;
}