add: proxy limit

This commit is contained in:
马嘉聪 2019-09-07 00:24:20 +08:00
parent 24b71f987c
commit f49348f34c
9 changed files with 238 additions and 23 deletions

1
.gitignore vendored
View File

@ -27,6 +27,7 @@ _testmain.go
bin/
packages/
test/bin/
release/
# Cache
*.swp

View File

@ -7,14 +7,18 @@ import (
"net/http"
"net/url"
"strconv"
"github.com/fatedier/frp/extend/limit"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
)
// Service sakurafrp api servie
type Service struct {
Host url.URL
}
// NewService crate sakurafrp api servie
func NewService(host string) (s *Service, err error) {
u, err := url.Parse(host)
if err != nil {
@ -67,52 +71,52 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st
if err != nil {
return false, err
}
headers, err := json.Marshal(pMsg.Headers)
if err != nil {
return false, err
}
locations, err := json.Marshal(pMsg.Locations)
if err != nil {
return false, err
}
values := url.Values{}
// API Basic
values.Set("action", "checkproxy")
values.Set("user", user)
values.Set("timestamp", fmt.Sprintf("%d", timestamp))
values.Set("apitoken", stk)
// Proxies basic info
values.Set("proxy_name", pMsg.ProxyName)
values.Set("proxy_type", pMsg.ProxyType)
values.Set("use_encryption", BoolToString(pMsg.UseEncryption))
values.Set("use_compression", BoolToString(pMsg.UseCompression))
// Http Proxies
values.Set("domain", string(domains))
values.Set("subdomain", pMsg.SubDomain)
// Headers
values.Set("locations", string(locations))
values.Set("http_user", pMsg.HttpUser)
values.Set("http_pwd", pMsg.HttpPwd)
values.Set("host_header_rewrite", pMsg.HostHeaderRewrite)
values.Set("headers", string(headers))
// Tcp & Udp & Stcp
values.Set("remote_port", strconv.Itoa(pMsg.RemotePort))
// Stcp & Xtcp
values.Set("sk", pMsg.Sk)
// Load balance
values.Set("group", pMsg.Group)
values.Set("group_key", pMsg.GroupKey)
s.Host.RawQuery = values.Encode()
defer func(u *url.URL) {
u.RawQuery = ""
@ -142,12 +146,17 @@ func (s Service) CheckProxy(user string, pMsg *msg.NewProxy, timestamp int64, st
return true, nil
}
// GetProxyLimit 获取隧道限速信息
func (s Service) GetProxyLimit(pxyConf *config.BaseProxyConf) (inLimit, outLimit uint64, err error) {
return 1 * limit.MB, 1 * limit.MB, nil
}
func BoolToString(val bool) (str string) {
if val {
return "true"
} else {
return "false"
}
return "false"
}
type ErrHTTPStatus struct {

42
extend/limit/limit.go Normal file
View File

@ -0,0 +1,42 @@
package limit
import (
"io"
frpNet "github.com/fatedier/frp/utils/net"
)
const (
B uint64 = 1 << (10 * (iota))
KB
MB
GB
TB
PB
EB
)
const burstLimit = 1024 * 1024 * 1024
type LimitConn struct {
frpNet.Conn
lr io.Reader
lw io.Writer
}
func NewLimitConn(maxread, maxwrite uint64, c frpNet.Conn) LimitConn {
return LimitConn{
lr: NewReaderWithLimit(c, maxread*KB),
lw: NewWriterWithLimit(c, maxwrite*KB),
Conn: c,
}
}
func (c LimitConn) Read(p []byte) (n int, err error) {
return c.lr.Read(p)
}
func (c LimitConn) Write(p []byte) (n int, err error) {
return c.lw.Write(p)
}

63
extend/limit/reader.go Normal file
View File

@ -0,0 +1,63 @@
package limit
import (
"context"
"io"
"sync"
"time"
"golang.org/x/time/rate"
)
type Reader struct {
r io.Reader
limiter *rate.Limiter
ctx context.Context
mux sync.Mutex
}
// NewReader returns a reader that implements io.Reader with rate limiting.
func NewReader(r io.Reader) *Reader {
return &Reader{
r: r,
ctx: context.Background(),
mux: sync.Mutex{},
}
}
func NewReaderWithLimit(r io.Reader, speed uint64) *Reader {
rr := &Reader{
r: r,
ctx: context.Background(),
mux: sync.Mutex{},
}
rr.SetRateLimit(speed)
return rr
}
// SetRateLimit sets rate limit (bytes/sec) to the reader.
func (s *Reader) SetRateLimit(bytesPerSec uint64) {
s.mux.Lock()
defer s.mux.Unlock()
s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
}
// Read reads bytes into p.
func (s *Reader) Read(p []byte) (int, error) {
s.mux.Lock()
defer s.mux.Unlock()
if s.limiter == nil {
return s.r.Read(p)
}
n, err := s.r.Read(p)
if err != nil {
return n, err
}
if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, nil
}

17
extend/limit/w_test.go Normal file
View File

@ -0,0 +1,17 @@
package limit
import (
"fmt"
"net/http"
"testing"
)
func TestHttp(t *testing.T) {
http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
lw := NewWriterWithLimit(w, 10*KB)
for {
fmt.Fprintf(lw, "x")
}
})
http.ListenAndServe(":62542", nil)
}

63
extend/limit/writer.go Normal file
View File

@ -0,0 +1,63 @@
package limit
import (
"context"
"io"
"sync"
"time"
"golang.org/x/time/rate"
)
type Writer struct {
w io.Writer
limiter *rate.Limiter
ctx context.Context
mux sync.Mutex
}
// NewWriter returns a writer that implements io.Writer with rate limiting.
func NewWriter(w io.Writer) *Writer {
return &Writer{
w: w,
ctx: context.Background(),
mux: sync.Mutex{},
}
}
func NewWriterWithLimit(w io.Writer, speed uint64) *Writer {
ww := &Writer{
w: w,
ctx: context.Background(),
mux: sync.Mutex{},
}
ww.SetRateLimit(speed)
return ww
}
// SetRateLimit sets rate limit (bytes/sec) to the writer.
func (s *Writer) SetRateLimit(bytesPerSec uint64) {
s.mux.Lock()
defer s.mux.Unlock()
s.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
s.limiter.AllowN(time.Now(), burstLimit) // spend initial burst
}
// Write writes bytes from p.
func (s *Writer) Write(p []byte) (int, error) {
s.mux.Lock()
defer s.mux.Unlock()
if s.limiter == nil {
return s.w.Write(p)
}
n, err := s.w.Write(p)
if err != nil {
return n, err
}
if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, err
}

1
go.mod
View File

@ -29,4 +29,5 @@ require (
github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
)

4
go.sum
View File

@ -1,5 +1,6 @@
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb h1:wCrNShQidLmvVWn/0PikGmpdP0vtQmnvyRg3ZBEhczw=
github.com/fatedier/beego v0.0.0-20171024143340-6c6a4f5bd5eb/go.mod h1:wx3gB6dbIfBRcucp94PI9Bt3I0F2c/MyNEWuhzpWiwk=
@ -27,6 +28,7 @@ github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHX
github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rakyll/statik v0.1.1 h1:fCLHsIMajHqD5RKigbFXpvX3dN7c80Pm12+NCrI3kvg=
github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs=
@ -58,4 +60,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -30,13 +30,15 @@ import (
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/frp/server/stats"
"github.com/fatedier/frp/utils/net"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/version"
"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
"github.com/fatedier/golib/errors"
"github.com/fatedier/frp/extend/api"
"github.com/fatedier/frp/extend/limit"
)
type ControlManager struct {
@ -418,27 +420,40 @@ func (ctl *Control) manager() {
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf config.ProxyConf
s, err := api.NewService(g.GlbServerCfg.ApiBaseUrl)
var workConn proxy.GetWorkConnFn = ctl.GetWorkConn
if err != nil {
return remoteAddr, err
}
if g.GlbServerCfg.EnableApi {
nowTime := time.Now().Unix()
ok, err := s.CheckProxy(ctl.loginMsg.User, pxyMsg, nowTime, g.GlbServerCfg.ApiToken)
if err != nil {
return remoteAddr, err
}
if !ok {
return remoteAddr, fmt.Errorf("invalid proxy configuration")
}
in, out, err := s.GetProxyLimit(pxyConf.GetBaseInfo())
if err != nil {
return remoteAddr, err
}
workConn = func() (frpNet.Conn, error) {
fconn, err := ctl.GetWorkConn()
if err != nil {
return nil, err
}
return limit.NewLimitConn(in, out, fconn), nil
}
}
// Load configures from NewProxy message and check.
pxyConf, err = config.NewProxyConfFromMsg(pxyMsg)
if err != nil {
@ -447,7 +462,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, ctl.GetWorkConn, pxyConf)
pxy, err := proxy.NewProxy(ctl.runId, ctl.rc, ctl.statsCollector, ctl.poolCount, workConn, pxyConf)
if err != nil {
return remoteAddr, err
}