package pubsub import ( "context" "fmt" "runtime" "sync" "time" "github.com/libp2p/go-libp2p-core/peer" ) const ( defaultValidateQueueSize = 32 defaultValidateConcurrency = 1024 defaultValidateThrottle = 8192 ) // ValidationError is an error that may be signalled from message publication when the message // fails validation type ValidationError struct { Reason string } func (e ValidationError) Error() string { return e.Reason } // Validator is a function that validates a message with a binary decision: accept or reject. type Validator func(context.Context, peer.ID, *Message) bool // ValidatorEx is an extended validation function that validates a message with an enumerated decision type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult // ValidationResult represents the decision of an extended validator type ValidationResult int const ( // ValidationAccept is a validation decision that indicates a valid message that should be accepted and // delivered to the application and forwarded to the network. ValidationAccept = ValidationResult(0) // ValidationReject is a validation decision that indicates an invalid message that should not be // delivered to the application or forwarded to the application. Furthermore the peer that forwarded // the message should be penalized by peer scoring routers. ValidationReject = ValidationResult(1) // ValidationIgnore is a validation decision that indicates a message that should be ignored: it will // be neither delivered to the application nor forwarded to the network. However, in contrast to // ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers. ValidationIgnore = ValidationResult(2) // internal validationThrottled = ValidationResult(-1) ) // ValidatorOpt is an option for RegisterTopicValidator. type ValidatorOpt func(addVal *addValReq) error // validation represents the validator pipeline. // The validator pipeline performs signature validation and runs a // sequence of user-configured validators per-topic. It is possible to // adjust various concurrency parameters, such as the number of // workers and the max number of simultaneous validations. The user // can also attach inline validators that will be executed // synchronously; this may be useful to prevent superfluous // context-switching for lightweight tasks. type validation struct { p *PubSub tracer *pubsubTracer // mx protects the validator map mx sync.Mutex // topicVals tracks per topic validators topicVals map[string]*topicVal // validateQ is the front-end to the validation pipeline validateQ chan *validateReq // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} // this is the number of synchronous validation workers validateWorkers int } // validation requests type validateReq struct { vals []*topicVal src peer.ID msg *Message } // representation of topic validators type topicVal struct { topic string validate ValidatorEx validateTimeout time.Duration validateThrottle chan struct{} validateInline bool } // async request to add a topic validators type addValReq struct { topic string validate interface{} timeout time.Duration throttle int inline bool resp chan error } // async request to remove a topic validator type rmValReq struct { topic string resp chan error } // newValidation creates a new validation pipeline func newValidation() *validation { return &validation{ topicVals: make(map[string]*topicVal), validateQ: make(chan *validateReq, defaultValidateQueueSize), validateThrottle: make(chan struct{}, defaultValidateThrottle), validateWorkers: runtime.NumCPU(), } } // Start attaches the validation pipeline to a pubsub instance and starts background // workers func (v *validation) Start(p *PubSub) { v.p = p v.tracer = p.tracer for i := 0; i < v.validateWorkers; i++ { go v.validateWorker() } } // AddValidator adds a new validator func (v *validation) AddValidator(req *addValReq) { v.mx.Lock() defer v.mx.Unlock() topic := req.topic _, ok := v.topicVals[topic] if ok { req.resp <- fmt.Errorf("duplicate validator for topic %s", topic) return } makeValidatorEx := func(v Validator) ValidatorEx { return func(ctx context.Context, p peer.ID, msg *Message) ValidationResult { if v(ctx, p, msg) { return ValidationAccept } else { return ValidationReject } } } var validator ValidatorEx switch v := req.validate.(type) { case func(ctx context.Context, p peer.ID, msg *Message) bool: validator = makeValidatorEx(Validator(v)) case Validator: validator = makeValidatorEx(v) case func(ctx context.Context, p peer.ID, msg *Message) ValidationResult: validator = ValidatorEx(v) case ValidatorEx: validator = v default: req.resp <- fmt.Errorf("unknown validator type for topic %s; must be an instance of Validator or ValidatorEx", topic) return } val := &topicVal{ topic: topic, validate: validator, validateTimeout: 0, validateThrottle: make(chan struct{}, defaultValidateConcurrency), validateInline: req.inline, } if req.timeout > 0 { val.validateTimeout = req.timeout } if req.throttle > 0 { val.validateThrottle = make(chan struct{}, req.throttle) } v.topicVals[topic] = val req.resp <- nil } // RemoveValidator removes an existing validator func (v *validation) RemoveValidator(req *rmValReq) { v.mx.Lock() defer v.mx.Unlock() topic := req.topic _, ok := v.topicVals[topic] if ok { delete(v.topicVals, topic) req.resp <- nil } else { req.resp <- fmt.Errorf("no validator for topic %s", topic) } } // PushLocal synchronously pushes a locally published message and performs applicable // validations. // Returns an error if validation fails func (v *validation) PushLocal(msg *Message) error { v.p.tracer.PublishMessage(msg) err := v.p.checkSigningPolicy(msg) if err != nil { return err } vals := v.getValidators(msg) return v.validate(vals, msg.ReceivedFrom, msg, true) } // Push pushes a message into the validation pipeline. // It returns true if the message can be forwarded immediately without validation. func (v *validation) Push(src peer.ID, msg *Message) bool { vals := v.getValidators(msg) if len(vals) > 0 || msg.Signature != nil { select { case v.validateQ <- &validateReq{vals, src, msg}: default: log.Debugf("message validation throttled: queue full; dropping message from %s", src) v.tracer.RejectMessage(msg, RejectValidationQueueFull) } return false } return true } // getValidators returns all validators that apply to a given message func (v *validation) getValidators(msg *Message) []*topicVal { v.mx.Lock() defer v.mx.Unlock() topic := msg.GetTopic() val, ok := v.topicVals[topic] if !ok { return nil } return []*topicVal{val} } // validateWorker is an active goroutine performing inline validation func (v *validation) validateWorker() { for { select { case req := <-v.validateQ: v.validate(req.vals, req.src, req.msg, false) case <-v.p.ctx.Done(): return } } } // validate performs validation and only sends the message if all validators succeed func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message, synchronous bool) error { // If signature verification is enabled, but signing is disabled, // the Signature is required to be nil upon receiving the message in PubSub.pushMsg. if msg.Signature != nil { if !v.validateSignature(msg) { log.Debugf("message signature validation failed; dropping message from %s", src) v.tracer.RejectMessage(msg, RejectInvalidSignature) return ValidationError{Reason: RejectInvalidSignature} } } // we can mark the message as seen now that we have verified the signature // and avoid invoking user validators more than once id := v.p.msgID(msg.Message) if !v.p.markSeen(id) { v.tracer.DuplicateMessage(msg) return nil } else { v.tracer.ValidateMessage(msg) } var inline, async []*topicVal for _, val := range vals { if val.validateInline || synchronous { inline = append(inline, val) } else { async = append(async, val) } } // apply inline (synchronous) validators result := ValidationAccept loop: for _, val := range inline { switch val.validateMsg(v.p.ctx, src, msg) { case ValidationAccept: case ValidationReject: result = ValidationReject break loop case ValidationIgnore: result = ValidationIgnore } } if result == ValidationReject { log.Debugf("message validation failed; dropping message from %s", src) v.tracer.RejectMessage(msg, RejectValidationFailed) return ValidationError{Reason: RejectValidationFailed} } // apply async validators if len(async) > 0 { select { case v.validateThrottle <- struct{}{}: go func() { v.doValidateTopic(async, src, msg, result) <-v.validateThrottle }() default: log.Debugf("message validation throttled; dropping message from %s", src) v.tracer.RejectMessage(msg, RejectValidationThrottled) } return nil } if result == ValidationIgnore { v.tracer.RejectMessage(msg, RejectValidationIgnored) return ValidationError{Reason: RejectValidationIgnored} } // no async validators, accepted message, send it! select { case v.p.sendMsg <- msg: return nil case <-v.p.ctx.Done(): return v.p.ctx.Err() } } func (v *validation) validateSignature(msg *Message) bool { err := verifyMessageSignature(msg.Message) if err != nil { log.Debugf("signature verification error: %s", err.Error()) return false } return true } func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message, r ValidationResult) { result := v.validateTopic(vals, src, msg) if result == ValidationAccept && r != ValidationAccept { result = r } switch result { case ValidationAccept: v.p.sendMsg <- msg case ValidationReject: log.Debugf("message validation failed; dropping message from %s", src) v.tracer.RejectMessage(msg, RejectValidationFailed) return case ValidationIgnore: log.Debugf("message validation punted; ignoring message from %s", src) v.tracer.RejectMessage(msg, RejectValidationIgnored) return case validationThrottled: log.Debugf("message validation throttled; ignoring message from %s", src) v.tracer.RejectMessage(msg, RejectValidationThrottled) default: // BUG: this would be an internal programming error, so a panic seems appropiate. panic(fmt.Errorf("unexpected validation result: %d", result)) } } func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) ValidationResult { if len(vals) == 1 { return v.validateSingleTopic(vals[0], src, msg) } ctx, cancel := context.WithCancel(v.p.ctx) defer cancel() rch := make(chan ValidationResult, len(vals)) rcount := 0 for _, val := range vals { rcount++ select { case val.validateThrottle <- struct{}{}: go func(val *topicVal) { rch <- val.validateMsg(ctx, src, msg) <-val.validateThrottle }(val) default: log.Debugf("validation throttled for topic %s", val.topic) rch <- validationThrottled } } result := ValidationAccept loop: for i := 0; i < rcount; i++ { switch <-rch { case ValidationAccept: case ValidationReject: result = ValidationReject break loop case ValidationIgnore: // throttled validation has the same effect, but takes precedence over Ignore as it is not // known whether the throttled validator would have signaled rejection. if result != validationThrottled { result = ValidationIgnore } case validationThrottled: result = validationThrottled } } return result } // fast path for single topic validation that avoids the extra goroutine func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) ValidationResult { select { case val.validateThrottle <- struct{}{}: res := val.validateMsg(v.p.ctx, src, msg) <-val.validateThrottle return res default: log.Debugf("validation throttled for topic %s", val.topic) return validationThrottled } } func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) ValidationResult { start := time.Now() defer func() { log.Debugf("validation done; took %s", time.Since(start)) }() if val.validateTimeout > 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) defer cancel() } r := val.validate(ctx, src, msg) switch r { case ValidationAccept: fallthrough case ValidationReject: fallthrough case ValidationIgnore: return r default: log.Warnf("Unexpected result from validator: %d; ignoring message", r) return ValidationIgnore } } /// Options // WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. // When queue is full, validation is throttled and new messages are dropped. func WithValidateQueueSize(n int) Option { return func(ps *PubSub) error { if n > 0 { ps.val.validateQ = make(chan *validateReq, n) return nil } return fmt.Errorf("validate queue size must be > 0") } } // WithValidateThrottle sets the upper bound on the number of active validation // goroutines across all topics. The default is 8192. func WithValidateThrottle(n int) Option { return func(ps *PubSub) error { ps.val.validateThrottle = make(chan struct{}, n) return nil } } // WithValidateWorkers sets the number of synchronous validation worker goroutines. // Defaults to NumCPU. // // The synchronous validation workers perform signature validation, apply inline // user validators, and schedule asynchronous user validators. // You can adjust this parameter to devote less cpu time to synchronous validation. func WithValidateWorkers(n int) Option { return func(ps *PubSub) error { if n > 0 { ps.val.validateWorkers = n return nil } return fmt.Errorf("number of validation workers must be > 0") } } // WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. // By default there is no timeout in asynchronous validators. func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { return func(addVal *addValReq) error { addVal.timeout = timeout return nil } } // WithValidatorConcurrency is an option that sets the topic validator throttle. // This controls the number of active validation goroutines for the topic; the default is 1024. func WithValidatorConcurrency(n int) ValidatorOpt { return func(addVal *addValReq) error { addVal.throttle = n return nil } } // WithValidatorInline is an option that sets the validation disposition to synchronous: // it will be executed inline in validation front-end, without spawning a new goroutine. // This is suitable for simple or cpu-bound validators that do not block. func WithValidatorInline(inline bool) ValidatorOpt { return func(addVal *addValReq) error { addVal.inline = inline return nil } }