mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-05 22:33:10 +00:00
remove default async validation timeout and increase default topic validation throttle.
and some better documentation.
This commit is contained in:
parent
b227afbf9f
commit
b84a32a4ee
25
pubsub.go
25
pubsub.go
@ -22,8 +22,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultValidateTimeout = 150 * time.Millisecond
|
defaultValidateConcurrency = 1024
|
||||||
defaultValidateConcurrency = 100
|
|
||||||
defaultValidateThrottle = 8192
|
defaultValidateThrottle = 8192
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -227,7 +226,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WithValidateThrottle sets the upper bound on the number of active validation
|
// WithValidateThrottle sets the upper bound on the number of active validation
|
||||||
// goroutines.
|
// goroutines across all topics. The default is 8192.
|
||||||
func WithValidateThrottle(n int) Option {
|
func WithValidateThrottle(n int) Option {
|
||||||
return func(ps *PubSub) error {
|
return func(ps *PubSub) error {
|
||||||
ps.validateThrottle = make(chan struct{}, n)
|
ps.validateThrottle = make(chan struct{}, n)
|
||||||
@ -966,7 +965,8 @@ type Validator func(context.Context, peer.ID, *Message) bool
|
|||||||
// ValidatorOpt is an option for RegisterTopicValidator.
|
// ValidatorOpt is an option for RegisterTopicValidator.
|
||||||
type ValidatorOpt func(addVal *addValReq) error
|
type ValidatorOpt func(addVal *addValReq) error
|
||||||
|
|
||||||
// WithValidatorTimeout is an option that sets the topic validator timeout.
|
// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator.
|
||||||
|
// By default there is no timeout in asynchronous validators.
|
||||||
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
|
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
|
||||||
return func(addVal *addValReq) error {
|
return func(addVal *addValReq) error {
|
||||||
addVal.timeout = timeout
|
addVal.timeout = timeout
|
||||||
@ -974,7 +974,8 @@ func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithValidatorConcurrency is an option that sets topic validator throttle.
|
// WithValidatorConcurrency is an option that sets the topic validator throttle.
|
||||||
|
// This controls the number of active validation goroutines for the topic; the default is 1024.
|
||||||
func WithValidatorConcurrency(n int) ValidatorOpt {
|
func WithValidatorConcurrency(n int) ValidatorOpt {
|
||||||
return func(addVal *addValReq) error {
|
return func(addVal *addValReq) error {
|
||||||
addVal.throttle = n
|
addVal.throttle = n
|
||||||
@ -983,6 +984,9 @@ func WithValidatorConcurrency(n int) ValidatorOpt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterTopicValidator registers a validator for topic.
|
// RegisterTopicValidator registers a validator for topic.
|
||||||
|
// By default validators are asynchronous, which means they will run in a separate goroutine.
|
||||||
|
// The number of active goroutines is controlled by global and per topic validator
|
||||||
|
// throttles; if it exceeds the throttle threshold, messages will be dropped.
|
||||||
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error {
|
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error {
|
||||||
addVal := &addValReq{
|
addVal := &addValReq{
|
||||||
topic: topic,
|
topic: topic,
|
||||||
@ -1013,7 +1017,7 @@ func (ps *PubSub) addValidator(req *addValReq) {
|
|||||||
val := &topicVal{
|
val := &topicVal{
|
||||||
topic: topic,
|
topic: topic,
|
||||||
validate: req.validate,
|
validate: req.validate,
|
||||||
validateTimeout: defaultValidateTimeout,
|
validateTimeout: 0,
|
||||||
validateThrottle: make(chan struct{}, defaultValidateConcurrency),
|
validateThrottle: make(chan struct{}, defaultValidateConcurrency),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1054,10 +1058,13 @@ func (ps *PubSub) rmValidator(req *rmValReq) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool {
|
func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message) bool {
|
||||||
vctx, cancel := context.WithTimeout(ctx, val.validateTimeout)
|
if val.validateTimeout > 0 {
|
||||||
defer cancel()
|
var cancel func()
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, val.validateTimeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
valid := val.validate(vctx, src, msg)
|
valid := val.validate(ctx, src, msg)
|
||||||
if !valid {
|
if !valid {
|
||||||
log.Debugf("validation failed for topic %s", val.topic)
|
log.Debugf("validation failed for topic %s", val.topic)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user