diff --git a/floodsub.go b/floodsub.go index c3f6aa5..5e2fea5 100644 --- a/floodsub.go +++ b/floodsub.go @@ -394,7 +394,8 @@ func (p *PubSub) validate(subs []*Subscription, src peer.ID, msg *Message) { ctx, cancel := context.WithCancel(p.ctx) defer cancel() - results := make([]chan bool, 0, len(subs)) + rch := make(chan bool, len(subs)) + rcount := 0 throttle := false loop: @@ -403,15 +404,14 @@ loop: continue } - rch := make(chan bool, 1) - results = append(results, rch) + rcount++ select { case sub.validateThrottle <- struct{}{}: - go func(sub *Subscription, rch chan bool) { + go func(sub *Subscription) { rch <- sub.validateMsg(ctx, msg) <-sub.validateThrottle - }(sub, rch) + }(sub) default: log.Debugf("validation throttled for topic %s", sub.topic) @@ -425,7 +425,7 @@ loop: return } - for _, rch := range results { + for i := 0; i < rcount; i++ { valid := <-rch if !valid { log.Warningf("message validation failed; dropping message from %s", src)