mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-09 00:03:09 +00:00
Merge pull request #255 from keep-network/configurable-validation-queue
Configurable size of validate queue
This commit is contained in:
commit
25c434f5f7
@ -159,7 +159,7 @@ type PubSubRouter interface {
|
|||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
*pb.Message
|
*pb.Message
|
||||||
ReceivedFrom peer.ID
|
ReceivedFrom peer.ID
|
||||||
ValidatorData interface{}
|
ValidatorData interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -10,6 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
defaultValidateQueueSize = 32
|
||||||
defaultValidateConcurrency = 1024
|
defaultValidateConcurrency = 1024
|
||||||
defaultValidateThrottle = 8192
|
defaultValidateThrottle = 8192
|
||||||
)
|
)
|
||||||
@ -82,7 +83,7 @@ type rmValReq struct {
|
|||||||
func newValidation() *validation {
|
func newValidation() *validation {
|
||||||
return &validation{
|
return &validation{
|
||||||
topicVals: make(map[string]*topicVal),
|
topicVals: make(map[string]*topicVal),
|
||||||
validateQ: make(chan *validateReq, 32),
|
validateQ: make(chan *validateReq, defaultValidateQueueSize),
|
||||||
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
validateThrottle: make(chan struct{}, defaultValidateThrottle),
|
||||||
validateWorkers: runtime.NumCPU(),
|
validateWorkers: runtime.NumCPU(),
|
||||||
}
|
}
|
||||||
@ -342,6 +343,18 @@ func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message)
|
|||||||
|
|
||||||
/// Options
|
/// Options
|
||||||
|
|
||||||
|
// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
|
||||||
|
// When queue is full, validation is throttled and new messages are dropped.
|
||||||
|
func WithValidateQueueSize(n int) Option {
|
||||||
|
return func(ps *PubSub) error {
|
||||||
|
if n > 0 {
|
||||||
|
ps.val.validateQ = make(chan *validateReq, n)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("validate queue size must be > 0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithValidateThrottle sets the upper bound on the number of active validation
|
// WithValidateThrottle sets the upper bound on the number of active validation
|
||||||
// goroutines across all topics. The default is 8192.
|
// goroutines across all topics. The default is 8192.
|
||||||
func WithValidateThrottle(n int) Option {
|
func WithValidateThrottle(n int) Option {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user