refactor: various

- Limit inbound connections to 10 per IP
- Expose gossipsub parameters on WakuRelay
- New peerstore
This commit is contained in:
Richard Ramos 2023-06-05 10:39:38 -04:00 committed by RichΛrd
parent f6fe353e2e
commit 52ac8e3740
18 changed files with 505 additions and 62 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -238,7 +239,7 @@ func AddPeer(address string, protocolID string) string {
return MakeJSONResponse(err)
}
peerID, err := wakuState.node.AddPeer(ma, libp2pProtocol.ID(protocolID))
peerID, err := wakuState.node.AddPeer(ma, peers.Static, libp2pProtocol.ID(protocolID))
return PrepareJSONResponse(peerID, err)
}

View File

@ -15,7 +15,10 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
@ -94,11 +97,25 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
}
// 0.0.0.0 will listen on any interface device.
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
psWrapper := peers.NewWakuPeerstore(ps)
if err != nil {
return nil, err
}
// libp2p.New constructs a new libp2p Host.
// Other options can be added here.
return libp2p.New(
libp2p.Peerstore(psWrapper),
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
)
@ -121,19 +138,19 @@ func RandomHex(n int) (string, error) {
type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan peer.AddrInfo
peerCh chan v2.PeerData
}
func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan peer.AddrInfo, 10),
peerCh: make(chan v2.PeerData, 10),
}
go func() {
for p := range result.peerCh {
result.Lock()
result.peerMap[p.ID] = struct{}{}
result.peerMap[p.AddrInfo.ID] = struct{}{}
result.Unlock()
}
}()
@ -141,7 +158,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
return result
}
func (t *TestPeerDiscoverer) PeerChannel() chan<- peer.AddrInfo {
func (t *TestPeerDiscoverer) PeerChannel() chan<- v2.PeerData {
return t.peerCh
}

View File

@ -18,6 +18,7 @@ import (
"github.com/pbnjay/memory"
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/rendezvous"
"github.com/ethereum/go-ethereum/accounts/keystore"
@ -31,6 +32,7 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
@ -191,7 +193,7 @@ func Execute(options Options) {
peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts)
failOnErr(err, "Peerstore")
libp2pOpts = append(libp2pOpts, libp2p.Peerstore(peerStore))
nodeOpts = append(nodeOpts, node.WithPeerStore(peerStore))
}
nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...))
@ -296,17 +298,21 @@ func Execute(options Options) {
failOnErr(err, "Wakunode")
if options.Filter.UseV1 {
addPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1)
addStaticPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1)
}
if err = wakuNode.Start(ctx); err != nil {
logger.Fatal("starting waku node", zap.Error(err))
}
addPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4)
addPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1)
addPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
addPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1)
for _, d := range discoveredNodes {
wakuNode.Host().Peerstore().AddAddrs(d.PeerID, d.PeerInfo.Addrs, peerstore.PermanentAddrTTL)
}
addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4)
addStaticPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1)
addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1)
if options.DiscV5.Enable {
if err = wakuNode.DiscV5().Start(ctx); err != nil {
@ -318,11 +324,11 @@ func Execute(options Options) {
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")
peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peer_exchange.PeerExchangeID_v20alpha1)
peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
desiredOutDegree := 6 // TODO: obtain this from gossipsub D
desiredOutDegree := wakuNode.Relay().Params().D
if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil {
logger.Error("requesting peers via peer exchange", zap.Error(err))
}
@ -420,9 +426,9 @@ func Execute(options Options) {
}
}
func addPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, protocols...)
_, err := wakuNode.AddPeer(addr, peers.Static, protocols...)
failOnErr(err, "error adding peer")
}
}

125
waku/v2/connection_gater.go Normal file
View File

