use map to track near-first deliveries in tagTracer
This commit is contained in:
parent
cc0ba20627
commit
64e1b8b0d4
101
tag_tracer.go
101
tag_tracer.go
|
@ -1,7 +1,6 @@
|
|||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -61,9 +60,9 @@ type tagTracer struct {
|
|||
decaying map[string]connmgr.DecayingTag
|
||||
direct map[peer.ID]struct{}
|
||||
|
||||
// track message deliveries to reward "near first" deliveries
|
||||
// (a delivery that occurs while we're still validating the message)
|
||||
deliveries *messageDeliveries
|
||||
// a map of message ids to the set of peers who delivered the message after the first delivery,
|
||||
// but before the message was finished validating
|
||||
nearFirst map[string]map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
|
||||
|
@ -72,11 +71,11 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
|
|||
log.Warnf("connection manager does not support decaying tags, delivery tags will not be applied")
|
||||
}
|
||||
return &tagTracer{
|
||||
cmgr: cmgr,
|
||||
msgID: DefaultMsgIdFn,
|
||||
decayer: decayer,
|
||||
decaying: make(map[string]connmgr.DecayingTag),
|
||||
deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)},
|
||||
cmgr: cmgr,
|
||||
msgID: DefaultMsgIdFn,
|
||||
decayer: decayer,
|
||||
decaying: make(map[string]connmgr.DecayingTag),
|
||||
nearFirst: make(map[string]map[peer.ID]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,27 +86,6 @@ func (t *tagTracer) Start(gs *GossipSubRouter) {
|
|||
|
||||
t.msgID = gs.p.msgID
|
||||
t.direct = gs.direct
|
||||
go t.background(gs.p.ctx)
|
||||
}
|
||||
|
||||
func (t *tagTracer) background(ctx context.Context) {
|
||||
gcDeliveryRecords := time.NewTicker(time.Minute)
|
||||
defer gcDeliveryRecords.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-gcDeliveryRecords.C:
|
||||
t.gcDeliveryRecords()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tagTracer) gcDeliveryRecords() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.deliveries.gc()
|
||||
}
|
||||
|
||||
func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
|
||||
|
@ -205,27 +183,15 @@ func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
|
|||
func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
drec := t.deliveries.getRecord(t.msgID(msg.Message))
|
||||
nearFirstPeers := make([]peer.ID, 0, len(drec.peers))
|
||||
// defensive check that this is the first delivery trace -- delivery status should be unknown
|
||||
if drec.status != deliveryUnknown {
|
||||
log.Warnf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status)
|
||||
return nearFirstPeers
|
||||
peersMap, ok := t.nearFirst[t.msgID(msg.Message)]
|
||||
if !ok {
|
||||
return []peer.ID{}
|
||||
}
|
||||
|
||||
drec.status = deliveryValid
|
||||
drec.validated = time.Now()
|
||||
|
||||
for p := range drec.peers {
|
||||
// this check is to make sure a peer can't send us a message twice and get a double count
|
||||
// if it is a first delivery.
|
||||
if p != msg.ReceivedFrom {
|
||||
nearFirstPeers = append(nearFirstPeers, p)
|
||||
}
|
||||
peers := make([]peer.ID, 0, len(peersMap))
|
||||
for p := range peersMap {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
// we're done with the peers map and can reclaim the memory
|
||||
drec.peers = nil
|
||||
return nearFirstPeers
|
||||
return peers
|
||||
}
|
||||
|
||||
// -- internalTracer interface methods
|
||||
|
@ -246,6 +212,11 @@ func (t *tagTracer) DeliverMessage(msg *Message) {
|
|||
for _, p := range nearFirst {
|
||||
t.bumpTagsForMessage(p, msg)
|
||||
}
|
||||
|
||||
// delete the delivery state for this message
|
||||
t.Lock()
|
||||
delete(t.nearFirst, t.msgID(msg.Message))
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
func (t *tagTracer) Leave(topic string) {
|
||||
|
@ -264,37 +235,33 @@ func (t *tagTracer) ValidateMessage(msg *Message) {
|
|||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// create a delivery record for the message
|
||||
_ = t.deliveries.getRecord(t.msgID(msg.Message))
|
||||
// create map to start tracking the peers who deliver while we're validating
|
||||
id := t.msgID(msg.Message)
|
||||
if _, exists := t.nearFirst[id]; exists {
|
||||
return
|
||||
}
|
||||
t.nearFirst[id] = make(map[peer.ID]struct{})
|
||||
}
|
||||
|
||||
func (t *tagTracer) DuplicateMessage(msg *Message) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
drec := t.deliveries.getRecord(t.msgID(msg.Message))
|
||||
if drec.status == deliveryUnknown {
|
||||
// the message is being validated; track the peer delivery and wait for
|
||||
// the Deliver/Reject notification.
|
||||
drec.peers[msg.ReceivedFrom] = struct{}{}
|
||||
id := t.msgID(msg.Message)
|
||||
peers, ok := t.nearFirst[id]
|
||||
if !ok {
|
||||
// the peers map should have been created in ValidateMessage
|
||||
return
|
||||
}
|
||||
peers[msg.ReceivedFrom] = struct{}{}
|
||||
}
|
||||
|
||||
func (t *tagTracer) RejectMessage(msg *Message, reason string) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// mark message as invalid and release tracking info
|
||||
drec := t.deliveries.getRecord(t.msgID(msg.Message))
|
||||
|
||||
// defensive check that this is the first rejection trace -- delivery status should be unknown
|
||||
if drec.status != deliveryUnknown {
|
||||
log.Warnf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status)
|
||||
return
|
||||
}
|
||||
|
||||
drec.status = deliveryInvalid
|
||||
drec.peers = nil
|
||||
// stop tracking near-first deliveries for rejected message
|
||||
delete(t.nearFirst, t.msgID(msg.Message))
|
||||
}
|
||||
|
||||
func (t *tagTracer) RemovePeer(peer.ID) {}
|
||||
|
|
Loading…
Reference in New Issue