diff --git a/flake.nix b/flake.nix index e6149406..1109421f 100644 --- a/flake.nix +++ b/flake.nix @@ -28,7 +28,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-bDiX2+o0oXx3KvsolcrQriPYnGzWKY3fz3T+fGtVRpI="; + vendorSha256 = "sha256-6VBZ0ilGcXhTXCoUmGdrRQqgnRwVJOtAe4JIMUCVw8Y="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index 2a6c7536..6cc9e0a6 100644 --- a/go.mod +++ b/go.mod @@ -34,10 +34,10 @@ require ( ) require ( - github.com/berty/go-libp2p-rendezvous v0.4.1 github.com/cenkalti/backoff/v4 v4.1.2 github.com/go-chi/chi/v5 v5.0.0 github.com/lib/pq v1.10.4 + github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 github.com/waku-org/go-noise v0.0.4 github.com/waku-org/go-zerokit-rln v0.1.12 github.com/wk8/go-ordered-map v1.0.0 @@ -60,7 +60,6 @@ require ( github.com/quic-go/webtransport-go v0.5.2 // indirect github.com/rjeczalik/notify v0.9.3 // indirect github.com/status-im/status-go/extkeys v1.1.2 // indirect - github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 // indirect github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 // indirect diff --git a/go.sum b/go.sum index bd9bae24..e67d268e 100644 --- a/go.sum +++ b/go.sum @@ -211,8 +211,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc= -github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= @@ -1569,8 +1567,6 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= -github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601155048-9806ad621c18 h1:BNd3c24LWmYyLFIFx5OpPMWPTGvkzkb6ITag1Ao/w4Q= -github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601155048-9806ad621c18/go.mod h1:/1YwD6sx3xsbrSkVa4++e8AUDcUjC035bgKwDsZo+0Q= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 h1:iOCDabjZ11Zk0ejdWBR54OEFA/rRZdQgIrX6Rv4U7AM= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671/go.mod h1:/1YwD6sx3xsbrSkVa4++e8AUDcUjC035bgKwDsZo+0Q= github.com/waku-org/go-noise v0.0.4 h1:ZfQDcCw8pazm89EBl5SXY7GGAnzDQb9AHFXlw3Ktbvk= diff --git a/waku/v2/connection_gater.go b/waku/v2/connection_gater.go index 249e9dce..66a0391f 100644 --- a/waku/v2/connection_gater.go +++ b/waku/v2/connection_gater.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/libp2p/go-libp2p/core/control" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -15,7 +14,6 @@ import ( type ConnectionGater struct { sync.Mutex - host host.Host logger *zap.Logger limiter map[string]int inbound int diff --git a/waku/v2/discovery_connector.go b/waku/v2/discovery_connector.go index f31ffa84..f7facd0d 100644 --- a/waku/v2/discovery_connector.go +++ b/waku/v2/discovery_connector.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -74,6 +75,7 @@ type connCacheData struct { type PeerData struct { Origin peers.Origin AddrInfo peer.AddrInfo + ENR *enode.Node } // PeerChannel exposes the channel on which discovered peers should be pushed @@ -181,7 +183,18 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) { return case p := <-c.peerCh: c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL) - c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin) + err := c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin) + if err != nil { + c.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID)) + } + + if p.ENR != nil { + err = c.host.Peerstore().(peers.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) + if err != nil { + c.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + } + } + c.publishWork(ctx, p.AddrInfo) case <-time.After(1 * time.Second): // This timeout is to not lock the goroutine @@ -247,6 +260,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { defer cancel() err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { + c.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(pi) c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) } <-sem diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 974bc33c..4188814e 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -299,6 +299,7 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error { peer := v2.PeerData{ Origin: peers.Discv5, AddrInfo: peerAddrs[0], + ENR: iterator.Node(), } select { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0bbc328d..98de1e42 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -742,9 +742,13 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols ...protocol.ID) error { w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID)) - w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin) + err := w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin) + if err != nil { + return err + } + w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL) - err := w.host.Peerstore().AddProtocols(info.ID, protocols...) + err = w.host.Peerstore().AddProtocols(info.ID, protocols...) if err != nil { return err } @@ -795,9 +799,11 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { + w.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(info) return err } + w.host.Peerstore().(peers.WakuPeerstore).ResetConnFailures(info) stats.Record(ctx, metrics.Dials.M(1)) return nil } diff --git a/waku/v2/peers/peerstore.go b/waku/v2/peers/peerstore.go index 58faf3e8..eb754c39 100644 --- a/waku/v2/peers/peerstore.go +++ b/waku/v2/peers/peerstore.go @@ -1,7 +1,10 @@ package peers import ( + "sync" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" ) @@ -20,8 +23,14 @@ const ( const peerOrigin = "origin" const peerENR = "enr" +type ConnectionFailures struct { + sync.RWMutex + failures map[peer.ID]int +} + type WakuPeerstoreImpl struct { - peerStore peerstore.Peerstore + peerStore peerstore.Peerstore + connFailures ConnectionFailures } type WakuPeerstore interface { @@ -29,12 +38,18 @@ type WakuPeerstore interface { Origin(p peer.ID, origin Origin) (Origin, error) PeersByOrigin(origin Origin) peer.IDSlice SetENR(p peer.ID, enr *enode.Node) error - ENR(p peer.ID, origin Origin) (*enode.Node, error) + ENR(p peer.ID, origin Origin) (*enr.Record, error) + AddConnFailure(p peer.AddrInfo) + ResetConnFailures(p peer.AddrInfo) + ConnFailures(p peer.AddrInfo) int } func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore { return &WakuPeerstoreImpl{ peerStore: p, + connFailures: ConnectionFailures{ + failures: make(map[peer.ID]int), + }, } } @@ -73,3 +88,21 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) } return result.(*enode.Node), nil } + +func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) { + ps.connFailures.Lock() + defer ps.connFailures.Unlock() + ps.connFailures.failures[p.ID]++ +} + +func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) { + ps.connFailures.Lock() + defer ps.connFailures.Unlock() + ps.connFailures.failures[p.ID] = 0 +} + +func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int { + ps.connFailures.RLock() + defer ps.connFailures.RUnlock() + return ps.connFailures.failures[p.ID] +} diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 32c2eec5..e7c9a5da 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -63,7 +63,11 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts } func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error { - var discoveredPeers []peer.AddrInfo + var discoveredPeers []struct { + addrInfo peer.AddrInfo + enr *enode.Node + } + for _, p := range response.PeerInfos { enrRecord := &enr.Record{} buf := bytes.NewBuffer(p.ENR) @@ -77,16 +81,21 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord) if err != nil { wakuPX.log.Error("creating enode record", zap.Error(err)) - return err } - peerInfo, err := wenr.EnodeToPeerInfo(enodeRecord) + addrInfo, err := wenr.EnodeToPeerInfo(enodeRecord) if err != nil { return err } - discoveredPeers = append(discoveredPeers, *peerInfo) + discoveredPeers = append(discoveredPeers, struct { + addrInfo peer.AddrInfo + enr *enode.Node + }{ + addrInfo: *addrInfo, + enr: enodeRecord, + }) } if len(discoveredPeers) != 0 { @@ -97,7 +106,8 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb for _, p := range discoveredPeers { peer := v2.PeerData{ Origin: peers.PeerExchange, - AddrInfo: p, + AddrInfo: p.addrInfo, + ENR: p.enr, } select { case <-ctx.Done(): diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 50f85a51..5b6b0d9a 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -139,7 +139,6 @@ func (r *Rendezvous) discover(ctx context.Context) { } } } else { - // TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query // TODO: improve this by adding an exponential backoff? time.Sleep(5 * time.Second) }