Create PUSH handlers in redis-cli

Add logic to redis-cli to display RESP3 PUSH messages when we detect
STDOUT is a tty, with an optional command-line argument to override
the default behavior.

The new argument:  --show-pushes <yn>

Examples:

$ redis-cli -3 --show-pushes no
$ echo "client tracking on\nget k1\nset k1 v1"| redis-cli -3 --show-pushes y
This commit is contained in:
michael-grunder 2020-08-06 12:43:56 -07:00
parent 5f536b5d23
commit 81879bc171

View File

@ -234,6 +234,7 @@ static struct config {
int askpass; int askpass;
char *user; char *user;
int output; /* output mode, see OUTPUT_* defines */ int output; /* output mode, see OUTPUT_* defines */
int push_output; /* Should we display spontaneous PUSH replies */
sds mb_delim; sds mb_delim;
char prompt[128]; char prompt[128];
char *eval; char *eval;
@ -267,6 +268,8 @@ static long getLongInfoField(char *info, char *field);
* Utility functions * Utility functions
*--------------------------------------------------------------------------- */ *--------------------------------------------------------------------------- */
static void cliPushHandler(void *, void *);
uint16_t crc16(const char *buf, int len); uint16_t crc16(const char *buf, int len);
static long long ustime(void) { static long long ustime(void) {
@ -873,8 +876,8 @@ static int cliConnect(int flags) {
const char *err = NULL; const char *err = NULL;
if (cliSecureConnection(context, &err) == REDIS_ERR && err) { if (cliSecureConnection(context, &err) == REDIS_ERR && err) {
fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err); fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
context = NULL;
redisFree(context); redisFree(context);
context = NULL;
return REDIS_ERR; return REDIS_ERR;
} }
} }
@ -909,6 +912,12 @@ static int cliConnect(int flags) {
if (cliSwitchProto() != REDIS_OK) if (cliSwitchProto() != REDIS_OK)
return REDIS_ERR; return REDIS_ERR;
} }
/* Set a PUSH handler if configured to do so. */
if (config.push_output) {
redisSetPushCallback(context, cliPushHandler);
}
return REDIS_OK; return REDIS_OK;
} }
@ -917,6 +926,31 @@ static void cliPrintContextError(void) {
fprintf(stderr,"Error: %s\n",context->errstr); fprintf(stderr,"Error: %s\n",context->errstr);
} }
static int isInvalidateReply(redisReply *reply) {
return reply->type == REDIS_REPLY_PUSH && reply->elements == 2 &&
reply->element[0]->type == REDIS_REPLY_STRING &&
!strncmp(reply->element[0]->str, "invalidate", 10) &&
reply->element[1]->type == REDIS_REPLY_ARRAY;
}
/* Special display handler for RESP3 'invalidate' messages.
* This function does not validate the reply, so it should
* already be confirmed correct */
static sds cliFormatInvalidateTTY(redisReply *r) {
sds out = sdsnew("-> invalidate: ");
for (size_t i = 0; i < r->element[1]->elements; i++) {
redisReply *key = r->element[1]->element[i];
assert(key->type == REDIS_REPLY_STRING);
out = sdscatfmt(out, "'%s'", key->str, key->len);
if (i < r->element[1]->elements - 1)
out = sdscatlen(out, ", ", 2);
}
return sdscatlen(out, "\n", 1);
}
static sds cliFormatReplyTTY(redisReply *r, char *prefix) { static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
sds out = sdsempty(); sds out = sdsempty();
switch (r->type) { switch (r->type) {
@ -955,6 +989,7 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
case REDIS_REPLY_MAP: case REDIS_REPLY_MAP:
case REDIS_REPLY_SET: case REDIS_REPLY_SET:
case REDIS_REPLY_PUSH:
if (r->elements == 0) { if (r->elements == 0) {
if (r->type == REDIS_REPLY_ARRAY) if (r->type == REDIS_REPLY_ARRAY)
out = sdscat(out,"(empty array)\n"); out = sdscat(out,"(empty array)\n");
@ -962,6 +997,8 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
out = sdscat(out,"(empty hash)\n"); out = sdscat(out,"(empty hash)\n");
else if (r->type == REDIS_REPLY_SET) else if (r->type == REDIS_REPLY_SET)
out = sdscat(out,"(empty set)\n"); out = sdscat(out,"(empty set)\n");
else if (r->type == REDIS_REPLY_PUSH)
out = sdscat(out,"(empty push)\n");
else else
out = sdscat(out,"(empty aggregate type)\n"); out = sdscat(out,"(empty aggregate type)\n");
} else { } else {
@ -1113,6 +1150,7 @@ static sds cliFormatReplyRaw(redisReply *r) {
out = sdscatprintf(out,"%s",r->str); out = sdscatprintf(out,"%s",r->str);
break; break;
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
case REDIS_REPLY_PUSH:
for (i = 0; i < r->elements; i++) { for (i = 0; i < r->elements; i++) {
if (i > 0) out = sdscat(out,config.mb_delim); if (i > 0) out = sdscat(out,config.mb_delim);
tmp = cliFormatReplyRaw(r->element[i]); tmp = cliFormatReplyRaw(r->element[i]);
@ -1169,6 +1207,7 @@ static sds cliFormatReplyCSV(redisReply *r) {
out = sdscat(out,r->integer ? "true" : "false"); out = sdscat(out,r->integer ? "true" : "false");
break; break;
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
case REDIS_REPLY_PUSH:
case REDIS_REPLY_MAP: /* CSV has no map type, just output flat list. */ case REDIS_REPLY_MAP: /* CSV has no map type, just output flat list. */
for (i = 0; i < r->elements; i++) { for (i = 0; i < r->elements; i++) {
sds tmp = cliFormatReplyCSV(r->element[i]); sds tmp = cliFormatReplyCSV(r->element[i]);
@ -1184,6 +1223,45 @@ static sds cliFormatReplyCSV(redisReply *r) {
return out; return out;
} }
/* Generate reply strings in various output modes */
static sds cliFormatReply(redisReply *reply, int mode, int verbatim) {
sds out;
if (verbatim) {
out = cliFormatReplyRaw(reply);
} else if (mode == OUTPUT_STANDARD) {
out = cliFormatReplyTTY(reply, "");
} else if (mode == OUTPUT_RAW) {
out = cliFormatReplyRaw(reply);
out = sdscatlen(out, "\n", 1);
} else if (mode == OUTPUT_CSV) {
out = cliFormatReplyCSV(reply);
out = sdscatlen(out, "\n", 1);
} else {
fprintf(stderr, "Error: Unknown output encoding %d\n", mode);
exit(1);
}
return out;
}
/* Output any spontaneous PUSH reply we receive */
static void cliPushHandler(void *privdata, void *reply) {
UNUSED(privdata);
sds out;
if (config.output == OUTPUT_STANDARD && isInvalidateReply(reply)) {
out = cliFormatInvalidateTTY(reply);
} else {
out = cliFormatReply(reply, config.output, 0);
}
fwrite(out, sdslen(out), 1, stdout);
freeReplyObject(reply);
sdsfree(out);
}
static int cliReadReply(int output_raw_strings) { static int cliReadReply(int output_raw_strings) {
void *_reply; void *_reply;
redisReply *reply; redisReply *reply;
@ -1244,19 +1322,7 @@ static int cliReadReply(int output_raw_strings) {
} }
if (output) { if (output) {
if (output_raw_strings) { out = cliFormatReply(reply, config.output, output_raw_strings);
out = cliFormatReplyRaw(reply);
} else {
if (config.output == OUTPUT_RAW) {
out = cliFormatReplyRaw(reply);
out = sdscat(out,"\n");
} else if (config.output == OUTPUT_STANDARD) {
out = cliFormatReplyTTY(reply,"");
} else if (config.output == OUTPUT_CSV) {
out = cliFormatReplyCSV(reply);
out = sdscat(out,"\n");
}
}
fwrite(out,sdslen(out),1,stdout); fwrite(out,sdslen(out),1,stdout);
sdsfree(out); sdsfree(out);
} }
@ -1346,6 +1412,10 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (config.pubsub_mode) { if (config.pubsub_mode) {
if (config.output != OUTPUT_RAW) if (config.output != OUTPUT_RAW)
printf("Reading messages... (press Ctrl-C to quit)\n"); printf("Reading messages... (press Ctrl-C to quit)\n");
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
while (1) { while (1) {
if (cliReadReply(output_raw) != REDIS_OK) exit(1); if (cliReadReply(output_raw) != REDIS_OK) exit(1);
} }
@ -1626,6 +1696,16 @@ static int parseOptions(int argc, char **argv) {
exit(0); exit(0);
} else if (!strcmp(argv[i],"-3")) { } else if (!strcmp(argv[i],"-3")) {
config.resp3 = 1; config.resp3 = 1;
} else if (!strcmp(argv[i],"--show-pushes") && !lastarg) {
char *argval = argv[++i];
if (!strncasecmp(argval, "n", 1)) {
config.push_output = 0;
} else if (!strncasecmp(argval, "y", 1)) {
config.push_output = 1;
} else {
fprintf(stderr, "Unknown --show-pushes value '%s' "
"(valid: '[y]es', '[n]o')\n", argval);
}
} else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') { } else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') {
if (config.cluster_manager_command.argc == 0) { if (config.cluster_manager_command.argc == 0) {
int j = i + 1; int j = i + 1;
@ -1734,6 +1814,8 @@ static void usage(void) {
" not a tty).\n" " not a tty).\n"
" --no-raw Force formatted output even when STDOUT is not a tty.\n" " --no-raw Force formatted output even when STDOUT is not a tty.\n"
" --csv Output in CSV format.\n" " --csv Output in CSV format.\n"
" --show-pushes <yn> Whether to print RESP3 PUSH messages. Enabled by default when\n"
" STDOUT is a tty but can be overriden with --show-pushes no.\n"
" --stat Print rolling stats about server: mem, clients, ...\n" " --stat Print rolling stats about server: mem, clients, ...\n"
" --latency Enter a special mode continuously sampling latency.\n" " --latency Enter a special mode continuously sampling latency.\n"
" If you use this mode in an interactive session it runs\n" " If you use this mode in an interactive session it runs\n"
@ -8063,10 +8145,13 @@ int main(int argc, char **argv) {
spectrum_palette = spectrum_palette_color; spectrum_palette = spectrum_palette_color;
spectrum_palette_size = spectrum_palette_color_size; spectrum_palette_size = spectrum_palette_color_size;
if (!isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL)) if (!isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL)) {
config.output = OUTPUT_RAW; config.output = OUTPUT_RAW;
else config.push_output = 0;
} else {
config.output = OUTPUT_STANDARD; config.output = OUTPUT_STANDARD;
config.push_output = 1;
}
config.mb_delim = sdsnew("\n"); config.mb_delim = sdsnew("\n");
firstarg = parseOptions(argc,argv); firstarg = parseOptions(argc,argv);