diff --git a/pubsub.go b/pubsub.go index a9fe375..a3638ee 100644 --- a/pubsub.go +++ b/pubsub.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "runtime" + "sync" "sync/atomic" "time" @@ -106,8 +107,10 @@ type PubSub struct { blacklist Blacklist blacklistPeer chan peer.ID - peers map[peer.ID]chan *RPC - seenMessages *timecache.TimeCache + peers map[peer.ID]chan *RPC + + seenMessagesMx sync.Mutex + seenMessages *timecache.TimeCache // key for signing messages; nil when signing is disabled (default for now) signKey crypto.PrivKey @@ -552,12 +555,22 @@ func (p *PubSub) notifySubs(msg *pb.Message) { // seenMessage returns whether we already saw this message before func (p *PubSub) seenMessage(id string) bool { + p.seenMessagesMx.Lock() + defer p.seenMessagesMx.Unlock() return p.seenMessages.Has(id) } // markSeen marks a message as seen such that seenMessage returns `true' for the given id -func (p *PubSub) markSeen(id string) { +// returns true if the message was freshly marked +func (p *PubSub) markSeen(id string) bool { + p.seenMessagesMx.Lock() + defer p.seenMessagesMx.Unlock() + if p.seenMessages.Has(id) { + return false + } + p.seenMessages.Add(id) + return true } // subscribedToMessage returns whether we are subscribed to one of the topics @@ -649,7 +662,9 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } - p.publishMessage(src, msg.Message) + if p.markSeen(id) { + p.publishMessage(src, msg.Message) + } } func (p *PubSub) validateWorker() { @@ -674,6 +689,13 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) { } } + // we can mark the message as seen now that we have verified the signature + // and avoid invoking user validators more than once + id := msgID(msg.Message) + if !p.markSeen(id) { + return + } + if len(vals) > 0 { select { case p.validateThrottle <- struct{}{}: @@ -779,12 +801,6 @@ func (p *PubSub) validateSingleTopic(val *topicVal, src peer.ID, msg *Message) b } func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { - id := msgID(pmsg) - if p.seenMessage(id) { - return - } - p.markSeen(id) - p.notifySubs(pmsg) p.rt.Publish(from, pmsg) }