control message piggybacking logic
This commit is contained in:
parent
78618fce23
commit
7251c64e65
139
gossipsub.go
139
gossipsub.go
@ -22,20 +22,24 @@ const (
|
||||
|
||||
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
||||
rt := &GossipSubRouter{
|
||||
peers: make(map[peer.ID]protocol.ID),
|
||||
mesh: make(map[string]map[peer.ID]struct{}),
|
||||
fanout: make(map[string]map[peer.ID]struct{}),
|
||||
mcache: NewMessageCache(3, 5),
|
||||
peers: make(map[peer.ID]protocol.ID),
|
||||
mesh: make(map[string]map[peer.ID]struct{}),
|
||||
fanout: make(map[string]map[peer.ID]struct{}),
|
||||
gossip: make(map[peer.ID][]*pb.ControlIHave),
|
||||
control: make(map[peer.ID]*pb.ControlMessage),
|
||||
mcache: NewMessageCache(3, 5),
|
||||
}
|
||||
return NewPubSub(ctx, h, rt, opts...)
|
||||
}
|
||||
|
||||
type GossipSubRouter struct {
|
||||
p *PubSub
|
||||
peers map[peer.ID]protocol.ID // peer protocols
|
||||
mesh map[string]map[peer.ID]struct{} // topic meshes
|
||||
fanout map[string]map[peer.ID]struct{} // topic fanout
|
||||
mcache *MessageCache
|
||||
p *PubSub
|
||||
peers map[peer.ID]protocol.ID // peer protocols
|
||||
mesh map[string]map[peer.ID]struct{} // topic meshes
|
||||
fanout map[string]map[peer.ID]struct{} // topic fanout
|
||||
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
|
||||
control map[peer.ID]*pb.ControlMessage // pending control messages
|
||||
mcache *MessageCache
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) Protocols() []protocol.ID {
|
||||
@ -77,11 +81,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
|
||||
}
|
||||
|
||||
out := rpcWithControl(ihave, nil, iwant, nil, prune)
|
||||
if len(prune) == 0 {
|
||||
gs.sendMessage(rpc.from, out)
|
||||
} else {
|
||||
gs.sendControl(rpc.from, out)
|
||||
}
|
||||
gs.sendRPC(rpc.from, out)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant {
|
||||
@ -216,7 +216,7 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
||||
continue
|
||||
}
|
||||
|
||||
gs.sendMessage(pid, out)
|
||||
gs.sendRPC(pid, out)
|
||||
}
|
||||
}
|
||||
|
||||
@ -256,28 +256,29 @@ func (gs *GossipSubRouter) Leave(topic string) {
|
||||
func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
|
||||
graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
|
||||
out := rpcWithControl(nil, nil, nil, graft, nil)
|
||||
gs.sendControl(p, out)
|
||||
gs.sendRPC(p, out)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
|
||||
prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}}
|
||||
out := rpcWithControl(nil, nil, nil, nil, prune)
|
||||
gs.sendControl(p, out)
|
||||
gs.sendRPC(p, out)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) sendControl(p peer.ID, out *RPC) {
|
||||
gs.sendRPC(p, out, true)
|
||||
}
|
||||
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
|
||||
// piggyback cotrol message retries
|
||||
ctl, ok := gs.control[p]
|
||||
if ok {
|
||||
gs.piggybackControl(p, out, ctl)
|
||||
delete(gs.control, p)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) sendMessage(p peer.ID, out *RPC) {
|
||||
gs.sendRPC(p, out, false)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) {
|
||||
// TODO control message reliability and gossip piggyback
|
||||
// - control messages (GRAFT/PRUNE) must be reliable and should
|
||||
// be scheduled for piggyback or retry if the queue is full
|
||||
// - gossip (IHAVE) should be piggybacked on messages oppurtinistcally
|
||||
// piggyback gossip
|
||||
ihave, ok := gs.gossip[p]
|
||||
if ok {
|
||||
gs.piggybackGossip(p, out, ihave)
|
||||
delete(gs.gossip, p)
|
||||
}
|
||||
|
||||
mch, ok := gs.p.peers[p]
|
||||
if !ok {
|
||||
@ -288,25 +289,12 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, ctl bool) {
|
||||
case mch <- out:
|
||||
default:
|
||||
log.Infof("dropping message to peer %s: queue full", p)
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID {
|
||||
tmap, ok := gs.p.topics[topic]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
peers := make([]peer.ID, 0, len(tmap))
|
||||
for p := range tmap {
|
||||
if gs.peers[p] == GossipSubID && filter(p) {
|
||||
peers = append(peers, p)
|
||||
// push control messages that need to be retried
|
||||
ctl := out.GetControl()
|
||||
if ctl != nil {
|
||||
gs.pushControl(p, ctl)
|
||||
}
|
||||
}
|
||||
|
||||
shufflePeers(peers)
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) heartbeatTimer() {
|
||||
@ -330,6 +318,10 @@ func (gs *GossipSubRouter) heartbeatTimer() {
|
||||
func (gs *GossipSubRouter) heartbeat() {
|
||||
gs.mcache.Shift()
|
||||
|
||||
// flush pending control message from retries and gossip
|
||||
// that hasn't been piggybacked since the last heartbeat
|
||||
gs.flush()
|
||||
|
||||
tograft := make(map[peer.ID][]string)
|
||||
toprune := make(map[peer.ID][]string)
|
||||
|
||||
@ -371,7 +363,16 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO gossip
|
||||
// schedule gossip
|
||||
mids := gs.mcache.GetGossipIDs(topic)
|
||||
if len(mids) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
gpeers := gs.getPeers(topic, func(peer.ID) bool { return true })
|
||||
for _, p := range gpeers[:GossipSubD] {
|
||||
gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids})
|
||||
}
|
||||
}
|
||||
|
||||
// send coalesced GRAFT/PRUNE messages
|
||||
@ -392,7 +393,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
|
||||
out := rpcWithControl(nil, nil, nil, graft, prune)
|
||||
gs.sendControl(p, out)
|
||||
gs.sendRPC(p, out)
|
||||
}
|
||||
|
||||
for p, topics := range toprune {
|
||||
@ -402,7 +403,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
|
||||
out := rpcWithControl(nil, nil, nil, nil, prune)
|
||||
gs.sendControl(p, out)
|
||||
gs.sendRPC(p, out)
|
||||
}
|
||||
|
||||
// maintain our fanout for topics we are publishing but we have not joined
|
||||
@ -430,6 +431,46 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) flush() {
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) pushGossip(p peer.ID, ihave *pb.ControlIHave) {
|
||||
gossip := gs.gossip[p]
|
||||
gossip = append(gossip, ihave)
|
||||
gs.gossip[p] = gossip
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) getPeers(topic string, filter func(peer.ID) bool) []peer.ID {
|
||||
tmap, ok := gs.p.topics[topic]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
peers := make([]peer.ID, 0, len(tmap))
|
||||
for p := range tmap {
|
||||
if gs.peers[p] == GossipSubID && filter(p) {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
}
|
||||
|
||||
shufflePeers(peers)
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
|
||||
pmap := make(map[peer.ID]struct{})
|
||||
for _, p := range peers {
|
||||
|
Loading…
x
Reference in New Issue
Block a user