1
0
mirror of https://github.com/anope/anope.git synced 2026-06-30 22:46:40 +02:00

Cleaned up some of the socket code, cleaned up the pipe engines, added support for binary sockets, and cleaned up the asynch connect/accept code

This commit is contained in:
Adam
2011-08-21 13:38:42 -04:00
parent 4fcb371bc8
commit 2eb708e5ad
16 changed files with 738 additions and 725 deletions
+69 -307
View File
@@ -260,15 +260,19 @@ int SocketIO::Recv(Socket *s, char *buf, size_t sz)
/** Write something to the socket
* @param s The socket
* @param buf What to write
* @return Number of bytes written
* @param buf The data to write
* @param size The length of the data
*/
int SocketIO::Send(Socket *s, const Anope::string &buf)
int SocketIO::Send(Socket *s, const char *buf, size_t sz)
{
size_t i = send(s->GetFD(), buf.c_str(), buf.length(), 0);
size_t i = send(s->GetFD(), buf, sz, 0);
TotalWritten += i;
return i;
}
int SocketIO::Send(Socket *s, const Anope::string &buf)
{
return this->Send(s, buf.c_str(), buf.length());
}
/** Accept a connection from a socket
* @param s The socket
@@ -282,22 +286,27 @@ ClientSocket *SocketIO::Accept(ListenSocket *s)
int newsock = accept(s->GetFD(), &conaddr.sa, &size);
#ifndef INVALID_SOCKET
# define INVALID_SOCKET -1
static const int INVALID_SOCKET = -1;
#endif
if (newsock >= 0 && newsock != INVALID_SOCKET)
return s->OnAccept(newsock, conaddr);
{
ClientSocket *ns = s->OnAccept(newsock, conaddr);
ns->SetFlag(SF_ACCEPTED);
ns->OnAccept();
return ns;
}
else
throw SocketException("Unable to accept connection: " + Anope::LastError());
}
/** Check if a connection has been accepted
* @param s The client socket
* @return -1 on error, 0 to wait, 1 on success
/** Finished accepting a connection from a socket
* @param s The socket
* @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error
*/
int SocketIO::Accepted(ClientSocket *cs)
SocketFlag SocketIO::FinishAccept(ClientSocket *cs)
{
return 1;
return SF_ACCEPTED;
}
/** Bind a socket
@@ -319,6 +328,8 @@ void SocketIO::Bind(Socket *s, const Anope::string &ip, int port)
*/
void SocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int port)
{
s->UnsetFlag(SF_CONNECTING);
s->UnsetFlag(SF_CONNECTED);
s->conaddr.pton(s->IsIPv6() ? AF_INET6 : AF_INET, target, port);
int c = connect(s->GetFD(), &s->conaddr.sa, s->conaddr.size());
if (c == -1)
@@ -326,30 +337,51 @@ void SocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int por
if (Anope::LastErrorCode() != EINPROGRESS)
s->OnError(Anope::LastError());
else
{
SocketEngine::MarkWritable(s);
s->SetFlag(SF_CONNECTING);
}
}
else
{
s->connected = true;
s->SetFlag(SF_CONNECTED);
s->OnConnect();
}
}
/** Check if this socket is connected
/** Called to potentially finish a pending connection
* @param s The socket
* @return -1 for error, 0 for wait, 1 for connected
* @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error.
*/
int SocketIO::Connected(ConnectionSocket *s)
SocketFlag SocketIO::FinishConnect(ConnectionSocket *s)
{
return s->connected == true ? 1 : -1;
if (s->HasFlag(SF_CONNECTED))
return SF_CONNECTED;
else if (!s->HasFlag(SF_CONNECTING))
throw SocketException("SocketIO::FinishConnect called for a socket not connected nor connecting?");
int optval = 0;
socklen_t optlen = sizeof(optval);
if (!getsockopt(s->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval)
{
s->SetFlag(SF_CONNECTED);
s->UnsetFlag(SF_CONNECTING);
s->OnConnect();
return SF_CONNECTED;
}
else
{
errno = optval;
s->ProcessError();
return SF_DEAD;
}
}
/** Empty constructor, used for things such as the pipe socket
/** Empty constructor, should not be called.
*/
Socket::Socket() : Flags<SocketFlag, 2>(SocketFlagStrings)
Socket::Socket() : Flags<SocketFlag>(SocketFlagStrings)
{
this->Type = SOCKTYPE_BASE;
this->IO = &normalSocketIO;
throw CoreException("Socket::Socket() ?");
}
/** Constructor
@@ -357,12 +389,11 @@ Socket::Socket() : Flags<SocketFlag, 2>(SocketFlagStrings)
* @param ipv6 IPv6?
* @param type The socket type, defaults to SOCK_STREAM
*/
Socket::Socket(int sock, bool ipv6, int type) : Flags<SocketFlag, 2>(SocketFlagStrings)
Socket::Socket(int sock, bool ipv6, int type) : Flags<SocketFlag>(SocketFlagStrings)
{
this->Type = SOCKTYPE_BASE;
this->IO = &normalSocketIO;
this->IPv6 = ipv6;
if (sock == 0)
if (sock == -1)
this->Sock = socket(this->IPv6 ? AF_INET6 : AF_INET, type, 0);
else
this->Sock = sock;
@@ -432,6 +463,14 @@ void Socket::Bind(const Anope::string &ip, int port)
this->IO->Bind(this, ip, port);
}
/** Called when there either is a read or write event.
* @return true to continue to call ProcessRead/ProcessWrite, false to not continue
*/
bool Socket::Process()
{
return true;
}
/** Called when there is something to be received for this socket
* @return true on success, false to drop this socket
*/
@@ -455,157 +494,25 @@ void Socket::ProcessError()
{
}
/** Constructor for pipe socket
*/
BufferedSocket::BufferedSocket() : Socket()
{
this->Type = SOCKTYPE_BUFFERED;
}
/** Constructor
* @param fd FD to use
* @param ipv6 true for ipv6
* @param type socket type, defaults to SOCK_STREAM
*/
BufferedSocket::BufferedSocket(int fd, bool ipv6, int type) : Socket(fd, ipv6, type)
{
this->Type = SOCKTYPE_BUFFERED;
}
/** Default destructor
*/
BufferedSocket::~BufferedSocket()
{
}
/** Called when there is something to be received for this socket
* @return true on success, false to drop this socket
*/
bool BufferedSocket::ProcessRead()
{
char tbuffer[NET_BUFSIZE];
this->RecvLen = 0;
int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer) - 1);
if (len <= 0)
return false;
tbuffer[len] = 0;
this->RecvLen = len;
Anope::string sbuffer = this->extrabuf;
sbuffer += tbuffer;
this->extrabuf.clear();
size_t lastnewline = sbuffer.rfind('\n');
if (lastnewline == Anope::string::npos)
{
this->extrabuf = sbuffer;
return true;
}
if (lastnewline < sbuffer.length() - 1)
{
this->extrabuf = sbuffer.substr(lastnewline);
this->extrabuf.trim();
sbuffer = sbuffer.substr(0, lastnewline);
}
sepstream stream(sbuffer, '\n');
Anope::string tbuf;
while (stream.GetToken(tbuf))
{
tbuf.trim();
if (!tbuf.empty() && !Read(tbuf))
return false;
}
return true;
}
/** Called when the socket is ready to be written to
* @return true on success, false to drop this socket
*/
bool BufferedSocket::ProcessWrite()
{
int count = this->IO->Send(this, this->WriteBuffer);
if (count <= -1)
return false;
this->WriteBuffer = this->WriteBuffer.substr(count);
if (this->WriteBuffer.empty())
SocketEngine::ClearWritable(this);
return true;
}
/** Called with a line received from the socket
* @param buf The line
* @return true to continue reading, false to drop the socket
*/
bool BufferedSocket::Read(const Anope::string &buf)
{
return false;
}
/** Write to the socket
* @param message The message
*/
void BufferedSocket::Write(const char *message, ...)
{
va_list vi;
char tbuffer[BUFSIZE];
if (!message)
return;
va_start(vi, message);
vsnprintf(tbuffer, sizeof(tbuffer), message, vi);
va_end(vi);
Anope::string sbuf = tbuffer;
Write(sbuf);
}
/** Write to the socket
* @param message The message
*/
void BufferedSocket::Write(const Anope::string &message)
{
this->WriteBuffer += message + "\r\n";
SocketEngine::MarkWritable(this);
}
/** Get the length of the read buffer
* @return The length of the read buffer
*/
int BufferedSocket::ReadBufferLen() const
{
return RecvLen;
}
/** Get the length of the write buffer
* @return The length of the write buffer
*/
int BufferedSocket::WriteBufferLen() const
{
return this->WriteBuffer.length();
}
/** Constructor
* @param bindip The IP to bind to
* @param port The port to listen on
* @param ipv6 true for ipv6
*/
ListenSocket::ListenSocket(const Anope::string &bindip, int port, bool ipv6) : Socket(0, ipv6)
ListenSocket::ListenSocket(const Anope::string &bindip, int port, bool ipv6) : Socket(-1, ipv6)
{
this->Type = SOCKTYPE_LISTEN;
this->SetNonBlocking();
#ifndef _WIN32
int op = 1;
setsockopt(this->GetFD(), SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op));
#endif
this->bindaddr.pton(IPv6 ? AF_INET6 : AF_INET, bindip, port);
this->IO->Bind(this, bindip, port);
if (listen(Sock, SOMAXCONN) == -1)
throw SocketException(Anope::string("Unable to listen: ") + Anope::LastError());
throw SocketException("Unable to listen: " + Anope::LastError());
}
/** Destructor
@@ -629,148 +536,3 @@ bool ListenSocket::ProcessRead()
return true;
}
/** Called when a connection is accepted
* @param fd The FD for the new connection
* @param addr The sockaddr for where the connection came from
* @return The new socket
*/
ClientSocket *ListenSocket::OnAccept(int fd, const sockaddrs &addr)
{
return new ClientSocket(this, fd, addr);
}
/** Constructor
* @param ipv6 true to use IPv6
* @param type The socket type, defaults to SOCK_STREAM
*/
ConnectionSocket::ConnectionSocket(bool ipv6, int type) : BufferedSocket(0, ipv6, type), connected(false)
{
this->Type = SOCKTYPE_CONNECTION;
}
/** Connect the socket
* @param TargetHost The target host to connect to
* @param Port The target port to connect to
*/
void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port)
{
this->IO->Connect(this, TargetHost, Port);
}
/** Called when there is something to be received for this socket
* @return true on success, false to drop this socket
*/
bool ConnectionSocket::ProcessRead()
{
if (!this->connected)
{
int optval = 0;
socklen_t optlen = sizeof(optval);
if (!getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval)
{
this->connected = true;
this->OnConnect();
}
else
errno = optval;
}
int i = this->IO->Connected(this);
if (i == 1)
return BufferedSocket::ProcessRead();
else if (i == 0)
return true;
this->OnError(Anope::LastError());
return false;
}
/** Called when the socket is ready to be written to
* @return true on success, false to drop this socket
*/
bool ConnectionSocket::ProcessWrite()
{
if (!this->connected)
{
int optval = 0;
socklen_t optlen = sizeof(optval);
if (!getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval)
{
this->connected = true;
this->OnConnect();
}
else
errno = optval;
}
int i = this->IO->Connected(this);
if (i == 1)
return BufferedSocket::ProcessWrite();
else if (i == 0)
return true;
this->OnError(Anope::LastError());
return false;
}
/** Called when there is an error for this socket
* @return true on success, false to drop this socket
*/
void ConnectionSocket::ProcessError()
{
int optval = 0;
socklen_t optlen = sizeof(optval);
getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen);
errno = optval;
this->OnError(optval ? Anope::LastError() : "");
}
/** Called on a successful connect
*/
void ConnectionSocket::OnConnect()
{
}
/** Called when a connection is not successful
* @param error The error
*/
void ConnectionSocket::OnError(const Anope::string &error)
{
}
/** Constructor
* @param ls Listen socket this connection is from
* @param fd New FD for this socket
* @param addr Address the connection came from
*/
ClientSocket::ClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr) : BufferedSocket(fd, ls->IsIPv6()), LS(ls), clientaddr(addr)
{
this->Type = SOCKTYPE_CLIENT;
}
/** Called when there is something to be received for this socket
* @return true on success, false to drop this socket
*/
bool ClientSocket::ProcessRead()
{
int i = this->IO->Accepted(this);
if (i == 1)
return BufferedSocket::ProcessRead();
else if (i == 0)
return true;
return false;
}
/** Called when the socket is ready to be written to
* @return true on success, false to drop this socket
*/
bool ClientSocket::ProcessWrite()
{
int i = this->IO->Accepted(this);
if (i == 1)
return BufferedSocket::ProcessWrite();
else if (i == 0)
return true;
return false;
}