diff --git a/comm.go b/comm.go index 430e6a6..182e992 100644 --- a/comm.go +++ b/comm.go @@ -104,3 +104,21 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { func rpcWithMessages(msgs ...*pb.Message) *RPC { return &RPC{RPC: pb.RPC{Publish: msgs}} } + +func rpcWithControl(msgs []*pb.Message, + ihave []*pb.ControlIHave, + iwant []*pb.ControlIWant, + graft []*pb.ControlGraft, + prune []*pb.ControlPrune) *RPC { + return &RPC{ + RPC: pb.RPC{ + Publish: msgs, + Control: &pb.ControlMessage{ + Ihave: ihave, + Iwant: iwant, + Graft: graft, + Prune: prune, + }, + }, + } +} diff --git a/gossipsub.go b/gossipsub.go index 0d63f92..2ecbb0a 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -62,7 +62,121 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) { } func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { - // TODO + ctl := rpc.GetControl() + if ctl == nil { + return + } + + iwant := gs.handleIHave(ctl) + msgs := gs.handleIWant(ctl) + prune := gs.handleGraft(rpc.from, ctl) + gs.handlePrune(rpc.from, ctl) + + if len(iwant) == 0 && len(msgs) == 0 && len(prune) == 0 { + return + } + + // TODO piggyback gossip IHAVE + out := rpcWithControl(msgs, nil, iwant, nil, prune) + + mch, ok := gs.p.peers[rpc.from] + if !ok { + return + } + + select { + case mch <- out: + default: + // TODO PRUNE messages should be reliable; schedule for piggybacking or retry + log.Infof("dropping message to peer %s: queue full", rpc.from) + } +} + +func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant { + iwant := make(map[string]struct{}) + + for _, ihave := range ctl.GetIhave() { + topic := ihave.GetTopicID() + _, ok := gs.mesh[topic] + if !ok { + continue + } + + for _, mid := range ihave.GetMessageIDs() { + if gs.p.seenMessage(mid) { + continue + } + iwant[mid] = struct{}{} + } + } + + if len(iwant) == 0 { + return nil + } + + iwantlst := make([]string, 0, len(iwant)) + for mid := range iwant { + iwantlst = append(iwantlst, mid) + } + + return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}} +} + +func (gs *GossipSubRouter) handleIWant(ctl *pb.ControlMessage) []*pb.Message { + ihave := make(map[string]*pb.Message) + for _, iwant := range ctl.GetIwant() { + for _, mid := range iwant.GetMessageIDs() { + msg, ok := gs.mcache.Get(mid) + if ok { + ihave[mid] = msg + } + } + } + + if len(ihave) == 0 { + return nil + } + + msgs := make([]*pb.Message, 0, len(ihave)) + for _, msg := range ihave { + msgs = append(msgs, msg) + } + + return msgs +} + +func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune { + var prune []string + for _, graft := range ctl.GetGraft() { + topic := graft.GetTopicID() + peers, ok := gs.mesh[topic] + if !ok { + prune = append(prune, topic) + } else { + peers[p] = struct{}{} + } + } + + if len(prune) == 0 { + return nil + } + + cprune := make([]*pb.ControlPrune, 0, len(prune)) + for _, topic := range prune { + cprune = append(cprune, &pb.ControlPrune{TopicID: &topic}) + } + + return cprune +} + +func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { + for _, prune := range ctl.GetPrune() { + topic := prune.GetTopicID() + peers, ok := gs.mesh[topic] + if ok { + delete(peers, p) + } + } } func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { diff --git a/mcache.go b/mcache.go index 47e869e..01d78f6 100644 --- a/mcache.go +++ b/mcache.go @@ -14,3 +14,8 @@ type MessageCache struct { func (mc *MessageCache) Add(msg *pb.Message) { // TODO } + +func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { + // TODO + return nil, false +}