From 499109b16542dfd4f6e02380362a479b2687abf1 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Tue, 21 Jan 2020 10:19:42 +0100 Subject: [PATCH 1/2] Configurable size of validate queue validateWorker() reads from validateQ and invokes validate function that performs validation of the message. Signature validation is performed synchronously. The number of validate workers defaults to the number of CPUs and can be updated with WithValidateWorkers function. With no additional user validators, signature validation is the bottleneck when receiving new messages. Increasing the number of validating workers does not help given the context switching and bottleneck nature of this spot. As stated in WithValidateWorkers documentation, this function should be used rather to limit the number of workers to devote less CPU time for synchronous validation. On the other hand, with the default size of `validateQ`, some applications built on a top of libp2p may experience throttled validation and lost messages. This problem is addressed by WithValidateQueueSize allowing to configure the buffer size for synchronous validation. Application developers knowing the nature of their protocols can set this value to minimise the possibility of throttled synchronous validation and dropped messages. Configurable buffer size allows to gracefully handle peaks of messages and, from the other side, the number of concurrent synchronous workers is still limited by validateWorkers property so the receiver should not get congested. --- validation.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/validation.go b/validation.go index 70b4cde..d6ae546 100644 --- a/validation.go +++ b/validation.go @@ -10,6 +10,7 @@ import ( ) const ( + defaultValidateQueueSize = 32 defaultValidateConcurrency = 1024 defaultValidateThrottle = 8192 ) @@ -82,7 +83,7 @@ type rmValReq struct { func newValidation() *validation { return &validation{ topicVals: make(map[string]*topicVal), - validateQ: make(chan *validateReq, 32), + validateQ: make(chan *validateReq, defaultValidateQueueSize), validateThrottle: make(chan struct{}, defaultValidateThrottle), validateWorkers: runtime.NumCPU(), } @@ -342,6 +343,18 @@ func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) /// Options +// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. +// When queue is full, validation is throttled and new messages are dropped. +func WithValidateQueueSize(n int) Option { + return func(ps *PubSub) error { + if n > 0 { + ps.val.validateQ = make(chan *validateReq, n) + return nil + } + return fmt.Errorf("validate queue size must be > 0") + } +} + // WithValidateThrottle sets the upper bound on the number of active validation // goroutines across all topics. The default is 8192. func WithValidateThrottle(n int) Option { From ae00326b96baf8e07ba5d7c8eccd1846edfdb249 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Mon, 27 Jan 2020 13:44:03 +0100 Subject: [PATCH 2/2] go fmt on pubsub.go --- pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub.go b/pubsub.go index 7cba8cf..ef5bcf0 100644 --- a/pubsub.go +++ b/pubsub.go @@ -159,7 +159,7 @@ type PubSubRouter interface { type Message struct { *pb.Message - ReceivedFrom peer.ID + ReceivedFrom peer.ID ValidatorData interface{} }