From d118e1d5547044cf502dd91512712a1631add549 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 22 Nov 2019 21:53:00 +0200 Subject: [PATCH] connect to peers obtained through px --- gossipsub.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index b0a7e79..0e157d4 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -9,8 +9,11 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-core/routing" ) const ( @@ -40,6 +43,9 @@ var ( // backoff time for pruned peers GossipSubPruneBackoff = time.Minute + + // number of active connection attempts for peers obtained through px + GossipSubConnectors = 16 ) // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. @@ -52,6 +58,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er gossip: make(map[peer.ID][]*pb.ControlIHave), control: make(map[peer.ID]*pb.ControlMessage), backoff: make(map[string]map[peer.ID]time.Time), + connect: make(chan connectInfo, GossipSubConnectors), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), } return NewPubSub(ctx, h, rt, opts...) @@ -73,10 +80,16 @@ type GossipSubRouter struct { gossip map[peer.ID][]*pb.ControlIHave // pending gossip control map[peer.ID]*pb.ControlMessage // pending control messages backoff map[string]map[peer.ID]time.Time // prune backoff + connect chan connectInfo // px connection requests mcache *MessageCache tracer *pubsubTracer } +type connectInfo struct { + p peer.ID + srr *routing.SignedRoutingState +} + func (gs *GossipSubRouter) Protocols() []protocol.ID { return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID} } @@ -87,6 +100,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { // start using the same msg ID function as PubSub for caching messages. gs.mcache.SetMsgIdFn(p.msgID) go gs.heartbeatTimer() + for i := 0; i < GossipSubConnectors; i++ { + go gs.connector() + } } func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { @@ -251,6 +267,10 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { delete(peers, p) gs.untagPeer(p, topic) gs.addBackoff(p, topic) + px := prune.GetPeers() + if len(px) > 0 { + gs.pxConnect(px) + } } } } @@ -264,6 +284,76 @@ func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) { backoff[p] = time.Now().Add(GossipSubPruneBackoff) } +func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) { + toconnect := make([]connectInfo, 0, len(peers)) + for _, pi := range peers { + p := peer.ID(pi.PeerID) + + _, connected := gs.peers[p] + if connected { + continue + } + + var srr *routing.SignedRoutingState + var err error + if pi.SignedAddrs != nil { + // the peer sent us a signed record; ensure that it is valid + srr, err = routing.UnmarshalSignedRoutingState(pi.SignedAddrs) + if err != nil { + log.Warningf("error unmarshalling routing record obtained through px: %s", err) + continue + } + if srr.PeerID != p { + log.Warningf("bogus routing record obtained through px: peer ID %s doesn't match expected peer %s", srr.PeerID, p) + continue + } + } + + toconnect = append(toconnect, connectInfo{p, srr}) + } + + if len(toconnect) == 0 { + return + } + + // initiate connections, without blocking the event loop + go func() { + for _, ci := range toconnect { + select { + case gs.connect <- ci: + case <-gs.p.ctx.Done(): + return + } + } + }() +} + +func (gs *GossipSubRouter) connector() { + for { + select { + case ci := <-gs.connect: + if gs.p.host.Network().Connectedness(ci.p) == network.Connected { + continue + } + + log.Debugf("connecting to %s", ci.p) + if ci.srr != nil { + gs.p.host.Peerstore().AddCertifiedAddrs(ci.srr, peerstore.TempAddrTTL) + } + + ctx, cancel := context.WithTimeout(gs.p.ctx, 10*time.Second) + err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p}) + cancel() + if err != nil { + log.Debugf("error connecting to %s: %s", ci.p, err) + } + + case <-gs.p.ctx.Done(): + return + } + } +} + func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { gs.mcache.Put(msg)