diff --git a/pubsub.go b/pubsub.go index 220bba1..b21e5ab 100644 --- a/pubsub.go +++ b/pubsub.go @@ -245,7 +245,7 @@ func (p *PubSub) processLoop(ctx context.Context) { p.pushMsg(vals, p.host.ID(), msg) case req := <-p.sendMsg: - p.maybePublishMessage(req.from, req.msg.Message) + p.publishMessage(req.from, req.msg.Message) case req := <-p.addVal: p.addValidator(req) @@ -409,6 +409,12 @@ func msgID(pmsg *pb.Message) string { // pushMsg pushes a message performing validation as necessary func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { + id := msgID(msg.Message) + if p.seenMessage(id) { + return + } + p.markSeen(id) + if len(vals) > 0 { // validation is asynchronous and globally throttled with the throttleValidate semaphore. // the purpose of the global throttle is to bound the goncurrency possible from incoming @@ -426,7 +432,7 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } - p.maybePublishMessage(src, msg.Message) + p.publishMessage(src, msg.Message) } // validate performs validation and only sends the message if all validators succeed @@ -476,13 +482,7 @@ loop: } } -func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { - id := msgID(pmsg) - if p.seenMessage(id) { - return - } - - p.markSeen(id) +func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { p.notifySubs(pmsg) p.rt.Publish(from, pmsg) } @@ -580,7 +580,8 @@ type listPeerReq struct { topic string } -// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done. +// sendReq is a request to call publishMessage. +// It is issued after message validation is done. type sendReq struct { from peer.ID msg *Message