connect to peers obtained through px
This commit is contained in:
parent
806fd24a58
commit
d118e1d554
90
gossipsub.go
90
gossipsub.go
|
@ -9,8 +9,11 @@ import (
|
|||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p-core/routing"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -40,6 +43,9 @@ var (
|
|||
|
||||
// backoff time for pruned peers
|
||||
GossipSubPruneBackoff = time.Minute
|
||||
|
||||
// number of active connection attempts for peers obtained through px
|
||||
GossipSubConnectors = 16
|
||||
)
|
||||
|
||||
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
|
||||
|
@ -52,6 +58,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
|||
gossip: make(map[peer.ID][]*pb.ControlIHave),
|
||||
control: make(map[peer.ID]*pb.ControlMessage),
|
||||
backoff: make(map[string]map[peer.ID]time.Time),
|
||||
connect: make(chan connectInfo, GossipSubConnectors),
|
||||
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
||||
}
|
||||
return NewPubSub(ctx, h, rt, opts...)
|
||||
|
@ -73,10 +80,16 @@ type GossipSubRouter struct {
|
|||
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
|
||||
control map[peer.ID]*pb.ControlMessage // pending control messages
|
||||
backoff map[string]map[peer.ID]time.Time // prune backoff
|
||||
connect chan connectInfo // px connection requests
|
||||
mcache *MessageCache
|
||||
tracer *pubsubTracer
|
||||
}
|
||||
|
||||
type connectInfo struct {
|
||||
p peer.ID
|
||||
srr *routing.SignedRoutingState
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) Protocols() []protocol.ID {
|
||||
return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
|
||||
}
|
||||
|
@ -87,6 +100,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
|
|||
// start using the same msg ID function as PubSub for caching messages.
|
||||
gs.mcache.SetMsgIdFn(p.msgID)
|
||||
go gs.heartbeatTimer()
|
||||
for i := 0; i < GossipSubConnectors; i++ {
|
||||
go gs.connector()
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
|
||||
|
@ -251,6 +267,10 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
|
|||
delete(peers, p)
|
||||
gs.untagPeer(p, topic)
|
||||
gs.addBackoff(p, topic)
|
||||
px := prune.GetPeers()
|
||||
if len(px) > 0 {
|
||||
gs.pxConnect(px)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -264,6 +284,76 @@ func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
|
|||
backoff[p] = time.Now().Add(GossipSubPruneBackoff)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
|
||||
toconnect := make([]connectInfo, 0, len(peers))
|
||||
for _, pi := range peers {
|
||||
p := peer.ID(pi.PeerID)
|
||||
|
||||
_, connected := gs.peers[p]
|
||||
if connected {
|
||||
continue
|
||||
}
|
||||
|
||||
var srr *routing.SignedRoutingState
|
||||
var err error
|
||||
if pi.SignedAddrs != nil {
|
||||
// the peer sent us a signed record; ensure that it is valid
|
||||
srr, err = routing.UnmarshalSignedRoutingState(pi.SignedAddrs)
|
||||
if err != nil {
|
||||
log.Warningf("error unmarshalling routing record obtained through px: %s", err)
|
||||
continue
|
||||
}
|
||||
if srr.PeerID != p {
|
||||
log.Warningf("bogus routing record obtained through px: peer ID %s doesn't match expected peer %s", srr.PeerID, p)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
toconnect = append(toconnect, connectInfo{p, srr})
|
||||
}
|
||||
|
||||
if len(toconnect) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// initiate connections, without blocking the event loop
|
||||
go func() {
|
||||
for _, ci := range toconnect {
|
||||
select {
|
||||
case gs.connect <- ci:
|
||||
case <-gs.p.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) connector() {
|
||||
for {
|
||||
select {
|
||||
case ci := <-gs.connect:
|
||||
if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("connecting to %s", ci.p)
|
||||
if ci.srr != nil {
|
||||
gs.p.host.Peerstore().AddCertifiedAddrs(ci.srr, peerstore.TempAddrTTL)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(gs.p.ctx, 10*time.Second)
|
||||
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Debugf("error connecting to %s: %s", ci.p, err)
|
||||
}
|
||||
|
||||
case <-gs.p.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
|
||||
gs.mcache.Put(msg)
|
||||
|
||||
|
|
Loading…
Reference in New Issue