diff --git a/pubsub.go b/pubsub.go index 7cba8cf..ef5bcf0 100644 --- a/pubsub.go +++ b/pubsub.go @@ -159,7 +159,7 @@ type PubSubRouter interface { type Message struct { *pb.Message - ReceivedFrom peer.ID + ReceivedFrom peer.ID ValidatorData interface{} } diff --git a/validation.go b/validation.go index 70b4cde..d6ae546 100644 --- a/validation.go +++ b/validation.go @@ -10,6 +10,7 @@ import ( ) const ( + defaultValidateQueueSize = 32 defaultValidateConcurrency = 1024 defaultValidateThrottle = 8192 ) @@ -82,7 +83,7 @@ type rmValReq struct { func newValidation() *validation { return &validation{ topicVals: make(map[string]*topicVal), - validateQ: make(chan *validateReq, 32), + validateQ: make(chan *validateReq, defaultValidateQueueSize), validateThrottle: make(chan struct{}, defaultValidateThrottle), validateWorkers: runtime.NumCPU(), } @@ -342,6 +343,18 @@ func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) /// Options +// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. +// When queue is full, validation is throttled and new messages are dropped. +func WithValidateQueueSize(n int) Option { + return func(ps *PubSub) error { + if n > 0 { + ps.val.validateQ = make(chan *validateReq, n) + return nil + } + return fmt.Errorf("validate queue size must be > 0") + } +} + // WithValidateThrottle sets the upper bound on the number of active validation // goroutines across all topics. The default is 8192. func WithValidateThrottle(n int) Option {