From 1c116589cd0a60f9bb643208d69902367d20c9d3 Mon Sep 17 00:00:00 2001 From: kaichao Date: Mon, 3 Jun 2024 21:19:18 +0800 Subject: [PATCH] chore_: bump go-waku (#5269) --- go.mod | 2 +- go.sum | 4 +- .../github.com/waku-org/go-waku/tests/init.go | 2 + .../waku-org/go-waku/tests/utils.go | 439 ++++++++++++++++++ .../go-waku/waku/v2/node/wakunode2.go | 4 +- .../go-waku/waku/v2/node/wakuoptions.go | 5 +- .../waku/v2/peermanager/peer_manager.go | 4 +- .../waku/v2/peermanager/peer_selection.go | 84 +++- .../waku/v2/peermanager/service_slot.go | 4 +- .../v2/peermanager/topic_event_handler.go | 3 +- .../waku/v2/peerstore/waku_peer_store.go | 50 +- .../waku/v2/protocol/content_filter.go | 23 +- .../go-waku/waku/v2/protocol/filter/client.go | 49 +- .../v2/protocol/filter/filter_health_check.go | 48 ++ .../waku/v2/protocol/filter/options.go | 10 + .../go-waku/waku/v2/protocol/filter/server.go | 2 +- .../waku/v2/protocol/filter/test_utils.go | 394 ++++++++++++++++ .../v2/protocol/lightpush/pb/validation.go | 7 + .../v2/protocol/lightpush/waku_lightpush.go | 1 + .../go-waku/waku/v2/protocol/store/client.go | 35 +- .../go-waku/waku/v2/protocol/store/result.go | 43 +- .../subscription/subscription_details.go | 10 +- .../subscription/subscriptions_map.go | 25 + vendor/modules.txt | 3 +- wakuv2/filter_manager.go | 6 +- 25 files changed, 1118 insertions(+), 139 deletions(-) create mode 100644 vendor/github.com/waku-org/go-waku/tests/init.go create mode 100644 vendor/github.com/waku-org/go-waku/tests/utils.go create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go diff --git a/go.mod b/go.mod index 2becb31ba..d34af5046 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f + github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 548ea5e9e..7e59b36a5 100644 --- a/go.sum +++ b/go.sum @@ -2138,8 +2138,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f h1:KiDqcxmCi74BGDZzkGT5T83QhEL/rPrUbEiJWOuiuU4= -github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k= +github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979 h1:31upWxN0XWBA+bcS0aCJ9jR8HzBNtCg8zKAj+Jvre08= +github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/tests/init.go b/vendor/github.com/waku-org/go-waku/tests/init.go new file mode 100644 index 000000000..cdbbb0ab9 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/tests/init.go @@ -0,0 +1,2 @@ +// Contains resources or utils for test units +package tests diff --git a/vendor/github.com/waku-org/go-waku/tests/utils.go b/vendor/github.com/waku-org/go-waku/tests/utils.go new file mode 100644 index 000000000..d5a579127 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/tests/utils.go @@ -0,0 +1,439 @@ +package tests + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "math" + "math/big" + "net" + "net/url" + "strconv" + "strings" + "sync" + "testing" + "time" + "unicode/utf8" + + "github.com/waku-org/go-waku/waku/v2/protocol" + + gcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/config" + "github.com/libp2p/go-libp2p/core/crypto" + libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/peerstore" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +type StringGenerator func(maxLength int) (string, error) + +// GetHostAddress returns the first listen address used by a host +func GetHostAddress(ha host.Host) multiaddr.Multiaddr { + return ha.Addrs()[0] +} + +// Returns a full multiaddr of host appended by peerID +func GetAddr(h host.Host) multiaddr.Multiaddr { + id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String())) + var selectedAddr multiaddr.Multiaddr + //For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses. + for _, addr := range h.Network().ListenAddresses() { + if strings.Contains(addr.String(), "p2p-circuit") { + continue + } + selectedAddr = addr + break + } + return selectedAddr.Encapsulate(id) +} + +// FindFreePort returns an available port number +func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) { + t.Helper() + + if host == "" { + host = "localhost" + } + + for i := 0; i < maxAttempts; i++ { + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, "0")) + if err != nil { + t.Logf("unable to resolve tcp addr: %v", err) + continue + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + l.Close() + t.Logf("unable to listen on addr %q: %v", addr, err) + continue + } + + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port, nil + + } + + return 0, fmt.Errorf("no free port found") +} + +// FindFreePort returns an available port number +func FindFreeUDPPort(t *testing.T, host string, maxAttempts int) (int, error) { + t.Helper() + + if host == "" { + host = "localhost" + } + + for i := 0; i < maxAttempts; i++ { + addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, "0")) + if err != nil { + t.Logf("unable to resolve tcp addr: %v", err) + continue + } + l, err := net.ListenUDP("udp", addr) + if err != nil { + l.Close() + t.Logf("unable to listen on addr %q: %v", addr, err) + continue + } + + port := l.LocalAddr().(*net.UDPAddr).Port + l.Close() + return port, nil + + } + + return 0, fmt.Errorf("no free port found") +} + +// MakeHost creates a Libp2p host with a random key on a specific port +func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) { + // Creates a new RSA key pair for this host. + prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness) + if err != nil { + log.Error(err.Error()) + return nil, err + } + + // 0.0.0.0 will listen on any interface device. + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + if err != nil { + return nil, err + } + + ps, err := pstoremem.NewPeerstore() + if err != nil { + return nil, err + } + + psWrapper := peerstore.NewWakuPeerstore(ps) + if err != nil { + return nil, err + } + + // libp2p.New constructs a new libp2p Host. + // Other options can be added here. + return libp2p.New( + libp2p.Peerstore(psWrapper), + libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(prvKey), + ) +} + +// CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp +func CreateWakuMessage(contentTopic string, timestamp *int64, optionalPayload ...string) *pb.WakuMessage { + var payload []byte + if len(optionalPayload) > 0 { + payload = []byte(optionalPayload[0]) + } else { + payload = []byte{1, 2, 3} + } + return &pb.WakuMessage{Payload: payload, ContentTopic: contentTopic, Timestamp: timestamp} +} + +// RandomHex returns a random hex string of n bytes +func RandomHex(n int) (string, error) { + bytes := make([]byte, n) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} + +func NewLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetStaticIP(ipAddr.IP) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting udpPort", zap.Int("port", udpPort)) + } + + if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + return localnode, nil +} + +func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.PrivateKey) { + privKey, err := gcrypto.GenerateKey() + require.NoError(t, err) + + sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) + + port, err := FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + require.NoError(t, err) + + opts = append(opts, libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(sPrivKey)) + + host, err := libp2p.New(opts...) + require.NoError(t, err) + + return host, port, privKey +} + +func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { + ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + return nil, err + } + + portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) + if err != nil { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: net.ParseIP(ipStr), + Port: port, + }, nil +} + +func RandomInt(min, max int) (int, error) { + n, err := rand.Int(rand.Reader, big.NewInt(int64(max-min+1))) + if err != nil { + return 0, err + } + return min + int(n.Int64()), nil +} + +func RandomBytes(n int) ([]byte, error) { + b := make([]byte, n) + _, err := rand.Read(b) + + if err != nil { + return nil, err + } + + return b, nil +} + +func GenerateRandomASCIIString(maxLength int) (string, error) { + length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength))) + if err != nil { + return "", err + } + length.SetInt64(length.Int64() + 1) + + const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + result := make([]byte, length.Int64()) + for i := range result { + num, err := rand.Int(rand.Reader, big.NewInt(int64(len(chars)))) + if err != nil { + return "", err + } + result[i] = chars[num.Int64()] + } + + return string(result), nil +} + +func GenerateRandomUTF8String(maxLength int) (string, error) { + length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength))) + if err != nil { + return "", err + } + length.SetInt64(length.Int64() + 1) + + var ( + runes []rune + start, end int + ) + + // Define unicode range + start = 0x0020 // Space character + end = 0x007F // Tilde (~) + + for i := 0; int64(i) < length.Int64(); i++ { + randNum, err := rand.Int(rand.Reader, big.NewInt(int64(end-start+1))) + if err != nil { + return "", err + } + char := rune(start + int(randNum.Int64())) + if !utf8.ValidRune(char) { + continue + } + runes = append(runes, char) + } + + return string(runes), nil +} + +func GenerateRandomJSONString(maxLength int) (string, error) { + // With 5 key-value pairs + m := make(map[string]interface{}) + for i := 0; i < 5; i++ { + key, err := GenerateRandomASCIIString(20) + if err != nil { + return "", err + } + value, err := GenerateRandomASCIIString(maxLength) + if err != nil { + return "", err + } + + m[key] = value + } + + // Marshal the map into a JSON string + var buf bytes.Buffer + encoder := json.NewEncoder(&buf) + encoder.SetEscapeHTML(false) + err := encoder.Encode(m) + if err != nil { + return "", err + } + + return buf.String(), nil +} + +func GenerateRandomBase64String(maxLength int) (string, error) { + bytes, err := RandomBytes(maxLength) + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(bytes), nil +} + +func GenerateRandomURLEncodedString(maxLength int) (string, error) { + randomString, err := GenerateRandomASCIIString(maxLength) + if err != nil { + return "", err + } + + // URL-encode the random string + return url.QueryEscape(randomString), nil +} + +func GenerateRandomSQLInsert(maxLength int) (string, error) { + // Random table name + tableName, err := GenerateRandomASCIIString(10) + if err != nil { + return "", err + } + + // Random column names + columnCount, err := RandomInt(3, 6) + if err != nil { + return "", err + } + columnNames := make([]string, columnCount) + for i := 0; i < columnCount; i++ { + columnName, err := GenerateRandomASCIIString(maxLength) + if err != nil { + return "", err + } + columnNames[i] = columnName + } + + // Random values + values := make([]string, columnCount) + for i := 0; i < columnCount; i++ { + value, err := GenerateRandomASCIIString(maxLength) + if err != nil { + return "", err + } + values[i] = "'" + value + "'" + } + + query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);", + tableName, + strings.Join(columnNames, ", "), + strings.Join(values, ", ")) + + return query, nil +} + +func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + log := utils.Logger() + go func() { + defer wg.Done() + select { + case env := <-ch: + msg := env.Message() + log.Info("Received ", zap.String("msg", msg.String())) + case <-time.After(timeout): + require.Fail(t, "Message timeout") + } + }() + wg.Wait() +} + +func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + go func() { + defer wg.Done() + select { + case _, ok := <-ch: + require.False(t, ok, "should not retrieve message") + case <-time.After(timeout): + // All good + case <-ctx.Done(): + require.Fail(t, "test exceeded allocated time") + } + }() + + wg.Wait() +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index ca6a16bb4..5d68374b0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -291,7 +291,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) - w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log) + w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) @@ -844,7 +844,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) { Protocols: protocols, Connected: connected, Addrs: addrs, - PubsubTopics: topics, + PubsubTopics: maps.Keys(topics), }) } return peers, nil diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 90f67f066..0c1759929 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -28,6 +28,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -77,6 +78,7 @@ type WakuNodeParameters struct { enableFilterFullNode bool filterOpts []filter.Option pubsubOpts []pubsub.Option + lightpushOpts []lightpush.Option minRelayPeersToPublish int maxMsgSizeBytes int @@ -458,9 +460,10 @@ func WithMessageProvider(s legacy_store.MessageProvider) WakuNodeOption { } // WithLightPush is a WakuNodeOption that enables the lightpush protocol -func WithLightPush() WakuNodeOption { +func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableLightPush = true + params.lightpushOpts = lightpushOpts return nil } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index e811632e6..9da979e56 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -277,10 +277,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee //Need to filter peers to check if they support relay if inPeers.Len() != 0 { - inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200) + inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200) } if outPeers.Len() != 0 { - outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200) + outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200) } return } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go index 4c1268bb6..8011b7e90 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go @@ -2,6 +2,7 @@ package peermanager import ( "context" + "encoding/json" "errors" "github.com/libp2p/go-libp2p/core/peer" @@ -12,7 +13,16 @@ import ( "golang.org/x/exp/maps" ) -type peerSet map[peer.ID]struct{} +type PeerSet map[peer.ID]struct{} + +func PeerInSet(peers PeerSet, peer peer.ID) bool { + if len(peers) > 0 { + if _, ok := peers[peer]; ok { + return true + } + } + return false +} // SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. // If a list of specific peers is passed, the peer will be chosen from that list assuming @@ -54,17 +64,18 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) return nil, err } else if len(peerIDs) == 0 { - peerIDs = make(peerSet) + peerIDs = make(PeerSet) } // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) + filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto) if err != nil { return nil, err } if len(criteria.PubsubTopics) > 0 { filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } - randomPeers, err := selectRandomPeers(filteredPeers, criteria.MaxPeers-len(peerIDs)) + //Not passing excludePeers as filterPeers are already considering excluded ones. + randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs)) if err != nil && len(peerIDs) == 0 { return nil, err } @@ -75,10 +86,13 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic return maps.Keys(peerIDs), nil } -func getRandom(filter peerSet, count int) (peerSet, error) { +func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) { i := 0 - selectedPeers := make(peerSet) + selectedPeers := make(PeerSet) for pID := range filter { + if PeerInSet(excludePeers, pID) { + continue + } //Map's iterator in golang works using randomness and hence not random function is being used. selectedPeers[pID] = struct{}{} i++ @@ -93,34 +107,37 @@ func getRandom(filter peerSet, count int) (peerSet, error) { } // selects count random peers from list of peers -func selectRandomPeers(peers peer.IDSlice, count int) (peerSet, error) { - filteredPeerMap := peerSliceToMap(peers) - return getRandom(filteredPeerMap, count) +func selectRandomPeers(peers peer.IDSlice, excludePeers PeerSet, count int) (PeerSet, error) { + filteredPeerMap := PeerSliceToMap(peers) + return getRandom(filteredPeerMap, count, excludePeers) } -func peerSliceToMap(peers peer.IDSlice) peerSet { - peerSet := make(peerSet, peers.Len()) +func PeerSliceToMap(peers peer.IDSlice) PeerSet { + peerSet := make(PeerSet, peers.Len()) for _, peer := range peers { peerSet[peer] = struct{}{} } return peerSet } -func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSet, error) { - peers := make(peerSet) +func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) { + peers := make(PeerSet) var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil { if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { - return slot.getRandom(criteria.MaxPeers) + return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers) } else { //PubsubTopic based selection keys := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { + if PeerInSet(criteria.ExcludePeers, i) { + continue + } keys = append(keys, i) } selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...) - tmpPeers, err := selectRandomPeers(selectedPeers, criteria.MaxPeers) + tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers) for tmpPeer := range tmpPeers { peers[tmpPeer] = struct{}{} } @@ -145,12 +162,21 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSe // PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. type PeerSelectionCriteria struct { - SelectionType PeerSelection - Proto protocol.ID - PubsubTopics []string - SpecificPeers peer.IDSlice - MaxPeers int - Ctx context.Context + SelectionType PeerSelection `json:"selectionType"` + Proto protocol.ID `json:"protocolId"` + PubsubTopics []string `json:"pubsubTopics"` + SpecificPeers peer.IDSlice `json:"specificPeers"` + MaxPeers int `json:"maxPeerCount"` + Ctx context.Context `json:"-"` + ExcludePeers PeerSet `json:"excludePeers"` +} + +func (psc PeerSelectionCriteria) String() string { + pscJson, err := json.Marshal(psc) + if err != nil { + return "" + } + return string(pscJson) } // SelectPeers selects a peer based on selectionType specified. @@ -159,6 +185,12 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice if criteria.MaxPeers == 0 { criteria.MaxPeers = 1 } + excPeers := maps.Keys(criteria.ExcludePeers) + var excPeer peer.ID + if len(excPeers) > 0 { + excPeer = excPeers[0] + } + pm.logger.Debug("Select Peers", zap.Stringer("selectionCriteria", criteria), zap.Stringer("excludedPeers", excPeer)) switch criteria.SelectionType { case Automatic: return pm.SelectRandom(criteria) @@ -191,7 +223,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) } - peers, err = pm.FilterPeersByProto(peers, criteria.Proto) + peers, err = pm.FilterPeersByProto(peers, criteria.ExcludePeers, criteria.Proto) if err != nil { return "", err } @@ -201,20 +233,22 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( // FilterPeersByProto filters list of peers that support specified protocols. // If specificPeers is nil, all peers in the host's peerStore are considered for filtering. -func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { +func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePeers PeerSet, proto ...protocol.ID) (peer.IDSlice, error) { peerSet := specificPeers if len(peerSet) == 0 { peerSet = pm.host.Peerstore().Peers() } - var peers peer.IDSlice for _, peer := range peerSet { protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) if err != nil { return nil, err } - if len(protocols) > 0 { + //Maybe we can optimize below set of statements a better way?? + if PeerInSet(excludePeers, peer) { + continue + } peers = append(peers, peer) } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/service_slot.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/service_slot.go index a5673df3f..9fadca6d0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/service_slot.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/service_slot.go @@ -19,10 +19,10 @@ func newPeerMap() *peerMap { } } -func (pm *peerMap) getRandom(count int) (peerSet, error) { +func (pm *peerMap) getRandom(count int, excludePeers PeerSet) (PeerSet, error) { pm.mu.RLock() defer pm.mu.RUnlock() - return getRandom(pm.m, count) + return getRandom(pm.m, count, excludePeers) } func (pm *peerMap) remove(pID peer.ID) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go index 60e1f759b..3060bf7b9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/topic_event_handler.go @@ -13,6 +13,7 @@ import ( waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" + "golang.org/x/exp/maps" ) func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error { @@ -103,7 +104,7 @@ func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) { logging.HostID("peerID", peer)) continue } - if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic { + if len(peerTopics) == 1 && maps.Keys(peerTopics)[0] == pubsubTopic { err := pm.host.Network().ClosePeer(peer) if err != nil { pm.logger.Warn("Failed to disconnect connection towards peer", diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go b/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go index 402d265ad..203b23103 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peerstore/waku_peer_store.go @@ -8,6 +8,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + "golang.org/x/exp/maps" ) // Origin is used to determine how the peer is identified, @@ -58,7 +60,7 @@ type WakuPeerstore interface { AddPubSubTopic(p peer.ID, topic string) error RemovePubSubTopic(p peer.ID, topic string) error - PubSubTopics(p peer.ID) ([]string, error) + PubSubTopics(p peer.ID) (protocol.TopicSet, error) SetPubSubTopics(p peer.ID, topics []string) error PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice @@ -175,13 +177,12 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error { if err != nil { return err } - for _, t := range existingTopics { - if t == topic { - return nil - } + + if _, found := existingTopics[topic]; found { + return nil } - existingTopics = append(existingTopics, topic) - return ps.peerStore.Put(p, peerPubSubTopics, existingTopics) + existingTopics[topic] = struct{}{} + return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics)) } // RemovePubSubTopic removes a pubSubTopic from the peer @@ -195,14 +196,9 @@ func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error { return nil } - for i := range existingTopics { - if existingTopics[i] == topic { - existingTopics = append(existingTopics[:i], existingTopics[i+1:]...) - break - } - } + delete(existingTopics, topic) - err = ps.SetPubSubTopics(p, existingTopics) + err = ps.SetPubSubTopics(p, maps.Keys(existingTopics)) if err != nil { return err } @@ -215,16 +211,16 @@ func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error { } // PubSubTopics fetches list of pubSubTopics for a peer -func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { +func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) { result, err := ps.peerStore.Get(p, peerPubSubTopics) if err != nil { if errors.Is(err, peerstore.ErrNotFound) { - return nil, nil + return protocol.NewTopicSet(), nil } else { return nil, err } } - return result.([]string), nil + return protocol.NewTopicSet((result.([]string))...), nil } // PeersByPubSubTopic Returns list of peers that support list of pubSubTopics @@ -235,22 +231,16 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific } var result peer.IDSlice for _, p := range specificPeers { - topics, err := ps.PubSubTopics(p) + peerMatch := true + peerTopics, err := ps.PubSubTopics(p) if err == nil { - //Convoluted and crazy logic to find subset of topics - // Could not find a better way to do it? - peerTopicMap := make(map[string]struct{}) - match := true - for _, topic := range topics { - peerTopicMap[topic] = struct{}{} - } - for _, topic := range pubSubTopics { - if _, ok := peerTopicMap[topic]; !ok { - match = false + for _, t := range pubSubTopics { + if _, ok := peerTopics[t]; !ok { + peerMatch = false break } } - if match { + if peerMatch { result = append(result, p) } } //Note: skipping a peer in case of an error as there would be others available. @@ -268,7 +258,7 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeer for _, p := range specificPeers { topics, err := ps.PubSubTopics(p) if err == nil { - for _, topic := range topics { + for topic := range topics { if topic == pubSubTopic { result = append(result, p) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/content_filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/content_filter.go index f09cf52b3..0a859e0d2 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/content_filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/content_filter.go @@ -1,11 +1,22 @@ package protocol -import "golang.org/x/exp/maps" +import ( + "golang.org/x/exp/maps" +) type PubsubTopicStr = string type ContentTopicStr = string type ContentTopicSet map[string]struct{} +type TopicSet map[string]struct{} + +func NewTopicSet(topics ...string) TopicSet { + s := make(TopicSet, len(topics)) + for _, t := range topics { + s[t] = struct{}{} + } + return s +} func NewContentTopicSet(contentTopics ...string) ContentTopicSet { s := make(ContentTopicSet, len(contentTopics)) @@ -28,6 +39,16 @@ type ContentFilter struct { ContentTopics ContentTopicSet `json:"contentTopics"` } +func (cf ContentFilter) String() string { + var ret string + ret += "{ pubsubTopic: " + cf.PubsubTopic + ", contentTopics: [ " + for ct := range cf.ContentTopics { + ret += ct + } + ret += " ] }" + return ret +} + func (cf ContentFilter) ContentTopicsList() []string { return cf.ContentTopics.ToList() } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index cad12dc6f..edbba8d3f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -43,13 +44,14 @@ var ( type WakuFilterLightNode struct { *service.CommonService - h host.Host - broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s - timesource timesource.Timesource - metrics Metrics - log *zap.Logger - subscriptions *subscription.SubscriptionsMap - pm *peermanager.PeerManager + h host.Host + broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s + timesource timesource.Timesource + metrics Metrics + log *zap.Logger + subscriptions *subscription.SubscriptionsMap + pm *peermanager.PeerManager + peerPingInterval time.Duration } type WakuFilterPushError struct { @@ -86,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) - + wf.peerPingInterval = 5 * time.Second return wf } @@ -97,13 +99,15 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) { func (wf *WakuFilterLightNode) Start(ctx context.Context) error { return wf.CommonService.Start(ctx, wf.start) - } func (wf *WakuFilterLightNode) start() error { wf.subscriptions = subscription.NewSubscriptionMap(wf.log) wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context())) + //Start Filter liveness check + wf.CommonService.WaitGroup().Add(1) + go wf.FilterHealthCheckLoop() wf.log.Info("filter-push protocol started") return nil } @@ -313,24 +317,29 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, wf.pm.Connect(pData) params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } - if params.pm != nil { + reqPeerCount := params.maxPeers - len(params.selectedPeers) - peerCount := params.maxPeers - len(params.selectedPeers) + if params.pm != nil && reqPeerCount > 0 { + wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) params.selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, PubsubTopics: maps.Keys(pubSubTopicMap), SpecificPeers: params.preferredPeers, - MaxPeers: peerCount, + MaxPeers: reqPeerCount, Ctx: ctx, + ExcludePeers: params.peersToExclude, }, ) if err != nil { + wf.log.Error("peer selection returned err", zap.Error(err)) return nil, nil, err } } + wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers))) + return params, pubSubTopicMap, nil } @@ -354,7 +363,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var selectedPeers peer.IDSlice + wf.log.Debug("peer selection", zap.Int("params.maxPeers", params.maxPeers)) + if params.pm != nil && len(params.selectedPeers) < params.maxPeers { + wf.log.Debug("selected peers less than maxPeers", zap.Int("maxpPeers", params.maxPeers)) selectedPeers, err = wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, @@ -363,6 +375,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot SpecificPeers: params.preferredPeers, MaxPeers: params.maxPeers - params.selectedPeers.Len(), Ctx: ctx, + ExcludePeers: params.peersToExclude, }, ) } else { @@ -375,7 +388,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics = append(failedContentTopics, cTopics...) continue } - var cFilter protocol.ContentFilter cFilter.PubsubTopic = pubSubTopic cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) @@ -395,6 +407,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics = append(failedContentTopics, cTopics...) continue } + wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer)) subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter)) } } @@ -457,16 +470,6 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts .. peerID) } -func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error { - wf.RLock() - defer wf.RUnlock() - if err := wf.ErrOnNotRunning(); err != nil { - return err - } - - return wf.Ping(ctx, subscription.PeerID) -} - // Unsubscribe is used to stop receiving messages from specified peers for the content filter func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go new file mode 100644 index 000000000..11b9a7200 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go @@ -0,0 +1,48 @@ +package filter + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +func (wf *WakuFilterLightNode) PingPeers() { + //Send a ping to all the peers and report their status to corresponding subscriptions + // Alive or not or set state of subcription?? + for _, peer := range wf.subscriptions.GetSubscribedPeers() { + go wf.PingPeer(peer) + } +} + +func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { + ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval) + defer cancel() + err := wf.Ping(ctxWithTimeout, peer) + if err != nil { + wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) + + subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) + for _, subscription := range subscriptions { + wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) + + //Indicating that subscription is closing, + close(subscription.Closing) + } + } +} + +func (wf *WakuFilterLightNode) FilterHealthCheckLoop() { + defer wf.WaitGroup().Done() + ticker := time.NewTicker(wf.peerPingInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + wf.PingPeers() + case <-wf.CommonService.Context().Done(): + return + } + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go index afb9f2f1e..8743218b4 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go @@ -39,6 +39,7 @@ type ( peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice + peersToExclude peermanager.PeerSet maxPeers int requestID []byte log *zap.Logger @@ -101,6 +102,14 @@ func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption { } } +// WithPeersToExclude option excludes the peers that are specified from selection +func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { + params.peersToExclude = peermanager.PeerSliceToMap(peers) + return nil + } +} + // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore @@ -145,6 +154,7 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption { return []FilterSubscribeOption{ WithAutomaticPeerSelection(), WithAutomaticRequestID(), + WithMaxPeersPerContentFilter(1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index aa18839b4..2bf63bb50 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -128,7 +128,7 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start)) - logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String())) + logger.Info("received request", zap.Stringer("serverID", wf.h.ID()), zap.Stringer("requestType", subscribeRequest.FilterSubscribeType)) } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go new file mode 100644 index 000000000..2beabc2d2 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go @@ -0,0 +1,394 @@ +package filter + +import ( + "context" + "crypto/rand" + "fmt" + "strconv" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/suite" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +type LightNodeData struct { + LightNode *WakuFilterLightNode + LightNodeHost host.Host +} + +type FullNodeData struct { + relayNode *relay.WakuRelay + RelaySub *relay.Subscription + FullNodeHost host.Host + Broadcaster relay.Broadcaster + FullNode *WakuFilterFullNode +} + +type FilterTestSuite struct { + suite.Suite + LightNodeData + FullNodeData + + TestTopic string + TestContentTopic string + ctx context.Context + ctxCancel context.CancelFunc + wg *sync.WaitGroup + contentFilter protocol.ContentFilter + subDetails []*subscription.SubscriptionDetails + + Log *zap.Logger +} + +const DefaultTestPubSubTopic = "/waku/2/go/filter/test" +const DefaultTestContentTopic = "/test/10/my-app" + +type WakuMsg struct { + PubSubTopic string + ContentTopic string + Payload string +} + +func (s *FilterTestSuite) SetupTest() { + log := utils.Logger() //.Named("filterv2-test") + s.Log = log + + s.Log.Info("SetupTest()") + // Use a pointer to WaitGroup so that to avoid copying + // https://pkg.go.dev/sync#WaitGroup + s.wg = &sync.WaitGroup{} + + // Create test context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel + + s.TestTopic = DefaultTestPubSubTopic + s.TestContentTopic = DefaultTestContentTopic + + s.MakeWakuFilterLightNode() + s.LightNode.peerPingInterval = 1 * time.Second + s.StartLightNode() + + //TODO: Add tests to verify broadcaster. + + s.MakeWakuFilterFullNode(s.TestTopic, false) + + s.ConnectToFullNode(s.LightNode, s.FullNode) + +} + +func (s *FilterTestSuite) TearDownTest() { + s.FullNode.Stop() + s.LightNode.Stop() + s.RelaySub.Unsubscribe() + s.LightNode.Stop() + s.ctxCancel() +} + +func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) { + mAddr := tests.GetAddr(h2.h) + _, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1) + s.Log.Info("add peer", zap.Stringer("mAddr", mAddr)) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) GetWakuRelay(topic string) FullNodeData { + + broadcaster := relay.NewBroadcaster(10) + s.Require().NoError(broadcaster.Start(context.Background())) + + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + s.Require().NoError(err) + + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, relay.WithMaxMsgSize(1024*1024)) + relay.SetHost(host) + + err = relay.Start(context.Background()) + s.Require().NoError(err) + + sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) + s.Require().NoError(err) + + return FullNodeData{relay, sub[0], host, broadcaster, nil} +} + +func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bool) FullNodeData { + + nodeData := s.GetWakuRelay(topic) + + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + node2Filter.SetHost(nodeData.FullNodeHost) + + var sub *relay.Subscription + if withRegisterAll { + sub = nodeData.Broadcaster.RegisterForAll() + } else { + sub = nodeData.Broadcaster.Register(protocol.NewContentFilter(topic)) + } + + err := node2Filter.Start(s.ctx, sub) + s.Require().NoError(err) + + nodeData.FullNode = node2Filter + + return nodeData +} + +func (s *FilterTestSuite) MakeWakuFilterFullNode(topic string, withRegisterAll bool) { + nodeData := s.GetWakuFilterFullNode(topic, withRegisterAll) + + s.FullNodeData = nodeData +} + +func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + s.Require().NoError(err) + b := relay.NewBroadcaster(10) + s.Require().NoError(b.Start(context.Background())) + pm := peermanager.NewPeerManager(5, 5, nil, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + filterPush.SetHost(host) + pm.SetHost(host) + return LightNodeData{filterPush, host} +} + +func (s *FilterTestSuite) MakeWakuFilterLightNode() { + s.LightNodeData = s.GetWakuFilterLightNode() +} + +func (s *FilterTestSuite) StartLightNode() { + err := s.LightNode.Start(context.Background()) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) waitForMsg(msg *WakuMsg) { + s.waitForMsgFromChan(msg, s.subDetails[0].C) +} + +func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Envelope) { + s.wg.Add(1) + var msgFound = false + go func() { + defer s.wg.Done() + select { + case env := <-ch: + for _, topic := range s.contentFilter.ContentTopicsList() { + if topic == env.Message().GetContentTopic() { + msgFound = true + } + } + s.Require().True(msgFound) + case <-time.After(1 * time.Second): + s.Require().Fail("Message timeout") + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + }() + + if msg != nil { + s.PublishMsg(msg) + } + + s.wg.Wait() +} + +func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool { + for _, m := range many { + if m.PubSubTopic == one.PubSubTopic && + m.ContentTopic == one.ContentTopic && + m.Payload == one.Payload { + return true + } + } + + return false +} + +func (s *FilterTestSuite) waitForMessages(msgs []WakuMsg) { + s.wg.Add(1) + msgCount := len(msgs) + found := 0 + subs := s.subDetails + s.Log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount))) + s.Log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs)))) + + go func() { + defer s.wg.Done() + for _, sub := range subs { + s.Log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic)) + for i := 0; i < msgCount; i++ { + select { + case env, ok := <-sub.C: + if !ok { + continue + } + received := WakuMsg{ + PubSubTopic: env.PubsubTopic(), + ContentTopic: env.Message().GetContentTopic(), + Payload: string(env.Message().GetPayload()), + } + s.Log.Debug("received message ", zap.String("pubSubTopic", received.PubSubTopic), zap.String("contentTopic", received.ContentTopic), zap.String("payload", received.Payload)) + if matchOneOfManyMsg(received, msgs) { + found++ + } + case <-time.After(3 * time.Second): + + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + } + } + }() + + if msgs != nil { + s.publishMessages(msgs) + } + + s.wg.Wait() + s.Require().Equal(msgCount, found) +} + +func (s *FilterTestSuite) waitForTimeout(msg *WakuMsg) { + s.waitForTimeoutFromChan(msg, s.subDetails[0].C) +} + +func (s *FilterTestSuite) waitForTimeoutFromChan(msg *WakuMsg, ch chan *protocol.Envelope) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case env, ok := <-ch: + if ok { + s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload))) + } + case <-time.After(1 * time.Second): + // Timeout elapsed, all good + case <-s.ctx.Done(): + s.Require().Fail("waitForTimeout test exceeded allocated time") + } + }() + + s.PublishMsg(msg) + + s.wg.Wait() +} + +func (s *FilterTestSuite) getSub(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { + contentFilter := protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)} + + subDetails, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(peer)) + s.Require().NoError(err) + + time.Sleep(1 * time.Second) + + return subDetails +} +func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) { + + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == pubsubTopic { + sub.Add(contentTopic) + s.contentFilter = sub.ContentFilter + subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) + s.subDetails = subDetails + s.Require().NoError(err) + return + } + } + + s.subDetails = s.getSub(pubsubTopic, contentTopic, peer) + s.contentFilter = s.subDetails[0].ContentFilter +} + +func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { + + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == pubsubTopic { + topicsCount := len(sub.ContentFilter.ContentTopicsList()) + if topicsCount == 1 { + _, err := s.LightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer)) + s.Require().NoError(err) + } else { + sub.Remove(contentTopic) + } + s.contentFilter = sub.ContentFilter + } + } + + return s.LightNode.Subscriptions() +} + +func (s *FilterTestSuite) PublishMsg(msg *WakuMsg) { + if len(msg.Payload) == 0 { + msg.Payload = "123" + } + + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic)) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) { + for _, m := range msgs { + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.ContentTopic, utils.GetUnixEpoch(), m.Payload), relay.WithPubSubTopic(m.PubSubTopic)) + s.Require().NoError(err) + } +} + +func (s *FilterTestSuite) prepareData(quantity int, topics, contentTopics, payloads bool, sg tests.StringGenerator) []WakuMsg { + var ( + pubsubTopic = s.TestTopic // Has to be the same with initial s.testTopic + contentTopic = s.TestContentTopic // Has to be the same with initial s.testContentTopic + payload = "test_msg" + messages []WakuMsg + strMaxLenght = 4097 + generatedString = "" + ) + + for i := 0; i < quantity; i++ { + msg := WakuMsg{ + PubSubTopic: pubsubTopic, + ContentTopic: contentTopic, + Payload: payload, + } + + if sg != nil { + generatedString, _ = sg(strMaxLenght) + + } + + if topics { + msg.PubSubTopic = fmt.Sprintf("%s%02d%s", pubsubTopic, i, generatedString) + } + + if contentTopics { + msg.ContentTopic = fmt.Sprintf("%s%02d%s", contentTopic, i, generatedString) + } + + if payloads { + msg.Payload = fmt.Sprintf("%s%02d%s", payload, i, generatedString) + } + + messages = append(messages, msg) + } + + return messages +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb/validation.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb/validation.go index 6477c355d..f915c5b72 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb/validation.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb/validation.go @@ -2,6 +2,10 @@ package pb import "errors" +// This special value for requestId indicates that the message was rate limited +// and we did not retreive the requestId to avoid a potential attack vector. +const REQUESTID_RATE_LIMITED = "N/A" + var ( errMissingRequestID = errors.New("missing RequestId field") errMissingQuery = errors.New("missing Query field") @@ -32,6 +36,9 @@ func (x *PushRpc) ValidateRequest() error { } func (x *PushRpc) ValidateResponse(requestID string) error { + if x.RequestId == REQUESTID_RATE_LIMITED { + return nil + } if x.RequestId == "" { return errMissingRequestID } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index f2b98bb37..19708d116 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -108,6 +108,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) wakuLP.metrics.RecordError(rateLimitFailure) responseMsg := "exceeds the rate limit" responsePushRPC.Response.Info = &responseMsg + responsePushRPC.RequestId = pb.REQUESTID_RATE_LIMITED wakuLP.reply(stream, responsePushRPC, logger) return } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 9eb3da40a..4faec5cf9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -177,11 +177,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ } result := &Result{ - store: s, - messages: response.Messages, - storeRequest: storeRequest, - peerID: params.selectedPeer, - cursor: response.PaginationCursor, + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: params.selectedPeer, + cursor: response.PaginationCursor, } return result, nil @@ -213,12 +214,12 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { if r.IsComplete() { return &Result{ - store: s, - started: true, - messages: []*pb.WakuMessageKeyValue{}, - cursor: nil, - storeRequest: r.storeRequest, - peerID: r.PeerID(), + store: s, + messages: nil, + cursor: nil, + storeRequest: r.storeRequest, + storeResponse: r.storeResponse, + peerID: r.PeerID(), }, nil } @@ -232,12 +233,12 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } result := &Result{ - started: true, - store: s, - messages: response.Messages, - storeRequest: storeRequest, - peerID: r.PeerID(), - cursor: response.PaginationCursor, + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: r.PeerID(), + cursor: response.PaginationCursor, } return result, nil diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go index 180d27de1..0b3cb36fd 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go @@ -9,12 +9,15 @@ import ( // Result represents a valid response from a store node type Result struct { - started bool - messages []*pb.WakuMessageKeyValue - store *WakuStore - storeRequest *pb.StoreQueryRequest - cursor []byte - peerID peer.ID + noCursor bool + done bool + + messages []*pb.WakuMessageKeyValue + store *WakuStore + storeRequest *pb.StoreQueryRequest + storeResponse *pb.StoreQueryResponse + cursor []byte + peerID peer.ID } func (r *Result) Cursor() []byte { @@ -22,7 +25,7 @@ func (r *Result) Cursor() []byte { } func (r *Result) IsComplete() bool { - return r.cursor == nil + return r.noCursor && r.done } func (r *Result) PeerID() peer.ID { @@ -33,32 +36,32 @@ func (r *Result) Query() *pb.StoreQueryRequest { return r.storeRequest } -func (r *Result) Next(ctx context.Context) (bool, error) { - if !r.started { - r.started = true - return len(r.messages) != 0, nil - } +func (r *Result) Response() *pb.StoreQueryResponse { + return r.storeResponse +} - if r.IsComplete() { - r.cursor = nil +func (r *Result) Next(ctx context.Context) error { + if r.noCursor { + r.done = true r.messages = nil - return false, nil + return nil } newResult, err := r.store.next(ctx, r) if err != nil { - return false, err + return err } r.cursor = newResult.cursor r.messages = newResult.messages - return !r.IsComplete(), nil + if r.cursor == nil { + r.noCursor = true + } + + return nil } func (r *Result) Messages() []*pb.WakuMessageKeyValue { - if !r.started { - return nil - } return r.messages } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go index 936aeecd5..f2ec88706 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscription_details.go @@ -25,10 +25,11 @@ type PeerContentFilter struct { type SubscriptionDetails struct { sync.RWMutex - ID string `json:"subscriptionID"` - mapRef *SubscriptionsMap - Closed bool `json:"-"` - once sync.Once + ID string `json:"subscriptionID"` + mapRef *SubscriptionsMap + Closed bool `json:"-"` + once sync.Once + Closing chan struct{} PeerID peer.ID `json:"peerID"` ContentFilter protocol.ContentFilter `json:"contentFilters"` @@ -96,7 +97,6 @@ func (s *SubscriptionDetails) CloseC() { s.once.Do(func() { s.Lock() defer s.Unlock() - s.Closed = true close(s.C) }) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go index a538912e7..c308d9bba 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/subscription/subscriptions_map.go @@ -75,6 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content PeerID: peerID, C: make(chan *protocol.Envelope, 1024), ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, + Closing: make(chan struct{}), } // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair @@ -218,6 +219,30 @@ func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter return output } +func (m *SubscriptionsMap) GetAllSubscriptionsForPeer(peerID peer.ID) []*SubscriptionDetails { + m.RLock() + defer m.RUnlock() + + var output []*SubscriptionDetails + for _, peerSubs := range m.items { + if peerSubs.PeerID == peerID { + for _, subs := range peerSubs.SubsPerPubsubTopic { + for _, subscriptionDetail := range subs { + output = append(output, subscriptionDetail) + } + } + break + } + } + return output +} + +func (m *SubscriptionsMap) GetSubscribedPeers() peer.IDSlice { + m.RLock() + defer m.RUnlock() + return maps.Keys(m.items) +} + func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails { return m.GetSubscriptionsForPeer("", protocol.ContentFilter{}) } diff --git a/vendor/modules.txt b/vendor/modules.txt index be7703f01..5612ecb85 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1023,9 +1023,10 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f +# github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979 ## explicit; go 1.20 github.com/waku-org/go-waku/logging +github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/waku/persistence github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index eb661d841..1c58d1f74 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -28,8 +28,6 @@ const ( FilterEventGetStats ) -const pingTimeout = 10 * time.Second - type FilterSubs map[string]subscription.SubscriptionSet type FilterEvent struct { @@ -76,9 +74,7 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func( mgr.config = config mgr.node = node mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error { - ctx, cancel := context.WithTimeout(ctx, pingTimeout) - defer cancel() - return mgr.node.FilterLightnode().IsSubscriptionAlive(ctx, sub) + return nil } return mgr