From 6e8b9f2d5c9e7490179a0702582ec008b92b8858 Mon Sep 17 00:00:00 2001 From: keks Date: Thu, 23 Nov 2017 14:39:14 +0100 Subject: [PATCH] fix timeout --- floodsub.go | 29 ++++++++++++++++++++--------- floodsub_test.go | 4 ++-- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/floodsub.go b/floodsub.go index 802393d..99441fd 100644 --- a/floodsub.go +++ b/floodsub.go @@ -18,9 +18,9 @@ import ( ) const ( - ID = protocol.ID("/floodsub/1.0.0") - maxConcurrency = 10 - validateTimeoutMillis = 100 + ID = protocol.ID("/floodsub/1.0.0") + maxConcurrency = 10 + validateTimeout = 150 * time.Millisecond ) var log = logging.Logger("floodsub") @@ -192,7 +192,7 @@ func (p *PubSub) processLoop(ctx context.Context) { select { case p.throttleValidate <- struct{}{}: - go func() { + go func(msg *Message) { defer func() { <-p.throttleValidate }() if p.validate(subs, msg) { @@ -202,7 +202,7 @@ func (p *PubSub) processLoop(ctx context.Context) { } } - }() + }(msg) default: log.Warning("could not acquire validator; dropping message") } @@ -373,11 +373,22 @@ func msgID(pmsg *pb.Message) string { // validate is called in a goroutine and calls the validate functions of all subs with msg as parameter. func (p *PubSub) validate(subs []*Subscription, msg *Message) bool { for _, sub := range subs { - ctx, cancel := context.WithTimeout(p.ctx, validateTimeoutMillis*time.Millisecond) + ctx, cancel := context.WithTimeout(p.ctx, validateTimeout) defer cancel() - if sub.validate != nil && !sub.validate(ctx, msg) { - log.Debugf("validator for topic %s returned false", sub.topic) + result := make(chan bool) + go func(sub *Subscription) { + result <- sub.validate == nil || sub.validate(ctx, msg) + }(sub) + + select { + case valid := <-result: + if !valid { + log.Debugf("validator for topic %s returned false", sub.topic) + return false + } + case <-ctx.Done(): + log.Debugf("validator for topic %s timed out. msg: %s", sub.topic, msg) return false } } @@ -409,7 +420,7 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { continue } - for p, _ := range tmap { + for p := range tmap { tosend[p] = struct{}{} } } diff --git a/floodsub_test.go b/floodsub_test.go index fb1b2b9..57cf31e 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -406,7 +406,7 @@ func TestValidateCancel(t *testing.T) { time.Sleep(time.Millisecond * 50) testmsg := []byte("this is a legal message") - validates := true + validates := false // message for which the validator times our are discarded p := psubs[0] @@ -441,7 +441,7 @@ func TestValidateOverload(t *testing.T) { block := make(chan struct{}) sub, err := psubs[1].Subscribe(topic, WithValidator(func(ctx context.Context, msg *Message) bool { - _, _ = <-block + <-block return true })) if err != nil {