From b55a24a27e07fcb22c9d8c963cf6b4f84f5d8823 Mon Sep 17 00:00:00 2001 From: fatedier Date: Tue, 27 Jun 2017 23:30:09 +0800 Subject: [PATCH] update mutex used in frpc control --- client/control.go | 63 +++++++++++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/client/control.go b/client/control.go index 825d024..fb1ba71 100644 --- a/client/control.go +++ b/client/control.go @@ -137,13 +137,6 @@ func (ctl *Control) Run() (err error) { go ctl.writer() go ctl.reader() - // send NewProxy message for all configured proxies - for _, cfg := range ctl.pxyCfgs { - var newProxyMsg msg.NewProxy - cfg.UnMarshalToMsg(&newProxyMsg) - ctl.sendCh <- &newProxyMsg - } - // start all local vistors for _, cfg := range ctl.vistorCfgs { vistor := NewVistor(ctl, cfg) @@ -155,6 +148,13 @@ func (ctl *Control) Run() (err error) { ctl.vistors[cfg.GetName()] = vistor vistor.Info("start vistor success") } + + // send NewProxy message for all configured proxies + for _, cfg := range ctl.pxyCfgs { + var newProxyMsg msg.NewProxy + cfg.UnMarshalToMsg(&newProxyMsg) + ctl.sendCh <- &newProxyMsg + } return nil } @@ -165,7 +165,7 @@ func (ctl *Control) NewWorkConn() { } m := &msg.NewWorkConn{ - RunId: ctl.runId, + RunId: ctl.getRunId(), } if err = msg.WriteMsg(workConn, m); err != nil { ctl.Warn("work connection write to server error: %v", err) @@ -182,9 +182,7 @@ func (ctl *Control) NewWorkConn() { workConn.AddLogPrefix(startMsg.ProxyName) // dispatch this work connection to related proxy - ctl.mu.RLock() - pxy, ok := ctl.proxies[startMsg.ProxyName] - ctl.mu.RUnlock() + pxy, ok := ctl.getProxy(startMsg.ProxyName) if ok { workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) go pxy.InWorkConn(workConn) @@ -251,7 +249,7 @@ func (ctl *Control) login() (err error) { now := time.Now().Unix() ctl.loginMsg.PrivilegeKey = util.GetAuthKey(config.ClientCommonCfg.PrivilegeToken, now) ctl.loginMsg.Timestamp = now - ctl.loginMsg.RunId = ctl.runId + ctl.loginMsg.RunId = ctl.getRunId() if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil { return err @@ -272,7 +270,7 @@ func (ctl *Control) login() (err error) { ctl.conn = conn // update runId got from server - ctl.runId = loginRespMsg.RunId + ctl.setRunId(loginRespMsg.RunId) ctl.ClearLogPrefix() ctl.AddLogPrefix(loginRespMsg.RunId) ctl.Info("login to server success, get run id [%s]", loginRespMsg.RunId) @@ -349,6 +347,7 @@ func (ctl *Control) writer() { } } +// manager handles all channel events and do corresponding process func (ctl *Control) manager() { defer func() { if err := recover(); err != nil { @@ -396,9 +395,7 @@ func (ctl *Control) manager() { continue } - ctl.mu.RLock() - oldPxy, ok := ctl.proxies[m.ProxyName] - ctl.mu.RUnlock() + oldPxy, ok := ctl.getProxy(m.ProxyName) if ok { oldPxy.Close() } @@ -410,9 +407,7 @@ func (ctl *Control) manager() { } continue } - ctl.mu.Lock() - ctl.proxies[m.ProxyName] = pxy - ctl.mu.Unlock() + ctl.addProxy(m.ProxyName, pxy) ctl.Info("[%s] start proxy success", m.ProxyName) case *msg.Pong: ctl.lastPong = time.Now() @@ -422,7 +417,8 @@ func (ctl *Control) manager() { } } -// control keep watching closedCh, start a new connection if previous control connection is closed +// controler keep watching closedCh, start a new connection if previous control connection is closed. +// If controler is notified by closedCh, reader and writer and manager will exit, then recall these functions. func (ctl *Control) controler() { var err error maxDelayTime := 30 * time.Second @@ -435,7 +431,7 @@ func (ctl *Control) controler() { case <-checkProxyTicker.C: // Every 30 seconds, check which proxy registered failed and reregister it to server. for _, cfg := range ctl.pxyCfgs { - if _, exist := ctl.proxies[cfg.GetName()]; !exist { + if _, exist := ctl.getProxy(cfg.GetName()); !exist { ctl.Info("try to reregister proxy [%s]", cfg.GetName()) var newProxyMsg msg.NewProxy cfg.UnMarshalToMsg(&newProxyMsg) @@ -501,3 +497,28 @@ func (ctl *Control) controler() { } } } + +func (ctl *Control) setRunId(runId string) { + ctl.mu.Lock() + defer ctl.mu.Unlock() + ctl.runId = runId +} + +func (ctl *Control) getRunId() string { + ctl.mu.RLock() + defer ctl.mu.RUnlock() + return ctl.runId +} + +func (ctl *Control) getProxy(name string) (pxy Proxy, ok bool) { + ctl.mu.RLock() + defer ctl.mu.RUnlock() + pxy, ok = ctl.proxies[name] + return +} + +func (ctl *Control) addProxy(name string, pxy Proxy) { + ctl.mu.Lock() + defer ctl.mu.Unlock() + ctl.proxies[name] = pxy +}