mirror of
https://github.com/ZeroDream-CN/SakuraFrp
synced 2024-11-21 15:00:45 +00:00
fix: rante limit bug and cumu bug
This commit is contained in:
parent
7a3a34da30
commit
9db484c8ca
@ -8,8 +8,6 @@ import (
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
// "github.com/fatedier/frp/extend/limit"
|
||||
"github.com/fatedier/frp/models/config"
|
||||
"github.com/fatedier/frp/models/msg"
|
||||
)
|
||||
|
||||
@ -147,7 +145,7 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st
|
||||
}
|
||||
|
||||
// GetProxyLimit 获取隧道限速信息
|
||||
func (s Service) GetProxyLimit(user string, pxyConf *config.BaseProxyConf, timestamp int64, stk string) (inLimit, outLimit uint64, err error) {
|
||||
func (s Service) GetProxyLimit(user string, timestamp int64, stk string) (inLimit, outLimit uint64, err error) {
|
||||
// 这部分就照之前的搬过去了,能跑就行x
|
||||
values := url.Values{}
|
||||
values.Set("action", "getlimit")
|
||||
@ -180,7 +178,7 @@ func (s Service) GetProxyLimit(user string, pxyConf *config.BaseProxyConf, times
|
||||
if err = json.Unmarshal(body, response); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
|
||||
// 这里直接返回 uint64 应该问题不大
|
||||
return response.MaxIn, response.MaxOut, nil
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"io"
|
||||
|
||||
frpNet "github.com/fatedier/frp/utils/net"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -16,7 +17,7 @@ const (
|
||||
EB
|
||||
)
|
||||
|
||||
const burstLimit = 1024 * 1024 * 1024
|
||||
const BurstLimit = 1024 * 1024 * 1024
|
||||
|
||||
type Conn struct {
|
||||
frpNet.Conn
|
||||
@ -26,8 +27,6 @@ type Conn struct {
|
||||
}
|
||||
|
||||
func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn {
|
||||
// 这里不知道为什么要 49 才能对的上真实速度
|
||||
// 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确
|
||||
return Conn{
|
||||
lr: NewReaderWithLimit(c, maxread),
|
||||
lw: NewWriterWithLimit(c, maxwrite),
|
||||
@ -35,6 +34,14 @@ func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn {
|
||||
}
|
||||
}
|
||||
|
||||
func NewLimitConnWithBucket(c frpNet.Conn, rBucket, wBucket *rate.Limiter) Conn {
|
||||
return Conn{
|
||||
lr: NewReaderWithLimitWithBucket(c, rBucket),
|
||||
lw: NewWriterWithLimitWithBucket(c, wBucket),
|
||||
Conn: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (c Conn) Read(p []byte) (n int, err error) {
|
||||
return c.lr.Read(p)
|
||||
}
|
||||
|
@ -31,17 +31,29 @@ func NewReaderWithLimit(r io.Reader, speed uint64) *Reader {
|
||||
ctx: context.Background(),
|
||||
mux: sync.Mutex{},
|
||||
}
|
||||
rr.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit)
|
||||
rr.SetRateLimit(speed)
|
||||
return rr
|
||||
}
|
||||
|
||||
func NewReaderWithLimitWithBucket(r io.Reader, bucket *rate.Limiter) *Reader {
|
||||
rr := &Reader{
|
||||
r: r,
|
||||
ctx: context.Background(),
|
||||
mux: sync.Mutex{},
|
||||
limiter: bucket,
|
||||
}
|
||||
rr.limiter.AllowN(time.Now(), BurstLimit)
|
||||
return rr
|
||||
}
|
||||
|
||||
// SetRateLimit sets rate limit (bytes/sec) to the reader.
|
||||
func (s *Reader) SetRateLimit(bytesPerSec uint64) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
|
||||
s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
|
||||
// s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
|
||||
s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst
|
||||
}
|
||||
|
||||
// Read reads bytes into p.
|
||||
|
@ -31,17 +31,27 @@ func NewWriterWithLimit(w io.Writer, speed uint64) *Writer {
|
||||
ctx: context.Background(),
|
||||
mux: sync.Mutex{},
|
||||
}
|
||||
ww.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit)
|
||||
ww.SetRateLimit(speed)
|
||||
return ww
|
||||
}
|
||||
func NewWriterWithLimitWithBucket(w io.Writer, bucket *rate.Limiter) *Writer {
|
||||
ww := &Writer{
|
||||
w: w,
|
||||
ctx: context.Background(),
|
||||
mux: sync.Mutex{},
|
||||
limiter: bucket,
|
||||
}
|
||||
return ww
|
||||
}
|
||||
|
||||
// SetRateLimit sets rate limit (bytes/sec) to the writer.
|
||||
func (s *Writer) SetRateLimit(bytesPerSec uint64) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
|
||||
s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
|
||||
// s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
|
||||
s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst
|
||||
}
|
||||
|
||||
// Write writes bytes from p.
|
||||
|
@ -32,13 +32,13 @@ import (
|
||||
"github.com/fatedier/frp/utils/net"
|
||||
frpNet "github.com/fatedier/frp/utils/net"
|
||||
"github.com/fatedier/frp/utils/version"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/golib/control/shutdown"
|
||||
"github.com/fatedier/golib/crypto"
|
||||
"github.com/fatedier/golib/errors"
|
||||
|
||||
"github.com/fatedier/frp/extend/api"
|
||||
"github.com/fatedier/frp/extend/cumu"
|
||||
"github.com/fatedier/frp/extend/limit"
|
||||
)
|
||||
|
||||
@ -133,11 +133,14 @@ type Control struct {
|
||||
managerShutdown *shutdown.Shutdown
|
||||
allShutdown *shutdown.Shutdown
|
||||
|
||||
inLimit *rate.Limiter
|
||||
outLimit *rate.Limiter
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager,
|
||||
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control {
|
||||
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login, inLimit, outLimit uint64) *Control {
|
||||
|
||||
return &Control{
|
||||
rc: rc,
|
||||
@ -158,6 +161,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage
|
||||
writerShutdown: shutdown.New(),
|
||||
managerShutdown: shutdown.New(),
|
||||
allShutdown: shutdown.New(),
|
||||
inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), limit.BurstLimit),
|
||||
outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), limit.BurstLimit),
|
||||
}
|
||||
}
|
||||
|
||||
@ -448,33 +453,12 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
|
||||
return remoteAddr, fmt.Errorf("invalid proxy configuration")
|
||||
}
|
||||
|
||||
in, out, err := s.GetProxyLimit(ctl.loginMsg.User, pxyConf.GetBaseInfo(), nowTime, g.GlbServerCfg.ApiToken)
|
||||
if err != nil {
|
||||
return remoteAddr, err
|
||||
}
|
||||
|
||||
// 测试用
|
||||
ctl.conn.Debug("client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", in, out)
|
||||
|
||||
workConn = func() (frpNet.Conn, error) {
|
||||
fconn, err := ctl.GetWorkConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cumuConn := cumu.NewCumuConn(fconn)
|
||||
go func(cc *cumu.Conn) {
|
||||
var oldIn, oldOut uint64
|
||||
for {
|
||||
time.Sleep(1 * time.Second)
|
||||
in, out := cc.InCount(), cc.OutCount()
|
||||
if (oldIn == in) && (oldOut == out) {
|
||||
continue
|
||||
}
|
||||
fmt.Printf("In/Out: %d Byte, %d Byte\n", in, out)
|
||||
oldIn, oldOut = in, out
|
||||
}
|
||||
}(cumuConn)
|
||||
return limit.NewLimitConn(in*limit.KB, out*limit.KB, cumuConn), nil
|
||||
return limit.NewLimitConnWithBucket(fconn, ctl.outLimit, ctl.inLimit), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -246,7 +246,7 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta
|
||||
}
|
||||
}
|
||||
}(cc, endSig)
|
||||
frpIo.Join(local, userConn)
|
||||
frpIo.Join(local, cc)
|
||||
statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
|
||||
endSig <- 1
|
||||
pxy.Debug("join connections closed")
|
||||
|
@ -371,6 +371,11 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
inLimit uint64
|
||||
outLimit uint64
|
||||
)
|
||||
|
||||
if g.GlbServerCfg.EnableApi {
|
||||
|
||||
nowTime := time.Now().Unix()
|
||||
@ -397,6 +402,12 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
|
||||
if !valid {
|
||||
return fmt.Errorf("authorization failed")
|
||||
}
|
||||
|
||||
inLimit, outLimit, err = s.GetProxyLimit(loginMsg.User, nowTime, g.GlbServerCfg.ApiToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctlConn.Debug("%s client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", loginMsg.User, inLimit, outLimit)
|
||||
}
|
||||
|
||||
// If client's RunId is empty, it's a new client, we just create a new controller.
|
||||
@ -408,7 +419,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
|
||||
}
|
||||
}
|
||||
|
||||
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg)
|
||||
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit)
|
||||
|
||||
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
|
||||
oldCtl.allShutdown.WaitDone()
|
||||
|
Loading…
Reference in New Issue
Block a user