mirror of
http://github.com/valkey-io/valkey
synced 2024-11-21 16:46:15 +00:00
Remove EVAL script verbatim replication, propagation, and deterministic execution logic (#9812)
# Background The main goal of this PR is to remove relevant logics on Lua script verbatim replication, only keeping effects replication logic, which has been set as default since Redis 5.0. As a result, Lua in Redis 7.0 would be acting the same as Redis 6.0 with default configuration from users' point of view. There are lots of reasons to remove verbatim replication. Antirez has listed some of the benefits in Issue #5292: >1. No longer need to explain to users side effects into scripts. They can do whatever they want. >2. No need for a cache about scripts that we sent or not to the slaves. >3. No need to sort the output of certain commands inside scripts (SMEMBERS and others): this both simplifies and gains speed. >4. No need to store scripts inside the RDB file in order to startup correctly. >5. No problems about evicting keys during the script execution. When looking back at Redis 5.0, antirez and core team decided to set the config `lua-replicate-commands yes` by default instead of removing verbatim replication directly, in case some bad situations happened. 3 years later now before Redis 7.0, it's time to remove it formally. # Changes - configuration for lua-replicate-commands removed - created config file stub for backward compatibility - Replication script cache removed - this is useless under script effects replication - relevant statistics also removed - script persistence in RDB files is also removed - Propagation of SCRIPT LOAD and SCRIPT FLUSH to replica / AOF removed - Deterministic execution logic in scripts removed (i.e. don't run write commands after random ones, and sorting output of commands with random order) - the flags indicating which commands have non-deterministic results are kept as hints to clients. - `redis.replicate_commands()` & `redis.set_repl()` changed - now `redis.replicate_commands()` does nothing and return an 1 - ...and then `redis.set_repl()` can be issued before `redis.replicate_commands()` now - Relevant TCL cases adjusted - DEBUG lua-always-replicate-commands removed # Other changes - Fix a recent bug comparing CLIENT_ID_AOF to original_client->flags instead of id. (introduced in #9780) Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
febc3f63b2
commit
1b0968df46
@ -1777,7 +1777,6 @@ int rewriteAppendOnlyFileBackground(void) {
|
||||
* accumulated by the parent into server.aof_rewrite_buf will start
|
||||
* with a SELECT statement and it will be safe to merge. */
|
||||
server.aof_selected_db = -1;
|
||||
replicationScriptCacheFlush();
|
||||
return C_OK;
|
||||
}
|
||||
return C_OK; /* unreached */
|
||||
|
@ -3247,10 +3247,10 @@ struct redisCommandArg SCRIPT_LOAD_Args[] = {
|
||||
struct redisCommand SCRIPT_Subcommands[] = {
|
||||
{"debug","Set the debug mode for executed scripts.","O(1)","3.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_DEBUG_History,SCRIPT_DEBUG_Hints,scriptCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_DEBUG_Args},
|
||||
{"exists","Check existence of scripts in the script cache.","O(N) with N being the number of scripts to check (so checking a single script is an O(1) operation).","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_EXISTS_History,SCRIPT_EXISTS_Hints,scriptCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_EXISTS_Args},
|
||||
{"flush","Remove all the scripts from the script cache.","O(N) with N being the number of scripts in cache","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_FLUSH_History,SCRIPT_FLUSH_Hints,scriptCommand,-2,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_FLUSH_Args},
|
||||
{"flush","Remove all the scripts from the script cache.","O(N) with N being the number of scripts in cache","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_FLUSH_History,SCRIPT_FLUSH_Hints,scriptCommand,-2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_FLUSH_Args},
|
||||
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_HELP_History,SCRIPT_HELP_Hints,scriptCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
|
||||
{"kill","Kill the script currently in execution.","O(1)","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_KILL_History,SCRIPT_KILL_Hints,scriptCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"load","Load the specified Lua script into the script cache.","O(N) with N being the length in bytes of the script body.","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_LOAD_History,SCRIPT_LOAD_Hints,scriptCommand,3,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_LOAD_Args},
|
||||
{"load","Load the specified Lua script into the script cache.","O(N) with N being the length in bytes of the script body.","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,SCRIPT_LOAD_History,SCRIPT_LOAD_Hints,scriptCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=SCRIPT_LOAD_Args},
|
||||
{0}
|
||||
};
|
||||
|
||||
|
@ -14,8 +14,7 @@
|
||||
]
|
||||
],
|
||||
"command_flags": [
|
||||
"NOSCRIPT",
|
||||
"MAY_REPLICATE"
|
||||
"NOSCRIPT"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -8,8 +8,7 @@
|
||||
"container": "SCRIPT",
|
||||
"function": "scriptCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT",
|
||||
"MAY_REPLICATE"
|
||||
"NOSCRIPT"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
30
src/config.c
30
src/config.c
@ -45,6 +45,12 @@ typedef struct configEnum {
|
||||
const int val;
|
||||
} configEnum;
|
||||
|
||||
typedef struct deprecatedConfig {
|
||||
const char *name;
|
||||
const int argc_min;
|
||||
const int argc_max;
|
||||
} deprecatedConfig;
|
||||
|
||||
configEnum maxmemory_policy_enum[] = {
|
||||
{"volatile-lru", MAXMEMORY_VOLATILE_LRU},
|
||||
{"volatile-lfu", MAXMEMORY_VOLATILE_LFU},
|
||||
@ -405,6 +411,12 @@ void initConfigValues() {
|
||||
static int reading_config_file;
|
||||
|
||||
void loadServerConfigFromString(char *config) {
|
||||
deprecatedConfig deprecated_configs[] = {
|
||||
{"list-max-ziplist-entries", 2, 2},
|
||||
{"list-max-ziplist-value", 2, 2},
|
||||
{"lua-replicate-commands", 2, 2},
|
||||
{NULL, 0},
|
||||
};
|
||||
char buf[1024];
|
||||
const char *err = NULL;
|
||||
int linenum = 0, totlines, i;
|
||||
@ -459,6 +471,19 @@ void loadServerConfigFromString(char *config) {
|
||||
}
|
||||
}
|
||||
|
||||
/* If there's no matching above, we try matching them with deprecated configs */
|
||||
if (!match) {
|
||||
for (deprecatedConfig *config = deprecated_configs; config->name != NULL; config++) {
|
||||
if (!strcasecmp(argv[0], config->name) &&
|
||||
config->argc_min <= argc &&
|
||||
argc <= config->argc_max)
|
||||
{
|
||||
match = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (match) {
|
||||
sdsfreesplitres(argv,argc);
|
||||
continue;
|
||||
@ -467,10 +492,6 @@ void loadServerConfigFromString(char *config) {
|
||||
/* Execute config directives */
|
||||
if (!strcasecmp(argv[0],"include") && argc == 2) {
|
||||
loadServerConfig(argv[1], 0, NULL);
|
||||
} else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){
|
||||
/* DEAD OPTION */
|
||||
} else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) {
|
||||
/* DEAD OPTION */
|
||||
} else if (!strcasecmp(argv[0],"rename-command") && argc == 3) {
|
||||
struct redisCommand *cmd = lookupCommandBySds(argv[1]);
|
||||
int retval;
|
||||
@ -2572,7 +2593,6 @@ standardConfig configs[] = {
|
||||
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
|
||||
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
|
||||
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 0,NULL, NULL), /* Read + parse from threads? */
|
||||
createBoolConfig("lua-replicate-commands", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lua_always_replicate_commands, 1, NULL, NULL),
|
||||
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
|
||||
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
|
||||
createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL),
|
||||
|
@ -417,9 +417,6 @@ void debugCommand(client *c) {
|
||||
" Like HTSTATS but for the hash table stored at <key>'s value.",
|
||||
"LOADAOF",
|
||||
" Flush the AOF buffers on disk and reload the AOF in memory.",
|
||||
"LUA-ALWAYS-REPLICATE-COMMANDS <0|1>",
|
||||
" Setting it to 1 makes Lua replication defaulting to replicating single",
|
||||
" commands, without the script having to enable effects replication.",
|
||||
#ifdef USE_JEMALLOC
|
||||
"MALLCTL <key> [<val>]",
|
||||
" Get or set a malloc tuning integer.",
|
||||
@ -832,11 +829,6 @@ NULL
|
||||
{
|
||||
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") &&
|
||||
c->argc == 3)
|
||||
{
|
||||
server.lua_always_replicate_commands = atoi(c->argv[2]->ptr);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
|
||||
sds errstr = sdsnewlen("-",1);
|
||||
|
||||
|
@ -940,7 +940,6 @@ long defragOtherGlobals() {
|
||||
* but we assume most of these are short lived, we only need to defrag allocations
|
||||
* that remain static for a long time */
|
||||
defragged += activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_IS_STROB);
|
||||
defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL);
|
||||
defragged += moduleDefragGlobals();
|
||||
return defragged;
|
||||
}
|
||||
|
68
src/eval.c
68
src/eval.c
@ -54,7 +54,6 @@ struct luaCtx {
|
||||
char *lua_cur_script; /* SHA1 of the script currently running, or NULL */
|
||||
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
|
||||
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
|
||||
int lua_replicate_commands; /* True if we are doing single commands repl. */
|
||||
} lctx;
|
||||
|
||||
/* Debugger shared state is stored inside this global structure. */
|
||||
@ -140,23 +139,13 @@ int luaRedisDebugCommand(lua_State *lua) {
|
||||
|
||||
/* redis.replicate_commands()
|
||||
*
|
||||
* DEPRECATED: Now do nothing and always return true.
|
||||
* Turn on single commands replication if the script never called
|
||||
* a write command so far, and returns true. Otherwise if the script
|
||||
* already started to write, returns false and stick to whole scripts
|
||||
* replication, which is our default. */
|
||||
int luaRedisReplicateCommandsCommand(lua_State *lua) {
|
||||
scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
|
||||
if (rctx->flags & SCRIPT_WRITE_DIRTY) {
|
||||
lua_pushboolean(lua,0);
|
||||
} else {
|
||||
lctx.lua_replicate_commands = 1;
|
||||
rctx->flags &= ~SCRIPT_EVAL_REPLICATION;
|
||||
/* When we switch to single commands replication, we can provide
|
||||
* different math.random() sequences at every call, which is what
|
||||
* the user normally expects. */
|
||||
redisSrand48(rand());
|
||||
lua_pushboolean(lua,1);
|
||||
}
|
||||
lua_pushboolean(lua,1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -373,13 +362,6 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
lua_State *lua = lctx.lua;
|
||||
char funcname[43];
|
||||
long long numkeys;
|
||||
long long initial_server_dirty = server.dirty;
|
||||
|
||||
/* When we replicate whole scripts, we want the same PRNG sequence at
|
||||
* every call so that our PRNG is not affected by external state. */
|
||||
redisSrand48(0);
|
||||
|
||||
lctx.lua_replicate_commands = server.lua_always_replicate_commands;
|
||||
|
||||
/* Get the number of arguments that are keys */
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
|
||||
@ -445,7 +427,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
scriptPrepareForRun(&rctx, lctx.lua_client, c, lctx.lua_cur_script);
|
||||
rctx.flags |= SCRIPT_EVAL_MODE; /* mark the current run as legacy so we
|
||||
will get legacy error messages and logs */
|
||||
if (!lctx.lua_replicate_commands) rctx.flags |= SCRIPT_EVAL_REPLICATION;
|
||||
|
||||
/* This check is for EVAL_RO, EVALSHA_RO. We want to allow only read only commands */
|
||||
if ((server.script_caller->cmd->proc == evalRoCommand ||
|
||||
server.script_caller->cmd->proc == evalShaRoCommand)) {
|
||||
@ -457,43 +439,6 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
scriptResetRun(&rctx);
|
||||
|
||||
lctx.lua_cur_script = NULL;
|
||||
|
||||
/* EVALSHA should be propagated to Slave and AOF file as full EVAL, unless
|
||||
* we are sure that the script was already in the context of all the
|
||||
* attached slaves *and* the current AOF file if enabled.
|
||||
*
|
||||
* To do so we use a cache of SHA1s of scripts that we already propagated
|
||||
* as full EVAL, that's called the Replication Script Cache.
|
||||
*
|
||||
* For replication, every time a new slave attaches to the master, we need to
|
||||
* flush our cache of scripts that can be replicated as EVALSHA, while
|
||||
* for AOF we need to do so every time we rewrite the AOF file. */
|
||||
if (evalsha && !lctx.lua_replicate_commands) {
|
||||
if (!replicationScriptCacheExists(c->argv[1]->ptr)) {
|
||||
/* This script is not in our script cache, replicate it as
|
||||
* EVAL, then add it into the script cache, as from now on
|
||||
* slaves and AOF know about it. */
|
||||
robj *script = dictFetchValue(lctx.lua_scripts,c->argv[1]->ptr);
|
||||
|
||||
replicationScriptCacheAdd(c->argv[1]->ptr);
|
||||
serverAssertWithInfo(c,NULL,script != NULL);
|
||||
|
||||
/* If the script did not produce any changes in the dataset we want
|
||||
* just to replicate it as SCRIPT LOAD, otherwise we risk running
|
||||
* an aborted script on slaves (that may then produce results there)
|
||||
* or just running a CPU costly read-only script on the slaves. */
|
||||
if (server.dirty == initial_server_dirty) {
|
||||
rewriteClientCommandVector(c,3,
|
||||
shared.script,
|
||||
shared.load,
|
||||
script);
|
||||
} else {
|
||||
rewriteClientCommandArgument(c,0,shared.eval);
|
||||
rewriteClientCommandArgument(c,1,script);
|
||||
}
|
||||
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void evalCommand(client *c) {
|
||||
@ -568,8 +513,6 @@ NULL
|
||||
}
|
||||
scriptingReset(async);
|
||||
addReply(c,shared.ok);
|
||||
replicationScriptCacheFlush();
|
||||
server.dirty++; /* Propagating this command is a good idea. */
|
||||
} else if (c->argc >= 2 && !strcasecmp(c->argv[1]->ptr,"exists")) {
|
||||
int j;
|
||||
|
||||
@ -584,7 +527,6 @@ NULL
|
||||
sds sha = luaCreateFunction(c,c->argv[2]);
|
||||
if (sha == NULL) return; /* The error was sent by luaCreateFunction(). */
|
||||
addReplyBulkCBuffer(c,sha,40);
|
||||
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
|
||||
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
|
||||
scriptKill(c, 1);
|
||||
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"debug")) {
|
||||
@ -1389,7 +1331,7 @@ void ldbEval(lua_State *lua, sds *argv, int argc) {
|
||||
* implementation, with ldb.step enabled, so as a side effect the Redis command
|
||||
* and its reply are logged. */
|
||||
void ldbRedis(lua_State *lua, sds *argv, int argc) {
|
||||
int j, saved_rc = lctx.lua_replicate_commands;
|
||||
int j;
|
||||
|
||||
if (!lua_checkstack(lua, argc + 1)) {
|
||||
/* Increase the Lua stack if needed to make sure there is enough room
|
||||
@ -1408,10 +1350,8 @@ void ldbRedis(lua_State *lua, sds *argv, int argc) {
|
||||
for (j = 1; j < argc; j++)
|
||||
lua_pushlstring(lua,argv[j],sdslen(argv[j]));
|
||||
ldb.step = 1; /* Force redis.call() to log. */
|
||||
lctx.lua_replicate_commands = 1;
|
||||
lua_pcall(lua,argc-1,1,0); /* Stack: redis, result */
|
||||
ldb.step = 0; /* Disable logging. */
|
||||
lctx.lua_replicate_commands = saved_rc;
|
||||
lua_pop(lua,2); /* Discard the result and clean the stack. */
|
||||
}
|
||||
|
||||
|
@ -1208,12 +1208,6 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
|
||||
mem_total+=mem;
|
||||
|
||||
mem = evalScriptsMemory();
|
||||
mem += dictSize(server.repl_scriptcache_dict) * sizeof(dictEntry) +
|
||||
dictSlots(server.repl_scriptcache_dict) * sizeof(dictEntry*);
|
||||
if (listLength(server.repl_scriptcache_fifo) > 0) {
|
||||
mem += listLength(server.repl_scriptcache_fifo) * (sizeof(listNode) +
|
||||
sdsZmallocSize(listNodeValue(listFirst(server.repl_scriptcache_fifo))));
|
||||
}
|
||||
mh->lua_caches = mem;
|
||||
mem_total+=mem;
|
||||
mh->functions_caches = functionsMemoryOverhead();
|
||||
|
22
src/rdb.c
22
src/rdb.c
@ -1319,21 +1319,6 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
|
||||
di = NULL; /* So that we don't release it again on error. */
|
||||
}
|
||||
|
||||
/* If we are storing the replication information on disk, persist
|
||||
* the script cache as well: on successful PSYNC after a restart, we need
|
||||
* to be able to process any EVALSHA inside the replication backlog the
|
||||
* master will send us. */
|
||||
if (rsi && dictSize(evalScriptsDict())) {
|
||||
di = dictGetIterator(evalScriptsDict());
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *body = dictGetVal(de);
|
||||
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
|
||||
goto werr;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
di = NULL; /* So that we don't release it again on error. */
|
||||
}
|
||||
|
||||
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
|
||||
|
||||
/* EOF opcode */
|
||||
@ -2900,12 +2885,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||
} else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
|
||||
if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
|
||||
} else if (!strcasecmp(auxkey->ptr,"lua")) {
|
||||
/* Load the script back in memory. */
|
||||
if (luaCreateFunction(NULL, auxval) == NULL) {
|
||||
rdbReportCorruptRDB(
|
||||
"Can't load Lua script from RDB file! "
|
||||
"BODY: %s", (char*)auxval->ptr);
|
||||
}
|
||||
/* Won't load the script back in memory anymore. */
|
||||
} else if (!strcasecmp(auxkey->ptr,"redis-ver")) {
|
||||
serverLog(LL_NOTICE,"Loading RDB produced by version %s",
|
||||
(char*)auxval->ptr);
|
||||
|
@ -887,9 +887,6 @@ int startBgsaveForReplication(int mincapa) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Flush the script cache, since we need that slave differences are
|
||||
* accumulated without requiring slaves to match our cached scripts. */
|
||||
if (retval == C_OK) replicationScriptCacheFlush();
|
||||
return retval;
|
||||
}
|
||||
|
||||
@ -3277,90 +3274,6 @@ void refreshGoodSlavesCount(void) {
|
||||
server.repl_good_slaves_count = good;
|
||||
}
|
||||
|
||||
/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
|
||||
* The goal of this code is to keep track of scripts already sent to every
|
||||
* connected slave, in order to be able to replicate EVALSHA as it is without
|
||||
* translating it to EVAL every time it is possible.
|
||||
*
|
||||
* We use a capped collection implemented by a hash table for fast lookup
|
||||
* of scripts we can send as EVALSHA, plus a linked list that is used for
|
||||
* eviction of the oldest entry when the max number of items is reached.
|
||||
*
|
||||
* We don't care about taking a different cache for every different slave
|
||||
* since to fill the cache again is not very costly, the goal of this code
|
||||
* is to avoid that the same big script is transmitted a big number of times
|
||||
* per second wasting bandwidth and processor speed, but it is not a problem
|
||||
* if we need to rebuild the cache from scratch from time to time, every used
|
||||
* script will need to be transmitted a single time to reappear in the cache.
|
||||
*
|
||||
* This is how the system works:
|
||||
*
|
||||
* 1) Every time a new slave connects, we flush the whole script cache.
|
||||
* 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
|
||||
* trying to convert EVAL into EVALSHA specifically for slaves.
|
||||
* 3) Every time we transmit a script as EVAL to the slaves, we also add the
|
||||
* corresponding SHA1 of the script into the cache as we are sure every
|
||||
* slave knows about the script starting from now.
|
||||
* 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
|
||||
* and at the same time flush the script cache.
|
||||
* 5) When the last slave disconnects, flush the cache.
|
||||
* 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
|
||||
* in the master sometimes.
|
||||
*/
|
||||
|
||||
/* Initialize the script cache, only called at startup. */
|
||||
void replicationScriptCacheInit(void) {
|
||||
server.repl_scriptcache_size = 10000;
|
||||
server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType);
|
||||
server.repl_scriptcache_fifo = listCreate();
|
||||
}
|
||||
|
||||
/* Empty the script cache. Should be called every time we are no longer sure
|
||||
* that every slave knows about all the scripts in our set, or when the
|
||||
* current AOF "context" is no longer aware of the script. In general we
|
||||
* should flush the cache:
|
||||
*
|
||||
* 1) Every time a new slave reconnects to this master and performs a
|
||||
* full SYNC (PSYNC does not require flushing).
|
||||
* 2) Every time an AOF rewrite is performed.
|
||||
* 3) Every time we are left without slaves at all, and AOF is off, in order
|
||||
* to reclaim otherwise unused memory.
|
||||
*/
|
||||
void replicationScriptCacheFlush(void) {
|
||||
dictEmpty(server.repl_scriptcache_dict,NULL);
|
||||
listRelease(server.repl_scriptcache_fifo);
|
||||
server.repl_scriptcache_fifo = listCreate();
|
||||
}
|
||||
|
||||
/* Add an entry into the script cache, if we reach max number of entries the
|
||||
* oldest is removed from the list. */
|
||||
void replicationScriptCacheAdd(sds sha1) {
|
||||
int retval;
|
||||
sds key = sdsdup(sha1);
|
||||
|
||||
/* Evict oldest. */
|
||||
if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
|
||||
{
|
||||
listNode *ln = listLast(server.repl_scriptcache_fifo);
|
||||
sds oldest = listNodeValue(ln);
|
||||
|
||||
retval = dictDelete(server.repl_scriptcache_dict,oldest);
|
||||
serverAssert(retval == DICT_OK);
|
||||
listDelNode(server.repl_scriptcache_fifo,ln);
|
||||
}
|
||||
|
||||
/* Add current. */
|
||||
retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
|
||||
listAddNodeHead(server.repl_scriptcache_fifo,key);
|
||||
serverAssert(retval == DICT_OK);
|
||||
}
|
||||
|
||||
/* Returns non-zero if the specified entry exists inside the cache, that is,
|
||||
* if all the slaves are aware of this script SHA1. */
|
||||
int replicationScriptCacheExists(sds sha1) {
|
||||
return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
|
||||
}
|
||||
|
||||
/* ----------------------- SYNCHRONOUS REPLICATION --------------------------
|
||||
* Redis synchronous replication design can be summarized in points:
|
||||
*
|
||||
@ -3694,16 +3607,6 @@ void replicationCron(void) {
|
||||
}
|
||||
}
|
||||
|
||||
/* If AOF is disabled and we no longer have attached slaves, we can
|
||||
* free our Replication Script Cache as there is no need to propagate
|
||||
* EVALSHA at all. */
|
||||
if (listLength(server.slaves) == 0 &&
|
||||
server.aof_state == AOF_OFF &&
|
||||
listLength(server.repl_scriptcache_fifo) != 0)
|
||||
{
|
||||
replicationScriptCacheFlush();
|
||||
}
|
||||
|
||||
replicationStartPendingFork();
|
||||
|
||||
/* Remove the RDB file used for replication if Redis is not running
|
||||
|
36
src/script.c
36
src/script.c
@ -148,11 +148,10 @@ void scriptResetRun(scriptRunCtx *run_ctx) {
|
||||
unprotectClient(run_ctx->original_client);
|
||||
}
|
||||
|
||||
if (!(run_ctx->flags & SCRIPT_EVAL_REPLICATION)) {
|
||||
preventCommandPropagation(run_ctx->original_client);
|
||||
if (run_ctx->flags & SCRIPT_MULTI_EMMITED) {
|
||||
execCommandPropagateExec(run_ctx->original_client->db->id);
|
||||
}
|
||||
/* emit EXEC if MULTI has been propagated. */
|
||||
preventCommandPropagation(run_ctx->original_client);
|
||||
if (run_ctx->flags & SCRIPT_MULTI_EMMITED) {
|
||||
execCommandPropagateExec(run_ctx->original_client->db->id);
|
||||
}
|
||||
|
||||
/* unset curr_run_ctx so we will know there is no running script */
|
||||
@ -258,17 +257,12 @@ static int scriptVerifyWriteCommandAllow(scriptRunCtx *run_ctx, char **err) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if ((run_ctx->flags & SCRIPT_RANDOM_DIRTY) && (run_ctx->flags & SCRIPT_EVAL_REPLICATION)) {
|
||||
*err = sdsnew("Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Write commands are forbidden against read-only slaves, or if a
|
||||
* command marked as non-deterministic was already called in the context
|
||||
* of this script. */
|
||||
int deny_write_type = writeCommandsDeniedByDiskError();
|
||||
|
||||
if (server.masterhost && server.repl_slave_ro && run_ctx->original_client->flags != CLIENT_ID_AOF
|
||||
if (server.masterhost && server.repl_slave_ro && run_ctx->original_client->id != CLIENT_ID_AOF
|
||||
&& !(run_ctx->original_client->flags & CLIENT_MASTER))
|
||||
{
|
||||
*err = sdsdup(shared.roslaveerr->ptr);
|
||||
@ -343,8 +337,7 @@ static void scriptEmitMultiIfNeeded(scriptRunCtx *run_ctx) {
|
||||
* we propagate into a MULTI/EXEC block, so that it will be atomic like
|
||||
* a Lua script in the context of AOF and slaves. */
|
||||
client *c = run_ctx->c;
|
||||
if (!(run_ctx->flags & SCRIPT_EVAL_REPLICATION)
|
||||
&& !(run_ctx->flags & SCRIPT_MULTI_EMMITED)
|
||||
if (!(run_ctx->flags & SCRIPT_MULTI_EMMITED)
|
||||
&& !(run_ctx->original_client->flags & CLIENT_MULTI)
|
||||
&& (run_ctx->flags & SCRIPT_WRITE_DIRTY)
|
||||
&& ((run_ctx->repl_flags & PROPAGATE_AOF)
|
||||
@ -426,11 +419,6 @@ void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) {
|
||||
run_ctx->flags |= SCRIPT_WRITE_DIRTY;
|
||||
}
|
||||
|
||||
if (cmd->flags & CMD_RANDOM) {
|
||||
/* signify that we already perform a random command in this execution */
|
||||
run_ctx->flags |= SCRIPT_RANDOM_DIRTY;
|
||||
}
|
||||
|
||||
if (scriptVerifyClusterState(c, run_ctx->original_client, err) != C_OK) {
|
||||
return;
|
||||
}
|
||||
@ -438,13 +426,11 @@ void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) {
|
||||
scriptEmitMultiIfNeeded(run_ctx);
|
||||
|
||||
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
||||
if (!(run_ctx->flags & SCRIPT_EVAL_REPLICATION)) {
|
||||
if (run_ctx->repl_flags & PROPAGATE_AOF) {
|
||||
call_flags |= CMD_CALL_PROPAGATE_AOF;
|
||||
}
|
||||
if (run_ctx->repl_flags & PROPAGATE_REPL) {
|
||||
call_flags |= CMD_CALL_PROPAGATE_REPL;
|
||||
}
|
||||
if (run_ctx->repl_flags & PROPAGATE_AOF) {
|
||||
call_flags |= CMD_CALL_PROPAGATE_AOF;
|
||||
}
|
||||
if (run_ctx->repl_flags & PROPAGATE_REPL) {
|
||||
call_flags |= CMD_CALL_PROPAGATE_REPL;
|
||||
}
|
||||
call(c, call_flags);
|
||||
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
|
||||
|
@ -59,16 +59,12 @@
|
||||
|
||||
/* runCtx flags */
|
||||
#define SCRIPT_WRITE_DIRTY (1ULL<<0) /* indicate that the current script already performed a write command */
|
||||
#define SCRIPT_RANDOM_DIRTY (1ULL<<1) /* indicate that the current script already performed a random reply command.
|
||||
Thanks to this flag we'll raise an error every time a write command
|
||||
is called after a random command and prevent none deterministic
|
||||
replication or AOF. */
|
||||
|
||||
#define SCRIPT_MULTI_EMMITED (1ULL<<2) /* indicate that we already wrote a multi command to replication/aof */
|
||||
#define SCRIPT_TIMEDOUT (1ULL<<3) /* indicate that the current script timedout */
|
||||
#define SCRIPT_KILLED (1ULL<<4) /* indicate that the current script was marked to be killed */
|
||||
#define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */
|
||||
#define SCRIPT_EVAL_REPLICATION (1ULL<<6) /* mode for eval, indicate that we replicate the
|
||||
script invocation and not the effects */
|
||||
|
||||
#define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */
|
||||
typedef struct scriptRunCtx scriptRunCtx;
|
||||
|
||||
|
@ -461,36 +461,6 @@ static int luaRaiseError(lua_State *lua) {
|
||||
return lua_error(lua);
|
||||
}
|
||||
|
||||
/* Sort the array currently in the stack. We do this to make the output
|
||||
* of commands like KEYS or SMEMBERS something deterministic when called
|
||||
* from Lua (to play well with AOf/replication).
|
||||
*
|
||||
* The array is sorted using table.sort itself, and assuming all the
|
||||
* list elements are strings. */
|
||||
static void luaSortArray(lua_State *lua) {
|
||||
/* Initial Stack: array */
|
||||
lua_getglobal(lua,"table");
|
||||
lua_pushstring(lua,"sort");
|
||||
lua_gettable(lua,-2); /* Stack: array, table, table.sort */
|
||||
lua_pushvalue(lua,-3); /* Stack: array, table, table.sort, array */
|
||||
if (lua_pcall(lua,1,0,0)) {
|
||||
/* Stack: array, table, error */
|
||||
|
||||
/* We are not interested in the error, we assume that the problem is
|
||||
* that there are 'false' elements inside the array, so we try
|
||||
* again with a slower function but able to handle this case, that
|
||||
* is: table.sort(table, __redis__compare_helper) */
|
||||
lua_pop(lua,1); /* Stack: array, table */
|
||||
lua_pushstring(lua,"sort"); /* Stack: array, table, sort */
|
||||
lua_gettable(lua,-2); /* Stack: array, table, table.sort */
|
||||
lua_pushvalue(lua,-3); /* Stack: array, table, table.sort, array */
|
||||
lua_getglobal(lua,"__redis__compare_helper");
|
||||
/* Stack: array, table, table.sort, array, __redis__compare_helper */
|
||||
lua_call(lua,2,0);
|
||||
}
|
||||
/* Stack: array (sorted), table */
|
||||
lua_pop(lua,1); /* Stack: array (sorted) */
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------------
|
||||
* Lua reply to Redis reply conversion functions.
|
||||
@ -826,13 +796,6 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
if (ldbIsEnabled())
|
||||
ldbLogRedisReply(reply);
|
||||
|
||||
/* Sort the output array if needed, assuming it is a non-null multi bulk
|
||||
* reply as expected. */
|
||||
if ((c->cmd->flags & CMD_SORT_FOR_SCRIPT) &&
|
||||
(rctx->flags & SCRIPT_EVAL_REPLICATION) &&
|
||||
(reply[0] == '*' && reply[1] != '-')) {
|
||||
luaSortArray(lua);
|
||||
}
|
||||
if (reply != c->buf) sdsfree(reply);
|
||||
c->reply_bytes = 0;
|
||||
|
||||
@ -945,17 +908,13 @@ static int luaRedisStatusReplyCommand(lua_State *lua) {
|
||||
* Set the propagation of write commands executed in the context of the
|
||||
* script to on/off for AOF and slaves. */
|
||||
static int luaRedisSetReplCommand(lua_State *lua) {
|
||||
int argc = lua_gettop(lua);
|
||||
int flags;
|
||||
int flags, argc = lua_gettop(lua);
|
||||
|
||||
scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
|
||||
|
||||
if (rctx->flags & SCRIPT_EVAL_REPLICATION) {
|
||||
lua_pushstring(lua, "You can set the replication behavior only after turning on single commands replication with redis.replicate_commands().");
|
||||
return lua_error(lua);
|
||||
} else if (argc != 1) {
|
||||
lua_pushstring(lua, "redis.set_repl() requires two arguments.");
|
||||
return lua_error(lua);
|
||||
if (argc != 1) {
|
||||
lua_pushstring(lua, "redis.set_repl() requires two arguments.");
|
||||
return lua_error(lua);
|
||||
}
|
||||
|
||||
flags = lua_tonumber(lua,-1);
|
||||
|
@ -2408,12 +2408,11 @@ void initServer(void) {
|
||||
}
|
||||
|
||||
if (server.cluster_enabled) clusterInit();
|
||||
replicationScriptCacheInit();
|
||||
scriptingInit(1);
|
||||
functionsInit();
|
||||
slowlogInit();
|
||||
latencyMonitorInit();
|
||||
|
||||
|
||||
/* Initialize ACL default password if it exists */
|
||||
ACLUpdateDefaultUserPassword(server.requirepass);
|
||||
|
||||
|
@ -1749,7 +1749,6 @@ struct redisServer {
|
||||
/* Scripting */
|
||||
client *script_caller; /* The client running script right now, or NULL */
|
||||
mstime_t script_time_limit; /* Script timeout in milliseconds */
|
||||
int lua_always_replicate_commands; /* Default replication type. */
|
||||
int script_oom; /* OOM detected when script start */
|
||||
int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */
|
||||
/* Lazy free */
|
||||
@ -2498,10 +2497,6 @@ void resizeReplicationBacklog();
|
||||
void replicationSetMaster(char *ip, int port);
|
||||
void replicationUnsetMaster(void);
|
||||
void refreshGoodSlavesCount(void);
|
||||
void replicationScriptCacheInit(void);
|
||||
void replicationScriptCacheFlush(void);
|
||||
void replicationScriptCacheAdd(sds sha1);
|
||||
int replicationScriptCacheExists(sds sha1);
|
||||
void processClientsWaitingReplicas(void);
|
||||
void unblockClientWaitingReplicas(client *c);
|
||||
int replicationCountAcksByOffset(long long offset);
|
||||
|
BIN
tests/assets/scriptbackup.rdb
Normal file
BIN
tests/assets/scriptbackup.rdb
Normal file
Binary file not shown.
@ -485,4 +485,41 @@ tags {"aof external:skip"} {
|
||||
catch {exec src/redis-check-aof --truncate-to-timestamp 1628217469 $aof_path} e
|
||||
assert_match {*aborting*} $e
|
||||
}
|
||||
|
||||
test {EVAL timeout with slow verbatim Lua script from AOF} {
|
||||
create_aof {
|
||||
append_to_aof [formatCommand select 9]
|
||||
append_to_aof [formatCommand eval {redis.call('set',KEYS[1],'y'); for i=1,1500000 do redis.call('ping') end return 'ok'} 1 x]
|
||||
}
|
||||
|
||||
start_server [list overrides [list dir $server_path appendonly no lua-time-limit 1 aof-use-rdb-preamble no]] {
|
||||
# generate a long running script that is propagated to the AOF as script
|
||||
# make sure that the script times out during loading
|
||||
set rd [redis_deferring_client]
|
||||
r config set appendonly yes
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd debug loadaof
|
||||
$rd flush
|
||||
after 100
|
||||
catch {r ping} err
|
||||
assert_match {LOADING*} $err
|
||||
$rd read
|
||||
set elapsed [expr [clock clicks -milliseconds]-$start]
|
||||
if {$::verbose} { puts "loading took $elapsed milliseconds" }
|
||||
$rd close
|
||||
assert_equal [r get x] y
|
||||
}
|
||||
}
|
||||
|
||||
test {EVAL can process writes from AOF in read-only replicas} {
|
||||
create_aof {
|
||||
append_to_aof [formatCommand select 9]
|
||||
append_to_aof [formatCommand eval {redis.call("set",KEYS[1],"100")} 1 foo]
|
||||
append_to_aof [formatCommand eval {redis.call("incr",KEYS[1])} 1 foo]
|
||||
append_to_aof [formatCommand eval {redis.call("incr",KEYS[1])} 1 foo]
|
||||
}
|
||||
start_server [list overrides [list dir $server_path appendonly yes replica-read-only yes replicaof "127.0.0.1 0"]] {
|
||||
assert_equal [r get foo] 102
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -371,73 +371,6 @@ start_server {} {
|
||||
assert {$sync_count == $new_sync_count}
|
||||
}
|
||||
|
||||
test "PSYNC2: Replica RDB restart with EVALSHA in backlog issue #4483" {
|
||||
# Pick a random slave
|
||||
set slave_id [expr {($master_id+1)%5}]
|
||||
set sync_count [status $R($master_id) sync_full]
|
||||
|
||||
# Make sure to replicate the first EVAL while the salve is online
|
||||
# so that it's part of the scripts the master believes it's safe
|
||||
# to propagate as EVALSHA.
|
||||
$R($master_id) EVAL {return redis.call("incr","__mycounter")} 0
|
||||
$R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
|
||||
|
||||
# Wait for the two to sync
|
||||
wait_for_condition 50 1000 {
|
||||
[$R($master_id) debug digest] == [$R($slave_id) debug digest]
|
||||
} else {
|
||||
show_cluster_status
|
||||
fail "Replica not reconnecting"
|
||||
}
|
||||
|
||||
# Prevent the slave from receiving master updates, and at
|
||||
# the same time send a new script several times to the
|
||||
# master, so that we'll end with EVALSHA into the backlog.
|
||||
$R($slave_id) slaveof 127.0.0.1 0
|
||||
|
||||
$R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
|
||||
$R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
|
||||
$R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
|
||||
|
||||
catch {
|
||||
$R($slave_id) config rewrite
|
||||
restart_server [expr {0-$slave_id}] true false
|
||||
set R($slave_id) [srv [expr {0-$slave_id}] client]
|
||||
}
|
||||
|
||||
# Reconfigure the slave correctly again, when it's back online.
|
||||
set retry 50
|
||||
while {$retry} {
|
||||
if {[catch {
|
||||
$R($slave_id) slaveof $master_host $master_port
|
||||
}]} {
|
||||
after 1000
|
||||
} else {
|
||||
break
|
||||
}
|
||||
incr retry -1
|
||||
}
|
||||
|
||||
# The master should be back at 4 slaves eventually
|
||||
wait_for_condition 50 1000 {
|
||||
[status $R($master_id) connected_slaves] == 4
|
||||
} else {
|
||||
show_cluster_status
|
||||
fail "Replica not reconnecting"
|
||||
}
|
||||
set new_sync_count [status $R($master_id) sync_full]
|
||||
assert {$sync_count == $new_sync_count}
|
||||
|
||||
# However if the slave started with the full state of the
|
||||
# scripting engine, we should now have the same digest.
|
||||
wait_for_condition 50 1000 {
|
||||
[$R($master_id) debug digest] == [$R($slave_id) debug digest]
|
||||
} else {
|
||||
show_cluster_status
|
||||
fail "Debug digest mismatch between master and replica in post-restart handshake"
|
||||
}
|
||||
}
|
||||
|
||||
if {$no_exit} {
|
||||
while 1 { puts -nonewline .; flush stdout; after 1000}
|
||||
}
|
||||
|
@ -355,4 +355,13 @@ start_server {overrides {save ""}} {
|
||||
}
|
||||
} ;# system_name
|
||||
|
||||
exec cp -f tests/assets/scriptbackup.rdb $server_path
|
||||
start_server [list overrides [list "dir" $server_path "dbfilename" "scriptbackup.rdb" "appendonly" "no"]] {
|
||||
# the script is: "return redis.call('set', 'foo', 'bar')""
|
||||
# its sha1 is: a0c38691e9fffe4563723c32ba77a34398e090e6
|
||||
test {script won't load anymore if it's in rdb} {
|
||||
assert_equal [r script exists a0c38691e9fffe4563723c32ba77a34398e090e6] 0
|
||||
}
|
||||
}
|
||||
|
||||
} ;# tags
|
||||
|
@ -128,91 +128,3 @@ start_server {tags {"repl external:skip"}} {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
start_server {} {
|
||||
test {First server should have role slave after SLAVEOF} {
|
||||
r -1 slaveof [srv 0 host] [srv 0 port]
|
||||
wait_for_condition 50 100 {
|
||||
[s -1 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
}
|
||||
|
||||
set numops 20000 ;# Enough to trigger the Script Cache LRU eviction.
|
||||
|
||||
# While we are at it, enable AOF to test it will be consistent as well
|
||||
# after the test.
|
||||
r config set appendonly yes
|
||||
|
||||
test {MASTER and SLAVE consistency with EVALSHA replication} {
|
||||
array set oldsha {}
|
||||
for {set j 0} {$j < $numops} {incr j} {
|
||||
set key "key:$j"
|
||||
# Make sure to create scripts that have different SHA1s
|
||||
set script "return redis.call('incr','$key')"
|
||||
set sha1 [r eval "return redis.sha1hex(\"$script\")" 0]
|
||||
set oldsha($j) $sha1
|
||||
r eval $script 0
|
||||
set res [r evalsha $sha1 0]
|
||||
assert {$res == 2}
|
||||
# Additionally call one of the old scripts as well, at random.
|
||||
set res [r evalsha $oldsha([randomInt $j]) 0]
|
||||
assert {$res > 2}
|
||||
|
||||
# Trigger an AOF rewrite while we are half-way, this also
|
||||
# forces the flush of the script cache, and we will cover
|
||||
# more code as a result.
|
||||
if {$j == $numops / 2} {
|
||||
catch {r bgrewriteaof}
|
||||
}
|
||||
}
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[r dbsize] == $numops &&
|
||||
[r -1 dbsize] == $numops &&
|
||||
[r debug digest] eq [r -1 debug digest]
|
||||
} else {
|
||||
set csv1 [csvdump r]
|
||||
set csv2 [csvdump {r -1}]
|
||||
set fd [open /tmp/repldump1.txt w]
|
||||
puts -nonewline $fd $csv1
|
||||
close $fd
|
||||
set fd [open /tmp/repldump2.txt w]
|
||||
puts -nonewline $fd $csv2
|
||||
close $fd
|
||||
puts "Master - Replica inconsistency"
|
||||
puts "Run diff -u against /tmp/repldump*.txt for more info"
|
||||
}
|
||||
|
||||
set old_digest [r debug digest]
|
||||
r config set appendonly no
|
||||
r debug loadaof
|
||||
set new_digest [r debug digest]
|
||||
assert {$old_digest eq $new_digest}
|
||||
}
|
||||
|
||||
test {SLAVE can reload "lua" AUX RDB fields of duplicated scripts} {
|
||||
# Force a Slave full resynchronization
|
||||
r debug change-repl-id
|
||||
r -1 client kill type master
|
||||
|
||||
# Check that after a full resync the slave can still load
|
||||
# correctly the RDB file: such file will contain "lua" AUX
|
||||
# sections with scripts already in the memory of the master.
|
||||
|
||||
wait_for_condition 1000 100 {
|
||||
[s -1 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[r debug digest] eq [r -1 debug digest]
|
||||
} else {
|
||||
fail "DEBUG DIGEST mismatch after full SYNC with many scripts"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -602,26 +602,26 @@ start_server {tags {"multi"}} {
|
||||
test {MULTI propagation of SCRIPT LOAD} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# make sure that SCRIPT LOAD inside MULTI is propagated in a transaction
|
||||
# make sure that SCRIPT LOAD inside MULTI isn't propagated
|
||||
r multi
|
||||
r script load {redis.call('set', KEYS[1], 'foo')}
|
||||
r set foo bar
|
||||
set res [r exec]
|
||||
set sha [lindex $res 0]
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{script load *}
|
||||
{set foo bar}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
} {} {needs:repl}
|
||||
|
||||
test {MULTI propagation of SCRIPT LOAD} {
|
||||
test {MULTI propagation of EVAL} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# make sure that EVAL inside MULTI is propagated in a transaction
|
||||
r config set lua-replicate-commands no
|
||||
# make sure that EVAL inside MULTI is propagated in a transaction in effects
|
||||
r multi
|
||||
r eval {redis.call('set', KEYS[1], 'bar')} 1 bar
|
||||
r exec
|
||||
@ -629,12 +629,30 @@ start_server {tags {"multi"}} {
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{eval *}
|
||||
{set bar bar}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
} {} {needs:repl}
|
||||
|
||||
test {MULTI propagation of SCRIPT FLUSH} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# make sure that SCRIPT FLUSH isn't propagated
|
||||
r multi
|
||||
r script flush
|
||||
r set foo bar
|
||||
r exec
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{set foo bar}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
} {} {need:repl}
|
||||
|
||||
tags {"stream"} {
|
||||
test {MULTI propagation of XREADGROUP} {
|
||||
# stream is a special case because it calls propagate() directly for XREADGROUP
|
||||
|
@ -37,18 +37,11 @@ start_server {tags {"pause network"}} {
|
||||
$rd2 publish foo bar
|
||||
wait_for_blocked_clients_count 2 50 100
|
||||
|
||||
# Test that SCRIPT LOAD, which is replicated.
|
||||
set rd3 [redis_deferring_client]
|
||||
$rd3 script load "return 1"
|
||||
wait_for_blocked_clients_count 3 50 100
|
||||
|
||||
r client unpause
|
||||
assert_match "1" [$rd read]
|
||||
assert_match "0" [$rd2 read]
|
||||
assert_match "*" [$rd3 read]
|
||||
$rd close
|
||||
$rd2 close
|
||||
$rd3 close
|
||||
}
|
||||
|
||||
test "Test read/admin mutli-execs are not blocked by pause RO" {
|
||||
|
@ -227,18 +227,13 @@ start_server {tags {"scripting"}} {
|
||||
assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {run_script {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s}
|
||||
}
|
||||
|
||||
if {$is_eval eq 1} {
|
||||
# only is_eval Lua can not execute randomkey
|
||||
test {EVAL - Scripts can't run certain commands} {
|
||||
test {EVAL - Scripts can run non-deterministic commands} {
|
||||
set e {}
|
||||
r debug lua-always-replicate-commands 0
|
||||
catch {
|
||||
run_script "redis.pcall('randomkey'); return redis.pcall('set','x','ciao')" 0
|
||||
} e
|
||||
r debug lua-always-replicate-commands 1
|
||||
set e
|
||||
} {*not allowed after*} {needs:debug}
|
||||
} ;# is_eval
|
||||
} {*OK*}
|
||||
|
||||
test {EVAL - No arguments to redis.call/pcall is considered an error} {
|
||||
set e {}
|
||||
@ -406,16 +401,6 @@ start_server {tags {"scripting"}} {
|
||||
[r evalsha b534286061d4b9e4026607613b95c06c06015ae8 0]
|
||||
} {b534286061d4b9e4026607613b95c06c06015ae8 loaded}
|
||||
|
||||
# reply oredering is only relevant for is_eval Lua
|
||||
test "In the context of Lua the output of random commands gets ordered" {
|
||||
r debug lua-always-replicate-commands 0
|
||||
r del myset
|
||||
r sadd myset a b c d e f g h i l m n o p q r s t u v z aa aaa azz
|
||||
set res [r eval {return redis.call('smembers',KEYS[1])} 1 myset]
|
||||
r debug lua-always-replicate-commands 1
|
||||
set res
|
||||
} {a aa aaa azz b c d e f g h i l m n o p q r s t u v z} {needs:debug}
|
||||
|
||||
test "SORT is normally not alpha re-ordered for the scripting engine" {
|
||||
r del myset
|
||||
r sadd myset 1 2 3 4 10
|
||||
@ -474,10 +459,13 @@ start_server {tags {"scripting"}} {
|
||||
|
||||
if {$is_eval eq 1} {
|
||||
# random handling is only relevant for is_eval Lua
|
||||
test {Scripting engine resets PRNG at every script execution} {
|
||||
test {random numbers are random now} {
|
||||
set rand1 [r eval {return tostring(math.random())} 0]
|
||||
set rand2 [r eval {return tostring(math.random())} 0]
|
||||
assert_equal $rand1 $rand2
|
||||
wait_for_condition 100 1 {
|
||||
$rand1 ne [r eval {return tostring(math.random())} 0]
|
||||
} else {
|
||||
fail "random numbers should be random, now it's fixed value"
|
||||
}
|
||||
}
|
||||
|
||||
test {Scripting engine PRNG can be seeded correctly} {
|
||||
@ -510,78 +498,71 @@ start_server {tags {"scripting"}} {
|
||||
r get x
|
||||
} {10000}
|
||||
|
||||
test {EVAL processes writes from AOF in read-only slaves} {
|
||||
r flushall
|
||||
r config set appendonly yes
|
||||
r config set aof-use-rdb-preamble no
|
||||
run_script {redis.call("set",KEYS[1],"100")} 1 foo
|
||||
run_script {redis.call("incr",KEYS[1])} 1 foo
|
||||
run_script {redis.call("incr",KEYS[1])} 1 foo
|
||||
wait_for_condition 50 100 {
|
||||
[s aof_rewrite_in_progress] == 0
|
||||
} else {
|
||||
fail "AOF rewrite can't complete after CONFIG SET appendonly yes."
|
||||
}
|
||||
r config set slave-read-only yes
|
||||
r slaveof 127.0.0.1 0
|
||||
r debug loadaof
|
||||
set res [r get foo]
|
||||
r slaveof no one
|
||||
r config set aof-use-rdb-preamble yes
|
||||
set res
|
||||
} {102} {external:skip}
|
||||
|
||||
if {$is_eval eq 1} {
|
||||
# script propagation is irrelevant on functions
|
||||
test {EVAL timeout from AOF} {
|
||||
# generate a long running script that is propagated to the AOF as script
|
||||
# make sure that the script times out during loading
|
||||
r config set appendonly no
|
||||
r config set aof-use-rdb-preamble no
|
||||
r config set lua-replicate-commands no
|
||||
r flushall
|
||||
r config set appendonly yes
|
||||
wait_for_condition 50 100 {
|
||||
[s aof_rewrite_in_progress] == 0
|
||||
} else {
|
||||
fail "AOF rewrite can't complete after CONFIG SET appendonly yes."
|
||||
}
|
||||
r config set lua-time-limit 1
|
||||
set rd [redis_deferring_client]
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd eval {redis.call('set',KEYS[1],'y'); for i=1,1500000 do redis.call('ping') end return 'ok'} 1 x
|
||||
$rd flush
|
||||
after 100
|
||||
catch {r ping} err
|
||||
assert_match {BUSY*} $err
|
||||
$rd read
|
||||
set elapsed [expr [clock clicks -milliseconds]-$start]
|
||||
if {$::verbose} { puts "script took $elapsed milliseconds" }
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd debug loadaof
|
||||
$rd flush
|
||||
after 100
|
||||
catch {r ping} err
|
||||
assert_match {LOADING*} $err
|
||||
$rd read
|
||||
set elapsed [expr [clock clicks -milliseconds]-$start]
|
||||
if {$::verbose} { puts "loading took $elapsed milliseconds" }
|
||||
$rd close
|
||||
r get x
|
||||
} {y} {external:skip}
|
||||
|
||||
test {We can call scripts rewriting client->argv from Lua} {
|
||||
test {SPOP: We can call scripts rewriting client->argv from Lua} {
|
||||
set repl [attach_to_replication_stream]
|
||||
#this sadd operation is for external-cluster test. If myset doesn't exist, 'del myset' won't get propagated.
|
||||
r sadd myset ppp
|
||||
r del myset
|
||||
r sadd myset a b c
|
||||
r mset a{t} 1 b{t} 2 c{t} 3 d{t} 4
|
||||
assert {[r spop myset] ne {}}
|
||||
assert {[r spop myset 1] ne {}}
|
||||
assert {[r spop myset] ne {}}
|
||||
assert {[r mget a{t} b{t} c{t} d{t}] eq {1 2 3 4}}
|
||||
assert {[r spop myset] eq {}}
|
||||
}
|
||||
} ;# is_eval
|
||||
assert {[r eval {return redis.call('spop', 'myset')} 0] ne {}}
|
||||
assert {[r eval {return redis.call('spop', 'myset', 1)} 0] ne {}}
|
||||
assert {[r eval {return redis.call('spop', KEYS[1])} 1 myset] ne {}}
|
||||
#this one below should be replicated by an empty MULTI/EXEC
|
||||
assert {[r eval {return redis.call('spop', KEYS[1])} 1 myset] eq {}}
|
||||
r set trailingkey 1
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{sadd *}
|
||||
{del *}
|
||||
{sadd *}
|
||||
{multi}
|
||||
{srem myset *}
|
||||
{exec}
|
||||
{multi}
|
||||
{srem myset *}
|
||||
{exec}
|
||||
{multi}
|
||||
{srem myset *}
|
||||
{exec}
|
||||
{multi}
|
||||
{exec}
|
||||
{set *}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
} {} {need:repl}
|
||||
|
||||
test {MGET: mget shouldn't be propagated in Lua} {
|
||||
set repl [attach_to_replication_stream]
|
||||
r mset a{t} 1 b{t} 2 c{t} 3 d{t} 4
|
||||
#read-only, won't be replicated
|
||||
assert {[r eval {return redis.call('mget', 'a{t}', 'b{t}', 'c{t}', 'd{t}')} 0] eq {1 2 3 4}}
|
||||
r set trailingkey 2
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{mset *}
|
||||
{set *}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
} {} {need:repl}
|
||||
|
||||
test {EXPIRE: We can call scripts rewriting client->argv from Lua} {
|
||||
set repl [attach_to_replication_stream]
|
||||
r set expirekey 1
|
||||
#should be replicated as EXPIREAT
|
||||
assert {[r eval {return redis.call('expire', KEYS[1], ARGV[1])} 1 expirekey 3] eq 1}
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{set *}
|
||||
{multi}
|
||||
{pexpireat expirekey *}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
} {} {need:repl}
|
||||
|
||||
} ;# is_eval
|
||||
|
||||
test {Call Redis command with many args from Lua (issue #1764)} {
|
||||
run_script {
|
||||
@ -812,17 +793,9 @@ start_server {tags {"scripting"}} {
|
||||
} {} {external:skip}
|
||||
}
|
||||
|
||||
foreach cmdrepl {0 1} {
|
||||
start_server {tags {"scripting repl needs:debug external:skip"}} {
|
||||
start_server {} {
|
||||
if {$cmdrepl == 1} {
|
||||
set rt "(commands replication)"
|
||||
} else {
|
||||
set rt "(scripts replication)"
|
||||
r debug lua-always-replicate-commands 1
|
||||
}
|
||||
|
||||
test "Before the replica connects we issue two EVAL commands $rt" {
|
||||
test "Before the replica connects we issue two EVAL commands" {
|
||||
# One with an error, but still executing a command.
|
||||
# SHA is: 67164fc43fa971f76fd1aaeeaf60c1c178d25876
|
||||
catch {
|
||||
@ -833,7 +806,7 @@ foreach cmdrepl {0 1} {
|
||||
run_script {return redis.call('incr',KEYS[1])} 1 x
|
||||
} {2}
|
||||
|
||||
test "Connect a replica to the master instance $rt" {
|
||||
test "Connect a replica to the master instance" {
|
||||
r -1 slaveof [srv 0 host] [srv 0 port]
|
||||
wait_for_condition 50 100 {
|
||||
[s -1 role] eq {slave} &&
|
||||
@ -844,7 +817,7 @@ foreach cmdrepl {0 1} {
|
||||
}
|
||||
|
||||
if {$is_eval eq 1} {
|
||||
test "Now use EVALSHA against the master, with both SHAs $rt" {
|
||||
test "Now use EVALSHA against the master, with both SHAs" {
|
||||
# The server should replicate successful and unsuccessful
|
||||
# commands as EVAL instead of EVALSHA.
|
||||
catch {
|
||||
@ -853,7 +826,7 @@ foreach cmdrepl {0 1} {
|
||||
r evalsha 6f5ade10a69975e903c6d07b10ea44c6382381a5 1 x
|
||||
} {4}
|
||||
|
||||
test "If EVALSHA was replicated as EVAL, 'x' should be '4' $rt" {
|
||||
test "'x' should be '4' for EVALSHA being replicated by effects" {
|
||||
wait_for_condition 50 100 {
|
||||
[r -1 get x] eq {4}
|
||||
} else {
|
||||
@ -862,7 +835,7 @@ foreach cmdrepl {0 1} {
|
||||
}
|
||||
} ;# is_eval
|
||||
|
||||
test "Replication of script multiple pushes to list with BLPOP $rt" {
|
||||
test "Replication of script multiple pushes to list with BLPOP" {
|
||||
set rd [redis_deferring_client]
|
||||
$rd brpop a 0
|
||||
run_script {
|
||||
@ -880,7 +853,7 @@ foreach cmdrepl {0 1} {
|
||||
} {a 1}
|
||||
|
||||
if {$is_eval eq 1} {
|
||||
test "EVALSHA replication when first call is readonly $rt" {
|
||||
test "EVALSHA replication when first call is readonly" {
|
||||
r del x
|
||||
r eval {if tonumber(ARGV[1]) > 0 then redis.call('incr', KEYS[1]) end} 1 x 0
|
||||
r evalsha 6e0e2745aa546d0b50b801a20983b70710aef3ce 1 x 0
|
||||
@ -893,7 +866,7 @@ foreach cmdrepl {0 1} {
|
||||
}
|
||||
} ;# is_eval
|
||||
|
||||
test "Lua scripts using SELECT are replicated correctly $rt" {
|
||||
test "Lua scripts using SELECT are replicated correctly" {
|
||||
run_script {
|
||||
redis.call("set","foo1","bar1")
|
||||
redis.call("select","10")
|
||||
@ -916,7 +889,6 @@ foreach cmdrepl {0 1} {
|
||||
} {} {singledb:skip}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"scripting repl external:skip"}} {
|
||||
start_server {overrides {appendonly yes aof-use-rdb-preamble no}} {
|
||||
@ -930,32 +902,22 @@ start_server {tags {"scripting repl external:skip"}} {
|
||||
}
|
||||
}
|
||||
|
||||
if {$is_eval eq 1} {
|
||||
# replicate_commands is the default on Redis Function
|
||||
test "Redis.replicate_commands() must be issued before any write" {
|
||||
test "Redis.replicate_commands() can be issued anywhere now" {
|
||||
r eval {
|
||||
redis.call('set','foo','bar');
|
||||
return redis.replicate_commands();
|
||||
} 0
|
||||
} {}
|
||||
|
||||
test "Redis.replicate_commands() must be issued before any write (2)" {
|
||||
r eval {
|
||||
return redis.replicate_commands();
|
||||
} 0
|
||||
} {1}
|
||||
|
||||
test "Redis.set_repl() must be issued after replicate_commands()" {
|
||||
r debug lua-always-replicate-commands 0
|
||||
test "Redis.set_repl() can be issued before replicate_commands() now" {
|
||||
catch {
|
||||
r eval {
|
||||
redis.set_repl(redis.REPL_ALL);
|
||||
} 0
|
||||
} e
|
||||
r debug lua-always-replicate-commands 1
|
||||
set e
|
||||
} {*only after turning on*}
|
||||
} ;# is_eval
|
||||
} {}
|
||||
|
||||
test "Redis.set_repl() don't accept invalid values" {
|
||||
catch {
|
||||
@ -981,7 +943,7 @@ start_server {tags {"scripting repl external:skip"}} {
|
||||
wait_for_condition 50 100 {
|
||||
[r -1 mget a b c d] eq {1 {} {} 4}
|
||||
} else {
|
||||
fail "Only a and c should be replicated to replica"
|
||||
fail "Only a and d should be replicated to replica"
|
||||
}
|
||||
|
||||
# Master should have everything right now
|
||||
|
Loading…
Reference in New Issue
Block a user