mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
Cleanup around script_caller, fix tracking of scripts and ACL logging for RM_Call (#11770)
* Make it clear that current_client is the root client that was called by external connection * add executing_client which is the client that runs the current command (can be a module or a script) * Remove script_caller that was used for commands that have CLIENT_SCRIPT to get the client that called the script. in most cases, that's the current_client, and in others (when being called from a module), it could be an intermediate client when we actually want the original one used by the external connection. bugfixes: * RM_Call with C flag should log ACL errors with the requested user rather than the one used by the original client, this also solves a crash when RM_Call is used with C flag from a detached thread safe context. * addACLLogEntry would have logged info about the script_caller, but in case the script was issued by a module command we actually want the current_client. the exception is when RM_Call is called from a timer event, in which case we don't have a current_client. behavior changes: * client side tracking for scripts now tracks the keys that are read by the script instead of the keys that are declared by the caller for EVAL other changes: * Log both current_client and executing_client in the crash log. * remove prepareLuaClient and resetLuaClient, being dead code that was forgotten. * remove scriptTimeSnapshot and snapshot_time and instead add cmd_time_snapshot that serves all commands and is reset only when execution nesting starts. * remove code to propagate CLIENT_FORCE_REPL from the executed command to the script caller since scripts aren't propagated anyway these days and anyway this flag wouldn't have had an effect since CLIENT_PREVENT_PROP is added by scriptResetRun. * fix a module GIL violation issue in afterSleep that was introduced in #10300 (unreleased)
This commit is contained in:
parent
a35e08370a
commit
233abbbe03
@ -2518,8 +2518,8 @@ void addACLLogEntry(client *c, int reason, int context, int argpos, sds username
|
||||
}
|
||||
}
|
||||
|
||||
client *realclient = c;
|
||||
if (realclient->flags & CLIENT_SCRIPT) realclient = server.script_caller;
|
||||
/* if we have a real client from the network, use it (could be missing on module timers) */
|
||||
client *realclient = server.current_client? server.current_client : c;
|
||||
|
||||
le->cinfo = catClientInfoString(sdsempty(),realclient);
|
||||
le->context = context;
|
||||
|
@ -1413,8 +1413,10 @@ int loadSingleAppendOnlyFile(char *filename) {
|
||||
* to the same file we're about to read. */
|
||||
server.aof_state = AOF_OFF;
|
||||
|
||||
client *old_client = server.current_client;
|
||||
fakeClient = server.current_client = createAOFClient();
|
||||
client *old_cur_client = server.current_client;
|
||||
client *old_exec_client = server.executing_client;
|
||||
fakeClient = createAOFClient();
|
||||
server.current_client = server.executing_client = fakeClient;
|
||||
|
||||
/* Check if the AOF file is in RDB format (it may be RDB encoded base AOF
|
||||
* or old style RDB-preamble AOF). In that case we need to load the RDB file
|
||||
@ -1622,7 +1624,8 @@ fmterr: /* Format error. */
|
||||
|
||||
cleanup:
|
||||
if (fakeClient) freeClient(fakeClient);
|
||||
server.current_client = old_client;
|
||||
server.current_client = old_cur_client;
|
||||
server.executing_client = old_exec_client;
|
||||
fclose(fp);
|
||||
sdsfree(aof_filepath);
|
||||
return ret;
|
||||
|
10
src/debug.c
10
src/debug.c
@ -1846,14 +1846,13 @@ void logModulesInfo(void) {
|
||||
/* Log information about the "current" client, that is, the client that is
|
||||
* currently being served by Redis. May be NULL if Redis is not serving a
|
||||
* client right now. */
|
||||
void logCurrentClient(void) {
|
||||
if (server.current_client == NULL) return;
|
||||
void logCurrentClient(client *cc, const char *title) {
|
||||
if (cc == NULL) return;
|
||||
|
||||
client *cc = server.current_client;
|
||||
sds client;
|
||||
int j;
|
||||
|
||||
serverLogRaw(LL_WARNING|LL_RAW, "\n------ CURRENT CLIENT INFO ------\n");
|
||||
serverLog(LL_WARNING|LL_RAW, "\n------ %s CLIENT INFO ------\n", title);
|
||||
client = catClientInfoString(sdsempty(),cc);
|
||||
serverLog(LL_WARNING|LL_RAW,"%s\n", client);
|
||||
sdsfree(client);
|
||||
@ -2112,7 +2111,8 @@ void printCrashReport(void) {
|
||||
logServerInfo();
|
||||
|
||||
/* Log the current client */
|
||||
logCurrentClient();
|
||||
logCurrentClient(server.current_client, "CURRENT");
|
||||
logCurrentClient(server.executing_client, "EXECUTING");
|
||||
|
||||
/* Log modules info. Something we wanna do last since we fear it may crash. */
|
||||
logModulesInfo();
|
||||
|
18
src/eval.c
18
src/eval.c
@ -185,7 +185,6 @@ void scriptingInit(int setup) {
|
||||
|
||||
if (setup) {
|
||||
lctx.lua_client = NULL;
|
||||
server.script_caller = NULL;
|
||||
server.script_disable_deny_script = 0;
|
||||
ldbInit();
|
||||
}
|
||||
@ -470,22 +469,6 @@ sds luaCreateFunction(client *c, robj *body) {
|
||||
return sha;
|
||||
}
|
||||
|
||||
void prepareLuaClient(void) {
|
||||
/* Select the right DB in the context of the Lua client */
|
||||
selectDb(lctx.lua_client,server.script_caller->db->id);
|
||||
lctx.lua_client->resp = 2; /* Default is RESP2, scripts can change it. */
|
||||
|
||||
/* If we are in MULTI context, flag Lua client as CLIENT_MULTI. */
|
||||
if (server.script_caller->flags & CLIENT_MULTI) {
|
||||
lctx.lua_client->flags |= CLIENT_MULTI;
|
||||
}
|
||||
}
|
||||
|
||||
void resetLuaClient(void) {
|
||||
/* After the script done, remove the MULTI state. */
|
||||
lctx.lua_client->flags &= ~CLIENT_MULTI;
|
||||
}
|
||||
|
||||
void evalGenericCommand(client *c, int evalsha) {
|
||||
lua_State *lua = lctx.lua;
|
||||
char funcname[43];
|
||||
@ -1680,6 +1663,5 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
|
||||
luaError(lua);
|
||||
}
|
||||
rctx->start_time = getMonotonicUs();
|
||||
rctx->snapshot_time = mstime();
|
||||
}
|
||||
}
|
||||
|
@ -6126,10 +6126,10 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
||||
acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,&acl_errpos);
|
||||
if (acl_retval != ACL_OK) {
|
||||
sds object = (acl_retval == ACL_DENIED_CMD) ? sdsdup(c->cmd->fullname) : sdsdup(c->argv[acl_errpos]->ptr);
|
||||
addACLLogEntry(ctx->client, acl_retval, ACL_LOG_CTX_MODULE, -1, ctx->client->user->name, object);
|
||||
addACLLogEntry(ctx->client, acl_retval, ACL_LOG_CTX_MODULE, -1, c->user->name, object);
|
||||
if (error_as_call_replies) {
|
||||
/* verbosity should be same as processCommand() in server.c */
|
||||
sds acl_msg = getAclErrorMessage(acl_retval, ctx->client->user, c->cmd, c->argv[acl_errpos]->ptr, 0);
|
||||
sds acl_msg = getAclErrorMessage(acl_retval, c->user, c->cmd, c->argv[acl_errpos]->ptr, 0);
|
||||
sds msg = sdscatfmt(sdsempty(), "-NOPERM %S\r\n", acl_msg);
|
||||
sdsfree(acl_msg);
|
||||
reply = callReplyCreateError(msg, ctx);
|
||||
|
@ -3921,6 +3921,13 @@ void processEventsWhileBlocked(void) {
|
||||
* interaction time with clients and for other important things. */
|
||||
updateCachedTime(0);
|
||||
|
||||
/* For the few commands that are allowed during busy scripts, we rather
|
||||
* provide a fresher time than the one from when the script started (they
|
||||
* still won't get it from the call due to execution_nesting. For commands
|
||||
* during loading this doesn't matter. */
|
||||
mstime_t prev_cmd_time_snapshot = server.cmd_time_snapshot;
|
||||
server.cmd_time_snapshot = server.mstime;
|
||||
|
||||
/* Note: when we are processing events while blocked (for instance during
|
||||
* busy Lua scripts), we set a global flag. When such flag is set, we
|
||||
* avoid handling the read part of clients using threaded I/O.
|
||||
@ -3945,6 +3952,8 @@ void processEventsWhileBlocked(void) {
|
||||
|
||||
ProcessingEventsWhileBlocked--;
|
||||
serverAssert(ProcessingEventsWhileBlocked >= 0);
|
||||
|
||||
server.cmd_time_snapshot = prev_cmd_time_snapshot;
|
||||
}
|
||||
|
||||
/* ==========================================================================
|
||||
|
10
src/script.c
10
src/script.c
@ -212,7 +212,6 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
|
||||
|
||||
client *script_client = run_ctx->c;
|
||||
client *curr_client = run_ctx->original_client;
|
||||
server.script_caller = curr_client;
|
||||
|
||||
/* Select the right DB in the context of the Lua client */
|
||||
selectDb(script_client, curr_client->db->id);
|
||||
@ -224,7 +223,6 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
|
||||
}
|
||||
|
||||
run_ctx->start_time = getMonotonicUs();
|
||||
run_ctx->snapshot_time = mstime();
|
||||
|
||||
run_ctx->flags = 0;
|
||||
run_ctx->repl_flags = PROPAGATE_AOF | PROPAGATE_REPL;
|
||||
@ -257,8 +255,6 @@ void scriptResetRun(scriptRunCtx *run_ctx) {
|
||||
/* After the script done, remove the MULTI state. */
|
||||
run_ctx->c->flags &= ~CLIENT_MULTI;
|
||||
|
||||
server.script_caller = NULL;
|
||||
|
||||
if (scriptIsTimedout()) {
|
||||
exitScriptTimedoutMode(run_ctx);
|
||||
/* Restore the client that was protected when the script timeout
|
||||
@ -575,12 +571,6 @@ error:
|
||||
incrCommandStatsOnError(cmd, ERROR_COMMAND_REJECTED);
|
||||
}
|
||||
|
||||
/* Returns the time when the script invocation started */
|
||||
mstime_t scriptTimeSnapshot() {
|
||||
serverAssert(curr_run_ctx);
|
||||
return curr_run_ctx->snapshot_time;
|
||||
}
|
||||
|
||||
long long scriptRunDuration() {
|
||||
serverAssert(scriptIsRunning());
|
||||
return elapsedMs(curr_run_ctx->start_time);
|
||||
|
@ -74,7 +74,6 @@ struct scriptRunCtx {
|
||||
int flags;
|
||||
int repl_flags;
|
||||
monotime start_time;
|
||||
mstime_t snapshot_time;
|
||||
};
|
||||
|
||||
/* Scripts flags */
|
||||
@ -107,7 +106,6 @@ int scriptIsEval();
|
||||
int scriptIsTimedout();
|
||||
client* scriptGetClient();
|
||||
client* scriptGetCaller();
|
||||
mstime_t scriptTimeSnapshot();
|
||||
long long scriptRunDuration();
|
||||
|
||||
#endif /* __SCRIPT_H_ */
|
||||
|
84
src/server.c
84
src/server.c
@ -209,24 +209,20 @@ mstime_t mstime(void) {
|
||||
* reflect the same time.
|
||||
* More details can be found in the comments below. */
|
||||
mstime_t commandTimeSnapshot(void) {
|
||||
/* If we are in the context of a Lua script, we pretend that time is
|
||||
* blocked to when the Lua script started. This way a key can expire
|
||||
* only the first time it is accessed and not in the middle of the
|
||||
* script execution, making propagation to slaves / AOF consistent.
|
||||
* See issue #1525 on Github for more information. */
|
||||
if (server.script_caller) {
|
||||
return scriptTimeSnapshot();
|
||||
}
|
||||
/* If we are in the middle of a command execution, we still want to use
|
||||
* a reference time that does not change: in that case we just use the
|
||||
/* When we are in the middle of a command execution, we want to use a
|
||||
* reference time that does not change: in that case we just use the
|
||||
* cached time, that we update before each call in the call() function.
|
||||
* This way we avoid that commands such as RPOPLPUSH or similar, that
|
||||
* may re-open the same key multiple times, can invalidate an already
|
||||
* open object in a next call, if the next call will see the key expired,
|
||||
* while the first did not. */
|
||||
else {
|
||||
return server.mstime;
|
||||
}
|
||||
* while the first did not.
|
||||
* This is specificlally important in the context of scripts, where we
|
||||
* pretend that time freezes. This way a key can expire only the first time
|
||||
* it is accessed and not in the middle of the script execution, making
|
||||
* propagation to slaves / AOF consistent. See issue #1525 for more info.
|
||||
* Note that we cannot use the cached server.mstime because it can change
|
||||
* in processEventsWhileBlocked etc. */
|
||||
return server.cmd_time_snapshot;
|
||||
}
|
||||
|
||||
/* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
|
||||
@ -1722,8 +1718,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
* releasing the GIL. Redis main thread will not touch anything at this
|
||||
* time. */
|
||||
if (moduleCount()) moduleReleaseGIL();
|
||||
|
||||
/* Do NOT add anything below moduleReleaseGIL !!! */
|
||||
/********************* WARNING ********************
|
||||
* Do NOT add anything below moduleReleaseGIL !!! *
|
||||
***************************** ********************/
|
||||
}
|
||||
|
||||
/* This function is called immediately after the event loop multiplexing
|
||||
@ -1731,14 +1728,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
* the different events callbacks. */
|
||||
void afterSleep(struct aeEventLoop *eventLoop) {
|
||||
UNUSED(eventLoop);
|
||||
|
||||
/* Update the time cache. */
|
||||
updateCachedTime(1);
|
||||
|
||||
/* Do NOT add anything above moduleAcquireGIL !!! */
|
||||
|
||||
/* Acquire the modules GIL so that their threads won't touch anything. */
|
||||
/********************* WARNING ********************
|
||||
* Do NOT add anything above moduleAcquireGIL !!! *
|
||||
***************************** ********************/
|
||||
if (!ProcessingEventsWhileBlocked) {
|
||||
/* Acquire the modules GIL so that their threads won't touch anything. */
|
||||
if (moduleCount()) {
|
||||
mstime_t latency;
|
||||
latencyStartMonitor(latency);
|
||||
@ -1751,6 +1745,16 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
||||
latencyAddSampleIfNeeded("module-acquire-GIL",latency);
|
||||
}
|
||||
}
|
||||
|
||||
/* Update the time cache. */
|
||||
updateCachedTime(1);
|
||||
|
||||
/* Update command time snapshot in case it'll be required without a command
|
||||
* e.g. somehow used by module timers. Don't update it while yielding to a
|
||||
* blocked command, call() will handle that and restore the original time. */
|
||||
if (!ProcessingEventsWhileBlocked) {
|
||||
server.cmd_time_snapshot = server.mstime;
|
||||
}
|
||||
}
|
||||
|
||||
/* =========================== Server initialization ======================== */
|
||||
@ -1945,6 +1949,7 @@ void initServerConfig(void) {
|
||||
|
||||
initConfigValues();
|
||||
updateCachedTime(1);
|
||||
server.cmd_time_snapshot = server.mstime;
|
||||
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
|
||||
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
|
||||
changeReplicationId();
|
||||
@ -3435,6 +3440,9 @@ void call(client *c, int flags) {
|
||||
long long dirty;
|
||||
uint64_t client_old_flags = c->flags;
|
||||
struct redisCommand *real_cmd = c->realcmd;
|
||||
client *prev_client = server.executing_client;
|
||||
server.executing_client = c;
|
||||
|
||||
/* When call() is issued during loading the AOF we don't want commands called
|
||||
* from module, exec or LUA to go into the slowlog or to populate statistics. */
|
||||
int update_command_stats = !isAOFLoadingContext();
|
||||
@ -3466,6 +3474,7 @@ void call(client *c, int flags) {
|
||||
* in case we have nested calls we want to update only on the first call */
|
||||
if (server.execution_nesting++ == 0) {
|
||||
updateCachedTimeWithUs(0,call_timer);
|
||||
server.cmd_time_snapshot = server.mstime;
|
||||
c->flags |= CLIENT_EXECUTING_COMMAND;
|
||||
}
|
||||
|
||||
@ -3511,16 +3520,6 @@ void call(client *c, int flags) {
|
||||
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
|
||||
}
|
||||
|
||||
/* If the caller is Lua, we want to force the EVAL caller to propagate
|
||||
* the script if the command flag or client flag are forcing the
|
||||
* propagation. */
|
||||
if (c->flags & CLIENT_SCRIPT && server.script_caller) {
|
||||
if (c->flags & CLIENT_FORCE_REPL)
|
||||
server.script_caller->flags |= CLIENT_FORCE_REPL;
|
||||
if (c->flags & CLIENT_FORCE_AOF)
|
||||
server.script_caller->flags |= CLIENT_FORCE_AOF;
|
||||
}
|
||||
|
||||
/* Note: the code below uses the real command that was executed
|
||||
* c->cmd and c->lastcmd may be different, in case of MULTI-EXEC or
|
||||
* re-written commands such as EXPIRE, GEOADD, etc. */
|
||||
@ -3607,18 +3606,19 @@ void call(client *c, int flags) {
|
||||
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
|
||||
|
||||
/* If the client has keys tracking enabled for client side caching,
|
||||
* make sure to remember the keys it fetched via this command. Scripting
|
||||
* works a bit differently, where if the scripts executes any read command, it
|
||||
* remembers all of the declared keys from the script. */
|
||||
* make sure to remember the keys it fetched via this command. For read-only
|
||||
* scripts, don't process the script, only the commands it executes. */
|
||||
if ((c->cmd->flags & CMD_READONLY) && (c->cmd->proc != evalRoCommand)
|
||||
&& (c->cmd->proc != evalShaRoCommand) && (c->cmd->proc != fcallroCommand))
|
||||
{
|
||||
client *caller = (c->flags & CLIENT_SCRIPT && server.script_caller) ?
|
||||
server.script_caller : c;
|
||||
if (caller->flags & CLIENT_TRACKING &&
|
||||
!(caller->flags & CLIENT_TRACKING_BCAST))
|
||||
/* We use the tracking flag of the original external client that
|
||||
* triggered the command, but we take the keys from the actual command
|
||||
* being executed. */
|
||||
if (server.current_client &&
|
||||
(server.current_client->flags & CLIENT_TRACKING) &&
|
||||
!(server.current_client->flags & CLIENT_TRACKING_BCAST))
|
||||
{
|
||||
trackingRememberKeys(caller);
|
||||
trackingRememberKeys(server.current_client, c);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3639,6 +3639,8 @@ void call(client *c, int flags) {
|
||||
if (!server.in_exec && server.client_pause_in_transaction) {
|
||||
server.client_pause_in_transaction = 0;
|
||||
}
|
||||
|
||||
server.executing_client = prev_client;
|
||||
}
|
||||
|
||||
/* Used when a command that is ready for execution needs to be rejected, due to
|
||||
|
@ -1536,7 +1536,8 @@ struct redisServer {
|
||||
list *clients_pending_write; /* There is to write or install handler. */
|
||||
list *clients_pending_read; /* Client has pending read socket buffers. */
|
||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||
client *current_client; /* Current client executing the command. */
|
||||
client *current_client; /* The client that triggered the command execution (External or AOF). */
|
||||
client *executing_client; /* The client executing the current command (possibly script or module). */
|
||||
|
||||
/* Stuff for client mem eviction */
|
||||
clientMemUsageBucket* client_mem_usage_buckets;
|
||||
@ -1873,6 +1874,7 @@ struct redisServer {
|
||||
int daylight_active; /* Currently in daylight saving time. */
|
||||
mstime_t mstime; /* 'unixtime' in milliseconds. */
|
||||
ustime_t ustime; /* 'unixtime' in microseconds. */
|
||||
mstime_t cmd_time_snapshot; /* Time snapshot of the root execution nesting. */
|
||||
size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */
|
||||
long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */
|
||||
/* Pubsub */
|
||||
@ -1912,7 +1914,6 @@ struct redisServer {
|
||||
int cluster_drop_packet_filter; /* Debug config that allows tactically
|
||||
* dropping packets of a specific type */
|
||||
/* Scripting */
|
||||
client *script_caller; /* The client running script right now, or NULL */
|
||||
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
|
||||
int pre_command_oom_state; /* OOM before command (script?) was started */
|
||||
int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */
|
||||
@ -2601,7 +2602,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
|
||||
/* Client side caching (tracking mode) */
|
||||
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
|
||||
void disableTracking(client *c);
|
||||
void trackingRememberKeys(client *c);
|
||||
void trackingRememberKeys(client *tracking, client *executing);
|
||||
void trackingInvalidateKey(client *c, robj *keyobj, int bcast);
|
||||
void trackingScheduleKeyInvalidation(uint64_t client_id, robj *keyobj);
|
||||
void trackingHandlePendingKeyInvalidations(void);
|
||||
|
@ -214,16 +214,16 @@ void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **pr
|
||||
* to the keys the user fetched, so that Redis will know what are the clients
|
||||
* that should receive an invalidation message with certain groups of keys
|
||||
* are modified. */
|
||||
void trackingRememberKeys(client *c) {
|
||||
void trackingRememberKeys(client *tracking, client *executing) {
|
||||
/* Return if we are in optin/out mode and the right CACHING command
|
||||
* was/wasn't given in order to modify the default behavior. */
|
||||
uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
|
||||
uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
|
||||
uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
|
||||
uint64_t optin = tracking->flags & CLIENT_TRACKING_OPTIN;
|
||||
uint64_t optout = tracking->flags & CLIENT_TRACKING_OPTOUT;
|
||||
uint64_t caching_given = tracking->flags & CLIENT_TRACKING_CACHING;
|
||||
if ((optin && !caching_given) || (optout && caching_given)) return;
|
||||
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result);
|
||||
int numkeys = getKeysFromCommand(executing->cmd,executing->argv,executing->argc,&result);
|
||||
if (!numkeys) {
|
||||
getKeysFreeResult(&result);
|
||||
return;
|
||||
@ -231,7 +231,7 @@ void trackingRememberKeys(client *c) {
|
||||
/* Shard channels are treated as special keys for client
|
||||
* library to rely on `COMMAND` command to discover the node
|
||||
* to connect to. These channels doesn't need to be tracked. */
|
||||
if (c->cmd->flags & CMD_PUBSUB) {
|
||||
if (executing->cmd->flags & CMD_PUBSUB) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -239,7 +239,7 @@ void trackingRememberKeys(client *c) {
|
||||
|
||||
for(int j = 0; j < numkeys; j++) {
|
||||
int idx = keys[j].pos;
|
||||
sds sdskey = c->argv[idx]->ptr;
|
||||
sds sdskey = executing->argv[idx]->ptr;
|
||||
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||
if (ids == raxNotFound) {
|
||||
ids = raxNew();
|
||||
@ -247,7 +247,7 @@ void trackingRememberKeys(client *c) {
|
||||
sdslen(sdskey),ids, NULL);
|
||||
serverAssert(inserted == 1);
|
||||
}
|
||||
if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
|
||||
if (raxTryInsert(ids,(unsigned char*)&tracking->id,sizeof(tracking->id),NULL,NULL))
|
||||
TrackingTableTotalItems++;
|
||||
}
|
||||
getKeysFreeResult(&result);
|
||||
|
@ -253,12 +253,15 @@ start_server {tags {"repl external:skip"}} {
|
||||
# DB is empty.
|
||||
r -1 flushdb
|
||||
r -1 flushdb
|
||||
r -1 flushdb
|
||||
r -1 eval {redis.call("flushdb")} 0
|
||||
|
||||
# DBs are empty.
|
||||
r -1 flushall
|
||||
r -1 flushall
|
||||
r -1 flushall
|
||||
r -1 eval {redis.call("flushall")} 0
|
||||
|
||||
# add another command to check nothing else was propagated after the above
|
||||
r -1 incr x
|
||||
|
||||
# Assert that each FLUSHDB command is replicated even the DB is empty.
|
||||
# Assert that each FLUSHALL command is replicated even the DBs are empty.
|
||||
@ -273,6 +276,7 @@ start_server {tags {"repl external:skip"}} {
|
||||
{flushall}
|
||||
{flushall}
|
||||
{flushall}
|
||||
{incr x}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
@ -1,4 +1,8 @@
|
||||
#include "redismodule.h"
|
||||
#include <pthread.h>
|
||||
#include <assert.h>
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
|
||||
RedisModuleUser *user = NULL;
|
||||
|
||||
@ -103,6 +107,98 @@ int reset_user(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
RedisModuleString **argv;
|
||||
int argc;
|
||||
RedisModuleBlockedClient *bc;
|
||||
} bg_call_data;
|
||||
|
||||
void *bg_call_worker(void *arg) {
|
||||
bg_call_data *bg = arg;
|
||||
|
||||
// Get Redis module context
|
||||
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
|
||||
|
||||
// Acquire GIL
|
||||
RedisModule_ThreadSafeContextLock(ctx);
|
||||
|
||||
// Set user
|
||||
RedisModule_SetContextUser(ctx, user);
|
||||
|
||||
// Call the command
|
||||
size_t format_len;
|
||||
RedisModuleString *format_redis_str = RedisModule_CreateString(NULL, "v", 1);
|
||||
const char *format = RedisModule_StringPtrLen(bg->argv[1], &format_len);
|
||||
RedisModule_StringAppendBuffer(NULL, format_redis_str, format, format_len);
|
||||
RedisModule_StringAppendBuffer(NULL, format_redis_str, "E", 1);
|
||||
format = RedisModule_StringPtrLen(format_redis_str, NULL);
|
||||
const char *cmd = RedisModule_StringPtrLen(bg->argv[2], NULL);
|
||||
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + 3, bg->argc - 3);
|
||||
RedisModule_FreeString(NULL, format_redis_str);
|
||||
|
||||
// Release GIL
|
||||
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||
|
||||
// Reply to client
|
||||
if (!rep) {
|
||||
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||
} else {
|
||||
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
|
||||
// Unblock client
|
||||
RedisModule_UnblockClient(bg->bc, NULL);
|
||||
|
||||
/* Free the arguments */
|
||||
for (int i=0; i<bg->argc; i++)
|
||||
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||
RedisModule_Free(bg->argv);
|
||||
RedisModule_Free(bg);
|
||||
|
||||
// Free the Redis module context
|
||||
RedisModule_FreeThreadSafeContext(ctx);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int call_with_user_bg(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
UNUSED(argv);
|
||||
UNUSED(argc);
|
||||
|
||||
/* Make sure we're not trying to block a client when we shouldn't */
|
||||
int flags = RedisModule_GetContextFlags(ctx);
|
||||
int allFlags = RedisModule_GetContextFlagsAll();
|
||||
if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
|
||||
(flags & REDISMODULE_CTX_FLAGS_MULTI)) {
|
||||
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
|
||||
(flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
|
||||
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Make a copy of the arguments and pass them to the thread. */
|
||||
bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
|
||||
bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
|
||||
bg->argc = argc;
|
||||
for (int i=0; i<argc; i++)
|
||||
bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
|
||||
|
||||
/* Block the client */
|
||||
bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
||||
|
||||
/* Start a thread to handle the request */
|
||||
pthread_t tid;
|
||||
int res = pthread_create(&tid, NULL, bg_call_worker, bg);
|
||||
assert(res == 0);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -116,6 +212,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx,"usercall.call_with_user_flag", call_with_user_flag,"write",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "usercall.call_with_user_bg", call_with_user_bg, "write", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "usercall.add_to_acl", add_to_acl, "write",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
|
@ -652,6 +652,7 @@ start_server {tags {"acl external:skip"}} {
|
||||
assert {[dict get $entry context] eq {toplevel}}
|
||||
assert {[dict get $entry reason] eq {command}}
|
||||
assert {[dict get $entry object] eq {get}}
|
||||
assert_match {*cmd=get*} [dict get $entry client-info]
|
||||
}
|
||||
|
||||
test "ACL LOG shows failed subcommand executions at toplevel" {
|
||||
@ -728,6 +729,7 @@ start_server {tags {"acl external:skip"}} {
|
||||
set entry [lindex [r ACL LOG] 0]
|
||||
assert {[dict get $entry context] eq {multi}}
|
||||
assert {[dict get $entry object] eq {incr}}
|
||||
assert_match {*cmd=exec*} [dict get $entry client-info]
|
||||
r ACL SETUSER antirez -incr
|
||||
}
|
||||
|
||||
@ -738,6 +740,7 @@ start_server {tags {"acl external:skip"}} {
|
||||
set entry [lindex [r ACL LOG] 0]
|
||||
assert {[dict get $entry context] eq {lua}}
|
||||
assert {[dict get $entry object] eq {incr}}
|
||||
assert_match {*cmd=eval*} [dict get $entry client-info]
|
||||
}
|
||||
|
||||
test {ACL LOG can accept a numerical argument to show less entries} {
|
||||
|
@ -116,6 +116,36 @@ start_server {tags {"modules"}} {
|
||||
r client tracking on
|
||||
set info [r test.clientinfo]
|
||||
assert { [dict get $info flags] == "${ssl_flag}::tracking::" }
|
||||
r CLIENT TRACKING off
|
||||
}
|
||||
|
||||
test {tracking with rm_call sanity} {
|
||||
set rd_trk [redis_client]
|
||||
$rd_trk HELLO 3
|
||||
$rd_trk CLIENT TRACKING on
|
||||
r MSET key1{t} 1 key2{t} 1
|
||||
|
||||
# GET triggers tracking, SET does not
|
||||
$rd_trk test.rm_call GET key1{t}
|
||||
$rd_trk test.rm_call SET key2{t} 2
|
||||
r MSET key1{t} 2 key2{t} 2
|
||||
assert_equal {invalidate key1{t}} [$rd_trk read]
|
||||
assert_equal "PONG" [$rd_trk ping]
|
||||
$rd_trk close
|
||||
}
|
||||
|
||||
test {tracking with rm_call with script} {
|
||||
set rd_trk [redis_client]
|
||||
$rd_trk HELLO 3
|
||||
$rd_trk CLIENT TRACKING on
|
||||
r MSET key1{t} 1 key2{t} 1
|
||||
|
||||
# GET triggers tracking, SET does not
|
||||
$rd_trk test.rm_call EVAL "redis.call('get', 'key1{t}')" 2 key1{t} key2{t}
|
||||
r MSET key1{t} 2 key2{t} 2
|
||||
assert_equal {invalidate key1{t}} [$rd_trk read]
|
||||
assert_equal "PONG" [$rd_trk ping]
|
||||
$rd_trk close
|
||||
}
|
||||
|
||||
test {test module get/set client name by id api} {
|
||||
|
@ -39,16 +39,49 @@ start_server {tags {"modules usercall"}} {
|
||||
test {test module check regular redis command with user and acl} {
|
||||
assert_equal [r set x 5] OK
|
||||
|
||||
r ACL LOG RESET
|
||||
assert_equal [r usercall.reset_user] OK
|
||||
assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK
|
||||
# off and sanitize-payload because module user / default value
|
||||
assert_equal [r usercall.get_acl] "off sanitize-payload ~* &* +@all -set"
|
||||
|
||||
# fails here as testing acl in rm call
|
||||
assert_error {*NOPERM User default has no permissions*} {r usercall.call_with_user_flag C set x 10}
|
||||
assert_error {*NOPERM User module_user has no permissions*} {r usercall.call_with_user_flag C set x 10}
|
||||
|
||||
assert_equal [r usercall.call_with_user_flag C get x] 5
|
||||
|
||||
# verify that new log entry added
|
||||
set entry [lindex [r ACL LOG] 0]
|
||||
assert_equal [dict get $entry username] {module_user}
|
||||
assert_equal [dict get $entry context] {module}
|
||||
assert_equal [dict get $entry object] {set}
|
||||
assert_equal [dict get $entry reason] {command}
|
||||
assert_match {*cmd=usercall.call_with_user_flag*} [dict get $entry client-info]
|
||||
|
||||
assert_equal [r usercall.reset_user] OK
|
||||
}
|
||||
|
||||
# call with user with acl set on it, but with testing the acl in rm_call (for cmd itself)
|
||||
test {test module check regular redis command with user and acl from blocked background thread} {
|
||||
assert_equal [r set x 5] OK
|
||||
|
||||
r ACL LOG RESET
|
||||
assert_equal [r usercall.reset_user] OK
|
||||
assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK
|
||||
|
||||
# fails here as testing acl in rm call from a background thread
|
||||
assert_error {*NOPERM User module_user has no permissions*} {r usercall.call_with_user_bg C set x 10}
|
||||
|
||||
assert_equal [r usercall.call_with_user_bg C get x] 5
|
||||
|
||||
# verify that new log entry added
|
||||
set entry [lindex [r ACL LOG] 0]
|
||||
assert_equal [dict get $entry username] {module_user}
|
||||
assert_equal [dict get $entry context] {module}
|
||||
assert_equal [dict get $entry object] {set}
|
||||
assert_equal [dict get $entry reason] {command}
|
||||
assert_match {*cmd=NULL*} [dict get $entry client-info]
|
||||
|
||||
assert_equal [r usercall.reset_user] OK
|
||||
}
|
||||
|
||||
@ -82,6 +115,7 @@ start_server {tags {"modules usercall"}} {
|
||||
set sha_set [r script load $test_script_set]
|
||||
set sha_get [r script load $test_script_get]
|
||||
|
||||
r ACL LOG RESET
|
||||
assert_equal [r usercall.reset_user] OK
|
||||
assert_equal [r usercall.add_to_acl "~* &* +@all -set"] OK
|
||||
|
||||
@ -90,5 +124,13 @@ start_server {tags {"modules usercall"}} {
|
||||
assert_match {*ERR ACL failure in script*} $e
|
||||
|
||||
assert_equal [r usercall.call_with_user_flag C evalsha $sha_get 0] 1
|
||||
|
||||
# verify that new log entry added
|
||||
set entry [lindex [r ACL LOG] 0]
|
||||
assert_equal [dict get $entry username] {module_user}
|
||||
assert_equal [dict get $entry context] {lua}
|
||||
assert_equal [dict get $entry object] {set}
|
||||
assert_equal [dict get $entry reason] {command}
|
||||
assert_match {*cmd=usercall.call_with_user_flag*} [dict get $entry client-info]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -229,22 +229,25 @@ start_server {tags {"tracking network"}} {
|
||||
# If a script doesn't call any read command, don't track any keys
|
||||
r EVAL "redis.call('set', 'key3{t}', 'bar')" 2 key1{t} key2{t}
|
||||
$rd_sg MSET key2{t} 2 key1{t} 2
|
||||
assert_equal "PONG" [r ping]
|
||||
|
||||
# If a script calls a read command, track all declared keys
|
||||
r EVAL "redis.call('get', 'key3{t}')" 2 key1{t} key2{t}
|
||||
$rd_sg MSET key2{t} 2 key1{t} 2
|
||||
# If a script calls a read command, just the read keys
|
||||
r EVAL "redis.call('get', 'key2{t}')" 2 key1{t} key2{t}
|
||||
$rd_sg MSET key2{t} 2 key3{t} 2
|
||||
assert_equal {invalidate key2{t}} [r read]
|
||||
assert_equal {invalidate key1{t}} [r read]
|
||||
assert_equal "PONG" [r ping]
|
||||
|
||||
# RO variants work like the normal variants
|
||||
r EVAL_RO "redis.call('ping')" 2 key1{t} key2{t}
|
||||
|
||||
# If a RO script doesn't call any read command, don't track any keys
|
||||
r EVAL_RO "redis.call('ping')" 2 key1{t} key2{t}
|
||||
$rd_sg MSET key2{t} 2 key1{t} 2
|
||||
assert_equal "PONG" [r ping]
|
||||
|
||||
r EVAL_RO "redis.call('get', 'key1{t}')" 2 key1{t} key2{t}
|
||||
$rd_sg MSET key2{t} 3 key1{t} 3
|
||||
# If a RO script calls a read command, just the read keys
|
||||
r EVAL_RO "redis.call('get', 'key2{t}')" 2 key1{t} key2{t}
|
||||
$rd_sg MSET key2{t} 2 key3{t} 2
|
||||
assert_equal {invalidate key2{t}} [r read]
|
||||
assert_equal {invalidate key1{t}} [r read]
|
||||
|
||||
assert_equal "PONG" [r ping]
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user