diff --git a/gossipsub.go b/gossipsub.go
index e7d469a..d34bfff 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -68,27 +68,19 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
 	}

 	iwant := gs.handleIHave(ctl)
-	msgs := gs.handleIWant(ctl)
+	ihave := gs.handleIWant(ctl)
 	prune := gs.handleGraft(rpc.from, ctl)
 	gs.handlePrune(rpc.from, ctl)

-	if len(iwant) == 0 && len(msgs) == 0 && len(prune) == 0 {
+	if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
 		return
 	}

-	// TODO piggyback gossip IHAVE
-	out := rpcWithControl(msgs, nil, iwant, nil, prune)
-
-	mch, ok := gs.p.peers[rpc.from]
-	if !ok {
-		return
-	}
-
-	select {
-	case mch <- out:
-	default:
-		// TODO PRUNE messages should be reliable; schedule for piggybacking or retry
-		log.Infof("dropping message to peer %s: queue full", rpc.from)
+	out := rpcWithControl(ihave, nil, iwant, nil, prune)
+	if len(prune) == 0 {
+		gs.sendMessage(rpc.from, out)
+	} else {
+		gs.sendControl(rpc.from, out)
 	}
 }

@@ -207,12 +199,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
 				peers := gs.getPeers(topic, func(peer.ID) bool { return true })

 				if len(peers) > 0 {
-					gmap = make(map[peer.ID]struct{})
-
-					for _, p := range peers[:GossipSubD] {
-						gmap[p] = struct{}{}
-					}
-
+					gmap = peerListToMap(peers[:GossipSubD])
 					gs.fanout[topic] = gmap
 				}
 			}
@@ -229,17 +216,78 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
 			continue
 		}

-		mch, ok := gs.p.peers[pid]
-		if !ok {
-			continue
-		}
+		gs.sendMessage(pid, out)
+	}
+}

-		select {
-		case mch <- out:
-		default:
-			log.Infof("dropping message to peer %s: queue full", pid)
-			// Drop it. The peer is too slow.
-		}
+func (gs *GossipSubRouter) Join(topic string) {
+	gmap, ok := gs.mesh[topic]
+	if ok {
+		return
+	}
+
+	gmap, ok = gs.fanout[topic]
+	if ok {
+		gs.mesh[topic] = gmap
+		delete(gs.fanout, topic)
+	} else {
+		peers := gs.getPeers(topic, func(peer.ID) bool { return true })
+		gmap = peerListToMap(peers[:GossipSubD])
+		gs.mesh[topic] = gmap
+	}
+
+	for p := range gmap {
+		gs.sendGraft(p, topic)
+	}
+}
+
+func (gs *GossipSubRouter) Leave(topic string) {
+	gmap, ok := gs.mesh[topic]
+	if !ok {
+		return
+	}
+
+	for p := range gmap {
+		gs.sendPrune(p, topic)
+	}
+	delete(gs.mesh, topic)
+}
+
+func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
+	graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
+	out := rpcWithControl(nil, nil, nil, graft, nil)
+	gs.sendControl(p, out)
+}
+
+func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
+	prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}}
+	out := rpcWithControl(nil, nil, nil, nil, prune)
+	gs.sendControl(p, out)
+}
+
+func (gs *GossipSubRouter) sendControl(p peer.ID, out *RPC) {
+	gs.sendRPC(p, out, true)
+}
+
+func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) {
+	gs.sendRPC(p, out, false)
+}
+
+func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) {
+	// TODO control messages and gossip piggyback
+	// - control messages (GRAFT/PRUNE) must be reliable and should
+	//   be scheduled for piggyback or retry if the queue is full
+	// - gossip should be piggybacked on messages opportunistically
+
+	mch, ok := gs.p.peers[p]
+	if !ok {
+		return
+	}
+
+	select {
+	case mch <- out:
+	default:
+		log.Infof("dropping message to peer %s: queue full", p)
 	}
 }

@@ -261,14 +309,6 @@ func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []p
 	return peers
 }

-func (gs *GossipSubRouter) Join(topic string) {
-	// TODO
-}
-
-func (gs *GossipSubRouter) Leave(topic string) {
-	// TODO
-}
-
 func (gs *GossipSubRouter) heartbeatTimer() {
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
@@ -291,6 +331,14 @@ func (gs *GossipSubRouter) heartbeat() {
 	// TODO
 }

+func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
+	pmap := make(map[peer.ID]struct{})
+	for _, p := range peers {
+		pmap[p] = struct{}{}
+	}
+	return pmap
+}
+
 func shufflePeers(peers []peer.ID) {
 	// TODO
 }