mirror of
https://github.com/waku-org/telemetry.git
synced 2025-02-22 12:48:22 +00:00
Merge pull request #17 from vpavlin/feat/add-ratelimiter
add per IP rate limitting
This commit is contained in:
commit
1ca9526278
15
go.mod
15
go.mod
@ -1,13 +1,24 @@
|
||||
module github.com/status-im/dev-telemetry
|
||||
|
||||
go 1.15
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
github.com/go-bindata/go-bindata v3.1.2+incompatible // indirect
|
||||
github.com/go-auxiliaries/shrinking-map v0.3.0
|
||||
github.com/golang-migrate/migrate/v4 v4.15.2
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/lib/pq v1.10.3
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/stretchr/testify v1.8.1
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
11
go.sum
11
go.sum
@ -330,7 +330,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
|
||||
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
|
||||
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
|
||||
@ -408,8 +407,8 @@ github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYis
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/go-bindata/go-bindata v3.1.2+incompatible h1:5vjJMVhowQdPzjE1LdxyFF7YFTXg5IgGVW4gBr5IbvE=
|
||||
github.com/go-bindata/go-bindata v3.1.2+incompatible/go.mod h1:xK8Dsgwmeed+BBsSy2XTopBn/8uK2HWuGSnA11C3Joo=
|
||||
github.com/go-auxiliaries/shrinking-map v0.3.0 h1:kXiLmFY4y2s35WtOYAb02LRZ92IRnfzio+3prZn6ULs=
|
||||
github.com/go-auxiliaries/shrinking-map v0.3.0/go.mod h1:UtBmTTKuUfI8wkhzaZ7G/xgHjxGxLwM2a6kf+aWmSmc=
|
||||
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
|
||||
github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks=
|
||||
github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY=
|
||||
@ -554,7 +553,6 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-containerregistry v0.5.1/go.mod h1:Ct15B4yir3PLOP5jsy0GNeYVaIZs/MK/Jz5any1wFW0=
|
||||
github.com/google/go-github/v39 v39.2.0/go.mod h1:C1s8C5aCC9L+JXIYpJM5GYytdX52vC1bLvHEF1IhBrE=
|
||||
@ -1127,7 +1125,6 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
@ -1441,7 +1438,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
@ -1536,7 +1532,6 @@ golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8T
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
|
||||
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
|
||||
@ -1746,11 +1741,9 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gorm.io/driver/postgres v1.0.8/go.mod h1:4eOzrI1MUfm6ObJU/UcmbXyiHSs8jSwH95G5P5dxcAg=
|
||||
gorm.io/gorm v1.20.12/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
|
||||
gorm.io/gorm v1.21.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
|
||||
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
|
||||
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
|
||||
gotest.tools/v3 v3.1.0 h1:rVV8Tcg/8jHUkPUorwjaMTtemIMVXfIPKiOqnhEhakk=
|
||||
gotest.tools/v3 v3.1.0/go.mod h1:fHy7eyTmJFO5bQbUsEGQ1v4m2J3Jz9eWL54TP2/ZuYQ=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
|
101
telemetry/ratelimiter.go
Normal file
101
telemetry/ratelimiter.go
Normal file
@ -0,0 +1,101 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
shrinkingmap "github.com/go-auxiliaries/shrinking-map/pkg/shrinking-map"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const DEFAULT_CLEANUP_TIME = 1 * time.Hour
|
||||
|
||||
type Limiter struct {
|
||||
limiter *rate.Limiter
|
||||
lastUsed time.Time
|
||||
}
|
||||
|
||||
type RateLimiter struct {
|
||||
limiters *shrinkingmap.Map[string, *Limiter] //map[string]*Limiter
|
||||
lock *sync.RWMutex
|
||||
r rate.Limit
|
||||
b int
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewRateLimiter(ctx context.Context, r rate.Limit, b int, logger *zap.Logger) *RateLimiter {
|
||||
return NewRateLimiterWithCleanup(ctx, r, b, DEFAULT_CLEANUP_TIME, logger)
|
||||
}
|
||||
|
||||
func NewRateLimiterWithCleanup(ctx context.Context, r rate.Limit, b int, cleanupTime time.Duration, logger *zap.Logger) *RateLimiter {
|
||||
rl := &RateLimiter{
|
||||
limiters: shrinkingmap.New[string, *Limiter](200),
|
||||
lock: &sync.RWMutex{},
|
||||
r: r,
|
||||
b: b,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
go rl.cleanup(ctx, cleanupTime)
|
||||
|
||||
return rl
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) GetLimiter(ip string) *rate.Limiter {
|
||||
rl.lock.Lock()
|
||||
|
||||
limiter, ok := rl.limiters.Get2(ip)
|
||||
if !ok {
|
||||
rl.lock.Unlock()
|
||||
return rl.AddIP(ip)
|
||||
}
|
||||
|
||||
limiter.lastUsed = time.Now()
|
||||
|
||||
rl.lock.Unlock()
|
||||
return limiter.limiter
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) AddIP(ip string) *rate.Limiter {
|
||||
rl.lock.Lock()
|
||||
defer rl.lock.Unlock()
|
||||
|
||||
limiter := rate.NewLimiter(rl.r, rl.b)
|
||||
rl.limiters.Set(ip, &Limiter{limiter: limiter, lastUsed: time.Now()})
|
||||
|
||||
return limiter
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) RemoveIP(ip string) {
|
||||
rl.lock.Lock()
|
||||
defer rl.lock.Unlock()
|
||||
|
||||
rl.limiters.Delete(ip)
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) NumClients() int {
|
||||
return len(rl.limiters.Values())
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) cleanup(ctx context.Context, cleanupEvery time.Duration) {
|
||||
t := time.NewTicker(cleanupEvery)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case now := <-t.C:
|
||||
numCleaned := 0
|
||||
for ip, limiter := range rl.limiters.Values() {
|
||||
if limiter.lastUsed.Add(2 * time.Second).Before(now) {
|
||||
rl.RemoveIP(ip)
|
||||
numCleaned++
|
||||
}
|
||||
}
|
||||
rl.logger.Debug("cleanup", zap.Int("removed", numCleaned))
|
||||
}
|
||||
}
|
||||
}
|
67
telemetry/ratelimiter_test.go
Normal file
67
telemetry/ratelimiter_test.go
Normal file
@ -0,0 +1,67 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func TestRateLimit(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
logger, err := zap.NewDevelopment()
|
||||
require.NoError(t, err)
|
||||
rl := NewRateLimiter(ctx, 1, 1, logger)
|
||||
|
||||
ip1 := "1.1.1.1"
|
||||
|
||||
limiter := rl.GetLimiter(ip1)
|
||||
require.True(t, limiter.Allow())
|
||||
|
||||
limiter = rl.GetLimiter(ip1)
|
||||
require.False(t, limiter.Allow())
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
limiter = rl.GetLimiter(ip1)
|
||||
require.True(t, limiter.Allow())
|
||||
|
||||
ip2 := "2.2.2.2:8080"
|
||||
limiter = rl.GetLimiter(ip2)
|
||||
require.True(t, limiter.Allow())
|
||||
|
||||
limiter = rl.GetLimiter(ip2)
|
||||
require.False(t, limiter.Allow())
|
||||
}
|
||||
|
||||
func TestRateLimitCleanup(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
logger, err := zap.NewDevelopment()
|
||||
require.NoError(t, err)
|
||||
rl := NewRateLimiterWithCleanup(ctx, rate.Limit(1/time.Hour), 1, 100*time.Millisecond, logger)
|
||||
|
||||
for i := 0; i < 300; i++ {
|
||||
ip := fmt.Sprintf("%d.%d.%d.%d", i, i, i, i)
|
||||
limiter := rl.GetLimiter(ip)
|
||||
require.True(t, limiter.Allow())
|
||||
require.False(t, limiter.Allow())
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
require.Equal(t, 300, rl.NumClients())
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
limiter2 := rl.GetLimiter("1.1.1.1")
|
||||
require.True(t, limiter2.Allow())
|
||||
|
||||
require.Equal(t, 1, rl.NumClients())
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
@ -12,19 +13,30 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const (
|
||||
RATE_LIMIT = rate.Limit(10)
|
||||
BURST = 1
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
Router *mux.Router
|
||||
DB *sql.DB
|
||||
logger *zap.Logger
|
||||
Router *mux.Router
|
||||
DB *sql.DB
|
||||
logger *zap.Logger
|
||||
rateLimiter RateLimiter
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewServer(db *sql.DB, logger *zap.Logger) *Server {
|
||||
ctx := context.Background()
|
||||
server := &Server{
|
||||
Router: mux.NewRouter().StrictSlash(true),
|
||||
DB: db,
|
||||
logger: logger,
|
||||
Router: mux.NewRouter().StrictSlash(true),
|
||||
DB: db,
|
||||
logger: logger,
|
||||
rateLimiter: *NewRateLimiter(ctx, RATE_LIMIT, BURST, logger),
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
|
||||
@ -32,6 +44,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server {
|
||||
server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST")
|
||||
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
|
||||
server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET")
|
||||
server.Router.Use(server.rateLimit)
|
||||
|
||||
return server
|
||||
}
|
||||
@ -207,6 +220,18 @@ func (s *Server) createProtocolStats(w http.ResponseWriter, r *http.Request) {
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Server) rateLimit(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
limiter := s.rateLimiter.GetLimiter(r.RemoteAddr)
|
||||
if !limiter.Allow() {
|
||||
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) Start(port int) {
|
||||
s.logger.Info("Starting server", zap.Int("port", port))
|
||||
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router))
|
||||
|
Loading…
x
Reference in New Issue
Block a user