From fe09d1eea3dfa2f469acf4c21de2e57a0688e79c Mon Sep 17 00:00:00 2001 From: keks Date: Thu, 23 Nov 2017 19:12:59 +0100 Subject: [PATCH] make validator timeout configurable --- floodsub.go | 18 +++++++++++----- floodsub_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ subscription.go | 5 ++++- 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/floodsub.go b/floodsub.go index 99441fd..f7c2eb0 100644 --- a/floodsub.go +++ b/floodsub.go @@ -18,9 +18,9 @@ import ( ) const ( - ID = protocol.ID("/floodsub/1.0.0") - maxConcurrency = 10 - validateTimeout = 150 * time.Millisecond + ID = protocol.ID("/floodsub/1.0.0") + maxConcurrency = 10 + defaultValidateTimeout = 150 * time.Millisecond ) var log = logging.Logger("floodsub") @@ -373,7 +373,7 @@ func msgID(pmsg *pb.Message) string { // validate is called in a goroutine and calls the validate functions of all subs with msg as parameter. func (p *PubSub) validate(subs []*Subscription, msg *Message) bool { for _, sub := range subs { - ctx, cancel := context.WithTimeout(p.ctx, validateTimeout) + ctx, cancel := context.WithTimeout(p.ctx, sub.validateTimeout) defer cancel() result := make(chan bool) @@ -479,7 +479,14 @@ func WithValidator(validate Validator) func(*Subscription) error { sub.validate = validate return nil } +} +// WithValidatorTimeout is an option that can be supplied to Subscribe. The argument is a duration after which long-running validators are canceled. +func WithValidatorTimeout(timeout time.Duration) func(*Subscription) error { + return func(sub *Subscription) error { + sub.validateTimeout = timeout + return nil + } } // Subscribe returns a new Subscription for the given topic @@ -500,7 +507,8 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO } sub := &Subscription{ - topic: td.GetName(), + topic: td.GetName(), + validateTimeout: defaultValidateTimeout, } for _, opt := range opts { diff --git a/floodsub_test.go b/floodsub_test.go index 57cf31e..35a2400 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -385,6 +385,61 @@ func TestValidate(t *testing.T) { } } +func TestValidateTimeout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 2) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + topic := "foobar" + + cases := []struct { + timeout time.Duration + msg []byte + validates bool + }{ + {75 * time.Millisecond, []byte("this better time out"), false}, + {150 * time.Millisecond, []byte("this should work"), true}, + } + + for _, tc := range cases { + sub, err := psubs[1].Subscribe(topic, WithValidator(func(ctx context.Context, msg *Message) bool { + time.Sleep(100 * time.Millisecond) + return true + }), WithValidatorTimeout(tc.timeout)) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 50) + + p := psubs[0] + err = p.Publish(topic, tc.msg) + if err != nil { + t.Fatal(err) + } + + select { + case msg := <-sub.ch: + if !tc.validates { + t.Log(msg) + t.Error("expected message validation to filter out the message") + } + case <-time.After(333 * time.Millisecond): + if tc.validates { + t.Error("expected message validation to accept the message") + } + } + + // important: cancel! + // otherwise the message will still be filtered by the other subscription + sub.Cancel() + } + +} + func TestValidateCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/subscription.go b/subscription.go index a3f97dd..cc76413 100644 --- a/subscription.go +++ b/subscription.go @@ -2,6 +2,7 @@ package floodsub import ( "context" + "time" ) type Subscription struct { @@ -9,7 +10,9 @@ type Subscription struct { ch chan *Message cancelCh chan<- *Subscription err error - validate Validator + + validate Validator + validateTimeout time.Duration } func (sub *Subscription) Topic() string {