@ -0,0 +1,125 @@
package v2
import (
"runtime"
"sync"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"go.uber.org/zap"
)
type ConnectionGater struct {
sync.Mutex
host host.Host
logger *zap.Logger
limiter map[string]int
inbound int
outbound int
}
const maxConnsPerIP = 10
func NewConnectionGater(logger *zap.Logger) *ConnectionGater {
c := &ConnectionGater{
logger: logger.Named("connection-gater"),
limiter: make(map[string]int),
inbound: 0,
outbound: 0,
}
return c
}
// InterceptPeerDial is called on an imminent outbound peer dial request, prior
// to the addresses of that peer being available/resolved. Blocking connections
// at this stage is typical for blacklisting scenarios.
func (c *ConnectionGater) InterceptPeerDial(_ peer.ID) (allow bool) {
return true
}
// InterceptAddrDial is called on an imminent outbound dial to a peer on a
// particular address. Blocking connections at this stage is typical for
// address filtering.
func (c *ConnectionGater) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) {
return true
}
// InterceptAccept is called as soon as a transport listener receives an
// inbound connection request, before any upgrade takes place. Transports who
// accept already secure and/or multiplexed connections (e.g. possibly QUIC)
// MUST call this method regardless, for correctness/consistency.
func (c *ConnectionGater) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
if !c.validateInboundConn(n.RemoteMultiaddr()) {
runtime.Gosched() // Allow other go-routines to run in the event
c.logger.Info("exceeds allowed inbound connections from this ip", zap.String("multiaddr", n.RemoteMultiaddr().String()))
return false
}
if false { // inbound > someLimit
c.logger.Info("connection not accepted. Max inbound connections reached", zap.String("multiaddr", n.RemoteMultiaddr().String()))
return false
}
return true
}
// InterceptSecured is called for both inbound and outbound connections,
// after a security handshake has taken place and we've authenticated the peer
func (c *ConnectionGater) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) {
return true
}
// InterceptUpgraded is called for inbound and outbound connections, after
// libp2p has finished upgrading the connection entirely to a secure,
// multiplexed channel.
func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}
func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) {
ip, err := manet.ToIP(addr)
if err != nil {
return
}
c.Lock()
defer c.Unlock()
currConnections, ok := c.limiter[ip.String()]
if ok {
currConnections--
if currConnections <= 0 {
delete(c.limiter, ip.String())
} else {
c.limiter[ip.String()] = currConnections
}
}
}
func (c *ConnectionGater) validateInboundConn(addr multiaddr.Multiaddr) bool {
ip, err := manet.ToIP(addr)
if err != nil {
return false
}
c.Lock()
defer c.Unlock()
currConnections, ok := c.limiter[ip.String()]
if !ok {
c.limiter[ip.String()] = 1
return true
} else {
if currConnections+1 > maxConnsPerIP {
return false
}
c.limiter[ip.String()]++
}
return true
}

View File

@ -11,8 +11,12 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peers"
"go.uber.org/zap"
lru "github.com/hashicorp/golang-lru"
@ -33,7 +37,7 @@ type PeerConnectionStrategy struct {
wg sync.WaitGroup
minPeers int
dialTimeout time.Duration
peerCh chan peer.AddrInfo
peerCh chan PeerData
dialCh chan peer.AddrInfo
backoff backoff.BackoffFactory
@ -67,8 +71,13 @@ type connCacheData struct {
strat backoff.BackoffStrategy
}
type PeerData struct {
Origin peers.Origin
AddrInfo peer.AddrInfo
}
// PeerChannel exposes the channel on which discovered peers should be pushed
func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo {
func (c *PeerConnectionStrategy) PeerChannel() chan<- PeerData {
return c.peerCh
}
@ -85,7 +94,7 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.peerCh = make(chan peer.AddrInfo)
c.peerCh = make(chan PeerData)
c.dialCh = make(chan peer.AddrInfo)
c.wg.Add(3)
@ -171,7 +180,9 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
case <-ctx.Done():
return
case p := <-c.peerCh:
c.publishWork(ctx, p)
c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL)
c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin)
c.publishWork(ctx, p.AddrInfo)
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break

View File

