From b84a32a4ee66e932b33ebfde8ab8dfab8a3abc22 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Apr 2019 11:07:39 +0300 Subject: [PATCH] remove default async validation timeout and increase default topic validation throttle. and some better documentation. --- pubsub.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/pubsub.go b/pubsub.go index a3638ee..846b361 100644 --- a/pubsub.go +++ b/pubsub.go @@ -22,8 +22,7 @@ import ( ) const ( - defaultValidateTimeout = 150 * time.Millisecond - defaultValidateConcurrency = 100 + defaultValidateConcurrency = 1024 defaultValidateThrottle = 8192 ) @@ -227,7 +226,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option } // WithValidateThrottle sets the upper bound on the number of active validation -// goroutines. +// goroutines across all topics. The default is 8192. func WithValidateThrottle(n int) Option { return func(ps *PubSub) error { ps.validateThrottle = make(chan struct{}, n) @@ -966,7 +965,8 @@ type Validator func(context.Context, peer.ID, *Message) bool // ValidatorOpt is an option for RegisterTopicValidator. type ValidatorOpt func(addVal *addValReq) error -// WithValidatorTimeout is an option that sets the topic validator timeout. +// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. +// By default there is no timeout in asynchronous validators. func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { return func(addVal *addValReq) error { addVal.timeout = timeout @@ -974,7 +974,8 @@ func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { } } -// WithValidatorConcurrency is an option that sets topic validator throttle. +// WithValidatorConcurrency is an option that sets the topic validator throttle. +// This controls the number of active validation goroutines for the topic; the default is 1024. func WithValidatorConcurrency(n int) ValidatorOpt { return func(addVal *addValReq) error { addVal.throttle = n @@ -983,6 +984,9 @@ func WithValidatorConcurrency(n int) ValidatorOpt { } // RegisterTopicValidator registers a validator for topic. +// By default validators are asynchronous, which means they will run in a separate goroutine. +// The number of active goroutines is controlled by global and per topic validator +// throttles; if it exceeds the throttle threshold, messages will be dropped. func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error { addVal := &addValReq{ topic: topic, @@ -1013,7 +1017,7 @@ func (ps *PubSub) addValidator(req *addValReq) { val := &topicVal{ topic: topic, validate: req.validate, - validateTimeout: defaultValidateTimeout, + validateTimeout: 0, validateThrottle: make(chan struct{}, defaultValidateConcurrency), } @@ -1054,10 +1058,13 @@ func (ps *PubSub) rmValidator(req *rmValReq) { } func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool { - vctx, cancel := context.WithTimeout(ctx, val.validateTimeout) - defer cancel() + if val.validateTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) + defer cancel() + } - valid := val.validate(vctx, src, msg) + valid := val.validate(ctx, src, msg) if !valid { log.Debugf("validation failed for topic %s", val.topic) }