From 7a3a34da307554604909f22cb84cab78b70868da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=98=89=E8=81=AA?= <9a6c5609806a@gmail.com> Date: Mon, 9 Sep 2019 16:11:00 +0800 Subject: [PATCH 1/5] add: proxy live cumulative flow --- extend/cumu/cumu.go | 67 +++++++++++++++++++++++++++++++++++++++++++ extend/limit/limit.go | 14 ++++----- server/control.go | 20 +++++++++++-- server/proxy/proxy.go | 33 +++++++++++++++------ 4 files changed, 115 insertions(+), 19 deletions(-) create mode 100644 extend/cumu/cumu.go diff --git a/extend/cumu/cumu.go b/extend/cumu/cumu.go new file mode 100644 index 0000000..979c4a9 --- /dev/null +++ b/extend/cumu/cumu.go @@ -0,0 +1,67 @@ +package cumu + +import ( + "sync" + + frpNet "github.com/fatedier/frp/utils/net" +) + +// Conn 速度累计 +type Conn struct { + frpNet.Conn + + inCount int64 + outCount int64 + + inCountLock sync.Mutex + outCountLock sync.Mutex +} + +// NewCumuConn ... +func NewCumuConn(conn frpNet.Conn) *Conn { + return &Conn{ + Conn: conn, + inCount: 0, + outCount: 0, + } +} + +func (c *Conn) Read(p []byte) (n int, err error) { + n, err = c.Conn.Read(p) + if err != nil { + return + } + c.outCountLock.Lock() + defer c.outCountLock.Unlock() + c.outCount += int64(n) + return +} + +func (c *Conn) Write(p []byte) (n int, err error) { + n, err = c.Conn.Write(p) + if err != nil { + return + } + c.inCountLock.Lock() + defer c.inCountLock.Unlock() + c.inCount += int64(n) + return +} + +// InCount get in bound byte count +func (c *Conn) InCount() int64 { + c.inCountLock.Lock() + defer c.inCountLock.Unlock() + in := c.inCount + c.inCount = 0 + return in +} + +// OutCount get out bound byte count +func (c *Conn) OutCount() int64 { + c.outCountLock.Lock() + defer c.outCountLock.Unlock() + out := c.outCount + c.outCount = 0 + return out +} diff --git a/extend/limit/limit.go b/extend/limit/limit.go index 1f09d4f..e06b76b 100755 --- a/extend/limit/limit.go +++ b/extend/limit/limit.go @@ -18,27 +18,27 @@ const ( const burstLimit = 1024 * 1024 * 1024 -type LimitConn struct { +type Conn struct { frpNet.Conn lr io.Reader lw io.Writer } -func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn { +func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn { // 这里不知道为什么要 49 才能对的上真实速度 // 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确 - return LimitConn{ - lr: NewReaderWithLimit(c, maxread * 49), - lw: NewWriterWithLimit(c, maxwrite * 49), + return Conn{ + lr: NewReaderWithLimit(c, maxread), + lw: NewWriterWithLimit(c, maxwrite), Conn: c, } } -func (c LimitConn) Read(p []byte) (n int, err error) { +func (c Conn) Read(p []byte) (n int, err error) { return c.lr.Read(p) } -func (c LimitConn) Write(p []byte) (n int, err error) { +func (c Conn) Write(p []byte) (n int, err error) { return c.lw.Write(p) } diff --git a/server/control.go b/server/control.go index 3da3709..dd988a7 100755 --- a/server/control.go +++ b/server/control.go @@ -38,6 +38,7 @@ import ( "github.com/fatedier/golib/errors" "github.com/fatedier/frp/extend/api" + "github.com/fatedier/frp/extend/cumu" "github.com/fatedier/frp/extend/limit" ) @@ -451,16 +452,29 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err if err != nil { return remoteAddr, err } - + // 测试用 ctl.conn.Debug("client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", in, out) - + workConn = func() (frpNet.Conn, error) { fconn, err := ctl.GetWorkConn() if err != nil { return nil, err } - return limit.NewLimitConn(in, out, fconn), nil + cumuConn := cumu.NewCumuConn(fconn) + go func(cc *cumu.Conn) { + var oldIn, oldOut uint64 + for { + time.Sleep(1 * time.Second) + in, out := cc.InCount(), cc.OutCount() + if (oldIn == in) && (oldOut == out) { + continue + } + fmt.Printf("In/Out: %d Byte, %d Byte\n", in, out) + oldIn, oldOut = in, out + } + }(cumuConn) + return limit.NewLimitConn(in*limit.KB, out*limit.KB, cumuConn), nil } } diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 1a9a28e..7f0512d 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -20,7 +20,9 @@ import ( "net" "strconv" "sync" + "time" + "github.com/fatedier/frp/extend/cumu" "github.com/fatedier/frp/g" "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" @@ -224,16 +226,29 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()}) - inCount, outCount := frpIo.Join(local, userConn) + cc := cumu.NewCumuConn(userConn) + endSig := make(chan int) + go func(cc *cumu.Conn, ch chan int) { + for { + select { + case <-ch: + return + default: + time.Sleep(1 * time.Second) + statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ + ProxyName: pxy.GetName(), + TrafficBytes: cc.OutCount(), + }) + statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{ + ProxyName: pxy.GetName(), + TrafficBytes: cc.InCount(), + }) + } + } + }(cc, endSig) + frpIo.Join(local, userConn) statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()}) - statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ - ProxyName: pxy.GetName(), - TrafficBytes: inCount, - }) - statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{ - ProxyName: pxy.GetName(), - TrafficBytes: outCount, - }) + endSig <- 1 pxy.Debug("join connections closed") } From 9db484c8ca003dc2072ef0cd9c0af5b1441fc40b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=98=89=E8=81=AA?= <9a6c5609806a@gmail.com> Date: Mon, 9 Sep 2019 20:51:00 +0800 Subject: [PATCH 2/5] fix: rante limit bug and cumu bug --- extend/api/api.go | 6 ++---- extend/limit/limit.go | 13 ++++++++++--- extend/limit/reader.go | 16 ++++++++++++++-- extend/limit/writer.go | 14 ++++++++++++-- server/control.go | 32 ++++++++------------------------ server/proxy/proxy.go | 2 +- server/service.go | 13 ++++++++++++- 7 files changed, 59 insertions(+), 37 deletions(-) diff --git a/extend/api/api.go b/extend/api/api.go index 0b4f86b..f2c56e8 100755 --- a/extend/api/api.go +++ b/extend/api/api.go @@ -8,8 +8,6 @@ import ( "net/url" "strconv" - // "github.com/fatedier/frp/extend/limit" - "github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/msg" ) @@ -147,7 +145,7 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st } // GetProxyLimit 获取隧道限速信息 -func (s Service) GetProxyLimit(user string, pxyConf *config.BaseProxyConf, timestamp int64, stk string) (inLimit, outLimit uint64, err error) { +func (s Service) GetProxyLimit(user string, timestamp int64, stk string) (inLimit, outLimit uint64, err error) { // 这部分就照之前的搬过去了,能跑就行x values := url.Values{} values.Set("action", "getlimit") @@ -180,7 +178,7 @@ func (s Service) GetProxyLimit(user string, pxyConf *config.BaseProxyConf, times if err = json.Unmarshal(body, response); err != nil { return 0, 0, err } - + // 这里直接返回 uint64 应该问题不大 return response.MaxIn, response.MaxOut, nil } diff --git a/extend/limit/limit.go b/extend/limit/limit.go index e06b76b..f3cb9bf 100755 --- a/extend/limit/limit.go +++ b/extend/limit/limit.go @@ -4,6 +4,7 @@ import ( "io" frpNet "github.com/fatedier/frp/utils/net" + "golang.org/x/time/rate" ) const ( @@ -16,7 +17,7 @@ const ( EB ) -const burstLimit = 1024 * 1024 * 1024 +const BurstLimit = 1024 * 1024 * 1024 type Conn struct { frpNet.Conn @@ -26,8 +27,6 @@ type Conn struct { } func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn { - // 这里不知道为什么要 49 才能对的上真实速度 - // 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确 return Conn{ lr: NewReaderWithLimit(c, maxread), lw: NewWriterWithLimit(c, maxwrite), @@ -35,6 +34,14 @@ func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn { } } +func NewLimitConnWithBucket(c frpNet.Conn, rBucket, wBucket *rate.Limiter) Conn { + return Conn{ + lr: NewReaderWithLimitWithBucket(c, rBucket), + lw: NewWriterWithLimitWithBucket(c, wBucket), + Conn: c, + } +} + func (c Conn) Read(p []byte) (n int, err error) { return c.lr.Read(p) } diff --git a/extend/limit/reader.go b/extend/limit/reader.go index cac8fb3..8473dc6 100644 --- a/extend/limit/reader.go +++ b/extend/limit/reader.go @@ -31,17 +31,29 @@ func NewReaderWithLimit(r io.Reader, speed uint64) *Reader { ctx: context.Background(), mux: sync.Mutex{}, } + rr.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit) rr.SetRateLimit(speed) return rr } +func NewReaderWithLimitWithBucket(r io.Reader, bucket *rate.Limiter) *Reader { + rr := &Reader{ + r: r, + ctx: context.Background(), + mux: sync.Mutex{}, + limiter: bucket, + } + rr.limiter.AllowN(time.Now(), BurstLimit) + return rr +} + // SetRateLimit sets rate limit (bytes/sec) to the reader. func (s *Reader) SetRateLimit(bytesPerSec uint64) { s.mux.Lock() defer s.mux.Unlock() - s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) - s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst + // s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst } // Read reads bytes into p. diff --git a/extend/limit/writer.go b/extend/limit/writer.go index 628e223..2a705a1 100644 --- a/extend/limit/writer.go +++ b/extend/limit/writer.go @@ -31,17 +31,27 @@ func NewWriterWithLimit(w io.Writer, speed uint64) *Writer { ctx: context.Background(), mux: sync.Mutex{}, } + ww.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit) ww.SetRateLimit(speed) return ww } +func NewWriterWithLimitWithBucket(w io.Writer, bucket *rate.Limiter) *Writer { + ww := &Writer{ + w: w, + ctx: context.Background(), + mux: sync.Mutex{}, + limiter: bucket, + } + return ww +} // SetRateLimit sets rate limit (bytes/sec) to the writer. func (s *Writer) SetRateLimit(bytesPerSec uint64) { s.mux.Lock() defer s.mux.Unlock() - s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) - s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst + // s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst } // Write writes bytes from p. diff --git a/server/control.go b/server/control.go index dd988a7..ff8af62 100755 --- a/server/control.go +++ b/server/control.go @@ -32,13 +32,13 @@ import ( "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/version" + "golang.org/x/time/rate" "github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/crypto" "github.com/fatedier/golib/errors" "github.com/fatedier/frp/extend/api" - "github.com/fatedier/frp/extend/cumu" "github.com/fatedier/frp/extend/limit" ) @@ -133,11 +133,14 @@ type Control struct { managerShutdown *shutdown.Shutdown allShutdown *shutdown.Shutdown + inLimit *rate.Limiter + outLimit *rate.Limiter + mu sync.RWMutex } func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager, - statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control { + statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login, inLimit, outLimit uint64) *Control { return &Control{ rc: rc, @@ -158,6 +161,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage writerShutdown: shutdown.New(), managerShutdown: shutdown.New(), allShutdown: shutdown.New(), + inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), limit.BurstLimit), + outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), limit.BurstLimit), } } @@ -448,33 +453,12 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err return remoteAddr, fmt.Errorf("invalid proxy configuration") } - in, out, err := s.GetProxyLimit(ctl.loginMsg.User, pxyConf.GetBaseInfo(), nowTime, g.GlbServerCfg.ApiToken) - if err != nil { - return remoteAddr, err - } - - // 测试用 - ctl.conn.Debug("client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", in, out) - workConn = func() (frpNet.Conn, error) { fconn, err := ctl.GetWorkConn() if err != nil { return nil, err } - cumuConn := cumu.NewCumuConn(fconn) - go func(cc *cumu.Conn) { - var oldIn, oldOut uint64 - for { - time.Sleep(1 * time.Second) - in, out := cc.InCount(), cc.OutCount() - if (oldIn == in) && (oldOut == out) { - continue - } - fmt.Printf("In/Out: %d Byte, %d Byte\n", in, out) - oldIn, oldOut = in, out - } - }(cumuConn) - return limit.NewLimitConn(in*limit.KB, out*limit.KB, cumuConn), nil + return limit.NewLimitConnWithBucket(fconn, ctl.outLimit, ctl.inLimit), nil } } diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 7f0512d..4485512 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -246,7 +246,7 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta } } }(cc, endSig) - frpIo.Join(local, userConn) + frpIo.Join(local, cc) statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()}) endSig <- 1 pxy.Debug("join connections closed") diff --git a/server/service.go b/server/service.go index 4f79be2..c93d127 100755 --- a/server/service.go +++ b/server/service.go @@ -371,6 +371,11 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e return } + var ( + inLimit uint64 + outLimit uint64 + ) + if g.GlbServerCfg.EnableApi { nowTime := time.Now().Unix() @@ -397,6 +402,12 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e if !valid { return fmt.Errorf("authorization failed") } + + inLimit, outLimit, err = s.GetProxyLimit(loginMsg.User, nowTime, g.GlbServerCfg.ApiToken) + if err != nil { + return err + } + ctlConn.Debug("%s client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", loginMsg.User, inLimit, outLimit) } // If client's RunId is empty, it's a new client, we just create a new controller. @@ -408,7 +419,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e } } - ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg) + ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit) if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { oldCtl.allShutdown.WaitDone() From 993e9161bf71fe82606ea174499383f42e2c7179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=98=89=E8=81=AA?= <9a6c5609806a@gmail.com> Date: Mon, 9 Sep 2019 20:56:20 +0800 Subject: [PATCH 3/5] fix: limit reader fix --- extend/limit/reader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/extend/limit/reader.go b/extend/limit/reader.go index 8473dc6..3d8fc18 100644 --- a/extend/limit/reader.go +++ b/extend/limit/reader.go @@ -43,7 +43,6 @@ func NewReaderWithLimitWithBucket(r io.Reader, bucket *rate.Limiter) *Reader { mux: sync.Mutex{}, limiter: bucket, } - rr.limiter.AllowN(time.Now(), BurstLimit) return rr } From 6eec531a32d198430550fe757390953e02ae963d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=98=89=E8=81=AA?= <9a6c5609806a@gmail.com> Date: Mon, 9 Sep 2019 21:12:31 +0800 Subject: [PATCH 4/5] fix: limit bug --- server/control.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/control.go b/server/control.go index ff8af62..78e6ca2 100755 --- a/server/control.go +++ b/server/control.go @@ -161,8 +161,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage writerShutdown: shutdown.New(), managerShutdown: shutdown.New(), allShutdown: shutdown.New(), - inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), limit.BurstLimit), - outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), limit.BurstLimit), + inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)), + outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)), } } From 777b95bb90839119457ee131a6ae4e2fd19c3c16 Mon Sep 17 00:00:00 2001 From: kasuganosoras Date: Sat, 18 Jan 2020 04:33:28 +0800 Subject: [PATCH 5/5] Add close client --- extend/limit/limit.go | 27 ++++++++++----------------- extend/limit/reader.go | 15 ++------------- extend/limit/writer.go | 14 ++------------ server/control.go | 11 +++++------ server/dashboard.go | 1 + server/dashboard_api.go | 28 ++++++++++++++++++++++++++++ server/service.go | 14 ++++++++++---- utils/vhost/resource.go | 2 +- 8 files changed, 59 insertions(+), 53 deletions(-) diff --git a/extend/limit/limit.go b/extend/limit/limit.go index f3cb9bf..c0236c0 100755 --- a/extend/limit/limit.go +++ b/extend/limit/limit.go @@ -4,7 +4,6 @@ import ( "io" frpNet "github.com/fatedier/frp/utils/net" - "golang.org/x/time/rate" ) const ( @@ -17,35 +16,29 @@ const ( EB ) -const BurstLimit = 1024 * 1024 * 1024 +const burstLimit = 1024 * 1024 * 1024 -type Conn struct { +type LimitConn struct { frpNet.Conn lr io.Reader lw io.Writer } -func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn { - return Conn{ - lr: NewReaderWithLimit(c, maxread), - lw: NewWriterWithLimit(c, maxwrite), +func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn { + // 这里不知道为什么要 49 才能对的上真实速度 + // 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确 + return LimitConn{ + lr: NewReaderWithLimit(c, maxread*49), + lw: NewWriterWithLimit(c, maxwrite*49), Conn: c, } } -func NewLimitConnWithBucket(c frpNet.Conn, rBucket, wBucket *rate.Limiter) Conn { - return Conn{ - lr: NewReaderWithLimitWithBucket(c, rBucket), - lw: NewWriterWithLimitWithBucket(c, wBucket), - Conn: c, - } -} - -func (c Conn) Read(p []byte) (n int, err error) { +func (c LimitConn) Read(p []byte) (n int, err error) { return c.lr.Read(p) } -func (c Conn) Write(p []byte) (n int, err error) { +func (c LimitConn) Write(p []byte) (n int, err error) { return c.lw.Write(p) } diff --git a/extend/limit/reader.go b/extend/limit/reader.go index 3d8fc18..cac8fb3 100644 --- a/extend/limit/reader.go +++ b/extend/limit/reader.go @@ -31,28 +31,17 @@ func NewReaderWithLimit(r io.Reader, speed uint64) *Reader { ctx: context.Background(), mux: sync.Mutex{}, } - rr.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit) rr.SetRateLimit(speed) return rr } -func NewReaderWithLimitWithBucket(r io.Reader, bucket *rate.Limiter) *Reader { - rr := &Reader{ - r: r, - ctx: context.Background(), - mux: sync.Mutex{}, - limiter: bucket, - } - return rr -} - // SetRateLimit sets rate limit (bytes/sec) to the reader. func (s *Reader) SetRateLimit(bytesPerSec uint64) { s.mux.Lock() defer s.mux.Unlock() - // s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) - s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst + s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst } // Read reads bytes into p. diff --git a/extend/limit/writer.go b/extend/limit/writer.go index 2a705a1..628e223 100644 --- a/extend/limit/writer.go +++ b/extend/limit/writer.go @@ -31,27 +31,17 @@ func NewWriterWithLimit(w io.Writer, speed uint64) *Writer { ctx: context.Background(), mux: sync.Mutex{}, } - ww.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit) ww.SetRateLimit(speed) return ww } -func NewWriterWithLimitWithBucket(w io.Writer, bucket *rate.Limiter) *Writer { - ww := &Writer{ - w: w, - ctx: context.Background(), - mux: sync.Mutex{}, - limiter: bucket, - } - return ww -} // SetRateLimit sets rate limit (bytes/sec) to the writer. func (s *Writer) SetRateLimit(bytesPerSec uint64) { s.mux.Lock() defer s.mux.Unlock() - // s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) - s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst + s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst } // Write writes bytes from p. diff --git a/server/control.go b/server/control.go index 78e6ca2..c848c54 100755 --- a/server/control.go +++ b/server/control.go @@ -32,7 +32,6 @@ import ( "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/version" - "golang.org/x/time/rate" "github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/crypto" @@ -133,8 +132,8 @@ type Control struct { managerShutdown *shutdown.Shutdown allShutdown *shutdown.Shutdown - inLimit *rate.Limiter - outLimit *rate.Limiter + inLimit uint64 + outLimit uint64 mu sync.RWMutex } @@ -161,8 +160,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage writerShutdown: shutdown.New(), managerShutdown: shutdown.New(), allShutdown: shutdown.New(), - inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)), - outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)), + inLimit: inLimit, //rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)), + outLimit: outLimit, //rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)), } } @@ -458,7 +457,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err if err != nil { return nil, err } - return limit.NewLimitConnWithBucket(fconn, ctl.outLimit, ctl.inLimit), nil + return limit.NewLimitConn(ctl.inLimit, ctl.outLimit, fconn), nil } } diff --git a/server/dashboard.go b/server/dashboard.go index d682aeb..0181a57 100644 --- a/server/dashboard.go +++ b/server/dashboard.go @@ -44,6 +44,7 @@ func (svr *Service) RunDashboardServer(addr string, port int) (err error) { router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET") router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET") router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET") + router.HandleFunc("/api/client/close/{user}", svr.ApiCloseClient).Methods("GET") // view router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET") diff --git a/server/dashboard_api.go b/server/dashboard_api.go index dcea7e6..db591df 100644 --- a/server/dashboard_api.go +++ b/server/dashboard_api.go @@ -322,3 +322,31 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) { buf, _ := json.Marshal(&trafficResp) res.Msg = string(buf) } + +type CloseUserResp struct { + Status int `json:"status"` + Msg string `json:"message"` +} + +func (svr *Service) ApiCloseClient(w http.ResponseWriter, r *http.Request) { + var ( + buf []byte + resp = CloseUserResp{} + ) + params := mux.Vars(r) + user := params["user"] + defer func() { + log.Info("Http response [/api/client/close/{user}]: code [%d]", resp.Status) + }() + log.Info("Http request: [/api/client/close/{user}] %#v", user) + err := svr.CloseUser(user) + if err != nil { + resp.Status = 404 + resp.Msg = err.Error() + } else { + resp.Status = 200 + resp.Msg = "OK" + } + buf, _ = json.Marshal(&resp) + w.Write(buf) +} diff --git a/server/service.go b/server/service.go index c93d127..58350dc 100755 --- a/server/service.go +++ b/server/service.go @@ -413,10 +413,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e // If client's RunId is empty, it's a new client, we just create a new controller. // Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one. if loginMsg.RunId == "" { - loginMsg.RunId, err = util.RandId() - if err != nil { - return - } + loginMsg.RunId = loginMsg.User } ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit) @@ -475,3 +472,12 @@ func generateTLSConfig() *tls.Config { } return &tls.Config{Certificates: []tls.Certificate{tlsCert}} } + +func (svr *Service) CloseUser(user string) error { + ctl, ok := svr.ctlManager.GetById(user) + if !ok { + return fmt.Errorf("user not login") + } + ctl.allShutdown.Start() + return nil +} diff --git a/utils/vhost/resource.go b/utils/vhost/resource.go index 71bfc3a..d0a686f 100755 --- a/utils/vhost/resource.go +++ b/utils/vhost/resource.go @@ -93,7 +93,7 @@ func getServiceUnavailablePageContent() []byte { func notFoundResponse() *http.Response { header := make(http.Header) - header.Set("server", "frp/" + version.Full() + "-sakurapanel") + header.Set("server", "frp/"+version.Full()+"-sakurapanel") header.Set("Content-Type", "text/html") res := &http.Response{