@ -15,7 +15,9 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-discover/discover"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -85,7 +87,7 @@ func DefaultOptions() []DiscoveryV5Option {
}
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
PeerChannel() chan<- v2.PeerData
}
func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
@ -294,8 +296,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error {
}
if len(peerAddrs) != 0 {
peer := v2.PeerData{
Origin: peers.Discv5,
AddrInfo: peerAddrs[0],
}
select {
case d.peerConnector.PeerChannel() <- peerAddrs[0]:
case d.peerConnector.PeerChannel() <- peer:
case <-ctx.Done():
return nil
}

View File

@ -4,7 +4,7 @@ import (
"context"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
@ -22,5 +22,5 @@ type ReceptorService interface {
type PeerConnectorService interface {
Service
PeerChannel() chan<- peer.AddrInfo
PeerChannel() chan<- v2.PeerData
}

View File

@ -25,6 +25,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
ma "github.com/multiformats/go-multiaddr"
@ -35,6 +36,7 @@ import (
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
@ -80,6 +82,8 @@ type WakuNode struct {
log *zap.Logger
timesource timesource.Timesource
peerstore peerstore.Peerstore
relay Service
lightPush Service
peerConnector PeerConnectorService
@ -138,6 +142,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
if params.logger == nil {
params.logger = utils.Logger()
//golog.SetPrimaryCore(params.logger.Core())
golog.SetAllLoggers(params.logLevel)
}
@ -187,6 +192,19 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo)
// Setup peerstore wrapper
if params.peerstore != nil {
w.peerstore = peers.NewWakuPeerstore(params.peerstore)
params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore))
} else {
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
w.peerstore = peers.NewWakuPeerstore(ps)
params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore))
}
// Use circuit relay with nodes received on circuitRelayNodes channel
params.libP2POpts = append(params.libP2POpts, libp2p.EnableAutoRelayWithPeerSource(
func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
@ -314,14 +332,23 @@ func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
// Start initializes all the protocols that were setup in the WakuNode
func (w *WakuNode) Start(ctx context.Context) error {
connGater := v2.NewConnectionGater(w.log)
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.opts.libP2POpts = append(w.opts.libP2POpts, libp2p.ConnectionGater(connGater))
host, err := libp2p.New(w.opts.libP2POpts...)
if err != nil {
return err
}
host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
go connGater.NotifyDisconnect(conn.RemoteMultiaddr())
},
})
w.host = host
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
@ -689,7 +716,7 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
var peerIDs []peer.ID
for _, n := range w.opts.resumeNodes {
pID, err := w.AddPeer(n, store.StoreID_v20beta4)
pID, err := w.AddPeer(n, peers.Static, store.StoreID_v20beta4)
if err != nil {
w.log.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
@ -713,9 +740,10 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
return nil
}
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...protocol.ID) error {
func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols ...protocol.ID) error {
w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID))
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin)
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL)
err := w.host.Peerstore().AddProtocols(info.ID, protocols...)
if err != nil {
return err
@ -725,13 +753,13 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...protocol.ID) error
}
// AddPeer is used to add a peer and the protocols it support to the node peerstore
func (w *WakuNode) AddPeer(address ma.Multiaddr, protocols ...protocol.ID) (peer.ID, error) {
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peers.Origin, protocols ...protocol.ID) (peer.ID, error) {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return "", err
}
return info.ID, w.addPeer(info, protocols...)
return info.ID, w.addPeer(info, origin, protocols...)
}
// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress

View File

