diff --git a/TLS.md b/TLS.md index ee24a8df5..90ed08e7c 100644 --- a/TLS.md +++ b/TLS.md @@ -81,23 +81,6 @@ implementation details between TLS and TCP. difficult, but there are probably other good reasons to improve that part anyway. -5. A mechanism to re-trigger read callbacks for connections with unread buffers - (the case of reading partial TLS frames): - - a) Before sleep should iterate connections looking for those with a read handler, - SSL_pending() != 0 and no read event. - b) If found, trigger read handler for these conns. - c) After iteration if this state persists, epoll should be called in a way - that won't block so the process continues and this behave the same as a - level trigerred epoll. - -Replication ------------ - -Diskless master replication is broken, until child/parent connection proxying is -implemented. - - TLS Features ------------ @@ -119,6 +102,10 @@ most actions. This will need to be cleaned up for proper TLS support. The best approach is probably to migrate to hiredis async mode. +redis-cli +--------- +1. Support tls in --slave and --rdb + Others ------ diff --git a/src/ae.c b/src/ae.c index 53629ef77..54c0d994e 100644 --- a/src/ae.c +++ b/src/ae.c @@ -76,6 +76,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; + eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ @@ -97,6 +98,14 @@ int aeGetSetSize(aeEventLoop *eventLoop) { return eventLoop->setsize; } +/* Tells the next iteration/s of the event processing to set timeout of 0. */ +void aeDontWait(aeEventLoop *eventLoop, int noWait) { + if (noWait) + eventLoop->flags |= AE_DONT_WAIT; + else + eventLoop->flags &= ~AE_DONT_WAIT; +} + /* Resize the maximum set size of the event loop. * If the requested set size is smaller than the current set size, but * there is already a file descriptor in use that is >= the requested @@ -406,6 +415,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) } } + if (eventLoop->flags & AE_DONT_WAIT) { + tv.tv_sec = tv.tv_usec = 0; + tvp = &tv; + } + /* Call the multiplexing API, will return only on timeout or when * some event fires. 
*/ numevents = aeApiPoll(eventLoop, tvp); diff --git a/src/ae.h b/src/ae.h index 184fe3d1b..bbb43fe6e 100644 --- a/src/ae.h +++ b/src/ae.h @@ -106,6 +106,7 @@ typedef struct aeEventLoop { void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; + int flags; } aeEventLoop; /* Prototypes */ @@ -128,5 +129,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); +void aeDontWait(aeEventLoop *eventLoop, int noWait); #endif diff --git a/src/ae_epoll.c b/src/ae_epoll.c index 410aac70d..fa197297e 100644 --- a/src/ae_epoll.c +++ b/src/ae_epoll.c @@ -121,8 +121,8 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; - if (e->events & EPOLLERR) mask |= AE_WRITABLE; - if (e->events & EPOLLHUP) mask |= AE_WRITABLE; + if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; + if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } diff --git a/src/aof.c b/src/aof.c index eed994bf2..ef34097ed 100644 --- a/src/aof.c +++ b/src/aof.c @@ -836,6 +836,8 @@ int loadAppendOnlyFile(char *filename) { freeFakeClientArgv(fakeClient); fakeClient->cmd = NULL; if (server.aof_load_truncated) valid_up_to = ftello(fp); + if (server.key_load_delay) + usleep(server.key_load_delay); } /* This point can only be reached when EOF is reached without errors. diff --git a/src/config.c b/src/config.c index 456fb0226..792e45057 100644 --- a/src/config.c +++ b/src/config.c @@ -522,6 +522,12 @@ void loadServerConfigFromString(char *config) { err = "rdb-key-save-delay can't be negative"; goto loaderr; } + } else if (!strcasecmp(argv[0],"key-load-delay") && argc==2) { + server.key_load_delay = atoi(argv[1]); + if (server.key_load_delay < 0) { + err = "key-load-delay can't be negative"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; @@ -1191,6 +1197,8 @@ void configSetCommand(client *c) { "replica-priority",server.slave_priority,0,INT_MAX) { } config_set_numerical_field( "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) { + } config_set_numerical_field( + "key-load-delay",server.key_load_delay,0,LLONG_MAX) { } config_set_numerical_field( "slave-announce-port",server.slave_announce_port,0,65535) { } config_set_numerical_field( @@ -1452,6 +1460,7 @@ void configGetCommand(client *c) { config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay); + config_get_numerical_field("key-load-delay",server.key_load_delay); config_get_numerical_field("tcp-keepalive",server.tcpkeepalive); /* Bool (yes/no) values */ @@ -2272,6 +2281,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); 
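/* A hedged usage sketch (not part of the hunks above): the new aeDontWait()
 * hook added in ae.c/ae.h is meant to be called from a beforeSleep-style
 * callback when some layer, e.g. TLS, still holds buffered input that epoll
 * cannot see, so that the next aeApiPoll() call uses a zero timeout instead
 * of blocking. The function name below is illustrative; the real caller in
 * this patch is beforeSleep() in server.c. */
static void exampleBeforeSleep(aeEventLoop *el) {
    /* Serve reads that are still buffered inside the TLS layer. */
    tlsProcessPendingData();
    /* If buffered data remains, ask the event loop not to block. */
    aeDontWait(el, tlsHasPendingData());
}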
rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY); + rewriteConfigNumericalOption(state,"key-load-delay",server.key_load_delay,CONFIG_DEFAULT_KEY_LOAD_DELAY); rewriteConfigStringOption(state,"tls-cert-file",server.tls_cert_file,NULL); rewriteConfigStringOption(state,"tls-key-file",server.tls_key_file,NULL); rewriteConfigStringOption(state,"tls-dh-params-file",server.tls_dh_params_file,NULL); diff --git a/src/connection.c b/src/connection.c index 62c6f3506..601f175bc 100644 --- a/src/connection.c +++ b/src/connection.c @@ -140,10 +140,6 @@ void *connGetPrivateData(connection *conn) { * move here as we implement additional connection types. */ -static int connSocketShutdown(connection *conn, int how) { - return shutdown(conn->fd, how); -} - /* Close the connection and free resources. */ static void connSocketClose(connection *conn) { if (conn->fd != -1) { @@ -298,7 +294,6 @@ static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, ConnectionType CT_Socket = { .ae_handler = connSocketEventHandler, .close = connSocketClose, - .shutdown = connSocketShutdown, .write = connSocketWrite, .read = connSocketRead, .accept = connSocketAccept, diff --git a/src/connection.h b/src/connection.h index e3e844f95..3fdcddebe 100644 --- a/src/connection.h +++ b/src/connection.h @@ -55,7 +55,6 @@ typedef struct ConnectionType { int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler); int (*write)(struct connection *conn, const void *data, size_t data_len); int (*read)(struct connection *conn, void *buf, size_t buf_len); - int (*shutdown)(struct connection *conn, int how); void (*close)(struct connection *conn); int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler); @@ -159,10 +158,6 @@ static inline void connClose(connection *conn) { conn->type->close(conn); } -static inline int connShutdown(connection *conn, int how) { - return conn->type->shutdown(conn, how); -} - /* Returns the last error encountered by the connection, as a string. If no error, * a NULL is returned. */ @@ -208,4 +203,8 @@ int connFormatPeer(connection *conn, char *buf, size_t buf_len); int connSockName(connection *conn, char *ip, size_t ip_len, int *port); const char *connGetInfo(connection *conn, char *buf, size_t buf_len); +/* Helpers for tls special considerations */ +int tlsHasPendingData(); +void tlsProcessPendingData(); + #endif /* __REDIS_CONNECTION_H */ diff --git a/src/networking.c b/src/networking.c index 2d00b0e74..d52eb1eba 100644 --- a/src/networking.c +++ b/src/networking.c @@ -881,7 +881,7 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) { serverLog(LL_WARNING, "Error accepting a client connection: %s (conn: %s)", connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); - connClose(conn); + freeClient(connGetPrivateData(conn)); return; } } @@ -984,14 +984,21 @@ void unlinkClient(client *c) { c->client_list_node = NULL; } - /* In the case of diskless replication the fork is writing to the - * sockets and just closing the fd isn't enough, if we don't also - * shutdown the socket the fork will continue to write to the slave - * and the salve will only find out that it was disconnected when - * it will finish reading the rdb. 
*/ - int need_shutdown = ((c->flags & CLIENT_SLAVE) && - (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)); - if (need_shutdown) connShutdown(c->conn, SHUT_RDWR); + /* Check if this is a replica waiting for diskless replication (rdb pipe), + * in which case it needs to be cleaned from that list */ + if (c->flags & CLIENT_SLAVE && + c->replstate == SLAVE_STATE_WAIT_BGSAVE_END && + server.rdb_pipe_conns) + { + int i; + for (i=0; i < server.rdb_pipe_numconns; i++) { + if (server.rdb_pipe_conns[i] == c->conn) { + rdbPipeWriteHandlerConnRemoved(c->conn); + server.rdb_pipe_conns[i] = NULL; + break; + } + } + } connClose(c->conn); c->conn = NULL; } @@ -1309,7 +1316,7 @@ int handleClientsWithPendingWrites(void) { { ae_flags |= AE_BARRIER; } - /* TODO: Handle write barriers in connection */ + /* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */ if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) { freeClientAsync(c); } diff --git a/src/rdb.c b/src/rdb.c index 3b98f57bb..59f22916b 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2211,6 +2211,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * own reference. */ decrRefCount(key); } + if (server.key_load_delay) + usleep(server.key_load_delay); /* Reset the state that is key-specified and is populated by * opcodes before the key, so that we start from scratch again. */ @@ -2306,8 +2308,6 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { * This function covers the case of RDB -> Salves socket transfers for * diskless replication. */ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { - uint64_t *ok_slaves; - if (!bysignal && exitcode == 0) { serverLog(LL_NOTICE, "Background RDB transfer terminated with success"); @@ -2321,79 +2321,6 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_save_time_start = -1; - /* If the child returns an OK exit code, read the set of slave client - * IDs and the associated status code. We'll terminate all the slaves - * in error state. - * - * If the process returned an error, consider the list of slaves that - * can continue to be empty, so that it's just a special case of the - * normal code path. */ - ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */ - ok_slaves[0] = 0; - if (!bysignal && exitcode == 0) { - int readlen = sizeof(uint64_t); - - if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) == - readlen) - { - readlen = ok_slaves[0]*sizeof(uint64_t)*2; - - /* Make space for enough elements as specified by the first - * uint64_t element in the array. */ - ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen); - if (readlen && - read(server.rdb_pipe_read_result_from_child, ok_slaves+1, - readlen) != readlen) - { - ok_slaves[0] = 0; - } - } - } - - close(server.rdb_pipe_read_result_from_child); - close(server.rdb_pipe_write_result_to_parent); - - /* We can continue the replication process with all the slaves that - * correctly received the full payload. Others are terminated. */ - listNode *ln; - listIter li; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { - uint64_t j; - int errorcode = 0; - - /* Search for the slave ID in the reply. In order for a slave to - * continue the replication process, we need to find it in the list, - * and it must have an error code set to 0 (which means success). 
*/ - for (j = 0; j < ok_slaves[0]; j++) { - if (slave->id == ok_slaves[2*j+1]) { - errorcode = ok_slaves[2*j+2]; - break; /* Found in slaves list. */ - } - } - if (j == ok_slaves[0] || errorcode != 0) { - serverLog(LL_WARNING, - "Closing slave %s: child->slave RDB transfer failed: %s", - replicationGetSlaveName(slave), - (errorcode == 0) ? "RDB transfer child aborted" - : strerror(errorcode)); - freeClient(slave); - } else { - serverLog(LL_WARNING, - "Slave %s correctly received the streamed RDB file.", - replicationGetSlaveName(slave)); - /* Restore the socket as non-blocking. */ - connNonBlock(slave->conn); - connSendTimeout(slave->conn,0); - } - } - } - zfree(ok_slaves); - updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET); } @@ -2425,9 +2352,6 @@ void killRDBChild(void) { /* Spawn an RDB child that writes the RDB to the sockets of the slaves * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { - connection **conns; - uint64_t *clientids; - int numconns; listNode *ln; listIter li; pid_t childpid; @@ -2436,35 +2360,30 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; - /* Before to fork, create a pipe that will be used in order to - * send back to the parent the IDs of the slaves that successfully - * received all the writes. */ + /* Even if the previous fork child exited, don't start a new one until we + * have drained the pipe. */ + if (server.rdb_pipe_conns) return C_ERR; + + /* Before the fork, create a pipe that is used to transfer the rdb bytes to + * the parent; we can't let the child write directly to the sockets, since in + * case of TLS we must let the parent handle a continuous TLS state when the + * child terminates and the parent takes over. */ if (pipe(pipefds) == -1) return C_ERR; - server.rdb_pipe_read_result_from_child = pipefds[0]; - server.rdb_pipe_write_result_to_parent = pipefds[1]; + server.rdb_pipe_read = pipefds[0]; + server.rdb_pipe_write = pipefds[1]; + anetNonBlock(NULL, server.rdb_pipe_read); - /* Collect the file descriptors of the slaves we want to transfer + /* Collect the connections of the replicas we want to transfer * the RDB to, which are i WAIT_BGSAVE_START state. */ - conns = zmalloc(sizeof(connection *)*listLength(server.slaves)); - /* We also allocate an array of corresponding client IDs. This will - * be useful for the child process in order to build the report - * (sent via unix pipe) that will be sent to the parent. */ - clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves)); - numconns = 0; - + server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves)); + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - clientids[numconns] = slave->id; - conns[numconns++] = slave->conn; + server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn; replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset()); - /* Put the socket in blocking mode to simplify RDB transfer. - * We'll restore it when the children returns (since duped socket - * will share the O_NONBLOCK attribute with the parent).
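/* A simplified sketch (illustrative only; variable names shortened, error
 * handling omitted) of the pipe wiring the rewritten rdbSaveToSlavesSockets()
 * sets up: the fork child serializes the RDB into the write end, and the
 * parent relays the stream to each replica connection, so any TLS session
 * state stays in a single process. */
int fds[2];
if (pipe(fds) == -1) return C_ERR;
server.rdb_pipe_read = fds[0];            /* parent reads the RDB stream here */
server.rdb_pipe_write = fds[1];           /* child writes the RDB stream here */
anetNonBlock(NULL, server.rdb_pipe_read);
if (fork() == 0) {
    /* Child: stream the RDB into the pipe using the new fd-based rio target. */
    rio rdb;
    rioInitWithFd(&rdb, server.rdb_pipe_write);
    rdbSaveRioWithEOFMark(&rdb, NULL, rsi);
    rioFreeFd(&rdb);
    close(server.rdb_pipe_write);         /* closing it signals EOF to the parent */
} else {
    /* Parent: drop its copy of the write end so the child's close is visible,
     * then fan the data out to the replicas from rdbPipeReadHandler(). */
    close(server.rdb_pipe_write);
    aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE,
                      rdbPipeReadHandler, NULL);
}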
*/ - connBlock(slave->conn); - connSendTimeout(slave->conn,server.repl_timeout*1000); } } @@ -2474,16 +2393,15 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { if ((childpid = fork()) == 0) { /* Child */ int retval; - rio slave_sockets; + rio rdb; - rioInitWithConnset(&slave_sockets,conns,numconns); - zfree(conns); + rioInitWithFd(&rdb,server.rdb_pipe_write); closeListeningSockets(0); redisSetProcTitle("redis-rdb-to-slaves"); - retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); - if (retval == C_OK && rioFlush(&slave_sockets) == 0) + retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi); + if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; if (retval == C_OK) { @@ -2497,48 +2415,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { server.child_info_data.cow_size = private_dirty; sendChildInfo(CHILD_INFO_TYPE_RDB); - - /* If we are returning OK, at least one slave was served - * with the RDB file as expected, so we need to send a report - * to the parent via the pipe. The format of the message is: - * - * ... - * - * len, slave IDs, and slave errors, are all uint64_t integers, - * so basically the reply is composed of 64 bits for the len field - * plus 2 additional 64 bit integers for each entry, for a total - * of 'len' entries. - * - * The 'id' represents the slave's client ID, so that the master - * can match the report with a specific slave, and 'error' is - * set to 0 if the replication process terminated with a success - * or the error code if an error occurred. */ - void *msg = zmalloc(sizeof(uint64_t)*(1+2*numconns)); - uint64_t *len = msg; - uint64_t *ids = len+1; - int j, msglen; - - *len = numconns; - for (j = 0; j < numconns; j++) { - *ids++ = clientids[j]; - *ids++ = slave_sockets.io.connset.state[j]; - } - - /* Write the message to the parent. If we have no good slaves or - * we are unable to transfer the message to the parent, we exit - * with an error so that the parent will abort the replication - * process with all the childre that were waiting. */ - msglen = sizeof(uint64_t)*(1+2*numconns); - if (*len == 0 || - write(server.rdb_pipe_write_result_to_parent,msg,msglen) - != msglen) - { - retval = C_ERR; - } - zfree(msg); } - zfree(clientids); - rioFreeConnset(&slave_sockets); + rioFreeFd(&rdb); + close(server.rdb_pipe_write); /* wake up the reader, tell it we're done. */ exitFromChild((retval == C_OK) ? 0 : 1); } else { /* Parent */ @@ -2552,17 +2431,16 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - int j; - - for (j = 0; j < numconns; j++) { - if (slave->id == clientids[j]) { - slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - break; - } + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { + slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START; } } - close(pipefds[0]); - close(pipefds[1]); + close(server.rdb_pipe_write); + close(server.rdb_pipe_read); + zfree(server.rdb_pipe_conns); + server.rdb_pipe_conns = NULL; + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; closeChildInfoPipe(); } else { server.stat_fork_time = ustime()-start; @@ -2574,10 +2452,12 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; + close(server.rdb_pipe_write); /* close write in parent so that it can detect the close on the child. 
*/ + if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { + serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); + } updateDictResizePolicy(); } - zfree(clientids); - zfree(conns); return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. */ diff --git a/src/replication.c b/src/replication.c index cbc52cb2e..9f5c19053 100644 --- a/src/replication.c +++ b/src/replication.c @@ -884,7 +884,7 @@ void sendBulkToSlave(connection *conn) { nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble)); if (nwritten == -1) { serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s", - strerror(errno)); + connGetLastError(conn)); freeClient(slave); return; } @@ -911,7 +911,7 @@ void sendBulkToSlave(connection *conn) { if ((nwritten = connWrite(conn,buf,buflen)) == -1) { if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING,"Write error sending DB to replica: %s", - strerror(errno)); + connGetLastError(conn)); freeClient(slave); } return; @@ -926,6 +926,152 @@ void sendBulkToSlave(connection *conn) { } } +/* Remove one write handler from the list of connections waiting to be writable + * during rdb pipe transfer. */ +void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { + if (!connHasWriteHandler(conn)) + return; + connSetWriteHandler(conn, NULL); + server.rdb_pipe_numconns_writing--; + /* If there are no more writes for now for this conn, or a write error: */ + if (server.rdb_pipe_numconns_writing == 0) { + if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { + serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); + } + } +} + +/* Called in the diskless master during transfer of data from the rdb pipe, when + * the replica becomes writable again. */ +void rdbPipeWriteHandler(struct connection *conn) { + serverAssert(server.rdb_pipe_bufflen>0); + client *slave = connGetPrivateData(conn); + int nwritten; + if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff, + server.rdb_pipe_bufflen - slave->repldboff)) == -1) + { + if (connGetState(conn) == CONN_STATE_CONNECTED) + return; /* equivalent to EAGAIN */ + serverLog(LL_WARNING,"Write error sending DB to replica: %s", + connGetLastError(conn)); + freeClient(slave); + return; + } else { + slave->repldboff += nwritten; + server.stat_net_output_bytes += nwritten; + if (slave->repldboff < server.rdb_pipe_bufflen) + return; /* more data to write. */ + } + rdbPipeWriteHandlerConnRemoved(conn); +} + +/* When the pipe serving the diskless rdb transfer is drained (write end was + * closed), we can clean up all the temporary variables and clean up after the + * fork child. */ +void RdbPipeCleanup() { + close(server.rdb_pipe_read); + zfree(server.rdb_pipe_conns); + server.rdb_pipe_conns = NULL; + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; + zfree(server.rdb_pipe_buff); + server.rdb_pipe_buff = NULL; + server.rdb_pipe_bufflen = 0; + + /* Since we avoid detecting that the child has exited as long as the pipe is + * not drained, now is the time to check.
*/ + checkChildrenDone(); +} + +/* Called in diskless master, when there's data to read from the child's rdb pipe */ +void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) { + UNUSED(mask); + UNUSED(clientData); + UNUSED(eventLoop); + int i; + if (!server.rdb_pipe_buff) + server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN); + serverAssert(server.rdb_pipe_numconns_writing==0); + + while (1) { + server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN); + if (server.rdb_pipe_bufflen < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno)); + for (i=0; i < server.rdb_pipe_numconns; i++) { + connection *conn = server.rdb_pipe_conns[i]; + if (!conn) + continue; + client *slave = connGetPrivateData(conn); + freeClient(slave); + server.rdb_pipe_conns[i] = NULL; + } + killRDBChild(); + return; + } + + if (server.rdb_pipe_bufflen == 0) { + /* EOF - write end was closed. */ + int stillUp = 0; + aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + for (i=0; i < server.rdb_pipe_numconns; i++) + { + connection *conn = server.rdb_pipe_conns[i]; + if (!conn) + continue; + stillUp++; + } + serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp); + RdbPipeCleanup(); + return; + } + + int stillAlive = 0; + for (i=0; i < server.rdb_pipe_numconns; i++) + { + int nwritten; + connection *conn = server.rdb_pipe_conns[i]; + if (!conn) + continue; + + client *slave = connGetPrivateData(conn); + if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) { + if (connGetState(conn) != CONN_STATE_CONNECTED) { + serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s", + connGetLastError(conn)); + freeClient(slave); + server.rdb_pipe_conns[i] = NULL; + continue; + } + /* An error and still in connected state, is equivalent to EAGAIN */ + slave->repldboff = 0; + } else { + slave->repldboff = nwritten; + server.stat_net_output_bytes += nwritten; + } + /* If we were unable to write all the data to one of the replicas, + * setup write handler (and disable pipe read handler, below) */ + if (nwritten != server.rdb_pipe_bufflen) { + server.rdb_pipe_numconns_writing++; + connSetWriteHandler(conn, rdbPipeWriteHandler); + } + stillAlive++; + } + + if (stillAlive == 0) { + serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child."); + killRDBChild(); + RdbPipeCleanup(); + } + /* Remove the pipe read handler if at least one write handler was set. */ + if (server.rdb_pipe_numconns_writing || stillAlive == 0) { + aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + break; + } + } +} + /* This function is called at the end of every background saving, * or when the replication RDB transfer strategy is modified from * disk to socket or the other way around. diff --git a/src/rio.c b/src/rio.c index 5e3b4a06e..c8c924380 100644 --- a/src/rio.c +++ b/src/rio.c @@ -272,85 +272,67 @@ void rioFreeConn(rio *r, sds *remaining) { r->io.conn.buf = NULL; } -/* ------------------- File descriptors set implementation ------------------ - * This target is used to write the RDB file to N different replicas via - * sockets, when the master just streams the data to the replicas without - * creating an RDB on-disk image (diskless replication option). 
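/* A hedged usage sketch of the fd-based rio target implemented just below
 * (the payload and the choice of fd are illustrative): any caller that wants
 * to stream a serialized payload into a plain file descriptor, the way the
 * diskless-replication child streams the RDB into the pipe, can use it like
 * this. */
rio r;
rioInitWithFd(&r, server.rdb_pipe_write);   /* or any writable fd */
rioWrite(&r, "payload", 7);                 /* buffered until it grows past PROTO_IOBUF_LEN */
rioFlush(&r);                               /* push out whatever is still buffered */
rioFreeFd(&r);                              /* frees only the internal buffer; the fd stays open */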
+/* ------------------- File descriptor implementation ------------------ + * This target is used to write the RDB file to a pipe, when the master just + * streams the data to the replicas without creating an RDB on-disk image + * (diskless replication option). * It only implements writes. */ /* Returns 1 or 0 for success/failure. - * The function returns success as long as we are able to correctly write - * to at least one file descriptor. * * When buf is NULL and len is 0, the function performs a flush operation * if there is some pending buffer, so this function is also used in order - * to implement rioFdsetFlush(). */ -static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + * to implement rioFdFlush(). */ +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { ssize_t retval; - int j; unsigned char *p = (unsigned char*) buf; int doflush = (buf == NULL && len == 0); - /* To start we always append to our buffer. If it gets larger than - * a given size, we actually write to the sockets. */ - if (len) { - r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len); - len = 0; /* Prevent entering the while below if we don't flush. */ - if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1; - } - - if (doflush) { - p = (unsigned char*) r->io.connset.buf; - len = sdslen(r->io.connset.buf); - } - - /* Write in little chunchs so that when there are big writes we - * parallelize while the kernel is sending data in background to - * the TCP socket. */ - while(len) { - size_t count = len < 1024 ? len : 1024; - int broken = 0; - for (j = 0; j < r->io.connset.numconns; j++) { - if (r->io.connset.state[j] != 0) { - /* Skip FDs alraedy in error. */ - broken++; - continue; - } - - /* Make sure to write 'count' bytes to the socket regardless - * of short writes. */ - size_t nwritten = 0; - while(nwritten != count) { - retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten); - if (retval <= 0) { - /* With blocking sockets, which is the sole user of this - * rio target, EWOULDBLOCK is returned only because of - * the SO_SNDTIMEO socket option, so we translate the error - * into one more recognizable by the user. */ - if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; - break; - } - nwritten += retval; - } - - if (nwritten != count) { - /* Mark this FD as broken. */ - r->io.connset.state[j] = errno; - if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO; - } + /* For small writes, we'd rather keep the data in a user-space buffer and flush + * it only when it grows. However, for larger writes we prefer to flush + * any pre-existing buffer and write the new one directly, without reallocs + * and memory copying. */ + if (len > PROTO_IOBUF_LEN) { + /* First, flush any pre-existing buffered data. */ + if (sdslen(r->io.fd.buf)) { + if (rioFdWrite(r, NULL, 0) == 0) + return 0; } - if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */ - p += count; - len -= count; - r->io.connset.pos += count; + /* Write the new data, keeping 'p' and 'len' from the input. */ + } else { + if (len) { + r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len); + if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) + doflush = 1; + if (!doflush) + return 1; + } + /* Flushing the buffered data: set 'p' and 'len' accordingly.
*/ + p = (unsigned char*) r->io.fd.buf; + len = sdslen(r->io.fd.buf); } - if (doflush) sdsclear(r->io.connset.buf); + size_t nwritten = 0; + while(nwritten != len) { + retval = write(r->io.fd.fd,p+nwritten,len-nwritten); + if (retval <= 0) { + /* With blocking io, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. */ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; /* error. */ + } + nwritten += retval; + } + + r->io.fd.pos += len; + sdsclear(r->io.fd.buf); return 1; } /* Returns 1 or 0 for success/failure. */ -static size_t rioFdsetRead(rio *r, void *buf, size_t len) { +static size_t rioFdRead(rio *r, void *buf, size_t len) { UNUSED(r); UNUSED(buf); UNUSED(len); @@ -358,23 +340,23 @@ static size_t rioFdsetRead(rio *r, void *buf, size_t len) { } /* Returns read/write position in file. */ -static off_t rioFdsetTell(rio *r) { - return r->io.connset.pos; +static off_t rioFdTell(rio *r) { + return r->io.fd.pos; } /* Flushes any buffer to target device if applicable. Returns 1 on success * and 0 on failures. */ -static int rioFdsetFlush(rio *r) { +static int rioFdFlush(rio *r) { /* Our flush is implemented by the write method, that recognizes a * buffer set to NULL with a count of zero as a flush request. */ - return rioFdsetWrite(r,NULL,0); + return rioFdWrite(r,NULL,0); } -static const rio rioFdsetIO = { - rioFdsetRead, - rioFdsetWrite, - rioFdsetTell, - rioFdsetFlush, +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* flags */ @@ -383,24 +365,16 @@ static const rio rioFdsetIO = { { { NULL, 0 } } /* union for io-specific vars */ }; -void rioInitWithConnset(rio *r, connection **conns, int numconns) { - int j; - - *r = rioFdsetIO; - r->io.connset.conns = zmalloc(sizeof(connection *)*numconns); - r->io.connset.state = zmalloc(sizeof(int)*numconns); - memcpy(r->io.connset.conns,conns,sizeof(connection *)*numconns); - for (j = 0; j < numconns; j++) r->io.connset.state[j] = 0; - r->io.connset.numconns = numconns; - r->io.connset.pos = 0; - r->io.connset.buf = sdsempty(); +void rioInitWithFd(rio *r, int fd) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.buf = sdsempty(); } /* release the rio stream. */ -void rioFreeConnset(rio *r) { - zfree(r->io.connset.conns); - zfree(r->io.connset.state); - sdsfree(r->io.connset.buf); +void rioFreeFd(rio *r) { + sdsfree(r->io.fd.buf); } /* ---------------------------- Generic functions ---------------------------- */ diff --git a/src/rio.h b/src/rio.h index fdde7c20e..9576335e8 100644 --- a/src/rio.h +++ b/src/rio.h @@ -77,7 +77,7 @@ struct _rio { off_t buffered; /* Bytes written since last fsync. */ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; - /* Connection object */ + /* Connection object (used to read from socket) */ struct { connection *conn; /* Connection */ off_t pos; /* pos in buf that was returned */ @@ -85,14 +85,12 @@ struct _rio { size_t read_limit; /* don't allow to buffer/read more than that */ size_t read_so_far; /* amount of data read from the rio (not buffered) */ } conn; - /* Multiple FDs target (used to write to N sockets). */ + /* FD target (used to write to pipe). */ struct { - connection **conns; /* Connections */ - int *state; /* Error state of each fd. 0 (if ok) or errno. */ - int numconns; + int fd; /* File descriptor. 
*/ off_t pos; sds buf; - } connset; + } fd; } io; }; @@ -161,9 +159,9 @@ static inline void rioClearErrors(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); void rioInitWithConn(rio *r, connection *conn, size_t read_limit); -void rioInitWithConnset(rio *r, connection **conns, int numconns); +void rioInitWithFd(rio *r, int fd); -void rioFreeConnset(rio *r); +void rioFreeFd(rio *r); void rioFreeConn(rio *r, sds* out_remainingBufferedData); size_t rioWriteBulkCount(rio *r, char prefix, long count); diff --git a/src/server.c b/src/server.c index 930a01723..f05ce3151 100644 --- a/src/server.c +++ b/src/server.c @@ -1746,6 +1746,48 @@ void updateCachedTime(void) { server.daylight_active = tm.tm_isdst; } +void checkChildrenDone(void) { + int statloc; + pid_t pid; + + /* If we have a diskless rdb child (note that we support only one concurrent + * child), we want to avoid collecting it's exit status and acting on it + * as long as we didn't finish to drain the pipe, since then we're at risk + * of starting a new fork and a new pipe before we're done with the previous + * one. */ + if (server.rdb_child_pid != -1 && server.rdb_pipe_conns) + return; + + if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { + int exitcode = WEXITSTATUS(statloc); + int bysignal = 0; + + if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); + + if (pid == -1) { + serverLog(LL_WARNING,"wait3() returned an error: %s. " + "rdb_child_pid = %d, aof_child_pid = %d", + strerror(errno), + (int) server.rdb_child_pid, + (int) server.aof_child_pid); + } else if (pid == server.rdb_child_pid) { + backgroundSaveDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); + } else if (pid == server.aof_child_pid) { + backgroundRewriteDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); + } else { + if (!ldbRemoveChild(pid)) { + serverLog(LL_WARNING, + "Warning, detected child with unmatched pid: %ld", + (long)pid); + } + } + updateDictResizePolicy(); + closeChildInfoPipe(); + } +} + /* This is our timer interrupt, called server.hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -1898,37 +1940,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) { - int statloc; - pid_t pid; - - if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { - int exitcode = WEXITSTATUS(statloc); - int bysignal = 0; - - if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); - - if (pid == -1) { - serverLog(LL_WARNING,"wait3() returned an error: %s. " - "rdb_child_pid = %d, aof_child_pid = %d", - strerror(errno), - (int) server.rdb_child_pid, - (int) server.aof_child_pid); - } else if (pid == server.rdb_child_pid) { - backgroundSaveDoneHandler(exitcode,bysignal); - if (!bysignal && exitcode == 0) receiveChildInfo(); - } else if (pid == server.aof_child_pid) { - backgroundRewriteDoneHandler(exitcode,bysignal); - if (!bysignal && exitcode == 0) receiveChildInfo(); - } else { - if (!ldbRemoveChild(pid)) { - serverLog(LL_WARNING, - "Warning, detected child with unmatched pid: %ld", - (long)pid); - } - } - updateDictResizePolicy(); - closeChildInfoPipe(); - } + checkChildrenDone(); } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now. 
*/ @@ -2081,6 +2093,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); + /* TODO: How do I handle the write barriers flag? */ + tlsProcessPendingData(); + /* If TLS already has pending unread data, don't sleep at all. */ + aeDontWait(server.el, tlsHasPendingData()); + /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); @@ -2280,6 +2297,7 @@ void initServerConfig(void) { server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY; + server.key_load_delay = CONFIG_DEFAULT_KEY_LOAD_DELAY; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; @@ -2813,6 +2831,11 @@ void initServer(void) { server.rdb_child_pid = -1; server.aof_child_pid = -1; server.rdb_child_type = RDB_CHILD_TYPE_NONE; + server.rdb_pipe_conns = NULL; + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; + server.rdb_pipe_buff = NULL; + server.rdb_pipe_bufflen = 0; server.rdb_bgsave_scheduled = 0; server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; diff --git a/src/server.h b/src/server.h index 1b41c9ac5..89f219033 100644 --- a/src/server.h +++ b/src/server.h @@ -135,6 +135,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 #define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0 +#define CONFIG_DEFAULT_KEY_LOAD_DELAY 0 #define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define CONFIG_DEFAULT_SLAVE_READ_ONLY 1 #define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1 @@ -1236,10 +1237,17 @@ struct redisServer { int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* C_OK or C_ERR */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ - int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ - int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ + int rdb_pipe_write; /* RDB pipes used to transfer the rdb */ + int rdb_pipe_read; /* data to the parent process in diskless repl. */ + connection **rdb_pipe_conns; /* Connections which are currently the */ + int rdb_pipe_numconns; /* target of the diskless rdb fork child. */ + int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */ + char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */ + int rdb_pipe_bufflen; /* that was read from the rdb pipe. */ int rdb_key_save_delay; /* Delay in microseconds between keys while * writing the RDB. (for testings) */ + int key_load_delay; /* Delay in microseconds between keys while + * loading aof or rdb. (for testing) */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data.
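/* A hedged illustration (the helper name is invented) of why beforeSleep()
 * above must not let the event loop block while TLS input is pending, and why
 * tls.c below keeps a pending_list: OpenSSL reads and decrypts whole TLS
 * records, so after serving a read event plaintext can remain buffered inside
 * the SSL object even though the socket will not become readable again. Such
 * connections have to be re-served without waiting for epoll. */
static int exampleTLSConnHasBufferedData(SSL *ssl) {
    /* SSL_pending() counts already-decrypted bytes; SSL_has_pending() also
     * reports raw record bytes that were read but not yet processed. */
    return SSL_pending(ssl) > 0 || SSL_has_pending(ssl);
}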
*/ struct { @@ -1779,6 +1787,8 @@ void clearReplicationId2(void); void chopReplicationBacklog(void); void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); +void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); +void rdbPipeWriteHandlerConnRemoved(struct connection *conn); /* Generic persistence functions */ void startLoadingFile(FILE* fp, char* filename); @@ -1944,6 +1954,7 @@ unsigned int LRU_CLOCK(void); const char *evictPolicyToString(void); struct redisMemOverhead *getMemoryOverheadData(void); void freeMemoryOverheadData(struct redisMemOverhead *mh); +void checkChildrenDone(void); #define RESTART_SERVER_NONE 0 #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */ diff --git a/src/tls.c b/src/tls.c index dabb2ee0f..c939ebf55 100644 --- a/src/tls.c +++ b/src/tls.c @@ -30,6 +30,7 @@ #include "server.h" #include "connhelpers.h" +#include "adlist.h" #ifdef USE_OPENSSL @@ -41,6 +42,10 @@ extern ConnectionType CT_Socket; SSL_CTX *redis_tls_ctx; +/* list of connections with pending data already read from the socket, but not + * served to the reader yet. */ +static list *pending_list = NULL; + void tlsInit(void) { ERR_load_crypto_strings(); SSL_load_error_strings(); @@ -49,6 +54,8 @@ void tlsInit(void) { if (!RAND_poll()) { serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); } + + pending_list = listCreate(); } int tlsConfigureServer(void) { @@ -188,6 +195,7 @@ typedef struct tls_connection { int flags; SSL *ssl; char *ssl_error; + listNode *pending_list_node; } tls_connection; connection *connCreateTLS(void) { @@ -288,11 +296,7 @@ void updateSSLEvent(tls_connection *conn) { aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); } - -static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) { - UNUSED(el); - UNUSED(fd); - tls_connection *conn = clientData; +static void tlsHandleEvent(tls_connection *conn, int mask) { int ret; TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d", @@ -369,6 +373,15 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in if ((mask & AE_READABLE) && conn->c.read_handler) { if (!callHandler((connection *) conn, conn->c.read_handler)) return; + if (SSL_has_pending(conn->ssl)) { + if (!conn->pending_list_node) { + listAddNodeTail(pending_list, conn); + conn->pending_list_node = listLast(pending_list); + } + } else if (conn->pending_list_node) { + listDelNode(pending_list, conn->pending_list_node); + conn->pending_list_node = NULL; + } } if ((mask & AE_WRITABLE) && conn->c.write_handler) { @@ -382,6 +395,13 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in updateSSLEvent(conn); } +static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) { + UNUSED(el); + UNUSED(fd); + tls_connection *conn = clientData; + tlsHandleEvent(conn, mask); +} + static void connTLSClose(connection *conn_) { tls_connection *conn = (tls_connection *) conn_; @@ -395,6 +415,11 @@ static void connTLSClose(connection *conn_) { conn->ssl_error = NULL; } + if (conn->pending_list_node) { + listDelNode(pending_list, conn->pending_list_node); + conn->pending_list_node = NULL; + } + CT_Socket.close(conn_); } @@ -610,17 +635,6 @@ exit: return nread; } -/* TODO: This is probably not the right thing to do, but as we handle proxying from child - * processes we'll probably not need any shutdown mechanism anyway so this is just a - * place holder 
for now. - */ -static int connTLSShutdown(connection *conn_, int how) { - UNUSED(how); - tls_connection *conn = (tls_connection *) conn_; - - return SSL_shutdown(conn->ssl); -} - ConnectionType CT_TLS = { .ae_handler = tlsEventHandler, .accept = connTLSAccept, @@ -635,9 +649,25 @@ ConnectionType CT_TLS = { .sync_write = connTLSSyncWrite, .sync_read = connTLSSyncRead, .sync_readline = connTLSSyncReadLine, - .shutdown = connTLSShutdown }; +int tlsHasPendingData() { + if (!pending_list) + return 0; + return listLength(pending_list) > 0; +} + +void tlsProcessPendingData() { + listIter li; + listNode *ln; + + listRewind(pending_list,&li); + while((ln = listNext(&li))) { + tls_connection *conn = listNodeValue(ln); + tlsHandleEvent(conn, AE_READABLE); + } +} + #else /* USE_OPENSSL */ void tlsInit(void) { @@ -666,4 +696,11 @@ connection *connCreateAcceptedTLS(int fd, int require_auth) { return NULL; } +int tlsHasPendingData() { + return 0; +} + +void tlsProcessPendingData() { +} + #endif diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 43bc684b4..192137e87 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -29,9 +29,6 @@ start_server {tags {"repl"}} { $slave slaveof $master_host $master_port test {Slave enters handshake} { - if {$::tls} { - fail "TLS with repl-diskless-sync not supported yet." - } wait_for_condition 50 1000 { [string match *handshake* [$slave role]] } else { @@ -187,10 +184,6 @@ start_server {tags {"repl"}} { } foreach mdl {no yes} { - if {$::tls && $mdl eq "yes"} { - puts "** Skipping test: TLS with repl-diskless-sync not supported yet." - continue - } foreach sdl {disabled swapdb} { start_server {tags {"repl"}} { set master [srv 0 client] @@ -327,9 +320,6 @@ start_server {tags {"repl"}} { } test {slave fails full sync and diskless load swapdb recoveres it} { - if {$::tls} { - fail "" - } start_server {tags {"repl"}} { set slave [srv 0 client] set slave_host [srv 0 host] @@ -397,10 +387,6 @@ test {slave fails full sync and diskless load swapdb recoveres it} { } test {diskless loading short read} { - if {$::tls} { - fail "TLS with repl-diskless-sync not supported yet." - } - start_server {tags {"repl"}} { set replica [srv 0 client] set replica_host [srv 0 host] @@ -480,3 +466,159 @@ test {diskless loading short read} { } } +# get current stime and utime metrics for a thread (since it's creation) +proc get_cpu_metrics { statfile } { + if { [ catch { + set fid [ open $statfile r ] + set data [ read $fid 1024 ] + ::close $fid + set data [ split $data ] + + ;## number of jiffies it has been scheduled... 
+ set utime [ lindex $data 13 ] + set stime [ lindex $data 14 ] + } err ] } { + error "assertion:can't parse /proc: $err" + } + set mstime [clock milliseconds] + return [ list $mstime $utime $stime ] +} + +# compute %utime and %stime of a thread between two measurements +proc compute_cpu_usage {start end} { + set clock_ticks [exec getconf CLK_TCK] + # convert ms time to jiffies and calc delta + set dtime [ expr { ([lindex $end 0] - [lindex $start 0]) * double($clock_ticks) / 1000 } ] + set utime [ expr { [lindex $end 1] - [lindex $start 1] } ] + set stime [ expr { [lindex $end 2] - [lindex $start 2] } ] + set pucpu [ expr { ($utime / $dtime) * 100 } ] + set pscpu [ expr { ($stime / $dtime) * 100 } ] + return [ list $pucpu $pscpu ] +} + + +# test diskless rdb pipe with multiple replicas, which may drop half way +start_server {tags {"repl"}} { + set master [srv 0 client] + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 1 + set master_host [srv 0 host] + set master_port [srv 0 port] + # put enough data in the db that the rdb file will be bigger than the socket buffers + # and since we'll have key-load-delay of 100, 10000 keys will take at least 1 second + # we also need the replica to process requests during transfer (which it does only once in 2mb) + $master debug populate 10000 test 10000 + $master config set rdbcompression no + foreach all_drop {no slow fast all} { + test "diskless $all_drop replicas drop during rdb pipe" { + set replicas {} + set replicas_alive {} + # start one replica that will read the rdb fast, and one that will be slow + start_server {} { + lappend replicas [srv 0 client] + lappend replicas_alive [srv 0 client] + start_server {} { + lappend replicas [srv 0 client] + lappend replicas_alive [srv 0 client] + + # start replication + # it's enough for just one replica to be slow, and have it's write handler enabled + # so that the whole rdb generation process is bound to that + [lindex $replicas 0] config set repl-diskless-load swapdb + [lindex $replicas 0] config set key-load-delay 100 + [lindex $replicas 0] replicaof $master_host $master_port + [lindex $replicas 1] replicaof $master_host $master_port + + # wait for the replicas to start reading the rdb + # using the log file since the replica only responds to INFO once in 2mb + wait_for_log_message -1 "*Loading DB in memory*" 8 800 10 + + set master_statfile [format "/proc/%s/stat" [srv -2 pid]] + set master_start_metrics [get_cpu_metrics $master_statfile] + set start_time [clock seconds] + + # wait a while so that the pipe socket writer will be + # blocked on write (since replica 0 is slow to read from the socket) + after 500 + + # add some command to be present in the command stream after the rdb. 
+ $master incr $all_drop + + # disconnect replicas depending on the current test + if {$all_drop == "all" || $all_drop == "fast"} { + exec kill [srv 0 pid] + set replicas_alive [lreplace $replicas_alive 1 1] + } + if {$all_drop == "all" || $all_drop == "slow"} { + exec kill [srv -1 pid] + set replicas_alive [lreplace $replicas_alive 0 0] + } + + # wait for rdb child to exit + wait_for_condition 500 100 { + [s -2 rdb_bgsave_in_progress] == 0 + } else { + fail "rdb child didn't terminate" + } + + # make sure we got what we were aiming for, by looking for the message in the log file + if {$all_drop == "all"} { + wait_for_log_message -2 "*Diskless rdb transfer, last replica dropped, killing fork child*" 12 1 1 + } + if {$all_drop == "no"} { + wait_for_log_message -2 "*Diskless rdb transfer, done reading from pipe, 2 replicas still up*" 12 1 1 + } + if {$all_drop == "slow" || $all_drop == "fast"} { + wait_for_log_message -2 "*Diskless rdb transfer, done reading from pipe, 1 replicas still up*" 12 1 1 + } + + # make sure we don't have a busy loop going thought epoll_wait + set master_end_metrics [get_cpu_metrics $master_statfile] + set time_elapsed [expr {[clock seconds]-$start_time}] + set master_cpu [compute_cpu_usage $master_start_metrics $master_end_metrics] + set master_utime [lindex $master_cpu 0] + set master_stime [lindex $master_cpu 1] + if {$::verbose} { + puts "elapsed: $time_elapsed" + puts "master utime: $master_utime" + puts "master stime: $master_stime" + } + if {$all_drop == "all" || $all_drop == "slow"} { + assert {$master_utime < 30} + assert {$master_stime < 30} + } + if {$all_drop == "none" || $all_drop == "fast"} { + assert {$master_utime < 15} + assert {$master_stime < 15} + } + + # verify the data integrity + foreach replica $replicas_alive { + # Wait that replicas acknowledge they are online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. + wait_for_condition 50 100 { + [lindex [$replica role] 3] eq {connected} + } else { + fail "replicas still not connected after some time" + } + + # Make sure that replicas and master have same + # number of keys + wait_for_condition 50 100 { + [$master dbsize] == [$replica dbsize] + } else { + fail "Different number of keys between master and replicas after too long time." + } + + # Check digests + set digest [$master debug digest] + set digest0 [$replica debug digest] + assert {$digest ne 0000000000000000000000000000000000000000} + assert {$digest eq $digest0} + } + } + } + } + } +}
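# Note on the CPU assertions above (a worked example of what compute_cpu_usage
# reports): the wall-clock window is converted to jiffies as
# elapsed_ms * CLK_TCK / 1000, and the utime/stime deltas are divided by it.
# With the common CLK_TCK of 100, a 2000 ms window is 200 jiffies, so a master
# that accumulated 30 jiffies of user time used 15% user CPU; the <15 and <30
# thresholds in the test are measured on that scale.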