use a single channel for all validation results
This commit is contained in:
parent
f6081fb061
commit
145a84a33b
12
floodsub.go
12
floodsub.go
|
@ -394,7 +394,8 @@ func (p *PubSub) validate(subs []*Subscription, src peer.ID, msg *Message) {
|
|||
ctx, cancel := context.WithCancel(p.ctx)
|
||||
defer cancel()
|
||||
|
||||
results := make([]chan bool, 0, len(subs))
|
||||
rch := make(chan bool, len(subs))
|
||||
rcount := 0
|
||||
throttle := false
|
||||
|
||||
loop:
|
||||
|
@ -403,15 +404,14 @@ loop:
|
|||
continue
|
||||
}
|
||||
|
||||
rch := make(chan bool, 1)
|
||||
results = append(results, rch)
|
||||
rcount++
|
||||
|
||||
select {
|
||||
case sub.validateThrottle <- struct{}{}:
|
||||
go func(sub *Subscription, rch chan bool) {
|
||||
go func(sub *Subscription) {
|
||||
rch <- sub.validateMsg(ctx, msg)
|
||||
<-sub.validateThrottle
|
||||
}(sub, rch)
|
||||
}(sub)
|
||||
|
||||
default:
|
||||
log.Debugf("validation throttled for topic %s", sub.topic)
|
||||
|
@ -425,7 +425,7 @@ loop:
|
|||
return
|
||||
}
|
||||
|
||||
for _, rch := range results {
|
||||
for i := 0; i < rcount; i++ {
|
||||
valid := <-rch
|
||||
if !valid {
|
||||
log.Warningf("message validation failed; dropping message from %s", src)
|
||||
|
|
Loading…
Reference in New Issue