make validator timeout configurable

This commit is contained in:
keks 2017-11-23 19:12:59 +01:00 committed by vyzo
parent 6e8b9f2d5c
commit fe09d1eea3
3 changed files with 72 additions and 6 deletions

View File

@ -18,9 +18,9 @@ import (
)
const (
ID = protocol.ID("/floodsub/1.0.0")
maxConcurrency = 10
validateTimeout = 150 * time.Millisecond
ID = protocol.ID("/floodsub/1.0.0")
maxConcurrency = 10
defaultValidateTimeout = 150 * time.Millisecond
)
var log = logging.Logger("floodsub")
@ -373,7 +373,7 @@ func msgID(pmsg *pb.Message) string {
// validate is called in a goroutine and calls the validate functions of all subs with msg as parameter.
func (p *PubSub) validate(subs []*Subscription, msg *Message) bool {
for _, sub := range subs {
ctx, cancel := context.WithTimeout(p.ctx, validateTimeout)
ctx, cancel := context.WithTimeout(p.ctx, sub.validateTimeout)
defer cancel()
result := make(chan bool)
@ -479,7 +479,14 @@ func WithValidator(validate Validator) func(*Subscription) error {
sub.validate = validate
return nil
}
}
// WithValidatorTimeout is an option that can be supplied to Subscribe. The argument is a duration after which long-running validators are canceled.
func WithValidatorTimeout(timeout time.Duration) func(*Subscription) error {
return func(sub *Subscription) error {
sub.validateTimeout = timeout
return nil
}
}
// Subscribe returns a new Subscription for the given topic
@ -500,7 +507,8 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
}
sub := &Subscription{
topic: td.GetName(),
topic: td.GetName(),
validateTimeout: defaultValidateTimeout,
}
for _, opt := range opts {

View File

@ -385,6 +385,61 @@ func TestValidate(t *testing.T) {
}
}
func TestValidateTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
topic := "foobar"
cases := []struct {
timeout time.Duration
msg []byte
validates bool
}{
{75 * time.Millisecond, []byte("this better time out"), false},
{150 * time.Millisecond, []byte("this should work"), true},
}
for _, tc := range cases {
sub, err := psubs[1].Subscribe(topic, WithValidator(func(ctx context.Context, msg *Message) bool {
time.Sleep(100 * time.Millisecond)
return true
}), WithValidatorTimeout(tc.timeout))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
p := psubs[0]
err = p.Publish(topic, tc.msg)
if err != nil {
t.Fatal(err)
}
select {
case msg := <-sub.ch:
if !tc.validates {
t.Log(msg)
t.Error("expected message validation to filter out the message")
}
case <-time.After(333 * time.Millisecond):
if tc.validates {
t.Error("expected message validation to accept the message")
}
}
// important: cancel!
// otherwise the message will still be filtered by the other subscription
sub.Cancel()
}
}
func TestValidateCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -2,6 +2,7 @@ package floodsub
import (
"context"
"time"
)
type Subscription struct {
@ -9,7 +10,9 @@ type Subscription struct {
ch chan *Message
cancelCh chan<- *Subscription
err error
validate Validator
validate Validator
validateTimeout time.Duration
}
func (sub *Subscription) Topic() string {