hearbeat preliminaries: overlay management

This commit is contained in:
vyzo 2018-02-19 21:24:17 +02:00 committed by Steven Allen
parent 34509d47b3
commit 73da341386
2 changed files with 82 additions and 7 deletions

View File

@ -25,7 +25,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
peers: make(map[peer.ID]protocol.ID), peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}), mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}),
mcache: NewMessageCache(5), mcache: NewMessageCache(3, 5),
} }
return NewPubSub(ctx, h, rt, opts...) return NewPubSub(ctx, h, rt, opts...)
} }
@ -172,7 +172,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
} }
func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
gs.mcache.Add(msg) gs.mcache.Put(msg)
tosend := make(map[peer.ID]struct{}) tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() { for _, topic := range msg.GetTopicIDs() {
@ -274,10 +274,10 @@ func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) {
} }
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) { func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) {
// TODO control messages and gossip piggyback // TODO control message reliability and gossip piggyback
// - control messages (GRAFT/PRUNE) must be reliable and should // - control messages (GRAFT/PRUNE) must be reliable and should
// be scheduled for piggyback or retry if the queue is full // be scheduled for piggyback or retry if the queue is full
// - gossip should be piggybacked on messages oppurtinistcally // - gossip (IHAVE) should be piggybacked on messages oppurtinistcally
mch, ok := gs.p.peers[p] mch, ok := gs.p.peers[p]
if !ok { if !ok {
@ -328,7 +328,70 @@ func (gs *GossipSubRouter) heartbeatTimer() {
} }
func (gs *GossipSubRouter) heartbeat() { func (gs *GossipSubRouter) heartbeat() {
// TODO gs.mcache.Shift()
tograft := make(map[peer.ID][]string)
toprune := make(map[peer.ID][]string)
for topic, peers := range gs.mesh {
if len(peers) < GossipSubDlo {
ineed := GossipSubD - len(peers)
plst := gs.getPeers(topic, func(p peer.ID) bool {
_, ok := peers[p]
return !ok
})
for _, p := range plst[:ineed] {
peers[p] = struct{}{}
topics := tograft[p]
tograft[p] = append(topics, topic)
}
}
if len(peers) > GossipSubDhi {
idontneed := len(peers) - GossipSubD
plst := peerMapToList(peers)
shufflePeers(plst)
for _, p := range plst[:idontneed] {
delete(peers, p)
topics := toprune[p]
toprune[p] = append(topics, topic)
}
}
// TODO gossip
}
for p, topics := range tograft {
graft := make([]*pb.ControlGraft, 0, len(topics))
for _, topic := range topics {
graft = append(graft, &pb.ControlGraft{TopicID: &topic})
}
var prune []*pb.ControlPrune
pruning, ok := toprune[p]
if ok {
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, &pb.ControlPrune{TopicID: &topic})
}
}
out := rpcWithControl(nil, nil, nil, graft, prune)
gs.sendControl(p, out)
}
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, &pb.ControlPrune{TopicID: &topic})
}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendControl(p, out)
}
} }
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
@ -339,6 +402,14 @@ func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
return pmap return pmap
} }
func peerMapToList(peers map[peer.ID]struct{}) []peer.ID {
plst := make([]peer.ID, 0, len(peers))
for p := range peers {
plst = append(plst, p)
}
return plst
}
func shufflePeers(peers []peer.ID) { func shufflePeers(peers []peer.ID) {
// TODO // TODO
} }

View File

@ -4,14 +4,14 @@ import (
pb "github.com/libp2p/go-floodsub/pb" pb "github.com/libp2p/go-floodsub/pb"
) )
func NewMessageCache(win int) *MessageCache { func NewMessageCache(gossip, history int) *MessageCache {
return &MessageCache{} return &MessageCache{}
} }
type MessageCache struct { type MessageCache struct {
} }
func (mc *MessageCache) Add(msg *pb.Message) { func (mc *MessageCache) Put(msg *pb.Message) {
// TODO // TODO
} }
@ -19,3 +19,7 @@ func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
// TODO // TODO
return nil, false return nil, false
} }
func (mc *MessageCache) Shift() {
// TODO
}