diff --git a/ae.c b/ae.c index 81fd9add8..25e55fdef 100644 --- a/ae.c +++ b/ae.c @@ -93,7 +93,6 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; - if (mask & AE_EXCEPTION) fe->efileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; @@ -325,18 +324,19 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; + int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ - if (fe->mask & mask & AE_READABLE) + if (fe->mask & mask & AE_READABLE) { + rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); - if (fe->mask & mask & AE_WRITABLE && fe->wfileProc != fe->rfileProc) - fe->wfileProc(eventLoop,fd,fe->clientData,mask); - if (fe->mask & mask & AE_EXCEPTION && - fe->efileProc != fe->wfileProc && - fe->efileProc != fe->rfileProc) - fe->efileProc(eventLoop,fd,fe->clientData,mask); + } + if (fe->mask & mask & AE_WRITABLE) { + if (!rfired || fe->wfileProc != fe->rfileProc) + fe->wfileProc(eventLoop,fd,fe->clientData,mask); + } processed++; } } @@ -362,11 +362,9 @@ int aeWait(int fd, int mask, long long milliseconds) { if (mask & AE_READABLE) FD_SET(fd,&rfds); if (mask & AE_WRITABLE) FD_SET(fd,&wfds); - if (mask & AE_EXCEPTION) FD_SET(fd,&efds); if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) { if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE; if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE; - if (FD_ISSET(fd,&efds)) retmask |= AE_EXCEPTION; return retmask; } else { return retval; diff --git a/ae.h b/ae.h index 0a056ce91..4e9503bc5 100644 --- a/ae.h +++ b/ae.h @@ -41,7 +41,6 @@ #define AE_NONE 0 #define AE_READABLE 1 #define AE_WRITABLE 2 -#define AE_EXCEPTION 4 #define AE_FILE_EVENTS 1 #define AE_TIME_EVENTS 2 @@ -62,10 +61,9 @@ typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientDat /* File event structure */ typedef struct aeFileEvent { - int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */ + int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; - aeFileProc *efileProc; void *clientData; } aeFileEvent; diff --git a/ae_epoll.c b/ae_epoll.c index ce9ce3b01..a8cd3adc1 100644 --- a/ae_epoll.c +++ b/ae_epoll.c @@ -38,7 +38,6 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; - if (mask & AE_EXCEPTION) ee.events |= EPOLLPRI; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; @@ -53,7 +52,6 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { ee.events = 0; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; - if (mask & AE_EXCEPTION) ee.events |= EPOLLPRI; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (mask != AE_NONE) { @@ -81,7 +79,6 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; - if (e->events & EPOLLPRI) mask |= AE_EXCEPTION; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } diff --git a/ae_select.c b/ae_select.c index 11fd719aa..6dc825153 100644 --- a/ae_select.c +++ b/ae_select.c @@ -5,10 +5,10 @@ #include typedef struct aeApiState { - fd_set rfds, wfds, efds; + fd_set rfds, wfds; /* We need to have a copy of the fd sets as it's not safe to reuse * FD sets after select(). */ - fd_set _rfds, _wfds, _efds; + fd_set _rfds, _wfds; } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) { @@ -17,7 +17,6 @@ static int aeApiCreate(aeEventLoop *eventLoop) { if (!state) return -1; FD_ZERO(&state->rfds); FD_ZERO(&state->wfds); - FD_ZERO(&state->efds); eventLoop->apidata = state; return 0; } @@ -31,7 +30,6 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { if (mask & AE_READABLE) FD_SET(fd,&state->rfds); if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds); - if (mask & AE_EXCEPTION) FD_SET(fd,&state->efds); return 0; } @@ -40,7 +38,6 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { if (mask & AE_READABLE) FD_CLR(fd,&state->rfds); if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds); - if (mask & AE_EXCEPTION) FD_CLR(fd,&state->efds); } static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { @@ -49,10 +46,9 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); - memcpy(&state->_efds,&state->efds,sizeof(fd_set)); retval = select(eventLoop->maxfd+1, - &state->_rfds,&state->_wfds,&state->_efds,tvp); + &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; @@ -63,8 +59,6 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; - if (fe->mask & AE_EXCEPTION && FD_ISSET(j,&state->_efds)) - mask |= AE_EXCEPTION; eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; diff --git a/redis-cli.c b/redis-cli.c index 7559d00e2..cac22c9bb 100644 --- a/redis-cli.c +++ b/redis-cli.c @@ -134,6 +134,7 @@ static struct redisCommand cmdTable[] = { {"debug",-2,REDIS_CMD_INLINE}, {"mset",-3,REDIS_CMD_MULTIBULK}, {"msetnx",-3,REDIS_CMD_MULTIBULK}, + {"monitor",1,REDIS_CMD_INLINE}, {NULL,0,0} }; @@ -188,6 +189,7 @@ static int cliReadSingleLineReply(int fd, int quiet) { if (reply == NULL) return 1; if (!quiet) printf("%s\n", reply); + sdsfree(reply); return 0; } @@ -287,6 +289,7 @@ static int selectDb(int fd) static int cliSendCommand(int argc, char **argv) { struct redisCommand *rc = lookupCommand(argv[0]); int fd, j, retval = 0; + int read_forever = 0; sds cmd; if (!rc) { @@ -299,6 +302,7 @@ static int cliSendCommand(int argc, char **argv) { fprintf(stderr,"Wrong number of arguments for '%s'\n",rc->name); return 1; } + if (!strcasecmp(rc->name,"monitor")) read_forever = 1; if ((fd = cliConnect()) == -1) return 1; /* Select db number */ @@ -337,6 +341,11 @@ static int cliSendCommand(int argc, char **argv) { } anetWrite(fd,cmd,sdslen(cmd)); sdsfree(cmd); + + while (read_forever) { + cliReadSingleLineReply(fd,0); + } + retval = cliReadReply(fd); if (retval) { close(fd);