From 7251c64e65b83723c2d499ce04f16b072b87cf64 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 20 Feb 2018 12:08:18 +0200 Subject: [PATCH] control message piggybacking logic --- gossipsub.go | 139 +++++++++++++++++++++++++++++++++------------------ mcache.go | 4 ++ 2 files changed, 94 insertions(+), 49 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 849ed4c..4f7ac3d 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -22,20 +22,24 @@ const ( func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &GossipSubRouter{ - peers: make(map[peer.ID]protocol.ID), - mesh: make(map[string]map[peer.ID]struct{}), - fanout: make(map[string]map[peer.ID]struct{}), - mcache: NewMessageCache(3, 5), + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + gossip: make(map[peer.ID][]*pb.ControlIHave), + control: make(map[peer.ID]*pb.ControlMessage), + mcache: NewMessageCache(3, 5), } return NewPubSub(ctx, h, rt, opts...) } type GossipSubRouter struct { - p *PubSub - peers map[peer.ID]protocol.ID // peer protocols - mesh map[string]map[peer.ID]struct{} // topic meshes - fanout map[string]map[peer.ID]struct{} // topic fanout - mcache *MessageCache + p *PubSub + peers map[peer.ID]protocol.ID // peer protocols + mesh map[string]map[peer.ID]struct{} // topic meshes + fanout map[string]map[peer.ID]struct{} // topic fanout + gossip map[peer.ID][]*pb.ControlIHave // pending gossip + control map[peer.ID]*pb.ControlMessage // pending control messages + mcache *MessageCache } func (gs *GossipSubRouter) Protocols() []protocol.ID { @@ -77,11 +81,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { } out := rpcWithControl(ihave, nil, iwant, nil, prune) - if len(prune) == 0 { - gs.sendMessage(rpc.from, out) - } else { - gs.sendControl(rpc.from, out) - } + gs.sendRPC(rpc.from, out) } func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant { @@ -216,7 +216,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { continue } - gs.sendMessage(pid, out) + gs.sendRPC(pid, out) } } @@ -256,28 +256,29 @@ func (gs *GossipSubRouter) Leave(topic string) { func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}} out := rpcWithControl(nil, nil, nil, graft, nil) - gs.sendControl(p, out) + gs.sendRPC(p, out) } func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) { prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}} out := rpcWithControl(nil, nil, nil, nil, prune) - gs.sendControl(p, out) + gs.sendRPC(p, out) } -func (gs *GossipSubRouter) sendControl(p peer.ID, out *RPC) { - gs.sendRPC(p, out, true) -} +func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { + // piggyback cotrol message retries + ctl, ok := gs.control[p] + if ok { + gs.piggybackControl(p, out, ctl) + delete(gs.control, p) + } -func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) { - gs.sendRPC(p, out, false) -} - -func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { - // TODO control message reliability and gossip piggyback - // - control messages (GRAFT/PRUNE) must be reliable and should - // be scheduled for piggyback or retry if the queue is full - // - gossip (IHAVE) should be piggybacked on messages oppurtinistcally + // piggyback gossip + ihave, ok := gs.gossip[p] + if ok { + gs.piggybackGossip(p, out, ihave) + delete(gs.gossip, p) + } mch, ok := gs.p.peers[p] if !ok { @@ -288,25 +289,12 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { case mch <- out: default: log.Infof("dropping message to peer %s: queue full", p) - } -} - -func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { - tmap, ok := gs.p.topics[topic] - if !ok { - return nil - } - - peers := make([]peer.ID, 0, len(tmap)) - for p := range tmap { - if gs.peers[p] == GossipSubID && filter(p) { - peers = append(peers, p) + // push control messages that need to be retried + ctl := out.GetControl() + if ctl != nil { + gs.pushControl(p, ctl) } } - - shufflePeers(peers) - - return peers } func (gs *GossipSubRouter) heartbeatTimer() { @@ -330,6 +318,10 @@ func (gs *GossipSubRouter) heartbeatTimer() { func (gs *GossipSubRouter) heartbeat() { gs.mcache.Shift() + // flush pending control message from retries and gossip + // that hasn't been piggybacked since the last heartbeat + gs.flush() + tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) @@ -371,7 +363,16 @@ func (gs *GossipSubRouter) heartbeat() { } } - // TODO gossip + // schedule gossip + mids := gs.mcache.GetGossipIDs(topic) + if len(mids) == 0 { + continue + } + + gpeers := gs.getPeers(topic, func(peer.ID) bool { return true }) + for _, p := range gpeers[:GossipSubD] { + gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + } } // send coalesced GRAFT/PRUNE messages @@ -392,7 +393,7 @@ func (gs *GossipSubRouter) heartbeat() { } out := rpcWithControl(nil, nil, nil, graft, prune) - gs.sendControl(p, out) + gs.sendRPC(p, out) } for p, topics := range toprune { @@ -402,7 +403,7 @@ func (gs *GossipSubRouter) heartbeat() { } out := rpcWithControl(nil, nil, nil, nil, prune) - gs.sendControl(p, out) + gs.sendRPC(p, out) } // maintain our fanout for topics we are publishing but we have not joined @@ -430,6 +431,46 @@ func (gs *GossipSubRouter) heartbeat() { } } +func (gs *GossipSubRouter) flush() { + // TODO +} + +func (gs *GossipSubRouter) pushGossip(p peer.ID, ihave *pb.ControlIHave) { + gossip := gs.gossip[p] + gossip = append(gossip, ihave) + gs.gossip[p] = gossip +} + +func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) { + // TODO +} + +func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) { + // TODO +} + +func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) { + // TODO +} + +func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID { + tmap, ok := gs.p.topics[topic] + if !ok { + return nil + } + + peers := make([]peer.ID, 0, len(tmap)) + for p := range tmap { + if gs.peers[p] == GossipSubID && filter(p) { + peers = append(peers, p) + } + } + + shufflePeers(peers) + + return peers +} + func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { pmap := make(map[peer.ID]struct{}) for _, p := range peers { diff --git a/mcache.go b/mcache.go index 0b79892..0ab465d 100644 --- a/mcache.go +++ b/mcache.go @@ -20,6 +20,10 @@ func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { return nil, false } +func (mc *MessageCache) GetGossipIDs(topic string) []string { + return nil +} + func (mc *MessageCache) Shift() { // TODO }