validation pipeline front-end for handling signature validation synchronously

This commit is contained in:
vyzo 2019-04-25 21:28:13 +03:00
parent 9d7a59f4a8
commit 7a38f7642e
1 changed files with 53 additions and 13 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"math/rand"
"runtime"
"sync/atomic"
"time"
@ -92,6 +93,9 @@ type PubSub struct {
// topicVals tracks per topic validators
topicVals map[string]*topicVal
// validateQ is the front-end to the validation pipeline
validateQ chan *validateReq
// validateThrottle limits the number of active validation goroutines
validateThrottle chan struct{}
@ -184,6 +188,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
topicVals: make(map[string]*topicVal),
validateQ: make(chan *validateReq, 32),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
@ -210,6 +215,11 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
go ps.processLoop(ctx)
numcpu := runtime.NumCPU()
for i := 0; i < numcpu; i++ {
go ps.validateWorker()
}
return ps, nil
}
@ -631,16 +641,8 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
}
if len(vals) > 0 || msg.Signature != nil {
// validation is asynchronous and globally throttled with the throttleValidate semaphore.
// the purpose of the global throttle is to bound the goncurrency possible from incoming
// network traffic; each validator also has an individual throttle to preclude
// slow (or faulty) validators from starving other topics; see validate below.
select {
case p.validateThrottle <- struct{}{}:
go func() {
p.validate(vals, src, msg)
<-p.validateThrottle
}()
case p.validateQ <- &validateReq{vals, src, msg}:
default:
log.Warningf("message validation throttled; dropping message from %s", src)
}
@ -650,7 +652,20 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
p.publishMessage(src, msg.Message)
}
func (p *PubSub) validateWorker() {
for {
select {
case req := <-p.validateQ:
p.validate(req.vals, req.src, req.msg)
case <-p.ctx.Done():
return
}
}
}
// validate performs validation and only sends the message if all validators succeed
// signature validation is performed synchronously, while user validators are invoked
// asynchronously, throttled by the global validation throttle.
func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
if msg.Signature != nil {
if !p.validateSignature(msg) {
@ -660,13 +675,19 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
}
if len(vals) > 0 {
if !p.validateTopic(vals, src, msg) {
log.Warningf("message validation failed; dropping message from %s", src)
return
select {
case p.validateThrottle <- struct{}{}:
go func() {
p.doValidateTopic(vals, src, msg)
<-p.validateThrottle
}()
default:
log.Warningf("message validation throttled; dropping message from %s", src)
}
return
}
// all validators were successful, send the message
// no user validators and the signature was valid, send the message
p.sendMsg <- &sendReq{
from: src,
msg: msg,
@ -683,6 +704,18 @@ func (p *PubSub) validateSignature(msg *Message) bool {
return true
}
func (p *PubSub) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) {
if !p.validateTopic(vals, src, msg) {
log.Warningf("message validation failed; dropping message from %s", src)
return
}
p.sendMsg <- &sendReq{
from: src,
msg: msg,
}
}
func (p *PubSub) validateTopic(vals []*topicVal, src peer.ID, msg *Message) bool {
if len(vals) == 1 {
return p.validateSingleTopic(vals[0], src, msg)
@ -883,6 +916,13 @@ func (p *PubSub) BlacklistPeer(pid peer.ID) {
p.blacklistPeer <- pid
}
// validation requests
type validateReq struct {
vals []*topicVal
src peer.ID
msg *Message
}
// per topic validators
type addValReq struct {
topic string