From d8f08cdba70065ae59e89e23f0cbcfd9dcad852f Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Apr 2019 11:27:51 +0300 Subject: [PATCH] add support for inline (synchronous) validators --- pubsub.go | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/pubsub.go b/pubsub.go index 846b361..b1acc87 100644 --- a/pubsub.go +++ b/pubsub.go @@ -695,11 +695,29 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { return } - if len(vals) > 0 { + var inline, async []*topicVal + for _, val := range vals { + if val.validateInline { + inline = append(inline, val) + } else { + async = append(async, val) + } + } + + // apply inline (synchronous) validators + for _, val := range inline { + if !val.validateMsg(p.ctx, src, msg) { + log.Debugf("message validation failed; dropping message from %s", src) + return + } + } + + // apply async validators + if len(async) > 0 { select { case p.validateThrottle <- struct{}{}: go func() { - p.doValidateTopic(vals, src, msg) + p.doValidateTopic(async, src, msg) <-p.validateThrottle }() default: @@ -708,7 +726,7 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { return } - // no user validators and the signature was valid, send the message + // no async validators, send the message p.sendMsg <- &sendReq{ from: src, msg: msg, @@ -944,6 +962,7 @@ type addValReq struct { validate Validator timeout time.Duration throttle int + inline bool resp chan error } @@ -957,6 +976,7 @@ type topicVal struct { validate Validator validateTimeout time.Duration validateThrottle chan struct{} + validateInline bool } // Validator is a function that validates a message. @@ -983,6 +1003,16 @@ func WithValidatorConcurrency(n int) ValidatorOpt { } } +// WithValidatorInline is an option that sets the validation disposition to synchronous: +// it will be executed inline in validation front-end, without spawning a new goroutine. +// This is suitable for simple or cpu-bound validators that do not block. +func WithValidatorInline(inline bool) ValidatorOpt { + return func(addVal *addValReq) error { + addVal.inline = inline + return nil + } +} + // RegisterTopicValidator registers a validator for topic. // By default validators are asynchronous, which means they will run in a separate goroutine. // The number of active goroutines is controlled by global and per topic validator @@ -1019,6 +1049,7 @@ func (ps *PubSub) addValidator(req *addValReq) { validate: req.validate, validateTimeout: 0, validateThrottle: make(chan struct{}, defaultValidateConcurrency), + validateInline: req.inline, } if req.timeout > 0 {