chore_: bump go-waku (#5269)

kaichao 2024-06-03 21:19:18 +08:00 (committed by GitHub)
parent b38a9f5878
commit 1c116589cd
25 changed files with 1118 additions and 139 deletions

go.mod (2 changed lines)

@@ -93,7 +93,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f
+	github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1

go.sum (4 changed lines)

@@ -2138,8 +2138,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry
 github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f h1:KiDqcxmCi74BGDZzkGT5T83QhEL/rPrUbEiJWOuiuU4=
-github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k=
+github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979 h1:31upWxN0XWBA+bcS0aCJ9jR8HzBNtCg8zKAj+Jvre08=
+github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

vendor/github.com/waku-org/go-waku/tests/init.go (generated, vendored, new file: 2 lines)

@@ -0,0 +1,2 @@
// Contains resources and utils for unit tests
package tests

vendor/github.com/waku-org/go-waku/tests/utils.go (generated, vendored, new file: 439 lines)

@@ -0,0 +1,439 @@
package tests
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"math/big"
"net"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"
"github.com/waku-org/go-waku/waku/v2/protocol"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/peerstore"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type StringGenerator func(maxLength int) (string, error)
// GetHostAddress returns the first listen address used by a host
func GetHostAddress(ha host.Host) multiaddr.Multiaddr {
return ha.Addrs()[0]
}
// GetAddr returns a full multiaddr of the host with its peer ID appended
func GetAddr(h host.Host) multiaddr.Multiaddr {
id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String()))
var selectedAddr multiaddr.Multiaddr
//For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses.
for _, addr := range h.Network().ListenAddresses() {
if strings.Contains(addr.String(), "p2p-circuit") {
continue
}
selectedAddr = addr
break
}
return selectedAddr.Encapsulate(id)
}
// FindFreePort returns an available port number
func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) {
t.Helper()
if host == "" {
host = "localhost"
}
for i := 0; i < maxAttempts; i++ {
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, "0"))
if err != nil {
t.Logf("unable to resolve tcp addr: %v", err)
continue
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
l.Close()
t.Logf("unable to listen on addr %q: %v", addr, err)
continue
}
port := l.Addr().(*net.TCPAddr).Port
l.Close()
return port, nil
}
return 0, fmt.Errorf("no free port found")
}
// FindFreeUDPPort returns an available UDP port number
func FindFreeUDPPort(t *testing.T, host string, maxAttempts int) (int, error) {
t.Helper()
if host == "" {
host = "localhost"
}
for i := 0; i < maxAttempts; i++ {
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, "0"))
if err != nil {
t.Logf("unable to resolve udp addr: %v", err)
continue
}
l, err := net.ListenUDP("udp", addr)
if err != nil {
l.Close()
t.Logf("unable to listen on addr %q: %v", addr, err)
continue
}
port := l.LocalAddr().(*net.UDPAddr).Port
l.Close()
return port, nil
}
return 0, fmt.Errorf("no free port found")
}
// MakeHost creates a Libp2p host with a random key on a specific port
func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) {
// Creates a new RSA key pair for this host.
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness)
if err != nil {
log.Error(err.Error())
return nil, err
}
// 127.0.0.1 restricts listening to the loopback interface.
sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
psWrapper := peerstore.NewWakuPeerstore(ps)
if err != nil {
return nil, err
}
// libp2p.New constructs a new libp2p Host.
// Other options can be added here.
return libp2p.New(
libp2p.Peerstore(psWrapper),
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
)
}
// CreateWakuMessage creates a WakuMessage protobuf with default values, a custom content topic, and a timestamp
func CreateWakuMessage(contentTopic string, timestamp *int64, optionalPayload ...string) *pb.WakuMessage {
var payload []byte
if len(optionalPayload) > 0 {
payload = []byte(optionalPayload[0])
} else {
payload = []byte{1, 2, 3}
}
return &pb.WakuMessage{Payload: payload, ContentTopic: contentTopic, Timestamp: timestamp}
}
// RandomHex returns a random hex string of n bytes
func RandomHex(n int) (string, error) {
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func NewLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetStaticIP(ipAddr.IP)
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting udpPort", zap.Int("port", udpPort))
}
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
}
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
return localnode, nil
}
func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.PrivateKey) {
privKey, err := gcrypto.GenerateKey()
require.NoError(t, err)
sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey))
port, err := FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err)
sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
require.NoError(t, err)
opts = append(opts, libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(sPrivKey))
host, err := libp2p.New(opts...)
require.NoError(t, err)
return host, port, privKey
}
func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, err
}
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func RandomInt(min, max int) (int, error) {
n, err := rand.Int(rand.Reader, big.NewInt(int64(max-min+1)))
if err != nil {
return 0, err
}
return min + int(n.Int64()), nil
}
func RandomBytes(n int) ([]byte, error) {
b := make([]byte, n)
_, err := rand.Read(b)
if err != nil {
return nil, err
}
return b, nil
}
func GenerateRandomASCIIString(maxLength int) (string, error) {
length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength)))
if err != nil {
return "", err
}
length.SetInt64(length.Int64() + 1)
const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
result := make([]byte, length.Int64())
for i := range result {
num, err := rand.Int(rand.Reader, big.NewInt(int64(len(chars))))
if err != nil {
return "", err
}
result[i] = chars[num.Int64()]
}
return string(result), nil
}
func GenerateRandomUTF8String(maxLength int) (string, error) {
length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength)))
if err != nil {
return "", err
}
length.SetInt64(length.Int64() + 1)
var (
runes []rune
start, end int
)
// Define unicode range
start = 0x0020 // Space character
end = 0x007F // Tilde (~)
for i := 0; int64(i) < length.Int64(); i++ {
randNum, err := rand.Int(rand.Reader, big.NewInt(int64(end-start+1)))
if err != nil {
return "", err
}
char := rune(start + int(randNum.Int64()))
if !utf8.ValidRune(char) {
continue
}
runes = append(runes, char)
}
return string(runes), nil
}
func GenerateRandomJSONString(maxLength int) (string, error) {
// With 5 key-value pairs
m := make(map[string]interface{})
for i := 0; i < 5; i++ {
key, err := GenerateRandomASCIIString(20)
if err != nil {
return "", err
}
value, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
m[key] = value
}
// Marshal the map into a JSON string
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
encoder.SetEscapeHTML(false)
err := encoder.Encode(m)
if err != nil {
return "", err
}
return buf.String(), nil
}
func GenerateRandomBase64String(maxLength int) (string, error) {
bytes, err := RandomBytes(maxLength)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(bytes), nil
}
func GenerateRandomURLEncodedString(maxLength int) (string, error) {
randomString, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
// URL-encode the random string
return url.QueryEscape(randomString), nil
}
func GenerateRandomSQLInsert(maxLength int) (string, error) {
// Random table name
tableName, err := GenerateRandomASCIIString(10)
if err != nil {
return "", err
}
// Random column names
columnCount, err := RandomInt(3, 6)
if err != nil {
return "", err
}
columnNames := make([]string, columnCount)
for i := 0; i < columnCount; i++ {
columnName, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
columnNames[i] = columnName
}
// Random values
values := make([]string, columnCount)
for i := 0; i < columnCount; i++ {
value, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
values[i] = "'" + value + "'"
}
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);",
tableName,
strings.Join(columnNames, ", "),
strings.Join(values, ", "))
return query, nil
}
func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
log := utils.Logger()
go func() {
defer wg.Done()
select {
case env := <-ch:
msg := env.Message()
log.Info("Received ", zap.String("msg", msg.String()))
case <-time.After(timeout):
require.Fail(t, "Message timeout")
}
}()
wg.Wait()
}
func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
go func() {
defer wg.Done()
select {
case _, ok := <-ch:
require.False(t, ok, "should not retrieve message")
case <-time.After(timeout):
// All good
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
wg.Wait()
}
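Since go-waku's tests package is now vendored into status-go, its helpers can be imported directly from test code. A minimal sketch of that usage (the test name and package are illustrative, not part of this diff):

package mytest

import (
	"context"
	"crypto/rand"
	"testing"

	"github.com/waku-org/go-waku/tests"
)

func TestSpinUpThrowawayHost(t *testing.T) {
	// Pick a free TCP port, then start a short-lived libp2p host on it.
	port, err := tests.FindFreePort(t, "127.0.0.1", 3)
	if err != nil {
		t.Fatal(err)
	}
	h, err := tests.MakeHost(context.Background(), port, rand.Reader)
	if err != nil {
		t.Fatal(err)
	}
	defer h.Close()
}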


@@ -291,7 +291,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
 	w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
-	w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log)
+	w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
 	w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)

@@ -844,7 +844,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
 			Protocols: protocols,
 			Connected: connected,
 			Addrs:     addrs,
-			PubsubTopics: topics,
+			PubsubTopics: maps.Keys(topics),
 		})
 	}
 	return peers, nil


@@ -28,6 +28,7 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/peermanager"
 	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
 	"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
+	"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
 	"github.com/waku-org/go-waku/waku/v2/rendezvous"
 	"github.com/waku-org/go-waku/waku/v2/timesource"

@@ -77,6 +78,7 @@ type WakuNodeParameters struct {
 	enableFilterFullNode bool
 	filterOpts           []filter.Option
 	pubsubOpts           []pubsub.Option
+	lightpushOpts        []lightpush.Option

 	minRelayPeersToPublish int
 	maxMsgSizeBytes        int

@@ -458,9 +460,10 @@ func WithMessageProvider(s legacy_store.MessageProvider) WakuNodeOption {
 }

 // WithLightPush is a WakuNodeOption that enables the lightpush protocol
-func WithLightPush() WakuNodeOption {
+func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption {
 	return func(params *WakuNodeParameters) error {
 		params.enableLightPush = true
+		params.lightpushOpts = lightpushOpts
 		return nil
 	}
 }
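With the variadic signature, lightpush protocol options can be threaded through node construction. A minimal sketch of the intended call site, with no concrete lightpush.Option values assumed (those live in the lightpush package):

package main

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/node"
)

func main() {
	// Any lightpush.Option arguments passed here land in
	// params.lightpushOpts and are forwarded to lightpush.NewWakuLightPush.
	n, err := node.New(node.WithLightPush())
	if err != nil {
		panic(err)
	}
	if err := n.Start(context.Background()); err != nil {
		panic(err)
	}
	defer n.Stop()
}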


@@ -277,10 +277,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee
 	//Need to filter peers to check if they support relay
 	if inPeers.Len() != 0 {
-		inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200)
+		inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200)
 	}
 	if outPeers.Len() != 0 {
-		outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200)
+		outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200)
 	}
 	return
 }


@@ -2,6 +2,7 @@ package peermanager
 import (
 	"context"
+	"encoding/json"
 	"errors"

 	"github.com/libp2p/go-libp2p/core/peer"

@@ -12,7 +13,16 @@ import (
 	"golang.org/x/exp/maps"
 )

-type peerSet map[peer.ID]struct{}
+type PeerSet map[peer.ID]struct{}
+
+func PeerInSet(peers PeerSet, peer peer.ID) bool {
+	if len(peers) > 0 {
+		if _, ok := peers[peer]; ok {
+			return true
+		}
+	}
+	return false
+}

 // SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
 // If a list of specific peers is passed, the peer will be chosen from that list assuming

@@ -54,17 +64,18 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
 			zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
 		return nil, err
 	} else if len(peerIDs) == 0 {
-		peerIDs = make(peerSet)
+		peerIDs = make(PeerSet)
 	}

 	// if not found in serviceSlots or proto == WakuRelayIDv200
-	filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto)
+	filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)
 	if err != nil {
 		return nil, err
 	}
 	if len(criteria.PubsubTopics) > 0 {
 		filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
 	}
-	randomPeers, err := selectRandomPeers(filteredPeers, criteria.MaxPeers-len(peerIDs))
+	//Not passing excludePeers as filterPeers are already considering excluded ones.
+	randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs))
 	if err != nil && len(peerIDs) == 0 {
 		return nil, err
 	}

@@ -75,10 +86,13 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
 	return maps.Keys(peerIDs), nil
 }

-func getRandom(filter peerSet, count int) (peerSet, error) {
+func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) {
 	i := 0
-	selectedPeers := make(peerSet)
+	selectedPeers := make(PeerSet)
 	for pID := range filter {
+		if PeerInSet(excludePeers, pID) {
+			continue
+		}
 		//Map's iterator in golang works using randomness and hence not random function is being used.
 		selectedPeers[pID] = struct{}{}
 		i++

@@ -93,34 +107,37 @@ func getRandom(filter peerSet, count int) (peerSet, error) {
 }

 // selects count random peers from list of peers
-func selectRandomPeers(peers peer.IDSlice, count int) (peerSet, error) {
-	filteredPeerMap := peerSliceToMap(peers)
-	return getRandom(filteredPeerMap, count)
+func selectRandomPeers(peers peer.IDSlice, excludePeers PeerSet, count int) (PeerSet, error) {
+	filteredPeerMap := PeerSliceToMap(peers)
+	return getRandom(filteredPeerMap, count, excludePeers)
 }

-func peerSliceToMap(peers peer.IDSlice) peerSet {
-	peerSet := make(peerSet, peers.Len())
+func PeerSliceToMap(peers peer.IDSlice) PeerSet {
+	peerSet := make(PeerSet, peers.Len())
 	for _, peer := range peers {
 		peerSet[peer] = struct{}{}
 	}
 	return peerSet
 }

-func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSet, error) {
-	peers := make(peerSet)
+func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) {
+	peers := make(PeerSet)
 	var err error
 	for retryCnt := 0; retryCnt < 1; retryCnt++ {
 		//Try to fetch from serviceSlot
 		if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
 			if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
-				return slot.getRandom(criteria.MaxPeers)
+				return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
 			} else { //PubsubTopic based selection
 				keys := make([]peer.ID, 0, len(slot.m))
 				for i := range slot.m {
+					if PeerInSet(criteria.ExcludePeers, i) {
+						continue
+					}
 					keys = append(keys, i)
 				}
 				selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
-				tmpPeers, err := selectRandomPeers(selectedPeers, criteria.MaxPeers)
+				tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
 				for tmpPeer := range tmpPeers {
 					peers[tmpPeer] = struct{}{}
 				}

@@ -145,12 +162,21 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (peerSe
 // PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers.
 type PeerSelectionCriteria struct {
-	SelectionType PeerSelection
-	Proto         protocol.ID
-	PubsubTopics  []string
-	SpecificPeers peer.IDSlice
-	MaxPeers      int
-	Ctx           context.Context
+	SelectionType PeerSelection   `json:"selectionType"`
+	Proto         protocol.ID     `json:"protocolId"`
+	PubsubTopics  []string        `json:"pubsubTopics"`
+	SpecificPeers peer.IDSlice    `json:"specificPeers"`
+	MaxPeers      int             `json:"maxPeerCount"`
+	Ctx           context.Context `json:"-"`
+	ExcludePeers  PeerSet         `json:"excludePeers"`
+}
+
+func (psc PeerSelectionCriteria) String() string {
+	pscJson, err := json.Marshal(psc)
+	if err != nil {
+		return ""
+	}
+	return string(pscJson)
 }

@@ -159,6 +185,12 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice
 	if criteria.MaxPeers == 0 {
 		criteria.MaxPeers = 1
 	}
+	excPeers := maps.Keys(criteria.ExcludePeers)
+	var excPeer peer.ID
+	if len(excPeers) > 0 {
+		excPeer = excPeers[0]
+	}
+	pm.logger.Debug("Select Peers", zap.Stringer("selectionCriteria", criteria), zap.Stringer("excludedPeers", excPeer))
 	switch criteria.SelectionType {
 	case Automatic:
 		return pm.SelectRandom(criteria)

@@ -191,7 +223,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
 		peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...)
 	}

-	peers, err = pm.FilterPeersByProto(peers, criteria.Proto)
+	peers, err = pm.FilterPeersByProto(peers, criteria.ExcludePeers, criteria.Proto)
 	if err != nil {
 		return "", err
 	}

@@ -201,20 +233,22 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
 // FilterPeersByProto filters list of peers that support specified protocols.
 // If specificPeers is nil, all peers in the host's peerStore are considered for filtering.
-func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) {
+func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePeers PeerSet, proto ...protocol.ID) (peer.IDSlice, error) {
 	peerSet := specificPeers
 	if len(peerSet) == 0 {
 		peerSet = pm.host.Peerstore().Peers()
 	}

 	var peers peer.IDSlice
 	for _, peer := range peerSet {
 		protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...)
 		if err != nil {
 			return nil, err
 		}

 		if len(protocols) > 0 {
+			//Maybe we can optimize below set of statements a better way??
+			if PeerInSet(excludePeers, peer) {
+				continue
+			}
 			peers = append(peers, peer)
 		}
 	}
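Taken together, these changes let callers carve specific peers out of every selection path (random, lowest RTT, and protocol filtering). A hedged sketch of a call site; pm, badPeer, and the protocol/topic literals are placeholders, not part of this diff:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/peermanager"
)

// pickPeers selects up to two peers for an illustrative protocol while
// excluding one that previously misbehaved.
func pickPeers(ctx context.Context, pm *peermanager.PeerManager, badPeer peer.ID) (peer.IDSlice, error) {
	return pm.SelectPeers(peermanager.PeerSelectionCriteria{
		SelectionType: peermanager.Automatic,
		Proto:         "/vac/waku/store-query/3.0.0",
		PubsubTopics:  []string{"/waku/2/rs/16/32"},
		MaxPeers:      2,
		Ctx:           ctx,
		// ExcludePeers is honoured by SelectRandom, SelectPeerWithLowestRTT
		// and FilterPeersByProto alike.
		ExcludePeers: peermanager.PeerSliceToMap([]peer.ID{badPeer}),
	})
}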


@@ -19,10 +19,10 @@ func newPeerMap() *peerMap {
 	}
 }

-func (pm *peerMap) getRandom(count int) (peerSet, error) {
+func (pm *peerMap) getRandom(count int, excludePeers PeerSet) (PeerSet, error) {
 	pm.mu.RLock()
 	defer pm.mu.RUnlock()
-	return getRandom(pm.m, count)
+	return getRandom(pm.m, count, excludePeers)
 }

 func (pm *peerMap) remove(pID peer.ID) {
func (pm *peerMap) remove(pID peer.ID) { func (pm *peerMap) remove(pID peer.ID) {


@@ -13,6 +13,7 @@ import (
 	waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
 )

 func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error {

@@ -103,7 +104,7 @@ func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) {
 				logging.HostID("peerID", peer))
 			continue
 		}
-		if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic {
+		if len(peerTopics) == 1 && maps.Keys(peerTopics)[0] == pubsubTopic {
 			err := pm.host.Network().ClosePeer(peer)
 			if err != nil {
 				pm.logger.Warn("Failed to disconnect connection towards peer",


@@ -8,6 +8,8 @@ import (
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/peerstore"
+	"github.com/waku-org/go-waku/waku/v2/protocol"
+	"golang.org/x/exp/maps"
 )

 // Origin is used to determine how the peer is identified,

@@ -58,7 +60,7 @@ type WakuPeerstore interface {
 	AddPubSubTopic(p peer.ID, topic string) error
 	RemovePubSubTopic(p peer.ID, topic string) error
-	PubSubTopics(p peer.ID) ([]string, error)
+	PubSubTopics(p peer.ID) (protocol.TopicSet, error)
 	SetPubSubTopics(p peer.ID, topics []string) error
 	PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
 	PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice

@@ -175,13 +177,12 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error {
 	if err != nil {
 		return err
 	}
-	for _, t := range existingTopics {
-		if t == topic {
-			return nil
-		}
-	}
-	existingTopics = append(existingTopics, topic)
-	return ps.peerStore.Put(p, peerPubSubTopics, existingTopics)
+	if _, found := existingTopics[topic]; found {
+		return nil
+	}
+	existingTopics[topic] = struct{}{}
+	return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics))
 }

@@ -195,14 +196,9 @@ func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error {
 		return nil
 	}

-	for i := range existingTopics {
-		if existingTopics[i] == topic {
-			existingTopics = append(existingTopics[:i], existingTopics[i+1:]...)
-			break
-		}
-	}
+	delete(existingTopics, topic)

-	err = ps.SetPubSubTopics(p, existingTopics)
+	err = ps.SetPubSubTopics(p, maps.Keys(existingTopics))
 	if err != nil {
 		return err
 	}

@@ -215,16 +211,16 @@ func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error {
 }

 // PubSubTopics fetches list of pubSubTopics for a peer
-func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
+func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) {
 	result, err := ps.peerStore.Get(p, peerPubSubTopics)
 	if err != nil {
 		if errors.Is(err, peerstore.ErrNotFound) {
-			return nil, nil
+			return protocol.NewTopicSet(), nil
 		} else {
 			return nil, err
 		}
 	}
-	return result.([]string), nil
+	return protocol.NewTopicSet((result.([]string))...), nil
 }

@@ -235,22 +231,16 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
 	}
 	var result peer.IDSlice
 	for _, p := range specificPeers {
-		topics, err := ps.PubSubTopics(p)
+		peerMatch := true
+		peerTopics, err := ps.PubSubTopics(p)
 		if err == nil {
-			//Convoluted and crazy logic to find subset of topics
-			// Could not find a better way to do it?
-			peerTopicMap := make(map[string]struct{})
-			match := true
-			for _, topic := range topics {
-				peerTopicMap[topic] = struct{}{}
-			}
-			for _, topic := range pubSubTopics {
-				if _, ok := peerTopicMap[topic]; !ok {
-					match = false
+			for _, t := range pubSubTopics {
+				if _, ok := peerTopics[t]; !ok {
+					peerMatch = false
 					break
 				}
 			}
-			if match {
+			if peerMatch {
 				result = append(result, p)
 			}
 		} //Note: skipping a peer in case of an error as there would be others available.

@@ -268,7 +258,7 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeer
 	for _, p := range specificPeers {
 		topics, err := ps.PubSubTopics(p)
 		if err == nil {
-			for _, topic := range topics {
+			for topic := range topics {
 				if topic == pubSubTopic {
 					result = append(result, p)
 				}


@@ -1,11 +1,22 @@
 package protocol

-import "golang.org/x/exp/maps"
+import (
+	"golang.org/x/exp/maps"
+)

 type PubsubTopicStr = string
 type ContentTopicStr = string

 type ContentTopicSet map[string]struct{}

+type TopicSet map[string]struct{}
+
+func NewTopicSet(topics ...string) TopicSet {
+	s := make(TopicSet, len(topics))
+	for _, t := range topics {
+		s[t] = struct{}{}
+	}
+	return s
+}

 func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
 	s := make(ContentTopicSet, len(contentTopics))

@@ -28,6 +39,16 @@ type ContentFilter struct {
 	ContentTopics ContentTopicSet `json:"contentTopics"`
 }

+func (cf ContentFilter) String() string {
+	var ret string
+	ret += "{ pubsubTopic: " + cf.PubsubTopic + ", contentTopics: [ "
+	for ct := range cf.ContentTopics {
+		ret += ct
+	}
+	ret += " ] }"
+	return ret
+}

 func (cf ContentFilter) ContentTopicsList() []string {
 	return cf.ContentTopics.ToList()
 }
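TopicSet mirrors ContentTopicSet: a string set with O(1) membership tests, which is what the peerstore changes above lean on when matching a peer's topics. A small sketch (the topic string is illustrative):

package main

import (
	"fmt"

	"github.com/waku-org/go-waku/waku/v2/protocol"
)

func main() {
	// Duplicates collapse, and lookup is a map access rather than a
	// slice scan.
	ts := protocol.NewTopicSet("/waku/2/rs/16/32", "/waku/2/rs/16/32")
	_, subscribed := ts["/waku/2/rs/16/32"]
	fmt.Println(len(ts), subscribed) // 1 true
}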


@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"strings"
 	"sync"
+	"time"

 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"

@@ -50,6 +51,7 @@ type WakuFilterLightNode struct {
 	log           *zap.Logger
 	subscriptions *subscription.SubscriptionsMap
 	pm            *peermanager.PeerManager
+	peerPingInterval time.Duration
 }

 type WakuFilterPushError struct {

@@ -86,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
 	wf.pm = pm
 	wf.CommonService = service.NewCommonService()
 	wf.metrics = newMetrics(reg)
+	wf.peerPingInterval = 5 * time.Second
 	return wf
 }

@@ -97,13 +99,15 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) {
 func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
 	return wf.CommonService.Start(ctx, wf.start)
 }

 func (wf *WakuFilterLightNode) start() error {
 	wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
 	wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context()))
+	//Start Filter liveness check
+	wf.CommonService.WaitGroup().Add(1)
+	go wf.FilterHealthCheckLoop()
 	wf.log.Info("filter-push protocol started")
 	return nil
 }

@@ -313,24 +317,29 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
 		wf.pm.Connect(pData)
 		params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
 	}

-	if params.pm != nil {
-		peerCount := params.maxPeers - len(params.selectedPeers)
+	reqPeerCount := params.maxPeers - len(params.selectedPeers)
+	if params.pm != nil && reqPeerCount > 0 {
+		wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
 		params.selectedPeers, err = wf.pm.SelectPeers(
 			peermanager.PeerSelectionCriteria{
 				SelectionType: params.peerSelectionType,
 				Proto:         FilterSubscribeID_v20beta1,
 				PubsubTopics:  maps.Keys(pubSubTopicMap),
 				SpecificPeers: params.preferredPeers,
-				MaxPeers:      peerCount,
+				MaxPeers:      reqPeerCount,
 				Ctx:           ctx,
+				ExcludePeers:  params.peersToExclude,
 			},
 		)
 		if err != nil {
+			wf.log.Error("peer selection returned err", zap.Error(err))
 			return nil, nil, err
 		}
 	}
+	wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
 	return params, pubSubTopicMap, nil
 }

@@ -354,7 +363,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
 	subscriptions := make([]*subscription.SubscriptionDetails, 0)
 	for pubSubTopic, cTopics := range pubSubTopicMap {
 		var selectedPeers peer.IDSlice
+		wf.log.Debug("peer selection", zap.Int("params.maxPeers", params.maxPeers))
 		if params.pm != nil && len(params.selectedPeers) < params.maxPeers {
+			wf.log.Debug("selected peers less than maxPeers", zap.Int("maxpPeers", params.maxPeers))
 			selectedPeers, err = wf.pm.SelectPeers(
 				peermanager.PeerSelectionCriteria{
 					SelectionType: params.peerSelectionType,

@@ -363,6 +375,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
 					SpecificPeers: params.preferredPeers,
 					MaxPeers:      params.maxPeers - params.selectedPeers.Len(),
 					Ctx:           ctx,
+					ExcludePeers:  params.peersToExclude,
 				},
 			)
 		} else {

@@ -375,7 +388,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
 			failedContentTopics = append(failedContentTopics, cTopics...)
 			continue
 		}
-
 		var cFilter protocol.ContentFilter
 		cFilter.PubsubTopic = pubSubTopic
 		cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)

@@ -395,6 +407,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
 				failedContentTopics = append(failedContentTopics, cTopics...)
 				continue
 			}
+			wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer))
 			subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter))
 		}
 	}

@@ -457,16 +470,6 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
 		peerID)
 }

-func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error {
-	wf.RLock()
-	defer wf.RUnlock()
-	if err := wf.ErrOnNotRunning(); err != nil {
-		return err
-	}
-	return wf.Ping(ctx, subscription.PeerID)
-}

 // Unsubscribe is used to stop receiving messages from specified peers for the content filter
 func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
 	wf.RLock()


@@ -0,0 +1,48 @@
package filter
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)
func (wf *WakuFilterLightNode) PingPeers() {
//Send a ping to all the peers and report their status to corresponding subscriptions
// Alive or not, or set state of subscription??
for _, peer := range wf.subscriptions.GetSubscribedPeers() {
go wf.PingPeer(peer)
}
}
func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), wf.peerPingInterval)
defer cancel()
err := wf.Ping(ctxWithTimeout, peer)
if err != nil {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))
//Indicating that subscription is closing,
close(subscription.Closing)
}
}
}
func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
defer wf.WaitGroup().Done()
ticker := time.NewTicker(wf.peerPingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wf.PingPeers()
case <-wf.CommonService.Context().Done():
return
}
}
}
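Applications observe a dead filter peer through the subscription's new Closing channel, closed above when a ping fails. A hedged consumer sketch; the function and variable names are illustrative:

package main

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
	"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

// watchSubscription resubscribes once the health-check loop signals that
// the subscription's peer stopped answering pings.
func watchSubscription(ctx context.Context, ln *filter.WakuFilterLightNode, sub *subscription.SubscriptionDetails) {
	select {
	case <-sub.Closing:
		// Subscribe again with the same content filter and let peer
		// selection pick a healthy peer instead.
		_, _ = ln.Subscribe(ctx, sub.ContentFilter)
	case <-ctx.Done():
	}
}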


@@ -39,6 +39,7 @@ type (
 		peerAddr          multiaddr.Multiaddr
 		peerSelectionType peermanager.PeerSelection
 		preferredPeers    peer.IDSlice
+		peersToExclude    peermanager.PeerSet
 		maxPeers          int
 		requestID         []byte
 		log               *zap.Logger

@@ -101,6 +102,14 @@ func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption {
 	}
 }

+// WithPeersToExclude option excludes the peers that are specified from selection
+func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption {
+	return func(params *FilterSubscribeParameters) error {
+		params.peersToExclude = peermanager.PeerSliceToMap(peers)
+		return nil
+	}
+}

 // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
 // If a list of specific peers is passed, the peer will be chosen from that list assuming it
 // supports the chosen protocol, otherwise it will chose a peer from the node peerstore

@@ -145,6 +154,7 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption {
 	return []FilterSubscribeOption{
 		WithAutomaticPeerSelection(),
 		WithAutomaticRequestID(),
+		WithMaxPeersPerContentFilter(1),
 	}
 }
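A hedged usage sketch of the new exclude option; the topics and peer are placeholders, not part of this diff:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
	"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

func subscribeAvoiding(ctx context.Context, ln *filter.WakuFilterLightNode, deadPeer peer.ID) ([]*subscription.SubscriptionDetails, error) {
	cf := protocol.ContentFilter{
		PubsubTopic:   "/waku/2/rs/16/32",
		ContentTopics: protocol.NewContentTopicSet("/app/1/chat/proto"),
	}
	// Ask for up to two peers, but never the one that just failed. Note
	// that DefaultSubscriptionOptions now caps peers per content filter
	// at 1 unless overridden, as above.
	return ln.Subscribe(ctx, cf,
		filter.WithMaxPeersPerContentFilter(2),
		filter.WithPeersToExclude(deadPeer),
	)
}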


@@ -128,7 +128,7 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream
 		wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start))
-		logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String()))
+		logger.Info("received request", zap.Stringer("serverID", wf.h.ID()), zap.Stringer("requestType", subscribeRequest.FilterSubscribeType))
 	}
 }


@@ -0,0 +1,394 @@
package filter
import (
"context"
"crypto/rand"
"fmt"
"strconv"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type LightNodeData struct {
LightNode *WakuFilterLightNode
LightNodeHost host.Host
}
type FullNodeData struct {
relayNode *relay.WakuRelay
RelaySub *relay.Subscription
FullNodeHost host.Host
Broadcaster relay.Broadcaster
FullNode *WakuFilterFullNode
}
type FilterTestSuite struct {
suite.Suite
LightNodeData
FullNodeData
TestTopic string
TestContentTopic string
ctx context.Context
ctxCancel context.CancelFunc
wg *sync.WaitGroup
contentFilter protocol.ContentFilter
subDetails []*subscription.SubscriptionDetails
Log *zap.Logger
}
const DefaultTestPubSubTopic = "/waku/2/go/filter/test"
const DefaultTestContentTopic = "/test/10/my-app"
type WakuMsg struct {
PubSubTopic string
ContentTopic string
Payload string
}
func (s *FilterTestSuite) SetupTest() {
log := utils.Logger() //.Named("filterv2-test")
s.Log = log
s.Log.Info("SetupTest()")
// Use a pointer to WaitGroup so as to avoid copying it
// https://pkg.go.dev/sync#WaitGroup
s.wg = &sync.WaitGroup{}
// Create test context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel
s.TestTopic = DefaultTestPubSubTopic
s.TestContentTopic = DefaultTestContentTopic
s.MakeWakuFilterLightNode()
s.LightNode.peerPingInterval = 1 * time.Second
s.StartLightNode()
//TODO: Add tests to verify broadcaster.
s.MakeWakuFilterFullNode(s.TestTopic, false)
s.ConnectToFullNode(s.LightNode, s.FullNode)
}
func (s *FilterTestSuite) TearDownTest() {
s.FullNode.Stop()
s.LightNode.Stop()
s.RelaySub.Unsubscribe()
s.LightNode.Stop()
s.ctxCancel()
}
func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) {
mAddr := tests.GetAddr(h2.h)
_, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
s.Log.Info("add peer", zap.Stringer("mAddr", mAddr))
s.Require().NoError(err)
}
func (s *FilterTestSuite) GetWakuRelay(topic string) FullNodeData {
broadcaster := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster.Start(context.Background()))
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, relay.WithMaxMsgSize(1024*1024))
relay.SetHost(host)
err = relay.Start(context.Background())
s.Require().NoError(err)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic))
s.Require().NoError(err)
return FullNodeData{relay, sub[0], host, broadcaster, nil}
}
func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bool) FullNodeData {
nodeData := s.GetWakuRelay(topic)
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
node2Filter.SetHost(nodeData.FullNodeHost)
var sub *relay.Subscription
if withRegisterAll {
sub = nodeData.Broadcaster.RegisterForAll()
} else {
sub = nodeData.Broadcaster.Register(protocol.NewContentFilter(topic))
}
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)
nodeData.FullNode = node2Filter
return nodeData
}
func (s *FilterTestSuite) MakeWakuFilterFullNode(topic string, withRegisterAll bool) {
nodeData := s.GetWakuFilterFullNode(topic, withRegisterAll)
s.FullNodeData = nodeData
}
func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
filterPush.SetHost(host)
pm.SetHost(host)
return LightNodeData{filterPush, host}
}
func (s *FilterTestSuite) MakeWakuFilterLightNode() {
s.LightNodeData = s.GetWakuFilterLightNode()
}
func (s *FilterTestSuite) StartLightNode() {
err := s.LightNode.Start(context.Background())
s.Require().NoError(err)
}
func (s *FilterTestSuite) waitForMsg(msg *WakuMsg) {
s.waitForMsgFromChan(msg, s.subDetails[0].C)
}
func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Envelope) {
s.wg.Add(1)
var msgFound = false
go func() {
defer s.wg.Done()
select {
case env := <-ch:
for _, topic := range s.contentFilter.ContentTopicsList() {
if topic == env.Message().GetContentTopic() {
msgFound = true
}
}
s.Require().True(msgFound)
case <-time.After(1 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}()
if msg != nil {
s.PublishMsg(msg)
}
s.wg.Wait()
}
func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool {
for _, m := range many {
if m.PubSubTopic == one.PubSubTopic &&
m.ContentTopic == one.ContentTopic &&
m.Payload == one.Payload {
return true
}
}
return false
}
func (s *FilterTestSuite) waitForMessages(msgs []WakuMsg) {
s.wg.Add(1)
msgCount := len(msgs)
found := 0
subs := s.subDetails
s.Log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount)))
s.Log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs))))
go func() {
defer s.wg.Done()
for _, sub := range subs {
s.Log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic))
for i := 0; i < msgCount; i++ {
select {
case env, ok := <-sub.C:
if !ok {
continue
}
received := WakuMsg{
PubSubTopic: env.PubsubTopic(),
ContentTopic: env.Message().GetContentTopic(),
Payload: string(env.Message().GetPayload()),
}
s.Log.Debug("received message ", zap.String("pubSubTopic", received.PubSubTopic), zap.String("contentTopic", received.ContentTopic), zap.String("payload", received.Payload))
if matchOneOfManyMsg(received, msgs) {
found++
}
case <-time.After(3 * time.Second):
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}
}
}()
if msgs != nil {
s.publishMessages(msgs)
}
s.wg.Wait()
s.Require().Equal(msgCount, found)
}
func (s *FilterTestSuite) waitForTimeout(msg *WakuMsg) {
s.waitForTimeoutFromChan(msg, s.subDetails[0].C)
}
func (s *FilterTestSuite) waitForTimeoutFromChan(msg *WakuMsg, ch chan *protocol.Envelope) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case env, ok := <-ch:
if ok {
s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload)))
}
case <-time.After(1 * time.Second):
// Timeout elapsed, all good
case <-s.ctx.Done():
s.Require().Fail("waitForTimeout test exceeded allocated time")
}
}()
s.PublishMsg(msg)
s.wg.Wait()
}
func (s *FilterTestSuite) getSub(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
contentFilter := protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)}
subDetails, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(peer))
s.Require().NoError(err)
time.Sleep(1 * time.Second)
return subDetails
}
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
sub.Add(contentTopic)
s.contentFilter = sub.ContentFilter
subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.subDetails = subDetails
s.Require().NoError(err)
return
}
}
s.subDetails = s.getSub(pubsubTopic, contentTopic, peer)
s.contentFilter = s.subDetails[0].ContentFilter
}
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
topicsCount := len(sub.ContentFilter.ContentTopicsList())
if topicsCount == 1 {
_, err := s.LightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer))
s.Require().NoError(err)
} else {
sub.Remove(contentTopic)
}
s.contentFilter = sub.ContentFilter
}
}
return s.LightNode.Subscriptions()
}
func (s *FilterTestSuite) PublishMsg(msg *WakuMsg) {
if len(msg.Payload) == 0 {
msg.Payload = "123"
}
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic))
s.Require().NoError(err)
}
func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) {
for _, m := range msgs {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.ContentTopic, utils.GetUnixEpoch(), m.Payload), relay.WithPubSubTopic(m.PubSubTopic))
s.Require().NoError(err)
}
}
func (s *FilterTestSuite) prepareData(quantity int, topics, contentTopics, payloads bool, sg tests.StringGenerator) []WakuMsg {
var (
pubsubTopic = s.TestTopic // Has to be the same with initial s.testTopic
contentTopic = s.TestContentTopic // Has to be the same with initial s.testContentTopic
payload = "test_msg"
messages []WakuMsg
strMaxLenght = 4097
generatedString = ""
)
for i := 0; i < quantity; i++ {
msg := WakuMsg{
PubSubTopic: pubsubTopic,
ContentTopic: contentTopic,
Payload: payload,
}
if sg != nil {
generatedString, _ = sg(strMaxLenght)
}
if topics {
msg.PubSubTopic = fmt.Sprintf("%s%02d%s", pubsubTopic, i, generatedString)
}
if contentTopics {
msg.ContentTopic = fmt.Sprintf("%s%02d%s", contentTopic, i, generatedString)
}
if payloads {
msg.Payload = fmt.Sprintf("%s%02d%s", payload, i, generatedString)
}
messages = append(messages, msg)
}
return messages
}


@@ -2,6 +2,10 @@ package pb

 import "errors"

+// This special value for requestId indicates that the message was rate limited
+// and we did not retrieve the requestId to avoid a potential attack vector.
+const REQUESTID_RATE_LIMITED = "N/A"

 var (
 	errMissingRequestID = errors.New("missing RequestId field")
 	errMissingQuery     = errors.New("missing Query field")

@@ -32,6 +36,9 @@ func (x *PushRpc) ValidateRequest() error {
 }

 func (x *PushRpc) ValidateResponse(requestID string) error {
+	if x.RequestId == REQUESTID_RATE_LIMITED {
+		return nil
+	}
 	if x.RequestId == "" {
 		return errMissingRequestID
 	}


@@ -108,6 +108,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
 			wakuLP.metrics.RecordError(rateLimitFailure)
 			responseMsg := "exceeds the rate limit"
 			responsePushRPC.Response.Info = &responseMsg
+			responsePushRPC.RequestId = pb.REQUESTID_RATE_LIMITED
 			wakuLP.reply(stream, responsePushRPC, logger)
 			return
 		}
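On the client side, a rate-limited reply now validates cleanly and can be recognised by the sentinel request ID. A sketch under assumed names (pushRPC stands for the decoded reply; GetInfo is the generated protobuf getter):

package main

import (
	"errors"

	"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
)

func checkReply(pushRPC *pb.PushRpc, requestID string) error {
	// Previously this failed with "missing RequestId field" whenever the
	// server throttled us; the sentinel short-circuits that check.
	if err := pushRPC.ValidateResponse(requestID); err != nil {
		return err
	}
	if pushRPC.RequestId == pb.REQUESTID_RATE_LIMITED {
		return errors.New("lightpush rate limited: " + pushRPC.Response.GetInfo())
	}
	return nil
}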


@@ -180,6 +180,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
 		store:         s,
 		messages:      response.Messages,
 		storeRequest:  storeRequest,
+		storeResponse: response,
 		peerID:        params.selectedPeer,
 		cursor:        response.PaginationCursor,
 	}

@@ -214,10 +215,10 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
 	if r.IsComplete() {
 		return &Result{
 			store:         s,
-			started:       true,
-			messages:      []*pb.WakuMessageKeyValue{},
+			messages:      nil,
 			cursor:        nil,
 			storeRequest:  r.storeRequest,
+			storeResponse: r.storeResponse,
 			peerID:        r.PeerID(),
 		}, nil
 	}

@@ -232,10 +233,10 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
 	}

 	result := &Result{
-		started:       true,
 		store:         s,
 		messages:      response.Messages,
 		storeRequest:  storeRequest,
+		storeResponse: response,
 		peerID:        r.PeerID(),
 		cursor:        response.PaginationCursor,
 	}


@@ -9,10 +9,13 @@ import (
 // Result represents a valid response from a store node
 type Result struct {
-	started bool
+	noCursor bool
+	done     bool

 	messages []*pb.WakuMessageKeyValue

 	store         *WakuStore
 	storeRequest  *pb.StoreQueryRequest
+	storeResponse *pb.StoreQueryResponse
 	cursor        []byte
 	peerID        peer.ID
 }

@@ -22,7 +25,7 @@ func (r *Result) Cursor() []byte {
 }

 func (r *Result) IsComplete() bool {
-	return r.cursor == nil
+	return r.noCursor && r.done
 }

 func (r *Result) PeerID() peer.ID {

@@ -33,32 +36,32 @@ func (r *Result) Query() *pb.StoreQueryRequest {
 	return r.storeRequest
 }

-func (r *Result) Next(ctx context.Context) (bool, error) {
-	if !r.started {
-		r.started = true
-		return len(r.messages) != 0, nil
-	}
+func (r *Result) Response() *pb.StoreQueryResponse {
+	return r.storeResponse
+}

-	if r.IsComplete() {
-		r.cursor = nil
-		r.messages = nil
-		return false, nil
-	}
+func (r *Result) Next(ctx context.Context) error {
+	if r.noCursor {
+		r.done = true
+		r.messages = nil
+		return nil
+	}

 	newResult, err := r.store.next(ctx, r)
 	if err != nil {
-		return false, err
+		return err
 	}

 	r.cursor = newResult.cursor
 	r.messages = newResult.messages
-	return !r.IsComplete(), nil
+	if r.cursor == nil {
+		r.noCursor = true
+	}
+	return nil
 }

 func (r *Result) Messages() []*pb.WakuMessageKeyValue {
-	if !r.started {
-		return nil
-	}
 	return r.messages
 }
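The iteration contract changes with it: Next no longer reports a boolean, and the caller drives pagination until IsComplete is true. A hedged consumption loop, assuming the v3 store package path and printing entries via the generated protobuf String method:

package main

import (
	"context"
	"fmt"

	"github.com/waku-org/go-waku/waku/v2/protocol/store"
)

// drainStore processes the current page, then fetches the next one until
// the result reports completion.
func drainStore(ctx context.Context, result *store.Result) error {
	for {
		for _, kv := range result.Messages() {
			fmt.Println(kv) // each entry is a *pb.WakuMessageKeyValue
		}
		if result.IsComplete() {
			return nil
		}
		if err := result.Next(ctx); err != nil {
			return err
		}
	}
}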


@@ -29,6 +29,7 @@ type SubscriptionDetails struct {
 	mapRef *SubscriptionsMap
 	Closed bool `json:"-"`
 	once   sync.Once
+	Closing chan struct{}

 	PeerID        peer.ID                `json:"peerID"`
 	ContentFilter protocol.ContentFilter `json:"contentFilters"`

@@ -96,7 +97,6 @@ func (s *SubscriptionDetails) CloseC() {
 	s.once.Do(func() {
 		s.Lock()
 		defer s.Unlock()
-
 		s.Closed = true
 		close(s.C)
 	})


@@ -75,6 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
 		PeerID:        peerID,
 		C:             make(chan *protocol.Envelope, 1024),
 		ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
+		Closing:       make(chan struct{}),
 	}

 	// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair

@@ -218,6 +219,30 @@ func (m *SubscriptionsMap) GetSubscriptionsForPeer(peerID peer.ID, contentFilter
 	return output
 }

+func (m *SubscriptionsMap) GetAllSubscriptionsForPeer(peerID peer.ID) []*SubscriptionDetails {
+	m.RLock()
+	defer m.RUnlock()
+
+	var output []*SubscriptionDetails
+	for _, peerSubs := range m.items {
+		if peerSubs.PeerID == peerID {
+			for _, subs := range peerSubs.SubsPerPubsubTopic {
+				for _, subscriptionDetail := range subs {
+					output = append(output, subscriptionDetail)
+				}
+			}
+			break
+		}
+	}
+	return output
+}
+
+func (m *SubscriptionsMap) GetSubscribedPeers() peer.IDSlice {
+	m.RLock()
+	defer m.RUnlock()
+	return maps.Keys(m.items)
+}

 func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails {
 	return m.GetSubscriptionsForPeer("", protocol.ContentFilter{})
 }

vendor/modules.txt (vendored; 3 changed lines)

@@ -1023,9 +1023,10 @@ github.com/waku-org/go-discover/discover/v5wire
 github.com/waku-org/go-libp2p-rendezvous
 github.com/waku-org/go-libp2p-rendezvous/db
 github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f
+# github.com/waku-org/go-waku v0.8.1-0.20240528125047-269417c5e979
 ## explicit; go 1.20
 github.com/waku-org/go-waku/logging
+github.com/waku-org/go-waku/tests
 github.com/waku-org/go-waku/waku/persistence
 github.com/waku-org/go-waku/waku/v2/discv5
 github.com/waku-org/go-waku/waku/v2/dnsdisc


@@ -28,8 +28,6 @@ const (
 	FilterEventGetStats
 )

-const pingTimeout = 10 * time.Second

 type FilterSubs map[string]subscription.SubscriptionSet

 type FilterEvent struct {

@@ -76,9 +74,7 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(
 	mgr.config = config
 	mgr.node = node

 	mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
-		ctx, cancel := context.WithTimeout(ctx, pingTimeout)
-		defer cancel()
-		return mgr.node.FilterLightnode().IsSubscriptionAlive(ctx, sub)
+		return nil
 	}

 	return mgr