@ -16,6 +16,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peerstore"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
@ -51,6 +52,7 @@ type WakuNodeParameters struct {
addressFactory basichost.AddrsFactory
privKey *ecdsa.PrivateKey
libP2POpts []libp2p.Option
peerstore peerstore.Peerstore
enableNTP bool
ntpURLs []string
@ -310,6 +312,13 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption {
}
}
func WithPeerStore(ps peerstore.Peerstore) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.peerstore = ps
return nil
}
}
// NoDefaultWakuTopic will stop the node from subscribing to the default
// pubsub topic automatically
func NoDefaultWakuTopic() WakuNodeOption {

139
waku/v2/peers/inherited.go Normal file
View File

@ -0,0 +1,139 @@
package peers
import (
"context"
"time"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
ma "github.com/multiformats/go-multiaddr"
)
// Contains all interface methods from a libp2p peerstore
func (ps *WakuPeerstoreImpl) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ps.peerStore.AddAddr(p, addr, ttl)
}
func (ps *WakuPeerstoreImpl) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
ps.peerStore.AddAddrs(p, addrs, ttl)
}
func (ps *WakuPeerstoreImpl) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ps.peerStore.SetAddr(p, addr, ttl)
}
func (ps *WakuPeerstoreImpl) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
ps.peerStore.SetAddrs(p, addrs, ttl)
}
func (ps *WakuPeerstoreImpl) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
ps.peerStore.UpdateAddrs(p, oldTTL, newTTL)
}
func (ps *WakuPeerstoreImpl) Addrs(p peer.ID) []ma.Multiaddr {
return ps.peerStore.Addrs(p)
}
func (ps *WakuPeerstoreImpl) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
return ps.peerStore.AddrStream(ctx, p)
}
func (ps *WakuPeerstoreImpl) ClearAddrs(p peer.ID) {
ps.peerStore.ClearAddrs(p)
}
func (ps *WakuPeerstoreImpl) PeersWithAddrs() peer.IDSlice {
return ps.peerStore.PeersWithAddrs()
}
func (ps *WakuPeerstoreImpl) PeerInfo(peerID peer.ID) peer.AddrInfo {
return ps.peerStore.PeerInfo(peerID)
}
func (ps *WakuPeerstoreImpl) Peers() peer.IDSlice {
return ps.peerStore.Peers()
}
func (ps *WakuPeerstoreImpl) Close() error {
return ps.peerStore.Close()
}
func (ps *WakuPeerstoreImpl) PubKey(p peer.ID) ic.PubKey {
return ps.peerStore.PubKey(p)
}
func (ps *WakuPeerstoreImpl) AddPubKey(p peer.ID, pubk ic.PubKey) error {
return ps.peerStore.AddPubKey(p, pubk)
}
func (ps *WakuPeerstoreImpl) PrivKey(p peer.ID) ic.PrivKey {
return ps.peerStore.PrivKey(p)
}
func (ps *WakuPeerstoreImpl) AddPrivKey(p peer.ID, privk ic.PrivKey) error {
return ps.peerStore.AddPrivKey(p, privk)
}
func (ps *WakuPeerstoreImpl) PeersWithKeys() peer.IDSlice {
return ps.peerStore.PeersWithKeys()
}
func (ps *WakuPeerstoreImpl) RemovePeer(p peer.ID) {
ps.peerStore.RemovePeer(p)
}
func (ps *WakuPeerstoreImpl) Get(p peer.ID, key string) (interface{}, error) {
return ps.peerStore.Get(p, key)
}
func (ps *WakuPeerstoreImpl) Put(p peer.ID, key string, val interface{}) error {
return ps.peerStore.Put(p, key, val)
}
func (ps *WakuPeerstoreImpl) RecordLatency(p peer.ID, t time.Duration) {
ps.peerStore.RecordLatency(p, t)
}
func (ps *WakuPeerstoreImpl) LatencyEWMA(p peer.ID) time.Duration {
return ps.peerStore.LatencyEWMA(p)
}
func (ps *WakuPeerstoreImpl) GetProtocols(p peer.ID) ([]protocol.ID, error) {
return ps.peerStore.GetProtocols(p)
}
func (ps *WakuPeerstoreImpl) AddProtocols(p peer.ID, proto ...protocol.ID) error {
return ps.peerStore.AddProtocols(p, proto...)
}
func (ps *WakuPeerstoreImpl) SetProtocols(p peer.ID, proto ...protocol.ID) error {
return ps.peerStore.SetProtocols(p, proto...)
}
func (ps *WakuPeerstoreImpl) RemoveProtocols(p peer.ID, proto ...protocol.ID) error {
return ps.peerStore.RemoveProtocols(p, proto...)
}
func (ps *WakuPeerstoreImpl) SupportsProtocols(p peer.ID, proto ...protocol.ID) ([]protocol.ID, error) {
return ps.peerStore.SupportsProtocols(p, proto...)
}
func (ps *WakuPeerstoreImpl) FirstSupportedProtocol(p peer.ID, proto ...protocol.ID) (protocol.ID, error) {
return ps.peerStore.FirstSupportedProtocol(p, proto...)
}
func (ps *WakuPeerstoreImpl) ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error) {
return ps.peerStore.(peerstore.CertifiedAddrBook).ConsumePeerRecord(s, ttl)
}
// GetPeerRecord returns a Envelope containing a PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
func (ps *WakuPeerstoreImpl) GetPeerRecord(p peer.ID) *record.Envelope {
return ps.peerStore.(peerstore.CertifiedAddrBook).GetPeerRecord(p)
}

View File

@ -0,0 +1,75 @@
package peers
import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
type Origin int64
const (
Unknown Origin = iota
Discv5
Static
PeerExchange
DnsDiscovery
Rendezvous
)
const peerOrigin = "origin"
const peerENR = "enr"
type WakuPeerstoreImpl struct {
peerStore peerstore.Peerstore
}
type WakuPeerstore interface {
SetOrigin(p peer.ID, origin Origin) error
Origin(p peer.ID, origin Origin) (Origin, error)
PeersByOrigin(origin Origin) peer.IDSlice
SetENR(p peer.ID, enr *enode.Node) error
ENR(p peer.ID, origin Origin) (*enode.Node, error)
}
func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore {
return &WakuPeerstoreImpl{
peerStore: p,
}
}
func (ps *WakuPeerstoreImpl) SetOrigin(p peer.ID, origin Origin) error {
return ps.peerStore.Put(p, peerOrigin, origin)
}
func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) {
result, err := ps.peerStore.Get(p, peerOrigin)
if err != nil {
return Unknown, err
}
return result.(Origin), nil
}
func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice {
var result peer.IDSlice
for _, p := range ps.Peers() {
_, err := ps.Origin(p, origin)
if err == nil {
result = append(result, p)
}
}
return result
}
func (ps *WakuPeerstoreImpl) SetENR(p peer.ID, enr *enode.Node) error {
return ps.peerStore.Put(p, peerENR, enr)
}
func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) {
result, err := ps.peerStore.Get(p, peerENR)
if err != nil {
return nil, err
}
return result.(*enode.Node), nil
}

