implement extended validator support

This commit is contained in:
vyzo 2020-04-28 17:40:36 +03:00
parent 37ed7a3360
commit 1901383099
4 changed files with 142 additions and 43 deletions

View File

@ -1132,7 +1132,7 @@ func (p *PubSub) BlacklistPeer(pid peer.ID) {
// By default validators are asynchronous, which means they will run in a separate goroutine. // By default validators are asynchronous, which means they will run in a separate goroutine.
// The number of active goroutines is controlled by global and per topic validator // The number of active goroutines is controlled by global and per topic validator
// throttles; if it exceeds the throttle threshold, messages will be dropped. // throttles; if it exceeds the throttle threshold, messages will be dropped.
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error { func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error {
addVal := &addValReq{ addVal := &addValReq{
topic: topic, topic: topic,
validate: val, validate: val,

View File

@ -105,6 +105,7 @@ const (
deliveryUnknown = iota // we don't know (yet) if the message is valid deliveryUnknown = iota // we don't know (yet) if the message is valid
deliveryValid // we know the message is valid deliveryValid // we know the message is valid
deliveryInvalid // we know the message is invalid deliveryInvalid // we know the message is invalid
deliveryIgnored // we were intructed by the validator to ignore the message
deliveryThrottled // we can't tell if it is valid because validation throttled deliveryThrottled // we can't tell if it is valid because validation throttled
) )
@ -551,13 +552,20 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
if reason == rejectValidationThrottled { switch reason {
case rejectValidationThrottled:
// if we reject with "validation throttled" we don't penalize the peer(s) that forward it // if we reject with "validation throttled" we don't penalize the peer(s) that forward it
// because we don't know if it was valid. // because we don't know if it was valid.
drec.status = deliveryThrottled drec.status = deliveryThrottled
// release the delivery time tracking map to free some memory early // release the delivery time tracking map to free some memory early
drec.peers = nil drec.peers = nil
return return
case rejectValidationIgnored:
// we were explicitly instructed by the validator to ignore the message but not penalize
// the peer
drec.status = deliveryIgnored
drec.peers = nil
return
} }
// mark the message as invalid and penalize peers that have already forwarded it. // mark the message as invalid and penalize peers that have already forwarded it.
@ -601,6 +609,8 @@ func (ps *peerScore) DuplicateMessage(msg *Message) {
case deliveryThrottled: case deliveryThrottled:
// the message was throttled; do nothing (we don't know if it was valid) // the message was throttled; do nothing (we don't know if it was valid)
case deliveryIgnored:
// the message was ignored; do nothing
} }
} }

View File

@ -33,6 +33,7 @@ const (
rejectValidationQueueFull = "validation queue full" rejectValidationQueueFull = "validation queue full"
rejectValidationThrottled = "validation throttled" rejectValidationThrottled = "validation throttled"
rejectValidationFailed = "validation failed" rejectValidationFailed = "validation failed"
rejectValidationIgnored = "validation ignored"
rejectSelfOrigin = "self originated message" rejectSelfOrigin = "self originated message"
) )

View File

