diff --git a/extend/limit/limit.go b/extend/limit/limit.go index f3cb9bf..c0236c0 100755 --- a/extend/limit/limit.go +++ b/extend/limit/limit.go @@ -4,7 +4,6 @@ import ( "io" frpNet "github.com/fatedier/frp/utils/net" - "golang.org/x/time/rate" ) const ( @@ -17,35 +16,29 @@ const ( EB ) -const BurstLimit = 1024 * 1024 * 1024 +const burstLimit = 1024 * 1024 * 1024 -type Conn struct { +type LimitConn struct { frpNet.Conn lr io.Reader lw io.Writer } -func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn { - return Conn{ - lr: NewReaderWithLimit(c, maxread), - lw: NewWriterWithLimit(c, maxwrite), +func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn { + // 这里不知道为什么要 49 才能对的上真实速度 + // 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确 + return LimitConn{ + lr: NewReaderWithLimit(c, maxread*49), + lw: NewWriterWithLimit(c, maxwrite*49), Conn: c, } } -func NewLimitConnWithBucket(c frpNet.Conn, rBucket, wBucket *rate.Limiter) Conn { - return Conn{ - lr: NewReaderWithLimitWithBucket(c, rBucket), - lw: NewWriterWithLimitWithBucket(c, wBucket), - Conn: c, - } -} - -func (c Conn) Read(p []byte) (n int, err error) { +func (c LimitConn) Read(p []byte) (n int, err error) { return c.lr.Read(p) } -func (c Conn) Write(p []byte) (n int, err error) { +func (c LimitConn) Write(p []byte) (n int, err error) { return c.lw.Write(p) } diff --git a/extend/limit/reader.go b/extend/limit/reader.go index 3d8fc18..cac8fb3 100644 --- a/extend/limit/reader.go +++ b/extend/limit/reader.go @@ -31,28 +31,17 @@ func NewReaderWithLimit(r io.Reader, speed uint64) *Reader { ctx: context.Background(), mux: sync.Mutex{}, } - rr.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit) rr.SetRateLimit(speed) return rr } -func NewReaderWithLimitWithBucket(r io.Reader, bucket *rate.Limiter) *Reader { - rr := &Reader{ - r: r, - ctx: context.Background(), - mux: sync.Mutex{}, - limiter: bucket, - } - return rr -} - // SetRateLimit sets rate limit (bytes/sec) to the reader. func (s *Reader) SetRateLimit(bytesPerSec uint64) { s.mux.Lock() defer s.mux.Unlock() - // s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) - s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst + s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst } // Read reads bytes into p. diff --git a/extend/limit/writer.go b/extend/limit/writer.go index 2a705a1..628e223 100644 --- a/extend/limit/writer.go +++ b/extend/limit/writer.go @@ -31,27 +31,17 @@ func NewWriterWithLimit(w io.Writer, speed uint64) *Writer { ctx: context.Background(), mux: sync.Mutex{}, } - ww.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit) ww.SetRateLimit(speed) return ww } -func NewWriterWithLimitWithBucket(w io.Writer, bucket *rate.Limiter) *Writer { - ww := &Writer{ - w: w, - ctx: context.Background(), - mux: sync.Mutex{}, - limiter: bucket, - } - return ww -} // SetRateLimit sets rate limit (bytes/sec) to the writer. func (s *Writer) SetRateLimit(bytesPerSec uint64) { s.mux.Lock() defer s.mux.Unlock() - // s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) - s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst + s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst } // Write writes bytes from p. diff --git a/server/control.go b/server/control.go index 78e6ca2..c848c54 100755 --- a/server/control.go +++ b/server/control.go @@ -32,7 +32,6 @@ import ( "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/version" - "golang.org/x/time/rate" "github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/crypto" @@ -133,8 +132,8 @@ type Control struct { managerShutdown *shutdown.Shutdown allShutdown *shutdown.Shutdown - inLimit *rate.Limiter - outLimit *rate.Limiter + inLimit uint64 + outLimit uint64 mu sync.RWMutex } @@ -161,8 +160,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage writerShutdown: shutdown.New(), managerShutdown: shutdown.New(), allShutdown: shutdown.New(), - inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)), - outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)), + inLimit: inLimit, //rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)), + outLimit: outLimit, //rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)), } } @@ -458,7 +457,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err if err != nil { return nil, err } - return limit.NewLimitConnWithBucket(fconn, ctl.outLimit, ctl.inLimit), nil + return limit.NewLimitConn(ctl.inLimit, ctl.outLimit, fconn), nil } } diff --git a/server/dashboard.go b/server/dashboard.go index d682aeb..0181a57 100644 --- a/server/dashboard.go +++ b/server/dashboard.go @@ -44,6 +44,7 @@ func (svr *Service) RunDashboardServer(addr string, port int) (err error) { router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET") router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET") router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET") + router.HandleFunc("/api/client/close/{user}", svr.ApiCloseClient).Methods("GET") // view router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET") diff --git a/server/dashboard_api.go b/server/dashboard_api.go index dcea7e6..db591df 100644 --- a/server/dashboard_api.go +++ b/server/dashboard_api.go @@ -322,3 +322,31 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) { buf, _ := json.Marshal(&trafficResp) res.Msg = string(buf) } + +type CloseUserResp struct { + Status int `json:"status"` + Msg string `json:"message"` +} + +func (svr *Service) ApiCloseClient(w http.ResponseWriter, r *http.Request) { + var ( + buf []byte + resp = CloseUserResp{} + ) + params := mux.Vars(r) + user := params["user"] + defer func() { + log.Info("Http response [/api/client/close/{user}]: code [%d]", resp.Status) + }() + log.Info("Http request: [/api/client/close/{user}] %#v", user) + err := svr.CloseUser(user) + if err != nil { + resp.Status = 404 + resp.Msg = err.Error() + } else { + resp.Status = 200 + resp.Msg = "OK" + } + buf, _ = json.Marshal(&resp) + w.Write(buf) +} diff --git a/server/service.go b/server/service.go index c93d127..58350dc 100755 --- a/server/service.go +++ b/server/service.go @@ -413,10 +413,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e // If client's RunId is empty, it's a new client, we just create a new controller. // Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one. if loginMsg.RunId == "" { - loginMsg.RunId, err = util.RandId() - if err != nil { - return - } + loginMsg.RunId = loginMsg.User } ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit) @@ -475,3 +472,12 @@ func generateTLSConfig() *tls.Config { } return &tls.Config{Certificates: []tls.Certificate{tlsCert}} } + +func (svr *Service) CloseUser(user string) error { + ctl, ok := svr.ctlManager.GetById(user) + if !ok { + return fmt.Errorf("user not login") + } + ctl.allShutdown.Start() + return nil +} diff --git a/utils/vhost/resource.go b/utils/vhost/resource.go index 71bfc3a..d0a686f 100755 --- a/utils/vhost/resource.go +++ b/utils/vhost/resource.go @@ -93,7 +93,7 @@ func getServiceUnavailablePageContent() []byte { func notFoundResponse() *http.Response { header := make(http.Header) - header.Set("server", "frp/" + version.Full() + "-sakurapanel") + header.Set("server", "frp/"+version.Full()+"-sakurapanel") header.Set("Content-Type", "text/html") res := &http.Response{