feat: connect to discovered peers

This commit is contained in:
Richard Ramos 2023-03-13 20:37:28 -04:00 committed by RichΛrd
parent 2b30726c14
commit ca20eb4a79
6 changed files with 193 additions and 30 deletions

View File

@ -62,7 +62,7 @@ require (
github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.25.1 // indirect github.com/libp2p/go-libp2p v0.25.1 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.1 // indirect github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-mplex v0.7.0 // indirect github.com/libp2p/go-mplex v0.7.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect

View File

@ -413,8 +413,8 @@ github.com/libp2p/go-libp2p v0.25.1 h1:YK+YDCHpYyTvitKWVxa5PfElgIpOONU01X5UcLEwJ
github.com/libp2p/go-libp2p v0.25.1/go.mod h1:xnK9/1d9+jeQCVvi/f1g12KqtVi/jP/SijtKV1hML3g= github.com/libp2p/go-libp2p v0.25.1/go.mod h1:xnK9/1d9+jeQCVvi/f1g12KqtVi/jP/SijtKV1hML3g=
github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw= github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw=
github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI=
github.com/libp2p/go-libp2p-pubsub v0.9.1 h1:A6LBg9BaoLf3NwRz+E974sAxTVcbUZYg95IhK2BZz9g= github.com/libp2p/go-libp2p-pubsub v0.9.3 h1:ihcz9oIBMaCK9kcx+yHWm3mLAFBMAUsM4ux42aikDxo=
github.com/libp2p/go-libp2p-pubsub v0.9.1/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc= github.com/libp2p/go-libp2p-pubsub v0.9.3/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=
github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU= github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU=

View File

@ -214,7 +214,16 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err return nil, err
} }
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, w.opts.rendezvousNodes, w.peerConnector, w.log) var rendezvousPoints []peer.ID
for _, p := range w.opts.rendezvousNodes {
peerID, err := utils.GetPeerID(p)
if err != nil {
return nil, err
}
rendezvousPoints = append(rendezvousPoints, peerID)
}
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...) w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...)

View File