@ -15,9 +15,31 @@ const (
defaultValidateThrottle = 8192 defaultValidateThrottle = 8192
) )
// Validator is a function that validates a message. // Validator is a function that validates a message with a binary decision: accept or reject.
type Validator func(context.Context, peer.ID, *Message) bool type Validator func(context.Context, peer.ID, *Message) bool
// ValidatorEx is an extended validation function that validates a message with an enumerated decision
type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult
// ValidationResult represents the decision of an extended validator
type ValidationResult int
const (
// ValidationAccept is a validation decision that indicates a valid message that should be accepted and
// delivered to the application and forwarded to the network.
ValidationAccept = ValidationResult(0)
// ValidationReject is a validation decision that indicates an invalid message that should not be
// delivered to the application or forwarded to the application. Furthermore the peer that forwarded
// the message should be penalized by peer scoring routers.
ValidationReject = ValidationResult(1)
// ValidationIgnore is a validation decision that indicates a message that should be ignored: it will
// be neither delivered to the application nor forwarded to the network. However, in contrast to
// ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers.
ValidationIgnore = ValidationResult(2)
// internal
validationThrottled = ValidationResult(-1)
)
// ValidatorOpt is an option for RegisterTopicValidator. // ValidatorOpt is an option for RegisterTopicValidator.
type ValidatorOpt func(addVal *addValReq) error type ValidatorOpt func(addVal *addValReq) error
@ -57,7 +79,7 @@ type validateReq struct {
// representation of topic validators // representation of topic validators
type topicVal struct { type topicVal struct {
topic string topic string
validate Validator validate ValidatorEx
validateTimeout time.Duration validateTimeout time.Duration
validateThrottle chan struct{} validateThrottle chan struct{}
validateInline bool validateInline bool
@ -66,7 +88,7 @@ type topicVal struct {
// async request to add a topic validators // async request to add a topic validators
type addValReq struct { type addValReq struct {
topic string topic string
validate Validator validate interface{}
timeout time.Duration timeout time.Duration
throttle int throttle int
inline bool inline bool
@ -109,9 +131,36 @@ func (v *validation) AddValidator(req *addValReq) {
return return
} }
makeValidatorEx := func(v Validator) ValidatorEx {
return func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
if v(ctx, p, msg) {
return ValidationAccept
} else {
return ValidationReject
}
}
}
var validator ValidatorEx
switch v := req.validate.(type) {
case func(ctx context.Context, p peer.ID, msg *Message) bool:
validator = makeValidatorEx(Validator(v))
case Validator:
validator = makeValidatorEx(v)
case func(ctx context.Context, p peer.ID, msg *Message) ValidationResult:
validator = ValidatorEx(v)
case ValidatorEx:
validator = v
default:
req.resp <- fmt.Errorf("Unknown validator type for topic %s; must be an instance of Validator or ValiatorEx", topic)
return
}
val := &topicVal{ val := &topicVal{
topic: topic, topic: topic,
validate: req.validate, validate: validator,
validateTimeout: 0, validateTimeout: 0,
validateThrottle: make(chan struct{}, defaultValidateConcurrency), validateThrottle: make(chan struct{}, defaultValidateConcurrency),
validateInline: req.inline, validateInline: req.inline,
@ -220,20 +269,30 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
} }
// apply inline (synchronous) validators // apply inline (synchronous) validators
result := ValidationAccept
for _, val := range inline { for _, val := range inline {
if !val.validateMsg(v.p.ctx, src, msg) { switch val.validateMsg(v.p.ctx, src, msg) {
log.Debugf("message validation failed; dropping message from %s", src) case ValidationAccept:
case ValidationReject:
result = ValidationReject
break
case ValidationIgnore:
result = ValidationIgnore
}
}
if result == ValidationReject {
log.Warningf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, rejectValidationFailed) v.tracer.RejectMessage(msg, rejectValidationFailed)
return return
} }
}
// apply async validators // apply async validators
if len(async) > 0 { if len(async) > 0 {
select { select {
case v.validateThrottle <- struct{}{}: case v.validateThrottle <- struct{}{}:
go func() { go func() {
v.doValidateTopic(async, src, msg) v.doValidateTopic(async, src, msg, result)
<-v.validateThrottle <-v.validateThrottle
}() }()
default: default:
@ -243,7 +302,12 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
return return
} }
// no async validators, send the message if result == ValidationIgnore {
v.tracer.RejectMessage(msg, rejectValidationIgnored)
return
}
// no async validators, accepted message, send it!
v.p.sendMsg <- msg v.p.sendMsg <- msg
} }
@ -257,17 +321,35 @@ func (v *validation) validateSignature(msg *Message) bool {
return true return true
} }
func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message, r ValidationResult) {
if !v.validateTopic(vals, src, msg) { result := v.validateTopic(vals, src, msg)
log.Warningf("message validation failed; dropping message from %s", src)
if result == ValidationAccept && r != ValidationAccept {
result = r
}
switch result {
case ValidationAccept:
v.p.sendMsg <- msg
case ValidationReject:
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, rejectValidationFailed) v.tracer.RejectMessage(msg, rejectValidationFailed)
return return
case ValidationIgnore:
log.Debugf("message validation punted; ignoring message from %s", src)
v.tracer.RejectMessage(msg, rejectValidationIgnored)
return
case validationThrottled:
log.Warningf("message validation throttled; ignoring message from %s", src)
v.tracer.RejectMessage(msg, rejectValidationThrottled)
default:
// BUG: this would be an internal programming error, so a panic seems appropiate.
panic(fmt.Errorf("Unexpected validation result: %d", result))
}
} }
v.p.sendMsg <- msg func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) ValidationResult {
}
func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool {
if len(vals) == 1 { if len(vals) == 1 {
return v.validateSingleTopic(vals[0], src, msg) return v.validateSingleTopic(vals[0], src, msg)
} }
@ -275,11 +357,9 @@ func (v *validation) validateTopic(vals []*topicVal, src peer.ID, msg *Message)
ctx, cancel := context.WithCancel(v.p.ctx) ctx, cancel := context.WithCancel(v.p.ctx)
defer cancel() defer cancel()
rch := make(chan bool, len(vals)) rch := make(chan ValidationResult, len(vals))
rcount := 0 rcount := 0
throttle := false
loop:
for _, val := range vals { for _, val := range vals {
rcount++ rcount++
@ -292,55 +372,63 @@ loop:
default: default:
log.Debugf("validation throttled for topic %s", val.topic) log.Debugf("validation throttled for topic %s", val.topic)
throttle = true rch <- validationThrottled
break loop
} }
} }
if throttle { result := ValidationAccept
v.tracer.RejectMessage(msg, rejectValidationThrottled)
return false
}
for i := 0; i < rcount; i++ { for i := 0; i < rcount; i++ {
valid := <-rch switch <-rch {
if !valid { case ValidationAccept:
return false case ValidationReject:
result = ValidationReject
break
case ValidationIgnore:
if result != validationThrottled {
result = ValidationIgnore
}
case validationThrottled:
result = validationThrottled
} }
} }
return true return result
} }
// fast path for single topic validation that avoids the extra goroutine // fast path for single topic validation that avoids the extra goroutine
func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) bool { func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) ValidationResult {
select { select {
case val.validateThrottle <- struct{}{}: case val.validateThrottle <- struct{}{}:
res := val.validateMsg(v.p.ctx, src, msg) res := val.validateMsg(v.p.ctx, src, msg)
<-val.validateThrottle <-val.validateThrottle
return res return res
default: default:
log.Debugf("validation throttled for topic %s", val.topic) log.Debugf("validation throttled for topic %s", val.topic)
v.tracer.RejectMessage(msg, rejectValidationThrottled) return validationThrottled
return false
} }
} }
func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool { func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) ValidationResult {
if val.validateTimeout > 0 { if val.validateTimeout > 0 {
var cancel func() var cancel func()
ctx, cancel = context.WithTimeout(ctx, val.validateTimeout) ctx, cancel = context.WithTimeout(ctx, val.validateTimeout)
defer cancel() defer cancel()
} }
valid := val.validate(ctx, src, msg) r := val.validate(ctx, src, msg)
if !valid { switch r {
log.Debugf("validation failed for topic %s", val.topic) case ValidationAccept:
} fallthrough
case ValidationReject:
fallthrough
case ValidationIgnore:
return r
return valid default:
log.Warningf("Unexpected result from validator: %d; ignoring message", r)
return ValidationIgnore
}
} }
/// Options /// Options