diff --git a/gossipsub.go b/gossipsub.go index 088ab95..16c0774 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -146,6 +146,30 @@ func WithPeerExchange(doPX bool) Option { } } +// WithDirectPeers is a gossipsub router option that specifies peers with direct +// peering agreements. These peers are connected outside of the mesh, with all (valid) +// message unconditionally forwarded to them. The router will maintain open connections +// to these peers. Note that the peering agreement should be reciprocal with direct peers +// symmetrically configured at both ends. +func WithDirectPeers(pis []peer.AddrInfo) Option { + return func(ps *PubSub) error { + gs, ok := ps.rt.(*GossipSubRouter) + if !ok { + return fmt.Errorf("pubsub router is not gossipsub") + } + + direct := make(map[peer.ID]struct{}) + for _, pi := range pis { + direct[pi.ID] = struct{}{} + ps.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL) + } + + gs.direct = direct + + return nil + } +} + // GossipSubRouter is a router that implements the gossipsub protocol. // For each topic we have joined, we maintain an overlay through which // messages flow; this is the mesh map. @@ -156,6 +180,7 @@ func WithPeerExchange(doPX bool) Option { type GossipSubRouter struct { p *PubSub peers map[peer.ID]protocol.ID // peer protocols + direct map[peer.ID]struct{} // direct peers mesh map[string]map[peer.ID]struct{} // topic meshes fanout map[string]map[peer.ID]struct{} // topic fanout lastpub map[string]int64 // last publish time for fanout topics @@ -221,12 +246,27 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { for i := 0; i < GossipSubConnectors; i++ { go gs.connector() } + + // connect to direct peers + if len(gs.direct) > 0 { + go func() { + for p := range gs.direct { + gs.connect <- connectInfo{p: p} + } + }() + } } func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { log.Debugf("PEERUP: Add new peer %s using %s", p, proto) gs.tracer.AddPeer(p, proto) gs.peers[p] = proto + + // tag peer if it is a direct peer + _, direct := gs.direct[p] + if direct { + gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000) + } } func (gs *GossipSubRouter) RemovePeer(p peer.ID) { @@ -273,7 +313,8 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool { } func (gs *GossipSubRouter) AcceptFrom(p peer.ID) bool { - return gs.score.Score(p) >= gs.graylistThreshold + _, direct := gs.direct[p] + return direct || gs.score.Score(p) >= gs.graylistThreshold } func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { @@ -392,6 +433,17 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. continue } + // we don't GRAFT to/from direct peers; complain loudly if this happens + _, direct := gs.direct[p] + if direct { + log.Warningf("GRAFT: ignoring request from direct peer %s", p) + // this is possibly a bug from non-reciprocal configuration; send a PRUNE + prune = append(prune, topic) + // but don't PX + doPX = false + continue + } + // check the score if score < 0 { // we don't GRAFT peers with negative score @@ -569,13 +621,22 @@ func (gs *GossipSubRouter) Publish(msg *Message) { if gs.floodPublish && from == gs.p.host.ID() { for p := range tmap { - if gs.score.Score(p) >= gs.publishThreshold { + _, direct := gs.direct[p] + if direct || gs.score.Score(p) >= gs.publishThreshold { tosend[p] = struct{}{} } } continue } + // direct peers + for p := range gs.direct { + _, inTopic := tmap[p] + if inTopic { + tosend[p] = struct{}{} + } + } + // floodsub peers for p := range tmap { if gs.peers[p] == FloodSubID && gs.score.Score(p) >= gs.publishThreshold { @@ -639,9 +700,10 @@ func (gs *GossipSubRouter) Join(topic string) { if len(gmap) < GossipSubD { // we need more peers; eager, as this would get fixed in the next heartbeat more := gs.getPeers(topic, GossipSubD-len(gmap), func(p peer.ID) bool { - // filter our current peers and peers with negative scores - _, ok := gmap[p] - return !ok && gs.score.Score(p) >= 0 + // filter our current peers, direct peers, and peers with negative scores + _, inMesh := gmap[p] + _, direct := gs.direct[p] + return !inMesh && !direct && gs.score.Score(p) >= 0 }) for _, p := range more { gmap[p] = struct{}{} @@ -652,8 +714,9 @@ func (gs *GossipSubRouter) Join(topic string) { delete(gs.lastpub, topic) } else { peers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool { - // filter peers with negative score - return gs.score.Score(p) >= 0 + // filter direct peers and peers with negative score + _, direct := gs.direct[p] + return !direct && gs.score.Score(p) >= 0 }) gmap = peerListToMap(peers) gs.mesh[topic] = gmap @@ -782,6 +845,9 @@ func (gs *GossipSubRouter) heartbeat() { // clean up expired backoffs gs.clearBackoff() + // ensure direct peers are connected + gs.directConnect() + // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { prunePeer := func(p peer.ID) { @@ -813,10 +879,11 @@ func (gs *GossipSubRouter) heartbeat() { backoff := gs.backoff[topic] ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - // filter our current peers, peers we are backing off, and peers with negative score + // filter our current and direct peers, peers we are backing off, and peers with negative score _, inMesh := peers[p] _, doBackoff := backoff[p] - return !inMesh && !doBackoff && gs.score.Score(p) >= 0 + _, direct := gs.direct[p] + return !inMesh && !doBackoff && !direct && gs.score.Score(p) >= 0 }) for _, p := range plst { @@ -875,9 +942,10 @@ func (gs *GossipSubRouter) heartbeat() { if len(peers) < GossipSubD { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - // filter our current peers and peers with score above the publish threshold - _, ok := peers[p] - return !ok && gs.score.Score(p) >= gs.publishThreshold + // filter our current and direct peers and peers with score above the publish threshold + _, inFanout := peers[p] + _, direct := gs.direct[p] + return !inFanout && !direct && gs.score.Score(p) >= gs.publishThreshold }) for _, p := range plst { @@ -916,6 +984,30 @@ func (gs *GossipSubRouter) clearBackoff() { } } +func (gs *GossipSubRouter) directConnect() { + // we donly do this every 150 ticks to allow pending connections to complete and account + // for restarts/downtime + if gs.heartbeatTicks%150 != 0 { + return + } + + var toconnect []peer.ID + for p := range gs.direct { + _, connected := gs.peers[p] + if !connected { + toconnect = append(toconnect, p) + } + } + + if len(toconnect) > 0 { + go func() { + for _, p := range toconnect { + gs.connect <- connectInfo{p: p} + } + }() + } +} + func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) { for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) @@ -960,10 +1052,12 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} // Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy. // First we collect the peers above gossipThreshold that are not in the exclude set // and then randomly select from that set. + // We also exclude direct peers, as there is no reason to emit gossip to them. peers := make([]peer.ID, 0, len(gs.p.topics[topic])) for p := range gs.p.topics[topic] { _, inExclude := exclude[p] - if !inExclude && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold { + _, direct := gs.direct[p] + if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold { peers = append(peers, p) } }