mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-02-02 19:53:06 +00:00
simplify duplicate mesh delivery tracking
This commit is contained in:
parent
9c0b96f4e7
commit
ca6778bc83
32
score.go
32
score.go
@ -144,7 +144,7 @@ type messageDeliveries struct {
|
||||
type deliveryRecord struct {
|
||||
firstSeen time.Time
|
||||
status int
|
||||
peers map[peer.ID]time.Time
|
||||
peers map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
// delivery record status
|
||||
@ -375,10 +375,10 @@ func (ps *peerScore) DeliverMessage(msg *Message) {
|
||||
|
||||
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
|
||||
|
||||
// mark the message as valid and reward mesh peers that have forwarded it in time
|
||||
// mark the message as valid and reward mesh peers that have already forwarded it to us
|
||||
drec.status = delivery_valid
|
||||
for p, t := range drec.peers {
|
||||
ps.markDuplicateMessageDelivery(p, msg, drec.firstSeen, t)
|
||||
for p := range drec.peers {
|
||||
ps.markDuplicateMessageDelivery(p, msg, time.Time{})
|
||||
}
|
||||
}
|
||||
|
||||
@ -446,16 +446,14 @@ func (ps *peerScore) DuplicateMessage(msg *Message) {
|
||||
|
||||
switch drec.status {
|
||||
case delivery_unknown:
|
||||
// the message is being validated; track the peer delivery time and wait for
|
||||
// the message is being validated; track the peer delivery and wait for
|
||||
// the Deliver/Reject notification.
|
||||
now := time.Now()
|
||||
drec.peers[msg.ReceivedFrom] = now
|
||||
drec.peers[msg.ReceivedFrom] = struct{}{}
|
||||
|
||||
case delivery_valid:
|
||||
// mark the peer delivery time to only count a duplicate delivery once.
|
||||
now := time.Now()
|
||||
drec.peers[msg.ReceivedFrom] = now
|
||||
ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.firstSeen, now)
|
||||
drec.peers[msg.ReceivedFrom] = struct{}{}
|
||||
ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.firstSeen)
|
||||
|
||||
case delivery_invalid:
|
||||
// we no longer track delivery time
|
||||
@ -536,12 +534,18 @@ func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, firstSeen, deliveryTime time.Time) {
|
||||
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, firstSeen time.Time) {
|
||||
var now time.Time
|
||||
|
||||
pstats, ok := ps.peerStats[p]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if !firstSeen.IsZero() {
|
||||
now = time.Now()
|
||||
}
|
||||
|
||||
for _, topic := range msg.GetTopicIDs() {
|
||||
tstats, ok := pstats.getTopicStats(topic, ps.params)
|
||||
if !ok {
|
||||
@ -552,8 +556,10 @@ func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, first
|
||||
continue
|
||||
}
|
||||
|
||||
// check against the mesh delivery window
|
||||
if deliveryTime.After(firstSeen.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) {
|
||||
// check against the mesh delivery window -- if the firstSeen time is passed as 0, then now
|
||||
// will also be zero and the message was received before we finished validation and thus falls
|
||||
// within the mesh delivery window.
|
||||
if !now.IsZero() && now.After(firstSeen.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user