install global validation throttle, use reasonable defaults.
This commit is contained in:
parent
856a25c8eb
commit
edcb251ad1
59
floodsub.go
59
floodsub.go
@ -19,8 +19,9 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
ID = protocol.ID("/floodsub/1.0.0")
|
ID = protocol.ID("/floodsub/1.0.0")
|
||||||
defaultValidateConcurrency = 10
|
|
||||||
defaultValidateTimeout = 150 * time.Millisecond
|
defaultValidateTimeout = 150 * time.Millisecond
|
||||||
|
defaultValidateConcurrency = 100
|
||||||
|
defaultValidateThrottle = 8192
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("floodsub")
|
var log = logging.Logger("floodsub")
|
||||||
@ -61,6 +62,9 @@ type PubSub struct {
|
|||||||
// sendMsg handles messages that have been validated
|
// sendMsg handles messages that have been validated
|
||||||
sendMsg chan *sendReq
|
sendMsg chan *sendReq
|
||||||
|
|
||||||
|
// validateThrottle limits the number of active validation goroutines
|
||||||
|
validateThrottle chan struct{}
|
||||||
|
|
||||||
peers map[peer.ID]chan *RPC
|
peers map[peer.ID]chan *RPC
|
||||||
seenMessages *timecache.TimeCache
|
seenMessages *timecache.TimeCache
|
||||||
|
|
||||||
@ -90,22 +94,23 @@ type Option func(*PubSub) error
|
|||||||
// NewFloodSub returns a new FloodSub management object
|
// NewFloodSub returns a new FloodSub management object
|
||||||
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
||||||
ps := &PubSub{
|
ps := &PubSub{
|
||||||
host: h,
|
host: h,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
incoming: make(chan *RPC, 32),
|
incoming: make(chan *RPC, 32),
|
||||||
publish: make(chan *Message),
|
publish: make(chan *Message),
|
||||||
newPeers: make(chan inet.Stream),
|
newPeers: make(chan inet.Stream),
|
||||||
peerDead: make(chan peer.ID),
|
peerDead: make(chan peer.ID),
|
||||||
cancelCh: make(chan *Subscription),
|
cancelCh: make(chan *Subscription),
|
||||||
getPeers: make(chan *listPeerReq),
|
getPeers: make(chan *listPeerReq),
|
||||||
addSub: make(chan *addSubReq),
|
addSub: make(chan *addSubReq),
|
||||||
getTopics: make(chan *topicReq),
|
getTopics: make(chan *topicReq),
|
||||||
sendMsg: make(chan *sendReq),
|
sendMsg: make(chan *sendReq),
|
||||||
myTopics: make(map[string]map[*Subscription]struct{}),
|
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
||||||
topics: make(map[string]map[peer.ID]struct{}),
|
myTopics: make(map[string]map[*Subscription]struct{}),
|
||||||
peers: make(map[peer.ID]chan *RPC),
|
topics: make(map[string]map[peer.ID]struct{}),
|
||||||
seenMessages: timecache.NewTimeCache(time.Second * 30),
|
peers: make(map[peer.ID]chan *RPC),
|
||||||
counter: uint64(time.Now().UnixNano()),
|
seenMessages: timecache.NewTimeCache(time.Second * 30),
|
||||||
|
counter: uint64(time.Now().UnixNano()),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@ -123,6 +128,13 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err
|
|||||||
return ps, nil
|
return ps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithValidateThrottle(n int) Option {
|
||||||
|
return func(ps *PubSub) error {
|
||||||
|
ps.validateThrottle = make(chan struct{}, n)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// processLoop handles all inputs arriving on the channels
|
// processLoop handles all inputs arriving on the channels
|
||||||
func (p *PubSub) processLoop(ctx context.Context) {
|
func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -361,9 +373,16 @@ func (p *PubSub) pushMsg(subs []*Subscription, src peer.ID, msg *Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if needval {
|
if needval {
|
||||||
// validation is asynchronous
|
// validation is asynchronous and globally throttled with the throttleValidate semaphore
|
||||||
// XXX vyzo: do we want a global validation throttle here?
|
select {
|
||||||
go p.validate(subs, src, msg)
|
case p.validateThrottle <- struct{}{}:
|
||||||
|
go func() {
|
||||||
|
p.validate(subs, src, msg)
|
||||||
|
<-p.validateThrottle
|
||||||
|
}()
|
||||||
|
default:
|
||||||
|
log.Warningf("message validation throttled; dropping message from %s", src)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user