mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-05-20 23:49:31 +00:00
track peer (re)transmissions in message cache
This commit is contained in:
parent
f9d29c47b6
commit
5b55f0f78d
21
mcache.go
21
mcache.go
@ -4,6 +4,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewMessageCache creates a sliding window cache that remembers messages for as
|
// NewMessageCache creates a sliding window cache that remembers messages for as
|
||||||
@ -26,6 +28,7 @@ func NewMessageCache(gossip, history int) *MessageCache {
|
|||||||
}
|
}
|
||||||
return &MessageCache{
|
return &MessageCache{
|
||||||
msgs: make(map[string]*pb.Message),
|
msgs: make(map[string]*pb.Message),
|
||||||
|
peertx: make(map[string]map[peer.ID]int),
|
||||||
history: make([][]CacheEntry, history),
|
history: make([][]CacheEntry, history),
|
||||||
gossip: gossip,
|
gossip: gossip,
|
||||||
msgID: DefaultMsgIdFn,
|
msgID: DefaultMsgIdFn,
|
||||||
@ -34,6 +37,7 @@ func NewMessageCache(gossip, history int) *MessageCache {
|
|||||||
|
|
||||||
type MessageCache struct {
|
type MessageCache struct {
|
||||||
msgs map[string]*pb.Message
|
msgs map[string]*pb.Message
|
||||||
|
peertx map[string]map[peer.ID]int
|
||||||
history [][]CacheEntry
|
history [][]CacheEntry
|
||||||
gossip int
|
gossip int
|
||||||
msgID MsgIdFunction
|
msgID MsgIdFunction
|
||||||
@ -59,6 +63,22 @@ func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
|
|||||||
return m, ok
|
return m, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*pb.Message, int, bool) {
|
||||||
|
m, ok := mc.msgs[mid]
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, ok := mc.peertx[mid]
|
||||||
|
if !ok {
|
||||||
|
tx = make(map[peer.ID]int)
|
||||||
|
mc.peertx[mid] = tx
|
||||||
|
}
|
||||||
|
tx[p]++
|
||||||
|
|
||||||
|
return m, tx[p], true
|
||||||
|
}
|
||||||
|
|
||||||
func (mc *MessageCache) GetGossipIDs(topic string) []string {
|
func (mc *MessageCache) GetGossipIDs(topic string) []string {
|
||||||
var mids []string
|
var mids []string
|
||||||
for _, entries := range mc.history[:mc.gossip] {
|
for _, entries := range mc.history[:mc.gossip] {
|
||||||
@ -78,6 +98,7 @@ func (mc *MessageCache) Shift() {
|
|||||||
last := mc.history[len(mc.history)-1]
|
last := mc.history[len(mc.history)-1]
|
||||||
for _, entry := range last {
|
for _, entry := range last {
|
||||||
delete(mc.msgs, entry.mid)
|
delete(mc.msgs, entry.mid)
|
||||||
|
delete(mc.peertx, entry.mid)
|
||||||
}
|
}
|
||||||
for i := len(mc.history) - 2; i >= 0; i-- {
|
for i := len(mc.history) - 2; i >= 0; i-- {
|
||||||
mc.history[i+1] = mc.history[i]
|
mc.history[i+1] = mc.history[i]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user