diff --git a/floodsub.go b/floodsub.go index f32dbf4..45ade9c 100644 --- a/floodsub.go +++ b/floodsub.go @@ -19,8 +19,9 @@ import ( const ( ID = protocol.ID("/floodsub/1.0.0") - defaultValidateConcurrency = 10 defaultValidateTimeout = 150 * time.Millisecond + defaultValidateConcurrency = 100 + defaultValidateThrottle = 8192 ) var log = logging.Logger("floodsub") @@ -61,6 +62,9 @@ type PubSub struct { // sendMsg handles messages that have been validated sendMsg chan *sendReq + // validateThrottle limits the number of active validation goroutines + validateThrottle chan struct{} + peers map[peer.ID]chan *RPC seenMessages *timecache.TimeCache @@ -90,22 +94,23 @@ type Option func(*PubSub) error // NewFloodSub returns a new FloodSub management object func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { ps := &PubSub{ - host: h, - ctx: ctx, - incoming: make(chan *RPC, 32), - publish: make(chan *Message), - newPeers: make(chan inet.Stream), - peerDead: make(chan peer.ID), - cancelCh: make(chan *Subscription), - getPeers: make(chan *listPeerReq), - addSub: make(chan *addSubReq), - getTopics: make(chan *topicReq), - sendMsg: make(chan *sendReq), - myTopics: make(map[string]map[*Subscription]struct{}), - topics: make(map[string]map[peer.ID]struct{}), - peers: make(map[peer.ID]chan *RPC), - seenMessages: timecache.NewTimeCache(time.Second * 30), - counter: uint64(time.Now().UnixNano()), + host: h, + ctx: ctx, + incoming: make(chan *RPC, 32), + publish: make(chan *Message), + newPeers: make(chan inet.Stream), + peerDead: make(chan peer.ID), + cancelCh: make(chan *Subscription), + getPeers: make(chan *listPeerReq), + addSub: make(chan *addSubReq), + getTopics: make(chan *topicReq), + sendMsg: make(chan *sendReq), + validateThrottle: make(chan struct{}, defaultValidateThrottle), + myTopics: make(map[string]map[*Subscription]struct{}), + topics: make(map[string]map[peer.ID]struct{}), + peers: make(map[peer.ID]chan *RPC), + seenMessages: timecache.NewTimeCache(time.Second * 30), + counter: uint64(time.Now().UnixNano()), } for _, opt := range opts { @@ -123,6 +128,13 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err return ps, nil } +func WithValidateThrottle(n int) Option { + return func(ps *PubSub) error { + ps.validateThrottle = make(chan struct{}, n) + return nil + } +} + // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { defer func() { @@ -361,9 +373,16 @@ func (p *PubSub) pushMsg(subs []*Subscription, src peer.ID, msg *Message) { } if needval { - // validation is asynchronous - // XXX vyzo: do we want a global validation throttle here? - go p.validate(subs, src, msg) + // validation is asynchronous and globally throttled with the throttleValidate semaphore + select { + case p.validateThrottle <- struct{}{}: + go func() { + p.validate(subs, src, msg) + <-p.validateThrottle + }() + default: + log.Warningf("message validation throttled; dropping message from %s", src) + } return }