Add close client

This commit is contained in:
kasuganosoras 2020-01-18 04:33:28 +08:00
parent 6eec531a32
commit 777b95bb90
8 changed files with 59 additions and 53 deletions

View File

@ -4,7 +4,6 @@ import (
"io" "io"
frpNet "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net"
"golang.org/x/time/rate"
) )
const ( const (
@ -17,35 +16,29 @@ const (
EB EB
) )
const BurstLimit = 1024 * 1024 * 1024 const burstLimit = 1024 * 1024 * 1024
type Conn struct { type LimitConn struct {
frpNet.Conn frpNet.Conn
lr io.Reader lr io.Reader
lw io.Writer lw io.Writer
} }
func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) Conn { func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn {
return Conn{ // 这里不知道为什么要 49 才能对的上真实速度
lr: NewReaderWithLimit(c, maxread), // 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确
lw: NewWriterWithLimit(c, maxwrite), return LimitConn{
lr: NewReaderWithLimit(c, maxread*49),
lw: NewWriterWithLimit(c, maxwrite*49),
Conn: c, Conn: c,
} }
} }
func NewLimitConnWithBucket(c frpNet.Conn, rBucket, wBucket *rate.Limiter) Conn { func (c LimitConn) Read(p []byte) (n int, err error) {
return Conn{
lr: NewReaderWithLimitWithBucket(c, rBucket),
lw: NewWriterWithLimitWithBucket(c, wBucket),
Conn: c,
}
}
func (c Conn) Read(p []byte) (n int, err error) {
return c.lr.Read(p) return c.lr.Read(p)
} }
func (c Conn) Write(p []byte) (n int, err error) { func (c LimitConn) Write(p []byte) (n int, err error) {
return c.lw.Write(p) return c.lw.Write(p)
} }

View File

@ -31,28 +31,17 @@ func NewReaderWithLimit(r io.Reader, speed uint64) *Reader {
ctx: context.Background(), ctx: context.Background(),
mux: sync.Mutex{}, mux: sync.Mutex{},
} }
rr.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit)
rr.SetRateLimit(speed) rr.SetRateLimit(speed)
return rr return rr
} }
func NewReaderWithLimitWithBucket(r io.Reader, bucket *rate.Limiter) *Reader {
rr := &Reader{
r: r,
ctx: context.Background(),
mux: sync.Mutex{},
limiter: bucket,
}
return rr
}
// SetRateLimit sets rate limit (bytes/sec) to the reader. // SetRateLimit sets rate limit (bytes/sec) to the reader.
func (s *Reader) SetRateLimit(bytesPerSec uint64) { func (s *Reader) SetRateLimit(bytesPerSec uint64) {
s.mux.Lock() s.mux.Lock()
defer s.mux.Unlock() defer s.mux.Unlock()
// s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
} }
// Read reads bytes into p. // Read reads bytes into p.

View File

@ -31,27 +31,17 @@ func NewWriterWithLimit(w io.Writer, speed uint64) *Writer {
ctx: context.Background(), ctx: context.Background(),
mux: sync.Mutex{}, mux: sync.Mutex{},
} }
ww.limiter = rate.NewLimiter(rate.Limit(speed), BurstLimit)
ww.SetRateLimit(speed) ww.SetRateLimit(speed)
return ww return ww
} }
func NewWriterWithLimitWithBucket(w io.Writer, bucket *rate.Limiter) *Writer {
ww := &Writer{
w: w,
ctx: context.Background(),
mux: sync.Mutex{},
limiter: bucket,
}
return ww
}
// SetRateLimit sets rate limit (bytes/sec) to the writer. // SetRateLimit sets rate limit (bytes/sec) to the writer.
func (s *Writer) SetRateLimit(bytesPerSec uint64) { func (s *Writer) SetRateLimit(bytesPerSec uint64) {
s.mux.Lock() s.mux.Lock()
defer s.mux.Unlock() defer s.mux.Unlock()
// s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
s.limiter.AllowN(time.Now(), BurstLimit) // spend initial burst s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
} }
// Write writes bytes from p. // Write writes bytes from p.

