diff --git a/pubsub.go b/pubsub.go index ce57237..a3f5078 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1132,7 +1132,7 @@ func (p *PubSub) BlacklistPeer(pid peer.ID) { // By default validators are asynchronous, which means they will run in a separate goroutine. // The number of active goroutines is controlled by global and per topic validator // throttles; if it exceeds the throttle threshold, messages will be dropped. -func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error { +func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error { addVal := &addValReq{ topic: topic, validate: val, diff --git a/score.go b/score.go index aabd2cd..74997bb 100644 --- a/score.go +++ b/score.go @@ -105,6 +105,7 @@ const ( deliveryUnknown = iota // we don't know (yet) if the message is valid deliveryValid // we know the message is valid deliveryInvalid // we know the message is invalid + deliveryIgnored // we were intructed by the validator to ignore the message deliveryThrottled // we can't tell if it is valid because validation throttled ) @@ -551,13 +552,20 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) { drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) - if reason == rejectValidationThrottled { + switch reason { + case rejectValidationThrottled: // if we reject with "validation throttled" we don't penalize the peer(s) that forward it // because we don't know if it was valid. drec.status = deliveryThrottled // release the delivery time tracking map to free some memory early drec.peers = nil return + case rejectValidationIgnored: + // we were explicitly instructed by the validator to ignore the message but not penalize + // the peer + drec.status = deliveryIgnored + drec.peers = nil + return } // mark the message as invalid and penalize peers that have already forwarded it. @@ -601,6 +609,8 @@ func (ps *peerScore) DuplicateMessage(msg *Message) { case deliveryThrottled: // the message was throttled; do nothing (we don't know if it was valid) + case deliveryIgnored: + // the message was ignored; do nothing } } diff --git a/tracer.go b/tracer.go index e587ba8..832f35f 100644 --- a/tracer.go +++ b/tracer.go @@ -33,6 +33,7 @@ const ( rejectValidationQueueFull = "validation queue full" rejectValidationThrottled = "validation throttled" rejectValidationFailed = "validation failed" + rejectValidationIgnored = "validation ignored" rejectSelfOrigin = "self originated message" ) diff --git a/validation.go b/validation.go index d83be65..1ddc2cf 100644 --- a/validation.go +++ b/validation.go @@ -15,9 +15,31 @@ const ( defaultValidateThrottle = 8192 ) -// Validator is a function that validates a message. +// Validator is a function that validates a message with a binary decision: accept or reject. type Validator func(context.Context, peer.ID, *Message) bool +// ValidatorEx is an extended validation function that validates a message with an enumerated decision +type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult + +// ValidationResult represents the decision of an extended validator +type ValidationResult int + +const ( + // ValidationAccept is a validation decision that indicates a valid message that should be accepted and + // delivered to the application and forwarded to the network. + ValidationAccept = ValidationResult(0) + // ValidationReject is a validation decision that indicates an invalid message that should not be + // delivered to the application or forwarded to the application. Furthermore the peer that forwarded + // the message should be penalized by peer scoring routers. + ValidationReject = ValidationResult(1) + // ValidationIgnore is a validation decision that indicates a message that should be ignored: it will + // be neither delivered to the application nor forwarded to the network. However, in contrast to + // ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers. + ValidationIgnore = ValidationResult(2) + // internal + validationThrottled = ValidationResult(-1) +) + // ValidatorOpt is an option for RegisterTopicValidator. type ValidatorOpt func(addVal *addValReq) error @@ -57,7 +79,7 @@ type validateReq struct { // representation of topic validators type topicVal struct { topic string - validate Validator + validate ValidatorEx validateTimeout time.Duration validateThrottle chan struct{} validateInline bool @@ -66,7 +88,7 @@ type topicVal struct { // async request to add a topic validators type addValReq struct { topic string - validate Validator + validate interface{} timeout time.Duration throttle int inline bool @@ -109,9 +131,36 @@ func (v *validation) AddValidator(req *addValReq) { return } + makeValidatorEx := func(v Validator) ValidatorEx { + return func(ctx context.Context, p peer.ID, msg *Message) ValidationResult { + if v(ctx, p, msg) { + return ValidationAccept + } else { + return ValidationReject + } + } + } + + var validator ValidatorEx + switch v := req.validate.(type) { + case func(ctx context.Context, p peer.ID, msg *Message) bool: + validator = makeValidatorEx(Validator(v)) + case Validator: + validator = makeValidatorEx(v) + + case func(ctx context.Context, p peer.ID, msg *Message) ValidationResult: + validator = ValidatorEx(v) + case ValidatorEx: + validator = v + + default: + req.resp <- fmt.Errorf("Unknown validator type for topic %s; must be an instance of Validator or ValiatorEx", topic) + return + } + val := &topicVal{ topic: topic, - validate: req.validate, + validate: validator, validateTimeout: 0, validateThrottle: make(chan struct{}, defaultValidateConcurrency), validateInline: req.inline, @@ -220,20 +269,30 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { } // apply inline (synchronous) validators + result := ValidationAccept for _, val := range inline { - if !val.validateMsg(v.p.ctx, src, msg) { - log.Debugf("message validation failed; dropping message from %s", src) - v.tracer.RejectMessage(msg, rejectValidationFailed) - return + switch val.validateMsg(v.p.ctx, src, msg) { + case ValidationAccept: + case ValidationReject: + result = ValidationReject + break + case ValidationIgnore: + result = ValidationIgnore } } + if result == ValidationReject { + log.Warningf("message validation failed; dropping message from %s", src) + v.tracer.RejectMessage(msg, rejectValidationFailed) + return + } + // apply async validators if len(async) > 0 { select { case v.validateThrottle <- struct{}{}: go func() { - v.doValidateTopic(async, src, msg) + v.doValidateTopic(async, src, msg, result) <-v.validateThrottle }() default: @@ -243,7 +302,12 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { return } - // no async validators, send the message + if result == ValidationIgnore { + v.tracer.RejectMessage(msg, rejectValidationIgnored) + return + } + + // no async validators, accepted message, send it! v.p.sendMsg <- msg } @@ -257,17 +321,35 @@ func (v *validation) validateSignature(msg *Message) bool { return true } -func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { - if !v.validateTopic(vals, src, msg) { - log.Warningf("message validation failed; dropping message from %s", src) - v.tracer.RejectMessage(msg, rejectValidationFailed) - return +func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message, r ValidationResult) { + result := v.validateTopic(vals, src, msg) + + if result == ValidationAccept && r != ValidationAccept { + result = r } - v.p.sendMsg <- msg + switch result { + case ValidationAccept: + v.p.sendMsg <- msg + case ValidationReject: + log.Debugf("message validation failed; dropping message from %s", src) + v.tracer.RejectMessage(msg, rejectValidationFailed) + return + case ValidationIgnore: + log.Debugf("message validation punted; ignoring message from %s", src) + v.tracer.RejectMessage(msg, rejectValidationIgnored) + return + case validationThrottled: + log.Warningf("message validation throttled; ignoring message from %s", src) + v.tracer.RejectMessage(msg, rejectValidationThrottled) + + default: + // BUG: this would be an internal programming error, so a panic seems appropiate. + panic(fmt.Errorf("Unexpected validation result: %d", result)) + } } -func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool { +func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) ValidationResult { if len(vals) == 1 { return v.validateSingleTopic(vals[0], src, msg) } @@ -275,11 +357,9 @@ func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) ctx, cancel := context.WithCancel(v.p.ctx) defer cancel() - rch := make(chan bool, len(vals)) + rch := make(chan ValidationResult, len(vals)) rcount := 0 - throttle := false -loop: for _, val := range vals { rcount++ @@ -292,55 +372,63 @@ loop: default: log.Debugf("validation throttled for topic %s", val.topic) - throttle = true - break loop + rch <- validationThrottled } } - if throttle { - v.tracer.RejectMessage(msg, rejectValidationThrottled) - return false - } - + result := ValidationAccept for i := 0; i < rcount; i++ { - valid := <-rch - if !valid { - return false + switch <-rch { + case ValidationAccept: + case ValidationReject: + result = ValidationReject + break + case ValidationIgnore: + if result != validationThrottled { + result = ValidationIgnore + } + case validationThrottled: + result = validationThrottled } } - return true + return result } // fast path for single topic validation that avoids the extra goroutine -func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) bool { +func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) ValidationResult { select { case val.validateThrottle <- struct{}{}: res := val.validateMsg(v.p.ctx, src, msg) <-val.validateThrottle - return res default: log.Debugf("validation throttled for topic %s", val.topic) - v.tracer.RejectMessage(msg, rejectValidationThrottled) - return false + return validationThrottled } } -func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool { +func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) ValidationResult { if val.validateTimeout > 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) defer cancel() } - valid := val.validate(ctx, src, msg) - if !valid { - log.Debugf("validation failed for topic %s", val.topic) - } + r := val.validate(ctx, src, msg) + switch r { + case ValidationAccept: + fallthrough + case ValidationReject: + fallthrough + case ValidationIgnore: + return r - return valid + default: + log.Warningf("Unexpected result from validator: %d; ignoring message", r) + return ValidationIgnore + } } /// Options