From ca20eb4a79263e72f0b29e61321a3431845ae8c3 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 13 Mar 2023 20:37:28 -0400 Subject: [PATCH] feat: connect to discovered peers --- examples/noise/go.mod | 2 +- examples/noise/go.sum | 4 +- waku/v2/node/wakunode2.go | 11 ++- waku/v2/rendezvous/rendezvous.go | 95 ++++++++++++++++++-------- waku/v2/rendezvous/rendezvous_test.go | 96 +++++++++++++++++++++++++++ waku/v2/utils/peer.go | 15 +++++ 6 files changed, 193 insertions(+), 30 deletions(-) create mode 100644 waku/v2/rendezvous/rendezvous_test.go diff --git a/examples/noise/go.mod b/examples/noise/go.mod index aee40ba3..91220e2c 100644 --- a/examples/noise/go.mod +++ b/examples/noise/go.mod @@ -62,7 +62,7 @@ require ( github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p v0.25.1 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect - github.com/libp2p/go-libp2p-pubsub v0.9.1 // indirect + github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect github.com/libp2p/go-mplex v0.7.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect diff --git a/examples/noise/go.sum b/examples/noise/go.sum index d08eb943..93cd84e5 100644 --- a/examples/noise/go.sum +++ b/examples/noise/go.sum @@ -413,8 +413,8 @@ github.com/libp2p/go-libp2p v0.25.1 h1:YK+YDCHpYyTvitKWVxa5PfElgIpOONU01X5UcLEwJ github.com/libp2p/go-libp2p v0.25.1/go.mod h1:xnK9/1d9+jeQCVvi/f1g12KqtVi/jP/SijtKV1hML3g= github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw= github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= -github.com/libp2p/go-libp2p-pubsub v0.9.1 h1:A6LBg9BaoLf3NwRz+E974sAxTVcbUZYg95IhK2BZz9g= -github.com/libp2p/go-libp2p-pubsub v0.9.1/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc= +github.com/libp2p/go-libp2p-pubsub v0.9.3 h1:ihcz9oIBMaCK9kcx+yHWm3mLAFBMAUsM4ux42aikDxo= +github.com/libp2p/go-libp2p-pubsub v0.9.3/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU= diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index b7c532f5..b5a3dc20 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -214,7 +214,16 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } - w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, w.opts.rendezvousNodes, w.peerConnector, w.log) + var rendezvousPoints []peer.ID + for _, p := range w.opts.rendezvousNodes { + peerID, err := utils.GetPeerID(p) + if err != nil { + return nil, err + } + rendezvousPoints = append(rendezvousPoints, peerID) + } + + w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...) diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index aab7c3ca..319507c5 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -3,19 +3,26 @@ package rendezvous import ( "context" "math" + "math/rand" "sync" "time" rvs "github.com/berty/go-libp2p-rendezvous" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) const RendezvousID = rvs.RendezvousProto +type rendezvousPoint struct { + sync.RWMutex + + id peer.ID + cookie []byte +} + type Rendezvous struct { host host.Host @@ -24,7 +31,7 @@ type Rendezvous struct { rendezvousSvc *rvs.RendezvousService discoverPeers bool - rendezvousPoints []multiaddr.Multiaddr + rendezvousPoints []*rendezvousPoint peerConnector PeerConnector log *zap.Logger @@ -36,9 +43,16 @@ type PeerConnector interface { PeerChannel() chan<- peer.AddrInfo } -func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendevousPoints []multiaddr.Multiaddr, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { +func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") + var rendevousPoints []*rendezvousPoint + for _, rp := range rendezvousPoints { + rendevousPoints = append(rendevousPoints, &rendezvousPoint{ + id: rp, + }) + } + return &Rendezvous{ host: host, enableServer: enableServer, @@ -51,26 +65,27 @@ func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool } func (r *Rendezvous) Start(ctx context.Context) error { - err := r.db.Start(ctx) - if err != nil { - return err - } - ctx, cancel := context.WithCancel(ctx) - r.cancel = cancel if r.enableServer { + err := r.db.Start(ctx) + if err != nil { + cancel() + return err + } + r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) } + r.wg.Add(1) + go r.register(ctx) + if r.discoverPeers { r.wg.Add(1) - go r.register(ctx) + go r.discover(ctx) } - // TODO: Execute discovery and push nodes to peer connector. If asking for peers fail, add timeout and exponential backoff - r.log.Info("rendezvous protocol started") return nil } @@ -78,6 +93,44 @@ func (r *Rendezvous) Start(ctx context.Context) error { const registerBackoff = 200 * time.Millisecond const registerMaxRetries = 7 +func (r *Rendezvous) getRandomServer() *rendezvousPoint { + return r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec +} + +func (r *Rendezvous) discover(ctx context.Context) { + defer r.wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + server := r.getRandomServer() + + rendezvousClient := rvs.NewRendezvousClient(r.host, server.id) + + addrInfo, cookie, err := rendezvousClient.Discover(ctx, relay.DefaultWakuTopic, 5, server.cookie) + if err != nil { + r.log.Error("could not discover new peers", zap.Error(err)) + cookie = nil + } + + if len(addrInfo) != 0 { + server.Lock() + server.cookie = cookie + server.Unlock() + + for _, addr := range addrInfo { + r.peerConnector.PeerChannel() <- addr + } + } else { + // TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query + // TODO: improve this by adding an exponential backoff? + time.Sleep(2 * time.Second) + } + } + } +} + func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use var t <-chan time.Time @@ -94,24 +147,14 @@ func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.Rend } func (r *Rendezvous) register(ctx context.Context) { + defer r.wg.Done() + for _, m := range r.rendezvousPoints { r.wg.Add(1) - go func(m multiaddr.Multiaddr) { + go func(m *rendezvousPoint) { r.wg.Done() - peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) - if err != nil { - r.log.Error("error obtaining peerID", zap.Error(err)) - return - } - - peerID, err := peer.Decode(peerIDStr) - if err != nil { - r.log.Error("error obtaining peerID", zap.Error(err)) - return - } - - rendezvousClient := rvs.NewRendezvousClient(r.host, peerID) + rendezvousClient := rvs.NewRendezvousClient(r.host, m.id) retries := 0 var t <-chan time.Time diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go new file mode 100644 index 00000000..0f25a781 --- /dev/null +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -0,0 +1,96 @@ +package rendezvous + +import ( + "context" + "crypto/rand" + "database/sql" + "fmt" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/persistence/sqlite" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type PeerConn struct { + ch chan peer.AddrInfo +} + +func (p PeerConn) PeerChannel() chan<- peer.AddrInfo { + return p.ch +} + +func NewPeerConn() PeerConn { + x := PeerConn{} + x.ch = make(chan peer.AddrInfo, 1000) + return x +} + +func TestRendezvous(t *testing.T) { + port1, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + host1, err := tests.MakeHost(context.Background(), port1, rand.Reader) + require.NoError(t, err) + + var db *sql.DB + db, migration, err := sqlite.NewDB(":memory:") + require.NoError(t, err) + + err = migration(db) + require.NoError(t, err) + + rdb := NewDB(context.Background(), db, utils.Logger()) + rendezvousPoint := NewRendezvous(host1, true, rdb, false, nil, nil, utils.Logger()) + err = rendezvousPoint.Start(context.Background()) + require.NoError(t, err) + defer rendezvousPoint.Stop() + + hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host1.ID().Pretty())) + host1Addr := host1.Addrs()[0].Encapsulate(hostInfo) + + port2, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + host2, err := tests.MakeHost(context.Background(), port2, rand.Reader) + require.NoError(t, err) + + info, err := peer.AddrInfoFromP2pAddr(host1Addr) + require.NoError(t, err) + + host2.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(info.ID, RendezvousID) + require.NoError(t, err) + + rendezvousClient1 := NewRendezvous(host2, false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger()) + err = rendezvousClient1.Start(context.Background()) + require.NoError(t, err) + defer rendezvousClient1.Stop() + + port3, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + host3, err := tests.MakeHost(context.Background(), port3, rand.Reader) + require.NoError(t, err) + + host3.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + err = host3.Peerstore().AddProtocols(info.ID, RendezvousID) + require.NoError(t, err) + + myPeerConnector := NewPeerConn() + + rendezvousClient2 := NewRendezvous(host3, false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) + err = rendezvousClient2.Start(context.Background()) + require.NoError(t, err) + defer rendezvousClient2.Stop() + + timer := time.After(5 * time.Second) + select { + case <-timer: + require.Fail(t, "no peer discovered") + case p := <-myPeerConnector.ch: + require.Equal(t, p.ID.Pretty(), host2.ID().Pretty()) + } +} diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 89180522..a475a0d4 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/multiformats/go-multiaddr" "go.uber.org/zap" ) @@ -18,6 +19,20 @@ import ( // some protocol var ErrNoPeersAvailable = errors.New("no suitable peers found") +func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { + peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) + if err != nil { + return "", err + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + return "", err + } + + return peerID, nil +} + // SelectPeer is used to return a random peer that supports a given protocol. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore