From f20f93004b905c87c7e734d2b9aa4b19d2a6138c Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 22 Nov 2019 21:53:00 +0200 Subject: [PATCH] connect to peers obtained through px --- gossipsub.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index f7562d2..7bb2330 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -382,6 +382,76 @@ func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) { backoff[p] = time.Now().Add(GossipSubPruneBackoff) } +func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) { + toconnect := make([]connectInfo, 0, len(peers)) + for _, pi := range peers { + p := peer.ID(pi.PeerID) + + _, connected := gs.peers[p] + if connected { + continue + } + + var srr *routing.SignedRoutingState + var err error + if pi.SignedAddrs != nil { + // the peer sent us a signed record; ensure that it is valid + srr, err = routing.UnmarshalSignedRoutingState(pi.SignedAddrs) + if err != nil { + log.Warningf("error unmarshalling routing record obtained through px: %s", err) + continue + } + if srr.PeerID != p { + log.Warningf("bogus routing record obtained through px: peer ID %s doesn't match expected peer %s", srr.PeerID, p) + continue + } + } + + toconnect = append(toconnect, connectInfo{p, srr}) + } + + if len(toconnect) == 0 { + return + } + + // initiate connections, without blocking the event loop + go func() { + for _, ci := range toconnect { + select { + case gs.connect <- ci: + case <-gs.p.ctx.Done(): + return + } + } + }() +} + +func (gs *GossipSubRouter) connector() { + for { + select { + case ci := <-gs.connect: + if gs.p.host.Network().Connectedness(ci.p) == network.Connected { + continue + } + + log.Debugf("connecting to %s", ci.p) + if ci.srr != nil { + gs.p.host.Peerstore().AddCertifiedAddrs(ci.srr, peerstore.TempAddrTTL) + } + + ctx, cancel := context.WithTimeout(gs.p.ctx, 10*time.Second) + err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p}) + cancel() + if err != nil { + log.Debugf("error connecting to %s: %s", ci.p, err) + } + + case <-gs.p.ctx.Done(): + return + } + } +} + func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { gs.mcache.Put(msg)