View File

@ -10,7 +10,9 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"go.uber.org/zap"
@ -61,7 +63,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
}
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error {
var peers []peer.AddrInfo
var discoveredPeers []peer.AddrInfo
for _, p := range response.PeerInfos {
enrRecord := &enr.Record{}
buf := bytes.NewBuffer(p.ENR)
@ -84,19 +86,23 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
return err
}
peers = append(peers, *peerInfo)
discoveredPeers = append(discoveredPeers, *peerInfo)
}
if len(peers) != 0 {
wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
if len(discoveredPeers) != 0 {
wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers)))
wakuPX.wg.Add(1)
go func() {
defer wakuPX.wg.Done()
for _, p := range peers {
for _, p := range discoveredPeers {
peer := v2.PeerData{
Origin: peers.PeerExchange,
AddrInfo: p,
}
select {
case <-ctx.Done():
return
case wakuPX.peerConnector.PeerChannel() <- p:
case wakuPX.peerConnector.PeerChannel() <- peer:
}
}
}()

View File

@ -10,10 +10,10 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
@ -45,7 +45,7 @@ type WakuPeerExchange struct {
}
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
PeerChannel() chan<- v2.PeerData
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct

View File

@ -31,6 +31,7 @@ type WakuRelay struct {
host host.Host
opts []pubsub.Option
pubsub *pubsub.PubSub
params pubsub.GossipSubParams
timesource timesource.Timesource
log *zap.Logger
@ -65,24 +66,28 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.log = log.Named("relay")
// default options required by WakuRelay
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
opts = append(opts, pubsub.WithNoAuthor())
opts = append(opts, pubsub.WithMessageIdFn(msgIdFn))
opts = append(opts, pubsub.WithGossipSubProtocols(
[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
switch feat {
case pubsub.GossipSubFeatureMesh:
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10
case pubsub.GossipSubFeaturePX:
return proto == pubsub.GossipSubID_v11
default:
return false
}
},
))
w.opts = append([]pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(msgIdFn),
pubsub.WithGossipSubProtocols(
[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
switch feat {
case pubsub.GossipSubFeatureMesh:
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10
case pubsub.GossipSubFeaturePX:
return proto == pubsub.GossipSubID_v11
default:
return false
}
},
),
}, opts...)
w.opts = opts
// We disable overriding gossipsub parameters by adding them as the last value in the options
cfg := pubsub.DefaultGossipSubParams()
w.opts = append(w.opts, pubsub.WithGossipSubParams(cfg))
return w
}
@ -360,4 +365,9 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio
}
}
}
}
func (w *WakuRelay) Params() pubsub.GossipSubParams {
return w.params
}

View File

@ -10,6 +10,8 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
rvs "github.com/waku-org/go-libp2p-rendezvous"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -40,7 +42,7 @@ type Rendezvous struct {
}
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
PeerChannel() chan<- v2.PeerData
}
func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
@ -126,8 +128,12 @@ func (r *Rendezvous) discover(ctx context.Context) {
server.Unlock()
for _, addr := range addrInfo {
peer := v2.PeerData{
Origin: peers.Rendezvous,
AddrInfo: addr,
}
select {
case r.peerConnector.PeerChannel() <- addr:
case r.peerConnector.PeerChannel() <- peer:
case <-ctx.Done():
return
}

View File

@ -14,20 +14,21 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type PeerConn struct {
ch chan peer.AddrInfo
ch chan v2.PeerData
}
func (p PeerConn) PeerChannel() chan<- peer.AddrInfo {
func (p PeerConn) PeerChannel() chan<- v2.PeerData {
return p.ch
}
func NewPeerConn() PeerConn {
x := PeerConn{}
x.ch = make(chan peer.AddrInfo, 1000)
x.ch = make(chan v2.PeerData, 1000)
return x
}
@ -94,6 +95,6 @@ func TestRendezvous(t *testing.T) {
case <-timer:
require.Fail(t, "no peer discovered")
case p := <-myPeerConnector.ch:
require.Equal(t, p.ID.Pretty(), host2.ID().Pretty())
require.Equal(t, p.AddrInfo.ID.Pretty(), host2.ID().Pretty())
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -196,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
_, err = d.node.AddPeer(peerAddr)
_, err = d.node.AddPeer(peerAddr, peers.Static)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -82,7 +83,7 @@ func TestFilterSubscription(t *testing.T) {
break
}
_, err = d.node.AddPeer(addr, legacy_filter.FilterID_v20beta1)
_, err = d.node.AddPeer(addr, peers.Static, legacy_filter.FilterID_v20beta1)
require.NoError(t, err)
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}