From 49863109453faa907ce2c8b1158e60a6777d28ab Mon Sep 17 00:00:00 2001 From: Yanqi Lv Date: Wed, 20 Nov 2024 04:53:19 +0800 Subject: [PATCH] Import-mode: Avoid expiration and eviction during data syncing (#1185) New config: `import-mode (yes|no)` New command: `CLIENT IMPORT-SOURCE (ON|OFF)` The config, when set to `yes`, disables eviction and deletion of expired keys, except for commands coming from a client which has marked itself as an import-source, the data source when importing data from another node, using the CLIENT IMPORT-SOURCE command. When we sync data from the source Valkey to the destination Valkey using some sync tools like [redis-shake](https://github.com/tair-opensource/RedisShake), the destination Valkey can perform expiration and eviction, which may cause data corruption. This problem has been discussed in https://github.com/redis/redis/discussions/9760#discussioncomment-1681041 and Redis already have a solution. But in Valkey we haven't fixed it by now. E.g. we call `set key 1 ex 1` on the source server and transfer this command to the destination server. Then we call `incr key` on the source server before the key expired, we will have a key on the source server with a value of 2. But when the command arrived at the destination server, the key may be expired and has deleted. So we will have a key on the destination server with a value of 1, which is inconsistent with the source server. In standalone mode, we can use writable replica to simplify the sync process. However, in cluster mode, we still need a sync tool to help us transfer the source data to the destination. The sync tool usually work as a normal client and the destination works as a primary which keep expiration and eviction. In this PR, we add a new mode named 'import-mode'. In this mode, server stop expiration and eviction just like a replica. Notice that this mode exists only in sync state to avoid data inconsistency caused by expiration and eviction. Import mode only takes effect on the primary. Sync tools can mark their clients as an import source by `CLIENT IMPORT-SOURCE`, which work like a client from primary and can visit expired keys in `lookupkey`. **Notice: during the migration, other clients, apart from the import source, should not access the data imported by import source.** --------- Signed-off-by: lvyanqi.lyq Signed-off-by: Yanqi Lv Co-authored-by: Madelyn Olson --- src/commands.def | 29 ++++++++++ src/commands/client-import-source.json | 40 ++++++++++++++ src/config.c | 1 + src/db.c | 21 +++++++- src/evict.c | 4 +- src/expire.c | 7 ++- src/networking.c | 20 +++++++ src/server.c | 9 ++-- src/server.h | 5 +- tests/unit/expire.tcl | 74 ++++++++++++++++++++++++++ tests/unit/maxmemory.tcl | 18 +++++++ valkey.conf | 7 +++ 12 files changed, 225 insertions(+), 10 deletions(-) create mode 100644 src/commands/client-import-source.json diff --git a/src/commands.def b/src/commands.def index 791b30d54..ecc77126a 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1230,6 +1230,34 @@ struct COMMAND_ARG CLIENT_CAPA_Args[] = { #define CLIENT_ID_Keyspecs NULL #endif +/********** CLIENT IMPORT_SOURCE ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLIENT IMPORT_SOURCE history */ +#define CLIENT_IMPORT_SOURCE_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLIENT IMPORT_SOURCE tips */ +#define CLIENT_IMPORT_SOURCE_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLIENT IMPORT_SOURCE key specs */ +#define CLIENT_IMPORT_SOURCE_Keyspecs NULL +#endif + +/* CLIENT IMPORT_SOURCE enabled argument table */ +struct COMMAND_ARG CLIENT_IMPORT_SOURCE_enabled_Subargs[] = { +{MAKE_ARG("on",ARG_TYPE_PURE_TOKEN,-1,"ON",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("off",ARG_TYPE_PURE_TOKEN,-1,"OFF",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + +/* CLIENT IMPORT_SOURCE argument table */ +struct COMMAND_ARG CLIENT_IMPORT_SOURCE_Args[] = { +{MAKE_ARG("enabled",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_IMPORT_SOURCE_enabled_Subargs}, +}; + /********** CLIENT INFO ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1630,6 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, +{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, {MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json new file mode 100644 index 000000000..113c07d70 --- /dev/null +++ b/src/commands/client-import-source.json @@ -0,0 +1,40 @@ +{ + "IMPORT-SOURCE": { + "summary": "Mark this client as an import source when server is in import mode.", + "complexity": "O(1)", + "group": "connection", + "since": "8.1.0", + "arity": 3, + "container": "CLIENT", + "function": "clientCommand", + "command_flags": [ + "NOSCRIPT", + "LOADING", + "STALE" + ], + "acl_categories": [ + "CONNECTION" + ], + "reply_schema": { + "const": "OK" + }, + "arguments": [ + { + "name": "enabled", + "type": "oneof", + "arguments": [ + { + "name": "on", + "type": "pure-token", + "token": "ON" + }, + { + "name": "off", + "type": "pure-token", + "token": "OFF" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/src/config.c b/src/config.c index 15fec1527..c4009adef 100644 --- a/src/config.c +++ b/src/config.c @@ -3139,6 +3139,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), + createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/db.c b/src/db.c index b59c7727b..10d4a0409 100644 --- a/src/db.c +++ b/src/db.c @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) { key = dictGetKey(de); keyobj = createStringObject(key, sdslen(key)); if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) { - if (allvolatile && server.primary_host && --maxtries == 0) { + if (allvolatile && (server.primary_host || server.import_mode) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the repilca, so the function cannot stop because @@ -1821,6 +1821,25 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di if (server.primary_host != NULL) { if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; + } else if (server.import_mode) { + /* If we are running in the import mode on a primary, instead of + * evicting the expired key from the database, we return ASAP: + * the key expiration is controlled by the import source that will + * send us synthesized DEL operations for expired keys. The + * exception is when write operations are performed on this server + * because it's a primary. + * + * Notice: other clients, apart from the import source, should not access + * the data imported by import source. + * + * Still we try to return the right information to the caller, + * that is, KEY_VALID if we think the key should still be valid, + * KEY_EXPIRED if we think the key is expired but don't want to delete it at this time. + * + * When receiving commands from the import source, keys are never considered + * expired. */ + if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID; + if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } /* In some cases we're explicitly instructed to return an indication of a diff --git a/src/evict.c b/src/evict.c index 5e4b6220e..5208328b3 100644 --- a/src/evict.c +++ b/src/evict.c @@ -546,8 +546,8 @@ int performEvictions(void) { goto update_metrics; } - if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) { - result = EVICT_FAIL; /* We need to free memory, but policy forbids. */ + if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || (iAmPrimary() && server.import_mode)) { + result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */ goto update_metrics; } diff --git a/src/expire.c b/src/expire.c index 928bb58d8..c22df1ef8 100644 --- a/src/expire.c +++ b/src/expire.c @@ -520,8 +520,11 @@ int checkAlreadyExpired(long long when) { * of a replica instance. * * Instead we add the already expired key to the database with expire time - * (possibly in the past) and wait for an explicit DEL from the primary. */ - return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host); + * (possibly in the past) and wait for an explicit DEL from the primary. + * + * If the server is a primary and in the import mode, we also add the already + * expired key and wait for an explicit DEL from the import source. */ + return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode); } #define EXPIRE_NX (1 << 0) diff --git a/src/networking.c b/src/networking.c index 4791055b5..9558780f3 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3585,6 +3585,10 @@ void clientCommand(client *c) { " Protect current client connection from eviction.", "NO-TOUCH (ON|OFF)", " Will not touch LRU/LFU stats when this mode is on.", + "IMPORT-SOURCE (ON|OFF)", + " Mark this connection as an import source if server.import_mode is true.", + " Sync tools can set their connections into 'import-source' state to visit", + " expired keys.", NULL}; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) { @@ -4058,6 +4062,22 @@ void clientCommand(client *c) { } } addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr, "import-source")) { + /* CLIENT IMPORT-SOURCE ON|OFF */ + if (!server.import_mode) { + addReplyError(c, "Server is not in import mode"); + return; + } + if (!strcasecmp(c->argv[2]->ptr, "on")) { + c->flag.import_source = 1; + addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr, "off")) { + c->flag.import_source = 0; + addReply(c, shared.ok); + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } } else { addReplySubcommandSyntaxError(c); } diff --git a/src/server.c b/src/server.c index 12691df8e..aebbb57a9 100644 --- a/src/server.c +++ b/src/server.c @@ -1131,10 +1131,10 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ if (server.active_expire_enabled) { - if (iAmPrimary()) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else { + if (!iAmPrimary()) { expireReplicaKeys(); + } else if (!server.import_mode) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } } @@ -1727,7 +1727,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); if (moduleCount()) { moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL); @@ -2133,6 +2133,7 @@ void initServerConfig(void) { server.extended_redis_compat = 0; server.pause_cron = 0; server.dict_resizing = 1; + server.import_mode = 0; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len)); diff --git a/src/server.h b/src/server.h index 5ef04a908..531ca8e7c 100644 --- a/src/server.h +++ b/src/server.h @@ -1233,7 +1233,8 @@ typedef struct ClientFlags { * knows that it does not need the cache and required a full sync. With this * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ - uint64_t reserved : 5; /* Reserved for future use */ + uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */ + uint64_t reserved : 4; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -2089,6 +2090,8 @@ struct valkeyServer { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */ long long primary_initial_offset; /* Primary PSYNC offset. */ int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */ + /* Import Mode */ + int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index d85ce7ee6..fba425f62 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -832,6 +832,80 @@ start_server {tags {"expire"}} { close_replication_stream $repl assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} + + test {Import mode should forbid active expiration} { + r flushall + + r config set import-mode yes + assert_equal [r client import-source on] {OK} + + r set foo1 bar PX 1 + r set foo2 bar PX 1 + after 100 + + assert_equal [r dbsize] {2} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } + + test {Import mode should forbid lazy expiration} { + r flushall + r debug set-active-expire 0 + + r config set import-mode yes + assert_equal [r client import-source on] {OK} + + r set foo1 1 PX 1 + after 10 + + r get foo1 + assert_equal [r dbsize] {1} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + r get foo1 + + assert_equal [r dbsize] {0} + + assert_equal [r debug set-active-expire 1] {OK} + } {} {needs:debug} + + test {RANDOMKEY can return expired key in import mode} { + r flushall + + r config set import-mode yes + assert_equal [r client import-source on] {OK} + + r set foo1 bar PX 1 + after 10 + + set client [valkey [srv "host"] [srv "port"] 0 $::tls] + if {!$::singledb} { + $client select 9 + } + assert_equal [$client ttl foo1] {-2} + + assert_equal [r randomkey] {foo1} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } } start_cluster 1 0 {tags {"expire external:skip cluster"}} { diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index d4e62246f..89e9699a3 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} { assert {[r object freq foo] == 5} } } + +start_server {tags {"maxmemory" "external:skip"}} { + test {Import mode should forbid eviction} { + r set key val + r config set import-mode yes + assert_equal [r client import-source on] {OK} + r config set maxmemory-policy allkeys-lru + r config set maxmemory 1 + + assert_equal [r dbsize] {1} + assert_error {OOM command not allowed*} {r set key1 val1} + + assert_equal [r client import-source off] {OK} + r config set import-mode no + + assert_equal [r dbsize] {0} + } +} \ No newline at end of file diff --git a/valkey.conf b/valkey.conf index 7c7b9da43..bf82b0187 100644 --- a/valkey.conf +++ b/valkey.conf @@ -818,6 +818,13 @@ replica-priority 100 # # replica-ignore-disk-write-errors no +# Make the primary forbid expiration and eviction. +# This is useful for sync tools, because expiration and eviction may cause the data corruption. +# Sync tools can mark their connections as importing source by CLIENT IMPORT-SOURCE. +# NOTICE: Clients should avoid writing the same key on the source server and the destination server. +# +# import-mode no + # ----------------------------------------------------------------------------- # By default, Sentinel includes all replicas in its reports. A replica # can be excluded from Sentinel's announcements. An unannounced replica