From 52ac8e3740311a8da46dbe30a8f57514d417e9c9 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 5 Jun 2023 10:39:38 -0400 Subject: [PATCH] refactor: various - Limit inbound connections to 10 per IP - Expose gossipsub parameters on WakuRelay - New peerstore --- mobile/api.go | 3 +- tests/utils.go | 27 +++- waku/node.go | 26 ++-- waku/v2/connection_gater.go | 125 ++++++++++++++++++ waku/v2/discovery_connector.go | 19 ++- waku/v2/discv5/discover.go | 11 +- waku/v2/node/service.go | 4 +- waku/v2/node/wakunode2.go | 38 +++++- waku/v2/node/wakuoptions.go | 9 ++ waku/v2/peers/inherited.go | 139 +++++++++++++++++++++ waku/v2/peers/peerstore.go | 75 +++++++++++ waku/v2/protocol/peer_exchange/client.go | 18 ++- waku/v2/protocol/peer_exchange/protocol.go | 4 +- waku/v2/protocol/relay/waku_relay.go | 44 ++++--- waku/v2/rendezvous/rendezvous.go | 10 +- waku/v2/rendezvous/rendezvous_test.go | 9 +- waku/v2/rest/store.go | 3 +- waku/v2/rpc/filter_test.go | 3 +- 18 files changed, 505 insertions(+), 62 deletions(-) create mode 100644 waku/v2/connection_gater.go create mode 100644 waku/v2/peers/inherited.go create mode 100644 waku/v2/peers/peerstore.go diff --git a/mobile/api.go b/mobile/api.go index e527cc06..ed1e803c 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -29,6 +29,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/payload" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -238,7 +239,7 @@ func AddPeer(address string, protocolID string) string { return MakeJSONResponse(err) } - peerID, err := wakuState.node.AddPeer(ma, libp2pProtocol.ID(protocolID)) + peerID, err := wakuState.node.AddPeer(ma, peers.Static, libp2pProtocol.ID(protocolID)) return PrepareJSONResponse(peerID, err) } diff --git a/tests/utils.go b/tests/utils.go index 841f8d35..7d660f8e 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -15,7 +15,10 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" + v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -94,11 +97,25 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e } // 0.0.0.0 will listen on any interface device. - sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)) + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)) + if err != nil { + return nil, err + } + + ps, err := pstoremem.NewPeerstore() + if err != nil { + return nil, err + } + + psWrapper := peers.NewWakuPeerstore(ps) + if err != nil { + return nil, err + } // libp2p.New constructs a new libp2p Host. // Other options can be added here. return libp2p.New( + libp2p.Peerstore(psWrapper), libp2p.ListenAddrs(sourceMultiAddr), libp2p.Identity(prvKey), ) @@ -121,19 +138,19 @@ func RandomHex(n int) (string, error) { type TestPeerDiscoverer struct { sync.RWMutex peerMap map[peer.ID]struct{} - peerCh chan peer.AddrInfo + peerCh chan v2.PeerData } func NewTestPeerDiscoverer() *TestPeerDiscoverer { result := &TestPeerDiscoverer{ peerMap: make(map[peer.ID]struct{}), - peerCh: make(chan peer.AddrInfo, 10), + peerCh: make(chan v2.PeerData, 10), } go func() { for p := range result.peerCh { result.Lock() - result.peerMap[p.ID] = struct{}{} + result.peerMap[p.AddrInfo.ID] = struct{}{} result.Unlock() } }() @@ -141,7 +158,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer { return result } -func (t *TestPeerDiscoverer) PeerChannel() chan<- peer.AddrInfo { +func (t *TestPeerDiscoverer) PeerChannel() chan<- v2.PeerData { return t.peerCh } diff --git a/waku/node.go b/waku/node.go index 7c7db599..657ccaf8 100644 --- a/waku/node.go +++ b/waku/node.go @@ -18,6 +18,7 @@ import ( "github.com/pbnjay/memory" wmetrics "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -31,6 +32,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -191,7 +193,7 @@ func Execute(options Options) { peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts) failOnErr(err, "Peerstore") - libp2pOpts = append(libp2pOpts, libp2p.Peerstore(peerStore)) + nodeOpts = append(nodeOpts, node.WithPeerStore(peerStore)) } nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...)) @@ -296,17 +298,21 @@ func Execute(options Options) { failOnErr(err, "Wakunode") if options.Filter.UseV1 { - addPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1) + addStaticPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1) } if err = wakuNode.Start(ctx); err != nil { logger.Fatal("starting waku node", zap.Error(err)) } - addPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4) - addPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1) - addPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID) - addPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1) + for _, d := range discoveredNodes { + wakuNode.Host().Peerstore().AddAddrs(d.PeerID, d.PeerInfo.Addrs, peerstore.PermanentAddrTTL) + } + + addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4) + addStaticPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1) + addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID) + addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1) if options.DiscV5.Enable { if err = wakuNode.DiscV5().Start(ctx); err != nil { @@ -318,11 +324,11 @@ func Execute(options Options) { if options.PeerExchange.Enable && options.PeerExchange.Node != nil { logger.Info("retrieving peer info via peer exchange protocol") - peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peer_exchange.PeerExchangeID_v20alpha1) + peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1) if err != nil { logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) } else { - desiredOutDegree := 6 // TODO: obtain this from gossipsub D + desiredOutDegree := wakuNode.Relay().Params().D if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil { logger.Error("requesting peers via peer exchange", zap.Error(err)) } @@ -420,9 +426,9 @@ func Execute(options Options) { } } -func addPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) { +func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) { for _, addr := range addresses { - _, err := wakuNode.AddPeer(addr, protocols...) + _, err := wakuNode.AddPeer(addr, peers.Static, protocols...) failOnErr(err, "error adding peer") } } diff --git a/waku/v2/connection_gater.go b/waku/v2/connection_gater.go new file mode 100644 index 00000000..249e9dce --- /dev/null +++ b/waku/v2/connection_gater.go @@ -0,0 +1,125 @@ +package v2 + +import ( + "runtime" + "sync" + + "github.com/libp2p/go-libp2p/core/control" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "go.uber.org/zap" +) + +type ConnectionGater struct { + sync.Mutex + host host.Host + logger *zap.Logger + limiter map[string]int + inbound int + outbound int +} + +const maxConnsPerIP = 10 + +func NewConnectionGater(logger *zap.Logger) *ConnectionGater { + c := &ConnectionGater{ + logger: logger.Named("connection-gater"), + limiter: make(map[string]int), + inbound: 0, + outbound: 0, + } + + return c +} + +// InterceptPeerDial is called on an imminent outbound peer dial request, prior +// to the addresses of that peer being available/resolved. Blocking connections +// at this stage is typical for blacklisting scenarios. +func (c *ConnectionGater) InterceptPeerDial(_ peer.ID) (allow bool) { + return true +} + +// InterceptAddrDial is called on an imminent outbound dial to a peer on a +// particular address. Blocking connections at this stage is typical for +// address filtering. +func (c *ConnectionGater) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) { + return true +} + +// InterceptAccept is called as soon as a transport listener receives an +// inbound connection request, before any upgrade takes place. Transports who +// accept already secure and/or multiplexed connections (e.g. possibly QUIC) +// MUST call this method regardless, for correctness/consistency. +func (c *ConnectionGater) InterceptAccept(n network.ConnMultiaddrs) (allow bool) { + if !c.validateInboundConn(n.RemoteMultiaddr()) { + runtime.Gosched() // Allow other go-routines to run in the event + c.logger.Info("exceeds allowed inbound connections from this ip", zap.String("multiaddr", n.RemoteMultiaddr().String())) + return false + } + + if false { // inbound > someLimit + c.logger.Info("connection not accepted. Max inbound connections reached", zap.String("multiaddr", n.RemoteMultiaddr().String())) + return false + } + + return true +} + +// InterceptSecured is called for both inbound and outbound connections, +// after a security handshake has taken place and we've authenticated the peer +func (c *ConnectionGater) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) { + return true +} + +// InterceptUpgraded is called for inbound and outbound connections, after +// libp2p has finished upgrading the connection entirely to a secure, +// multiplexed channel. +func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) { + return true, 0 +} + +func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) { + ip, err := manet.ToIP(addr) + if err != nil { + return + } + + c.Lock() + defer c.Unlock() + + currConnections, ok := c.limiter[ip.String()] + if ok { + currConnections-- + if currConnections <= 0 { + delete(c.limiter, ip.String()) + } else { + c.limiter[ip.String()] = currConnections + } + } +} + +func (c *ConnectionGater) validateInboundConn(addr multiaddr.Multiaddr) bool { + ip, err := manet.ToIP(addr) + if err != nil { + return false + } + + c.Lock() + defer c.Unlock() + + currConnections, ok := c.limiter[ip.String()] + if !ok { + c.limiter[ip.String()] = 1 + return true + } else { + if currConnections+1 > maxConnsPerIP { + return false + } + + c.limiter[ip.String()]++ + } + return true +} diff --git a/waku/v2/discovery_connector.go b/waku/v2/discovery_connector.go index e22232e2..f31ffa84 100644 --- a/waku/v2/discovery_connector.go +++ b/waku/v2/discovery_connector.go @@ -11,8 +11,12 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peers" + "go.uber.org/zap" lru "github.com/hashicorp/golang-lru" @@ -33,7 +37,7 @@ type PeerConnectionStrategy struct { wg sync.WaitGroup minPeers int dialTimeout time.Duration - peerCh chan peer.AddrInfo + peerCh chan PeerData dialCh chan peer.AddrInfo backoff backoff.BackoffFactory @@ -67,8 +71,13 @@ type connCacheData struct { strat backoff.BackoffStrategy } +type PeerData struct { + Origin peers.Origin + AddrInfo peer.AddrInfo +} + // PeerChannel exposes the channel on which discovered peers should be pushed -func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo { +func (c *PeerConnectionStrategy) PeerChannel() chan<- PeerData { return c.peerCh } @@ -85,7 +94,7 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) c.cancel = cancel - c.peerCh = make(chan peer.AddrInfo) + c.peerCh = make(chan PeerData) c.dialCh = make(chan peer.AddrInfo) c.wg.Add(3) @@ -171,7 +180,9 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) { case <-ctx.Done(): return case p := <-c.peerCh: - c.publishWork(ctx, p) + c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL) + c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin) + c.publishWork(ctx, p.AddrInfo) case <-time.After(1 * time.Second): // This timeout is to not lock the goroutine break diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 857fcfd7..974bc33c 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -15,7 +15,9 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-discover/discover" "github.com/waku-org/go-waku/logging" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -85,7 +87,7 @@ func DefaultOptions() []DiscoveryV5Option { } type PeerConnector interface { - PeerChannel() chan<- peer.AddrInfo + PeerChannel() chan<- v2.PeerData } func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { @@ -294,8 +296,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error { } if len(peerAddrs) != 0 { + peer := v2.PeerData{ + Origin: peers.Discv5, + AddrInfo: peerAddrs[0], + } + select { - case d.peerConnector.PeerChannel() <- peerAddrs[0]: + case d.peerConnector.PeerChannel() <- peer: case <-ctx.Done(): return nil } diff --git a/waku/v2/node/service.go b/waku/v2/node/service.go index 12376820..bc0ff8d2 100644 --- a/waku/v2/node/service.go +++ b/waku/v2/node/service.go @@ -4,7 +4,7 @@ import ( "context" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) @@ -22,5 +22,5 @@ type ReceptorService interface { type PeerConnectorService interface { Service - PeerChannel() chan<- peer.AddrInfo + PeerChannel() chan<- v2.PeerData } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 139013ec..0bbc328d 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -25,6 +25,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-libp2p/p2p/host/autorelay" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" ma "github.com/multiformats/go-multiaddr" @@ -35,6 +36,7 @@ import ( v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" @@ -80,6 +82,8 @@ type WakuNode struct { log *zap.Logger timesource timesource.Timesource + peerstore peerstore.Peerstore + relay Service lightPush Service peerConnector PeerConnectorService @@ -138,6 +142,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { if params.logger == nil { params.logger = utils.Logger() + //golog.SetPrimaryCore(params.logger.Core()) golog.SetAllLoggers(params.logLevel) } @@ -187,6 +192,19 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay) w.circuitRelayNodes = make(chan peer.AddrInfo) + // Setup peerstore wrapper + if params.peerstore != nil { + w.peerstore = peers.NewWakuPeerstore(params.peerstore) + params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore)) + } else { + ps, err := pstoremem.NewPeerstore() + if err != nil { + return nil, err + } + w.peerstore = peers.NewWakuPeerstore(ps) + params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore)) + } + // Use circuit relay with nodes received on circuitRelayNodes channel params.libP2POpts = append(params.libP2POpts, libp2p.EnableAutoRelayWithPeerSource( func(ctx context.Context, numPeers int) <-chan peer.AddrInfo { @@ -314,14 +332,23 @@ func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) { // Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start(ctx context.Context) error { + connGater := v2.NewConnectionGater(w.log) + ctx, cancel := context.WithCancel(ctx) w.cancel = cancel + w.opts.libP2POpts = append(w.opts.libP2POpts, libp2p.ConnectionGater(connGater)) host, err := libp2p.New(w.opts.libP2POpts...) if err != nil { return err } + host.Network().Notify(&network.NotifyBundle{ + DisconnectedF: func(net network.Network, conn network.Conn) { + go connGater.NotifyDisconnect(conn.RemoteMultiaddr()) + }, + }) + w.host = host if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { @@ -689,7 +716,7 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error var peerIDs []peer.ID for _, n := range w.opts.resumeNodes { - pID, err := w.AddPeer(n, store.StoreID_v20beta4) + pID, err := w.AddPeer(n, peers.Static, store.StoreID_v20beta4) if err != nil { w.log.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err)) } @@ -713,9 +740,10 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error return nil } -func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...protocol.ID) error { +func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols ...protocol.ID) error { w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID)) - w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin) + w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL) err := w.host.Peerstore().AddProtocols(info.ID, protocols...) if err != nil { return err @@ -725,13 +753,13 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...protocol.ID) error } // AddPeer is used to add a peer and the protocols it support to the node peerstore -func (w *WakuNode) AddPeer(address ma.Multiaddr, protocols ...protocol.ID) (peer.ID, error) { +func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peers.Origin, protocols ...protocol.ID) (peer.ID, error) { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { return "", err } - return info.ID, w.addPeer(info, protocols...) + return info.ID, w.addPeer(info, origin, protocols...) } // DialPeerWithMultiAddress is used to connect to a peer using a multiaddress diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 333e73a2..7271a68e 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -16,6 +16,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peerstore" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/muxer/mplex" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" @@ -51,6 +52,7 @@ type WakuNodeParameters struct { addressFactory basichost.AddrsFactory privKey *ecdsa.PrivateKey libP2POpts []libp2p.Option + peerstore peerstore.Peerstore enableNTP bool ntpURLs []string @@ -310,6 +312,13 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption { } } +func WithPeerStore(ps peerstore.Peerstore) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.peerstore = ps + return nil + } +} + // NoDefaultWakuTopic will stop the node from subscribing to the default // pubsub topic automatically func NoDefaultWakuTopic() WakuNodeOption { diff --git a/waku/v2/peers/inherited.go b/waku/v2/peers/inherited.go new file mode 100644 index 00000000..316307e8 --- /dev/null +++ b/waku/v2/peers/inherited.go @@ -0,0 +1,139 @@ +package peers + +import ( + "context" + "time" + + ic "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/core/record" + ma "github.com/multiformats/go-multiaddr" +) + +// Contains all interface methods from a libp2p peerstore + +func (ps *WakuPeerstoreImpl) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { + ps.peerStore.AddAddr(p, addr, ttl) +} + +func (ps *WakuPeerstoreImpl) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + ps.peerStore.AddAddrs(p, addrs, ttl) +} + +func (ps *WakuPeerstoreImpl) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { + ps.peerStore.SetAddr(p, addr, ttl) +} + +func (ps *WakuPeerstoreImpl) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + ps.peerStore.SetAddrs(p, addrs, ttl) +} + +func (ps *WakuPeerstoreImpl) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { + ps.peerStore.UpdateAddrs(p, oldTTL, newTTL) +} + +func (ps *WakuPeerstoreImpl) Addrs(p peer.ID) []ma.Multiaddr { + return ps.peerStore.Addrs(p) +} + +func (ps *WakuPeerstoreImpl) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { + return ps.peerStore.AddrStream(ctx, p) +} + +func (ps *WakuPeerstoreImpl) ClearAddrs(p peer.ID) { + ps.peerStore.ClearAddrs(p) +} + +func (ps *WakuPeerstoreImpl) PeersWithAddrs() peer.IDSlice { + return ps.peerStore.PeersWithAddrs() +} + +func (ps *WakuPeerstoreImpl) PeerInfo(peerID peer.ID) peer.AddrInfo { + return ps.peerStore.PeerInfo(peerID) +} + +func (ps *WakuPeerstoreImpl) Peers() peer.IDSlice { + return ps.peerStore.Peers() +} + +func (ps *WakuPeerstoreImpl) Close() error { + return ps.peerStore.Close() +} + +func (ps *WakuPeerstoreImpl) PubKey(p peer.ID) ic.PubKey { + return ps.peerStore.PubKey(p) +} + +func (ps *WakuPeerstoreImpl) AddPubKey(p peer.ID, pubk ic.PubKey) error { + return ps.peerStore.AddPubKey(p, pubk) +} + +func (ps *WakuPeerstoreImpl) PrivKey(p peer.ID) ic.PrivKey { + return ps.peerStore.PrivKey(p) +} + +func (ps *WakuPeerstoreImpl) AddPrivKey(p peer.ID, privk ic.PrivKey) error { + return ps.peerStore.AddPrivKey(p, privk) +} + +func (ps *WakuPeerstoreImpl) PeersWithKeys() peer.IDSlice { + return ps.peerStore.PeersWithKeys() +} + +func (ps *WakuPeerstoreImpl) RemovePeer(p peer.ID) { + ps.peerStore.RemovePeer(p) +} + +func (ps *WakuPeerstoreImpl) Get(p peer.ID, key string) (interface{}, error) { + return ps.peerStore.Get(p, key) +} + +func (ps *WakuPeerstoreImpl) Put(p peer.ID, key string, val interface{}) error { + return ps.peerStore.Put(p, key, val) + +} + +func (ps *WakuPeerstoreImpl) RecordLatency(p peer.ID, t time.Duration) { + ps.peerStore.RecordLatency(p, t) +} + +func (ps *WakuPeerstoreImpl) LatencyEWMA(p peer.ID) time.Duration { + return ps.peerStore.LatencyEWMA(p) +} + +func (ps *WakuPeerstoreImpl) GetProtocols(p peer.ID) ([]protocol.ID, error) { + return ps.peerStore.GetProtocols(p) +} + +func (ps *WakuPeerstoreImpl) AddProtocols(p peer.ID, proto ...protocol.ID) error { + return ps.peerStore.AddProtocols(p, proto...) +} + +func (ps *WakuPeerstoreImpl) SetProtocols(p peer.ID, proto ...protocol.ID) error { + return ps.peerStore.SetProtocols(p, proto...) +} + +func (ps *WakuPeerstoreImpl) RemoveProtocols(p peer.ID, proto ...protocol.ID) error { + return ps.peerStore.RemoveProtocols(p, proto...) +} + +func (ps *WakuPeerstoreImpl) SupportsProtocols(p peer.ID, proto ...protocol.ID) ([]protocol.ID, error) { + return ps.peerStore.SupportsProtocols(p, proto...) +} + +func (ps *WakuPeerstoreImpl) FirstSupportedProtocol(p peer.ID, proto ...protocol.ID) (protocol.ID, error) { + return ps.peerStore.FirstSupportedProtocol(p, proto...) +} + +func (ps *WakuPeerstoreImpl) ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error) { + return ps.peerStore.(peerstore.CertifiedAddrBook).ConsumePeerRecord(s, ttl) +} + +// GetPeerRecord returns a Envelope containing a PeerRecord for the +// given peer id, if one exists. +// Returns nil if no signed PeerRecord exists for the peer. +func (ps *WakuPeerstoreImpl) GetPeerRecord(p peer.ID) *record.Envelope { + return ps.peerStore.(peerstore.CertifiedAddrBook).GetPeerRecord(p) +} diff --git a/waku/v2/peers/peerstore.go b/waku/v2/peers/peerstore.go new file mode 100644 index 00000000..58faf3e8 --- /dev/null +++ b/waku/v2/peers/peerstore.go @@ -0,0 +1,75 @@ +package peers + +import ( + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" +) + +type Origin int64 + +const ( + Unknown Origin = iota + Discv5 + Static + PeerExchange + DnsDiscovery + Rendezvous +) + +const peerOrigin = "origin" +const peerENR = "enr" + +type WakuPeerstoreImpl struct { + peerStore peerstore.Peerstore +} + +type WakuPeerstore interface { + SetOrigin(p peer.ID, origin Origin) error + Origin(p peer.ID, origin Origin) (Origin, error) + PeersByOrigin(origin Origin) peer.IDSlice + SetENR(p peer.ID, enr *enode.Node) error + ENR(p peer.ID, origin Origin) (*enode.Node, error) +} + +func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore { + return &WakuPeerstoreImpl{ + peerStore: p, + } +} + +func (ps *WakuPeerstoreImpl) SetOrigin(p peer.ID, origin Origin) error { + return ps.peerStore.Put(p, peerOrigin, origin) +} + +func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) { + result, err := ps.peerStore.Get(p, peerOrigin) + if err != nil { + return Unknown, err + } + + return result.(Origin), nil +} + +func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice { + var result peer.IDSlice + for _, p := range ps.Peers() { + _, err := ps.Origin(p, origin) + if err == nil { + result = append(result, p) + } + } + return result +} + +func (ps *WakuPeerstoreImpl) SetENR(p peer.ID, enr *enode.Node) error { + return ps.peerStore.Put(p, peerENR, enr) +} + +func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) { + result, err := ps.peerStore.Get(p, peerENR) + if err != nil { + return nil, err + } + return result.(*enode.Node), nil +} diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index d5bf9d06..32c2eec5 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -10,7 +10,9 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peers" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "go.uber.org/zap" @@ -61,7 +63,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts } func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error { - var peers []peer.AddrInfo + var discoveredPeers []peer.AddrInfo for _, p := range response.PeerInfos { enrRecord := &enr.Record{} buf := bytes.NewBuffer(p.ENR) @@ -84,19 +86,23 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb return err } - peers = append(peers, *peerInfo) + discoveredPeers = append(discoveredPeers, *peerInfo) } - if len(peers) != 0 { - wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) + if len(discoveredPeers) != 0 { + wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers))) wakuPX.wg.Add(1) go func() { defer wakuPX.wg.Done() - for _, p := range peers { + for _, p := range discoveredPeers { + peer := v2.PeerData{ + Origin: peers.PeerExchange, + AddrInfo: p, + } select { case <-ctx.Done(): return - case wakuPX.peerConnector.PeerChannel() <- p: + case wakuPX.peerConnector.PeerChannel() <- peer: } } }() diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index fcbb5a60..64d718cc 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -10,10 +10,10 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -45,7 +45,7 @@ type WakuPeerExchange struct { } type PeerConnector interface { - PeerChannel() chan<- peer.AddrInfo + PeerChannel() chan<- v2.PeerData } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 3eb9bc5c..81c136bb 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -31,6 +31,7 @@ type WakuRelay struct { host host.Host opts []pubsub.Option pubsub *pubsub.PubSub + params pubsub.GossipSubParams timesource timesource.Timesource log *zap.Logger @@ -65,24 +66,28 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.log = log.Named("relay") // default options required by WakuRelay - opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) - opts = append(opts, pubsub.WithNoAuthor()) - opts = append(opts, pubsub.WithMessageIdFn(msgIdFn)) - opts = append(opts, pubsub.WithGossipSubProtocols( - []protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200}, - func(feat pubsub.GossipSubFeature, proto protocol.ID) bool { - switch feat { - case pubsub.GossipSubFeatureMesh: - return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 - case pubsub.GossipSubFeaturePX: - return proto == pubsub.GossipSubID_v11 - default: - return false - } - }, - )) + w.opts = append([]pubsub.Option{ + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), + pubsub.WithNoAuthor(), + pubsub.WithMessageIdFn(msgIdFn), + pubsub.WithGossipSubProtocols( + []protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200}, + func(feat pubsub.GossipSubFeature, proto protocol.ID) bool { + switch feat { + case pubsub.GossipSubFeatureMesh: + return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 + case pubsub.GossipSubFeaturePX: + return proto == pubsub.GossipSubID_v11 + default: + return false + } + }, + ), + }, opts...) - w.opts = opts + // We disable overriding gossipsub parameters by adding them as the last value in the options + cfg := pubsub.DefaultGossipSubParams() + w.opts = append(w.opts, pubsub.WithGossipSubParams(cfg)) return w } @@ -360,4 +365,9 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio } } } + +} + +func (w *WakuRelay) Params() pubsub.GossipSubParams { + return w.params } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index d45da75d..50f85a51 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -10,6 +10,8 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" rvs "github.com/waku-org/go-libp2p-rendezvous" + v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -40,7 +42,7 @@ type Rendezvous struct { } type PeerConnector interface { - PeerChannel() chan<- peer.AddrInfo + PeerChannel() chan<- v2.PeerData } func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { @@ -126,8 +128,12 @@ func (r *Rendezvous) discover(ctx context.Context) { server.Unlock() for _, addr := range addrInfo { + peer := v2.PeerData{ + Origin: peers.Rendezvous, + AddrInfo: addr, + } select { - case r.peerConnector.PeerChannel() <- addr: + case r.peerConnector.PeerChannel() <- peer: case <-ctx.Done(): return } diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 0e7b9416..87b04c64 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -14,20 +14,21 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/persistence/sqlite" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerConn struct { - ch chan peer.AddrInfo + ch chan v2.PeerData } -func (p PeerConn) PeerChannel() chan<- peer.AddrInfo { +func (p PeerConn) PeerChannel() chan<- v2.PeerData { return p.ch } func NewPeerConn() PeerConn { x := PeerConn{} - x.ch = make(chan peer.AddrInfo, 1000) + x.ch = make(chan v2.PeerData, 1000) return x } @@ -94,6 +95,6 @@ func TestRendezvous(t *testing.T) { case <-timer: require.Fail(t, "no peer discovered") case p := <-myPeerConnector.ch: - require.Equal(t, p.ID.Pretty(), host2.ID().Pretty()) + require.Equal(t, p.AddrInfo.ID.Pretty(), host2.ID().Pretty()) } } diff --git a/waku/v2/rest/store.go b/waku/v2/rest/store.go index c300592f..00656995 100644 --- a/waku/v2/rest/store.go +++ b/waku/v2/rest/store.go @@ -12,6 +12,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -196,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - _, err = d.node.AddPeer(peerAddr) + _, err = d.node.AddPeer(peerAddr, peers.Static) if err != nil { writeStoreError(w, http.StatusInternalServerError, err) return diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 3260ea7a..f5ff0ea4 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -82,7 +83,7 @@ func TestFilterSubscription(t *testing.T) { break } - _, err = d.node.AddPeer(addr, legacy_filter.FilterID_v20beta1) + _, err = d.node.AddPeer(addr, peers.Static, legacy_filter.FilterID_v20beta1) require.NoError(t, err) args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}