// go-libp2p-pubsub/mcache.go

package pubsub
import (
"fmt"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// NewMessageCache creates a sliding window cache that remembers messages for as
// long as `history` slots.
//
// When queried for messages to advertise, the cache only returns messages in
// the last `gossip` slots.
//
// The `gossip` parameter must be smaller or equal to `history`, or this
// function will panic.
//
// The slack between `gossip` and `history` accounts for the reaction time
// between when a message is advertised via IHAVE gossip, and the peer pulls it
// via an IWANT command.
func NewMessageCache(gossip, history int) *MessageCache {
if gossip > history {
err := fmt.Errorf("invalid parameters for message cache; gossip slots (%d) cannot be larger than history slots (%d)",
gossip, history)
panic(err)
}
2018-02-20 11:56:23 +00:00
return &MessageCache{
msgs: make(map[string]*pb.Message),
history: make([][]CacheEntry, history),
gossip: gossip,
msgID: DefaultMsgIdFn,
2018-02-20 11:56:23 +00:00
}
2018-02-19 14:13:18 +00:00
}
type MessageCache struct {
2018-02-20 11:56:23 +00:00
msgs map[string]*pb.Message
history [][]CacheEntry
gossip int
msgID MsgIdFunction
}
func (mc *MessageCache) ChangeMsgIdFn(msgID MsgIdFunction) {
mc.msgID = msgID
2018-02-20 11:56:23 +00:00
}
// CacheEntry is the per-slot record of a cached message: its ID together with
// the topics it was published to.
type CacheEntry struct {
	// mid is the message ID, which is also the key of the message in
	// MessageCache.msgs.
	mid string
	// topics lists the topic IDs carried by the message.
	topics []string
}
func (mc *MessageCache) Put(msg *pb.Message) {
mid := mc.msgID(msg)
2018-02-20 11:56:23 +00:00
mc.msgs[mid] = msg
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()})
2018-02-19 14:13:18 +00:00
}
func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
2018-02-20 11:56:23 +00:00
m, ok := mc.msgs[mid]
return m, ok
2018-02-19 16:45:10 +00:00
}
func (mc *MessageCache) GetGossipIDs(topic string) []string {
2018-02-20 11:56:23 +00:00
var mids []string
for _, entries := range mc.history[:mc.gossip] {
for _, entry := range entries {
for _, t := range entry.topics {
if t == topic {
mids = append(mids, entry.mid)
break
}
}
}
}
return mids
2018-02-20 10:08:18 +00:00
}
func (mc *MessageCache) Shift() {
2018-02-20 11:56:23 +00:00
last := mc.history[len(mc.history)-1]
for _, entry := range last {
delete(mc.msgs, entry.mid)
}
for i := len(mc.history) - 2; i >= 0; i-- {
mc.history[i+1] = mc.history[i]
}
mc.history[0] = nil
}