go-libp2p-pubsub/validation.go
Piotr Dyraga 499109b165
Configurable size of validate queue
validateWorker() reads from validateQ and invokes validate function
that performs validation of the message. Signature validation is performed
synchronously. The number of validate workers defaults to the number of CPUs
and can be updated with WithValidateWorkers function. With no additional user
validators, signature validation is the bottleneck when receiving new messages.

Increasing the number of validating workers does not help given the context
switching and bottleneck nature of this spot. As stated in WithValidateWorkers
documentation, this function should be used rather to limit the number of workers
to devote less CPU time for synchronous validation. On the other hand, with the
default size of `validateQ`, some applications built on a top of libp2p may
experience throttled validation and lost messages.

This problem is addressed by WithValidateQueueSize allowing to configure the buffer
size for synchronous validation. Application developers knowing the nature of their
protocols can set this value to minimise the possibility of throttled synchronous
validation and dropped messages. Configurable buffer size allows to gracefully
handle peaks of messages and, from the other side, the number of concurrent
synchronous workers is still limited by validateWorkers property so the receiver
should not get congested.
2020-01-21 10:19:42 +01:00

410 lines
10 KiB
Go

package pubsub
import (
"context"
"fmt"
"runtime"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
const (
defaultValidateQueueSize = 32
defaultValidateConcurrency = 1024
defaultValidateThrottle = 8192
)
// Validator is a function that validates a message.
type Validator func(context.Context, peer.ID, *Message) bool
// ValidatorOpt is an option for RegisterTopicValidator.
type ValidatorOpt func(addVal *addValReq) error
// validation represents the validator pipeline.
// The validator pipeline performs signature validation and runs a
// sequence of user-configured validators per-topic. It is possible to
// adjust various concurrency parameters, such as the number of
// workers and the max number of simultaneous validations. The user
// can also attach inline validators that will be executed
// synchronously; this may be useful to prevent superfluous
// context-switching for lightweight tasks.
type validation struct {
p *PubSub
tracer *pubsubTracer
// topicVals tracks per topic validators
topicVals map[string]*topicVal
// validateQ is the front-end to the validation pipeline
validateQ chan *validateReq
// validateThrottle limits the number of active validation goroutines
validateThrottle chan struct{}
// this is the number of synchronous validation workers
validateWorkers int
}
// validation requests
type validateReq struct {
vals []*topicVal
src peer.ID
msg *Message
}
// representation of topic validators
type topicVal struct {
topic string
validate Validator
validateTimeout time.Duration
validateThrottle chan struct{}
validateInline bool
}
// async request to add a topic validators
type addValReq struct {
topic string
validate Validator
timeout time.Duration
throttle int
inline bool
resp chan error
}
// async request to remove a topic validator
type rmValReq struct {
topic string
resp chan error
}
// newValidation creates a new validation pipeline
func newValidation() *validation {
return &validation{
topicVals: make(map[string]*topicVal),
validateQ: make(chan *validateReq, defaultValidateQueueSize),
validateThrottle: make(chan struct{}, defaultValidateThrottle),
validateWorkers: runtime.NumCPU(),
}
}
// Start attaches the validation pipeline to a pubsub instance and starts background
// workers
func (v *validation) Start(p *PubSub) {
v.p = p
v.tracer = p.tracer
for i := 0; i < v.validateWorkers; i++ {
go v.validateWorker()
}
}
// AddValidator adds a new validator
func (v *validation) AddValidator(req *addValReq) {
topic := req.topic
_, ok := v.topicVals[topic]
if ok {
req.resp <- fmt.Errorf("Duplicate validator for topic %s", topic)
return
}
val := &topicVal{
topic: topic,
validate: req.validate,
validateTimeout: 0,
validateThrottle: make(chan struct{}, defaultValidateConcurrency),
validateInline: req.inline,
}
if req.timeout > 0 {
val.validateTimeout = req.timeout
}
if req.throttle > 0 {
val.validateThrottle = make(chan struct{}, req.throttle)
}
v.topicVals[topic] = val
req.resp <- nil
}
// RemoveValidator removes an existing validator
func (v *validation) RemoveValidator(req *rmValReq) {
topic := req.topic
_, ok := v.topicVals[topic]
if ok {
delete(v.topicVals, topic)
req.resp <- nil
} else {
req.resp <- fmt.Errorf("No validator for topic %s", topic)
}
}
// Push pushes a message into the validation pipeline.
// It returns true if the message can be forwarded immediately without validation.
func (v *validation) Push(src peer.ID, msg *Message) bool {
vals := v.getValidators(msg)
if len(vals) > 0 || msg.Signature != nil {
select {
case v.validateQ <- &validateReq{vals, src, msg}:
default:
log.Warningf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation throttled")
}
return false
}
return true
}
// getValidators returns all validators that apply to a given message
func (v *validation) getValidators(msg *Message) []*topicVal {
var vals []*topicVal
for _, topic := range msg.GetTopicIDs() {
val, ok := v.topicVals[topic]
if !ok {
continue
}
vals = append(vals, val)
}
return vals
}
// validateWorker is an active goroutine performing inline validation
func (v *validation) validateWorker() {
for {
select {
case req := <-v.validateQ:
v.validate(req.vals, req.src, req.msg)
case <-v.p.ctx.Done():
return
}
}
}
// validate performs validation and only sends the message if all validators succeed
// signature validation is performed synchronously, while user validators are invoked
// asynchronously, throttled by the global validation throttle.
func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
if msg.Signature != nil {
if !v.validateSignature(msg) {
log.Warningf("message signature validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "invalid signature")
return
}
}
// we can mark the message as seen now that we have verified the signature
// and avoid invoking user validators more than once
id := v.p.msgID(msg.Message)
if !v.p.markSeen(id) {
v.tracer.DuplicateMessage(msg)
return
}
var inline, async []*topicVal
for _, val := range vals {
if val.validateInline {
inline = append(inline, val)
} else {
async = append(async, val)
}
}
// apply inline (synchronous) validators
for _, val := range inline {
if !val.validateMsg(v.p.ctx, src, msg) {
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation failed")
return
}
}
// apply async validators
if len(async) > 0 {
select {
case v.validateThrottle <- struct{}{}:
go func() {
v.doValidateTopic(async, src, msg)
<-v.validateThrottle
}()
default:
log.Warningf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation throttled")
}
return
}
// no async validators, send the message
v.p.sendMsg <- msg
}
func (v *validation) validateSignature(msg *Message) bool {
err := verifyMessageSignature(msg.Message)
if err != nil {
log.Debugf("signature verification error: %s", err.Error())
return false
}
return true
}
func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) {
if !v.validateTopic(vals, src, msg) {
log.Warningf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation failed")
return
}
v.p.sendMsg <- msg
}
func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool {
if len(vals) == 1 {
return v.validateSingleTopic(vals[0], src, msg)
}
ctx, cancel := context.WithCancel(v.p.ctx)
defer cancel()
rch := make(chan bool, len(vals))
rcount := 0
throttle := false
loop:
for _, val := range vals {
rcount++
select {
case val.validateThrottle <- struct{}{}:
go func(val *topicVal) {
rch <- val.validateMsg(ctx, src, msg)
<-val.validateThrottle
}(val)
default:
log.Debugf("validation throttled for topic %s", val.topic)
throttle = true
break loop
}
}
if throttle {
v.tracer.RejectMessage(msg, "validation throttled")
return false
}
for i := 0; i < rcount; i++ {
valid := <-rch
if !valid {
return false
}
}
return true
}
// fast path for single topic validation that avoids the extra goroutine
func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) bool {
select {
case val.validateThrottle <- struct{}{}:
res := val.validateMsg(v.p.ctx, src, msg)
<-val.validateThrottle
return res
default:
log.Debugf("validation throttled for topic %s", val.topic)
v.tracer.RejectMessage(msg, "validation throttled")
return false
}
}
func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool {
if val.validateTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, val.validateTimeout)
defer cancel()
}
valid := val.validate(ctx, src, msg)
if !valid {
log.Debugf("validation failed for topic %s", val.topic)
}
return valid
}
/// Options
// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
// When queue is full, validation is throttled and new messages are dropped.
func WithValidateQueueSize(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.val.validateQ = make(chan *validateReq, n)
return nil
}
return fmt.Errorf("validate queue size must be > 0")
}
}
// WithValidateThrottle sets the upper bound on the number of active validation
// goroutines across all topics. The default is 8192.
func WithValidateThrottle(n int) Option {
return func(ps *PubSub) error {
ps.val.validateThrottle = make(chan struct{}, n)
return nil
}
}
// WithValidateWorkers sets the number of synchronous validation worker goroutines.
// Defaults to NumCPU.
//
// The synchronous validation workers perform signature validation, apply inline
// user validators, and schedule asynchronous user validators.
// You can adjust this parameter to devote less cpu time to synchronous validation.
func WithValidateWorkers(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.val.validateWorkers = n
return nil
}
return fmt.Errorf("number of validation workers must be > 0")
}
}
// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator.
// By default there is no timeout in asynchronous validators.
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.timeout = timeout
return nil
}
}
// WithValidatorConcurrency is an option that sets the topic validator throttle.
// This controls the number of active validation goroutines for the topic; the default is 1024.
func WithValidatorConcurrency(n int) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.throttle = n
return nil
}
}
// WithValidatorInline is an option that sets the validation disposition to synchronous:
// it will be executed inline in validation front-end, without spawning a new goroutine.
// This is suitable for simple or cpu-bound validators that do not block.
func WithValidatorInline(inline bool) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.inline = inline
return nil
}
}