fix timeout
This commit is contained in:
parent
02877cda71
commit
6e8b9f2d5c
29
floodsub.go
29
floodsub.go
|
@ -18,9 +18,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
ID = protocol.ID("/floodsub/1.0.0")
|
||||
maxConcurrency = 10
|
||||
validateTimeoutMillis = 100
|
||||
ID = protocol.ID("/floodsub/1.0.0")
|
||||
maxConcurrency = 10
|
||||
validateTimeout = 150 * time.Millisecond
|
||||
)
|
||||
|
||||
var log = logging.Logger("floodsub")
|
||||
|
@ -192,7 +192,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
|
||||
select {
|
||||
case p.throttleValidate <- struct{}{}:
|
||||
go func() {
|
||||
go func(msg *Message) {
|
||||
defer func() { <-p.throttleValidate }()
|
||||
|
||||
if p.validate(subs, msg) {
|
||||
|
@ -202,7 +202,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}(msg)
|
||||
default:
|
||||
log.Warning("could not acquire validator; dropping message")
|
||||
}
|
||||
|
@ -373,11 +373,22 @@ func msgID(pmsg *pb.Message) string {
|
|||
// validate is called in a goroutine and calls the validate functions of all subs with msg as parameter.
|
||||
func (p *PubSub) validate(subs []*Subscription, msg *Message) bool {
|
||||
for _, sub := range subs {
|
||||
ctx, cancel := context.WithTimeout(p.ctx, validateTimeoutMillis*time.Millisecond)
|
||||
ctx, cancel := context.WithTimeout(p.ctx, validateTimeout)
|
||||
defer cancel()
|
||||
|
||||
if sub.validate != nil && !sub.validate(ctx, msg) {
|
||||
log.Debugf("validator for topic %s returned false", sub.topic)
|
||||
result := make(chan bool)
|
||||
go func(sub *Subscription) {
|
||||
result <- sub.validate == nil || sub.validate(ctx, msg)
|
||||
}(sub)
|
||||
|
||||
select {
|
||||
case valid := <-result:
|
||||
if !valid {
|
||||
log.Debugf("validator for topic %s returned false", sub.topic)
|
||||
return false
|
||||
}
|
||||
case <-ctx.Done():
|
||||
log.Debugf("validator for topic %s timed out. msg: %s", sub.topic, msg)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -409,7 +420,7 @@ func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error {
|
|||
continue
|
||||
}
|
||||
|
||||
for p, _ := range tmap {
|
||||
for p := range tmap {
|
||||
tosend[p] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -406,7 +406,7 @@ func TestValidateCancel(t *testing.T) {
|
|||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
testmsg := []byte("this is a legal message")
|
||||
validates := true
|
||||
validates := false // message for which the validator times our are discarded
|
||||
|
||||
p := psubs[0]
|
||||
|
||||
|
@ -441,7 +441,7 @@ func TestValidateOverload(t *testing.T) {
|
|||
block := make(chan struct{})
|
||||
|
||||
sub, err := psubs[1].Subscribe(topic, WithValidator(func(ctx context.Context, msg *Message) bool {
|
||||
_, _ = <-block
|
||||
<-block
|
||||
return true
|
||||
}))
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue