diff --git a/src/connection.h b/src/connection.h index db09dfd83..0fd6c5f24 100644 --- a/src/connection.h +++ b/src/connection.h @@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len); /* Helpers for tls special considerations */ int tlsHasPendingData(); -void tlsProcessPendingData(); +int tlsProcessPendingData(); #endif /* __REDIS_CONNECTION_H */ diff --git a/src/networking.c b/src/networking.c index ce5b0ae38..671e374f4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } -void freeClientsInAsyncFreeQueue(void) { +/* Free the clietns marked as CLOSE_ASAP, return the number of clients + * freed. */ +int freeClientsInAsyncFreeQueue(void) { + int freed = listLength(server.clients_to_close); while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); client *c = listNodeValue(ln); @@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) { freeClient(c); listDelNode(server.clients_to_close,ln); } + return freed; } /* Return a client by ID, or NULL if the client ID is not in the set @@ -2852,9 +2856,8 @@ int clientsArePaused(void) { * write, close sequence needed to serve a client. * * The function returns the total number of events processed. */ -int processEventsWhileBlocked(void) { +void processEventsWhileBlocked(void) { int iterations = 4; /* See the function top-comment. */ - int count = 0; /* Note: when we are processing events while blocked (for instance during * busy Lua scripts), we set a global flag. When such flag is set, we @@ -2862,15 +2865,17 @@ int processEventsWhileBlocked(void) { * See https://github.com/antirez/redis/issues/6988 for more info. */ ProcessingEventsWhileBlocked = 1; while (iterations--) { - int events = 0; - events += aeProcessEvents(server.el, + long long startval = server.events_processed_while_blocked; + long long ae_events = aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT| AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that server.events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + server.events_processed_while_blocked += ae_events; + long long events = server.events_processed_while_blocked - startval; if (!events) break; - count += events; } ProcessingEventsWhileBlocked = 0; - return count; } /* ========================================================================== diff --git a/src/server.c b/src/server.c index 380a6d5b6..1bed3959d 100644 --- a/src/server.c +++ b/src/server.c @@ -2101,12 +2101,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Just call a subset of vital functions in case we are re-entering - * the event loop from processEventsWhileBlocked(). */ + * the event loop from processEventsWhileBlocked(). Note that in this + * case we keep track of the number of events we are processing, since + * processEventsWhileBlocked() wants to stop ASAP if there are no longer + * events to handle. */ if (ProcessingEventsWhileBlocked) { - handleClientsWithPendingReadsUsingThreads(); - tlsProcessPendingData(); - handleClientsWithPendingWrites(); - freeClientsInAsyncFreeQueue(); + uint64_t processed = 0; + processed += handleClientsWithPendingReadsUsingThreads(); + processed += tlsProcessPendingData(); + processed += handleClientsWithPendingWrites(); + processed += freeClientsInAsyncFreeQueue(); + server.events_processed_while_blocked += processed; return; } @@ -2757,6 +2762,7 @@ void initServer(void) { server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; + server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) { diff --git a/src/server.h b/src/server.h index 59cf1370e..55ee2d300 100644 --- a/src/server.h +++ b/src/server.h @@ -1095,6 +1095,7 @@ struct redisServer { queries. Will still serve RESP2 queries. */ int io_threads_num; /* Number of IO threads to use. */ int io_threads_do_reads; /* Read and parse from IO threads? */ + long long events_processed_while_blocked; /* processEventsWhileBlocked() */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); unsigned long getClientOutputBufferMemoryUsage(client *c); -void freeClientsInAsyncFreeQueue(void); +int freeClientsInAsyncFreeQueue(void); void asyncCloseClientOnOutputBufferLimitReached(client *c); int getClientType(client *c); int getClientTypeByName(char *name); @@ -1662,7 +1663,7 @@ void disconnectSlaves(void); int listenToPort(int port, int *fds, int *count); void pauseClients(mstime_t duration); int clientsArePaused(void); -int processEventsWhileBlocked(void); +void processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void); diff --git a/src/tls.c b/src/tls.c index c18aafebe..ee85bd302 100644 --- a/src/tls.c +++ b/src/tls.c @@ -768,15 +768,17 @@ int tlsHasPendingData() { return listLength(pending_list) > 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { listIter li; listNode *ln; + int processed = listLength(pending_list); listRewind(pending_list,&li); while((ln = listNext(&li))) { tls_connection *conn = listNodeValue(ln); tlsHandleEvent(conn, AE_READABLE); } + return processed; } #else /* USE_OPENSSL */ @@ -804,7 +806,8 @@ int tlsHasPendingData() { return 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { + return 0; } #endif