From 7a38f7642e8349661aeede3945437bf74979b487 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Apr 2019 21:28:13 +0300 Subject: [PATCH 1/8] validation pipeline front-end for handling signature validation synchronously --- pubsub.go | 66 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/pubsub.go b/pubsub.go index c8a0e91..a9fe375 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "math/rand" + "runtime" "sync/atomic" "time" @@ -92,6 +93,9 @@ type PubSub struct { // topicVals tracks per topic validators topicVals map[string]*topicVal + // validateQ is the front-end to the validation pipeline + validateQ chan *validateReq + // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} @@ -184,6 +188,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), topicVals: make(map[string]*topicVal), + validateQ: make(chan *validateReq, 32), blacklist: NewMapBlacklist(), blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), @@ -210,6 +215,11 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option go ps.processLoop(ctx) + numcpu := runtime.NumCPU() + for i := 0; i < numcpu; i++ { + go ps.validateWorker() + } + return ps, nil } @@ -631,16 +641,8 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { } if len(vals) > 0 || msg.Signature != nil { - // validation is asynchronous and globally throttled with the throttleValidate semaphore. - // the purpose of the global throttle is to bound the goncurrency possible from incoming - // network traffic; each validator also has an individual throttle to preclude - // slow (or faulty) validators from starving other topics; see validate below. select { - case p.validateThrottle <- struct{}{}: - go func() { - p.validate(vals, src, msg) - <-p.validateThrottle - }() + case p.validateQ <- &validateReq{vals, src, msg}: default: log.Warningf("message validation throttled; dropping message from %s", src) } @@ -650,7 +652,20 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { p.publishMessage(src, msg.Message) } +func (p *PubSub) validateWorker() { + for { + select { + case req := <-p.validateQ: + p.validate(req.vals, req.src, req.msg) + case <-p.ctx.Done(): + return + } + } +} + // validate performs validation and only sends the message if all validators succeed +// signature validation is performed synchronously, while user validators are invoked +// asynchronously, throttled by the global validation throttle. func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { if msg.Signature != nil { if !p.validateSignature(msg) { @@ -660,13 +675,19 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { } if len(vals) > 0 { - if !p.validateTopic(vals, src, msg) { - log.Warningf("message validation failed; dropping message from %s", src) - return + select { + case p.validateThrottle <- struct{}{}: + go func() { + p.doValidateTopic(vals, src, msg) + <-p.validateThrottle + }() + default: + log.Warningf("message validation throttled; dropping message from %s", src) } + return } - // all validators were successful, send the message + // no user validators and the signature was valid, send the message p.sendMsg <- &sendReq{ from: src, msg: msg, @@ -683,6 +704,18 @@ func (p *PubSub) validateSignature(msg *Message) bool { return true } +func (p *PubSub) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { + if !p.validateTopic(vals, src, msg) { + log.Warningf("message validation failed; dropping message from %s", src) + return + } + + p.sendMsg <- &sendReq{ + from: src, + msg: msg, + } +} + func (p *PubSub) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { if len(vals) == 1 { return p.validateSingleTopic(vals[0], src, msg) @@ -883,6 +916,13 @@ func (p *PubSub) BlacklistPeer(pid peer.ID) { p.blacklistPeer <- pid } +// validation requests +type validateReq struct { + vals []*topicVal + src peer.ID + msg *Message +} + // per topic validators type addValReq struct { topic string From b227afbf9f47804e4ccabcb6c2d0a1576b41bdbd Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Apr 2019 21:46:40 +0300 Subject: [PATCH 2/8] invoke user validators once for each message --- pubsub.go | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/pubsub.go b/pubsub.go index a9fe375..a3638ee 100644 --- a/pubsub.go +++ b/pubsub.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "runtime" + "sync" "sync/atomic" "time" @@ -106,8 +107,10 @@ type PubSub struct { blacklist Blacklist blacklistPeer chan peer.ID - peers map[peer.ID]chan *RPC - seenMessages *timecache.TimeCache + peers map[peer.ID]chan *RPC + + seenMessagesMx sync.Mutex + seenMessages *timecache.TimeCache // key for signing messages; nil when signing is disabled (default for now) signKey crypto.PrivKey @@ -552,12 +555,22 @@ func (p *PubSub) notifySubs(msg *pb.Message) { // seenMessage returns whether we already saw this message before func (p *PubSub) seenMessage(id string) bool { + p.seenMessagesMx.Lock() + defer p.seenMessagesMx.Unlock() return p.seenMessages.Has(id) } // markSeen marks a message as seen such that seenMessage returns `true' for the given id -func (p *PubSub) markSeen(id string) { +// returns true if the message was freshly marked +func (p *PubSub) markSeen(id string) bool { + p.seenMessagesMx.Lock() + defer p.seenMessagesMx.Unlock() + if p.seenMessages.Has(id) { + return false + } + p.seenMessages.Add(id) + return true } // subscribedToMessage returns whether we are subscribed to one of the topics @@ -649,7 +662,9 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } - p.publishMessage(src, msg.Message) + if p.markSeen(id) { + p.publishMessage(src, msg.Message) + } } func (p *PubSub) validateWorker() { @@ -674,6 +689,13 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { } } + // we can mark the message as seen now that we have verified the signature + // and avoid invoking user validators more than once + id := msgID(msg.Message) + if !p.markSeen(id) { + return + } + if len(vals) > 0 { select { case p.validateThrottle <- struct{}{}: @@ -779,12 +801,6 @@ func (p *PubSub) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) b } func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { - id := msgID(pmsg) - if p.seenMessage(id) { - return - } - p.markSeen(id) - p.notifySubs(pmsg) p.rt.Publish(from, pmsg) } From b84a32a4ee66e932b33ebfde8ab8dfab8a3abc22 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Apr 2019 11:07:39 +0300 Subject: [PATCH 3/8] remove default async validation timeout and increase default topic validation throttle. and some better documentation. --- pubsub.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/pubsub.go b/pubsub.go index a3638ee..846b361 100644 --- a/pubsub.go +++ b/pubsub.go @@ -22,8 +22,7 @@ import ( ) const ( - defaultValidateTimeout = 150 * time.Millisecond - defaultValidateConcurrency = 100 + defaultValidateConcurrency = 1024 defaultValidateThrottle = 8192 ) @@ -227,7 +226,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option } // WithValidateThrottle sets the upper bound on the number of active validation -// goroutines. +// goroutines across all topics. The default is 8192. func WithValidateThrottle(n int) Option { return func(ps *PubSub) error { ps.validateThrottle = make(chan struct{}, n) @@ -966,7 +965,8 @@ type Validator func(context.Context, peer.ID, *Message) bool // ValidatorOpt is an option for RegisterTopicValidator. type ValidatorOpt func(addVal *addValReq) error -// WithValidatorTimeout is an option that sets the topic validator timeout. +// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. +// By default there is no timeout in asynchronous validators. func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { return func(addVal *addValReq) error { addVal.timeout = timeout @@ -974,7 +974,8 @@ func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { } } -// WithValidatorConcurrency is an option that sets topic validator throttle. +// WithValidatorConcurrency is an option that sets the topic validator throttle. +// This controls the number of active validation goroutines for the topic; the default is 1024. func WithValidatorConcurrency(n int) ValidatorOpt { return func(addVal *addValReq) error { addVal.throttle = n @@ -983,6 +984,9 @@ func WithValidatorConcurrency(n int) ValidatorOpt { } // RegisterTopicValidator registers a validator for topic. +// By default validators are asynchronous, which means they will run in a separate goroutine. +// The number of active goroutines is controlled by global and per topic validator +// throttles; if it exceeds the throttle threshold, messages will be dropped. func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error { addVal := &addValReq{ topic: topic, @@ -1013,7 +1017,7 @@ func (ps *PubSub) addValidator(req *addValReq) { val := &topicVal{ topic: topic, validate: req.validate, - validateTimeout: defaultValidateTimeout, + validateTimeout: 0, validateThrottle: make(chan struct{}, defaultValidateConcurrency), } @@ -1054,10 +1058,13 @@ func (ps *PubSub) rmValidator(req *rmValReq) { } func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool { - vctx, cancel := context.WithTimeout(ctx, val.validateTimeout) - defer cancel() + if val.validateTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) + defer cancel() + } - valid := val.validate(vctx, src, msg) + valid := val.validate(ctx, src, msg) if !valid { log.Debugf("validation failed for topic %s", val.topic) } From d8f08cdba70065ae59e89e23f0cbcfd9dcad852f Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Apr 2019 11:27:51 +0300 Subject: [PATCH 4/8] add support for inline (synchronous) validators --- pubsub.go | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/pubsub.go b/pubsub.go index 846b361..b1acc87 100644 --- a/pubsub.go +++ b/pubsub.go @@ -695,11 +695,29 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { return } - if len(vals) > 0 { + var inline, async []*topicVal + for _, val := range vals { + if val.validateInline { + inline = append(inline, val) + } else { + async = append(async, val) + } + } + + // apply inline (synchronous) validators + for _, val := range inline { + if !val.validateMsg(p.ctx, src, msg) { + log.Debugf("message validation failed; dropping message from %s", src) + return + } + } + + // apply async validators + if len(async) > 0 { select { case p.validateThrottle <- struct{}{}: go func() { - p.doValidateTopic(vals, src, msg) + p.doValidateTopic(async, src, msg) <-p.validateThrottle }() default: @@ -708,7 +726,7 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { return } - // no user validators and the signature was valid, send the message + // no async validators, send the message p.sendMsg <- &sendReq{ from: src, msg: msg, @@ -944,6 +962,7 @@ type addValReq struct { validate Validator timeout time.Duration throttle int + inline bool resp chan error } @@ -957,6 +976,7 @@ type topicVal struct { validate Validator validateTimeout time.Duration validateThrottle chan struct{} + validateInline bool } // Validator is a function that validates a message. @@ -983,6 +1003,16 @@ func WithValidatorConcurrency(n int) ValidatorOpt { } } +// WithValidatorInline is an option that sets the validation disposition to synchronous: +// it will be executed inline in validation front-end, without spawning a new goroutine. +// This is suitable for simple or cpu-bound validators that do not block. +func WithValidatorInline(inline bool) ValidatorOpt { + return func(addVal *addValReq) error { + addVal.inline = inline + return nil + } +} + // RegisterTopicValidator registers a validator for topic. // By default validators are asynchronous, which means they will run in a separate goroutine. // The number of active goroutines is controlled by global and per topic validator @@ -1019,6 +1049,7 @@ func (ps *PubSub) addValidator(req *addValReq) { validate: req.validate, validateTimeout: 0, validateThrottle: make(chan struct{}, defaultValidateConcurrency), + validateInline: req.inline, } if req.timeout > 0 { From 8d0c8d60b12cdbcf348aba8fa682792cb71a1c17 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Apr 2019 22:17:29 +0300 Subject: [PATCH 5/8] add option to control number of synchronous validation workers --- pubsub.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pubsub.go b/pubsub.go index b1acc87..26623fb 100644 --- a/pubsub.go +++ b/pubsub.go @@ -99,6 +99,9 @@ type PubSub struct { // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} + // this is the number of synchronous validation workers + validateWorkers int + // eval thunk in event loop eval chan func() @@ -195,6 +198,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), counter: uint64(time.Now().UnixNano()), + validateWorkers: runtime.NumCPU(), } for _, opt := range opts { @@ -217,8 +221,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option go ps.processLoop(ctx) - numcpu := runtime.NumCPU() - for i := 0; i < numcpu; i++ { + for i := 0; i < ps.validateWorkers; i++ { go ps.validateWorker() } @@ -234,6 +237,18 @@ func WithValidateThrottle(n int) Option { } } +// WithValidateWorkers sets the number of synchronous validation worker goroutines. +// Defaults to NumCPU. +func WithValidateWorkers(n int) Option { + return func(ps *PubSub) error { + if n > 0 { + ps.validateWorkers = n + return nil + } + return fmt.Errorf("number of validation workers must be > 0") + } +} + // WithMessageSigning enables or disables message signing (enabled by default). func WithMessageSigning(enabled bool) Option { return func(p *PubSub) error { From cb423f474d0b0b46ef1ad2ef840ea5b9df0f4e3f Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 29 Apr 2019 22:45:48 +0300 Subject: [PATCH 6/8] split off validation into its own type --- pubsub.go | 412 +++++--------------------------------------------- validation.go | 383 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 421 insertions(+), 374 deletions(-) create mode 100644 validation.go diff --git a/pubsub.go b/pubsub.go index 26623fb..9875a3c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "fmt" "math/rand" - "runtime" "sync" "sync/atomic" "time" @@ -21,11 +20,6 @@ import ( timecache "github.com/whyrusleeping/timecache" ) -const ( - defaultValidateConcurrency = 1024 - defaultValidateThrottle = 8192 -) - var ( TimeCacheDuration = 120 * time.Second ) @@ -45,6 +39,8 @@ type PubSub struct { rt PubSubRouter + val *validation + // incoming messages from other peers incoming chan *RPC @@ -90,18 +86,6 @@ type PubSub struct { // rmVal handles validator unregistration requests rmVal chan *rmValReq - // topicVals tracks per topic validators - topicVals map[string]*topicVal - - // validateQ is the front-end to the validation pipeline - validateQ chan *validateReq - - // validateThrottle limits the number of active validation goroutines - validateThrottle chan struct{} - - // this is the number of synchronous validation workers - validateWorkers int - // eval thunk in event loop eval chan func() @@ -168,37 +152,34 @@ type Option func(*PubSub) error // NewPubSub returns a new PubSub management object. func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) { ps := &PubSub{ - host: h, - ctx: ctx, - rt: rt, - signID: h.ID(), - signKey: h.Peerstore().PrivKey(h.ID()), - signStrict: true, - incoming: make(chan *RPC, 32), - publish: make(chan *Message), - newPeers: make(chan peer.ID), - newPeerStream: make(chan inet.Stream), - newPeerError: make(chan peer.ID), - peerDead: make(chan peer.ID), - cancelCh: make(chan *Subscription), - getPeers: make(chan *listPeerReq), - addSub: make(chan *addSubReq), - getTopics: make(chan *topicReq), - sendMsg: make(chan *sendReq, 32), - addVal: make(chan *addValReq), - rmVal: make(chan *rmValReq), - validateThrottle: make(chan struct{}, defaultValidateThrottle), - eval: make(chan func()), - myTopics: make(map[string]map[*Subscription]struct{}), - topics: make(map[string]map[peer.ID]struct{}), - peers: make(map[peer.ID]chan *RPC), - topicVals: make(map[string]*topicVal), - validateQ: make(chan *validateReq, 32), - blacklist: NewMapBlacklist(), - blacklistPeer: make(chan peer.ID), - seenMessages: timecache.NewTimeCache(TimeCacheDuration), - counter: uint64(time.Now().UnixNano()), - validateWorkers: runtime.NumCPU(), + host: h, + ctx: ctx, + rt: rt, + val: newValidation(), + signID: h.ID(), + signKey: h.Peerstore().PrivKey(h.ID()), + signStrict: true, + incoming: make(chan *RPC, 32), + publish: make(chan *Message), + newPeers: make(chan peer.ID), + newPeerStream: make(chan inet.Stream), + newPeerError: make(chan peer.ID), + peerDead: make(chan peer.ID), + cancelCh: make(chan *Subscription), + getPeers: make(chan *listPeerReq), + addSub: make(chan *addSubReq), + getTopics: make(chan *topicReq), + sendMsg: make(chan *sendReq, 32), + addVal: make(chan *addValReq), + rmVal: make(chan *rmValReq), + eval: make(chan func()), + myTopics: make(map[string]map[*Subscription]struct{}), + topics: make(map[string]map[peer.ID]struct{}), + peers: make(map[peer.ID]chan *RPC), + blacklist: NewMapBlacklist(), + blacklistPeer: make(chan peer.ID), + seenMessages: timecache.NewTimeCache(TimeCacheDuration), + counter: uint64(time.Now().UnixNano()), } for _, opt := range opts { @@ -219,36 +200,13 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option } h.Network().Notify((*PubSubNotif)(ps)) + ps.val.Start(ps) + go ps.processLoop(ctx) - for i := 0; i < ps.validateWorkers; i++ { - go ps.validateWorker() - } - return ps, nil } -// WithValidateThrottle sets the upper bound on the number of active validation -// goroutines across all topics. The default is 8192. -func WithValidateThrottle(n int) Option { - return func(ps *PubSub) error { - ps.validateThrottle = make(chan struct{}, n) - return nil - } -} - -// WithValidateWorkers sets the number of synchronous validation worker goroutines. -// Defaults to NumCPU. -func WithValidateWorkers(n int) Option { - return func(ps *PubSub) error { - if n > 0 { - ps.validateWorkers = n - return nil - } - return fmt.Errorf("number of validation workers must be > 0") - } -} - // WithMessageSigning enables or disables message signing (enabled by default). func WithMessageSigning(enabled bool) Option { return func(p *PubSub) error { @@ -411,17 +369,16 @@ func (p *PubSub) processLoop(ctx context.Context) { p.handleIncomingRPC(rpc) case msg := <-p.publish: - vals := p.getValidators(msg) - p.pushMsg(vals, p.host.ID(), msg) + p.pushMsg(p.host.ID(), msg) case req := <-p.sendMsg: p.publishMessage(req.from, req.msg.Message) case req := <-p.addVal: - p.addValidator(req) + p.val.AddValidator(req) case req := <-p.rmVal: - p.rmValidator(req) + p.val.RemoveValidator(req) case thunk := <-p.eval: thunk() @@ -629,8 +586,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { } msg := &Message{pmsg} - vals := p.getValidators(msg) - p.pushMsg(vals, rpc.from, msg) + p.pushMsg(rpc.from, msg) } p.rt.HandleRPC(rpc) @@ -642,7 +598,7 @@ func msgID(pmsg *pb.Message) string { } // pushMsg pushes a message performing validation as necessary -func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { +func (p *PubSub) pushMsg(src peer.ID, msg *Message) { // reject messages from blacklisted peers if p.blacklist.Contains(src) { log.Warningf("dropping message from blacklisted peer %s", src) @@ -667,12 +623,7 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } - if len(vals) > 0 || msg.Signature != nil { - select { - case p.validateQ <- &validateReq{vals, src, msg}: - default: - log.Warningf("message validation throttled; dropping message from %s", src) - } + if !p.val.Push(src, msg) { return } @@ -681,178 +632,11 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { } } -func (p *PubSub) validateWorker() { - for { - select { - case req := <-p.validateQ: - p.validate(req.vals, req.src, req.msg) - case <-p.ctx.Done(): - return - } - } -} - -// validate performs validation and only sends the message if all validators succeed -// signature validation is performed synchronously, while user validators are invoked -// asynchronously, throttled by the global validation throttle. -func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { - if msg.Signature != nil { - if !p.validateSignature(msg) { - log.Warningf("message signature validation failed; dropping message from %s", src) - return - } - } - - // we can mark the message as seen now that we have verified the signature - // and avoid invoking user validators more than once - id := msgID(msg.Message) - if !p.markSeen(id) { - return - } - - var inline, async []*topicVal - for _, val := range vals { - if val.validateInline { - inline = append(inline, val) - } else { - async = append(async, val) - } - } - - // apply inline (synchronous) validators - for _, val := range inline { - if !val.validateMsg(p.ctx, src, msg) { - log.Debugf("message validation failed; dropping message from %s", src) - return - } - } - - // apply async validators - if len(async) > 0 { - select { - case p.validateThrottle <- struct{}{}: - go func() { - p.doValidateTopic(async, src, msg) - <-p.validateThrottle - }() - default: - log.Warningf("message validation throttled; dropping message from %s", src) - } - return - } - - // no async validators, send the message - p.sendMsg <- &sendReq{ - from: src, - msg: msg, - } -} - -func (p *PubSub) validateSignature(msg *Message) bool { - err := verifyMessageSignature(msg.Message) - if err != nil { - log.Debugf("signature verification error: %s", err.Error()) - return false - } - - return true -} - -func (p *PubSub) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { - if !p.validateTopic(vals, src, msg) { - log.Warningf("message validation failed; dropping message from %s", src) - return - } - - p.sendMsg <- &sendReq{ - from: src, - msg: msg, - } -} - -func (p *PubSub) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { - if len(vals) == 1 { - return p.validateSingleTopic(vals[0], src, msg) - } - - ctx, cancel := context.WithCancel(p.ctx) - defer cancel() - - rch := make(chan bool, len(vals)) - rcount := 0 - throttle := false - -loop: - for _, val := range vals { - rcount++ - - select { - case val.validateThrottle <- struct{}{}: - go func(val *topicVal) { - rch <- val.validateMsg(ctx, src, msg) - <-val.validateThrottle - }(val) - - default: - log.Debugf("validation throttled for topic %s", val.topic) - throttle = true - break loop - } - } - - if throttle { - return false - } - - for i := 0; i < rcount; i++ { - valid := <-rch - if !valid { - return false - } - } - - return true -} - -// fast path for single topic validation that avoids the extra goroutine -func (p *PubSub) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) bool { - select { - case val.validateThrottle <- struct{}{}: - ctx, cancel := context.WithCancel(p.ctx) - defer cancel() - - res := val.validateMsg(ctx, src, msg) - <-val.validateThrottle - - return res - - default: - log.Debugf("validation throttled for topic %s", val.topic) - return false - } -} - func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { p.notifySubs(pmsg) p.rt.Publish(from, pmsg) } -// getValidators returns all validators that apply to a given message -func (p *PubSub) getValidators(msg *Message) []*topicVal { - var vals []*topicVal - - for _, topic := range msg.GetTopicIDs() { - val, ok := p.topicVals[topic] - if !ok { - continue - } - - vals = append(vals, val) - } - - return vals -} - type addSubReq struct { sub *Subscription resp chan *Subscription @@ -964,70 +748,6 @@ func (p *PubSub) BlacklistPeer(pid peer.ID) { p.blacklistPeer <- pid } -// validation requests -type validateReq struct { - vals []*topicVal - src peer.ID - msg *Message -} - -// per topic validators -type addValReq struct { - topic string - validate Validator - timeout time.Duration - throttle int - inline bool - resp chan error -} - -type rmValReq struct { - topic string - resp chan error -} - -type topicVal struct { - topic string - validate Validator - validateTimeout time.Duration - validateThrottle chan struct{} - validateInline bool -} - -// Validator is a function that validates a message. -type Validator func(context.Context, peer.ID, *Message) bool - -// ValidatorOpt is an option for RegisterTopicValidator. -type ValidatorOpt func(addVal *addValReq) error - -// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. -// By default there is no timeout in asynchronous validators. -func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { - return func(addVal *addValReq) error { - addVal.timeout = timeout - return nil - } -} - -// WithValidatorConcurrency is an option that sets the topic validator throttle. -// This controls the number of active validation goroutines for the topic; the default is 1024. -func WithValidatorConcurrency(n int) ValidatorOpt { - return func(addVal *addValReq) error { - addVal.throttle = n - return nil - } -} - -// WithValidatorInline is an option that sets the validation disposition to synchronous: -// it will be executed inline in validation front-end, without spawning a new goroutine. -// This is suitable for simple or cpu-bound validators that do not block. -func WithValidatorInline(inline bool) ValidatorOpt { - return func(addVal *addValReq) error { - addVal.inline = inline - return nil - } -} - // RegisterTopicValidator registers a validator for topic. // By default validators are asynchronous, which means they will run in a separate goroutine. // The number of active goroutines is controlled by global and per topic validator @@ -1050,35 +770,6 @@ func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...Val return <-addVal.resp } -func (ps *PubSub) addValidator(req *addValReq) { - topic := req.topic - - _, ok := ps.topicVals[topic] - if ok { - req.resp <- fmt.Errorf("Duplicate validator for topic %s", topic) - return - } - - val := &topicVal{ - topic: topic, - validate: req.validate, - validateTimeout: 0, - validateThrottle: make(chan struct{}, defaultValidateConcurrency), - validateInline: req.inline, - } - - if req.timeout > 0 { - val.validateTimeout = req.timeout - } - - if req.throttle > 0 { - val.validateThrottle = make(chan struct{}, req.throttle) - } - - ps.topicVals[topic] = val - req.resp <- nil -} - // UnregisterTopicValidator removes a validator from a topic. // Returns an error if there was no validator registered with the topic. func (p *PubSub) UnregisterTopicValidator(topic string) error { @@ -1090,30 +781,3 @@ func (p *PubSub) UnregisterTopicValidator(topic string) error { p.rmVal <- rmVal return <-rmVal.resp } - -func (ps *PubSub) rmValidator(req *rmValReq) { - topic := req.topic - - _, ok := ps.topicVals[topic] - if ok { - delete(ps.topicVals, topic) - req.resp <- nil - } else { - req.resp <- fmt.Errorf("No validator for topic %s", topic) - } -} - -func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool { - if val.validateTimeout > 0 { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) - defer cancel() - } - - valid := val.validate(ctx, src, msg) - if !valid { - log.Debugf("validation failed for topic %s", val.topic) - } - - return valid -} diff --git a/validation.go b/validation.go new file mode 100644 index 0000000..3c2ed57 --- /dev/null +++ b/validation.go @@ -0,0 +1,383 @@ +package pubsub + +import ( + "context" + "fmt" + "runtime" + "time" + + peer "github.com/libp2p/go-libp2p-peer" +) + +const ( + defaultValidateConcurrency = 1024 + defaultValidateThrottle = 8192 +) + +// Validator is a function that validates a message. +type Validator func(context.Context, peer.ID, *Message) bool + +// ValidatorOpt is an option for RegisterTopicValidator. +type ValidatorOpt func(addVal *addValReq) error + +// validation represents the validator pipeline +type validation struct { + p *PubSub + + // topicVals tracks per topic validators + topicVals map[string]*topicVal + + // validateQ is the front-end to the validation pipeline + validateQ chan *validateReq + + // validateThrottle limits the number of active validation goroutines + validateThrottle chan struct{} + + // this is the number of synchronous validation workers + validateWorkers int +} + +// validation requests +type validateReq struct { + vals []*topicVal + src peer.ID + msg *Message +} + +// representation of topic validators +type topicVal struct { + topic string + validate Validator + validateTimeout time.Duration + validateThrottle chan struct{} + validateInline bool +} + +// async request to add a topic validators +type addValReq struct { + topic string + validate Validator + timeout time.Duration + throttle int + inline bool + resp chan error +} + +// async request to remove a topic validator +type rmValReq struct { + topic string + resp chan error +} + +// newValidation creates a new validation pipeline +func newValidation() *validation { + return &validation{ + topicVals: make(map[string]*topicVal), + validateQ: make(chan *validateReq, 32), + validateThrottle: make(chan struct{}, defaultValidateThrottle), + validateWorkers: runtime.NumCPU(), + } +} + +// Start attaches the validation pipeline to a pubsub instance and starts background +// workers +func (v *validation) Start(p *PubSub) { + v.p = p + for i := 0; i < v.validateWorkers; i++ { + go v.validateWorker() + } +} + +// AddValidator adds a new validator +func (v *validation) AddValidator(req *addValReq) { + topic := req.topic + + _, ok := v.topicVals[topic] + if ok { + req.resp <- fmt.Errorf("Duplicate validator for topic %s", topic) + return + } + + val := &topicVal{ + topic: topic, + validate: req.validate, + validateTimeout: 0, + validateThrottle: make(chan struct{}, defaultValidateConcurrency), + validateInline: req.inline, + } + + if req.timeout > 0 { + val.validateTimeout = req.timeout + } + + if req.throttle > 0 { + val.validateThrottle = make(chan struct{}, req.throttle) + } + + v.topicVals[topic] = val + req.resp <- nil +} + +// RemoveValidator removes an existing validator +func (v *validation) RemoveValidator(req *rmValReq) { + topic := req.topic + + _, ok := v.topicVals[topic] + if ok { + delete(v.topicVals, topic) + req.resp <- nil + } else { + req.resp <- fmt.Errorf("No validator for topic %s", topic) + } +} + +// Push pushes a message into the validation pipeline. +// It returns true if the message can be forwarded immediately without validation. +func (v *validation) Push(src peer.ID, msg *Message) bool { + vals := v.getValidators(msg) + + if len(vals) > 0 || msg.Signature != nil { + select { + case v.validateQ <- &validateReq{vals, src, msg}: + default: + log.Warningf("message validation throttled; dropping message from %s", src) + } + return false + } + + return true +} + +// getValidators returns all validators that apply to a given message +func (v *validation) getValidators(msg *Message) []*topicVal { + var vals []*topicVal + + for _, topic := range msg.GetTopicIDs() { + val, ok := v.topicVals[topic] + if !ok { + continue + } + + vals = append(vals, val) + } + + return vals +} + +// validateWorker is an active goroutine performing inline validation +func (v *validation) validateWorker() { + for { + select { + case req := <-v.validateQ: + v.validate(req.vals, req.src, req.msg) + case <-v.p.ctx.Done(): + return + } + } +} + +// validate performs validation and only sends the message if all validators succeed +// signature validation is performed synchronously, while user validators are invoked +// asynchronously, throttled by the global validation throttle. +func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { + if msg.Signature != nil { + if !v.validateSignature(msg) { + log.Warningf("message signature validation failed; dropping message from %s", src) + return + } + } + + // we can mark the message as seen now that we have verified the signature + // and avoid invoking user validators more than once + id := msgID(msg.Message) + if !v.p.markSeen(id) { + return + } + + var inline, async []*topicVal + for _, val := range vals { + if val.validateInline { + inline = append(inline, val) + } else { + async = append(async, val) + } + } + + // apply inline (synchronous) validators + for _, val := range inline { + if !val.validateMsg(v.p.ctx, src, msg) { + log.Debugf("message validation failed; dropping message from %s", src) + return + } + } + + // apply async validators + if len(async) > 0 { + select { + case v.validateThrottle <- struct{}{}: + go func() { + v.doValidateTopic(async, src, msg) + <-v.validateThrottle + }() + default: + log.Warningf("message validation throttled; dropping message from %s", src) + } + return + } + + // no async validators, send the message + v.p.sendMsg <- &sendReq{ + from: src, + msg: msg, + } +} + +func (v *validation) validateSignature(msg *Message) bool { + err := verifyMessageSignature(msg.Message) + if err != nil { + log.Debugf("signature verification error: %s", err.Error()) + return false + } + + return true +} + +func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { + if !v.validateTopic(vals, src, msg) { + log.Warningf("message validation failed; dropping message from %s", src) + return + } + + v.p.sendMsg <- &sendReq{ + from: src, + msg: msg, + } +} + +func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { + if len(vals) == 1 { + return v.validateSingleTopic(vals[0], src, msg) + } + + ctx, cancel := context.WithCancel(v.p.ctx) + defer cancel() + + rch := make(chan bool, len(vals)) + rcount := 0 + throttle := false + +loop: + for _, val := range vals { + rcount++ + + select { + case val.validateThrottle <- struct{}{}: + go func(val *topicVal) { + rch <- val.validateMsg(ctx, src, msg) + <-val.validateThrottle + }(val) + + default: + log.Debugf("validation throttled for topic %s", val.topic) + throttle = true + break loop + } + } + + if throttle { + return false + } + + for i := 0; i < rcount; i++ { + valid := <-rch + if !valid { + return false + } + } + + return true +} + +// fast path for single topic validation that avoids the extra goroutine +func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) bool { + select { + case val.validateThrottle <- struct{}{}: + ctx, cancel := context.WithCancel(v.p.ctx) + defer cancel() + + res := val.validateMsg(ctx, src, msg) + <-val.validateThrottle + + return res + + default: + log.Debugf("validation throttled for topic %s", val.topic) + return false + } +} + +func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool { + if val.validateTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) + defer cancel() + } + + valid := val.validate(ctx, src, msg) + if !valid { + log.Debugf("validation failed for topic %s", val.topic) + } + + return valid +} + +/// Options + +// WithValidateThrottle sets the upper bound on the number of active validation +// goroutines across all topics. The default is 8192. +func WithValidateThrottle(n int) Option { + return func(ps *PubSub) error { + ps.val.validateThrottle = make(chan struct{}, n) + return nil + } +} + +// WithValidateWorkers sets the number of synchronous validation worker goroutines. +// Defaults to NumCPU. +func WithValidateWorkers(n int) Option { + return func(ps *PubSub) error { + if n > 0 { + ps.val.validateWorkers = n + return nil + } + return fmt.Errorf("number of validation workers must be > 0") + } +} + +// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. +// By default there is no timeout in asynchronous validators. +func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { + return func(addVal *addValReq) error { + addVal.timeout = timeout + return nil + } +} + +// WithValidatorConcurrency is an option that sets the topic validator throttle. +// This controls the number of active validation goroutines for the topic; the default is 1024. +func WithValidatorConcurrency(n int) ValidatorOpt { + return func(addVal *addValReq) error { + addVal.throttle = n + return nil + } +} + +// WithValidatorInline is an option that sets the validation disposition to synchronous: +// it will be executed inline in validation front-end, without spawning a new goroutine. +// This is suitable for simple or cpu-bound validators that do not block. +func WithValidatorInline(inline bool) ValidatorOpt { + return func(addVal *addValReq) error { + addVal.inline = inline + return nil + } +} From f4d9eeec26dab8f9a085d8d60d84ea6749bbbc9b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 14 May 2019 13:57:44 +0300 Subject: [PATCH 7/8] improve godocs --- validation.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/validation.go b/validation.go index 3c2ed57..6468308 100644 --- a/validation.go +++ b/validation.go @@ -20,7 +20,14 @@ type Validator func(context.Context, peer.ID, *Message) bool // ValidatorOpt is an option for RegisterTopicValidator. type ValidatorOpt func(addVal *addValReq) error -// validation represents the validator pipeline +// validation represents the validator pipeline. +// The validator pipeline performs signature validation and runs a +// sequence of user-configured validators per-topic. It is possible to +// adjust various concurrency parameters, such as the number of +// workers and the max number of simultaneous validations. The user +// can also attach inline validators that will be executed +// synchronously; this may be useful to prevent superfluous +// context-switching for lightweight tasks. type validation struct { p *PubSub @@ -344,6 +351,10 @@ func WithValidateThrottle(n int) Option { // WithValidateWorkers sets the number of synchronous validation worker goroutines. // Defaults to NumCPU. +// +// The synchronous validation workers perform signature validation, apply inline +// user validators, and schedule asynchronous user validators. +// You can adjust this parameter to devote less cpu time to synchronous validation. func WithValidateWorkers(n int) Option { return func(ps *PubSub) error { if n > 0 { From 2df9d940bff7bcbadb5b09d6125916f3485c202f Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 14 May 2019 14:17:32 +0300 Subject: [PATCH 8/8] remove unnecessary context in validateSingleTopic --- validation.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/validation.go b/validation.go index 6468308..023ee86 100644 --- a/validation.go +++ b/validation.go @@ -309,10 +309,7 @@ loop: func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) bool { select { case val.validateThrottle <- struct{}{}: - ctx, cancel := context.WithCancel(v.p.ctx) - defer cancel() - - res := val.validateMsg(ctx, src, msg) + res := val.validateMsg(v.p.ctx, src, msg) <-val.validateThrottle return res