From b09c9d1a48e34ced756199b918a03deb5e1ef255 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 19 Feb 2018 14:04:47 +0200 Subject: [PATCH] check and mark seen messages prior to validation this allows us to avoid revalidating messages, either because they were concurrently received or were previously rejected by the validator. also allows us to filter invalid messages from gossip. --- pubsub.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pubsub.go b/pubsub.go index 220bba1..b21e5ab 100644 --- a/pubsub.go +++ b/pubsub.go @@ -245,7 +245,7 @@ func (p *PubSub) processLoop(ctx context.Context) { p.pushMsg(vals, p.host.ID(), msg) case req := <-p.sendMsg: - p.maybePublishMessage(req.from, req.msg.Message) + p.publishMessage(req.from, req.msg.Message) case req := <-p.addVal: p.addValidator(req) @@ -409,6 +409,12 @@ func msgID(pmsg *pb.Message) string { // pushMsg pushes a message performing validation as necessary func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { + id := msgID(msg.Message) + if p.seenMessage(id) { + return + } + p.markSeen(id) + if len(vals) > 0 { // validation is asynchronous and globally throttled with the throttleValidate semaphore. // the purpose of the global throttle is to bound the goncurrency possible from incoming @@ -426,7 +432,7 @@ func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { return } - p.maybePublishMessage(src, msg.Message) + p.publishMessage(src, msg.Message) } // validate performs validation and only sends the message if all validators succeed @@ -476,13 +482,7 @@ loop: } } -func (p *PubSub) maybePublishMessage(from peer.ID, pmsg *pb.Message) { - id := msgID(pmsg) - if p.seenMessage(id) { - return - } - - p.markSeen(id) +func (p *PubSub) publishMessage(from peer.ID, pmsg *pb.Message) { p.notifySubs(pmsg) p.rt.Publish(from, pmsg) } @@ -580,7 +580,8 @@ type listPeerReq struct { topic string } -// sendReq is a request to call maybePublishMessage. It is issued after the subscription verification is done. +// sendReq is a request to call publishMessage. +// It is issued after message validation is done. type sendReq struct { from peer.ID msg *Message