mirror of https://github.com/status-im/go-waku.git
refactor: store ENR
This commit is contained in:
parent
52ac8e3740
commit
76186e5477
|
@ -28,7 +28,7 @@
|
|||
];
|
||||
doCheck = false;
|
||||
# FIXME: This needs to be manually changed when updating modules.
|
||||
vendorSha256 = "sha256-bDiX2+o0oXx3KvsolcrQriPYnGzWKY3fz3T+fGtVRpI=";
|
||||
vendorSha256 = "sha256-6VBZ0ilGcXhTXCoUmGdrRQqgnRwVJOtAe4JIMUCVw8Y=";
|
||||
# Fix for 'nix run' trying to execute 'go-waku'.
|
||||
meta = { mainProgram = "waku"; };
|
||||
};
|
||||
|
|
3
go.mod
3
go.mod
|
@ -34,10 +34,10 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1
|
||||
github.com/cenkalti/backoff/v4 v4.1.2
|
||||
github.com/go-chi/chi/v5 v5.0.0
|
||||
github.com/lib/pq v1.10.4
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671
|
||||
github.com/waku-org/go-noise v0.0.4
|
||||
github.com/waku-org/go-zerokit-rln v0.1.12
|
||||
github.com/wk8/go-ordered-map v1.0.0
|
||||
|
@ -60,7 +60,6 @@ require (
|
|||
github.com/quic-go/webtransport-go v0.5.2 // indirect
|
||||
github.com/rjeczalik/notify v0.9.3 // indirect
|
||||
github.com/status-im/status-go/extkeys v1.1.2 // indirect
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 // indirect
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect
|
||||
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect
|
||||
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -211,8 +211,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
|
|||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
|
||||
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
|
||||
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
|
||||
|
@ -1569,8 +1567,6 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
|
|||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
|
||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601155048-9806ad621c18 h1:BNd3c24LWmYyLFIFx5OpPMWPTGvkzkb6ITag1Ao/w4Q=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601155048-9806ad621c18/go.mod h1:/1YwD6sx3xsbrSkVa4++e8AUDcUjC035bgKwDsZo+0Q=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 h1:iOCDabjZ11Zk0ejdWBR54OEFA/rRZdQgIrX6Rv4U7AM=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671/go.mod h1:/1YwD6sx3xsbrSkVa4++e8AUDcUjC035bgKwDsZo+0Q=
|
||||
github.com/waku-org/go-noise v0.0.4 h1:ZfQDcCw8pazm89EBl5SXY7GGAnzDQb9AHFXlw3Ktbvk=
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/control"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
|
@ -15,7 +14,6 @@ import (
|
|||
|
||||
type ConnectionGater struct {
|
||||
sync.Mutex
|
||||
host host.Host
|
||||
logger *zap.Logger
|
||||
limiter map[string]int
|
||||
inbound int
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
@ -74,6 +75,7 @@ type connCacheData struct {
|
|||
type PeerData struct {
|
||||
Origin peers.Origin
|
||||
AddrInfo peer.AddrInfo
|
||||
ENR *enode.Node
|
||||
}
|
||||
|
||||
// PeerChannel exposes the channel on which discovered peers should be pushed
|
||||
|
@ -181,7 +183,18 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
|
|||
return
|
||||
case p := <-c.peerCh:
|
||||
c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL)
|
||||
c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin)
|
||||
err := c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin)
|
||||
if err != nil {
|
||||
c.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID))
|
||||
}
|
||||
|
||||
if p.ENR != nil {
|
||||
err = c.host.Peerstore().(peers.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
|
||||
if err != nil {
|
||||
c.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
|
||||
}
|
||||
}
|
||||
|
||||
c.publishWork(ctx, p.AddrInfo)
|
||||
case <-time.After(1 * time.Second):
|
||||
// This timeout is to not lock the goroutine
|
||||
|
@ -247,6 +260,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
|
|||
defer cancel()
|
||||
err := c.host.Connect(ctx, pi)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
c.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(pi)
|
||||
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
|
||||
}
|
||||
<-sem
|
||||
|
|
|
@ -299,6 +299,7 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error {
|
|||
peer := v2.PeerData{
|
||||
Origin: peers.Discv5,
|
||||
AddrInfo: peerAddrs[0],
|
||||
ENR: iterator.Node(),
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
|
@ -742,9 +742,13 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
|
|||
|
||||
func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols ...protocol.ID) error {
|
||||
w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID))
|
||||
w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin)
|
||||
err := w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL)
|
||||
err := w.host.Peerstore().AddProtocols(info.ID, protocols...)
|
||||
err = w.host.Peerstore().AddProtocols(info.ID, protocols...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -795,9 +799,11 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
|
|||
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
||||
err := w.host.Connect(ctx, info)
|
||||
if err != nil {
|
||||
w.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(info)
|
||||
return err
|
||||
}
|
||||
|
||||
w.host.Peerstore().(peers.WakuPeerstore).ResetConnFailures(info)
|
||||
stats.Record(ctx, metrics.Dials.M(1))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package peers
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
)
|
||||
|
@ -20,8 +23,14 @@ const (
|
|||
const peerOrigin = "origin"
|
||||
const peerENR = "enr"
|
||||
|
||||
type ConnectionFailures struct {
|
||||
sync.RWMutex
|
||||
failures map[peer.ID]int
|
||||
}
|
||||
|
||||
type WakuPeerstoreImpl struct {
|
||||
peerStore peerstore.Peerstore
|
||||
peerStore peerstore.Peerstore
|
||||
connFailures ConnectionFailures
|
||||
}
|
||||
|
||||
type WakuPeerstore interface {
|
||||
|
@ -29,12 +38,18 @@ type WakuPeerstore interface {
|
|||
Origin(p peer.ID, origin Origin) (Origin, error)
|
||||
PeersByOrigin(origin Origin) peer.IDSlice
|
||||
SetENR(p peer.ID, enr *enode.Node) error
|
||||
ENR(p peer.ID, origin Origin) (*enode.Node, error)
|
||||
ENR(p peer.ID, origin Origin) (*enr.Record, error)
|
||||
AddConnFailure(p peer.AddrInfo)
|
||||
ResetConnFailures(p peer.AddrInfo)
|
||||
ConnFailures(p peer.AddrInfo) int
|
||||
}
|
||||
|
||||
func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore {
|
||||
return &WakuPeerstoreImpl{
|
||||
peerStore: p,
|
||||
connFailures: ConnectionFailures{
|
||||
failures: make(map[peer.ID]int),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,3 +88,21 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error)
|
|||
}
|
||||
return result.(*enode.Node), nil
|
||||
}
|
||||
|
||||
func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) {
|
||||
ps.connFailures.Lock()
|
||||
defer ps.connFailures.Unlock()
|
||||
ps.connFailures.failures[p.ID]++
|
||||
}
|
||||
|
||||
func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) {
|
||||
ps.connFailures.Lock()
|
||||
defer ps.connFailures.Unlock()
|
||||
ps.connFailures.failures[p.ID] = 0
|
||||
}
|
||||
|
||||
func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int {
|
||||
ps.connFailures.RLock()
|
||||
defer ps.connFailures.RUnlock()
|
||||
return ps.connFailures.failures[p.ID]
|
||||
}
|
||||
|
|
|
@ -63,7 +63,11 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
|||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error {
|
||||
var discoveredPeers []peer.AddrInfo
|
||||
var discoveredPeers []struct {
|
||||
addrInfo peer.AddrInfo
|
||||
enr *enode.Node
|
||||
}
|
||||
|
||||
for _, p := range response.PeerInfos {
|
||||
enrRecord := &enr.Record{}
|
||||
buf := bytes.NewBuffer(p.ENR)
|
||||
|
@ -77,16 +81,21 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
|
|||
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
||||
if err != nil {
|
||||
wakuPX.log.Error("creating enode record", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
peerInfo, err := wenr.EnodeToPeerInfo(enodeRecord)
|
||||
addrInfo, err := wenr.EnodeToPeerInfo(enodeRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
discoveredPeers = append(discoveredPeers, *peerInfo)
|
||||
discoveredPeers = append(discoveredPeers, struct {
|
||||
addrInfo peer.AddrInfo
|
||||
enr *enode.Node
|
||||
}{
|
||||
addrInfo: *addrInfo,
|
||||
enr: enodeRecord,
|
||||
})
|
||||
}
|
||||
|
||||
if len(discoveredPeers) != 0 {
|
||||
|
@ -97,7 +106,8 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
|
|||
for _, p := range discoveredPeers {
|
||||
peer := v2.PeerData{
|
||||
Origin: peers.PeerExchange,
|
||||
AddrInfo: p,
|
||||
AddrInfo: p.addrInfo,
|
||||
ENR: p.enr,
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -139,7 +139,6 @@ func (r *Rendezvous) discover(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query
|
||||
// TODO: improve this by adding an exponential backoff?
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue