diff --git a/gossipsub.go b/gossipsub.go index d34bfff..a399f88 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -25,7 +25,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er peers: make(map[peer.ID]protocol.ID), mesh: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}), - mcache: NewMessageCache(5), + mcache: NewMessageCache(3, 5), } return NewPubSub(ctx, h, rt, opts...) } @@ -172,7 +172,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { } func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { - gs.mcache.Add(msg) + gs.mcache.Put(msg) tosend := make(map[peer.ID]struct{}) for _, topic := range msg.GetTopicIDs() { @@ -274,10 +274,10 @@ func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) { } func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { - // TODO control messages and gossip piggyback + // TODO control message reliability and gossip piggyback // - control messages (GRAFT/PRUNE) must be reliable and should // be scheduled for piggyback or retry if the queue is full - // - gossip should be piggybacked on messages oppurtinistcally + // - gossip (IHAVE) should be piggybacked on messages oppurtinistcally mch, ok := gs.p.peers[p] if !ok { @@ -328,7 +328,70 @@ func (gs *GossipSubRouter) heartbeatTimer() { } func (gs *GossipSubRouter) heartbeat() { - // TODO + gs.mcache.Shift() + + tograft := make(map[peer.ID][]string) + toprune := make(map[peer.ID][]string) + + for topic, peers := range gs.mesh { + + if len(peers) < GossipSubDlo { + ineed := GossipSubD - len(peers) + plst := gs.getPeers(topic, func(p peer.ID) bool { + _, ok := peers[p] + return !ok + }) + + for _, p := range plst[:ineed] { + peers[p] = struct{}{} + topics := tograft[p] + tograft[p] = append(topics, topic) + } + } + + if len(peers) > GossipSubDhi { + idontneed := len(peers) - GossipSubD + plst := peerMapToList(peers) + shufflePeers(plst) + for _, p := range plst[:idontneed] { + delete(peers, p) + topics := toprune[p] + toprune[p] = append(topics, topic) + } + } + + // TODO gossip + } + + for p, topics := range tograft { + graft := make([]*pb.ControlGraft, 0, len(topics)) + for _, topic := range topics { + graft = append(graft, &pb.ControlGraft{TopicID: &topic}) + } + + var prune []*pb.ControlPrune + pruning, ok := toprune[p] + if ok { + delete(toprune, p) + prune = make([]*pb.ControlPrune, 0, len(pruning)) + for _, topic := range pruning { + prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + } + } + + out := rpcWithControl(nil, nil, nil, graft, prune) + gs.sendControl(p, out) + } + + for p, topics := range toprune { + prune := make([]*pb.ControlPrune, 0, len(topics)) + for _, topic := range topics { + prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + } + + out := rpcWithControl(nil, nil, nil, nil, prune) + gs.sendControl(p, out) + } } func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { @@ -339,6 +402,14 @@ func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { return pmap } +func peerMapToList(peers map[peer.ID]struct{}) []peer.ID { + plst := make([]peer.ID, 0, len(peers)) + for p := range peers { + plst = append(plst, p) + } + return plst +} + func shufflePeers(peers []peer.ID) { // TODO } diff --git a/mcache.go b/mcache.go index 01d78f6..0b79892 100644 --- a/mcache.go +++ b/mcache.go @@ -4,14 +4,14 @@ import ( pb "github.com/libp2p/go-floodsub/pb" ) -func NewMessageCache(win int) *MessageCache { +func NewMessageCache(gossip, history int) *MessageCache { return &MessageCache{} } type MessageCache struct { } -func (mc *MessageCache) Add(msg *pb.Message) { +func (mc *MessageCache) Put(msg *pb.Message) { // TODO } @@ -19,3 +19,7 @@ func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { // TODO return nil, false } + +func (mc *MessageCache) Shift() { + // TODO +}