mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
The metric tracks cpu time in micro-seconds, sharing the same value as `INFO COMMANDSTATS`, aggregated under per-slot context. --------- Signed-off-by: Kyle Kim <kimkyle@amazon.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
parent
ccafbb750b
commit
5000c050b5
@ -64,6 +64,7 @@
|
||||
#include "slowlog.h"
|
||||
#include "latency.h"
|
||||
#include "monotonic.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
|
||||
/* forward declarations */
|
||||
static void unblockClientWaitingData(client *c);
|
||||
@ -101,10 +102,11 @@ void blockClient(client *c, int btype) {
|
||||
* he will attempt to reprocess the command which will update the statistics.
|
||||
* However in case the client was timed out or in case of module blocked client is being unblocked
|
||||
* the command will not be reprocessed and we need to make stats update.
|
||||
* This function will make updates to the commandstats, slowlog and monitors.*/
|
||||
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.*/
|
||||
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors) {
|
||||
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
|
||||
c->lastcmd->microseconds += total_cmd_duration;
|
||||
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
|
||||
c->lastcmd->calls++;
|
||||
c->commands_processed++;
|
||||
server.stat_numcommands++;
|
||||
|
@ -35,6 +35,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
|
||||
#include <ctype.h>
|
||||
|
||||
@ -1461,3 +1462,12 @@ void readwriteCommand(client *c) {
|
||||
c->flag.readonly = 0;
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
/* Resets transient cluster stats that we expose via INFO or other means that we want
|
||||
* to reset via CONFIG RESETSTAT. The function is also used in order to
|
||||
* initialize these fields in clusterInit() at server startup. */
|
||||
void resetClusterStats(void) {
|
||||
if (!server.cluster_enabled) return;
|
||||
|
||||
clusterSlotStatResetAll();
|
||||
}
|
||||
|
@ -127,4 +127,5 @@ ConnectionType *connTypeOfCluster(void);
|
||||
int isNodeAvailable(clusterNode *node);
|
||||
long long getNodeReplicationOffset(clusterNode *node);
|
||||
sds aggregateClientOutputBuffer(client *c);
|
||||
void resetClusterStats(void);
|
||||
#endif /* __CLUSTER_H */
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_legacy.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
#include "endianconv.h"
|
||||
#include "connection.h"
|
||||
|
||||
@ -1107,6 +1108,7 @@ void clusterInit(void) {
|
||||
clusterUpdateMyselfClientIpV6();
|
||||
clusterUpdateMyselfHostname();
|
||||
clusterUpdateMyselfHumanNodename();
|
||||
resetClusterStats();
|
||||
}
|
||||
|
||||
void clusterInitLast(void) {
|
||||
@ -4993,6 +4995,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
|
||||
clusterNodeSetSlotBit(n, slot);
|
||||
server.cluster->slots[slot] = n;
|
||||
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
|
||||
clusterSlotStatReset(slot);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -5011,6 +5014,7 @@ int clusterDelSlot(int slot) {
|
||||
server.cluster->slots[slot] = NULL;
|
||||
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
|
||||
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
|
||||
clusterSlotStatReset(slot);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
|
@ -329,6 +329,11 @@ struct _clusterNode {
|
||||
Update with updateAndCountChangedNodeHealth(). */
|
||||
};
|
||||
|
||||
/* Struct used for storing slot statistics. */
|
||||
typedef struct slotStat {
|
||||
uint64_t cpu_usec;
|
||||
} slotStat;
|
||||
|
||||
struct clusterState {
|
||||
clusterNode *myself; /* This node */
|
||||
uint64_t currentEpoch;
|
||||
@ -376,6 +381,8 @@ struct clusterState {
|
||||
* stops claiming the slot. This prevents spreading incorrect information (that
|
||||
* source still owns the slot) using UPDATE messages. */
|
||||
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
|
||||
/* Struct used for storing slot statistics, for all slots owned by the current shard. */
|
||||
slotStat slot_stats[CLUSTER_SLOTS];
|
||||
};
|
||||
|
||||
|
||||
|
@ -4,15 +4,11 @@
|
||||
* SPDX-License-Identifier: BSD 3-Clause
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
|
||||
#define UNASSIGNED_SLOT 0
|
||||
|
||||
typedef enum {
|
||||
KEY_COUNT,
|
||||
INVALID,
|
||||
} slotStatTypes;
|
||||
typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes;
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* CLUSTER SLOT-STATS command
|
||||
@ -47,6 +43,8 @@ static uint64_t getSlotStat(int slot, int stat_type) {
|
||||
uint64_t slot_stat = 0;
|
||||
if (stat_type == KEY_COUNT) {
|
||||
slot_stat = countKeysInSlot(slot);
|
||||
} else if (stat_type == CPU_USEC) {
|
||||
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
|
||||
}
|
||||
return slot_stat;
|
||||
}
|
||||
@ -88,9 +86,17 @@ static void addReplySlotStat(client *c, int slot) {
|
||||
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
|
||||
* and 1st index represents (map) usage statistics. */
|
||||
addReplyLongLong(c, slot);
|
||||
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
|
||||
addReplyMapLen(c, (server.cluster_slot_stats_enabled) ? SLOT_STAT_COUNT
|
||||
: 1); /* Nested map representing slot usage statistics. */
|
||||
addReplyBulkCString(c, "key-count");
|
||||
addReplyLongLong(c, countKeysInSlot(slot));
|
||||
|
||||
/* Any additional metrics aside from key-count come with a performance trade-off,
|
||||
* and are aggregated and returned based on its server config. */
|
||||
if (server.cluster_slot_stats_enabled) {
|
||||
addReplyBulkCString(c, "cpu-usec");
|
||||
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
|
||||
}
|
||||
}
|
||||
|
||||
/* Adds reply for the SLOTSRANGE variant.
|
||||
@ -121,6 +127,46 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
|
||||
addReplySortedSlotStats(c, slot_stats, limit);
|
||||
}
|
||||
|
||||
/* Resets applicable slot statistics. */
|
||||
void clusterSlotStatReset(int slot) {
|
||||
/* key-count is exempt, as it is queried separately through `countKeysInSlot()`. */
|
||||
memset(&server.cluster->slot_stats[slot], 0, sizeof(slotStat));
|
||||
}
|
||||
|
||||
void clusterSlotStatResetAll(void) {
|
||||
memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
|
||||
}
|
||||
|
||||
/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped.
|
||||
* This is due to their unique callstack, where the c->duration for
|
||||
* EXEC, EVAL and FCALL already includes all of its nested commands.
|
||||
* Meaning, the accumulation of cpu-usec for these nested commands
|
||||
* would equate to repeating the same calculation twice.
|
||||
*/
|
||||
static int canAddCpuDuration(client *c) {
|
||||
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
|
||||
server.cluster_enabled && /* Cluster mode should be enabled. */
|
||||
c->slot != -1 && /* Command should be slot specific. */
|
||||
(!server.execution_nesting || /* Either; */
|
||||
(server.execution_nesting && /* 1) Command should not be nested, or */
|
||||
c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */
|
||||
}
|
||||
|
||||
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
|
||||
if (!canAddCpuDuration(c)) return;
|
||||
|
||||
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[c->slot].cpu_usec += duration;
|
||||
}
|
||||
|
||||
/* For cross-slot scripting, its caller client's slot must be invalidated,
|
||||
* such that its slot-stats aggregation is bypassed. */
|
||||
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
|
||||
if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return;
|
||||
|
||||
ctx->original_client->slot = -1;
|
||||
}
|
||||
|
||||
void clusterSlotStatsCommand(client *c) {
|
||||
if (server.cluster_enabled == 0) {
|
||||
addReplyError(c, "This instance has cluster support disabled");
|
||||
@ -149,8 +195,10 @@ void clusterSlotStatsCommand(client *c) {
|
||||
int desc = 1, order_by = INVALID;
|
||||
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
|
||||
order_by = KEY_COUNT;
|
||||
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
|
||||
order_by = CPU_USEC;
|
||||
} else {
|
||||
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
|
||||
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
|
||||
return;
|
||||
}
|
||||
int i = 4; /* Next argument index, following ORDERBY */
|
||||
|
9
src/cluster_slot_stats.h
Normal file
9
src/cluster_slot_stats.h
Normal file
@ -0,0 +1,9 @@
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "script.h"
|
||||
#include "cluster_legacy.h"
|
||||
|
||||
void clusterSlotStatReset(int slot);
|
||||
void clusterSlotStatResetAll(void);
|
||||
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
|
||||
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);
|
@ -35,6 +35,9 @@
|
||||
"properties": {
|
||||
"key-count": {
|
||||
"type": "integer"
|
||||
},
|
||||
"cpu-usec": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3105,6 +3105,7 @@ standardConfig static_configs[] = {
|
||||
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
|
||||
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
|
||||
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
|
||||
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
|
||||
|
||||
/* String Configs */
|
||||
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
|
||||
@ -3420,6 +3421,7 @@ void configHelpCommand(client *c) {
|
||||
|
||||
void configResetStatCommand(client *c) {
|
||||
resetServerStats();
|
||||
resetClusterStats();
|
||||
resetCommandTableStats(server.commands);
|
||||
resetErrorTableStats();
|
||||
addReply(c, shared.ok);
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "server.h"
|
||||
#include "script.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
|
||||
scriptFlag scripts_flags_def[] = {
|
||||
{.flag = SCRIPT_FLAG_NO_WRITES, .str = "no-writes"},
|
||||
@ -583,6 +584,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) {
|
||||
}
|
||||
call(c, call_flags);
|
||||
serverAssert(c->flag.blocked == 0);
|
||||
clusterSlotStatsInvalidateSlotIfApplicable(run_ctx);
|
||||
return;
|
||||
|
||||
error:
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "server.h"
|
||||
#include "monotonic.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
#include "slowlog.h"
|
||||
#include "bio.h"
|
||||
#include "latency.h"
|
||||
@ -3545,13 +3546,14 @@ void call(client *c, int flags) {
|
||||
* If the client is blocked we will handle slowlog when it is unblocked. */
|
||||
if (!c->flag.blocked) freeClientOriginalArgv(c);
|
||||
|
||||
/* populate the per-command statistics that we show in INFO commandstats.
|
||||
* If the client is blocked we will handle latency stats and duration when it is unblocked. */
|
||||
/* Populate the per-command and per-slot statistics that we show in INFO commandstats and CLUSTER SLOT-STATS,
|
||||
* respectively. If the client is blocked we will handle latency stats and duration when it is unblocked. */
|
||||
if (update_command_stats && !c->flag.blocked) {
|
||||
real_cmd->calls++;
|
||||
real_cmd->microseconds += c->duration;
|
||||
if (server.latency_tracking_enabled && !c->flag.blocked)
|
||||
updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration * 1000);
|
||||
clusterSlotStatsAddCpuDuration(c, c->duration);
|
||||
}
|
||||
|
||||
/* The duration needs to be reset after each call except for a blocked command,
|
||||
|
@ -2154,6 +2154,7 @@ struct valkeyServer {
|
||||
* dropping packets of a specific type */
|
||||
unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into
|
||||
* the cluster after it is forgotten with CLUSTER FORGET. */
|
||||
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
|
||||
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
|
||||
uint32_t debug_cluster_close_link_on_packet_drop : 1;
|
||||
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
|
||||
|
@ -16,6 +16,12 @@ proc convert_array_into_dict {slot_stats} {
|
||||
return $res
|
||||
}
|
||||
|
||||
proc get_cmdstat_usec {cmd r} {
|
||||
set cmdstatline [cmdrstat $cmd r]
|
||||
regexp "usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline -> usec _
|
||||
return $usec
|
||||
}
|
||||
|
||||
proc initialize_expected_slots_dict {} {
|
||||
set expected_slots [dict create]
|
||||
for {set i 0} {$i < 16384} {incr i 1} {
|
||||
@ -33,21 +39,45 @@ proc initialize_expected_slots_dict_with_range {start_slot end_slot} {
|
||||
return $expected_slots
|
||||
}
|
||||
|
||||
proc assert_empty_slot_stats {slot_stats} {
|
||||
proc assert_empty_slot_stats {slot_stats metrics_to_assert} {
|
||||
set slot_stats [convert_array_into_dict $slot_stats]
|
||||
dict for {slot stats} $slot_stats {
|
||||
assert {[dict get $stats key-count] == 0}
|
||||
foreach metric_name $metrics_to_assert {
|
||||
set metric_value [dict get $stats $metric_name]
|
||||
assert {$metric_value == 0}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proc assert_empty_slot_stats_with_exception {slot_stats exception_slots} {
|
||||
proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_to_assert} {
|
||||
set slot_stats [convert_array_into_dict $slot_stats]
|
||||
dict for {slot stats} $exception_slots {
|
||||
assert {[dict exists $slot_stats $slot]} ;# slot_stats must contain the expected slots.
|
||||
}
|
||||
dict for {slot stats} $slot_stats {
|
||||
if {[dict exists $exception_slots $slot]} {
|
||||
set expected_key_count [dict get $exception_slots $slot]
|
||||
assert {[dict get $stats key-count] == $expected_key_count}
|
||||
foreach metric_name $metrics_to_assert {
|
||||
set metric_value [dict get $exception_slots $slot $metric_name]
|
||||
assert {[dict get $stats $metric_name] == $metric_value}
|
||||
}
|
||||
} else {
|
||||
assert {[dict get $stats key-count] == 0}
|
||||
dict for {metric value} $stats {
|
||||
assert {$value == 0}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} {
|
||||
set slot_stats_1 [convert_array_into_dict $slot_stats_1]
|
||||
set slot_stats_2 [convert_array_into_dict $slot_stats_2]
|
||||
assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]}
|
||||
|
||||
dict for {slot stats_1} $slot_stats_1 {
|
||||
assert {[dict exists $slot_stats_2 $slot]}
|
||||
set stats_2 [dict get $slot_stats_2 $slot]
|
||||
foreach metric_name $metrics_to_assert {
|
||||
assert {[dict get $stats_1 $metric_name] == [dict get $stats_2 $metric_name]}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -115,7 +145,209 @@ proc wait_for_replica_key_exists {key key_count} {
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Test cases for CLUSTER SLOT-STATS correctness, without additional arguments.
|
||||
# Test cases for CLUSTER SLOT-STATS cpu-usec metric correctness.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
set key_slot [R 0 cluster keyslot $key]
|
||||
set key_secondary "FOO2"
|
||||
set key_secondary_slot [R 0 cluster keyslot $key_secondary]
|
||||
set metrics_to_assert [list cpu-usec]
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec reset upon CONFIG RESETSTAT." {
|
||||
R 0 SET $key VALUE
|
||||
R 0 DEL $key
|
||||
R 0 CONFIG RESETSTAT
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec reset upon slot migration." {
|
||||
R 0 SET $key VALUE
|
||||
|
||||
R 0 CLUSTER DELSLOTS $key_slot
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
|
||||
R 0 CLUSTER ADDSLOTS $key_slot
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for non-slot specific commands." {
|
||||
R 0 INFO
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for slot specific commands." {
|
||||
R 0 SET $key VALUE
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set usec [get_cmdstat_usec set r]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create cpu-usec $usec
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on keyspace update." {
|
||||
# Blocking command with no timeout. Only keyspace update can unblock this client.
|
||||
set rd [valkey_deferring_client]
|
||||
$rd BLPOP $key 0
|
||||
wait_for_blocked_clients_count 1
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
# When the client is blocked, no accumulation is made. This behaviour is identical to INFO COMMANDSTATS.
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
|
||||
# Unblocking command.
|
||||
R 0 LPUSH $key value
|
||||
wait_for_blocked_clients_count 0
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set lpush_usec [get_cmdstat_usec lpush r]
|
||||
set blpop_usec [get_cmdstat_usec blpop r]
|
||||
|
||||
# Assert that both blocking and non-blocking command times have been accumulated.
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create cpu-usec [expr $lpush_usec + $blpop_usec]
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on timeout." {
|
||||
# Blocking command with 1 second timeout.
|
||||
set rd [valkey_deferring_client]
|
||||
$rd BLPOP $key 1
|
||||
|
||||
# Confirm that the client is blocked, then unblocked after 1 second timeout.
|
||||
wait_for_blocked_clients_count 1
|
||||
wait_for_blocked_clients_count 0
|
||||
|
||||
# Assert that the blocking command time has been accumulated.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set blpop_usec [get_cmdstat_usec blpop r]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create cpu-usec $blpop_usec
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for transactions." {
|
||||
set r1 [valkey_client]
|
||||
$r1 MULTI
|
||||
$r1 SET $key value
|
||||
$r1 GET $key
|
||||
|
||||
# CPU metric is not accumulated until EXEC is reached. This behaviour is identical to INFO COMMANDSTATS.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
|
||||
# Execute transaction, and assert that all nested command times have been accumulated.
|
||||
$r1 EXEC
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set exec_usec [get_cmdstat_usec exec r]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create cpu-usec $exec_usec
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, without cross-slot keys." {
|
||||
r eval [format "#!lua
|
||||
redis.call('set', '%s', 'bar'); redis.call('get', '%s')" $key $key] 0
|
||||
|
||||
set eval_usec [get_cmdstat_usec eval r]
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create cpu-usec $eval_usec
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, with cross-slot keys." {
|
||||
r eval [format "#!lua flags=allow-cross-slot-keys
|
||||
redis.call('set', '%s', 'bar'); redis.call('get', '%s');
|
||||
" $key $key_secondary] 0
|
||||
|
||||
# For cross-slot, we do not accumulate at all.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for functions, without cross-slot keys." {
|
||||
set function_str [format "#!lua name=f1
|
||||
server.register_function{
|
||||
function_name='f1',
|
||||
callback=function() redis.call('set', '%s', '1') redis.call('get', '%s') end
|
||||
}" $key $key]
|
||||
r function load replace $function_str
|
||||
r fcall f1 0
|
||||
|
||||
set fcall_usec [get_cmdstat_usec fcall r]
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create cpu-usec $fcall_usec
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS cpu-usec for functions, with cross-slot keys." {
|
||||
set function_str [format "#!lua name=f1
|
||||
server.register_function{
|
||||
function_name='f1',
|
||||
callback=function() redis.call('set', '%s', '1') redis.call('get', '%s') end,
|
||||
flags={'allow-cross-slot-keys'}
|
||||
}" $key $key_secondary]
|
||||
r function load replace $function_str
|
||||
r fcall f1 0
|
||||
|
||||
# For cross-slot, we do not accumulate at all.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Test cases for CLUSTER SLOT-STATS key-count metric correctness.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
@ -123,34 +355,39 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
set key_slot [R 0 cluster keyslot $key]
|
||||
set expected_slots_to_key_count [dict create $key_slot 1]
|
||||
set metrics_to_assert [list key-count]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create key-count 1
|
||||
]
|
||||
]
|
||||
|
||||
test "CLUSTER SLOT-STATS contains default value upon valkey-server startup" {
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS contains correct metrics upon key introduction" {
|
||||
R 0 SET $key TEST
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slots_to_key_count
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS contains correct metrics upon key mutation" {
|
||||
R 0 SET $key NEW_VALUE
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slots_to_key_count
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS contains correct metrics upon key deletion" {
|
||||
R 0 DEL $key
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS slot visibility based on slot ownership changes" {
|
||||
R 0 CONFIG SET cluster-require-full-coverage no
|
||||
|
||||
|
||||
R 0 CLUSTER DELSLOTS $key_slot
|
||||
set expected_slots [initialize_expected_slots_dict]
|
||||
dict unset expected_slots $key_slot
|
||||
@ -266,18 +503,33 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
assert_slot_visibility $slot_stats_desc $expected_slots
|
||||
assert_slot_visibility $slot_stats_asc $expected_slots
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY unsupported sort metric." {
|
||||
set orderby "non-existent-metric"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
|
||||
|
||||
# When cluster-slot-stats-enabled config is disabled, you cannot sort using advanced metrics.
|
||||
set orderby "cpu-usec"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
|
||||
}
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Test cases for CLUSTER SLOT-STATS replication.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
|
||||
start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
set key_slot [R 0 CLUSTER KEYSLOT $key]
|
||||
|
||||
# For replication, only those metrics that are deterministic upon replication are asserted.
|
||||
# * key-count is asserted, as both the primary and its replica must hold the same number of keys.
|
||||
# * cpu-usec is not asserted, as its micro-seconds command duration is not guaranteed to be exact
|
||||
# between the primary and its replica.
|
||||
set metrics_to_assert [list key-count]
|
||||
|
||||
# Setup replication.
|
||||
assert {[s -1 role] eq {slave}}
|
||||
wait_for_condition 1000 50 {
|
||||
@ -296,7 +548,7 @@ start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
wait_for_replica_key_exists $key 1
|
||||
|
||||
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert {$slot_stats_master eq $slot_stats_replica}
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS key-count replication for existing keys" {
|
||||
@ -308,7 +560,7 @@ start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
wait_for_replica_key_exists $key 1
|
||||
|
||||
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert {$slot_stats_master eq $slot_stats_replica}
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS key-count replication for deleting keys" {
|
||||
@ -320,6 +572,6 @@ start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
wait_for_replica_key_exists $key 0
|
||||
|
||||
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert {$slot_stats_master eq $slot_stats_replica}
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
|
||||
}
|
||||
}
|
10
valkey.conf
10
valkey.conf
@ -1788,6 +1788,16 @@ aof-timestamp-enabled no
|
||||
#
|
||||
# cluster-blacklist-ttl 60
|
||||
|
||||
# Clusters can be configured to track per-slot resource statistics,
|
||||
# which are accessible by the CLUSTER SLOT-STATS command.
|
||||
#
|
||||
# By default, the 'cluster-slot-stats-enabled' is disabled, and only 'key-count' is captured.
|
||||
# By enabling the 'cluster-slot-stats-enabled' config, the cluster will begin to capture advanced statistics.
|
||||
# These statistics can be leveraged to assess general slot usage trends, identify hot / cold slots,
|
||||
# migrate slots for a balanced cluster workload, and / or re-write application logic to better utilize slots.
|
||||
#
|
||||
# cluster-slot-stats-enabled no
|
||||
|
||||
# In order to setup your cluster make sure to read the documentation
|
||||
# available at https://valkey.io web site.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user