connect to peers obtained through px
This commit is contained in:
parent
85b455fa50
commit
f20f93004b
70
gossipsub.go
70
gossipsub.go
|
@ -382,6 +382,76 @@ func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
|
|||
backoff[p] = time.Now().Add(GossipSubPruneBackoff)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
|
||||
toconnect := make([]connectInfo, 0, len(peers))
|
||||
for _, pi := range peers {
|
||||
p := peer.ID(pi.PeerID)
|
||||
|
||||
_, connected := gs.peers[p]
|
||||
if connected {
|
||||
continue
|
||||
}
|
||||
|
||||
var srr *routing.SignedRoutingState
|
||||
var err error
|
||||
if pi.SignedAddrs != nil {
|
||||
// the peer sent us a signed record; ensure that it is valid
|
||||
srr, err = routing.UnmarshalSignedRoutingState(pi.SignedAddrs)
|
||||
if err != nil {
|
||||
log.Warningf("error unmarshalling routing record obtained through px: %s", err)
|
||||
continue
|
||||
}
|
||||
if srr.PeerID != p {
|
||||
log.Warningf("bogus routing record obtained through px: peer ID %s doesn't match expected peer %s", srr.PeerID, p)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
toconnect = append(toconnect, connectInfo{p, srr})
|
||||
}
|
||||
|
||||
if len(toconnect) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// initiate connections, without blocking the event loop
|
||||
go func() {
|
||||
for _, ci := range toconnect {
|
||||
select {
|
||||
case gs.connect <- ci:
|
||||
case <-gs.p.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) connector() {
|
||||
for {
|
||||
select {
|
||||
case ci := <-gs.connect:
|
||||
if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("connecting to %s", ci.p)
|
||||
if ci.srr != nil {
|
||||
gs.p.host.Peerstore().AddCertifiedAddrs(ci.srr, peerstore.TempAddrTTL)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(gs.p.ctx, 10*time.Second)
|
||||
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Debugf("error connecting to %s: %s", ci.p, err)
|
||||
}
|
||||
|
||||
case <-gs.p.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
||||
gs.mcache.Put(msg)
|
||||
|
||||
|
|
Loading…
Reference in New Issue