Configurable size of validate queue

validateWorker() reads from validateQ and invokes validate function
that performs validation of the message. Signature validation is performed
synchronously. The number of validate workers defaults to the number of CPUs
and can be updated with WithValidateWorkers function. With no additional user
validators, signature validation is the bottleneck when receiving new messages.

Increasing the number of validating workers does not help given the context
switching and bottleneck nature of this spot. As stated in WithValidateWorkers
documentation, this function should be used rather to limit the number of workers
to devote less CPU time for synchronous validation. On the other hand, with the
default size of `validateQ`, some applications built on a top of libp2p may
experience throttled validation and lost messages.

This problem is addressed by WithValidateQueueSize allowing to configure the buffer
size for synchronous validation. Application developers knowing the nature of their
protocols can set this value to minimise the possibility of throttled synchronous
validation and dropped messages. Configurable buffer size allows to gracefully
handle peaks of messages and, from the other side, the number of concurrent
synchronous workers is still limited by validateWorkers property so the receiver
should not get congested.
This commit is contained in:
Piotr Dyraga 2020-01-21 10:19:42 +01:00
parent 97846b5748
commit 499109b165
No known key found for this signature in database
GPG Key ID: 5BA730370A97B009
1 changed files with 14 additions and 1 deletions

View File

@ -10,6 +10,7 @@ import (
)
const (
defaultValidateQueueSize = 32
defaultValidateConcurrency = 1024
defaultValidateThrottle = 8192
)
@ -82,7 +83,7 @@ type rmValReq struct {
func newValidation() *validation {
return &validation{
topicVals: make(map[string]*topicVal),
validateQ: make(chan *validateReq, 32),
validateQ: make(chan *validateReq, defaultValidateQueueSize),
validateThrottle: make(chan struct{}, defaultValidateThrottle),
validateWorkers: runtime.NumCPU(),
}
@ -342,6 +343,18 @@ func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message)
/// Options
// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
// When queue is full, validation is throttled and new messages are dropped.
func WithValidateQueueSize(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.val.validateQ = make(chan *validateReq, n)
return nil
}
return fmt.Errorf("validate queue size must be > 0")
}
}
// WithValidateThrottle sets the upper bound on the number of active validation
// goroutines across all topics. The default is 8192.
func WithValidateThrottle(n int) Option {