mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-05 14:23:10 +00:00
WithMaxConcurrency is WithValidatorConcurrency
and defaultMaxConcurrency is defaultValidateConcurrency.
This commit is contained in:
parent
bf2151ba5f
commit
856a25c8eb
10
floodsub.go
10
floodsub.go
@ -18,9 +18,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ID = protocol.ID("/floodsub/1.0.0")
|
||||
defaultMaxConcurrency = 10
|
||||
defaultValidateTimeout = 150 * time.Millisecond
|
||||
ID = protocol.ID("/floodsub/1.0.0")
|
||||
defaultValidateConcurrency = 10
|
||||
defaultValidateTimeout = 150 * time.Millisecond
|
||||
)
|
||||
|
||||
var log = logging.Logger("floodsub")
|
||||
@ -521,7 +521,7 @@ func WithValidatorTimeout(timeout time.Duration) SubOpt {
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxConcurrency(n int) SubOpt {
|
||||
func WithValidatorConcurrency(n int) SubOpt {
|
||||
return func(sub *Subscription) error {
|
||||
sub.validateThrottle = make(chan struct{}, n)
|
||||
return nil
|
||||
@ -558,7 +558,7 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
|
||||
}
|
||||
|
||||
if sub.validate != nil && sub.validateThrottle == nil {
|
||||
sub.validateThrottle = make(chan struct{}, defaultMaxConcurrency)
|
||||
sub.validateThrottle = make(chan struct{}, defaultValidateConcurrency)
|
||||
}
|
||||
|
||||
out := make(chan *Subscription, 1)
|
||||
|
||||
@ -541,7 +541,7 @@ func TestValidateOverload(t *testing.T) {
|
||||
block := make(chan struct{})
|
||||
|
||||
sub, err := psubs[1].Subscribe(topic,
|
||||
WithMaxConcurrency(tc.maxConcurrency),
|
||||
WithValidatorConcurrency(tc.maxConcurrency),
|
||||
WithValidator(func(ctx context.Context, msg *Message) bool {
|
||||
<-block
|
||||
return true
|
||||
@ -554,7 +554,7 @@ func TestValidateOverload(t *testing.T) {
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
if len(tc.msgs) != tc.maxConcurrency+1 {
|
||||
t.Fatalf("expected number of messages sent to be defaultMaxConcurrency+1. Got %d, expected %d", len(tc.msgs), tc.maxConcurrency+1)
|
||||
t.Fatalf("expected number of messages sent to be maxConcurrency+1. Got %d, expected %d", len(tc.msgs), tc.maxConcurrency+1)
|
||||
}
|
||||
|
||||
p := psubs[0]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user