valkey/tests/modules/postnotifications.c
Meir Shpilraien (Spielrein) abc345ad28
Module API to allow writes after key space notification hooks (#11199)
### Summary of API additions

* `RedisModule_AddPostNotificationJob` - a new API to call from inside a key space
  notification (and, in the future, from other places) to register a post job, as described below.
* New module option, `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`,
  lets a module disable Redis's protection against nested key-space notifications.
* `RedisModule_GetModuleOptionsAll` - returns the mask of all supported module options, so a module
  can check whether a given option is supported by the currently running Redis instance.

### Background

This PR proposes a way to handle write operations inside module key space notifications.
After much discussion we concluded that a module should not perform any write
operation inside a key space notification.

Some examples of issues that such write operations can cause are described in the following links:

* Bad replication order - https://github.com/redis/redis/pull/10969
* Use after free - https://github.com/redis/redis/pull/10969#issuecomment-1223771006
* Use after free - https://github.com/redis/redis/pull/9406#issuecomment-1221684054

There are probably more issues that are yet to be discovered. The underlying problem with writing
inside a key space notification is that the notification runs synchronously, which means that the
notification code is executed in the middle of Redis logic (command logic, eviction, expire).
Redis **does not assume** that the data might change while running that logic, and such changes
can crash Redis or cause unexpected behaviour.

The solution is to state that modules **should not** perform any write command inside a key space
notification (we can choose whether or not we want to enforce it). To still cover the use case where
a module wants to perform a write operation as a reaction to key space notifications, we introduce
a new API, `RedisModule_AddPostNotificationJob`, that registers a callback that will be
called by Redis when the following conditions hold:

* It is safe to perform any write operation.
* The job will be called atomically alongside the operation that triggers it (in our case, a key
  space notification).

A module can use this new API to safely perform any write operation and still achieve atomicity
between the notification and the write.

Although the API is currently supported only on key space notifications, it is written in a generic
way so that in the future we will be able to use it in other places (server events, for example).

### Technical Details

Whenever a module uses `RedisModule_AddPostNotificationJob`, the callback is added to a list
of callbacks (called `modulePostExecUnitJobs`) that need to be invoked after the current execution
unit ends (whether it's a command, eviction, or active expire). In order to trigger those callbacks
atomically with the notification effect, we call them from `postExecutionUnitOperations`
(which was `propagatePendingCommands` before this PR). The new function fires the post jobs
and then calls `propagatePendingCommands`.

If a callback performs operations that trigger more key space notifications, those
notifications might register more callbacks. The new callbacks are added to the end
of the `modulePostExecUnitJobs` list and are invoked atomically after the current callback ends.
This raises the concern of entering an infinite loop. We consider an infinite loop a logical bug
that needs to be fixed in the module; an attempt to protect against infinite loops by halting the
execution could violate the feature's correctness, and so **Redis will make no attempt
to protect the module from infinite loops**.
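
The queueing behaviour described in this section can be illustrated with a small standalone model. This is a minimal sketch, not the actual Redis implementation: every name in it (`add_post_job`, `post_execution_unit`, `propagate_pending`, `incr_job`, `chained_job`) is a hypothetical stand-in, and a plain singly linked list is assumed for the job queue.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy model only: these names are illustrative, not Redis internals. */
typedef void (*job_fn)(void *pd);
typedef void (*free_fn)(void *pd);

typedef struct job {
    job_fn run;
    free_fn free_pd;   /* optional destructor for pd, may be NULL */
    void *pd;
    struct job *next;
} job;

static job *head = NULL, *tail = NULL;   /* pending jobs of the current unit */
static int propagated_units = 0;         /* counts calls to propagate_pending() */
static int counter = 0;                  /* state mutated by the example jobs */

/* Queue a job to run when the current execution unit ends. Returns 0 on success. */
static int add_post_job(job_fn run, void *pd, free_fn free_pd) {
    assert(run != NULL);
    job *j = malloc(sizeof(*j));
    if (j == NULL) return -1;
    j->run = run;
    j->free_pd = free_pd;
    j->pd = pd;
    j->next = NULL;
    if (tail) tail->next = j; else head = j;
    tail = j;
    return 0;
}

/* Stand-in for propagating the unit's accumulated effects as one block. */
static void propagate_pending(void) {
    propagated_units++;
}

/* Called once at the end of an execution unit: drain the queue, then propagate.
 * A job may call add_post_job(); the new job lands at the tail and is drained
 * by this same loop, so it still belongs to the same unit. */
static void post_execution_unit(void) {
    while (head) {
        job *j = head;
        head = j->next;
        if (head == NULL) tail = NULL;
        j->run(j->pd);
        if (j->free_pd) j->free_pd(j->pd);
        free(j);
    }
    propagate_pending();
}

/* Example job: a single write. */
static void incr_job(void *pd) {
    (void)pd;
    counter++;
}

/* Example job that queues itself again until *remaining reaches zero.
 * Without such a bound the drain loop above would never terminate. */
static void chained_job(void *pd) {
    int *remaining = pd;
    counter++;
    if (--(*remaining) > 0) add_post_job(chained_job, remaining, NULL);
}
```

Queueing `chained_job` with a remaining count of 3, then `incr_job`, and calling `post_execution_unit()` once runs four jobs and propagates a single time. The re-queued jobs go to the tail, so `incr_job` runs before the second `chained_job`, yet everything completes inside the same unit. Dropping the bound in `chained_job` reproduces the infinite loop that the module author is expected to avoid.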

In addition, currently key space notifications are not nested. Some modules might want to allow
nesting key-space notifications. To allow that and keep backward compatibility, we introduce a
new module option called `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`.
Setting this option will disable the Redis key-space notifications nesting protection and will
pass this responsibility to the module.
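
To make the effect of the option concrete, here is a toy model of a nesting guard. It is only a sketch under stated assumptions: the text above does not say how Redis implements the protection, so a per-subscriber "in progress" flag is assumed, and `OPT_ALLOW_NESTED`, `subscriber`, `notify`, and `writing_cb` are hypothetical names.

```c
#include <assert.h>
#include <string.h>

#define OPT_ALLOW_NESTED (1 << 0)  /* hypothetical stand-in for the real option bit */

typedef struct subscriber subscriber;
typedef void (*notify_fn)(subscriber *sub, const char *key);

struct subscriber {
    notify_fn cb;
    int options;    /* option bits set by the owning module */
    int active;     /* non-zero while cb is running */
    int delivered;  /* how many notifications reached cb */
};

/* Deliver a notification unless it is nested and nesting is not allowed. */
static void notify(subscriber *sub, const char *key) {
    assert(sub != NULL && sub->cb != NULL);
    if (sub->active && !(sub->options & OPT_ALLOW_NESTED)) return; /* suppressed */
    sub->active++;
    sub->delivered++;
    sub->cb(sub, key);
    sub->active--;
}

/* Callback that "writes" from inside the notification, which fires a nested
 * notification for the key with its prefix stripped. The prefix check is what
 * stops the recursion once nesting is allowed. */
static void writing_cb(subscriber *sub, const char *key) {
    if (strncmp(key, "write_sync_", 11) != 0) return;
    notify(sub, key + 11);
}
```

In this model a subscriber without the option sees one notification for `write_sync_x`, while a subscriber with `OPT_ALLOW_NESTED` sees two (the outer one and the nested one for `x`). Once the guard is off, terminating the recursion is entirely up to the callback.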

### Redis infrastructure

This PR promotes the existing `propagatePendingCommands` to an "Execution Unit" concept,
which is called after each atomic unit of execution.

Co-authored-by: Oran Agra <oran@redislabs.com>
Co-authored-by: Yossi Gottlieb <yossigo@gmail.com>
Co-authored-by: Madelyn Olson <34459052+madolson@users.noreply.github.com>
2022-11-24 19:00:04 +02:00

/* This module is used to test the server post keyspace jobs API.
*
* -----------------------------------------------------------------------------
*
* Copyright (c) 2020, Meir Shpilraien <meir at redislabs dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/* This module verifies 'RedisModule_AddPostNotificationJob' by subscribing to three
* key space events:
* * STRINGS - the module subscribes to all string notifications and sets a post notification job
* that increments a counter indicating how many times the string key was changed.
* In addition, it increments another counter that counts the total changes
* made to all string keys.
* * EXPIRED - the module subscribes to the expired event and sets a post notification job that
* counts the total number of expired events.
* * EVICTED - the module subscribes to the evicted event and sets a post notification job that
* counts the total number of evicted events.
*
* In addition, the module registers a new command, 'postnotification.async_set', that performs a set
* command from a background thread. This makes it possible to check 'RedisModule_AddPostNotificationJob' on
* notifications that were triggered from a background thread. */
#define _BSD_SOURCE
#define _DEFAULT_SOURCE /* For usleep */

#include "redismodule.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

/* Frees the RedisModuleString passed as private data to a post notification job. */
static void KeySpace_PostNotificationStringFreePD(void *pd) {
    RedisModule_FreeString(NULL, pd);
}

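/* Post notification job: runs GET on the key named by pd. */
static void KeySpace_PostNotificationReadKey(RedisModuleCtx *ctx, void *pd) {
    RedisModuleCallReply *rep = RedisModule_Call(ctx, "get", "!s", pd);
    if (rep) RedisModule_FreeCallReply(rep); /* RedisModule_Call returns NULL on error */
}
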
/* Post notification job: runs INCR on the counter key named by pd. */
static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
    RedisModuleCallReply *rep = RedisModule_Call(ctx, "incr", "!s", pd);
    if (rep) RedisModule_FreeCallReply(rep); /* RedisModule_Call returns NULL on error */
}

/* On every expired event, queue a job that increments the "expired" counter. */
static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);
    REDISMODULE_NOT_USED(key);

    RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7);
    RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
    return REDISMODULE_OK;
}

/* On every evicted event, queue a job that increments the "evicted" counter. */
static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *key_str = RedisModule_StringPtrLen(key, NULL);
    if (strncmp(key_str, "evicted", 7) == 0) {
        return REDISMODULE_OK; /* do not count the evicted key */
    }

    RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
    RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
    return REDISMODULE_OK;
}

/* On a change to a "string_*" key, queue a job that increments the matching
 * "string_changed{<key>}" counter. A change to such a counter in turn queues
 * an increment of "string_total", which is itself ignored. */
static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *key_str = RedisModule_StringPtrLen(key, NULL);
    if (strncmp(key_str, "string_", 7) != 0) {
        return REDISMODULE_OK;
    }
    if (strcmp(key_str, "string_total") == 0) {
        return REDISMODULE_OK;
    }

    RedisModuleString *new_key;
    if (strncmp(key_str, "string_changed{", 15) == 0) {
        new_key = RedisModule_CreateString(NULL, "string_total", 12);
    } else {
        new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
    }
    RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
    return REDISMODULE_OK;
}

/* On a change to a "read_<name>" key, queue a job that reads the key "<name>". */
static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *key_str = RedisModule_StringPtrLen(key, NULL);
    if (strncmp(key_str, "read_", 5) != 0) {
        return REDISMODULE_OK;
    }

    RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);
    RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD);
    return REDISMODULE_OK;
}

/* On a change to a "write_sync_<name>" key, write to "<name>" synchronously,
 * from inside the notification callback. */
static int KeySpace_NestedNotification(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *key_str = RedisModule_StringPtrLen(key, NULL);
    if (strncmp(key_str, "write_sync_", 11) != 0) {
        return REDISMODULE_OK;
    }

    /* This test is only meant to check REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS.
     * In general it is wrong and discouraged to perform any writes inside a notification callback. */
    RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 11, strlen(key_str) - 11);
    RedisModuleCallReply *rep = RedisModule_Call(ctx, "set", "!sc", new_key, "1");
    if (rep) RedisModule_FreeCallReply(rep); /* RedisModule_Call returns NULL on error */
    RedisModule_FreeString(NULL, new_key);
    return REDISMODULE_OK;
}

/* Thread body: runs SET string_x 1 while holding the thread safe context lock,
 * then replies to the blocked client and unblocks it. */
static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) {
    RedisModuleBlockedClient *bc = arg;
    RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
    RedisModule_ThreadSafeContextLock(ctx);
    RedisModuleCallReply *rep = RedisModule_Call(ctx, "set", "!cc", "string_x", "1");
    RedisModule_ThreadSafeContextUnlock(ctx);
    RedisModule_ReplyWithCallReply(ctx, rep);
    RedisModule_FreeCallReply(rep);

    RedisModule_UnblockClient(bc, NULL);
    RedisModule_FreeThreadSafeContext(ctx);
    return NULL;
}

/* Command handler for 'postnotification.async_set': blocks the client and
 * performs the set from a background thread. */
static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    if (argc != 1)
        return RedisModule_WrongArity(ctx);

    pthread_t tid;
    RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);

    if (pthread_create(&tid, NULL, KeySpace_PostNotificationsAsyncSetInner, bc) != 0) {
        RedisModule_AbortBlock(bc);
        return RedisModule_ReplyWithError(ctx, "-ERR Can't start thread");
    }
    return REDISMODULE_OK;
}

/* This function must be present on each Redis module. It is used in order to
 * register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);

    if (RedisModule_Init(ctx, "postnotifications", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) {
        return REDISMODULE_ERR;
    }

    /* Refuse to load if the server does not support nested keyspace notifications. */
    if (!(RedisModule_GetModuleOptionsAll() & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS)) {
        return REDISMODULE_ERR;
    }
    RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS);

    if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }
    if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_LazyExpireInsidePostNotificationJob) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }
    if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NestedNotification) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }
    if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EXPIRED, KeySpace_NotificationExpired) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }
    if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EVICTED, KeySpace_NotificationEvicted) != REDISMODULE_OK) {
        return REDISMODULE_ERR;
    }

    if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
                                  "write", 0, 0, 0) == REDISMODULE_ERR) {
        return REDISMODULE_ERR;
    }
    return REDISMODULE_OK;
}

int RedisModule_OnUnload(RedisModuleCtx *ctx) {
    REDISMODULE_NOT_USED(ctx);
    return REDISMODULE_OK;
}