fix rebase artifacts
This commit is contained in:
parent
ce1970d18d
commit
946f35c4d4
92
gossipsub.go
92
gossipsub.go
|
@ -354,7 +354,7 @@ func (gs *GossipSubRouter) connector() {
|
|||
log.Debugf("connecting to %s", ci.p)
|
||||
cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
|
||||
if ok && ci.spr != nil {
|
||||
err := cab.ProcessPeerRecord(ci.spr, peerstore.TempAddrTTL)
|
||||
_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL)
|
||||
if err != nil {
|
||||
log.Debugf("error processing peer record: %s", err)
|
||||
}
|
||||
|
@ -373,89 +373,6 @@ func (gs *GossipSubRouter) connector() {
|
|||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
|
||||
backoff, ok := gs.backoff[topic]
|
||||
if !ok {
|
||||
backoff = make(map[peer.ID]time.Time)
|
||||
gs.backoff[topic] = backoff
|
||||
}
|
||||
backoff[p] = time.Now().Add(GossipSubPruneBackoff)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
|
||||
if len(peers) > GossipSubPrunePeers {
|
||||
shufflePeerInfo(peers)
|
||||
peers = peers[:GossipSubPrunePeers]
|
||||
}
|
||||
|
||||
toconnect := make([]connectInfo, 0, len(peers))
|
||||
|
||||
for _, pi := range peers {
|
||||
p := peer.ID(pi.PeerID)
|
||||
|
||||
_, connected := gs.peers[p]
|
||||
if connected {
|
||||
continue
|
||||
}
|
||||
|
||||
var srr *routing.SignedRoutingState
|
||||
var err error
|
||||
if pi.SignedAddrs != nil {
|
||||
// the peer sent us a signed record; ensure that it is valid
|
||||
srr, err = routing.UnmarshalSignedRoutingState(pi.SignedAddrs)
|
||||
if err != nil {
|
||||
log.Warningf("error unmarshalling routing record obtained through px: %s", err)
|
||||
continue
|
||||
}
|
||||
if srr.PeerID != p {
|
||||
log.Warningf("bogus routing record obtained through px: peer ID %s doesn't match expected peer %s", srr.PeerID, p)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
toconnect = append(toconnect, connectInfo{p, srr})
|
||||
}
|
||||
|
||||
if len(toconnect) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, ci := range toconnect {
|
||||
select {
|
||||
case gs.connect <- ci:
|
||||
default:
|
||||
log.Debugf("ignoring peer connection attempt; too many pending connections")
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) connector() {
|
||||
for {
|
||||
select {
|
||||
case ci := <-gs.connect:
|
||||
if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("connecting to %s", ci.p)
|
||||
if ci.srr != nil {
|
||||
gs.p.host.Peerstore().AddCertifiedAddrs(ci.srr, peerstore.TempAddrTTL)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(gs.p.ctx, 10*time.Second)
|
||||
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Debugf("error connecting to %s: %s", ci.p, err)
|
||||
}
|
||||
|
||||
case <-gs.p.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
||||
gs.mcache.Put(msg)
|
||||
|
||||
|
@ -1002,10 +919,3 @@ func shufflePeerInfo(peers []*pb.PeerInfo) {
|
|||
peers[i], peers[j] = peers[j], peers[i]
|
||||
}
|
||||
}
|
||||
|
||||
func shufflePeerInfo(peers []*pb.PeerInfo) {
|
||||
for i := range peers {
|
||||
j := rand.Intn(i + 1)
|
||||
peers[i], peers[j] = peers[j], peers[i]
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue