mirror of
https://github.com/anope/anope.git
synced 2026-06-28 10:46:38 +02:00
Split db_mysql_live into two modules so other modules can make use of the asynchronous command interface
This commit is contained in:
+18
-5
@@ -278,6 +278,8 @@ options
|
||||
* - db_mysql
|
||||
* - db_mysql_live
|
||||
*
|
||||
* You may have more than one loaded at once!
|
||||
*
|
||||
* The db_mysql_live module is an extension to db_mysql, and should only be used if
|
||||
* db_mysql is being used. This module pulls data in real time from SQL as it is
|
||||
* requested by the core as a result of someone executing commands.
|
||||
@@ -285,12 +287,11 @@ options
|
||||
* This effectively allows you to edit your database and have it be immediately
|
||||
* reflected back in Anope.
|
||||
*
|
||||
* It is highly recommended you only use this module if your databases are located
|
||||
* locally as this module will generate many queries per command.
|
||||
* db_mysql_live only uses threads for commands and non-blocking queries, so it is safe to
|
||||
* use on large networks without worrying about response times.
|
||||
* For information on how to make db_mysql_live use asynchronous queries see
|
||||
* m_async_commands.
|
||||
*
|
||||
* NOTE: You can and probably should use db_plain together with db_mysql/db_mysql_live
|
||||
* At this time db_mysql_live only supports pulling data in real time from the four
|
||||
* main tables: anope_bs_core, anope_cs_info, anope_ns_alias, and anope_ns_core.
|
||||
*
|
||||
*/
|
||||
database = "db_plain"
|
||||
@@ -2030,6 +2031,18 @@ alias
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* m_async_commands
|
||||
*
|
||||
* Creates a thread for each command executed by a user. You should
|
||||
* only load this if you are using a module designed to work with this.
|
||||
*
|
||||
* If this is loaded with db_mysql_live then Anope will support
|
||||
* processing multiple commands at once which will help very busy networks
|
||||
* with lag issues caused from the overhead of SQL queries caused by db_mysq_live.
|
||||
*/
|
||||
#module { name = "m_async_commands" }
|
||||
|
||||
/* Provides the !kb fantasy command */
|
||||
alias
|
||||
{
|
||||
|
||||
@@ -45,7 +45,7 @@ class CoreExport Thread : public Extensible
|
||||
{
|
||||
private:
|
||||
/* Set to true to tell the thread to finish and we are waiting for it */
|
||||
bool Exit;
|
||||
bool exit;
|
||||
|
||||
public:
|
||||
/* Handle for this thread */
|
||||
@@ -67,6 +67,10 @@ class CoreExport Thread : public Extensible
|
||||
*/
|
||||
void SetExitState();
|
||||
|
||||
/** Exit the thread. Note that the thread still must be joined to free resources!
|
||||
*/
|
||||
void Exit();
|
||||
|
||||
/** Returns the exit state of the thread
|
||||
* @return true if we want to exit
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
|
||||
class CommandMutex : public Thread
|
||||
{
|
||||
public:
|
||||
// Mutex used by this command to allow the core to drop and pick up processing of it at will
|
||||
Mutex mutex;
|
||||
// Set to true when this thread is processing data that is not thread safe (eg, the command)
|
||||
bool processing;
|
||||
Command *command;
|
||||
CommandSource source;
|
||||
std::vector<Anope::string> params;
|
||||
|
||||
CommandMutex() : Thread(), processing(true) { }
|
||||
|
||||
~CommandMutex() { }
|
||||
|
||||
virtual void Run() = 0;
|
||||
|
||||
virtual void Lock() = 0;
|
||||
|
||||
virtual void Unlock() = 0;
|
||||
};
|
||||
|
||||
class AsynchCommandsService : public Service
|
||||
{
|
||||
public:
|
||||
AsynchCommandsService(Module *o, const Anope::string &n) : Service(o, n) { }
|
||||
virtual CommandMutex *CurrentCommand() = 0;
|
||||
};
|
||||
|
||||
+70
-163
@@ -1,67 +1,11 @@
|
||||
#include "module.h"
|
||||
#include "async_commands.h"
|
||||
#include "sql.h"
|
||||
|
||||
class CommandMutex;
|
||||
static std::list<CommandMutex *> commands;
|
||||
|
||||
/* Current command being processed by the core */
|
||||
static CommandMutex *current_command = NULL;
|
||||
/* Mutex held by the core when it is processing. Used by threads to halt the core */
|
||||
static Mutex main_mutex;
|
||||
|
||||
class CommandMutex : public Thread
|
||||
{
|
||||
public:
|
||||
// Mutex used by this command to allow the core to drop and pick up processing of it at will
|
||||
Mutex mutex;
|
||||
// Set to true when this thread is processing data that is not thread safe (eg, the command)
|
||||
bool processing;
|
||||
Command *command;
|
||||
CommandSource source;
|
||||
std::vector<Anope::string> params;
|
||||
|
||||
CommandMutex() : Thread(), processing(true)
|
||||
{
|
||||
commands.push_back(this);
|
||||
current_command = this;
|
||||
}
|
||||
|
||||
~CommandMutex()
|
||||
{
|
||||
std::list<CommandMutex *>::iterator it = std::find(commands.begin(), commands.end(), this);
|
||||
if (it != commands.end())
|
||||
commands.erase(it);
|
||||
if (this == current_command)
|
||||
current_command = NULL;
|
||||
}
|
||||
|
||||
void Run()
|
||||
{
|
||||
User *u = this->source.u;
|
||||
BotInfo *bi = this->source.owner;
|
||||
|
||||
if (!command->permission.empty() && !u->Account()->HasCommand(command->permission))
|
||||
{
|
||||
u->SendMessage(bi, LanguageString::ACCESS_DENIED);
|
||||
Log(LOG_COMMAND, "denied", bi) << "Access denied for user " << u->GetMask() << " with command " << command;
|
||||
}
|
||||
else
|
||||
{
|
||||
CommandReturn ret = command->Execute(source, params);
|
||||
|
||||
if (ret == MOD_CONT)
|
||||
{
|
||||
FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params));
|
||||
}
|
||||
}
|
||||
|
||||
main_mutex.Unlock();
|
||||
}
|
||||
};
|
||||
|
||||
class MySQLLiveModule : public Module, public Pipe
|
||||
class MySQLLiveModule : public Module
|
||||
{
|
||||
service_reference<SQLProvider> SQL;
|
||||
service_reference<AsynchCommandsService> ACS;
|
||||
|
||||
SQLResult RunQuery(const Anope::string &query)
|
||||
{
|
||||
@@ -76,86 +20,41 @@ class MySQLLiveModule : public Module, public Pipe
|
||||
return SQL ? SQL->Escape(query) : query;
|
||||
}
|
||||
|
||||
CommandMutex *CurrentCommand()
|
||||
{
|
||||
if (this->ACS)
|
||||
return this->ACS->CurrentCommand();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
public:
|
||||
MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), SQL("mysql/main")
|
||||
MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) :
|
||||
Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands")
|
||||
{
|
||||
Implementation i[] = { I_OnPreCommand, I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore };
|
||||
ModuleManager::Attach(i, this, 5);
|
||||
Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore };
|
||||
ModuleManager::Attach(i, this, 4);
|
||||
}
|
||||
|
||||
EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> ¶ms)
|
||||
{
|
||||
if (this->SQL)
|
||||
{
|
||||
CommandMutex *cm = new CommandMutex();
|
||||
try
|
||||
{
|
||||
cm->mutex.Lock();
|
||||
cm->command = command;
|
||||
cm->source = source;
|
||||
cm->params = params;
|
||||
|
||||
commands.push_back(cm);
|
||||
|
||||
// Give processing to the command thread
|
||||
Log(LOG_DEBUG_2) << "db_mysql_live: Waiting for command thread " << cm->command->name << " from " << source.u->nick;
|
||||
threadEngine.Start(cm);
|
||||
main_mutex.Lock();
|
||||
|
||||
return EVENT_STOP;
|
||||
}
|
||||
catch (const CoreException &ex)
|
||||
{
|
||||
delete cm;
|
||||
Log() << "db_mysql_live: Unable to thread for command: " << ex.GetReason();
|
||||
}
|
||||
}
|
||||
|
||||
return EVENT_CONTINUE;
|
||||
}
|
||||
|
||||
void OnNotify()
|
||||
{
|
||||
for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
|
||||
{
|
||||
CommandMutex *cm = *it;
|
||||
|
||||
// Thread engine will pick this up later
|
||||
if (cm->GetExitState() || !cm->processing)
|
||||
continue;
|
||||
|
||||
Log(LOG_DEBUG_2) << "db_mysql_live: Waiting for command thread " << cm->command->name << " from " << cm->source.u->nick;
|
||||
current_command = cm;
|
||||
|
||||
// Unlock to give processing back to the command thread
|
||||
cm->mutex.Unlock();
|
||||
// Relock to regain processing once the command thread hangs for any reason
|
||||
main_mutex.Lock();
|
||||
|
||||
current_command = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void OnFindBot(const Anope::string &nick)
|
||||
{
|
||||
if (!current_command)
|
||||
static bool lookup = true;
|
||||
if (lookup == false)
|
||||
{
|
||||
lookup = true;
|
||||
return;
|
||||
|
||||
CommandMutex *cm = current_command;
|
||||
|
||||
// Give it back to the core
|
||||
cm->processing = false;
|
||||
main_mutex.Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'");
|
||||
// And take it back...
|
||||
cm->processing = true;
|
||||
this->Notify();
|
||||
cm->mutex.Lock();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
current_command = NULL;
|
||||
CommandMutex *current_command = this->CurrentCommand();
|
||||
|
||||
if (current_command)
|
||||
current_command->Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'");
|
||||
if (current_command)
|
||||
current_command->Lock();
|
||||
|
||||
lookup = false;
|
||||
BotInfo *bi = findbot(res.Get(0, "nick"));
|
||||
if (!bi)
|
||||
bi = new BotInfo(res.Get(0, "nick"), res.Get(0, "user"), res.Get(0, "host"), res.Get(0, "rname"));
|
||||
@@ -177,21 +76,24 @@ class MySQLLiveModule : public Module, public Pipe
|
||||
|
||||
void OnFindChan(const Anope::string &chname)
|
||||
{
|
||||
if (!current_command)
|
||||
static bool lookup = true;
|
||||
if (lookup == false)
|
||||
{
|
||||
lookup = true;
|
||||
return;
|
||||
|
||||
CommandMutex *cm = current_command;
|
||||
|
||||
cm->processing = false;
|
||||
main_mutex.Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'");
|
||||
cm->processing = true;
|
||||
this->Notify();
|
||||
cm->mutex.Lock();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
current_command = NULL;
|
||||
CommandMutex *current_command = this->CurrentCommand();
|
||||
|
||||
if (current_command)
|
||||
current_command->Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'");
|
||||
if (current_command)
|
||||
current_command->Lock();
|
||||
|
||||
lookup = false;
|
||||
ChannelInfo *ci = cs_findchan(res.Get(0, "name"));
|
||||
if (!ci)
|
||||
ci = new ChannelInfo(res.Get(0, "name"));
|
||||
@@ -263,25 +165,27 @@ class MySQLLiveModule : public Module, public Pipe
|
||||
|
||||
void OnFindNick(const Anope::string &nick)
|
||||
{
|
||||
if (!current_command)
|
||||
static bool lookup = true;
|
||||
if (lookup == false)
|
||||
{
|
||||
lookup = true;
|
||||
return;
|
||||
|
||||
CommandMutex *cm = current_command;
|
||||
|
||||
cm->processing = false;
|
||||
main_mutex.Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'");
|
||||
cm->processing = true;
|
||||
this->Notify();
|
||||
cm->mutex.Lock();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Make OnFindCore trigger and look up the core too
|
||||
CommandMutex *current_command = this->CurrentCommand();
|
||||
|
||||
if (current_command)
|
||||
current_command->Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'");
|
||||
if (current_command)
|
||||
current_command->Lock();
|
||||
|
||||
NickCore *nc = findcore(res.Get(0, "display"));
|
||||
if (!nc)
|
||||
return;
|
||||
current_command = NULL;
|
||||
lookup = false;
|
||||
NickAlias *na = findnick(res.Get(0, "nick"));
|
||||
if (!na)
|
||||
na = new NickAlias(res.Get(0, "nick"), nc);
|
||||
@@ -310,21 +214,24 @@ class MySQLLiveModule : public Module, public Pipe
|
||||
|
||||
void OnFindCore(const Anope::string &nick)
|
||||
{
|
||||
if (!current_command)
|
||||
static bool lookup = true;
|
||||
if (lookup == false)
|
||||
{
|
||||
lookup = true;
|
||||
return;
|
||||
|
||||
CommandMutex *cm = current_command;
|
||||
|
||||
cm->processing = false;
|
||||
main_mutex.Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'");
|
||||
cm->processing = true;
|
||||
this->Notify();
|
||||
cm->mutex.Lock();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
current_command = NULL;
|
||||
CommandMutex *current_command = this->CurrentCommand();
|
||||
|
||||
if (current_command)
|
||||
current_command->Unlock();
|
||||
SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'");
|
||||
if (current_command)
|
||||
current_command->Lock();
|
||||
|
||||
lookup = false;
|
||||
NickCore *nc = findcore(res.Get(0, "display"));
|
||||
if (!nc)
|
||||
nc = new NickCore(res.Get(0, "display"));
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
#include "module.h"
|
||||
#include "async_commands.h"
|
||||
|
||||
static Pipe *me;
|
||||
static CommandMutex *current_command = NULL;
|
||||
static std::list<CommandMutex *> commands;
|
||||
/* Mutex held by the core when it is processing. Used by threads to halt the core */
|
||||
static Mutex main_mutex;
|
||||
|
||||
class AsynchCommandMutex : public CommandMutex
|
||||
{
|
||||
public:
|
||||
AsynchCommandMutex() : CommandMutex()
|
||||
{
|
||||
commands.push_back(this);
|
||||
current_command = this;
|
||||
}
|
||||
|
||||
~AsynchCommandMutex()
|
||||
{
|
||||
std::list<CommandMutex *>::iterator it = std::find(commands.begin(), commands.end(), this);
|
||||
if (it != commands.end())
|
||||
commands.erase(it);
|
||||
if (this == current_command)
|
||||
current_command = NULL;
|
||||
}
|
||||
|
||||
void Run()
|
||||
{
|
||||
User *u = this->source.u;
|
||||
BotInfo *bi = this->source.owner;
|
||||
|
||||
if (!command->permission.empty() && !u->Account()->HasCommand(command->permission))
|
||||
{
|
||||
u->SendMessage(bi, LanguageString::ACCESS_DENIED);
|
||||
Log(LOG_COMMAND, "denied", bi) << "Access denied for user " << u->GetMask() << " with command " << command;
|
||||
}
|
||||
else
|
||||
{
|
||||
CommandReturn ret = command->Execute(source, params);
|
||||
|
||||
if (ret == MOD_CONT)
|
||||
{
|
||||
FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params));
|
||||
}
|
||||
}
|
||||
|
||||
main_mutex.Unlock();
|
||||
}
|
||||
|
||||
void Lock()
|
||||
{
|
||||
this->processing = true;
|
||||
me->Notify();
|
||||
this->mutex.Lock();
|
||||
}
|
||||
|
||||
void Unlock()
|
||||
{
|
||||
this->processing = false;
|
||||
main_mutex.Unlock();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService
|
||||
{
|
||||
public:
|
||||
ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands")
|
||||
{
|
||||
me = this;
|
||||
|
||||
this->SetPermanent(true);
|
||||
|
||||
main_mutex.Lock();
|
||||
|
||||
Implementation i[] = { I_OnPreCommand };
|
||||
ModuleManager::Attach(i, this, 1);
|
||||
|
||||
ModuleManager::RegisterService(this);
|
||||
}
|
||||
|
||||
EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> ¶ms)
|
||||
{
|
||||
AsynchCommandMutex *cm = new AsynchCommandMutex();
|
||||
try
|
||||
{
|
||||
cm->mutex.Lock();
|
||||
cm->command = command;
|
||||
cm->source = source;
|
||||
cm->params = params;
|
||||
|
||||
// Give processing to the command thread
|
||||
Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick;
|
||||
threadEngine.Start(cm);
|
||||
main_mutex.Lock();
|
||||
|
||||
return EVENT_STOP;
|
||||
}
|
||||
catch (const CoreException &ex)
|
||||
{
|
||||
delete cm;
|
||||
Log() << "Unable to thread for command: " << ex.GetReason();
|
||||
}
|
||||
|
||||
return EVENT_CONTINUE;
|
||||
}
|
||||
|
||||
void OnNotify()
|
||||
{
|
||||
for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
|
||||
{
|
||||
CommandMutex *cm = *it;
|
||||
|
||||
// Thread engine will pick this up later
|
||||
if (cm->GetExitState() || !cm->processing)
|
||||
continue;
|
||||
|
||||
Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << cm->source.u->nick;
|
||||
current_command = cm;
|
||||
|
||||
// Unlock to give processing back to the command thread
|
||||
cm->mutex.Unlock();
|
||||
// Relock to regain processing once the command thread hangs for any reason
|
||||
main_mutex.Lock();
|
||||
|
||||
current_command = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
CommandMutex *CurrentCommand()
|
||||
{
|
||||
return current_command;
|
||||
}
|
||||
};
|
||||
|
||||
MODULE_INIT(ModuleAsynchCommands)
|
||||
@@ -20,7 +20,7 @@ void ThreadEngine::Process()
|
||||
|
||||
/** Threads constructor
|
||||
*/
|
||||
Thread::Thread() : Exit(false)
|
||||
Thread::Thread() : exit(false)
|
||||
{
|
||||
threadEngine.threads.push_back(this);
|
||||
}
|
||||
@@ -41,7 +41,7 @@ Thread::~Thread()
|
||||
*/
|
||||
void Thread::SetExitState()
|
||||
{
|
||||
Exit = true;
|
||||
exit = true;
|
||||
}
|
||||
|
||||
/** Returns the exit state of the thread
|
||||
@@ -49,7 +49,7 @@ void Thread::SetExitState()
|
||||
*/
|
||||
bool Thread::GetExitState() const
|
||||
{
|
||||
return Exit;
|
||||
return exit;
|
||||
}
|
||||
|
||||
/** Called to run the thread, should be overloaded
|
||||
|
||||
@@ -39,6 +39,13 @@ void Thread::Join()
|
||||
pthread_join(Handle, NULL);
|
||||
}
|
||||
|
||||
/** Exit the thread. Note that the thread still must be joined to free resources!
|
||||
*/
|
||||
void Thread::Exit()
|
||||
{
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
/** Start a new thread
|
||||
* @param thread A pointer to a newley allocated thread
|
||||
*/
|
||||
|
||||
@@ -31,6 +31,13 @@ void Thread::Join()
|
||||
WaitForSingleObject(Handle, INFINITE);
|
||||
}
|
||||
|
||||
/** Exit the thread. Note that the thread still must be joined to free resources!
|
||||
*/
|
||||
void Thread::Exit()
|
||||
{
|
||||
ExitThread(0);
|
||||
}
|
||||
|
||||
/** Start a new thread
|
||||
* @param thread A pointer to a newley allocated thread
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user