View File

@ -32,7 +32,6 @@ import (
"github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/net"
frpNet "github.com/fatedier/frp/utils/net" frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/version" "github.com/fatedier/frp/utils/version"
"golang.org/x/time/rate"
"github.com/fatedier/golib/control/shutdown" "github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto" "github.com/fatedier/golib/crypto"
@ -133,8 +132,8 @@ type Control struct {
managerShutdown *shutdown.Shutdown managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown allShutdown *shutdown.Shutdown
inLimit *rate.Limiter inLimit uint64
outLimit *rate.Limiter outLimit uint64
mu sync.RWMutex mu sync.RWMutex
} }
@ -161,8 +160,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage
writerShutdown: shutdown.New(), writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(), managerShutdown: shutdown.New(),
allShutdown: shutdown.New(), allShutdown: shutdown.New(),
inLimit: rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)), inLimit: inLimit, //rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)),
outLimit: rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)), outLimit: outLimit, //rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)),
} }
} }
@ -458,7 +457,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
if err != nil { if err != nil {
return nil, err return nil, err
} }
return limit.NewLimitConnWithBucket(fconn, ctl.outLimit, ctl.inLimit), nil return limit.NewLimitConn(ctl.inLimit, ctl.outLimit, fconn), nil
} }
} }

View File

@ -44,6 +44,7 @@ func (svr *Service) RunDashboardServer(addr string, port int) (err error) {
router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET") router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET")
router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET") router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET")
router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET") router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET")
router.HandleFunc("/api/client/close/{user}", svr.ApiCloseClient).Methods("GET")
// view // view
router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET") router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")

View File

@ -322,3 +322,31 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) {
buf, _ := json.Marshal(&trafficResp) buf, _ := json.Marshal(&trafficResp)
res.Msg = string(buf) res.Msg = string(buf)
} }
type CloseUserResp struct {
Status int `json:"status"`
Msg string `json:"message"`
}
func (svr *Service) ApiCloseClient(w http.ResponseWriter, r *http.Request) {
var (
buf []byte
resp = CloseUserResp{}
)
params := mux.Vars(r)
user := params["user"]
defer func() {
log.Info("Http response [/api/client/close/{user}]: code [%d]", resp.Status)
}()
log.Info("Http request: [/api/client/close/{user}] %#v", user)
err := svr.CloseUser(user)
if err != nil {
resp.Status = 404
resp.Msg = err.Error()
} else {
resp.Status = 200
resp.Msg = "OK"
}
buf, _ = json.Marshal(&resp)
w.Write(buf)
}

View File

@ -413,10 +413,7 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
// If client's RunId is empty, it's a new client, we just create a new controller. // If client's RunId is empty, it's a new client, we just create a new controller.
// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one. // Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
if loginMsg.RunId == "" { if loginMsg.RunId == "" {
loginMsg.RunId, err = util.RandId() loginMsg.RunId = loginMsg.User
if err != nil {
return
}
} }
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit) ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit)
@ -475,3 +472,12 @@ func generateTLSConfig() *tls.Config {
} }
return &tls.Config{Certificates: []tls.Certificate{tlsCert}} return &tls.Config{Certificates: []tls.Certificate{tlsCert}}
} }
func (svr *Service) CloseUser(user string) error {
ctl, ok := svr.ctlManager.GetById(user)
if !ok {
return fmt.Errorf("user not login")
}
ctl.allShutdown.Start()
return nil
}

View File

@ -93,7 +93,7 @@ func getServiceUnavailablePageContent() []byte {
func notFoundResponse() *http.Response { func notFoundResponse() *http.Response {
header := make(http.Header) header := make(http.Header)
header.Set("server", "frp/" + version.Full() + "-sakurapanel") header.Set("server", "frp/"+version.Full()+"-sakurapanel")
header.Set("Content-Type", "text/html") header.Set("Content-Type", "text/html")
res := &http.Response{ res := &http.Response{