support direct peering agreements

This commit is contained in:
vyzo 2020-03-31 13:23:04 +03:00
parent bcff5f20a6
commit 8809484a47
1 changed files with 107 additions and 13 deletions

View File

@ -146,6 +146,30 @@ func WithPeerExchange(doPX bool) Option {
}
}
// WithDirectPeers is a gossipsub router option that specifies peers with direct
// peering agreements. These peers are connected outside of the mesh, with all (valid)
// message unconditionally forwarded to them. The router will maintain open connections
// to these peers. Note that the peering agreement should be reciprocal with direct peers
// symmetrically configured at both ends.
func WithDirectPeers(pis []peer.AddrInfo) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
direct := make(map[peer.ID]struct{})
for _, pi := range pis {
direct[pi.ID] = struct{}{}
ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
}
gs.direct = direct
return nil
}
}
// GossipSubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
@ -156,6 +180,7 @@ func WithPeerExchange(doPX bool) Option {
type GossipSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
@ -221,12 +246,27 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
for i := 0; i < GossipSubConnectors; i++ {
go gs.connector()
}
// connect to direct peers
if len(gs.direct) > 0 {
go func() {
for p := range gs.direct {
gs.connect <- connectInfo{p: p}
}
}()
}
}
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
// tag peer if it is a direct peer
_, direct := gs.direct[p]
if direct {
gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000)
}
}
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
@ -273,7 +313,8 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
}
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) bool {
return gs.score.Score(p) >= gs.graylistThreshold
_, direct := gs.direct[p]
return direct || gs.score.Score(p) >= gs.graylistThreshold
}
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
@ -392,6 +433,17 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
continue
}
// we don't GRAFT to/from direct peers; complain loudly if this happens
_, direct := gs.direct[p]
if direct {
log.Warningf("GRAFT: ignoring request from direct peer %s", p)
// this is possibly a bug from non-reciprocal configuration; send a PRUNE
prune = append(prune, topic)
// but don't PX
doPX = false
continue
}
// check the score
if score < 0 {
// we don't GRAFT peers with negative score
@ -569,13 +621,22 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
if gs.score.Score(p) >= gs.publishThreshold {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
}
continue
}
// direct peers
for p := range gs.direct {
_, inTopic := tmap[p]
if inTopic {
tosend[p] = struct{}{}
}
}
// floodsub peers
for p := range tmap {
if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold {
@ -639,9 +700,10 @@ func (gs *GossipSubRouter) Join(topic string) {
if len(gmap) < GossipSubD {
// we need more peers; eager, as this would get fixed in the next heartbeat
more := gs.getPeers(topic, GossipSubD-len(gmap), func(p peer.ID) bool {
// filter our current peers and peers with negative scores
_, ok := gmap[p]
return !ok && gs.score.Score(p) >= 0
// filter our current peers, direct peers, and peers with negative scores
_, inMesh := gmap[p]
_, direct := gs.direct[p]
return !inMesh && !direct && gs.score.Score(p) >= 0
})
for _, p := range more {
gmap[p] = struct{}{}
@ -652,8 +714,9 @@ func (gs *GossipSubRouter) Join(topic string) {
delete(gs.lastpub, topic)
} else {
peers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool {
// filter peers with negative score
return gs.score.Score(p) >= 0
// filter direct peers and peers with negative score
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= 0
})
gmap = peerListToMap(peers)
gs.mesh[topic] = gmap
@ -782,6 +845,9 @@ func (gs *GossipSubRouter) heartbeat() {
// clean up expired backoffs
gs.clearBackoff()
// ensure direct peers are connected
gs.directConnect()
// maintain the mesh for topics we have joined
for topic, peers := range gs.mesh {
prunePeer := func(p peer.ID) {
@ -813,10 +879,11 @@ func (gs *GossipSubRouter) heartbeat() {
backoff := gs.backoff[topic]
ineed := GossipSubD - len(peers)
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current peers, peers we are backing off, and peers with negative score
// filter our current and direct peers, peers we are backing off, and peers with negative score
_, inMesh := peers[p]
_, doBackoff := backoff[p]
return !inMesh && !doBackoff && gs.score.Score(p) >= 0
_, direct := gs.direct[p]
return !inMesh && !doBackoff && !direct && gs.score.Score(p) >= 0
})
for _, p := range plst {
@ -875,9 +942,10 @@ func (gs *GossipSubRouter) heartbeat() {
if len(peers) < GossipSubD {
ineed := GossipSubD - len(peers)
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current peers and peers with score above the publish threshold
_, ok := peers[p]
return !ok && gs.score.Score(p) >= gs.publishThreshold
// filter our current and direct peers and peers with score above the publish threshold
_, inFanout := peers[p]
_, direct := gs.direct[p]
return !inFanout && !direct && gs.score.Score(p) >= gs.publishThreshold
})
for _, p := range plst {
@ -916,6 +984,30 @@ func (gs *GossipSubRouter) clearBackoff() {
}
}
func (gs *GossipSubRouter) directConnect() {
// we donly do this every 150 ticks to allow pending connections to complete and account
// for restarts/downtime
if gs.heartbeatTicks%150 != 0 {
return
}
var toconnect []peer.ID
for p := range gs.direct {
_, connected := gs.peers[p]
if !connected {
toconnect = append(toconnect, p)
}
}
if len(toconnect) > 0 {
go func() {
for _, p := range toconnect {
gs.connect <- connectInfo{p: p}
}
}()
}
}
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
for p, topics := range tograft {
graft := make([]*pb.ControlGraft, 0, len(topics))
@ -960,10 +1052,12 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
// First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set.
// We also exclude direct peers, as there is no reason to emit gossip to them.
peers := make([]peer.ID, 0, len(gs.p.topics[topic]))
for p := range gs.p.topics[topic] {
_, inExclude := exclude[p]
if !inExclude && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
_, direct := gs.direct[p]
if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
peers = append(peers, p)
}
}