Update Whisper and introduce rate limiting (#1673)
This commit is contained in:
parent
6ce437255e
commit
c199c8f342
2
go.mod
2
go.mod
|
@ -33,7 +33,7 @@ require (
|
|||
github.com/status-im/rendezvous v1.3.0
|
||||
github.com/status-im/status-protocol-go v0.5.1
|
||||
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
|
||||
github.com/status-im/whisper v1.5.2
|
||||
github.com/status-im/whisper v1.6.1
|
||||
github.com/stretchr/testify v1.4.0
|
||||
github.com/syndtr/goleveldb v1.0.0
|
||||
go.uber.org/zap v1.10.0
|
||||
|
|
4
go.sum
4
go.sum
|
@ -602,6 +602,8 @@ github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 h1:oa0KU5jJRN
|
|||
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501/go.mod h1:RYo/itke1oU5k/6sj9DNM3QAwtE5rZSgg5JnkOv83hk=
|
||||
github.com/status-im/whisper v1.5.2 h1:26NgiKusmPic38eQdtXnaY+iaQ/LuQ3Dh0kCGYT/Uxs=
|
||||
github.com/status-im/whisper v1.5.2/go.mod h1:emrOxzJme0k66QtbbQ2bdd3P8RCdLZ8sTD7SkwH1s2s=
|
||||
github.com/status-im/whisper v1.6.1 h1:C/T1HQHZfUI2jbccf3yIe8yfkl435I3BILIKeNASJDc=
|
||||
github.com/status-im/whisper v1.6.1/go.mod h1:lygchT4p9Y1/hR451OhNNqfinvy9EYEDxtXU2T/U30Q=
|
||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
|
||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
|
||||
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
|
||||
|
@ -620,6 +622,8 @@ github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2K
|
|||
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
|
||||
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
|
||||
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9 h1:kjbwitOGH46vD01f2s3leBfrMnePQa3NSAIlW35MvY8=
|
||||
github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9/go.mod h1:EcGP24b8DY+bWHnpfJDP7fM+o8Nmz4fYH0l2xTtNr3I=
|
||||
github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8=
|
||||
github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
|
||||
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
|
||||
|
|
29
node/node.go
29
node/node.go
|
@ -324,6 +324,11 @@ func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb
|
|||
|
||||
whisperService := whisper.New(whisperServiceConfig)
|
||||
|
||||
if config.WhisperConfig.EnableRateLimiter {
|
||||
r := whisperRateLimiter(config)
|
||||
whisperService.SetRateLimiter(r)
|
||||
}
|
||||
|
||||
if config.WhisperConfig.EnableNTPSync {
|
||||
timesource, err := whisperTimeSource(ctx)
|
||||
if err != nil {
|
||||
|
@ -451,3 +456,27 @@ func whisperTimeSource(ctx *node.ServiceContext) (func() time.Time, error) {
|
|||
}
|
||||
return timeSource.Now, nil
|
||||
}
|
||||
|
||||
func whisperRateLimiter(config *params.NodeConfig) *whisper.PeerRateLimiter {
|
||||
enodes := append(
|
||||
parseNodes(config.ClusterConfig.StaticNodes),
|
||||
parseNodes(config.ClusterConfig.TrustedMailServers)...,
|
||||
)
|
||||
var (
|
||||
ips []string
|
||||
peerIDs []enode.ID
|
||||
)
|
||||
for _, item := range enodes {
|
||||
ips = append(ips, item.IP().String())
|
||||
peerIDs = append(peerIDs, item.ID())
|
||||
}
|
||||
return whisper.NewPeerRateLimiter(
|
||||
&whisper.MetricsRateLimiterHandler{},
|
||||
&whisper.PeerRateLimiterConfig{
|
||||
LimitPerSecIP: config.WhisperConfig.RateLimitIP,
|
||||
LimitPerSecPeerID: config.WhisperConfig.RateLimitPeerID,
|
||||
WhitelistedIPs: ips,
|
||||
WhitelistedPeerIDs: peerIDs,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
|
|
@ -88,6 +88,17 @@ type WhisperConfig struct {
|
|||
|
||||
// DatabaseConfig is configuration for which datastore we use
|
||||
DatabaseConfig DatabaseConfig
|
||||
|
||||
// EnableRateLimiter set to true enables IP and peer ID rate limiting.
|
||||
EnableRateLimiter bool
|
||||
|
||||
// RateLimitIP sets the limit on the number of messages per second
|
||||
// from a given IP.
|
||||
RateLimitIP int64
|
||||
|
||||
// RateLimitPeerID sets the limit on the number of messages per second
|
||||
// from a given peer ID.
|
||||
RateLimitPeerID int64
|
||||
}
|
||||
|
||||
type DatabaseConfig struct {
|
||||
|
@ -95,7 +106,7 @@ type DatabaseConfig struct {
|
|||
}
|
||||
|
||||
type PGConfig struct {
|
||||
// Enabled whether we should use a posgres instance
|
||||
// Enabled whether we should use a Postgres instance
|
||||
Enabled bool
|
||||
// The URI of the server
|
||||
URI string
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
// +build gofuzz
|
||||
|
||||
package whisperv6
|
||||
|
||||
func Fuzz(data []byte) int {
|
||||
if len(data) < 2 {
|
||||
return -1
|
||||
}
|
||||
|
||||
msg := &ReceivedMessage{Raw: data}
|
||||
msg.ValidateAndParse()
|
||||
|
||||
return 0
|
||||
}
|
|
@ -322,11 +322,14 @@ func (msg *ReceivedMessage) ValidateAndParse() bool {
|
|||
payloadSize := 0
|
||||
sizeOfPayloadSizeField := int(msg.Raw[0] & SizeMask) // number of bytes indicating the size of payload
|
||||
if sizeOfPayloadSizeField != 0 {
|
||||
payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField]))
|
||||
if payloadSize+1 > end {
|
||||
if end < beg+sizeOfPayloadSizeField {
|
||||
return false
|
||||
}
|
||||
payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField]))
|
||||
beg += sizeOfPayloadSizeField
|
||||
if beg+payloadSize > end {
|
||||
return false
|
||||
}
|
||||
msg.Payload = msg.Raw[beg : beg+payloadSize]
|
||||
}
|
||||
|
||||
|
|
|
@ -13,4 +13,9 @@ var (
|
|||
envelopeErrLowPowCounter = metrics.NewRegisteredCounter("whisper/envelopeErrLowPow", nil)
|
||||
envelopeErrNoBloomMatchCounter = metrics.NewRegisteredCounter("whisper/envelopeErrNoBloomMatch", nil)
|
||||
envelopeSizeMeter = metrics.NewRegisteredMeter("whisper/envelopeSize", nil)
|
||||
|
||||
// rate limiter metrics
|
||||
rateLimiterProcessed = metrics.NewRegisteredCounter("whisper/rateLimiterProcessed", nil)
|
||||
rateLimiterIPExceeded = metrics.NewRegisteredCounter("whisper/rateLimiterIPExceeded", nil)
|
||||
rateLimiterPeerExceeded = metrics.NewRegisteredCounter("whisper/rateLimiterPeerExceeded", nil)
|
||||
)
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
package whisperv6
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/tsenart/tb"
|
||||
)
|
||||
|
||||
type runLoop func(p *Peer, rw p2p.MsgReadWriter) error
|
||||
|
||||
type RateLimiterHandler interface {
|
||||
ExceedPeerLimit()
|
||||
ExceedIPLimit()
|
||||
}
|
||||
|
||||
type MetricsRateLimiterHandler struct{}
|
||||
|
||||
func (MetricsRateLimiterHandler) ExceedPeerLimit() { rateLimiterPeerExceeded.Inc(1) }
|
||||
func (MetricsRateLimiterHandler) ExceedIPLimit() { rateLimiterIPExceeded.Inc(1) }
|
||||
|
||||
type PeerRateLimiterConfig struct {
|
||||
LimitPerSecIP int64
|
||||
LimitPerSecPeerID int64
|
||||
WhitelistedIPs []string
|
||||
WhitelistedPeerIDs []enode.ID
|
||||
}
|
||||
|
||||
var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{
|
||||
LimitPerSecIP: 10,
|
||||
LimitPerSecPeerID: 5,
|
||||
WhitelistedIPs: nil,
|
||||
WhitelistedPeerIDs: nil,
|
||||
}
|
||||
|
||||
type PeerRateLimiter struct {
|
||||
peerIDThrottler *tb.Throttler
|
||||
ipThrottler *tb.Throttler
|
||||
|
||||
limitPerSecIP int64
|
||||
limitPerSecPeerID int64
|
||||
|
||||
whitelistedPeerIDs []enode.ID
|
||||
whitelistedIPs []string
|
||||
|
||||
handler RateLimiterHandler
|
||||
}
|
||||
|
||||
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
|
||||
if cfg == nil {
|
||||
copy := defaultPeerRateLimiterConfig
|
||||
cfg = ©
|
||||
}
|
||||
|
||||
return &PeerRateLimiter{
|
||||
peerIDThrottler: tb.NewThrottler(time.Millisecond * 100),
|
||||
ipThrottler: tb.NewThrottler(time.Millisecond * 100),
|
||||
limitPerSecIP: cfg.LimitPerSecIP,
|
||||
limitPerSecPeerID: cfg.LimitPerSecPeerID,
|
||||
whitelistedPeerIDs: cfg.WhitelistedPeerIDs,
|
||||
whitelistedIPs: cfg.WhitelistedIPs,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoop) error {
|
||||
in, out := p2p.MsgPipe()
|
||||
defer in.Close()
|
||||
defer out.Close()
|
||||
errC := make(chan error, 1)
|
||||
|
||||
// Read from the original reader and write to the message pipe.
|
||||
go func() {
|
||||
for {
|
||||
packet, err := rw.ReadMsg()
|
||||
if err != nil {
|
||||
errC <- fmt.Errorf("failed to read packet: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
rateLimiterProcessed.Inc(1)
|
||||
|
||||
var ip string
|
||||
if p != nil && p.peer != nil {
|
||||
ip = p.peer.Node().IP().String()
|
||||
}
|
||||
if halted := r.throttleIP(ip); halted {
|
||||
r.handler.ExceedIPLimit()
|
||||
}
|
||||
|
||||
var peerID []byte
|
||||
if p != nil {
|
||||
peerID = p.ID()
|
||||
}
|
||||
if halted := r.throttlePeer(peerID); halted {
|
||||
r.handler.ExceedPeerLimit()
|
||||
}
|
||||
|
||||
if err := in.WriteMsg(packet); err != nil {
|
||||
errC <- fmt.Errorf("failed to write packet to pipe: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Read from the message pipe and write to the original writer.
|
||||
go func() {
|
||||
for {
|
||||
packet, err := in.ReadMsg()
|
||||
if err != nil {
|
||||
errC <- fmt.Errorf("failed to read packet from pipe: %v", err)
|
||||
return
|
||||
}
|
||||
if err := rw.WriteMsg(packet); err != nil {
|
||||
errC <- fmt.Errorf("failed to write packet: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
errC <- runLoop(p, out)
|
||||
}()
|
||||
|
||||
return <-errC
|
||||
}
|
||||
|
||||
// throttleIP throttles a number of messages incoming from a given IP.
|
||||
// It allows 10 packets per second.
|
||||
func (r *PeerRateLimiter) throttleIP(ip string) bool {
|
||||
if r.limitPerSecIP == 0 {
|
||||
return false
|
||||
}
|
||||
if stringSliceContains(r.whitelistedIPs, ip) {
|
||||
return false
|
||||
}
|
||||
return r.ipThrottler.Halt(ip, 1, r.limitPerSecIP)
|
||||
}
|
||||
|
||||
// throttlePeer throttles a number of messages incoming from a peer.
|
||||
// It allows 3 packets per second.
|
||||
func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool {
|
||||
if r.limitPerSecIP == 0 {
|
||||
return false
|
||||
}
|
||||
var id enode.ID
|
||||
copy(id[:], peerID)
|
||||
if enodeIDSliceContains(r.whitelistedPeerIDs, id) {
|
||||
return false
|
||||
}
|
||||
return r.peerIDThrottler.Halt(id.String(), 1, r.limitPerSecPeerID)
|
||||
}
|
||||
|
||||
func stringSliceContains(s []string, searched string) bool {
|
||||
for _, item := range s {
|
||||
if item == searched {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func enodeIDSliceContains(s []enode.ID, searched enode.ID) bool {
|
||||
for _, item := range s {
|
||||
if bytes.Equal(item.Bytes(), searched.Bytes()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
|
@ -105,6 +105,8 @@ type Whisper struct {
|
|||
|
||||
mailServer MailServer // MailServer interface
|
||||
|
||||
rateLimiter *PeerRateLimiter
|
||||
|
||||
messageStoreFabric func() MessageStore
|
||||
|
||||
envelopeFeed event.Feed
|
||||
|
@ -335,6 +337,10 @@ func (whisper *Whisper) SetLightClientMode(v bool) {
|
|||
whisper.settings.Store(lightClientModeIdx, v)
|
||||
}
|
||||
|
||||
func (whisper *Whisper) SetRateLimiter(r *PeerRateLimiter) {
|
||||
whisper.rateLimiter = r
|
||||
}
|
||||
|
||||
//LightClientMode indicates is this node is light client (does not forward any messages)
|
||||
func (whisper *Whisper) LightClientMode() bool {
|
||||
val, exist := whisper.settings.Load(lightClientModeIdx)
|
||||
|
@ -887,6 +893,9 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
|
|||
whisperPeer.start()
|
||||
defer whisperPeer.stop()
|
||||
|
||||
if whisper.rateLimiter != nil {
|
||||
return whisper.rateLimiter.decorate(whisperPeer, rw, whisper.runMessageLoop)
|
||||
}
|
||||
return whisper.runMessageLoop(whisperPeer, rw)
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
language: go
|
||||
sudo: false
|
||||
|
||||
go:
|
||||
- 1.2
|
||||
- 1.3
|
||||
- 1.4
|
||||
- 1.5
|
||||
- tip
|
||||
|
||||
install:
|
||||
- go get -v golang.org/x/tools/cmd/vet
|
||||
- go get -v golang.org/x/lint/golint
|
||||
- go get -d -t -v ./...
|
||||
- go build -v ./...
|
||||
|
||||
script:
|
||||
- go vet ./...
|
||||
- golint .
|
||||
- go test -v -parallel=8 ./...
|
|
@ -0,0 +1,49 @@
|
|||
# Token Bucket (tb) [![Build Status](https://secure.travis-ci.org/tsenart/tb.png)](http://travis-ci.org/tsenart/tb) [![GoDoc](https://godoc.org/github.com/tsenart/tb?status.png)](https://godoc.org/github.com/tsenart/tb)
|
||||
|
||||
This package provides a generic lock-free implementation of the "Token bucket"
|
||||
algorithm where handling of non-conformity is left to the user.
|
||||
|
||||
|
||||
> The token bucket is an algorithm used in packet switched computer networks and telecommunications networks. It can be used to check that data transmissions, in the form of packets, conform to defined limits on bandwidth and burstiness (a measure of the unevenness or variations in the traffic flow)
|
||||
-- <cite>[Wikipedia](http://en.wikipedia.org/wiki/Token_bucket)</cite>
|
||||
|
||||
This implementation of the token bucket generalises its applications beyond packet rate conformance. Hence, the word *generic*. You can use it to throttle any flow over time as long as it can be expressed as a number (bytes/s, requests/s, messages/s, packets/s, potatoes/s, heartbeats/s, etc...).
|
||||
|
||||
The *lock-free* part of the description refers to the lock-free programming techniques (CAS loop) used in the core `Bucket` methods (`Take` and `Put`). [Here is](http://preshing.com/20120612/an-introduction-to-lock-free-programming/) a good overview of lock-free programming you can refer to.
|
||||
|
||||
All utility pacakges such as [http](http/) and [io](io/) are just wrappers around the core package.
|
||||
This ought to be your one stop shop for all things **throttling** in Go so feel free to propose missing common functionality.
|
||||
|
||||
|
||||
|
||||
## Install
|
||||
```shell
|
||||
$ go get github.com/tsenart/tb
|
||||
```
|
||||
|
||||
## Usage
|
||||
Read up the [docs](https://godoc.org/github.com/tsenart/tb) and have a look at some [examples](examples/).
|
||||
|
||||
## Licence
|
||||
```
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2014-2015 Tomás Senart
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
```
|
|
@ -0,0 +1,130 @@
|
|||
package tb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Bucket defines a generic lock-free implementation of a Token Bucket.
|
||||
type Bucket struct {
|
||||
inc int64
|
||||
tokens int64
|
||||
capacity int64
|
||||
freq time.Duration
|
||||
closing chan struct{}
|
||||
}
|
||||
|
||||
// NewBucket returns a full Bucket with c capacity and starts a filling
|
||||
// go-routine which ticks every freq. The number of tokens added on each tick
|
||||
// is computed dynamically to be even across the duration of a second.
|
||||
//
|
||||
// If freq == -1 then the filling go-routine won't be started. Otherwise,
|
||||
// If freq < 1/c seconds, then it will be adjusted to 1/c seconds.
|
||||
func NewBucket(c int64, freq time.Duration) *Bucket {
|
||||
b := &Bucket{tokens: c, capacity: c, closing: make(chan struct{})}
|
||||
|
||||
if freq == -1 {
|
||||
return b
|
||||
} else if evenFreq := time.Duration(1e9 / c); freq < evenFreq {
|
||||
freq = evenFreq
|
||||
}
|
||||
|
||||
b.freq = freq
|
||||
b.inc = int64(math.Floor(.5 + (float64(c) * freq.Seconds())))
|
||||
|
||||
go b.fill()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// Take attempts to take n tokens out of the bucket.
|
||||
// If tokens == 0, nothing will be taken.
|
||||
// If n <= tokens, n tokens will be taken.
|
||||
// If n > tokens, all tokens will be taken.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
func (b *Bucket) Take(n int64) (taken int64) {
|
||||
for {
|
||||
if tokens := atomic.LoadInt64(&b.tokens); tokens == 0 {
|
||||
return 0
|
||||
} else if n <= tokens {
|
||||
if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens-n) {
|
||||
continue
|
||||
}
|
||||
return n
|
||||
} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, 0) { // Spill
|
||||
return tokens
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put attempts to add n tokens to the bucket.
|
||||
// If tokens == capacity, nothing will be added.
|
||||
// If n <= capacity - tokens, n tokens will be added.
|
||||
// If n > capacity - tokens, capacity - tokens will be added.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
func (b *Bucket) Put(n int64) (added int64) {
|
||||
for {
|
||||
if tokens := atomic.LoadInt64(&b.tokens); tokens == b.capacity {
|
||||
return 0
|
||||
} else if left := b.capacity - tokens; n <= left {
|
||||
if !atomic.CompareAndSwapInt64(&b.tokens, tokens, tokens+n) {
|
||||
continue
|
||||
}
|
||||
return n
|
||||
} else if atomic.CompareAndSwapInt64(&b.tokens, tokens, b.capacity) {
|
||||
return left
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait waits for n amount of tokens to be available.
|
||||
// If n tokens are immediatelly available it doesn't sleep.
|
||||
// Otherwise, it sleeps the minimum amount of time required for the remaining
|
||||
// tokens to be available. It returns the wait duration.
|
||||
//
|
||||
// This method is thread-safe.
|
||||
func (b *Bucket) Wait(n int64) time.Duration {
|
||||
var rem int64
|
||||
if rem = n - b.Take(n); rem == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
var wait time.Duration
|
||||
for rem > 0 {
|
||||
sleep := b.wait(rem)
|
||||
wait += sleep
|
||||
time.Sleep(sleep)
|
||||
rem -= b.Take(rem)
|
||||
}
|
||||
return wait
|
||||
}
|
||||
|
||||
// Close stops the filling go-routine given it was started.
|
||||
func (b *Bucket) Close() error {
|
||||
close(b.closing)
|
||||
return nil
|
||||
}
|
||||
|
||||
// wait returns the minimum amount of time required for n tokens to be available.
|
||||
// if n > capacity, n will be adjusted to capacity
|
||||
func (b *Bucket) wait(n int64) time.Duration {
|
||||
return time.Duration(int64(math.Ceil(math.Min(float64(n), float64(b.capacity))/float64(b.inc))) *
|
||||
b.freq.Nanoseconds())
|
||||
}
|
||||
|
||||
func (b *Bucket) fill() {
|
||||
ticker := time.NewTicker(b.freq)
|
||||
defer ticker.Stop()
|
||||
|
||||
for _ = range ticker.C {
|
||||
select {
|
||||
case <-b.closing:
|
||||
return
|
||||
default:
|
||||
b.Put(b.inc)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
// Package tb provides a generic lock-free implementation of the
|
||||
// Token Bucket algorithm where non-conformity is handled by the user.
|
||||
// http://en.wikipedia.org/wiki/Token_bucket
|
||||
package tb
|
|
@ -0,0 +1,124 @@
|
|||
package tb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Throttler is a thread-safe wrapper around a map of buckets and an easy to
|
||||
// use API for generic throttling.
|
||||
type Throttler struct {
|
||||
mu sync.RWMutex
|
||||
freq time.Duration
|
||||
buckets map[string]*Bucket
|
||||
closing chan struct{}
|
||||
}
|
||||
|
||||
// NewThrottler returns a Throttler with a single filler go-routine for all
|
||||
// its Buckets which ticks every freq.
|
||||
// The number of tokens added on each tick for each bucket is computed
|
||||
// dynamically to be even accross the duration of a second.
|
||||
//
|
||||
// If freq <= 0, the filling go-routine won't be started.
|
||||
func NewThrottler(freq time.Duration) *Throttler {
|
||||
th := &Throttler{
|
||||
freq: freq,
|
||||
buckets: map[string]*Bucket{},
|
||||
closing: make(chan struct{}),
|
||||
}
|
||||
|
||||
if freq > 0 {
|
||||
go th.fill(freq)
|
||||
}
|
||||
|
||||
return th
|
||||
}
|
||||
|
||||
// Bucket returns a Bucket with rate capacity, keyed by key.
|
||||
//
|
||||
// If a Bucket (key, rate) doesn't exist yet, it is created.
|
||||
//
|
||||
// You must call Close when you're done with the Throttler in order to not leak
|
||||
// a go-routine and a system-timer.
|
||||
func (t *Throttler) Bucket(key string, rate int64) *Bucket {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
b, ok := t.buckets[key]
|
||||
|
||||
if !ok {
|
||||
b = NewBucket(rate, -1)
|
||||
b.inc = int64(math.Floor(.5 + (float64(b.capacity) * t.freq.Seconds())))
|
||||
b.freq = t.freq
|
||||
t.buckets[key] = b
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// Wait waits for n amount of tokens to be available.
|
||||
// If n tokens are immediatelly available it doesn't sleep. Otherwise, it sleeps
|
||||
// the minimum amount of time required for the remaining tokens to be available.
|
||||
// It returns the wait duration.
|
||||
//
|
||||
// If a Bucket (key, rate) doesn't exist yet, it is created.
|
||||
// If freq < 1/rate seconds, the effective wait rate won't be correct.
|
||||
//
|
||||
// You must call Close when you're done with the Throttler in order to not leak
|
||||
// a go-routine and a system-timer.
|
||||
func (t *Throttler) Wait(key string, n, rate int64) time.Duration {
|
||||
return t.Bucket(key, rate).Wait(n)
|
||||
}
|
||||
|
||||
// Halt returns a bool indicating if the Bucket identified by key and rate has
|
||||
// n amount of tokens. If it doesn't, the taken tokens are added back to the
|
||||
// bucket.
|
||||
//
|
||||
// If a Bucket (key, rate) doesn't exist yet, it is created.
|
||||
// If freq < 1/rate seconds, the results won't be correct.
|
||||
//
|
||||
// You must call Close when you're done with the Throttler in order to not leak
|
||||
// a go-routine and a system-timer.
|
||||
func (t *Throttler) Halt(key string, n, rate int64) bool {
|
||||
b := t.Bucket(key, rate)
|
||||
|
||||
if got := b.Take(n); got != n {
|
||||
b.Put(got)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Close stops filling the Buckets, closing the filling go-routine.
|
||||
func (t *Throttler) Close() error {
|
||||
close(t.closing)
|
||||
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
for _, b := range t.buckets {
|
||||
b.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Throttler) fill(freq time.Duration) {
|
||||
ticker := time.NewTicker(freq)
|
||||
defer ticker.Stop()
|
||||
|
||||
for _ = range ticker.C {
|
||||
select {
|
||||
case <-t.closing:
|
||||
return
|
||||
default:
|
||||
}
|
||||
t.mu.RLock()
|
||||
for _, b := range t.buckets {
|
||||
b.Put(b.inc)
|
||||
}
|
||||
t.mu.RUnlock()
|
||||
}
|
||||
}
|
|
@ -392,7 +392,7 @@ github.com/status-im/status-protocol-go/v1
|
|||
github.com/status-im/status-protocol-go/zaputil
|
||||
# github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
|
||||
github.com/status-im/tcp-shaker
|
||||
# github.com/status-im/whisper v1.5.2
|
||||
# github.com/status-im/whisper v1.6.1
|
||||
github.com/status-im/whisper/whisperv6
|
||||
# github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570
|
||||
github.com/steakknife/bloomfilter
|
||||
|
@ -415,6 +415,8 @@ github.com/syndtr/goleveldb/leveldb/opt
|
|||
github.com/syndtr/goleveldb/leveldb/storage
|
||||
github.com/syndtr/goleveldb/leveldb/table
|
||||
github.com/syndtr/goleveldb/leveldb/util
|
||||
# github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9
|
||||
github.com/tsenart/tb
|
||||
# github.com/tyler-smith/go-bip39 v1.0.2
|
||||
github.com/tyler-smith/go-bip39
|
||||
github.com/tyler-smith/go-bip39/wordlists
|
||||
|
|
Loading…
Reference in New Issue