mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-08 15:53:07 +00:00
invoke user validators once for each message
This commit is contained in:
parent
7a38f7642e
commit
b227afbf9f
36
pubsub.go
36
pubsub.go
@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -106,8 +107,10 @@ type PubSub struct {
|
|||||||
blacklist Blacklist
|
blacklist Blacklist
|
||||||
blacklistPeer chan peer.ID
|
blacklistPeer chan peer.ID
|
||||||
|
|
||||||
peers map[peer.ID]chan *RPC
|
peers map[peer.ID]chan *RPC
|
||||||
seenMessages *timecache.TimeCache
|
|
||||||
|
seenMessagesMx sync.Mutex
|
||||||
|
seenMessages *timecache.TimeCache
|
||||||
|
|
||||||
// key for signing messages; nil when signing is disabled (default for now)
|
// key for signing messages; nil when signing is disabled (default for now)
|
||||||
signKey crypto.PrivKey
|
signKey crypto.PrivKey
|
||||||
@ -552,12 +555,22 @@ func (p *PubSub) notifySubs(msg *pb.Message) {
|
|||||||
|
|
||||||
// seenMessage returns whether we already saw this message before
|
// seenMessage returns whether we already saw this message before
|
||||||
func (p *PubSub) seenMessage(id string) bool {
|
func (p *PubSub) seenMessage(id string) bool {
|
||||||
|
p.seenMessagesMx.Lock()
|
||||||
|
defer p.seenMessagesMx.Unlock()
|
||||||
return p.seenMessages.Has(id)
|
return p.seenMessages.Has(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// markSeen marks a message as seen such that seenMessage returns `true' for the given id
|
// markSeen marks a message as seen such that seenMessage returns `true' for the given id
|
||||||
func (p *PubSub) markSeen(id string) {
|
// returns true if the message was freshly marked
|
||||||
|
func (p *PubSub) markSeen(id string) bool {
|
||||||
|
p.seenMessagesMx.Lock()
|
||||||
|
defer p.seenMessagesMx.Unlock()
|
||||||
|
if p.seenMessages.Has(id) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
p.seenMessages.Add(id)
|
p.seenMessages.Add(id)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribedToMessage returns whether we are subscribed to one of the topics
|
// subscribedToMessage returns whether we are subscribed to one of the topics
|
||||||
@ -649,7 +662,9 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.publishMessage(src, msg.Message)
|
if p.markSeen(id) {
|
||||||
|
p.publishMessage(src, msg.Message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PubSub) validateWorker() {
|
func (p *PubSub) validateWorker() {
|
||||||
@ -674,6 +689,13 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we can mark the message as seen now that we have verified the signature
|
||||||
|
// and avoid invoking user validators more than once
|
||||||
|
id := msgID(msg.Message)
|
||||||
|
if !p.markSeen(id) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(vals) > 0 {
|
if len(vals) > 0 {
|
||||||
select {
|
select {
|
||||||
case p.validateThrottle <- struct{}{}:
|
case p.validateThrottle <- struct{}{}:
|
||||||
@ -779,12 +801,6 @@ func (p *PubSub) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) b
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) {
|
func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) {
|
||||||
id := msgID(pmsg)
|
|
||||||
if p.seenMessage(id) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.markSeen(id)
|
|
||||||
|
|
||||||
p.notifySubs(pmsg)
|
p.notifySubs(pmsg)
|
||||||
p.rt.Publish(from, pmsg)
|
p.rt.Publish(from, pmsg)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user