diff --git a/data/anope.example.conf b/data/anope.example.conf index 258b0e9e4..141cce700 100644 --- a/data/anope.example.conf +++ b/data/anope.example.conf @@ -1248,26 +1248,6 @@ module import = no } -/* - * db_redis. - * - * This module allows using Redis (https://redis.io/) as a database backend. - * This module requires that redis is loaded and configured properly. - * - * Redis 2.8 supports keyspace notifications which allows Redis to push notifications - * to Anope about outside modifications to the database. This module supports this and - * will internally reflect any changes made to the database immediately once notified. - * See docs/REDIS for more information regarding this. - */ -#module -{ - name = "db_redis" - - /* - * Redis database to use. This must be configured with redis. - */ - engine = "redis/main" -} /* * [RECOMMENDED] Encryption modules. diff --git a/data/modules.example.conf b/data/modules.example.conf index 87511f052..37f3a77bc 100644 --- a/data/modules.example.conf +++ b/data/modules.example.conf @@ -374,30 +374,6 @@ module { name = "help" } } } -/* - * redis - * - * This module allows other modules to use Redis. - */ -#module -{ - name = "redis" - - /* A redis database */ - redis - { - /* The name of this service */ - name = "redis/main" - - /* - * The redis database to use. New connections default to 0. - */ - db = 0 - - ip = "127.0.0.1" - port = 6379 - } -} /* * [EXTRA] regex_pcre2 diff --git a/docs/REDIS b/docs/REDIS deleted file mode 100644 index 603fa034a..000000000 --- a/docs/REDIS +++ /dev/null @@ -1,160 +0,0 @@ -Anope has Redis database support (https://redis.io/). -This document explains the data structure used by Anope, and explains how -keyspace notification works. - -This is not a tutorial on how to use Redis, see https://redis.io/documentation -for that. - -Table of Contents ------------------ -1) Data structure -2) Keyspace notifications -3) Examples of modifying, deleting, and creating objects - -1) Data structure - - There are 4 key namespaces in Anope, they are: - - id - The keys in id are used to atomically create object ids for new - objects. For example, if I were to create a new BotInfo I would first: - - redis 127.0.0.1:6379> INCR id:BotInfo - - To get the object ID of the new object. - - ids - The keys in ids contain a set of all object ids of the given type. - For example: - - redis 127.0.0.1:6379> SMEMBERS ids:BotInfo - - Returns "1", "2", "3", "4", "5", "6", "7", "8" because I have 8 bots that - have IDs 1, 2, 3, 4, 5, 6, 7, and 8, respectively. - - hash - The keys in hash are the actual objects, stored as hashes. For - example, if I had just looked up all BotInfo ids and wanted to iterate - over all of them, I would start by: - - redis 127.0.0.1:6379> HGETALL hash:BotInfo:1 - - Which gets all keys and values from the hash of type BotInfo with id 1. - This may return: - - "nick" -> "BotServ" - "user" -> "services" - "host" -> "services.anope.org" - "created" -> "1368704765" - - value - The keys in value only exist to aid looking up object IDs. They - are sets of object IDs and are used to map key+value pairs to objects. - For example: - - redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nick:Adam - - Returns a set of object ids of NickAlias objects that have the key - 'nick' set to the value 'Adam' in its hash. Clearly this can only - ever contain at most one object, since it is not possible to have - more than one registered nick with the same name, but other keys - will contain more than one, such as: - - redis 127.0.0.1:6379> SMEMBERS value:NickCore:email:adam@anope.org - - Which would return all accounts with the email "adam@anope.org". - - redis 127.0.0.1:6379> SMEMBERS value:ChanAccess:mask:Adam - - Which would return all access entries set on the account "Adam". - - Behavior similar to SQL's AND, can be achieved using the - SINTER command, which does set intersection on one or more sets. - -2) Keyspace notifications - - Redis 2.7 (unstable) and 2.8 (stable) and newer support keyspace notifications - (https://redis.io/topics/notifications). This allows Redis to notify Anope of - any external changes to objects in the database. Once notified, Anope will - immediately update the object. Otherwise, Anope keeps all objects in memory - and will not regularly read from the database once started. - - You can use this to modify objects in Redis and have them immediately reflected - back into Anope. Additionally you can use this feature to run multiple Anope - instances simultaneously from the same database (see also, Redis database - replication). - - To use keyspace notifications you MUST execute - - redis 127.0.0.1:6379> CONFIG SET notify-keyspace-events KA - OK - - or set notify-keyspace-events in redis.conf properly. Anope always executes - CONFIG SET when it first connects. - - If you do not enable keyspace events properly Anope will be UNABLE to see any - object modifications you do. - - The key space ids and value are managed entirely by Anope, you do - not (and should not) modify them. Once you modify the object (hash), Anope will - update them for you to correctly reflect any changes made to the object. - - Finally, always use atomic operations. If you are inserting a new object with - multiple commands, or inserting multiple objects at once, specifically if the - objects depend on each other, you MUST use a transaction. - -3) Examples of modifying, deleting, and creating objects - - These examples will ONLY work if you meet the criteria in section 2. - - If I want to change the email account 'Adam' to 'Adam@anope.org', I would execute the following: - - redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam - - Which returns a value of "1", which is the object id I want to modify. - Now to change the email: - - redis 127.0.0.1:6379> HSET hash:NickCore:1 email Adam@anope.org - - You can now see this in NickServ's INFO command: - -NickServ- Email address: Adam@anope.org - - If I want to drop the account "Adam", I would execute the following: - - redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam - - Which returns a value of "1". I would then check: - - redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nc:Adam - - To see what nicknames depend on this account to exist, as I will - have to remove those too. This returns the values "2", and "3". - - Finally, I can drop the nick using a transaction via: - - redis 127.0.0.1:6379> MULTI - OK - redis 127.0.0.1:6379> DEL hash:NickAlias:2 - QUEUED - redis 127.0.0.1:6379> DEL hash:NickAlias:3 - QUEUED - redis 127.0.0.1:6379> DEL hash:NickCore:1 - QUEUED - redis 127.0.0.1:6379> EXEC - - Or alternatively simply: - - redis 127.0.0.1:6379> DEL hash:NickAlias:2 hash:NickAlias:3 hash:NickCore:1 - - If I wanted to create a BotServ bot, I would execute the following: - - redis 127.0.0.1:6379> INCR id:BotInfo - - Which returns a new object ID for me, in this example it will be "8". - Now I can create the object: - - HMSET hash:BotInfo:8 nick redis user redis host services.anope.org realname "Anope IRC Services" - - Note if you are using HSET instead of HMSET you will need to use a transaction, as shown in the above example. - If you are watching your services logs you will immediately see: - - USERS: redis!redis@services.anope.org (Anope IRC Services) connected to the network (services.anope.org) - - And the bot redis will be in BotServ's bot list. - Notice how ids:BotInfo and the value keys are updated automatically. diff --git a/include/modules/redis.h b/include/modules/redis.h deleted file mode 100644 index e1b25478f..000000000 --- a/include/modules/redis.h +++ /dev/null @@ -1,83 +0,0 @@ -// Anope IRC Services -// -// Copyright (C) 2003-2026 Anope Contributors -// -// Anope is free software. You can use, modify, and/or distribute it under the -// terms of version 2 of the GNU General Public License. See docs/LICENSE.txt -// for the complete terms of this license and docs/AUTHORS.txt for a list of -// contributors. -// -// Based on the original code of Epona by Lara -// Based on the original code of Services by Andy Church -// -// SPDX-License-Identifier: GPL-2.0-only - -#pragma once - -namespace Redis -{ - struct Reply final - { - enum Type - { - NOT_PARSED, - NOT_OK, - OK, - INT, - BULK, - MULTI_BULK - } - type; - - Reply() { Clear(); } - ~Reply() { Clear(); } - - void Clear() - { - type = NOT_PARSED; - i = 0; - bulk.clear(); - multi_bulk_size = 0; - for (const auto *reply : multi_bulk) - delete reply; - multi_bulk.clear(); - } - - int64_t i; - Anope::string bulk; - int multi_bulk_size; - std::deque multi_bulk; - }; - - class Interface - { - public: - Module *owner; - - Interface(Module *m) : owner(m) { } - virtual ~Interface() = default; - - virtual void OnResult(const Reply &r) = 0; - virtual void OnError(const Anope::string &error) { Log(owner) << error; } - }; - - class Provider - : public Service - { - public: - Provider(Module *c, const Anope::string &n) : Service(c, "Redis::Provider", n) { } - - virtual bool IsSocketDead() = 0; - - virtual void SendCommand(Interface *i, const std::vector &cmds) = 0; - virtual void SendCommand(Interface *i, const Anope::string &str) = 0; - - virtual bool BlockAndProcess() = 0; - - virtual void Subscribe(Interface *i, const Anope::string &pattern) = 0; - virtual void Unsubscribe(const Anope::string &pattern) = 0; - - virtual void StartTransaction() = 0; - virtual void CommitTransaction() = 0; - }; -} diff --git a/include/serialize.h b/include/serialize.h index c603d552a..ee79ab8e0 100644 --- a/include/serialize.h +++ b/include/serialize.h @@ -88,9 +88,6 @@ public: /* Unique ID (per type, not globally) for this object */ Id object_id = 0; - /* Only used by redis, to ignore updates */ - unsigned short redis_ignore = 0; - /** Marks the object as potentially being updated "soon". */ void QueueUpdate(); diff --git a/modules/database/db_redis.cpp b/modules/database/db_redis.cpp deleted file mode 100644 index 3ba6229b7..000000000 --- a/modules/database/db_redis.cpp +++ /dev/null @@ -1,648 +0,0 @@ -// Anope IRC Services -// -// Copyright (C) 2003-2026 Anope Contributors -// -// Anope is free software. You can use, modify, and/or distribute it under the -// terms of version 2 of the GNU General Public License. See docs/LICENSE.txt -// for the complete terms of this license and docs/AUTHORS.txt for a list of -// contributors. -// -// Based on the original code of Epona by Lara -// Based on the original code of Services by Andy Church -// -// SPDX-License-Identifier: GPL-2.0-only - -#include "module.h" -#include "modules/redis.h" - -using namespace Redis; - -class DatabaseRedis; -static DatabaseRedis *me; - -class Data final - : public Serialize::Data -{ -public: - Anope::unordered_map data; - - bool LoadInternal(const Anope::string &key, Anope::string &value) override - { - auto it = this->data.find(key); - if (it == this->data.end()) - return false; - - value = it->second; - return true; - } - - bool StoreInternal(const Anope::string &key, const Anope::string &value) override - { - this->data[key] = value; - return true; - } - - size_t Hash() const override - { - size_t hash = 0; - for (const auto &[_, value] : this->data) - if (!value.empty()) - hash ^= Anope::hash_cs()(value); - return hash; - } -}; - -class TypeLoader final - : public Interface -{ - Anope::string type; -public: - TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { } - - void OnResult(const Reply &r) override; -}; - -class ObjectLoader final - : public Interface -{ - Anope::string type; - int64_t id; - -public: - ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } - - void OnResult(const Reply &r) override; -}; - -class IDInterface final - : public Interface -{ - Reference o; -public: - IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { } - - void OnResult(const Reply &r) override; -}; - -class Deleter final - : public Interface -{ - Anope::string type; - int64_t id; -public: - Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } - - void OnResult(const Reply &r) override; -}; - -class Updater final - : public Interface -{ - Anope::string type; - int64_t id; -public: - Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } - - void OnResult(const Reply &r) override; -}; - -class ModifiedObject final - : public Interface -{ - Anope::string type; - int64_t id; -public: - ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } - - void OnResult(const Reply &r) override; -}; - -class SubscriptionListener final - : public Interface -{ -public: - SubscriptionListener(Module *creator) : Interface(creator) { } - - void OnResult(const Reply &r) override; -}; - -class DatabaseRedis final - : public Module - , public Pipe -{ - SubscriptionListener sl; - std::set updated_items; - -public: - ServiceReference redis; - - DatabaseRedis(const Anope::string &modname, const Anope::string &creator) - : Module(modname, creator, DATABASE | VENDOR) - , sl(this) - , redis("Redis::Provider") - { - me = this; - - } - - /* Insert or update an object */ - void InsertObject(Serializable *obj) - { - Serialize::Type *t = obj->GetSerializableType(); - - /* If there is no id yet for this object, get one */ - if (!obj->object_id) - redis->SendCommand(new IDInterface(this, obj), "INCR id:" + t->GetName()); - else - { - Data data; - t->Serialize(obj, data); - - if (obj->IsCached(data)) - return; - - obj->UpdateCache(data); - - std::vector args; - args.emplace_back("HGETALL"); - args.push_back("hash:" + t->GetName() + ":" + Anope::ToString(obj->object_id)); - - /* Get object attrs to clear before updating */ - redis->SendCommand(new Updater(this, t->GetName(), obj->object_id), args); - } - } - - void OnNotify() override - { - for (auto *obj : this->updated_items) - { - this->InsertObject(obj); - } - - this->updated_items.clear(); - } - - void OnReload(Configuration::Conf &conf) override - { - const auto &block = conf.GetModule(this); - this->redis.SetServiceName(block.Get("engine", "redis/main")); - } - - EventReturn OnLoadDatabase() override - { - if (!redis) - { - Log(this) << "Unable to load database - unable to find redis provider"; - return EVENT_CONTINUE; - } - - for (const auto &type_order : Serialize::Type::GetTypeOrder()) - { - Serialize::Type *sb = Serialize::Type::Find(type_order); - this->OnSerializeTypeCreate(sb); - } - - while (!redis->IsSocketDead() && redis->BlockAndProcess()); - - if (redis->IsSocketDead()) - { - Log(this) << "I/O error while loading redis database - is it online?"; - return EVENT_CONTINUE; - } - - redis->Subscribe(&this->sl, "__keyspace@*__:hash:*"); - - return EVENT_STOP; - } - - void OnSerializeTypeCreate(Serialize::Type *sb) override - { - if (!redis) - return; - - std::vector args; - args.emplace_back("SMEMBERS"); - args.push_back("ids:" + sb->GetName()); - - redis->SendCommand(new TypeLoader(this, sb->GetName()), args); - } - - void OnSerializableConstruct(Serializable *obj) override - { - this->updated_items.insert(obj); - this->Notify(); - } - - void OnSerializableDestruct(Serializable *obj) override - { - Serialize::Type *t = obj->GetSerializableType(); - - if (t == NULL) - { - /* This is probably the module providing the type unloading. - * - * The types get registered after the extensible container is - * registered so that unserialization on module load can insert - * into the extensible container. So, the type destructs prior to - * the extensible container, which then triggers this - */ - return; - } - - std::vector args; - args.emplace_back("HGETALL"); - args.push_back("hash:" + t->GetName() + ":" + Anope::ToString(obj->object_id)); - - /* Get all of the attributes for this object */ - redis->SendCommand(new Deleter(this, t->GetName(), obj->object_id), args); - - this->updated_items.erase(obj); - t->objects.erase(obj->object_id); - this->Notify(); - } - - void OnSerializableUpdate(Serializable *obj) override - { - this->updated_items.insert(obj); - this->Notify(); - } -}; - -void TypeLoader::OnResult(const Reply &r) -{ - if (r.type != Reply::MULTI_BULK || !me->redis) - { - delete this; - return; - } - - for (auto *reply : r.multi_bulk) - { - if (reply->type != Reply::BULK) - continue; - - auto i = Anope::TryConvert(reply->bulk); - if (!i) - continue; - - auto id = i.value(); - std::vector args; - args.emplace_back("HGETALL"); - args.push_back("hash:" + this->type + ":" + Anope::ToString(id)); - - me->redis->SendCommand(new ObjectLoader(me, this->type, id), args); - } - - delete this; -} - -void ObjectLoader::OnResult(const Reply &r) -{ - Serialize::Type *st = Serialize::Type::Find(this->type); - - if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st) - { - delete this; - return; - } - - Data data; - - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) - { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; - - data.StoreInternal(key->bulk, value->bulk); - } - - Serializable *&obj = st->objects[this->id]; - obj = st->Unserialize(obj, data); - if (obj) - { - obj->object_id = this->id; - obj->UpdateCache(data); - } - - delete this; -} - -void IDInterface::OnResult(const Reply &r) -{ - if (!o || r.type != Reply::INT || !r.i) - { - delete this; - return; - } - - Serializable *&obj = o->GetSerializableType()->objects[r.i]; - if (obj) - /* This shouldn't be possible */ - obj->object_id = 0; - - o->object_id = r.i; - obj = o; - - /* Now that we have the id, insert this object for real */ - anope_dynamic_static_cast(this->owner)->InsertObject(o); - - delete this; -} - -void Deleter::OnResult(const Reply &r) -{ - if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty()) - { - delete this; - return; - } - - /* Transaction start */ - me->redis->StartTransaction(); - - std::vector args; - args.emplace_back("DEL"); - args.push_back("hash:" + this->type + ":" + Anope::ToString(this->id)); - - /* Delete hash object */ - me->redis->SendCommand(NULL, args); - - args.clear(); - args.emplace_back("SREM"); - args.push_back("ids:" + this->type); - args.push_back(Anope::ToString(this->id)); - - /* Delete id from ids set */ - me->redis->SendCommand(NULL, args); - - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) - { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; - - args.clear(); - args.emplace_back("SREM"); - args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk); - args.push_back(Anope::ToString(this->id)); - - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); - } - - /* Transaction end */ - me->redis->CommitTransaction(); - - delete this; -} - -void Updater::OnResult(const Reply &r) -{ - Serialize::Type *st = Serialize::Type::Find(this->type); - - if (!st) - { - delete this; - return; - } - - Serializable *obj = st->objects[this->id]; - if (!obj) - { - delete this; - return; - } - - Data data; - st->Serialize(obj, data); - - /* Transaction start */ - me->redis->StartTransaction(); - - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) - { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; - - std::vector args; - args.emplace_back("SREM"); - args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk); - args.push_back(Anope::ToString(this->id)); - - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); - } - - /* Add object id to id set for this type */ - std::vector args; - args.emplace_back("SADD"); - args.push_back("ids:" + this->type); - args.push_back(Anope::ToString(obj->object_id)); - me->redis->SendCommand(NULL, args); - - args.clear(); - args.emplace_back("HMSET"); - args.push_back("hash:" + this->type + ":" + Anope::ToString(obj->object_id)); - - for (const auto &[key, value] : data.data) - { - args.push_back(key); - args.emplace_back(value); - - std::vector args2; - - args2.emplace_back("SADD"); - args2.push_back("value:" + this->type + ":" + key + ":" + value); - args2.push_back(Anope::ToString(obj->object_id)); - - /* Add to value -> object id set */ - me->redis->SendCommand(NULL, args2); - } - - ++obj->redis_ignore; - - /* Add object */ - me->redis->SendCommand(NULL, args); - - /* Transaction end */ - me->redis->CommitTransaction(); - - delete this; -} - -void SubscriptionListener::OnResult(const Reply &r) -{ - /* - * [May 15 13:59:35.645839 2013] Debug: pmessage - * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:* - * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id - * [May 15 13:59:35.645893 2013] Debug: hset - */ - if (r.multi_bulk.size() != 4) - return; - - size_t sz = r.multi_bulk[2]->bulk.find(':'); - if (sz == Anope::string::npos) - return; - - const Anope::string &key = r.multi_bulk[2]->bulk.substr(sz + 1), - &op = r.multi_bulk[3]->bulk; - - sz = key.rfind(':'); - if (sz == Anope::string::npos) - return; - - const Anope::string &id = key.substr(sz + 1); - - size_t sz2 = key.rfind(':', sz - 1); - if (sz2 == Anope::string::npos) - return; - const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1); - - Serialize::Type *s_type = Serialize::Type::Find(type); - - if (s_type == NULL) - return; - - auto oid = Anope::TryConvert(id); - if (!oid.has_value()) - return; - - auto obj_id = oid.value(); - if (op == "hset" || op == "hdel") - { - Serializable *s = s_type->objects[obj_id]; - - if (s && s->redis_ignore) - { - --s->redis_ignore; - Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it"; - } - else - { - Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type; - - std::vector args; - args.emplace_back("HGETALL"); - args.push_back("hash:" + type + ":" + id); - - me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args); - } - } - else if (op == "del") - { - Serializable *&s = s_type->objects[obj_id]; - if (s == NULL) - return; - - Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type; - - Data data; - s_type->Serialize(s, data); - - /* Transaction start */ - me->redis->StartTransaction(); - - for (const auto &[k, value] : data.data) - { - std::vector args; - args.emplace_back("SREM"); - args.push_back("value:" + type + ":" + k + ":" + value); - args.push_back(id); - - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); - } - - std::vector args; - args.emplace_back("SREM"); - args.push_back("ids:" + type); - args.push_back(Anope::ToString(s->object_id)); - - /* Delete object from id set */ - me->redis->SendCommand(NULL, args); - - /* Transaction end */ - me->redis->CommitTransaction(); - - delete s; - s = NULL; - } -} - -void ModifiedObject::OnResult(const Reply &r) -{ - Serialize::Type *st = Serialize::Type::Find(this->type); - - if (!st) - { - delete this; - return; - } - - Serializable *&obj = st->objects[this->id]; - - /* Transaction start */ - me->redis->StartTransaction(); - - /* Erase old object values */ - if (obj) - { - Data data; - st->Serialize(obj, data); - - for (auto &[key, value] : data.data) - { - std::vector args; - args.emplace_back("SREM"); - args.push_back("value:" + st->GetName() + ":" + key + ":" + value); - args.push_back(Anope::ToString(this->id)); - - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); - } - } - - Data data; - - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) - { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; - - data.StoreInternal(key->bulk, value->bulk); - } - - obj = st->Unserialize(obj, data); - if (obj) - { - obj->object_id = this->id; - obj->UpdateCache(data); - - /* Insert new object values */ - for (const auto &[key, value] : data.data) - { - std::vector args; - args.emplace_back("SADD"); - args.push_back("value:" + st->GetName() + ":" + key + ":" + value); - args.push_back(Anope::ToString(obj->object_id)); - - /* Add to value -> object id set */ - me->redis->SendCommand(NULL, args); - } - - std::vector args; - args.emplace_back("SADD"); - args.push_back("ids:" + st->GetName()); - args.push_back(Anope::ToString(obj->object_id)); - - /* Add to type -> id set */ - me->redis->SendCommand(NULL, args); - } - - /* Transaction end */ - me->redis->CommitTransaction(); - - delete this; -} - -MODULE_INIT(DatabaseRedis) diff --git a/modules/redis.cpp b/modules/redis.cpp deleted file mode 100644 index fe541dc47..000000000 --- a/modules/redis.cpp +++ /dev/null @@ -1,615 +0,0 @@ -// Anope IRC Services -// -// Copyright (C) 2003-2026 Anope Contributors -// -// Anope is free software. You can use, modify, and/or distribute it under the -// terms of version 2 of the GNU General Public License. See docs/LICENSE.txt -// for the complete terms of this license and docs/AUTHORS.txt for a list of -// contributors. -// -// Based on the original code of Epona by Lara -// Based on the original code of Services by Andy Church -// -// SPDX-License-Identifier: GPL-2.0-only - -#include "module.h" -#include "modules/redis.h" - -using namespace Redis; - -class MyRedisService; - -class RedisSocket final - : public BinarySocket - , public ConnectionSocket -{ - size_t ParseReply(Reply &r, const char *buf, size_t l); -public: - MyRedisService *provider; - std::deque interfaces; - std::map subinterfaces; - - RedisSocket(MyRedisService *pro, bool v6) : Socket(-1, v6 ? AF_INET6 : AF_INET), provider(pro) { } - - ~RedisSocket() override; - - void OnConnect() override; - void OnError(const Anope::string &error) override; - - bool Read(const char *buffer, size_t l) override; -}; - -class Transaction final - : public Interface -{ -public: - std::deque interfaces; - - Transaction(Module *creator) : Interface(creator) { } - - ~Transaction() override - { - for (auto *iface : interfaces) - { - if (!iface) - continue; - - iface->OnError("Interface going away"); - } - } - - void OnResult(const Reply &r) override - { - /* This is a multi bulk reply of the results of the queued commands - * in this transaction - */ - - Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results"; - - for (auto *result : r.multi_bulk) - { - if (interfaces.empty()) - break; - - Interface *inter = interfaces.front(); - interfaces.pop_front(); - - if (inter) - inter->OnResult(*result); - } - } -}; - -class MyRedisService final - : public Provider -{ -public: - Anope::string host; - int port; - unsigned db; - - RedisSocket *sock = nullptr, *sub = nullptr; - - Transaction ti; - bool in_transaction = false; - - MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), ti(c) - { - sock = new RedisSocket(this, host.find(':') != Anope::string::npos); - sock->Connect(host, port); - - sub = new RedisSocket(this, host.find(':') != Anope::string::npos); - sub->Connect(host, port); - } - - ~MyRedisService() override - { - if (sock) - { - sock->flags[SF_DEAD] = true; - sock->provider = NULL; - } - - if (sub) - { - sub->flags[SF_DEAD] = true; - sub->provider = NULL; - } - } - -private: - static inline void Pack(std::vector &buffer, const char *buf, size_t sz = 0) - { - if (!sz) - sz = strlen(buf); - - size_t old_size = buffer.size(); - buffer.resize(old_size + sz); - std::copy(buf, buf + sz, buffer.begin() + old_size); - } - - void Send(RedisSocket *s, Interface *i, const std::vector > &args) - { - std::vector buffer; - - Pack(buffer, "*"); - Pack(buffer, Anope::ToString(args.size()).c_str()); - Pack(buffer, "\r\n"); - - for (const auto &[key, value] : args) - { - Pack(buffer, "$"); - Pack(buffer, Anope::ToString(value).c_str()); - Pack(buffer, "\r\n"); - - Pack(buffer, key, value); - Pack(buffer, "\r\n"); - } - - if (buffer.empty()) - return; - - s->Write(&buffer[0], buffer.size()); - if (in_transaction) - { - ti.interfaces.push_back(i); - s->interfaces.push_back(NULL); // For the +Queued response - } - else - s->interfaces.push_back(i); - } - -public: - bool IsSocketDead() override - { - return this->sock && this->sock->flags[SF_DEAD]; - } - - void SendCommand(RedisSocket *s, Interface *i, const std::vector &cmds) - { - std::vector > args; - for (const auto &cmd : cmds) - args.emplace_back(cmd.c_str(), cmd.length()); - this->Send(s, i, args); - } - - void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str) - { - std::vector args; - spacesepstream(str).GetTokens(args); - this->SendCommand(s, i, args); - } - - void Send(Interface *i, const std::vector > &args) - { - if (!sock) - { - sock = new RedisSocket(this, host.find(':') != Anope::string::npos); - sock->Connect(host, port); - } - - this->Send(sock, i, args); - } - - void SendCommand(Interface *i, const std::vector &cmds) override - { - std::vector > args; - for (const auto &cmd : cmds) - args.emplace_back(cmd.c_str(), cmd.length()); - this->Send(i, args); - } - - void SendCommand(Interface *i, const Anope::string &str) override - { - std::vector args; - spacesepstream(str).GetTokens(args); - this->SendCommand(i, args); - } - -public: - bool BlockAndProcess() override - { - if (!this->sock->ProcessWrite()) - this->sock->flags[SF_DEAD] = true; - this->sock->SetBlocking(true); - if (!this->sock->ProcessRead()) - this->sock->flags[SF_DEAD] = true; - this->sock->SetBlocking(false); - return !this->sock->interfaces.empty(); - } - - void Subscribe(Interface *i, const Anope::string &pattern) override - { - if (sub == NULL) - { - sub = new RedisSocket(this, host.find(':') != Anope::string::npos); - sub->Connect(host, port); - } - - std::vector args; - args.emplace_back("PSUBSCRIBE"); - args.push_back(pattern); - this->SendCommand(sub, NULL, args); - - sub->subinterfaces[pattern] = i; - } - - void Unsubscribe(const Anope::string &pattern) override - { - if (sub) - sub->subinterfaces.erase(pattern); - } - - void StartTransaction() override - { - if (in_transaction) - throw ModuleException("Tried to start a transaction while one was already in progress"); - - this->SendCommand(NULL, "MULTI"); - in_transaction = true; - } - - void CommitTransaction() override - { - /* The result of the transaction comes back to the reply of EXEC as a multi bulk. - * The reply to the individual commands that make up the transaction when executed - * is a simple +QUEUED - */ - in_transaction = false; - this->SendCommand(&this->ti, "EXEC"); - } -}; - -RedisSocket::~RedisSocket() -{ - if (provider) - { - if (provider->sock == this) - provider->sock = NULL; - else if (provider->sub == this) - provider->sub = NULL; - } - - for (auto *iface : interfaces) - { - if (!iface) - continue; - - iface->OnError("Interface going away"); - } -} - -void RedisSocket::OnConnect() -{ - Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : ""); - - this->provider->SendCommand(NULL, "CLIENT SETNAME Anope"); - this->provider->SendCommand(NULL, "SELECT " + Anope::ToString(provider->db)); - - if (this != this->provider->sub) - { - this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA"); - } -} - -void RedisSocket::OnError(const Anope::string &error) -{ - Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error; -} - -size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l) -{ - size_t used = 0; - - if (!l) - return used; - - if (r.type == Reply::MULTI_BULK) - goto multi_bulk_cont; - - switch (*buffer) - { - case '+': - { - Anope::string reason(buffer, 1, l - 1); - size_t nl = reason.find("\r\n"); - Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl); - if (nl != Anope::string::npos) - { - r.type = Reply::OK; - used = 1 + nl + 2; - } - break; - } - case '-': - { - Anope::string reason(buffer, 1, l - 1); - size_t nl = reason.find("\r\n"); - Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl); - if (nl != Anope::string::npos) - { - r.type = Reply::NOT_OK; - used = 1 + nl + 2; - } - break; - } - case ':': - { - Anope::string ibuf(buffer, 1, l - 1); - size_t nl = ibuf.find("\r\n"); - if (nl != Anope::string::npos) - { - if (auto i = Anope::TryConvert(ibuf.substr(0, nl))) - r.i = i.value(); - - r.type = Reply::INT; - used = 1 + nl + 2; - } - break; - } - case '$': - { - Anope::string reply(buffer + 1, l - 1); - /* This assumes one bulk can always fit in our recv buffer */ - size_t nl = reply.find("\r\n"); - if (nl != Anope::string::npos) - { - if (auto l = Anope::TryConvert(reply.substr(0, nl))) - { - int len = l.value(); - if (len >= 0) - { - if (1 + nl + 2 + len + 2 <= l) - { - used = 1 + nl + 2 + len + 2; - r.bulk = reply.substr(nl + 2, len); - r.type = Reply::BULK; - } - } - else - { - used = 1 + nl + 2 + 2; - r.type = Reply::BULK; - } - } - } - break; - } - multi_bulk_cont: - case '*': - { - if (r.type != Reply::MULTI_BULK) - { - Anope::string reply(buffer + 1, l - 1); - size_t nl = reply.find("\r\n"); - if (nl != Anope::string::npos) - { - r.type = Reply::MULTI_BULK; - if (auto size = Anope::TryConvert(reply.substr(0, nl))) - r.multi_bulk_size = size.value(); - used = 1 + nl + 2; - } - else - break; - } - else if (r.multi_bulk_size >= 0 && r.multi_bulk.size() == static_cast(r.multi_bulk_size)) - { - /* This multi bulk is already complete, so check the sub bulks */ - for (auto &bulk : r.multi_bulk) - if (bulk->type == Reply::MULTI_BULK) - ParseReply(*bulk, buffer + used, l - used); - break; - } - - for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i) - { - auto *reply = new Reply(); - size_t u = ParseReply(*reply, buffer + used, l - used); - if (!u) - { - Log(LOG_DEBUG) << "redis: ran out of data to parse"; - delete reply; - break; - } - r.multi_bulk.push_back(reply); - used += u; - } - break; - } - default: - Log(LOG_DEBUG) << "redis: unknown reply " << *buffer; - } - - return used; -} - -bool RedisSocket::Read(const char *buffer, size_t l) -{ - static std::vector save; - std::vector copy; - - if (!save.empty()) - { - std::copy(buffer, buffer + l, std::back_inserter(save)); - - copy = save; - - buffer = ©[0]; - l = copy.size(); - } - - while (l) - { - static Reply r; - - size_t used = this->ParseReply(r, buffer, l); - if (!used) - { - Log(LOG_DEBUG) << "redis: used == 0 ?"; - r.Clear(); - break; - } - else if (used > l) - { - Log(LOG_DEBUG) << "redis: used > l ?"; - r.Clear(); - break; - } - - /* Full result is not here yet */ - if (r.type == Reply::MULTI_BULK && static_cast(r.multi_bulk_size) != r.multi_bulk.size()) - { - buffer += used; - l -= used; - break; - } - - if (this == provider->sub) - { - if (r.multi_bulk.size() == 4) - { - /* pmessage - * pattern subscribed to - * __keyevent@0__:set - * key - */ - auto it = this->subinterfaces.find(r.multi_bulk[1]->bulk); - if (it != this->subinterfaces.end()) - it->second->OnResult(r); - } - } - else - { - if (this->interfaces.empty()) - { - Log(LOG_DEBUG) << "redis: no interfaces?"; - } - else - { - Interface *i = this->interfaces.front(); - this->interfaces.pop_front(); - - if (i) - { - if (r.type != Reply::NOT_OK) - i->OnResult(r); - else - i->OnError(r.bulk); - } - } - } - - buffer += used; - l -= used; - - r.Clear(); - } - - if (l) - { - save.resize(l); - std::copy(buffer, buffer + l, save.begin()); - } - else - std::vector().swap(save); - - return true; -} - - -class ModuleRedis final - : public Module -{ - std::map services; - -public: - ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR) - { - } - - ~ModuleRedis() override - { - for (auto &[_, p] : services) - { - delete p->sock; - p->sock = NULL; - - delete p->sub; - p->sub = NULL; - - delete p; - } - } - - void OnReload(Configuration::Conf &conf) override - { - const auto &block = conf.GetModule(this); - std::vector new_services; - - for (int i = 0; i < block.CountBlock("redis"); ++i) - { - const auto &redis = block.GetBlock("redis", i); - - const Anope::string &n = redis.Get("name"), - &ip = redis.Get("ip"); - int port = redis.Get("port"); - auto db = redis.Get("db"); - - delete services[n]; - services[n] = new MyRedisService(this, n, ip, port, db); - new_services.push_back(n); - } - - for (auto it = services.begin(); it != services.end();) - { - Provider *p = it->second; - ++it; - - if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end()) - delete it->second; - } - } - - void OnModuleUnload(User *, Module *m) override - { - for (auto &[_, p] : services) - { - if (p->sock) - for (unsigned i = p->sock->interfaces.size(); i > 0; --i) - { - Interface *inter = p->sock->interfaces[i - 1]; - - if (inter && inter->owner == m) - { - inter->OnError(m->name + " being unloaded"); - p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1); - } - } - - if (p->sub) - for (unsigned i = p->sub->interfaces.size(); i > 0; --i) - { - Interface *inter = p->sub->interfaces[i - 1]; - - if (inter && inter->owner == m) - { - inter->OnError(m->name + " being unloaded"); - p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1); - } - } - - for (unsigned i = p->ti.interfaces.size(); i > 0; --i) - { - Interface *inter = p->ti.interfaces[i - 1]; - - if (inter && inter->owner == m) - { - inter->OnError(m->name + " being unloaded"); - p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1); - } - } - } - } -}; - -MODULE_INIT(ModuleRedis)