@ -3,19 +3,26 @@ package rendezvous
import ( import (
"context" "context"
"math" "math"
"math/rand"
"sync" "sync"
"time" "time"
rvs "github.com/berty/go-libp2p-rendezvous" rvs "github.com/berty/go-libp2p-rendezvous"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap" "go.uber.org/zap"
) )
const RendezvousID = rvs.RendezvousProto const RendezvousID = rvs.RendezvousProto
type rendezvousPoint struct {
sync.RWMutex
id peer.ID
cookie []byte
}
type Rendezvous struct { type Rendezvous struct {
host host.Host host host.Host
@ -24,7 +31,7 @@ type Rendezvous struct {
rendezvousSvc *rvs.RendezvousService rendezvousSvc *rvs.RendezvousService
discoverPeers bool discoverPeers bool
rendezvousPoints []multiaddr.Multiaddr rendezvousPoints []*rendezvousPoint
peerConnector PeerConnector peerConnector PeerConnector
log *zap.Logger log *zap.Logger
@ -36,9 +43,16 @@ type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo PeerChannel() chan<- peer.AddrInfo
} }
func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendevousPoints []multiaddr.Multiaddr, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous") logger := log.Named("rendezvous")
var rendevousPoints []*rendezvousPoint
for _, rp := range rendezvousPoints {
rendevousPoints = append(rendevousPoints, &rendezvousPoint{
id: rp,
})
}
return &Rendezvous{ return &Rendezvous{
host: host, host: host,
enableServer: enableServer, enableServer: enableServer,
@ -51,26 +65,27 @@ func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool
} }
func (r *Rendezvous) Start(ctx context.Context) error { func (r *Rendezvous) Start(ctx context.Context) error {
err := r.db.Start(ctx)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel r.cancel = cancel
if r.enableServer { if r.enableServer {
err := r.db.Start(ctx)
if err != nil {
cancel()
return err
}
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
} }
r.wg.Add(1)
go r.register(ctx)
if r.discoverPeers { if r.discoverPeers {
r.wg.Add(1) r.wg.Add(1)
go r.register(ctx) go r.discover(ctx)
} }
// TODO: Execute discovery and push nodes to peer connector. If asking for peers fail, add timeout and exponential backoff
r.log.Info("rendezvous protocol started") r.log.Info("rendezvous protocol started")
return nil return nil
} }
@ -78,6 +93,44 @@ func (r *Rendezvous) Start(ctx context.Context) error {
const registerBackoff = 200 * time.Millisecond const registerBackoff = 200 * time.Millisecond
const registerMaxRetries = 7 const registerMaxRetries = 7
func (r *Rendezvous) getRandomServer() *rendezvousPoint {
return r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec
}
func (r *Rendezvous) discover(ctx context.Context) {
defer r.wg.Done()
for {
select {
case <-ctx.Done():
return
default:
server := r.getRandomServer()
rendezvousClient := rvs.NewRendezvousClient(r.host, server.id)
addrInfo, cookie, err := rendezvousClient.Discover(ctx, relay.DefaultWakuTopic, 5, server.cookie)
if err != nil {
r.log.Error("could not discover new peers", zap.Error(err))
cookie = nil
}
if len(addrInfo) != 0 {
server.Lock()
server.cookie = cookie
server.Unlock()
for _, addr := range addrInfo {
r.peerConnector.PeerChannel() <- addr
}
} else {
// TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query
// TODO: improve this by adding an exponential backoff?
time.Sleep(2 * time.Second)
}
}
}
}
func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) {
ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use
var t <-chan time.Time var t <-chan time.Time
@ -94,24 +147,14 @@ func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.Rend
} }
func (r *Rendezvous) register(ctx context.Context) { func (r *Rendezvous) register(ctx context.Context) {
defer r.wg.Done()
for _, m := range r.rendezvousPoints { for _, m := range r.rendezvousPoints {
r.wg.Add(1) r.wg.Add(1)
go func(m multiaddr.Multiaddr) { go func(m *rendezvousPoint) {
r.wg.Done() r.wg.Done()
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) rendezvousClient := rvs.NewRendezvousClient(r.host, m.id)
if err != nil {
r.log.Error("error obtaining peerID", zap.Error(err))
return
}
peerID, err := peer.Decode(peerIDStr)
if err != nil {
r.log.Error("error obtaining peerID", zap.Error(err))
return
}
rendezvousClient := rvs.NewRendezvousClient(r.host, peerID)
retries := 0 retries := 0
var t <-chan time.Time var t <-chan time.Time

View File

@ -0,0 +1,96 @@
package rendezvous
import (
"context"
"crypto/rand"
"database/sql"
"fmt"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type PeerConn struct {
ch chan peer.AddrInfo
}
func (p PeerConn) PeerChannel() chan<- peer.AddrInfo {
return p.ch
}
func NewPeerConn() PeerConn {
x := PeerConn{}
x.ch = make(chan peer.AddrInfo, 1000)
return x
}
func TestRendezvous(t *testing.T) {
port1, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host1, err := tests.MakeHost(context.Background(), port1, rand.Reader)
require.NoError(t, err)
var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
require.NoError(t, err)
err = migration(db)
require.NoError(t, err)
rdb := NewDB(context.Background(), db, utils.Logger())
rendezvousPoint := NewRendezvous(host1, true, rdb, false, nil, nil, utils.Logger())
err = rendezvousPoint.Start(context.Background())
require.NoError(t, err)
defer rendezvousPoint.Stop()
hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host1.ID().Pretty()))
host1Addr := host1.Addrs()[0].Encapsulate(hostInfo)
port2, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host2, err := tests.MakeHost(context.Background(), port2, rand.Reader)
require.NoError(t, err)
info, err := peer.AddrInfoFromP2pAddr(host1Addr)
require.NoError(t, err)
host2.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(info.ID, RendezvousID)
require.NoError(t, err)
rendezvousClient1 := NewRendezvous(host2, false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger())
err = rendezvousClient1.Start(context.Background())
require.NoError(t, err)
defer rendezvousClient1.Stop()
port3, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host3, err := tests.MakeHost(context.Background(), port3, rand.Reader)
require.NoError(t, err)
host3.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
err = host3.Peerstore().AddProtocols(info.ID, RendezvousID)
require.NoError(t, err)
myPeerConnector := NewPeerConn()
rendezvousClient2 := NewRendezvous(host3, false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger())
err = rendezvousClient2.Start(context.Background())
require.NoError(t, err)
defer rendezvousClient2.Stop()
timer := time.After(5 * time.Second)
select {
case <-timer:
require.Fail(t, "no peer discovered")
case p := <-myPeerConnector.ch:
require.Equal(t, p.ID.Pretty(), host2.ID().Pretty())
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -18,6 +19,20 @@ import (
// some protocol // some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found") var ErrNoPeersAvailable = errors.New("no suitable peers found")
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return "", err
}
peerID, err := peer.Decode(peerIDStr)
if err != nil {
return "", err
}
return peerID, nil
}
// SelectPeer is used to return a random peer that supports a given protocol. // SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming // If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore