implement Join and Leave, refactor sendRPC

This commit is contained in:
vyzo 2018-02-19 20:16:58 +02:00 committed by Steven Allen
parent e1fbe11c97
commit 34509d47b3
1 changed files with 87 additions and 39 deletions

View File

@ -68,27 +68,19 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
}
iwant := gs.handleIHave(ctl)
msgs := gs.handleIWant(ctl)
ihave := gs.handleIWant(ctl)
prune := gs.handleGraft(rpc.from, ctl)
gs.handlePrune(rpc.from, ctl)
if len(iwant) == 0 && len(msgs) == 0 && len(prune) == 0 {
if len(iwant) == 0 && len(ihave) == 0 && len(prune) == 0 {
return
}
// TODO piggyback gossip IHAVE
out := rpcWithControl(msgs, nil, iwant, nil, prune)
mch, ok := gs.p.peers[rpc.from]
if !ok {
return
}
select {
case mch <- out:
default:
// TODO PRUNE messages should be reliable; schedule for piggybacking or retry
log.Infof("dropping message to peer %s: queue full", rpc.from)
out := rpcWithControl(ihave, nil, iwant, nil, prune)
if len(prune) == 0 {
gs.sendMessage(rpc.from, out)
} else {
gs.sendControl(rpc.from, out)
}
}
@ -207,12 +199,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
peers := gs.getPeers(topic, func(peer.ID) bool { return true })
if len(peers) > 0 {
gmap = make(map[peer.ID]struct{})
for _, p := range peers[:GossipSubD] {
gmap[p] = struct{}{}
}
gmap = peerListToMap(peers[:GossipSubD])
gs.fanout[topic] = gmap
}
}
@ -229,17 +216,78 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue
}
mch, ok := gs.p.peers[pid]
if !ok {
continue
}
gs.sendMessage(pid, out)
}
}
select {
case mch <- out:
default:
log.Infof("dropping message to peer %s: queue full", pid)
// Drop it. The peer is too slow.
}
func (gs *GossipSubRouter) Join(topic string) {
gmap, ok := gs.mesh[topic]
if ok {
return
}
gmap, ok = gs.fanout[topic]
if ok {
gs.mesh[topic] = gmap
delete(gs.fanout, topic)
} else {
peers := gs.getPeers(topic, func(peer.ID) bool { return true })
gmap = peerListToMap(peers[:GossipSubD])
gs.mesh[topic] = gmap
}
for p := range gmap {
gs.sendGraft(p, topic)
}
}
func (gs *GossipSubRouter) Leave(topic string) {
gmap, ok := gs.mesh[topic]
if !ok {
return
}
for p := range gmap {
gs.sendPrune(p, topic)
}
delete(gs.mesh, topic)
}
func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
out := rpcWithControl(nil, nil, nil, graft, nil)
gs.sendControl(p, out)
}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendControl(p, out)
}
func (gs *GossipSubRouter) sendControl(p peer.ID, out *RPC) {
gs.sendRPC(p, out, true)
}
func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) {
gs.sendRPC(p, out, false)
}
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) {
// TODO control messages and gossip piggyback
// - control messages (GRAFT/PRUNE) must be reliable and should
// be scheduled for piggyback or retry if the queue is full
// - gossip should be piggybacked on messages oppurtinistcally
mch, ok := gs.p.peers[p]
if !ok {
return
}
select {
case mch <- out:
default:
log.Infof("dropping message to peer %s: queue full", p)
}
}
@ -261,14 +309,6 @@ func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []p
return peers
}
func (gs *GossipSubRouter) Join(topic string) {
// TODO
}
func (gs *GossipSubRouter) Leave(topic string) {
// TODO
}
func (gs *GossipSubRouter) heartbeatTimer() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@ -291,6 +331,14 @@ func (gs *GossipSubRouter) heartbeat() {
// TODO
}
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
pmap := make(map[peer.ID]struct{})
for _, p := range peers {
pmap[p] = struct{}{}
}
return pmap
}
func shufflePeers(peers []peer.ID) {
// TODO
}