Show threading configuration in INFO output (#7446)

Co-authored-by: Oran Agra <oran@redislabs.com>
(cherry picked from commit f6cad30bb6)
This commit is contained in:
Arun Ranganathan 2020-07-29 01:46:44 -04:00 committed by Oran Agra
parent 2257f38b68
commit e81bac32fd
3 changed files with 46 additions and 14 deletions

View File

@ -1313,6 +1313,9 @@ client *lookupClientByID(uint64_t id) {
* set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */
int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
server.stat_total_writes_processed++;
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
clientReplyBlock *o;
@ -1929,6 +1932,9 @@ void readQueryFromClient(connection *conn) {
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
server.stat_total_reads_processed++;
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
@ -2926,7 +2932,6 @@ int tio_debug = 0;
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_active; /* Are the threads currently spinning waiting I/O? */
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
@ -2985,7 +2990,7 @@ void *IOThreadMain(void *myid) {
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
server.io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
@ -3019,10 +3024,10 @@ void initThreadedIO(void) {
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
serverAssert(io_threads_active == 0);
serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
server.io_threads_active = 1;
}
void stopThreadedIO(void) {
@ -3033,10 +3038,10 @@ void stopThreadedIO(void) {
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
server.io_threads_active = 0;
}
/* This function checks if there are not enough pending clients to justify
@ -3055,7 +3060,7 @@ int stopThreadedIOIfNeeded(void) {
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
@ -3073,7 +3078,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
if (!server.io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
@ -3130,6 +3135,10 @@ int handleClientsWithPendingWritesUsingThreads(void) {
}
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
@ -3138,7 +3147,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (io_threads_active &&
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
@ -3158,7 +3167,7 @@ int postponeClientRead(client *c) {
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
@ -3219,5 +3228,9 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}
processInputBuffer(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}

View File

@ -2726,6 +2726,10 @@ void resetServerStats(void) {
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
server.stat_io_reads_processed = 0;
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_total_writes_processed = 0;
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime();
@ -4075,7 +4079,8 @@ sds genRedisInfoString(const char *section) {
"configured_hz:%i\r\n"
"lru_clock:%u\r\n"
"executable:%s\r\n"
"config_file:%s\r\n",
"config_file:%s\r\n"
"io_threads_active:%d\r\n",
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
@ -4099,7 +4104,8 @@ sds genRedisInfoString(const char *section) {
server.config_hz,
server.lruclock,
server.executable ? server.executable : "",
server.configfile ? server.configfile : "");
server.configfile ? server.configfile : "",
server.io_threads_active);
}
/* Clients */
@ -4371,7 +4377,11 @@ sds genRedisInfoString(const char *section) {
"tracking_total_keys:%lld\r\n"
"tracking_total_items:%lld\r\n"
"tracking_total_prefixes:%lld\r\n"
"unexpected_error_replies:%lld\r\n",
"unexpected_error_replies:%lld\r\n"
"total_reads_processed:%lld\r\n"
"total_writes_processed:%lld\r\n"
"io_threaded_reads_processed:%lld\r\n"
"io_threaded_writes_processed:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -4402,7 +4412,11 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies);
server.stat_unexpected_error_replies,
server.stat_total_reads_processed,
server.stat_total_writes_processed,
server.stat_io_reads_processed,
server.stat_io_writes_processed);
}
/* Replication */

View File

@ -1108,6 +1108,7 @@ struct redisServer {
queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */
int io_threads_active; /* Is IO threads currently active? */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
/* RDB / AOF loading information */
@ -1157,6 +1158,10 @@ struct redisServer {
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
_Atomic long long stat_total_reads_processed; /* Total number of read events processed */
_Atomic long long stat_total_writes_processed; /* Total number of write events processed */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {