Merge pull request #5 from ZeroDream-CN/test

Update: close client feature
This commit is contained in:
Akkariin Meiko 2020-01-18 04:56:19 +08:00 committed by GitHub
commit c391c2ea98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 154 additions and 31 deletions

View File

@ -8,8 +8,6 @@ import (
"net/url" "net/url"
"strconv" "strconv"
// "github.com/fatedier/frp/extend/limit"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
) )
@ -147,7 +145,7 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st
} }
// GetProxyLimit 获取隧道限速信息 // GetProxyLimit 获取隧道限速信息
func (s Service) GetProxyLimit(user string, pxyConf *config.BaseProxyConf, timestamp int64, stk string) (inLimit, outLimit uint64, err error) { func (s Service) GetProxyLimit(user string, timestamp int64, stk string) (inLimit, outLimit uint64, err error) {
// 这部分就照之前的搬过去了能跑就行x // 这部分就照之前的搬过去了能跑就行x
values := url.Values{} values := url.Values{}
values.Set("action", "getlimit") values.Set("action", "getlimit")

67
extend/cumu/cumu.go Normal file
View File

@ -0,0 +1,67 @@
package cumu
import (
"sync"
frpNet "github.com/fatedier/frp/utils/net"
)
// Conn 速度累计
type Conn struct {
frpNet.Conn
inCount int64
outCount int64
inCountLock sync.Mutex
outCountLock sync.Mutex
}
// NewCumuConn ...
func NewCumuConn(conn frpNet.Conn) *Conn {
return &Conn{
Conn: conn,
inCount: 0,
outCount: 0,
}
}
func (c *Conn) Read(p []byte) (n int, err error) {
n, err = c.Conn.Read(p)
if err != nil {
return
}
c.outCountLock.Lock()
defer c.outCountLock.Unlock()
c.outCount += int64(n)
return
}
func (c *Conn) Write(p []byte) (n int, err error) {
n, err = c.Conn.Write(p)
if err != nil {
return
}
c.inCountLock.Lock()
defer c.inCountLock.Unlock()
c.inCount += int64(n)
return
}
// InCount get in bound byte count
func (c *Conn) InCount() int64 {
c.inCountLock.Lock()
defer c.inCountLock.Unlock()
in := c.inCount
c.inCount = 0
return in
}
// OutCount get out bound byte count
func (c *Conn) OutCount() int64 {
c.outCountLock.Lock()
defer c.outCountLock.Unlock()
out := c.outCount
c.outCount = 0
return out
}

View File

@ -29,8 +29,8 @@ func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn {
// 这里不知道为什么要 49 才能对的上真实速度 // 这里不知道为什么要 49 才能对的上真实速度
// 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确 // 49 是根据 wget 速度来取的,测试了 512、1024、2048、4096、8192 等多种速度下都很准确
return LimitConn{ return LimitConn{
lr: NewReaderWithLimit(c, maxread * 49), lr: NewReaderWithLimit(c, maxread*49),
lw: NewWriterWithLimit(c, maxwrite * 49), lw: NewWriterWithLimit(c, maxwrite*49),
Conn: c, Conn: c,
} }
} }

View File

@ -132,11 +132,14 @@ type Control struct {
managerShutdown *shutdown.Shutdown managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown allShutdown *shutdown.Shutdown
inLimit uint64
outLimit uint64
mu sync.RWMutex mu sync.RWMutex
} }
func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager, func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManager,
statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login) *Control { statsCollector stats.Collector, ctlConn net.Conn, loginMsg *msg.Login, inLimit, outLimit uint64) *Control {
return &Control{ return &Control{
rc: rc, rc: rc,
@ -157,6 +160,8 @@ func NewControl(rc *controller.ResourceController, pxyManager *proxy.ProxyManage
writerShutdown: shutdown.New(), writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(), managerShutdown: shutdown.New(),
allShutdown: shutdown.New(), allShutdown: shutdown.New(),
inLimit: inLimit, //rate.NewLimiter(rate.Limit(inLimit*limit.KB), int(inLimit*limit.KB)),
outLimit: outLimit, //rate.NewLimiter(rate.Limit(outLimit*limit.KB), int(outLimit*limit.KB)),
} }
} }
@ -447,20 +452,12 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
return remoteAddr, fmt.Errorf("invalid proxy configuration") return remoteAddr, fmt.Errorf("invalid proxy configuration")
} }
in, out, err := s.GetProxyLimit(ctl.loginMsg.User, pxyConf.GetBaseInfo(), nowTime, g.GlbServerCfg.ApiToken)
if err != nil {
return remoteAddr, err
}
// 测试用
ctl.conn.Debug("client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", in, out)
workConn = func() (frpNet.Conn, error) { workConn = func() (frpNet.Conn, error) {
fconn, err := ctl.GetWorkConn() fconn, err := ctl.GetWorkConn()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return limit.NewLimitConn(in, out, fconn), nil return limit.NewLimitConn(ctl.inLimit, ctl.outLimit, fconn), nil
} }
} }

View File

@ -44,6 +44,7 @@ func (svr *Service) RunDashboardServer(addr string, port int) (err error) {
router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET") router.HandleFunc("/api/proxy/{type}", svr.ApiProxyByType).Methods("GET")
router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET") router.HandleFunc("/api/proxy/{type}/{name}", svr.ApiProxyByTypeAndName).Methods("GET")
router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET") router.HandleFunc("/api/traffic/{name}", svr.ApiProxyTraffic).Methods("GET")
router.HandleFunc("/api/client/close/{user}", svr.ApiCloseClient).Methods("GET")
// view // view
router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET") router.Handle("/favicon.ico", http.FileServer(assets.FileSystem)).Methods("GET")

View File

@ -322,3 +322,31 @@ func (svr *Service) ApiProxyTraffic(w http.ResponseWriter, r *http.Request) {
buf, _ := json.Marshal(&trafficResp) buf, _ := json.Marshal(&trafficResp)
res.Msg = string(buf) res.Msg = string(buf)
} }
type CloseUserResp struct {
Status int `json:"status"`
Msg string `json:"message"`
}
func (svr *Service) ApiCloseClient(w http.ResponseWriter, r *http.Request) {
var (
buf []byte
resp = CloseUserResp{}
)
params := mux.Vars(r)
user := params["user"]
defer func() {
log.Info("Http response [/api/client/close/{user}]: code [%d]", resp.Status)
}()
log.Info("Http request: [/api/client/close/{user}] %#v", user)
err := svr.CloseUser(user)
if err != nil {
resp.Status = 404
resp.Msg = err.Error()
} else {
resp.Status = 200
resp.Msg = "OK"
}
buf, _ = json.Marshal(&resp)
w.Write(buf)
}

View File

@ -20,7 +20,9 @@ import (
"net" "net"
"strconv" "strconv"
"sync" "sync"
"time"
"github.com/fatedier/frp/extend/cumu"
"github.com/fatedier/frp/g" "github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
@ -224,16 +226,29 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()}) statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
inCount, outCount := frpIo.Join(local, userConn) cc := cumu.NewCumuConn(userConn)
endSig := make(chan int)
go func(cc *cumu.Conn, ch chan int) {
for {
select {
case <-ch:
return
default:
time.Sleep(1 * time.Second)
statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
ProxyName: pxy.GetName(),
TrafficBytes: cc.OutCount(),
})
statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: cc.InCount(),
})
}
}
}(cc, endSig)
frpIo.Join(local, cc)
statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()}) statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{ endSig <- 1
ProxyName: pxy.GetName(),
TrafficBytes: inCount,
})
statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
ProxyName: pxy.GetName(),
TrafficBytes: outCount,
})
pxy.Debug("join connections closed") pxy.Debug("join connections closed")
} }

View File

@ -371,6 +371,11 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
return return
} }
var (
inLimit uint64
outLimit uint64
)
if g.GlbServerCfg.EnableApi { if g.GlbServerCfg.EnableApi {
nowTime := time.Now().Unix() nowTime := time.Now().Unix()
@ -397,18 +402,21 @@ func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (e
if !valid { if !valid {
return fmt.Errorf("authorization failed") return fmt.Errorf("authorization failed")
} }
inLimit, outLimit, err = s.GetProxyLimit(loginMsg.User, nowTime, g.GlbServerCfg.ApiToken)
if err != nil {
return err
}
ctlConn.Debug("%s client speed limit: %dKB/s (Inbound) / %dKB/s (Outbound)", loginMsg.User, inLimit, outLimit)
} }
// If client's RunId is empty, it's a new client, we just create a new controller. // If client's RunId is empty, it's a new client, we just create a new controller.
// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one. // Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
if loginMsg.RunId == "" { if loginMsg.RunId == "" {
loginMsg.RunId, err = util.RandId() loginMsg.RunId = loginMsg.User
if err != nil {
return
}
} }
ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg) ctl := NewControl(svr.rc, svr.pxyManager, svr.statsCollector, ctlConn, loginMsg, inLimit, outLimit)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil { if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDone() oldCtl.allShutdown.WaitDone()
@ -464,3 +472,12 @@ func generateTLSConfig() *tls.Config {
} }
return &tls.Config{Certificates: []tls.Certificate{tlsCert}} return &tls.Config{Certificates: []tls.Certificate{tlsCert}}
} }
func (svr *Service) CloseUser(user string) error {
ctl, ok := svr.ctlManager.GetById(user)
if !ok {
return fmt.Errorf("user not login")
}
ctl.allShutdown.Start()
return nil
}

View File

@ -93,7 +93,7 @@ func getServiceUnavailablePageContent() []byte {
func notFoundResponse() *http.Response { func notFoundResponse() *http.Response {
header := make(http.Header) header := make(http.Header)
header.Set("server", "frp/" + version.Full() + "-sakurapanel") header.Set("server", "frp/"+version.Full()+"-sakurapanel")
header.Set("Content-Type", "text/html") header.Set("Content-Type", "text/html")
res := &http.Response{ res := &http.Response{