From 64e1b8b0d4060d2059f8863359af7d92773f0a35 Mon Sep 17 00:00:00 2001 From: Yusef Napora Date: Wed, 13 May 2020 18:41:23 -0400 Subject: [PATCH] use map to track near-first deliveries in tagTracer --- tag_tracer.go | 101 +++++++++++++++++--------------------------------- 1 file changed, 34 insertions(+), 67 deletions(-) diff --git a/tag_tracer.go b/tag_tracer.go index d063b87..480f000 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -1,7 +1,6 @@ package pubsub import ( - "context" "fmt" "sync" "time" @@ -61,9 +60,9 @@ type tagTracer struct { decaying map[string]connmgr.DecayingTag direct map[peer.ID]struct{} - // track message deliveries to reward "near first" deliveries - // (a delivery that occurs while we're still validating the message) - deliveries *messageDeliveries + // a map of message ids to the set of peers who delivered the message after the first delivery, + // but before the message was finished validating + nearFirst map[string]map[peer.ID]struct{} } func newTagTracer(cmgr connmgr.ConnManager) *tagTracer { @@ -72,11 +71,11 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer { log.Warnf("connection manager does not support decaying tags, delivery tags will not be applied") } return &tagTracer{ - cmgr: cmgr, - msgID: DefaultMsgIdFn, - decayer: decayer, - decaying: make(map[string]connmgr.DecayingTag), - deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)}, + cmgr: cmgr, + msgID: DefaultMsgIdFn, + decayer: decayer, + decaying: make(map[string]connmgr.DecayingTag), + nearFirst: make(map[string]map[peer.ID]struct{}), } } @@ -87,27 +86,6 @@ func (t *tagTracer) Start(gs *GossipSubRouter) { t.msgID = gs.p.msgID t.direct = gs.direct - go t.background(gs.p.ctx) -} - -func (t *tagTracer) background(ctx context.Context) { - gcDeliveryRecords := time.NewTicker(time.Minute) - defer gcDeliveryRecords.Stop() - - for { - select { - case <-gcDeliveryRecords.C: - t.gcDeliveryRecords() - case <-ctx.Done(): - return - } - } -} - -func (t *tagTracer) gcDeliveryRecords() { - t.Lock() - defer t.Unlock() - t.deliveries.gc() } func (t *tagTracer) tagPeerIfDirect(p peer.ID) { @@ -205,27 +183,15 @@ func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) { func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID { t.Lock() defer t.Unlock() - drec := t.deliveries.getRecord(t.msgID(msg.Message)) - nearFirstPeers := make([]peer.ID, 0, len(drec.peers)) - // defensive check that this is the first delivery trace -- delivery status should be unknown - if drec.status != deliveryUnknown { - log.Warnf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status) - return nearFirstPeers + peersMap, ok := t.nearFirst[t.msgID(msg.Message)] + if !ok { + return []peer.ID{} } - - drec.status = deliveryValid - drec.validated = time.Now() - - for p := range drec.peers { - // this check is to make sure a peer can't send us a message twice and get a double count - // if it is a first delivery. - if p != msg.ReceivedFrom { - nearFirstPeers = append(nearFirstPeers, p) - } + peers := make([]peer.ID, 0, len(peersMap)) + for p := range peersMap { + peers = append(peers, p) } - // we're done with the peers map and can reclaim the memory - drec.peers = nil - return nearFirstPeers + return peers } // -- internalTracer interface methods @@ -246,6 +212,11 @@ func (t *tagTracer) DeliverMessage(msg *Message) { for _, p := range nearFirst { t.bumpTagsForMessage(p, msg) } + + // delete the delivery state for this message + t.Lock() + delete(t.nearFirst, t.msgID(msg.Message)) + t.Unlock() } func (t *tagTracer) Leave(topic string) { @@ -264,37 +235,33 @@ func (t *tagTracer) ValidateMessage(msg *Message) { t.Lock() defer t.Unlock() - // create a delivery record for the message - _ = t.deliveries.getRecord(t.msgID(msg.Message)) + // create map to start tracking the peers who deliver while we're validating + id := t.msgID(msg.Message) + if _, exists := t.nearFirst[id]; exists { + return + } + t.nearFirst[id] = make(map[peer.ID]struct{}) } func (t *tagTracer) DuplicateMessage(msg *Message) { t.Lock() defer t.Unlock() - drec := t.deliveries.getRecord(t.msgID(msg.Message)) - if drec.status == deliveryUnknown { - // the message is being validated; track the peer delivery and wait for - // the Deliver/Reject notification. - drec.peers[msg.ReceivedFrom] = struct{}{} + id := t.msgID(msg.Message) + peers, ok := t.nearFirst[id] + if !ok { + // the peers map should have been created in ValidateMessage + return } + peers[msg.ReceivedFrom] = struct{}{} } func (t *tagTracer) RejectMessage(msg *Message, reason string) { t.Lock() defer t.Unlock() - // mark message as invalid and release tracking info - drec := t.deliveries.getRecord(t.msgID(msg.Message)) - - // defensive check that this is the first rejection trace -- delivery status should be unknown - if drec.status != deliveryUnknown { - log.Warnf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status) - return - } - - drec.status = deliveryInvalid - drec.peers = nil + // stop tracking near-first deliveries for rejected message + delete(t.nearFirst, t.msgID(msg.Message)) } func (t *tagTracer) RemovePeer(peer.ID) {}