mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-07 15:23:08 +00:00
add validation context for cancelation on aborts
This commit is contained in:
parent
fba445bc6d
commit
c95ed28496
@ -377,6 +377,9 @@ func (p *PubSub) pushMsg(subs []*Subscription, src peer.ID, msg *Message) {
|
|||||||
|
|
||||||
// validate performs validation and only sends the message if all validators succeed
|
// validate performs validation and only sends the message if all validators succeed
|
||||||
func (p *PubSub) validate(subs []*Subscription, src peer.ID, msg *Message) {
|
func (p *PubSub) validate(subs []*Subscription, src peer.ID, msg *Message) {
|
||||||
|
ctx, cancel := context.WithCancel(p.ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
results := make([]chan bool, 0, len(subs))
|
results := make([]chan bool, 0, len(subs))
|
||||||
throttle := false
|
throttle := false
|
||||||
|
|
||||||
@ -391,10 +394,10 @@ loop:
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case sub.validateThrottle <- struct{}{}:
|
case sub.validateThrottle <- struct{}{}:
|
||||||
go func(sub *Subscription, msg *Message, rch chan bool) {
|
go func(sub *Subscription, rch chan bool) {
|
||||||
rch <- sub.validateMsg(p.ctx, msg)
|
rch <- sub.validateMsg(ctx, msg)
|
||||||
<-sub.validateThrottle
|
<-sub.validateThrottle
|
||||||
}(sub, msg, rch)
|
}(sub, rch)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.Debugf("validation throttled for topic %s", sub.topic)
|
log.Debugf("validation throttled for topic %s", sub.topic)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user