From 7a38f7642e8349661aeede3945437bf74979b487 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Apr 2019 21:28:13 +0300 Subject: [PATCH] validation pipeline front-end for handling signature validation synchronously --- pubsub.go | 66 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/pubsub.go b/pubsub.go index c8a0e91..a9fe375 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "math/rand" + "runtime" "sync/atomic" "time" @@ -92,6 +93,9 @@ type PubSub struct { // topicVals tracks per topic validators topicVals map[string]*topicVal + // validateQ is the front-end to the validation pipeline + validateQ chan *validateReq + // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} @@ -184,6 +188,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), topicVals: make(map[string]*topicVal), + validateQ: make(chan *validateReq, 32), blacklist: NewMapBlacklist(), blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), @@ -210,6 +215,11 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option go ps.processLoop(ctx) + numcpu := runtime.NumCPU() + for i := 0; i < numcpu; i++ { + go ps.validateWorker() + } + return ps, nil } @@ -631,16 +641,8 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { } if len(vals) > 0 || msg.Signature != nil { - // validation is asynchronous and globally throttled with the throttleValidate semaphore. - // the purpose of the global throttle is to bound the goncurrency possible from incoming - // network traffic; each validator also has an individual throttle to preclude - // slow (or faulty) validators from starving other topics; see validate below. select { - case p.validateThrottle <- struct{}{}: - go func() { - p.validate(vals, src, msg) - <-p.validateThrottle - }() + case p.validateQ <- &validateReq{vals, src, msg}: default: log.Warningf("message validation throttled; dropping message from %s", src) } @@ -650,7 +652,20 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { p.publishMessage(src, msg.Message) } +func (p *PubSub) validateWorker() { + for { + select { + case req := <-p.validateQ: + p.validate(req.vals, req.src, req.msg) + case <-p.ctx.Done(): + return + } + } +} + // validate performs validation and only sends the message if all validators succeed +// signature validation is performed synchronously, while user validators are invoked +// asynchronously, throttled by the global validation throttle. func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { if msg.Signature != nil { if !p.validateSignature(msg) { @@ -660,13 +675,19 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { } if len(vals) > 0 { - if !p.validateTopic(vals, src, msg) { - log.Warningf("message validation failed; dropping message from %s", src) - return + select { + case p.validateThrottle <- struct{}{}: + go func() { + p.doValidateTopic(vals, src, msg) + <-p.validateThrottle + }() + default: + log.Warningf("message validation throttled; dropping message from %s", src) } + return } - // all validators were successful, send the message + // no user validators and the signature was valid, send the message p.sendMsg <- &sendReq{ from: src, msg: msg, @@ -683,6 +704,18 @@ func (p *PubSub) validateSignature(msg *Message) bool { return true } +func (p *PubSub) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { + if !p.validateTopic(vals, src, msg) { + log.Warningf("message validation failed; dropping message from %s", src) + return + } + + p.sendMsg <- &sendReq{ + from: src, + msg: msg, + } +} + func (p *PubSub) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { if len(vals) == 1 { return p.validateSingleTopic(vals[0], src, msg) @@ -883,6 +916,13 @@ func (p *PubSub) BlacklistPeer(pid peer.ID) { p.blacklistPeer <- pid } +// validation requests +type validateReq struct { + vals []*topicVal + src peer.ID + msg *Message +} + // per topic validators type addValReq struct { topic string