diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index c596e4474..6a8cefb34 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -109,6 +109,10 @@ func (w *GethWakuWrapper) ListenAddresses() ([]string, error) { return nil, errors.New("not available in WakuV1") } +func (w *GethWakuWrapper) RelayPeersByTopic(topic string) (*types.PeerList, error) { + return nil, errors.New("not available in WakuV1") +} + // ENR function only added for compatibility with waku V2 func (w *GethWakuWrapper) ENR() (string, error) { return "", errors.New("not available in WakuV1") diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 1290ea0f9..4bdc8390a 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -283,7 +283,10 @@ func (w *gethWakuV2Wrapper) ListenAddresses() ([]string, error) { return w.waku.ListenAddresses(), nil } -// ENR function only added for compatibility with waku V2 +func (w *gethWakuV2Wrapper) RelayPeersByTopic(topic string) (*types.PeerList, error) { + return w.waku.RelayPeersByTopic(topic) +} + func (w *gethWakuV2Wrapper) ENR() (string, error) { return w.waku.ENR() } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index 90be801ca..e3615c645 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -25,6 +25,11 @@ type WakuV2Peer struct { Addresses []string `json:"addresses"` } +type PeerList struct { + FullMeshPeers []peer.ID `json:"fullMesh"` + AllPeers []peer.ID `json:"all"` +} + type ConnStatusSubscription struct { sync.RWMutex @@ -92,6 +97,8 @@ type Waku interface { ListenAddresses() ([]string, error) + RelayPeersByTopic(topic string) (*PeerList, error) + ENR() (string, error) Peers() map[string]WakuV2Peer diff --git a/go.mod b/go.mod index d6d393907..2e4fd00eb 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ replace github.com/forPelevin/gomoji => github.com/status-im/gomoji v1.1.3-0.202 replace github.com/mutecomm/go-sqlcipher/v4 v4.4.2 => github.com/status-im/go-sqlcipher/v4 v4.5.4-status.2 +replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 + require ( github.com/anacrolix/torrent v1.41.0 github.com/beevik/ntp v0.3.0 diff --git a/go.sum b/go.sum index 43e769cea..881ea8e85 100644 --- a/go.sum +++ b/go.sum @@ -1364,8 +1364,6 @@ github.com/libp2p/go-libp2p v0.35.0 h1:1xS1Bkr9X7GtdvV6ntLnDV9xB1kNjHK1lZ0eaO6gn github.com/libp2p/go-libp2p v0.35.0/go.mod h1:snyJQix4ET6Tj+LeI0VPjjxTtdWpeOhYt5lEY0KirkQ= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= -github.com/libp2p/go-libp2p-pubsub v0.11.0 h1:+JvS8Kty0OiyUiN0i8H5JbaCgjnJTRnTHe4rU88dLFc= -github.com/libp2p/go-libp2p-pubsub v0.11.0/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= @@ -2141,6 +2139,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= +github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 h1:V5f5NGsf/UwlJENmJjHGD9lr+3/Bz4ZZ6mL61tvtxgg= +github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 h1:3Idg7XvXc9iQpUyg8KNKgZnziHJRs3xm7EDJHFzC9to= diff --git a/protocol/messenger_peers.go b/protocol/messenger_peers.go index 91469773f..2a8709e77 100644 --- a/protocol/messenger_peers.go +++ b/protocol/messenger_peers.go @@ -32,6 +32,10 @@ func (m *Messenger) Peers() map[string]types.WakuV2Peer { return m.transport.Peers() } +func (m *Messenger) RelayPeersByTopic(topic string) (*types.PeerList, error) { + return m.transport.RelayPeersByTopic(topic) +} + func (m *Messenger) ListenAddresses() ([]string, error) { return m.transport.ListenAddresses() } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 745eed5f5..3b4ce2faa 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -652,6 +652,10 @@ func (t *Transport) ListenAddresses() ([]string, error) { return t.waku.ListenAddresses() } +func (t *Transport) RelayPeersByTopic(topic string) (*types.PeerList, error) { + return t.waku.RelayPeersByTopic(topic) +} + func (t *Transport) ENR() (string, error) { return t.waku.ENR() } diff --git a/services/ext/api.go b/services/ext/api.go index a39e155f1..97947e686 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -1489,6 +1489,10 @@ func (api *PublicAPI) Peers() map[string]types.WakuV2Peer { return api.service.messenger.Peers() } +func (api *PublicAPI) RelayPeersByTopic(topic string) (*types.PeerList, error) { + return api.service.messenger.RelayPeersByTopic(topic) +} + func (api *PublicAPI) ListenAddresses() ([]string, error) { return api.service.messenger.ListenAddresses() } diff --git a/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go b/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go index aace05533..6e42a7a02 100644 --- a/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go +++ b/vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go @@ -688,7 +688,6 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p]) return nil } - if gs.iasked[p] >= gs.params.MaxIHaveLength { log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p]) return nil @@ -706,7 +705,14 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. continue } - for _, mid := range ihave.GetMessageIDs() { + checkIwantMsgsLoop: + for msgIdx, mid := range ihave.GetMessageIDs() { + // prevent remote peer from sending too many msg_ids on a single IHAVE message + if msgIdx >= gs.params.MaxIHaveLength { + log.Debugf("IHAVE: peer %s has sent IHAVE on topic %s with too many messages (%d); ignoring remaining msgs", p, topic, len(ihave.MessageIDs)) + break checkIwantMsgsLoop + } + if gs.p.seenMessage(mid) { continue } @@ -1979,6 +1985,22 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID return peers } +func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID { + peers, ok := gs.mesh[topic] + if !ok { + return nil + } + + result := make([]peer.ID, len(peers)) + i := 0 + for p := range peers { + result[i] = p + i++ + } + + return result +} + // WithDefaultTagTracer returns the tag tracer of the GossipSubRouter as a PubSub option. // This is useful for cases where the GossipSubRouter is instantiated externally, and is // injected into the GossipSub constructor as a dependency. This allows the tag tracer to be diff --git a/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go b/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go index c4ecae65f..74a3ea924 100644 --- a/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go +++ b/vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go @@ -1420,3 +1420,7 @@ type addRelayReq struct { topic string resp chan RelayCancelFunc } + +func (p *PubSub) Router() PubSubRouter { + return p.rt +} diff --git a/vendor/modules.txt b/vendor/modules.txt index bb9aefe2a..36244ffc3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -568,7 +568,7 @@ github.com/libp2p/go-libp2p/p2p/transport/webtransport # github.com/libp2p/go-libp2p-asn-util v0.4.1 ## explicit; go 1.19 github.com/libp2p/go-libp2p-asn-util -# github.com/libp2p/go-libp2p-pubsub v0.11.0 +# github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240701005450-b4513d154445 ## explicit; go 1.21 github.com/libp2p/go-libp2p-pubsub github.com/libp2p/go-libp2p-pubsub/pb diff --git a/wakuv2/waku.go b/wakuv2/waku.go index babe316ff..c2d431cfd 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1718,6 +1718,17 @@ func (w *Waku) Peers() map[string]types.WakuV2Peer { return FormatPeerStats(w.node) } +func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { + if w.cfg.LightClient { + return nil, errors.New("only available in relay mode") + } + + return &types.PeerList{ + FullMeshPeers: w.node.Relay().PubSub().Router().(*pubsub.GossipSubRouter).MeshPeers(topic), + AllPeers: w.node.Relay().PubSub().ListPeers(topic), + }, nil +} + func (w *Waku) ListenAddresses() []string { addrs := w.node.ListenAddresses() var result []string diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 3eede21ef..8e47f0309 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -127,6 +127,26 @@ func TestRestartDiscoveryV5(t *testing.T) { require.NoError(t, w.Stop()) } +func TestRelayPeers(t *testing.T) { + config := &Config{} + setDefaultConfig(config, false) + w, err := New(nil, "", config, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, w.Start()) + _, err = w.RelayPeersByTopic(config.DefaultShardPubsubTopic) + require.NoError(t, err) + + // Ensure function returns an error for lightclient + config = &Config{} + config.ClusterID = 16 + config.LightClient = true + w, err = New(nil, "", config, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, w.Start()) + _, err = w.RelayPeersByTopic(config.DefaultShardPubsubTopic) + require.Error(t, err) +} + func TestBasicWakuV2(t *testing.T) { t.Skip("flaky test") @@ -347,6 +367,7 @@ func TestWakuV2Filter(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Start()) w.filterManager.filterSubBatchDuration = 1 * time.Second + options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 10 * time.Second }