diff --git a/deps/linenoise/linenoise.c b/deps/linenoise/linenoise.c index 1b01622c5..dd86abe86 100644 --- a/deps/linenoise/linenoise.c +++ b/deps/linenoise/linenoise.c @@ -163,6 +163,7 @@ enum KEY_ACTION{ CTRL_F = 6, /* Ctrl-f */ CTRL_H = 8, /* Ctrl-h */ TAB = 9, /* Tab */ + NL = 10, /* Enter typed before raw mode was enabled */ CTRL_K = 11, /* Ctrl+k */ CTRL_L = 12, /* Ctrl+l */ ENTER = 13, /* Enter */ @@ -256,8 +257,8 @@ static int enableRawMode(int fd) { * We want read to return every single byte, without timeout. */ raw.c_cc[VMIN] = 1; raw.c_cc[VTIME] = 0; /* 1 byte, no timer */ - /* put terminal in raw mode after flushing */ - if (tcsetattr(fd,TCSAFLUSH,&raw) < 0) goto fatal; + /* put terminal in raw mode */ + if (tcsetattr(fd,TCSANOW,&raw) < 0) goto fatal; rawmode = 1; return 0; @@ -268,7 +269,7 @@ fatal: static void disableRawMode(int fd) { /* Don't even check the return value as it's too late. */ - if (rawmode && tcsetattr(fd,TCSAFLUSH,&orig_termios) != -1) + if (rawmode && tcsetattr(fd,TCSANOW,&orig_termios) != -1) rawmode = 0; } @@ -840,6 +841,8 @@ static int linenoiseEdit(int stdin_fd, int stdout_fd, char *buf, size_t buflen, } switch(c) { + case NL: /* enter, typed before raw mode was enabled */ + break; case ENTER: /* enter */ history_len--; free(history[history_len]); diff --git a/src/redis-cli.c b/src/redis-cli.c index 964ae59ef..d8e6b966a 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -45,6 +45,7 @@ #include #include #include +#include #include #ifdef USE_OPENSSL @@ -172,6 +173,9 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253 int *spectrum_palette; int spectrum_palette_size; +static int orig_termios_saved = 0; +static struct termios orig_termios; /* To restore terminal at exit.*/ + /* Dict Helpers */ static uint64_t dictSdsHash(const void *key); static int dictSdsKeyCompare(dict *d, const void *key1, @@ -267,12 +271,14 @@ static struct config { int eval_ldb_end; /* Lua debugging session ended. */ int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */ int last_cmd_type; + redisReply *last_reply; int verbose; int set_errcode; clusterManagerCommand cluster_manager_command; int no_auth_warning; - int resp2; + int resp2; /* value of 1: specified explicitly with option -2 */ int resp3; /* value of 1: specified explicitly, value of 2: implicit like --json option */ + int current_resp3; /* 1 if we have RESP3 right now in the current connection. */ int in_multi; int pre_multi_dbnum; } config; @@ -335,6 +341,9 @@ static void cliRefreshPrompt(void) { if (config.in_multi) prompt = sdscatlen(prompt,"(TX)",4); + if (config.pubsub_mode) + prompt = sdscatfmt(prompt,"(subscribed mode)"); + /* Copy the prompt in the static buffer. */ prompt = sdscatlen(prompt,"> ",2); snprintf(config.prompt,sizeof(config.prompt),"%s",prompt); @@ -1016,6 +1025,29 @@ static void freeHintsCallback(void *ptr) { sdsfree(ptr); } +/*------------------------------------------------------------------------------ + * TTY manipulation + *--------------------------------------------------------------------------- */ + +/* Restore terminal if we've changed it. */ +void cliRestoreTTY(void) { + if (orig_termios_saved) + tcsetattr(STDIN_FILENO, TCSANOW, &orig_termios); +} + +/* Put the terminal in "press any key" mode */ +static void cliPressAnyKeyTTY(void) { + if (!isatty(STDIN_FILENO)) return; + if (!orig_termios_saved) { + if (tcgetattr(STDIN_FILENO, &orig_termios) == -1) return; + atexit(cliRestoreTTY); + orig_termios_saved = 1; + } + struct termios mode = orig_termios; + mode.c_lflag &= ~(ECHO | ICANON); /* echoing off, canonical off */ + tcsetattr(STDIN_FILENO, TCSANOW, &mode); +} + /*------------------------------------------------------------------------------ * Networking / parsing *--------------------------------------------------------------------------- */ @@ -1088,6 +1120,7 @@ static int cliSwitchProto(void) { } } freeReplyObject(reply); + config.current_resp3 = 1; return result; } @@ -1147,6 +1180,9 @@ static int cliConnect(int flags) { * errors. */ anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); + /* State of the current connection. */ + config.current_resp3 = 0; + /* Do AUTH, select the right DB, switch to RESP3 if needed. */ if (cliAuth(context, config.conn_info.user, config.conn_info.auth) != REDIS_OK) return REDIS_ERR; @@ -1309,6 +1345,8 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) { char numsep; if (r->type == REDIS_REPLY_SET) numsep = '~'; else if (r->type == REDIS_REPLY_MAP) numsep = '#'; + /* TODO: this would be a breaking change for scripts, do that in a major version. */ + /* else if (r->type == REDIS_REPLY_PUSH) numsep = '>'; */ else numsep = ')'; snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep); @@ -1351,6 +1389,25 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) { return out; } +/* Returns 1 if the reply is a pubsub pushed reply. */ +int isPubsubPush(redisReply *r) { + if (r == NULL || + r->type != (config.current_resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) || + r->elements < 3 || + r->element[0]->type != REDIS_REPLY_STRING) + { + return 0; + } + char *str = r->element[0]->str; + size_t len = r->element[0]->len; + /* Check if it is [p|s][un]subscribe or [p|s]message, but even simpler, we + * just check that it ends with "message" or "subscribe". */ + return ((len >= strlen("message") && + !strcmp(str + len - strlen("message"), "message")) || + (len >= strlen("subscribe") && + !strcmp(str + len - strlen("subscribe"), "subscribe"))); +} + int isColorTerm(void) { char *t = getenv("TERM"); return t != NULL && strstr(t,"xterm") != NULL; @@ -1656,6 +1713,11 @@ static int cliReadReply(int output_raw_strings) { sds out = NULL; int output = 1; + if (config.last_reply) { + freeReplyObject(config.last_reply); + config.last_reply = NULL; + } + if (redisGetReply(context,&_reply) != REDIS_OK) { if (config.blocking_state_aborted) { config.blocking_state_aborted = 0; @@ -1682,7 +1744,7 @@ static int cliReadReply(int output_raw_strings) { return REDIS_ERR; /* avoid compiler warning */ } - reply = (redisReply*)_reply; + config.last_reply = reply = (redisReply*)_reply; config.last_cmd_type = reply->type; @@ -1731,15 +1793,78 @@ static int cliReadReply(int output_raw_strings) { fflush(stdout); sdsfree(out); } - freeReplyObject(reply); return REDIS_OK; } +/* Simultaneously wait for pubsub messages from redis and input on stdin. */ +static void cliWaitForMessagesOrStdin() { + int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) || + getenv("FAKETTY")); + int use_color = show_info && isColorTerm(); + cliPressAnyKeyTTY(); + while (config.pubsub_mode) { + /* First check if there are any buffered replies. */ + redisReply *reply; + do { + if (redisGetReplyFromReader(context, (void **)&reply) != REDIS_OK) { + cliPrintContextError(); + exit(1); + } + if (reply) { + sds out = cliFormatReply(reply, config.output, 0); + fwrite(out,sdslen(out),1,stdout); + fflush(stdout); + sdsfree(out); + } + } while(reply); + + /* Wait for input, either on the Redis socket or on stdin. */ + struct timeval tv; + fd_set readfds; + FD_ZERO(&readfds); + FD_SET(context->fd, &readfds); + FD_SET(STDIN_FILENO, &readfds); + tv.tv_sec = 5; + tv.tv_usec = 0; + if (show_info) { + if (use_color) printf("\033[1;90m"); /* Bold, bright color. */ + printf("Reading messages... (press Ctrl-C to quit or any key to type command)\r"); + if (use_color) printf("\033[0m"); /* Reset color. */ + fflush(stdout); + } + select(context->fd + 1, &readfds, NULL, NULL, &tv); + if (show_info) { + printf("\033[K"); /* Erase current line */ + fflush(stdout); + } + if (config.blocking_state_aborted) { + /* Ctrl-C pressed */ + config.blocking_state_aborted = 0; + config.pubsub_mode = 0; + if (cliConnect(CC_FORCE) != REDIS_OK) { + cliPrintContextError(); + exit(1); + } + break; + } else if (FD_ISSET(context->fd, &readfds)) { + /* Message from Redis */ + if (cliReadReply(0) != REDIS_OK) { + cliPrintContextError(); + exit(1); + } + fflush(stdout); + } else if (FD_ISSET(STDIN_FILENO, &readfds)) { + /* Any key pressed */ + break; + } + } + cliRestoreTTY(); +} + static int cliSendCommand(int argc, char **argv, long repeat) { char *command = argv[0]; size_t *argvlen; int j, output_raw; - int is_unsubscribe_command = 0; /* Is it an unsubscribe related command? */ if (context == NULL) return REDIS_ERR; @@ -1775,12 +1900,12 @@ static int cliSendCommand(int argc, char **argv, long repeat) { if (!strcasecmp(command,"shutdown")) config.shutdown = 1; if (!strcasecmp(command,"monitor")) config.monitor_mode = 1; - if (!strcasecmp(command,"subscribe") || - !strcasecmp(command,"psubscribe") || - !strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1; - if (!strcasecmp(command,"unsubscribe") || - !strcasecmp(command,"punsubscribe") || - !strcasecmp(command,"sunsubscribe")) is_unsubscribe_command = 1; + int is_subscribe = (!strcasecmp(command, "subscribe") || + !strcasecmp(command, "psubscribe") || + !strcasecmp(command, "ssubscribe")); + int is_unsubscribe = (!strcasecmp(command, "unsubscribe") || + !strcasecmp(command, "punsubscribe") || + !strcasecmp(command, "sunsubscribe")); if (!strcasecmp(command,"sync") || !strcasecmp(command,"psync")) config.slave_mode = 1; @@ -1812,21 +1937,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) { while(repeat < 0 || repeat-- > 0) { redisAppendCommandArgv(context,argc,(const char**)argv,argvlen); - if (is_unsubscribe_command) { - /* In unsubscribe related commands, we need to read the specified - * number of replies according to the number of parameters. */ - argc--; /* Skip the command */ - do { - if (cliReadReply(output_raw) != REDIS_OK) { - cliPrintContextError(); - exit(1); - } - fflush(stdout); - } while(--argc); - zfree(argvlen); - continue; - } - if (config.monitor_mode) { do { if (cliReadReply(output_raw) != REDIS_OK) { @@ -1843,27 +1953,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) { return REDIS_OK; } - if (config.pubsub_mode) { - if (config.output != OUTPUT_RAW) - printf("Reading messages... (press Ctrl-C to quit)\n"); - + int num_expected_pubsub_push = 0; + if (is_subscribe || is_unsubscribe) { + /* When a push callback is set, redisGetReply (hiredis) loops until + * an in-band message is received, but these commands are confirmed + * using push replies only. There is one push reply per channel if + * channels are specified, otherwise at least one. */ + num_expected_pubsub_push = argc > 1 ? argc - 1 : 1; /* Unset our default PUSH handler so this works in RESP2/RESP3 */ redisSetPushCallback(context, NULL); - - while (config.pubsub_mode) { - if (cliReadReply(output_raw) != REDIS_OK) { - cliPrintContextError(); - exit(1); - } - fflush(stdout); /* Make it grep friendly */ - if (!config.pubsub_mode || config.last_cmd_type == REDIS_REPLY_ERROR) { - if (config.push_output) { - redisSetPushCallback(context, cliPushHandler); - } - config.pubsub_mode = 0; - } - } - continue; } if (config.slave_mode) { @@ -1874,10 +1972,35 @@ static int cliSendCommand(int argc, char **argv, long repeat) { return REDIS_ERR; /* Error = slaveMode lost connection to master */ } - if (cliReadReply(output_raw) != REDIS_OK) { - zfree(argvlen); - return REDIS_ERR; - } else { + /* Read response, possibly skipping pubsub/push messages. */ + while (1) { + if (cliReadReply(output_raw) != REDIS_OK) { + zfree(argvlen); + return REDIS_ERR; + } + fflush(stdout); + if (config.pubsub_mode || num_expected_pubsub_push > 0) { + if (isPubsubPush(config.last_reply)) { + if (num_expected_pubsub_push > 0 && + !strcasecmp(config.last_reply->element[0]->str, command)) + { + /* This pushed message confirms the + * [p|s][un]subscribe command. */ + if (is_subscribe && !config.pubsub_mode) { + config.pubsub_mode = 1; + cliRefreshPrompt(); + } + if (--num_expected_pubsub_push > 0) { + continue; /* We need more of these. */ + } + } else { + continue; /* Skip this pubsub message. */ + } + } else if (config.last_reply->type == REDIS_REPLY_PUSH) { + continue; /* Skip other push message. */ + } + } + /* Store database number when SELECT was successfully executed. */ if (!strcasecmp(command,"select") && argc == 2 && config.last_cmd_type != REDIS_REPLY_ERROR) @@ -1911,9 +2034,25 @@ static int cliSendCommand(int argc, char **argv, long repeat) { config.in_multi = 0; config.dbnum = 0; config.conn_info.input_dbnum = 0; - config.resp3 = 0; + config.current_resp3 = 0; + if (config.pubsub_mode && config.push_output) { + redisSetPushCallback(context, cliPushHandler); + } + config.pubsub_mode = 0; cliRefreshPrompt(); + } else if (!strcasecmp(command,"hello")) { + if (config.last_cmd_type == REDIS_REPLY_MAP) { + config.current_resp3 = 1; + } else if (config.last_cmd_type == REDIS_REPLY_ARRAY) { + config.current_resp3 = 0; + } + } else if ((is_subscribe || is_unsubscribe) && !config.pubsub_mode) { + /* We didn't enter pubsub mode. Restore push callback. */ + if (config.push_output) + redisSetPushCallback(context, cliPushHandler); } + + break; } if (config.cluster_reissue_command){ /* If we need to reissue the command, break to prevent a @@ -2664,8 +2803,17 @@ static void repl(void) { } cliRefreshPrompt(); - while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) { - if (line[0] != '\0') { + while(1) { + line = linenoise(context ? config.prompt : "not connected> "); + if (line == NULL) { + /* ^C, ^D or similar. */ + if (config.pubsub_mode) { + config.pubsub_mode = 0; + if (cliConnect(CC_FORCE) == REDIS_OK) + continue; + } + break; + } else if (line[0] != '\0') { long repeat = 1; int skipargs = 0; char *endptr = NULL; @@ -2759,6 +2907,11 @@ static void repl(void) { /* Free the argument vector */ sdsfreesplitres(argv,argc); } + + if (config.pubsub_mode) { + cliWaitForMessagesOrStdin(); + } + /* linenoise() returns malloc-ed lines like readline() */ linenoiseFree(line); } @@ -2799,6 +2952,13 @@ static int noninteractive(int argc, char **argv) { retval = issueCommand(argc, sds_args); sdsfreesplitres(sds_args, argc); + while (config.pubsub_mode) { + if (cliReadReply(0) != REDIS_OK) { + cliPrintContextError(); + exit(1); + } + fflush(stdout); + } return retval == REDIS_OK ? 0 : 1; } @@ -9011,6 +9171,7 @@ int main(int argc, char **argv) { config.eval_ldb_sync = 0; config.enable_ldb_on_eval = 0; config.last_cmd_type = -1; + config.last_reply = NULL; config.verbose = 0; config.set_errcode = 0; config.no_auth_warning = 0; diff --git a/tests/integration/redis-cli.tcl b/tests/integration/redis-cli.tcl index e159fb17d..50f4a2e83 100644 --- a/tests/integration/redis-cli.tcl +++ b/tests/integration/redis-cli.tcl @@ -60,7 +60,7 @@ start_server {tags {"cli"}} { # Helpers to run tests in interactive mode proc format_output {output} { - set _ [string trimright [regsub -all "\r" $output ""] "\n"] + set _ [string trimright $output "\n"] } proc run_command {fd cmd} { @@ -76,6 +76,12 @@ start_server {tags {"cli"}} { unset ::env(FAKETTY) } + proc test_interactive_nontty_cli {name code} { + set fd [open_cli] + test "Interactive non-TTY CLI: $name" $code + close_cli $fd + } + # Helpers to run tests where stdout is not a tty proc write_tmpfile {contents} { set tmp [tmpfile "cli"] @@ -142,7 +148,8 @@ start_server {tags {"cli"}} { test_interactive_cli "INFO response should be printed raw" { set lines [split [run_command $fd info] "\n"] foreach line $lines { - if {![regexp {^$|^#|^[^#:]+:} $line]} { + # Info lines end in \r\n, so they now end in \r. + if {![regexp {^\r$|^#|^[^#:]+:} $line]} { fail "Malformed info line: $line" } } @@ -186,6 +193,77 @@ start_server {tags {"cli"}} { assert_equal "bar" [r get key] } + test_interactive_cli "Subscribed mode" { + set reading "Reading messages... (press Ctrl-C to quit or any key to type command)\r" + set erase "\033\[K"; # Erases the "Reading messages..." line. + + # Subscribe to some channels. + set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n" + set sub2 "1) \"subscribe\"\n2) \"ch2\"\n3) (integer) 2\n" + set sub3 "1) \"subscribe\"\n2) \"ch3\"\n3) (integer) 3\n" + assert_equal $sub1$sub2$sub3$reading \ + [run_command $fd "subscribe ch1 ch2 ch3"] + + # Receive pubsub message. + r publish ch2 hello + set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n" + assert_equal $erase$message$reading [read_cli $fd] + + # Unsubscribe some. + set unsub1 "1) \"unsubscribe\"\n2) \"ch1\"\n3) (integer) 2\n" + set unsub2 "1) \"unsubscribe\"\n2) \"ch2\"\n3) (integer) 1\n" + assert_equal $erase$unsub1$unsub2$reading \ + [run_command $fd "unsubscribe ch1 ch2"] + + # Command forbidden in subscribed mode (RESP2). + set err "(error) ERR Can't execute 'get': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context\n" + assert_equal $erase$err$reading [run_command $fd "get k"] + + # Command allowed in subscribed mode. + set pong "1) \"pong\"\n2) \"\"\n" + assert_equal $erase$pong$reading [run_command $fd "ping"] + + # Reset exits subscribed mode. + assert_equal ${erase}RESET [run_command $fd "reset"] + assert_equal PONG [run_command $fd "ping"] + + # Check TTY output of push messages in RESP3 has ")" prefix (to be changed to ">" in the future). + assert_match "1#*" [run_command $fd "hello 3"] + set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n" + assert_equal $sub1$reading \ + [run_command $fd "subscribe ch1"] + } + + test_interactive_nontty_cli "Subscribed mode" { + # Raw output and no "Reading messages..." info message. + # Use RESP3 in this test case. + assert_match {*proto 3*} [run_command $fd "hello 3"] + + # Subscribe to some channels. + set sub1 "subscribe\nch1\n1" + set sub2 "subscribe\nch2\n2" + assert_equal $sub1\n$sub2 \ + [run_command $fd "subscribe ch1 ch2"] + + assert_equal OK [run_command $fd "client tracking on"] + assert_equal OK [run_command $fd "set k 42"] + assert_equal 42 [run_command $fd "get k"] + + # Interleaving invalidate and pubsub messages. + r publish ch1 hello + r del k + r publish ch2 world + set message1 "message\nch1\nhello" + set invalidate "invalidate\nk" + set message2 "message\nch2\nworld" + assert_equal $message1\n$invalidate\n$message2\n [read_cli $fd] + + # Unsubscribe all. + set unsub1 "unsubscribe\nch1\n1" + set unsub2 "unsubscribe\nch2\n0" + assert_equal $unsub1\n$unsub2 [run_command $fd "unsubscribe ch1 ch2"] + } + test_tty_cli "Status reply" { assert_equal "OK" [run_cli set key bar] assert_equal "bar" [r get key]