mirror of
https://github.com/anope/anope.git
synced 2026-06-12 17:24:49 +02:00
Banish Redis support to the shadow realm.
Nobody actually uses this and it hasn't been tested in years so it a massive pain to maintain. It may be replaced with an alternate NoSQL database such as MongoDB in the future.
This commit is contained in:
@@ -1248,26 +1248,6 @@ module
|
||||
import = no
|
||||
}
|
||||
|
||||
/*
|
||||
* db_redis.
|
||||
*
|
||||
* This module allows using Redis (https://redis.io/) as a database backend.
|
||||
* This module requires that redis is loaded and configured properly.
|
||||
*
|
||||
* Redis 2.8 supports keyspace notifications which allows Redis to push notifications
|
||||
* to Anope about outside modifications to the database. This module supports this and
|
||||
* will internally reflect any changes made to the database immediately once notified.
|
||||
* See docs/REDIS for more information regarding this.
|
||||
*/
|
||||
#module
|
||||
{
|
||||
name = "db_redis"
|
||||
|
||||
/*
|
||||
* Redis database to use. This must be configured with redis.
|
||||
*/
|
||||
engine = "redis/main"
|
||||
}
|
||||
|
||||
/*
|
||||
* [RECOMMENDED] Encryption modules.
|
||||
|
||||
@@ -374,30 +374,6 @@ module { name = "help" }
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* redis
|
||||
*
|
||||
* This module allows other modules to use Redis.
|
||||
*/
|
||||
#module
|
||||
{
|
||||
name = "redis"
|
||||
|
||||
/* A redis database */
|
||||
redis
|
||||
{
|
||||
/* The name of this service */
|
||||
name = "redis/main"
|
||||
|
||||
/*
|
||||
* The redis database to use. New connections default to 0.
|
||||
*/
|
||||
db = 0
|
||||
|
||||
ip = "127.0.0.1"
|
||||
port = 6379
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* [EXTRA] regex_pcre2
|
||||
|
||||
-160
@@ -1,160 +0,0 @@
|
||||
Anope has Redis database support (https://redis.io/).
|
||||
This document explains the data structure used by Anope, and explains how
|
||||
keyspace notification works.
|
||||
|
||||
This is not a tutorial on how to use Redis, see https://redis.io/documentation
|
||||
for that.
|
||||
|
||||
Table of Contents
|
||||
-----------------
|
||||
1) Data structure
|
||||
2) Keyspace notifications
|
||||
3) Examples of modifying, deleting, and creating objects
|
||||
|
||||
1) Data structure
|
||||
|
||||
There are 4 key namespaces in Anope, they are:
|
||||
|
||||
id - The keys in id are used to atomically create object ids for new
|
||||
objects. For example, if I were to create a new BotInfo I would first:
|
||||
|
||||
redis 127.0.0.1:6379> INCR id:BotInfo
|
||||
|
||||
To get the object ID of the new object.
|
||||
|
||||
ids - The keys in ids contain a set of all object ids of the given type.
|
||||
For example:
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS ids:BotInfo
|
||||
|
||||
Returns "1", "2", "3", "4", "5", "6", "7", "8" because I have 8 bots that
|
||||
have IDs 1, 2, 3, 4, 5, 6, 7, and 8, respectively.
|
||||
|
||||
hash - The keys in hash are the actual objects, stored as hashes. For
|
||||
example, if I had just looked up all BotInfo ids and wanted to iterate
|
||||
over all of them, I would start by:
|
||||
|
||||
redis 127.0.0.1:6379> HGETALL hash:BotInfo:1
|
||||
|
||||
Which gets all keys and values from the hash of type BotInfo with id 1.
|
||||
This may return:
|
||||
|
||||
"nick" -> "BotServ"
|
||||
"user" -> "services"
|
||||
"host" -> "services.anope.org"
|
||||
"created" -> "1368704765"
|
||||
|
||||
value - The keys in value only exist to aid looking up object IDs. They
|
||||
are sets of object IDs and are used to map key+value pairs to objects.
|
||||
For example:
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nick:Adam
|
||||
|
||||
Returns a set of object ids of NickAlias objects that have the key
|
||||
'nick' set to the value 'Adam' in its hash. Clearly this can only
|
||||
ever contain at most one object, since it is not possible to have
|
||||
more than one registered nick with the same name, but other keys
|
||||
will contain more than one, such as:
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS value:NickCore:email:adam@anope.org
|
||||
|
||||
Which would return all accounts with the email "adam@anope.org".
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS value:ChanAccess:mask:Adam
|
||||
|
||||
Which would return all access entries set on the account "Adam".
|
||||
|
||||
Behavior similar to SQL's AND, can be achieved using the
|
||||
SINTER command, which does set intersection on one or more sets.
|
||||
|
||||
2) Keyspace notifications
|
||||
|
||||
Redis 2.7 (unstable) and 2.8 (stable) and newer support keyspace notifications
|
||||
(https://redis.io/topics/notifications). This allows Redis to notify Anope of
|
||||
any external changes to objects in the database. Once notified, Anope will
|
||||
immediately update the object. Otherwise, Anope keeps all objects in memory
|
||||
and will not regularly read from the database once started.
|
||||
|
||||
You can use this to modify objects in Redis and have them immediately reflected
|
||||
back into Anope. Additionally you can use this feature to run multiple Anope
|
||||
instances simultaneously from the same database (see also, Redis database
|
||||
replication).
|
||||
|
||||
To use keyspace notifications you MUST execute
|
||||
|
||||
redis 127.0.0.1:6379> CONFIG SET notify-keyspace-events KA
|
||||
OK
|
||||
|
||||
or set notify-keyspace-events in redis.conf properly. Anope always executes
|
||||
CONFIG SET when it first connects.
|
||||
|
||||
If you do not enable keyspace events properly Anope will be UNABLE to see any
|
||||
object modifications you do.
|
||||
|
||||
The key space ids and value are managed entirely by Anope, you do
|
||||
not (and should not) modify them. Once you modify the object (hash), Anope will
|
||||
update them for you to correctly reflect any changes made to the object.
|
||||
|
||||
Finally, always use atomic operations. If you are inserting a new object with
|
||||
multiple commands, or inserting multiple objects at once, specifically if the
|
||||
objects depend on each other, you MUST use a transaction.
|
||||
|
||||
3) Examples of modifying, deleting, and creating objects
|
||||
|
||||
These examples will ONLY work if you meet the criteria in section 2.
|
||||
|
||||
If I want to change the email account 'Adam' to 'Adam@anope.org', I would execute the following:
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam
|
||||
|
||||
Which returns a value of "1", which is the object id I want to modify.
|
||||
Now to change the email:
|
||||
|
||||
redis 127.0.0.1:6379> HSET hash:NickCore:1 email Adam@anope.org
|
||||
|
||||
You can now see this in NickServ's INFO command:
|
||||
-NickServ- Email address: Adam@anope.org
|
||||
|
||||
If I want to drop the account "Adam", I would execute the following:
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam
|
||||
|
||||
Which returns a value of "1". I would then check:
|
||||
|
||||
redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nc:Adam
|
||||
|
||||
To see what nicknames depend on this account to exist, as I will
|
||||
have to remove those too. This returns the values "2", and "3".
|
||||
|
||||
Finally, I can drop the nick using a transaction via:
|
||||
|
||||
redis 127.0.0.1:6379> MULTI
|
||||
OK
|
||||
redis 127.0.0.1:6379> DEL hash:NickAlias:2
|
||||
QUEUED
|
||||
redis 127.0.0.1:6379> DEL hash:NickAlias:3
|
||||
QUEUED
|
||||
redis 127.0.0.1:6379> DEL hash:NickCore:1
|
||||
QUEUED
|
||||
redis 127.0.0.1:6379> EXEC
|
||||
|
||||
Or alternatively simply:
|
||||
|
||||
redis 127.0.0.1:6379> DEL hash:NickAlias:2 hash:NickAlias:3 hash:NickCore:1
|
||||
|
||||
If I wanted to create a BotServ bot, I would execute the following:
|
||||
|
||||
redis 127.0.0.1:6379> INCR id:BotInfo
|
||||
|
||||
Which returns a new object ID for me, in this example it will be "8".
|
||||
Now I can create the object:
|
||||
|
||||
HMSET hash:BotInfo:8 nick redis user redis host services.anope.org realname "Anope IRC Services"
|
||||
|
||||
Note if you are using HSET instead of HMSET you will need to use a transaction, as shown in the above example.
|
||||
If you are watching your services logs you will immediately see:
|
||||
|
||||
USERS: redis!redis@services.anope.org (Anope IRC Services) connected to the network (services.anope.org)
|
||||
|
||||
And the bot redis will be in BotServ's bot list.
|
||||
Notice how ids:BotInfo and the value keys are updated automatically.
|
||||
@@ -1,83 +0,0 @@
|
||||
// Anope IRC Services <https://www.anope.org/>
|
||||
//
|
||||
// Copyright (C) 2003-2026 Anope Contributors
|
||||
//
|
||||
// Anope is free software. You can use, modify, and/or distribute it under the
|
||||
// terms of version 2 of the GNU General Public License. See docs/LICENSE.txt
|
||||
// for the complete terms of this license and docs/AUTHORS.txt for a list of
|
||||
// contributors.
|
||||
//
|
||||
// Based on the original code of Epona by Lara
|
||||
// Based on the original code of Services by Andy Church
|
||||
//
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace Redis
|
||||
{
|
||||
struct Reply final
|
||||
{
|
||||
enum Type
|
||||
{
|
||||
NOT_PARSED,
|
||||
NOT_OK,
|
||||
OK,
|
||||
INT,
|
||||
BULK,
|
||||
MULTI_BULK
|
||||
}
|
||||
type;
|
||||
|
||||
Reply() { Clear(); }
|
||||
~Reply() { Clear(); }
|
||||
|
||||
void Clear()
|
||||
{
|
||||
type = NOT_PARSED;
|
||||
i = 0;
|
||||
bulk.clear();
|
||||
multi_bulk_size = 0;
|
||||
for (const auto *reply : multi_bulk)
|
||||
delete reply;
|
||||
multi_bulk.clear();
|
||||
}
|
||||
|
||||
int64_t i;
|
||||
Anope::string bulk;
|
||||
int multi_bulk_size;
|
||||
std::deque<Reply *> multi_bulk;
|
||||
};
|
||||
|
||||
class Interface
|
||||
{
|
||||
public:
|
||||
Module *owner;
|
||||
|
||||
Interface(Module *m) : owner(m) { }
|
||||
virtual ~Interface() = default;
|
||||
|
||||
virtual void OnResult(const Reply &r) = 0;
|
||||
virtual void OnError(const Anope::string &error) { Log(owner) << error; }
|
||||
};
|
||||
|
||||
class Provider
|
||||
: public Service
|
||||
{
|
||||
public:
|
||||
Provider(Module *c, const Anope::string &n) : Service(c, "Redis::Provider", n) { }
|
||||
|
||||
virtual bool IsSocketDead() = 0;
|
||||
|
||||
virtual void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) = 0;
|
||||
virtual void SendCommand(Interface *i, const Anope::string &str) = 0;
|
||||
|
||||
virtual bool BlockAndProcess() = 0;
|
||||
|
||||
virtual void Subscribe(Interface *i, const Anope::string &pattern) = 0;
|
||||
virtual void Unsubscribe(const Anope::string &pattern) = 0;
|
||||
|
||||
virtual void StartTransaction() = 0;
|
||||
virtual void CommitTransaction() = 0;
|
||||
};
|
||||
}
|
||||
@@ -88,9 +88,6 @@ public:
|
||||
/* Unique ID (per type, not globally) for this object */
|
||||
Id object_id = 0;
|
||||
|
||||
/* Only used by redis, to ignore updates */
|
||||
unsigned short redis_ignore = 0;
|
||||
|
||||
/** Marks the object as potentially being updated "soon".
|
||||
*/
|
||||
void QueueUpdate();
|
||||
|
||||
@@ -1,648 +0,0 @@
|
||||
// Anope IRC Services <https://www.anope.org/>
|
||||
//
|
||||
// Copyright (C) 2003-2026 Anope Contributors
|
||||
//
|
||||
// Anope is free software. You can use, modify, and/or distribute it under the
|
||||
// terms of version 2 of the GNU General Public License. See docs/LICENSE.txt
|
||||
// for the complete terms of this license and docs/AUTHORS.txt for a list of
|
||||
// contributors.
|
||||
//
|
||||
// Based on the original code of Epona by Lara
|
||||
// Based on the original code of Services by Andy Church
|
||||
//
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
#include "module.h"
|
||||
#include "modules/redis.h"
|
||||
|
||||
using namespace Redis;
|
||||
|
||||
class DatabaseRedis;
|
||||
static DatabaseRedis *me;
|
||||
|
||||
class Data final
|
||||
: public Serialize::Data
|
||||
{
|
||||
public:
|
||||
Anope::unordered_map<Anope::string> data;
|
||||
|
||||
bool LoadInternal(const Anope::string &key, Anope::string &value) override
|
||||
{
|
||||
auto it = this->data.find(key);
|
||||
if (it == this->data.end())
|
||||
return false;
|
||||
|
||||
value = it->second;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool StoreInternal(const Anope::string &key, const Anope::string &value) override
|
||||
{
|
||||
this->data[key] = value;
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t Hash() const override
|
||||
{
|
||||
size_t hash = 0;
|
||||
for (const auto &[_, value] : this->data)
|
||||
if (!value.empty())
|
||||
hash ^= Anope::hash_cs()(value);
|
||||
return hash;
|
||||
}
|
||||
};
|
||||
|
||||
class TypeLoader final
|
||||
: public Interface
|
||||
{
|
||||
Anope::string type;
|
||||
public:
|
||||
TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class ObjectLoader final
|
||||
: public Interface
|
||||
{
|
||||
Anope::string type;
|
||||
int64_t id;
|
||||
|
||||
public:
|
||||
ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class IDInterface final
|
||||
: public Interface
|
||||
{
|
||||
Reference<Serializable> o;
|
||||
public:
|
||||
IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class Deleter final
|
||||
: public Interface
|
||||
{
|
||||
Anope::string type;
|
||||
int64_t id;
|
||||
public:
|
||||
Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class Updater final
|
||||
: public Interface
|
||||
{
|
||||
Anope::string type;
|
||||
int64_t id;
|
||||
public:
|
||||
Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class ModifiedObject final
|
||||
: public Interface
|
||||
{
|
||||
Anope::string type;
|
||||
int64_t id;
|
||||
public:
|
||||
ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class SubscriptionListener final
|
||||
: public Interface
|
||||
{
|
||||
public:
|
||||
SubscriptionListener(Module *creator) : Interface(creator) { }
|
||||
|
||||
void OnResult(const Reply &r) override;
|
||||
};
|
||||
|
||||
class DatabaseRedis final
|
||||
: public Module
|
||||
, public Pipe
|
||||
{
|
||||
SubscriptionListener sl;
|
||||
std::set<Serializable *> updated_items;
|
||||
|
||||
public:
|
||||
ServiceReference<Provider> redis;
|
||||
|
||||
DatabaseRedis(const Anope::string &modname, const Anope::string &creator)
|
||||
: Module(modname, creator, DATABASE | VENDOR)
|
||||
, sl(this)
|
||||
, redis("Redis::Provider")
|
||||
{
|
||||
me = this;
|
||||
|
||||
}
|
||||
|
||||
/* Insert or update an object */
|
||||
void InsertObject(Serializable *obj)
|
||||
{
|
||||
Serialize::Type *t = obj->GetSerializableType();
|
||||
|
||||
/* If there is no id yet for this object, get one */
|
||||
if (!obj->object_id)
|
||||
redis->SendCommand(new IDInterface(this, obj), "INCR id:" + t->GetName());
|
||||
else
|
||||
{
|
||||
Data data;
|
||||
t->Serialize(obj, data);
|
||||
|
||||
if (obj->IsCached(data))
|
||||
return;
|
||||
|
||||
obj->UpdateCache(data);
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("HGETALL");
|
||||
args.push_back("hash:" + t->GetName() + ":" + Anope::ToString(obj->object_id));
|
||||
|
||||
/* Get object attrs to clear before updating */
|
||||
redis->SendCommand(new Updater(this, t->GetName(), obj->object_id), args);
|
||||
}
|
||||
}
|
||||
|
||||
void OnNotify() override
|
||||
{
|
||||
for (auto *obj : this->updated_items)
|
||||
{
|
||||
this->InsertObject(obj);
|
||||
}
|
||||
|
||||
this->updated_items.clear();
|
||||
}
|
||||
|
||||
void OnReload(Configuration::Conf &conf) override
|
||||
{
|
||||
const auto &block = conf.GetModule(this);
|
||||
this->redis.SetServiceName(block.Get<const Anope::string>("engine", "redis/main"));
|
||||
}
|
||||
|
||||
EventReturn OnLoadDatabase() override
|
||||
{
|
||||
if (!redis)
|
||||
{
|
||||
Log(this) << "Unable to load database - unable to find redis provider";
|
||||
return EVENT_CONTINUE;
|
||||
}
|
||||
|
||||
for (const auto &type_order : Serialize::Type::GetTypeOrder())
|
||||
{
|
||||
Serialize::Type *sb = Serialize::Type::Find(type_order);
|
||||
this->OnSerializeTypeCreate(sb);
|
||||
}
|
||||
|
||||
while (!redis->IsSocketDead() && redis->BlockAndProcess());
|
||||
|
||||
if (redis->IsSocketDead())
|
||||
{
|
||||
Log(this) << "I/O error while loading redis database - is it online?";
|
||||
return EVENT_CONTINUE;
|
||||
}
|
||||
|
||||
redis->Subscribe(&this->sl, "__keyspace@*__:hash:*");
|
||||
|
||||
return EVENT_STOP;
|
||||
}
|
||||
|
||||
void OnSerializeTypeCreate(Serialize::Type *sb) override
|
||||
{
|
||||
if (!redis)
|
||||
return;
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SMEMBERS");
|
||||
args.push_back("ids:" + sb->GetName());
|
||||
|
||||
redis->SendCommand(new TypeLoader(this, sb->GetName()), args);
|
||||
}
|
||||
|
||||
void OnSerializableConstruct(Serializable *obj) override
|
||||
{
|
||||
this->updated_items.insert(obj);
|
||||
this->Notify();
|
||||
}
|
||||
|
||||
void OnSerializableDestruct(Serializable *obj) override
|
||||
{
|
||||
Serialize::Type *t = obj->GetSerializableType();
|
||||
|
||||
if (t == NULL)
|
||||
{
|
||||
/* This is probably the module providing the type unloading.
|
||||
*
|
||||
* The types get registered after the extensible container is
|
||||
* registered so that unserialization on module load can insert
|
||||
* into the extensible container. So, the type destructs prior to
|
||||
* the extensible container, which then triggers this
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("HGETALL");
|
||||
args.push_back("hash:" + t->GetName() + ":" + Anope::ToString(obj->object_id));
|
||||
|
||||
/* Get all of the attributes for this object */
|
||||
redis->SendCommand(new Deleter(this, t->GetName(), obj->object_id), args);
|
||||
|
||||
this->updated_items.erase(obj);
|
||||
t->objects.erase(obj->object_id);
|
||||
this->Notify();
|
||||
}
|
||||
|
||||
void OnSerializableUpdate(Serializable *obj) override
|
||||
{
|
||||
this->updated_items.insert(obj);
|
||||
this->Notify();
|
||||
}
|
||||
};
|
||||
|
||||
void TypeLoader::OnResult(const Reply &r)
|
||||
{
|
||||
if (r.type != Reply::MULTI_BULK || !me->redis)
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto *reply : r.multi_bulk)
|
||||
{
|
||||
if (reply->type != Reply::BULK)
|
||||
continue;
|
||||
|
||||
auto i = Anope::TryConvert<int64_t>(reply->bulk);
|
||||
if (!i)
|
||||
continue;
|
||||
|
||||
auto id = i.value();
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("HGETALL");
|
||||
args.push_back("hash:" + this->type + ":" + Anope::ToString(id));
|
||||
|
||||
me->redis->SendCommand(new ObjectLoader(me, this->type, id), args);
|
||||
}
|
||||
|
||||
delete this;
|
||||
}
|
||||
|
||||
void ObjectLoader::OnResult(const Reply &r)
|
||||
{
|
||||
Serialize::Type *st = Serialize::Type::Find(this->type);
|
||||
|
||||
if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st)
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
Data data;
|
||||
|
||||
for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
|
||||
{
|
||||
const Reply *key = r.multi_bulk[i],
|
||||
*value = r.multi_bulk[i + 1];
|
||||
|
||||
data.StoreInternal(key->bulk, value->bulk);
|
||||
}
|
||||
|
||||
Serializable *&obj = st->objects[this->id];
|
||||
obj = st->Unserialize(obj, data);
|
||||
if (obj)
|
||||
{
|
||||
obj->object_id = this->id;
|
||||
obj->UpdateCache(data);
|
||||
}
|
||||
|
||||
delete this;
|
||||
}
|
||||
|
||||
void IDInterface::OnResult(const Reply &r)
|
||||
{
|
||||
if (!o || r.type != Reply::INT || !r.i)
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
Serializable *&obj = o->GetSerializableType()->objects[r.i];
|
||||
if (obj)
|
||||
/* This shouldn't be possible */
|
||||
obj->object_id = 0;
|
||||
|
||||
o->object_id = r.i;
|
||||
obj = o;
|
||||
|
||||
/* Now that we have the id, insert this object for real */
|
||||
anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o);
|
||||
|
||||
delete this;
|
||||
}
|
||||
|
||||
void Deleter::OnResult(const Reply &r)
|
||||
{
|
||||
if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty())
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Transaction start */
|
||||
me->redis->StartTransaction();
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("DEL");
|
||||
args.push_back("hash:" + this->type + ":" + Anope::ToString(this->id));
|
||||
|
||||
/* Delete hash object */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
|
||||
args.clear();
|
||||
args.emplace_back("SREM");
|
||||
args.push_back("ids:" + this->type);
|
||||
args.push_back(Anope::ToString(this->id));
|
||||
|
||||
/* Delete id from ids set */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
|
||||
for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
|
||||
{
|
||||
const Reply *key = r.multi_bulk[i],
|
||||
*value = r.multi_bulk[i + 1];
|
||||
|
||||
args.clear();
|
||||
args.emplace_back("SREM");
|
||||
args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk);
|
||||
args.push_back(Anope::ToString(this->id));
|
||||
|
||||
/* Delete value -> object id */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
}
|
||||
|
||||
/* Transaction end */
|
||||
me->redis->CommitTransaction();
|
||||
|
||||
delete this;
|
||||
}
|
||||
|
||||
void Updater::OnResult(const Reply &r)
|
||||
{
|
||||
Serialize::Type *st = Serialize::Type::Find(this->type);
|
||||
|
||||
if (!st)
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
Serializable *obj = st->objects[this->id];
|
||||
if (!obj)
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
Data data;
|
||||
st->Serialize(obj, data);
|
||||
|
||||
/* Transaction start */
|
||||
me->redis->StartTransaction();
|
||||
|
||||
for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
|
||||
{
|
||||
const Reply *key = r.multi_bulk[i],
|
||||
*value = r.multi_bulk[i + 1];
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SREM");
|
||||
args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk);
|
||||
args.push_back(Anope::ToString(this->id));
|
||||
|
||||
/* Delete value -> object id */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
}
|
||||
|
||||
/* Add object id to id set for this type */
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SADD");
|
||||
args.push_back("ids:" + this->type);
|
||||
args.push_back(Anope::ToString(obj->object_id));
|
||||
me->redis->SendCommand(NULL, args);
|
||||
|
||||
args.clear();
|
||||
args.emplace_back("HMSET");
|
||||
args.push_back("hash:" + this->type + ":" + Anope::ToString(obj->object_id));
|
||||
|
||||
for (const auto &[key, value] : data.data)
|
||||
{
|
||||
args.push_back(key);
|
||||
args.emplace_back(value);
|
||||
|
||||
std::vector<Anope::string> args2;
|
||||
|
||||
args2.emplace_back("SADD");
|
||||
args2.push_back("value:" + this->type + ":" + key + ":" + value);
|
||||
args2.push_back(Anope::ToString(obj->object_id));
|
||||
|
||||
/* Add to value -> object id set */
|
||||
me->redis->SendCommand(NULL, args2);
|
||||
}
|
||||
|
||||
++obj->redis_ignore;
|
||||
|
||||
/* Add object */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
|
||||
/* Transaction end */
|
||||
me->redis->CommitTransaction();
|
||||
|
||||
delete this;
|
||||
}
|
||||
|
||||
void SubscriptionListener::OnResult(const Reply &r)
|
||||
{
|
||||
/*
|
||||
* [May 15 13:59:35.645839 2013] Debug: pmessage
|
||||
* [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:*
|
||||
* [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id
|
||||
* [May 15 13:59:35.645893 2013] Debug: hset
|
||||
*/
|
||||
if (r.multi_bulk.size() != 4)
|
||||
return;
|
||||
|
||||
size_t sz = r.multi_bulk[2]->bulk.find(':');
|
||||
if (sz == Anope::string::npos)
|
||||
return;
|
||||
|
||||
const Anope::string &key = r.multi_bulk[2]->bulk.substr(sz + 1),
|
||||
&op = r.multi_bulk[3]->bulk;
|
||||
|
||||
sz = key.rfind(':');
|
||||
if (sz == Anope::string::npos)
|
||||
return;
|
||||
|
||||
const Anope::string &id = key.substr(sz + 1);
|
||||
|
||||
size_t sz2 = key.rfind(':', sz - 1);
|
||||
if (sz2 == Anope::string::npos)
|
||||
return;
|
||||
const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1);
|
||||
|
||||
Serialize::Type *s_type = Serialize::Type::Find(type);
|
||||
|
||||
if (s_type == NULL)
|
||||
return;
|
||||
|
||||
auto oid = Anope::TryConvert<Serializable::Id>(id);
|
||||
if (!oid.has_value())
|
||||
return;
|
||||
|
||||
auto obj_id = oid.value();
|
||||
if (op == "hset" || op == "hdel")
|
||||
{
|
||||
Serializable *s = s_type->objects[obj_id];
|
||||
|
||||
if (s && s->redis_ignore)
|
||||
{
|
||||
--s->redis_ignore;
|
||||
Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it";
|
||||
}
|
||||
else
|
||||
{
|
||||
Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type;
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("HGETALL");
|
||||
args.push_back("hash:" + type + ":" + id);
|
||||
|
||||
me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args);
|
||||
}
|
||||
}
|
||||
else if (op == "del")
|
||||
{
|
||||
Serializable *&s = s_type->objects[obj_id];
|
||||
if (s == NULL)
|
||||
return;
|
||||
|
||||
Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type;
|
||||
|
||||
Data data;
|
||||
s_type->Serialize(s, data);
|
||||
|
||||
/* Transaction start */
|
||||
me->redis->StartTransaction();
|
||||
|
||||
for (const auto &[k, value] : data.data)
|
||||
{
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SREM");
|
||||
args.push_back("value:" + type + ":" + k + ":" + value);
|
||||
args.push_back(id);
|
||||
|
||||
/* Delete value -> object id */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
}
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SREM");
|
||||
args.push_back("ids:" + type);
|
||||
args.push_back(Anope::ToString(s->object_id));
|
||||
|
||||
/* Delete object from id set */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
|
||||
/* Transaction end */
|
||||
me->redis->CommitTransaction();
|
||||
|
||||
delete s;
|
||||
s = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void ModifiedObject::OnResult(const Reply &r)
|
||||
{
|
||||
Serialize::Type *st = Serialize::Type::Find(this->type);
|
||||
|
||||
if (!st)
|
||||
{
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
|
||||
Serializable *&obj = st->objects[this->id];
|
||||
|
||||
/* Transaction start */
|
||||
me->redis->StartTransaction();
|
||||
|
||||
/* Erase old object values */
|
||||
if (obj)
|
||||
{
|
||||
Data data;
|
||||
st->Serialize(obj, data);
|
||||
|
||||
for (auto &[key, value] : data.data)
|
||||
{
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SREM");
|
||||
args.push_back("value:" + st->GetName() + ":" + key + ":" + value);
|
||||
args.push_back(Anope::ToString(this->id));
|
||||
|
||||
/* Delete value -> object id */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
}
|
||||
}
|
||||
|
||||
Data data;
|
||||
|
||||
for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
|
||||
{
|
||||
const Reply *key = r.multi_bulk[i],
|
||||
*value = r.multi_bulk[i + 1];
|
||||
|
||||
data.StoreInternal(key->bulk, value->bulk);
|
||||
}
|
||||
|
||||
obj = st->Unserialize(obj, data);
|
||||
if (obj)
|
||||
{
|
||||
obj->object_id = this->id;
|
||||
obj->UpdateCache(data);
|
||||
|
||||
/* Insert new object values */
|
||||
for (const auto &[key, value] : data.data)
|
||||
{
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SADD");
|
||||
args.push_back("value:" + st->GetName() + ":" + key + ":" + value);
|
||||
args.push_back(Anope::ToString(obj->object_id));
|
||||
|
||||
/* Add to value -> object id set */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
}
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("SADD");
|
||||
args.push_back("ids:" + st->GetName());
|
||||
args.push_back(Anope::ToString(obj->object_id));
|
||||
|
||||
/* Add to type -> id set */
|
||||
me->redis->SendCommand(NULL, args);
|
||||
}
|
||||
|
||||
/* Transaction end */
|
||||
me->redis->CommitTransaction();
|
||||
|
||||
delete this;
|
||||
}
|
||||
|
||||
MODULE_INIT(DatabaseRedis)
|
||||
@@ -1,615 +0,0 @@
|
||||
// Anope IRC Services <https://www.anope.org/>
|
||||
//
|
||||
// Copyright (C) 2003-2026 Anope Contributors
|
||||
//
|
||||
// Anope is free software. You can use, modify, and/or distribute it under the
|
||||
// terms of version 2 of the GNU General Public License. See docs/LICENSE.txt
|
||||
// for the complete terms of this license and docs/AUTHORS.txt for a list of
|
||||
// contributors.
|
||||
//
|
||||
// Based on the original code of Epona by Lara
|
||||
// Based on the original code of Services by Andy Church
|
||||
//
|
||||
// SPDX-License-Identifier: GPL-2.0-only
|
||||
|
||||
#include "module.h"
|
||||
#include "modules/redis.h"
|
||||
|
||||
using namespace Redis;
|
||||
|
||||
class MyRedisService;
|
||||
|
||||
class RedisSocket final
|
||||
: public BinarySocket
|
||||
, public ConnectionSocket
|
||||
{
|
||||
size_t ParseReply(Reply &r, const char *buf, size_t l);
|
||||
public:
|
||||
MyRedisService *provider;
|
||||
std::deque<Interface *> interfaces;
|
||||
std::map<Anope::string, Interface *> subinterfaces;
|
||||
|
||||
RedisSocket(MyRedisService *pro, bool v6) : Socket(-1, v6 ? AF_INET6 : AF_INET), provider(pro) { }
|
||||
|
||||
~RedisSocket() override;
|
||||
|
||||
void OnConnect() override;
|
||||
void OnError(const Anope::string &error) override;
|
||||
|
||||
bool Read(const char *buffer, size_t l) override;
|
||||
};
|
||||
|
||||
class Transaction final
|
||||
: public Interface
|
||||
{
|
||||
public:
|
||||
std::deque<Interface *> interfaces;
|
||||
|
||||
Transaction(Module *creator) : Interface(creator) { }
|
||||
|
||||
~Transaction() override
|
||||
{
|
||||
for (auto *iface : interfaces)
|
||||
{
|
||||
if (!iface)
|
||||
continue;
|
||||
|
||||
iface->OnError("Interface going away");
|
||||
}
|
||||
}
|
||||
|
||||
void OnResult(const Reply &r) override
|
||||
{
|
||||
/* This is a multi bulk reply of the results of the queued commands
|
||||
* in this transaction
|
||||
*/
|
||||
|
||||
Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results";
|
||||
|
||||
for (auto *result : r.multi_bulk)
|
||||
{
|
||||
if (interfaces.empty())
|
||||
break;
|
||||
|
||||
Interface *inter = interfaces.front();
|
||||
interfaces.pop_front();
|
||||
|
||||
if (inter)
|
||||
inter->OnResult(*result);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class MyRedisService final
|
||||
: public Provider
|
||||
{
|
||||
public:
|
||||
Anope::string host;
|
||||
int port;
|
||||
unsigned db;
|
||||
|
||||
RedisSocket *sock = nullptr, *sub = nullptr;
|
||||
|
||||
Transaction ti;
|
||||
bool in_transaction = false;
|
||||
|
||||
MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), ti(c)
|
||||
{
|
||||
sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
|
||||
sock->Connect(host, port);
|
||||
|
||||
sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
|
||||
sub->Connect(host, port);
|
||||
}
|
||||
|
||||
~MyRedisService() override
|
||||
{
|
||||
if (sock)
|
||||
{
|
||||
sock->flags[SF_DEAD] = true;
|
||||
sock->provider = NULL;
|
||||
}
|
||||
|
||||
if (sub)
|
||||
{
|
||||
sub->flags[SF_DEAD] = true;
|
||||
sub->provider = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
static inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0)
|
||||
{
|
||||
if (!sz)
|
||||
sz = strlen(buf);
|
||||
|
||||
size_t old_size = buffer.size();
|
||||
buffer.resize(old_size + sz);
|
||||
std::copy(buf, buf + sz, buffer.begin() + old_size);
|
||||
}
|
||||
|
||||
void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
|
||||
{
|
||||
std::vector<char> buffer;
|
||||
|
||||
Pack(buffer, "*");
|
||||
Pack(buffer, Anope::ToString(args.size()).c_str());
|
||||
Pack(buffer, "\r\n");
|
||||
|
||||
for (const auto &[key, value] : args)
|
||||
{
|
||||
Pack(buffer, "$");
|
||||
Pack(buffer, Anope::ToString(value).c_str());
|
||||
Pack(buffer, "\r\n");
|
||||
|
||||
Pack(buffer, key, value);
|
||||
Pack(buffer, "\r\n");
|
||||
}
|
||||
|
||||
if (buffer.empty())
|
||||
return;
|
||||
|
||||
s->Write(&buffer[0], buffer.size());
|
||||
if (in_transaction)
|
||||
{
|
||||
ti.interfaces.push_back(i);
|
||||
s->interfaces.push_back(NULL); // For the +Queued response
|
||||
}
|
||||
else
|
||||
s->interfaces.push_back(i);
|
||||
}
|
||||
|
||||
public:
|
||||
bool IsSocketDead() override
|
||||
{
|
||||
return this->sock && this->sock->flags[SF_DEAD];
|
||||
}
|
||||
|
||||
void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds)
|
||||
{
|
||||
std::vector<std::pair<const char *, size_t> > args;
|
||||
for (const auto &cmd : cmds)
|
||||
args.emplace_back(cmd.c_str(), cmd.length());
|
||||
this->Send(s, i, args);
|
||||
}
|
||||
|
||||
void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str)
|
||||
{
|
||||
std::vector<Anope::string> args;
|
||||
spacesepstream(str).GetTokens(args);
|
||||
this->SendCommand(s, i, args);
|
||||
}
|
||||
|
||||
void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
|
||||
{
|
||||
if (!sock)
|
||||
{
|
||||
sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
|
||||
sock->Connect(host, port);
|
||||
}
|
||||
|
||||
this->Send(sock, i, args);
|
||||
}
|
||||
|
||||
void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) override
|
||||
{
|
||||
std::vector<std::pair<const char *, size_t> > args;
|
||||
for (const auto &cmd : cmds)
|
||||
args.emplace_back(cmd.c_str(), cmd.length());
|
||||
this->Send(i, args);
|
||||
}
|
||||
|
||||
void SendCommand(Interface *i, const Anope::string &str) override
|
||||
{
|
||||
std::vector<Anope::string> args;
|
||||
spacesepstream(str).GetTokens(args);
|
||||
this->SendCommand(i, args);
|
||||
}
|
||||
|
||||
public:
|
||||
bool BlockAndProcess() override
|
||||
{
|
||||
if (!this->sock->ProcessWrite())
|
||||
this->sock->flags[SF_DEAD] = true;
|
||||
this->sock->SetBlocking(true);
|
||||
if (!this->sock->ProcessRead())
|
||||
this->sock->flags[SF_DEAD] = true;
|
||||
this->sock->SetBlocking(false);
|
||||
return !this->sock->interfaces.empty();
|
||||
}
|
||||
|
||||
void Subscribe(Interface *i, const Anope::string &pattern) override
|
||||
{
|
||||
if (sub == NULL)
|
||||
{
|
||||
sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
|
||||
sub->Connect(host, port);
|
||||
}
|
||||
|
||||
std::vector<Anope::string> args;
|
||||
args.emplace_back("PSUBSCRIBE");
|
||||
args.push_back(pattern);
|
||||
this->SendCommand(sub, NULL, args);
|
||||
|
||||
sub->subinterfaces[pattern] = i;
|
||||
}
|
||||
|
||||
void Unsubscribe(const Anope::string &pattern) override
|
||||
{
|
||||
if (sub)
|
||||
sub->subinterfaces.erase(pattern);
|
||||
}
|
||||
|
||||
void StartTransaction() override
|
||||
{
|
||||
if (in_transaction)
|
||||
throw ModuleException("Tried to start a transaction while one was already in progress");
|
||||
|
||||
this->SendCommand(NULL, "MULTI");
|
||||
in_transaction = true;
|
||||
}
|
||||
|
||||
void CommitTransaction() override
|
||||
{
|
||||
/* The result of the transaction comes back to the reply of EXEC as a multi bulk.
|
||||
* The reply to the individual commands that make up the transaction when executed
|
||||
* is a simple +QUEUED
|
||||
*/
|
||||
in_transaction = false;
|
||||
this->SendCommand(&this->ti, "EXEC");
|
||||
}
|
||||
};
|
||||
|
||||
RedisSocket::~RedisSocket()
|
||||
{
|
||||
if (provider)
|
||||
{
|
||||
if (provider->sock == this)
|
||||
provider->sock = NULL;
|
||||
else if (provider->sub == this)
|
||||
provider->sub = NULL;
|
||||
}
|
||||
|
||||
for (auto *iface : interfaces)
|
||||
{
|
||||
if (!iface)
|
||||
continue;
|
||||
|
||||
iface->OnError("Interface going away");
|
||||
}
|
||||
}
|
||||
|
||||
void RedisSocket::OnConnect()
|
||||
{
|
||||
Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : "");
|
||||
|
||||
this->provider->SendCommand(NULL, "CLIENT SETNAME Anope");
|
||||
this->provider->SendCommand(NULL, "SELECT " + Anope::ToString(provider->db));
|
||||
|
||||
if (this != this->provider->sub)
|
||||
{
|
||||
this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA");
|
||||
}
|
||||
}
|
||||
|
||||
void RedisSocket::OnError(const Anope::string &error)
|
||||
{
|
||||
Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error;
|
||||
}
|
||||
|
||||
size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l)
|
||||
{
|
||||
size_t used = 0;
|
||||
|
||||
if (!l)
|
||||
return used;
|
||||
|
||||
if (r.type == Reply::MULTI_BULK)
|
||||
goto multi_bulk_cont;
|
||||
|
||||
switch (*buffer)
|
||||
{
|
||||
case '+':
|
||||
{
|
||||
Anope::string reason(buffer, 1, l - 1);
|
||||
size_t nl = reason.find("\r\n");
|
||||
Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl);
|
||||
if (nl != Anope::string::npos)
|
||||
{
|
||||
r.type = Reply::OK;
|
||||
used = 1 + nl + 2;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case '-':
|
||||
{
|
||||
Anope::string reason(buffer, 1, l - 1);
|
||||
size_t nl = reason.find("\r\n");
|
||||
Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl);
|
||||
if (nl != Anope::string::npos)
|
||||
{
|
||||
r.type = Reply::NOT_OK;
|
||||
used = 1 + nl + 2;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ':':
|
||||
{
|
||||
Anope::string ibuf(buffer, 1, l - 1);
|
||||
size_t nl = ibuf.find("\r\n");
|
||||
if (nl != Anope::string::npos)
|
||||
{
|
||||
if (auto i = Anope::TryConvert<int64_t>(ibuf.substr(0, nl)))
|
||||
r.i = i.value();
|
||||
|
||||
r.type = Reply::INT;
|
||||
used = 1 + nl + 2;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case '$':
|
||||
{
|
||||
Anope::string reply(buffer + 1, l - 1);
|
||||
/* This assumes one bulk can always fit in our recv buffer */
|
||||
size_t nl = reply.find("\r\n");
|
||||
if (nl != Anope::string::npos)
|
||||
{
|
||||
if (auto l = Anope::TryConvert<int>(reply.substr(0, nl)))
|
||||
{
|
||||
int len = l.value();
|
||||
if (len >= 0)
|
||||
{
|
||||
if (1 + nl + 2 + len + 2 <= l)
|
||||
{
|
||||
used = 1 + nl + 2 + len + 2;
|
||||
r.bulk = reply.substr(nl + 2, len);
|
||||
r.type = Reply::BULK;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
used = 1 + nl + 2 + 2;
|
||||
r.type = Reply::BULK;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
multi_bulk_cont:
|
||||
case '*':
|
||||
{
|
||||
if (r.type != Reply::MULTI_BULK)
|
||||
{
|
||||
Anope::string reply(buffer + 1, l - 1);
|
||||
size_t nl = reply.find("\r\n");
|
||||
if (nl != Anope::string::npos)
|
||||
{
|
||||
r.type = Reply::MULTI_BULK;
|
||||
if (auto size = Anope::TryConvert<int>(reply.substr(0, nl)))
|
||||
r.multi_bulk_size = size.value();
|
||||
used = 1 + nl + 2;
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
else if (r.multi_bulk_size >= 0 && r.multi_bulk.size() == static_cast<unsigned>(r.multi_bulk_size))
|
||||
{
|
||||
/* This multi bulk is already complete, so check the sub bulks */
|
||||
for (auto &bulk : r.multi_bulk)
|
||||
if (bulk->type == Reply::MULTI_BULK)
|
||||
ParseReply(*bulk, buffer + used, l - used);
|
||||
break;
|
||||
}
|
||||
|
||||
for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i)
|
||||
{
|
||||
auto *reply = new Reply();
|
||||
size_t u = ParseReply(*reply, buffer + used, l - used);
|
||||
if (!u)
|
||||
{
|
||||
Log(LOG_DEBUG) << "redis: ran out of data to parse";
|
||||
delete reply;
|
||||
break;
|
||||
}
|
||||
r.multi_bulk.push_back(reply);
|
||||
used += u;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
Log(LOG_DEBUG) << "redis: unknown reply " << *buffer;
|
||||
}
|
||||
|
||||
return used;
|
||||
}
|
||||
|
||||
bool RedisSocket::Read(const char *buffer, size_t l)
|
||||
{
|
||||
static std::vector<char> save;
|
||||
std::vector<char> copy;
|
||||
|
||||
if (!save.empty())
|
||||
{
|
||||
std::copy(buffer, buffer + l, std::back_inserter(save));
|
||||
|
||||
copy = save;
|
||||
|
||||
buffer = ©[0];
|
||||
l = copy.size();
|
||||
}
|
||||
|
||||
while (l)
|
||||
{
|
||||
static Reply r;
|
||||
|
||||
size_t used = this->ParseReply(r, buffer, l);
|
||||
if (!used)
|
||||
{
|
||||
Log(LOG_DEBUG) << "redis: used == 0 ?";
|
||||
r.Clear();
|
||||
break;
|
||||
}
|
||||
else if (used > l)
|
||||
{
|
||||
Log(LOG_DEBUG) << "redis: used > l ?";
|
||||
r.Clear();
|
||||
break;
|
||||
}
|
||||
|
||||
/* Full result is not here yet */
|
||||
if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size())
|
||||
{
|
||||
buffer += used;
|
||||
l -= used;
|
||||
break;
|
||||
}
|
||||
|
||||
if (this == provider->sub)
|
||||
{
|
||||
if (r.multi_bulk.size() == 4)
|
||||
{
|
||||
/* pmessage
|
||||
* pattern subscribed to
|
||||
* __keyevent@0__:set
|
||||
* key
|
||||
*/
|
||||
auto it = this->subinterfaces.find(r.multi_bulk[1]->bulk);
|
||||
if (it != this->subinterfaces.end())
|
||||
it->second->OnResult(r);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (this->interfaces.empty())
|
||||
{
|
||||
Log(LOG_DEBUG) << "redis: no interfaces?";
|
||||
}
|
||||
else
|
||||
{
|
||||
Interface *i = this->interfaces.front();
|
||||
this->interfaces.pop_front();
|
||||
|
||||
if (i)
|
||||
{
|
||||
if (r.type != Reply::NOT_OK)
|
||||
i->OnResult(r);
|
||||
else
|
||||
i->OnError(r.bulk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buffer += used;
|
||||
l -= used;
|
||||
|
||||
r.Clear();
|
||||
}
|
||||
|
||||
if (l)
|
||||
{
|
||||
save.resize(l);
|
||||
std::copy(buffer, buffer + l, save.begin());
|
||||
}
|
||||
else
|
||||
std::vector<char>().swap(save);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
class ModuleRedis final
|
||||
: public Module
|
||||
{
|
||||
std::map<Anope::string, MyRedisService *> services;
|
||||
|
||||
public:
|
||||
ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
|
||||
{
|
||||
}
|
||||
|
||||
~ModuleRedis() override
|
||||
{
|
||||
for (auto &[_, p] : services)
|
||||
{
|
||||
delete p->sock;
|
||||
p->sock = NULL;
|
||||
|
||||
delete p->sub;
|
||||
p->sub = NULL;
|
||||
|
||||
delete p;
|
||||
}
|
||||
}
|
||||
|
||||
void OnReload(Configuration::Conf &conf) override
|
||||
{
|
||||
const auto &block = conf.GetModule(this);
|
||||
std::vector<Anope::string> new_services;
|
||||
|
||||
for (int i = 0; i < block.CountBlock("redis"); ++i)
|
||||
{
|
||||
const auto &redis = block.GetBlock("redis", i);
|
||||
|
||||
const Anope::string &n = redis.Get<const Anope::string>("name"),
|
||||
&ip = redis.Get<const Anope::string>("ip");
|
||||
int port = redis.Get<int>("port");
|
||||
auto db = redis.Get<unsigned>("db");
|
||||
|
||||
delete services[n];
|
||||
services[n] = new MyRedisService(this, n, ip, port, db);
|
||||
new_services.push_back(n);
|
||||
}
|
||||
|
||||
for (auto it = services.begin(); it != services.end();)
|
||||
{
|
||||
Provider *p = it->second;
|
||||
++it;
|
||||
|
||||
if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end())
|
||||
delete it->second;
|
||||
}
|
||||
}
|
||||
|
||||
void OnModuleUnload(User *, Module *m) override
|
||||
{
|
||||
for (auto &[_, p] : services)
|
||||
{
|
||||
if (p->sock)
|
||||
for (unsigned i = p->sock->interfaces.size(); i > 0; --i)
|
||||
{
|
||||
Interface *inter = p->sock->interfaces[i - 1];
|
||||
|
||||
if (inter && inter->owner == m)
|
||||
{
|
||||
inter->OnError(m->name + " being unloaded");
|
||||
p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (p->sub)
|
||||
for (unsigned i = p->sub->interfaces.size(); i > 0; --i)
|
||||
{
|
||||
Interface *inter = p->sub->interfaces[i - 1];
|
||||
|
||||
if (inter && inter->owner == m)
|
||||
{
|
||||
inter->OnError(m->name + " being unloaded");
|
||||
p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1);
|
||||
}
|
||||
}
|
||||
|
||||
for (unsigned i = p->ti.interfaces.size(); i > 0; --i)
|
||||
{
|
||||
Interface *inter = p->ti.interfaces[i - 1];
|
||||
|
||||
if (inter && inter->owner == m)
|
||||
{
|
||||
inter->OnError(m->name + " being unloaded");
|
||||
p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
MODULE_INIT(ModuleRedis)
|
||||
Reference in New Issue
Block a user