feat: filter subscription management and multiplexing (#1048)

Filter wrapper API that takes care of managing subscriptions and multiplex them onto a single data channel to be consumed by user. 

Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
This commit is contained in:
Vit∀ly Vlasov 2024-05-15 10:33:59 +03:00 committed by GitHub
parent 7028a0b1cb
commit 6e47bd1cf0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1103 additions and 899 deletions

View File

@ -220,6 +220,12 @@ test-postgres:
test-postgres-with-race:
${GOCMD} test -race -p 1 -v -count 1 -tags="${PG_BUILD_TAGS}" github.com/waku-org/go-waku/waku/persistence/...
test-filter:
${GOCMD} test -v github.com/waku-org/go-waku/waku/v2/protocol/filter -run TestFilterSuite -count=1
test-filter-api:
${GOCMD} test -v github.com/waku-org/go-waku/waku/v2/api -run TestFilterApiSuite
TEST_STOREV3_NODE ?=
test-storev3:
TEST_STOREV3_NODE=${TEST_STOREV3_NODE} ${GOCMD} test -p 1 -v -count 1 -tags="${BUILD_TAGS} include_storev3_tests" github.com/waku-org/go-waku/waku/v2/protocol/store/...

View File

@ -30,6 +30,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
@ -94,6 +95,7 @@ require (
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
@ -106,6 +108,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect

View File

@ -628,6 +628,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48=

View File

@ -38,6 +38,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
@ -105,6 +106,7 @@ require (
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
@ -118,6 +120,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect

View File

@ -660,6 +660,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48=

View File

@ -26,6 +26,7 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
@ -90,6 +91,7 @@ require (
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
@ -101,6 +103,7 @@ require (
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
@ -126,5 +129,6 @@ require (
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.14.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

View File

@ -625,6 +625,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48=

View File

@ -28,6 +28,7 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
@ -92,6 +93,7 @@ require (
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
@ -103,6 +105,7 @@ require (
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
@ -127,5 +130,6 @@ require (
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.14.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

View File

@ -625,6 +625,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48=

View File

@ -26,6 +26,7 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
@ -91,6 +92,7 @@ require (
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
@ -102,6 +104,7 @@ require (
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
@ -126,5 +129,6 @@ require (
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.14.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

View File

@ -625,6 +625,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48=

2
go.sum
View File

@ -1531,8 +1531,6 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20240129014929-85f2c00b96a3 h1:Kk0KYXZE/uNnARF2TbCQyvyZ/w4SgF8VhquNdOVVsNU=
github.com/waku-org/go-discover v0.0.0-20240129014929-85f2c00b96a3/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc=
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -1,78 +0,0 @@
package tests
import (
"context"
"net"
"testing"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestBasicSendingReceiving(t *testing.T) {
hostAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
key, err := RandomHex(32)
require.NoError(t, err)
prvKey, err := crypto.HexToECDSA(key)
require.NoError(t, err)
ctx := context.Background()
wakuNode, err := node.New(
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithWakuRelay(),
)
require.NoError(t, err)
require.NotNil(t, wakuNode)
err = wakuNode.Start(ctx)
require.NoError(t, err)
require.NoError(t, write(ctx, wakuNode, "test"))
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
value := <-sub[0].Ch
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
require.NoError(t, err)
require.Contains(t, string(payload.Data), "test")
}
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error {
var contentTopic string = "test"
version := uint32(0)
timestamp := utils.GetUnixEpoch()
p := new(payload.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)
p.Key = &payload.KeyInfo{Kind: payload.None}
payload, err := p.Encode(version)
if err != nil {
return err
}
msg := &pb.WakuMessage{
Payload: payload,
Version: proto.Uint32(version),
ContentTopic: contentTopic,
Timestamp: timestamp,
}
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
return err
}

233
waku/v2/api/filter.go Normal file
View File

@ -0,0 +1,233 @@
package api
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
const FilterPingTimeout = 5 * time.Second
const MultiplexChannelBuffer = 100
type FilterConfig struct {
MaxPeers int `json:"maxPeers"`
Peers []peer.ID `json:"peers"`
}
func (fc FilterConfig) String() string {
jsonStr, err := json.Marshal(fc)
if err != nil {
return ""
}
return string(jsonStr)
}
type Sub struct {
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
Config FilterConfig
subs subscription.SubscriptionSet
wf *filter.WakuFilterLightNode
ctx context.Context
cancel context.CancelFunc
log *zap.Logger
}
// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) {
sub := new(Sub)
sub.wf = wf
sub.ctx, sub.cancel = context.WithCancel(ctx)
sub.subs = make(subscription.SubscriptionSet)
sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer)
sub.ContentFilter = contentFilter
sub.Config = config
sub.log = log.Named("filter-api")
sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter))
subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers)
if err != nil {
return nil, err
}
sub.multiplex(subs)
go sub.healthCheckLoop()
return sub, nil
}
func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()
}
func (apiSub *Sub) healthCheckLoop() {
// Health checks
ticker := time.NewTicker(FilterPingTimeout)
defer ticker.Stop()
for {
select {
case <-apiSub.ctx.Done():
apiSub.log.Debug("healthCheckLoop: Done()")
apiSub.cleanup()
return
case <-ticker.C:
apiSub.log.Debug("healthCheckLoop: checkAliveness()")
topicCounts := apiSub.getTopicCounts()
apiSub.resubscribe(topicCounts)
}
}
}
func (apiSub *Sub) cleanup() {
apiSub.log.Debug("ENTER cleanup()")
defer func() {
apiSub.log.Debug("EXIT cleanup()")
}()
for _, s := range apiSub.subs {
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
if err != nil {
//Logging with info as this is part of cleanup
apiSub.log.Info("failed to unsubscribe filter", zap.Error(err))
}
}
close(apiSub.DataCh)
}
// Returns active sub counts for each pubsub topic
func (apiSub *Sub) getTopicCounts() map[string]int {
// Buffered chan for sub aliveness results
type CheckResult struct {
sub *subscription.SubscriptionDetails
alive bool
}
checkResults := make(chan CheckResult, len(apiSub.subs))
var wg sync.WaitGroup
// Run pings asynchronously
for _, s := range apiSub.subs {
wg.Add(1)
go func(sub *subscription.SubscriptionDetails) {
defer wg.Done()
ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout)
defer cancelFunc()
err := apiSub.wf.IsSubscriptionAlive(ctx, sub)
apiSub.log.Debug("Check result:", zap.Any("subID", sub.ID), zap.Bool("result", err == nil))
checkResults <- CheckResult{sub, err == nil}
}(s)
}
// Collect healthy topic counts
topicCounts := make(map[string]int)
topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter)
for _, t := range maps.Keys(topicMap) {
topicCounts[t] = 0
}
wg.Wait()
close(checkResults)
for s := range checkResults {
if !s.alive {
// Close inactive subs
s.sub.Close()
delete(apiSub.subs, s.sub.ID)
} else {
topicCounts[s.sub.ContentFilter.PubsubTopic]++
}
}
return topicCounts
}
// Attempts to resubscribe on topics that lack subscriptions
func (apiSub *Sub) resubscribe(topicCounts map[string]int) {
// Delete healthy topics
for t, cnt := range topicCounts {
if cnt == apiSub.Config.MaxPeers {
delete(topicCounts, t)
}
}
if len(topicCounts) == 0 {
// All topics healthy, return
return
}
var wg sync.WaitGroup
// Re-subscribe asynchronously
newSubs := make(chan []*subscription.SubscriptionDetails)
for t, cnt := range topicCounts {
cFilter := protocol.ContentFilter{PubsubTopic: t, ContentTopics: apiSub.ContentFilter.ContentTopics}
wg.Add(1)
go func(count int) {
defer wg.Done()
subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-count)
if err != nil {
return
} //Not handling scenario where all requested subs are not received as that will get handled in next cycle.
newSubs <- subs
}(cnt)
}
wg.Wait()
close(newSubs)
apiSub.log.Debug("resubscribe(): before range newSubs")
for subs := range newSubs {
if subs != nil {
apiSub.multiplex(subs)
}
}
apiSub.log.Debug("checkAliveness(): close(newSubs)")
//close(newSubs)
}
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
}
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)
if err != nil {
if len(subs) > 0 {
// Partial Failure, for now proceed as we don't expect this to happen wrt specific topics.
// Rather it can happen in case subscription with one of the peer fails.
// This can further get automatically handled at resubscribe,
apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err))
return subs, nil
}
// In case of complete subscription failure, application or user needs to handle and probably retry based on error
// TODO: Once filter error handling indicates specific error, this can be addressed based on the error at this layer.
return nil, err
}
return subs, nil
}
func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
// Multiplex onto single channel
// Goroutines will exit once sub channels are closed
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
go func(subDetails *subscription.SubscriptionDetails) {
apiSub.log.Debug("New multiplex", zap.String("subID", subDetails.ID))
for env := range subDetails.C {
apiSub.DataCh <- env
}
}(subDetails)
}
}

View File

@ -0,0 +1,77 @@
package api
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
func TestFilterApiSuite(t *testing.T) {
suite.Run(t, new(FilterApiTestSuite))
}
type FilterApiTestSuite struct {
filter.FilterTestSuite
}
func (s *FilterApiTestSuite) SetupTest() {
s.FilterTestSuite.SetupTest()
s.Log.Info("SetupTest()")
}
func (s *FilterApiTestSuite) TearDownTest() {
s.FilterTestSuite.TearDownTest()
}
func (s *FilterApiTestSuite) TestSubscribe() {
contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
// We have one full node already created in SetupTest(),
// create another one
fullNodeData2 := s.GetWakuFilterFullNode(s.TestTopic, true)
s.ConnectHosts(s.LightNodeHost, fullNodeData2.FullNodeHost)
peers := []peer.ID{s.FullNodeHost.ID(), fullNodeData2.FullNodeHost.ID()}
s.Log.Info("FullNodeHost IDs:", zap.Any("peers", peers))
// Make sure IDs are different
s.Require().True(peers[0] != peers[1])
apiConfig := FilterConfig{MaxPeers: 2, Peers: peers}
s.Require().Equal(apiConfig.MaxPeers, 2)
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed")
s.Require().Len(apiSub.subs, 2)
for sub := range apiSub.subs {
s.Log.Info("SubDetails:", zap.String("id", sub))
}
subsArray := maps.Keys(apiSub.subs)
s.Require().True(subsArray[0] != subsArray[1])
// Publish msg and confirm it's received twice because of multiplexing
s.PublishMsg(&filter.WakuMsg{PubSubTopic: s.TestTopic, ContentTopic: s.TestContentTopic, Payload: "Test msg"})
cnt := 0
for msg := range apiSub.DataCh {
s.Log.Info("Received msg:", zap.Int("cnt", cnt), zap.String("payload", string(msg.Message().Payload)))
cnt++
break
}
s.Require().Equal(cnt, 1)
time.Sleep(2 * time.Second)
apiSub.Unsubscribe()
for range apiSub.DataCh {
}
s.Log.Info("DataCh is closed")
}

View File

@ -843,7 +843,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
Protocols: protocols,
Connected: connected,
Addrs: addrs,
PubsubTopics: topics,
PubsubTopics: maps.Keys(topics),
})
}
return peers, nil

View File

@ -13,6 +13,7 @@ import (
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error {
@ -103,7 +104,7 @@ func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) {
logging.HostID("peerID", peer))
continue
}
if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic {
if len(peerTopics) == 1 && maps.Keys(peerTopics)[0] == pubsubTopic {
err := pm.host.Network().ClosePeer(peer)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer",

View File

@ -8,6 +8,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/maps"
)
// Origin is used to determine how the peer is identified,
@ -58,7 +60,7 @@ type WakuPeerstore interface {
AddPubSubTopic(p peer.ID, topic string) error
RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error)
PubSubTopics(p peer.ID) (protocol.TopicSet, error)
SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
@ -175,13 +177,12 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error {
if err != nil {
return err
}
for _, t := range existingTopics {
if t == topic {
return nil
}
if _, found := existingTopics[topic]; found {
return nil
}
existingTopics = append(existingTopics, topic)
return ps.peerStore.Put(p, peerPubSubTopics, existingTopics)
existingTopics[topic] = struct{}{}
return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics))
}
// RemovePubSubTopic removes a pubSubTopic from the peer
@ -195,14 +196,9 @@ func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error {
return nil
}
for i := range existingTopics {
if existingTopics[i] == topic {
existingTopics = append(existingTopics[:i], existingTopics[i+1:]...)
break
}
}
delete(existingTopics, topic)
err = ps.SetPubSubTopics(p, existingTopics)
err = ps.SetPubSubTopics(p, maps.Keys(existingTopics))
if err != nil {
return err
}
@ -215,16 +211,16 @@ func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error {
}
// PubSubTopics fetches list of pubSubTopics for a peer
func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) {
result, err := ps.peerStore.Get(p, peerPubSubTopics)
if err != nil {
if errors.Is(err, peerstore.ErrNotFound) {
return nil, nil
return protocol.NewTopicSet(), nil
} else {
return nil, err
}
}
return result.([]string), nil
return protocol.NewTopicSet((result.([]string))...), nil
}
// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics
@ -235,22 +231,16 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
}
var result peer.IDSlice
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
peerMatch := true
peerTopics, err := ps.PubSubTopics(p)
if err == nil {
//Convoluted and crazy logic to find subset of topics
// Could not find a better way to do it?
peerTopicMap := make(map[string]struct{})
match := true
for _, topic := range topics {
peerTopicMap[topic] = struct{}{}
}
for _, topic := range pubSubTopics {
if _, ok := peerTopicMap[topic]; !ok {
match = false
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
}
if match {
if peerMatch {
result = append(result, p)
}
} //Note: skipping a peer in case of an error as there would be others available.
@ -268,7 +258,7 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeer
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
if err == nil {
for _, topic := range topics {
for topic := range topics {
if topic == pubSubTopic {
result = append(result, p)
}

View File

@ -1,11 +1,22 @@
package protocol
import "golang.org/x/exp/maps"
import (
"golang.org/x/exp/maps"
)
type PubsubTopicStr = string
type ContentTopicStr = string
type ContentTopicSet map[string]struct{}
type TopicSet map[string]struct{}
func NewTopicSet(topics ...string) TopicSet {
s := make(TopicSet, len(topics))
for _, t := range topics {
s[t] = struct{}{}
}
return s
}
func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
@ -28,6 +39,16 @@ type ContentFilter struct {
ContentTopics ContentTopicSet `json:"contentTopics"`
}
func (cf ContentFilter) String() string {
var ret string
ret += "{ pubsubTopic: " + cf.PubsubTopic + ", contentTopics: [ "
for ct := range cf.ContentTopics {
ret += ct
}
ret += " ] }"
return ret
}
func (cf ContentFilter) ContentTopicsList() []string {
return cf.ContentTopics.ToList()
}

View File

@ -6,29 +6,29 @@ import (
)
func (s *FilterTestSuite) TestSubscriptionPing() {
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
err := s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())
s.Require().Error(err)
filterErr, ok := err.(*FilterError)
s.Require().True(ok)
s.Require().Equal(filterErr.Code, http.StatusNotFound)
contentTopic := "abc"
s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, contentTopic, s.FullNodeHost.ID())
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestUnSubscriptionPing() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
err := s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())
s.Require().NoError(err)
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())
s.Require().Error(err)
}

View File

@ -30,52 +30,32 @@ import (
func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""})
}
func (s *FilterTestSuite) TestModifySubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""})
// Subscribe to another content_topic
newContentTopic := "Topic_modified"
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, newContentTopic, s.FullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, newContentTopic, ""})
}
func (s *FilterTestSuite) TestMultipleMessages() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "first"})
}, s.subDetails[0].C)
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"})
}
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
@ -222,28 +202,29 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.testTopic = defaultTestPubSubTopic
s.testContentTopic = defaultTestContentTopic
s.TestTopic = DefaultTestPubSubTopic
s.TestContentTopic = DefaultTestContentTopic
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.MakeWakuFilterFullNode(s.TestTopic, false)
//Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
// Subscribe with incorrect SubscribeID
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
_, err := s.lightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
_, err := s.LightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().Error(err)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -259,7 +240,7 @@ func (wf *WakuFilterLightNode) startWithIncorrectPushProto() error {
func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}
// Create test context
@ -267,43 +248,43 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
s.ctx = ctx
s.ctxCancel = cancel
s.testTopic = defaultTestPubSubTopic
s.testContentTopic = defaultTestContentTopic
s.TestTopic = DefaultTestPubSubTopic
s.TestContentTopic = DefaultTestContentTopic
s.lightNode = s.makeWakuFilterLightNode(false, true)
s.MakeWakuFilterLightNode()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.MakeWakuFilterFullNode(s.TestTopic, false)
// Re-start light node with unsupported prefix for match func
s.lightNode.Stop()
err := s.lightNode.CommonService.Start(s.ctx, s.lightNode.startWithIncorrectPushProto)
s.LightNode.Stop()
err := s.LightNode.CommonService.Start(s.ctx, s.LightNode.startWithIncorrectPushProto)
s.Require().NoError(err)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
err = s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
// Subscribe
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
s.subDetails, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
s.subDetails, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
time.Sleep(1 * time.Second)
// Send message
_, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic))
_, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.TestContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.TestTopic))
s.Require().NoError(err)
// Message should never arrive -> exit after timeout
select {
case msg := <-s.subDetails[0].C:
s.log.Info("Light node received a msg")
s.Log.Info("Light node received a msg")
s.Require().Nil(msg)
case <-time.After(1 * time.Second):
s.Require().True(true)
}
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}

View File

@ -2,28 +2,27 @@ package filter
import (
"context"
"strconv"
"time"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"strconv"
"time"
)
func (s *FilterTestSuite) TestValidPayloadsASCII() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomASCIIString)
messages := s.prepareData(100, false, false, true, tests.GenerateRandomASCIIString)
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -31,17 +30,15 @@ func (s *FilterTestSuite) TestValidPayloadsASCII() {
func (s *FilterTestSuite) TestValidPayloadsUTF8() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomUTF8String)
messages := s.prepareData(100, false, false, true, tests.GenerateRandomUTF8String)
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -49,17 +46,15 @@ func (s *FilterTestSuite) TestValidPayloadsUTF8() {
func (s *FilterTestSuite) TestValidPayloadsBase64() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomBase64String)
messages := s.prepareData(100, false, false, true, tests.GenerateRandomBase64String)
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -67,17 +62,15 @@ func (s *FilterTestSuite) TestValidPayloadsBase64() {
func (s *FilterTestSuite) TestValidPayloadsJSON() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomJSONString)
messages := s.prepareData(100, false, false, true, tests.GenerateRandomJSONString)
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -85,17 +78,15 @@ func (s *FilterTestSuite) TestValidPayloadsJSON() {
func (s *FilterTestSuite) TestValidPayloadsURLEncoded() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString)
messages := s.prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString)
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -103,17 +94,15 @@ func (s *FilterTestSuite) TestValidPayloadsURLEncoded() {
func (s *FilterTestSuite) TestValidPayloadsSQL() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomSQLInsert)
messages := s.prepareData(100, false, false, true, tests.GenerateRandomSQLInsert)
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -123,23 +112,21 @@ func (s *FilterTestSuite) TestLargePayloadsUTF8() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 40*time.Second)
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Prepare basic data
messages := prepareData(10, false, false, false, nil)
messages := s.prepareData(10, false, false, false, nil)
// Generate large string
for i := range messages {
messages[i].payload, _ = tests.GenerateRandomUTF8String(153600)
s.log.Info("Generated payload with ", zap.String("length", strconv.Itoa(len(messages[i].payload))))
messages[i].Payload, _ = tests.GenerateRandomUTF8String(153600)
s.Log.Info("Generated payload with ", zap.String("length", strconv.Itoa(len(messages[i].Payload))))
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -147,19 +134,19 @@ func (s *FilterTestSuite) TestLargePayloadsUTF8() {
func (s *FilterTestSuite) TestFuturePayloadEncryptionVersion() {
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
message := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "test_payload")
message := tests.CreateWakuMessage(s.TestContentTopic, utils.GetUnixEpoch(), "test_payload")
futureVersion := uint32(100)
message.Version = &futureVersion
// Should get accepted
_, err := s.relayNode.Publish(s.ctx, message, relay.WithPubSubTopic(s.testTopic))
_, err := s.relayNode.Publish(s.ctx, message, relay.WithPubSubTopic(s.TestTopic))
s.Require().NoError(err)
// Should be received
s.waitForMsg(nil, s.subDetails[0].C)
s.waitForMsg(nil)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}

View File

@ -18,76 +18,61 @@ import (
func (s *FilterTestSuite) TestWakuFilter() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "first"})
// Wrong content topic
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"})
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err := s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.TestTopic, s.TestContentTopic, "third"})
// Two new subscriptions with same [peer, contentFilter]
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
secondSub := s.getSub(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Assert that we have 2 subscriptions now
s.Require().Equal(len(s.lightNode.Subscriptions()), 2)
s.Require().Equal(len(s.LightNode.Subscriptions()), 2)
// Should be received on both subscriptions
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fourth")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "fourth"})
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "fifth")
}, secondSub[0].C)
s.waitForMsgFromChan(&WakuMsg{s.TestTopic, s.TestContentTopic, "fifth"}, secondSub[0].C)
s.waitForMsg(nil, s.subDetails[0].C)
s.waitForMsg(nil, secondSub[0].C)
s.waitForMsg(nil)
s.waitForMsgFromChan(nil, secondSub[0].C)
// Unsubscribe from second sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0])
_, err = s.LightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0])
s.Require().NoError(err)
// Should still receive
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "sixth")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "sixth"})
// Unsubscribe from first sub only
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
_, err = s.LightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
s.Require().NoError(err)
s.Require().Equal(len(s.lightNode.Subscriptions()), 0)
s.Require().Equal(len(s.LightNode.Subscriptions()), 0)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "seventh")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.TestTopic, s.TestContentTopic, "seventh"})
}
func (s *FilterTestSuite) TestPubSubSingleContentTopic() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Message should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "test_msg"})
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -97,19 +82,17 @@ func (s *FilterTestSuite) TestPubSubMultiContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds
messages := prepareData(3, false, true, false, nil)
messages := s.prepareData(3, false, true, false, nil)
// Subscribe
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -119,36 +102,35 @@ func (s *FilterTestSuite) TestMultiPubSubMultiContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
s.MakeWakuFilterFullNode(s.TestTopic, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
messages := prepareData(2, true, true, false, nil)
messages := s.prepareData(2, true, true, false, nil)
// Subscribe
for _, m := range messages {
s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...)
s.log.Info("Subscribing ", zap.String("PubSubTopic", m.pubSubTopic))
_, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.subDetails = append(s.subDetails, s.getSub(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())...)
s.Log.Info("Subscribing ", zap.String("PubSubTopic", m.PubSubTopic))
_, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic))
s.Require().NoError(err)
}
// Debug to see subscriptions in light node
for _, sub := range s.subDetails {
s.log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0]))
s.Log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0]))
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -158,39 +140,35 @@ func (s *FilterTestSuite) TestPubSubMultiOverlapContentTopic() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds
messages := prepareData(10, false, true, true, nil)
messages := s.prepareData(10, false, true, true, nil)
// Subscribe
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestSubscriptionRefresh() {
messages := prepareData(2, false, false, true, nil)
messages := s.prepareData(2, false, false, true, nil)
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Repeat the same subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Both messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -209,33 +187,31 @@ func (s *FilterTestSuite) TestContentTopicsLimit() {
}
}
messages := prepareData(maxContentTopics+1, false, true, true, nil)
messages := s.prepareData(maxContentTopics+1, false, true, true, nil)
// Subscribe
for _, m := range messages[:len(messages)-1] {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// All messages within limit should get received
s.waitForMessages(func() {
s.publishMessages(messages[:len(messages)-1])
}, s.subDetails, messages[:len(messages)-1])
s.waitForMessages(messages[:len(messages)-1])
// Adding over the limit contentTopic should fail
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == messages[len(messages)-1].pubSubTopic {
sub.Add(messages[len(messages)-1].contentTopic)
_, err := s.lightNode.Subscribe(s.ctx, sub.ContentFilter, WithPeer(s.fullNodeHost.ID()))
if sub.ContentFilter.PubsubTopic == messages[len(messages)-1].PubSubTopic {
sub.Add(messages[len(messages)-1].ContentTopic)
_, err := s.LightNode.Subscribe(s.ctx, sub.ContentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().Error(err)
}
}
// Unsubscribe for cleanup
for _, m := range messages {
_ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
_ = s.unsubscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -245,61 +221,61 @@ func (s *FilterTestSuite) TestSubscribeErrorHandling() {
// Prepare data
messages = append(messages, WakuMsg{
pubSubTopic: "",
contentTopic: s.testContentTopic,
payload: "N/A",
PubSubTopic: "",
ContentTopic: s.TestContentTopic,
Payload: "N/A",
})
messages = append(messages, WakuMsg{
pubSubTopic: s.testTopic,
contentTopic: "",
payload: "N/A",
PubSubTopic: s.TestTopic,
ContentTopic: "",
Payload: "N/A",
})
// Subscribe with empty pubsub
s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)}
_, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)}
_, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().Error(err)
// Subscribe with empty content topic
s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)}
_, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)}
_, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().Error(err)
}
func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID()))
fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.FullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.FullNodeHost.ID()))
s.log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex)))
s.Log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex)))
// This will overwrite values with the second node info
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.MakeWakuFilterFullNode(s.TestTopic, false)
// Connect to second full and relay node
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID()))
fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.FullNodeHost.ID()))))
_ = hex.Encode(fullNodeIDHex, []byte(s.FullNodeHost.ID()))
s.log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex)))
s.Log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex)))
// Subscribe to the second full node
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
_, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
_, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -308,25 +284,28 @@ func (s *FilterTestSuite) TestSubscribeMultipleLightNodes() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
lightNode2 := s.makeWakuFilterLightNode(true, true)
lightNodeData := s.GetWakuFilterLightNode()
lightNode2 := lightNodeData.LightNode
err := lightNode2.Start(context.Background())
s.Require().NoError(err)
// Connect node2
lightNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
lightNode2.h.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
messages := prepareData(2, true, true, true, nil)
messages := s.prepareData(2, true, true, true, nil)
// Subscribe separately: light node 1 -> full node
contentFilter := protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)}
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
contentFilter := protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)}
_, err = s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
// Subscribe separately: light node 2 -> full node
contentFilter2 := protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)}
_, err = lightNode2.Subscribe(s.ctx, contentFilter2, WithPeer(s.fullNodeHost.ID()))
contentFilter2 := protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)}
_, err = lightNode2.Subscribe(s.ctx, contentFilter2, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
// Unsubscribe
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
_, err = lightNode2.UnsubscribeAll(s.ctx)
@ -344,13 +323,14 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second)
_, fullNode2 := s.makeWakuFilterFullNode(testTopic, false, false)
nodeData := s.GetWakuFilterFullNode(testTopic, false)
fullNode2 := nodeData.fullNode
// Connect nodes
fullNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
fullNode2.h.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
// Get stream
stream, err := fullNode2.h.NewStream(s.ctx, s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
stream, err := fullNode2.h.NewStream(s.ctx, s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
// Prepare subscribe request
@ -364,7 +344,7 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
fullNode2.subscribe(s.ctx, stream, subscribeRequest)
// Check the pubsub topic related to the first node is stored within the second node
pubsubTopics, hasTopics := fullNode2.subscriptions.Get(s.fullNodeHost.ID())
pubsubTopics, hasTopics := fullNode2.subscriptions.Get(s.FullNodeHost.ID())
s.Require().True(hasTopics)
// Check the pubsub topic is what we have set
@ -378,13 +358,13 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() {
}
func (s *FilterTestSuite) TestIsSubscriptionAlive() {
messages := prepareData(2, false, true, false, nil)
messages := s.prepareData(2, false, true, false, nil)
// Subscribe with the first message only
s.subDetails = s.subscribe(messages[0].pubSubTopic, messages[0].contentTopic, s.fullNodeHost.ID())
s.subscribe(messages[0].PubSubTopic, messages[0].ContentTopic, s.FullNodeHost.ID())
// IsSubscriptionAlive returns no error for the first message
err := s.lightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0])
err := s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0])
s.Require().NoError(err)
// Create new host/peer - not related to any node
@ -395,25 +375,25 @@ func (s *FilterTestSuite) TestIsSubscriptionAlive() {
s.subDetails[0].PeerID = host.ID()
// IsSubscriptionAlive returns error for the second message, peer ID doesn't match
err = s.lightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0])
err = s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0])
s.Require().Error(err)
}
func (s *FilterTestSuite) TestFilterSubscription() {
contentFilter := protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Returns no error and SubscriptionDetails for existing subscription
_, err := s.lightNode.FilterSubscription(s.fullNodeHost.ID(), contentFilter)
_, err := s.LightNode.FilterSubscription(s.FullNodeHost.ID(), contentFilter)
s.Require().NoError(err)
otherFilter := protocol.ContentFilter{PubsubTopic: "34583495", ContentTopics: protocol.NewContentTopicSet("sjfa402")}
// Returns error and nil SubscriptionDetails for non existent subscription
nonSubscription, err := s.lightNode.FilterSubscription(s.fullNodeHost.ID(), otherFilter)
nonSubscription, err := s.LightNode.FilterSubscription(s.FullNodeHost.ID(), otherFilter)
s.Require().Error(err)
s.Require().Nil(nonSubscription)
@ -422,32 +402,32 @@ func (s *FilterTestSuite) TestFilterSubscription() {
s.Require().NoError(err)
// Returns error and nil SubscriptionDetails for unrelated host/peer
nonSubscription, err = s.lightNode.FilterSubscription(host.ID(), contentFilter)
nonSubscription, err = s.LightNode.FilterSubscription(host.ID(), contentFilter)
s.Require().Error(err)
s.Require().Nil(nonSubscription)
}
func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
contentFilter := protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)}
// Subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// With valid peer
opts := []FilterSubscribeOption{WithPeer(s.fullNodeHost.ID())}
opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())}
// Positive case
_, _, err := s.lightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
_, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
s.Require().NoError(err)
addr := s.fullNodeHost.Addrs()[0]
addr := s.FullNodeHost.Addrs()[0]
// Combine mutually exclusive options
opts = []FilterSubscribeOption{WithPeer(s.fullNodeHost.ID()), WithPeerAddr(addr)}
opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}
// Should fail on wrong option combination
_, _, err = s.lightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
_, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
s.Require().Error(err)
}

View File

@ -2,25 +2,16 @@ package filter
import (
"context"
"crypto/rand"
"errors"
"fmt"
"strconv"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -29,359 +20,20 @@ func TestFilterSuite(t *testing.T) {
suite.Run(t, new(FilterTestSuite))
}
const defaultTestPubSubTopic = "/waku/2/go/filter/test"
const defaultTestContentTopic = "/test/10/my-app"
type FilterTestSuite struct {
suite.Suite
testTopic string
testContentTopic string
ctx context.Context
ctxCancel context.CancelFunc
lightNode *WakuFilterLightNode
lightNodeHost host.Host
relayNode *relay.WakuRelay
relaySub *relay.Subscription
fullNode *WakuFilterFullNode
fullNodeHost host.Host
wg *sync.WaitGroup
contentFilter protocol.ContentFilter
subDetails []*subscription.SubscriptionDetails
log *zap.Logger
}
type WakuMsg struct {
pubSubTopic string
contentTopic string
payload string
}
func (s *FilterTestSuite) makeWakuRelay(topic string, shared bool) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) {
broadcaster := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster.Start(context.Background()))
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log)
relay.SetHost(host)
if shared {
s.fullNodeHost = host
}
err = relay.Start(context.Background())
s.Require().NoError(err)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic))
s.Require().NoError(err)
return relay, sub[0], host, broadcaster
}
func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bool) *WakuFilterLightNode {
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
var b relay.Broadcaster
if withBroadcaster {
b = relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
}
filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log)
filterPush.SetHost(host)
s.lightNodeHost = host
if start {
err = filterPush.Start(context.Background())
s.Require().NoError(err)
}
return filterPush
}
func (s *FilterTestSuite) makeWakuFilterFullNode(topic string, withRegisterAll bool, shared bool) (*relay.WakuRelay, *WakuFilterFullNode) {
var sub *relay.Subscription
node, relaySub, host, broadcaster := s.makeWakuRelay(topic, shared)
if shared {
s.relaySub = relaySub
}
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log)
node2Filter.SetHost(host)
if withRegisterAll {
sub = broadcaster.RegisterForAll()
} else {
sub = broadcaster.Register(protocol.NewContentFilter(topic))
}
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)
return node, node2Filter
}
func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
s.wg.Add(1)
var msgFound = false
go func() {
defer s.wg.Done()
select {
case env := <-ch:
for _, topic := range s.contentFilter.ContentTopicsList() {
if topic == env.Message().GetContentTopic() {
msgFound = true
}
}
s.Require().True(msgFound)
case <-time.After(1 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}()
if fn != nil {
fn()
}
s.wg.Wait()
}
func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool {
for _, m := range many {
if m.pubSubTopic == one.pubSubTopic &&
m.contentTopic == one.contentTopic &&
m.payload == one.payload {
return true
}
}
return false
}
func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.SubscriptionDetails, expected []WakuMsg) {
s.wg.Add(1)
msgCount := len(expected)
found := 0
s.log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount)))
s.log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs))))
go func() {
defer s.wg.Done()
for _, sub := range subs {
s.log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic))
for i := 0; i < msgCount; i++ {
select {
case env, ok := <-sub.C:
if !ok {
continue
}
received := WakuMsg{
pubSubTopic: env.PubsubTopic(),
contentTopic: env.Message().GetContentTopic(),
payload: string(env.Message().GetPayload()),
}
s.log.Debug("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
if matchOneOfManyMsg(received, expected) {
found++
}
case <-time.After(3 * time.Second):
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}
}
}()
if fn != nil {
fn()
}
s.wg.Wait()
s.Require().True(msgCount == found)
}
func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case env, ok := <-ch:
if ok {
s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload)))
}
case <-time.After(1 * time.Second):
// Timeout elapsed, all good
case <-s.ctx.Done():
s.Require().Fail("waitForTimeout test exceeded allocated time")
}
}()
fn()
s.wg.Wait()
}
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
sub.Add(contentTopic)
s.contentFilter = sub.ContentFilter
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
return subDetails
}
}
s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)}
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
// Sleep to make sure the filter is subscribed
time.Sleep(1 * time.Second)
return subDetails
}
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
topicsCount := len(sub.ContentFilter.ContentTopicsList())
if topicsCount == 1 {
_, err := s.lightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer))
s.Require().NoError(err)
} else {
sub.Remove(contentTopic)
}
s.contentFilter = sub.ContentFilter
}
}
return s.lightNode.Subscriptions()
}
func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) {
var payload string
if len(optionalPayload) > 0 {
payload = optionalPayload[0]
} else {
payload = "123"
}
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), relay.WithPubSubTopic(topic))
s.Require().NoError(err)
}
func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) {
for _, m := range msgs {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), relay.WithPubSubTopic(m.pubSubTopic))
s.Require().NoError(err)
}
}
func prepareData(quantity int, topics, contentTopics, payloads bool, sg tests.StringGenerator) []WakuMsg {
var (
pubsubTopic = defaultTestPubSubTopic // Has to be the same with initial s.testTopic
contentTopic = defaultTestContentTopic // Has to be the same with initial s.testContentTopic
payload = "test_msg"
messages []WakuMsg
strMaxLenght = 4097
generatedString = ""
)
for i := 0; i < quantity; i++ {
msg := WakuMsg{
pubSubTopic: pubsubTopic,
contentTopic: contentTopic,
payload: payload,
}
if sg != nil {
generatedString, _ = sg(strMaxLenght)
}
if topics {
msg.pubSubTopic = fmt.Sprintf("%s%02d%s", pubsubTopic, i, generatedString)
}
if contentTopics {
msg.contentTopic = fmt.Sprintf("%s%02d%s", contentTopic, i, generatedString)
}
if payloads {
msg.payload = fmt.Sprintf("%s%02d%s", payload, i, generatedString)
}
messages = append(messages, msg)
}
return messages
}
func (s *FilterTestSuite) SetupTest() {
log := utils.Logger() //.Named("filterv2-test")
s.log = log
// Use a pointer to WaitGroup so that to avoid copying
// https://pkg.go.dev/sync#WaitGroup
s.wg = &sync.WaitGroup{}
// Create test context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel
s.testTopic = defaultTestPubSubTopic
s.testContentTopic = defaultTestContentTopic
s.lightNode = s.makeWakuFilterLightNode(true, true)
//TODO: Add tests to verify broadcaster.
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TearDownTest() {
s.fullNode.Stop()
s.relayNode.Stop()
s.relaySub.Unsubscribe()
s.lightNode.Stop()
s.ctxCancel()
}
func (s *FilterTestSuite) TestRunningGuard() {
s.lightNode.Stop()
s.LightNode.Stop()
contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")}
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().ErrorIs(err, service.ErrNotStarted)
err = s.lightNode.Start(s.ctx)
err = s.LightNode.Start(s.ctx)
s.Require().NoError(err)
_, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err = s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
}
@ -390,18 +42,18 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {
contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")}
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
result, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait())
result, err := s.LightNode.Unsubscribe(s.ctx, contentFilter, DontWait())
s.Require().NoError(err)
s.Require().Equal(0, len(result.Errors()))
_, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err = s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
wg := sync.WaitGroup{}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithWaitGroup(&wg))
_, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithWaitGroup(&wg))
wg.Wait()
s.Require().NoError(err)
}
@ -410,18 +62,18 @@ func (s *FilterTestSuite) TestStartStop() {
var wg sync.WaitGroup
wg.Add(2)
s.lightNode = s.makeWakuFilterLightNode(false, false)
s.MakeWakuFilterLightNode()
stopNode := func() {
for i := 0; i < 100000; i++ {
s.lightNode.Stop()
s.LightNode.Stop()
}
wg.Done()
}
startNode := func() {
for i := 0; i < 100; i++ {
err := s.lightNode.Start(context.Background())
err := s.LightNode.Start(context.Background())
if errors.Is(err, service.ErrAlreadyStarted) {
continue
}
@ -441,7 +93,7 @@ func (s *FilterTestSuite) TestAutoShard() {
//Workaround as could not find a way to reuse setup test with params
// Stop what is run in setup
s.fullNode.Stop()
s.lightNode.Stop()
s.LightNode.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel
@ -451,102 +103,91 @@ func (s *FilterTestSuite) TestAutoShard() {
s.Require().NoError(err)
//Computing pubSubTopic only for filterFullNode.
pubSubTopic := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount)
s.testContentTopic = cTopic1Str
s.testTopic = pubSubTopic.String()
s.TestContentTopic = cTopic1Str
s.TestTopic = pubSubTopic.String()
s.lightNode = s.makeWakuFilterLightNode(true, false)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String(), false, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.MakeWakuFilterFullNode(pubSubTopic.String(), false)
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
err = s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
s.log.Info("Testing Autoshard:CreateSubscription")
s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.Log.Info("Testing Autoshard:CreateSubscription")
s.subscribe("", s.TestContentTopic, s.FullNodeHost.ID())
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""})
// Wrong content topic
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"})
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
_, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
time.Sleep(1 * time.Second)
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.TestTopic, s.TestContentTopic, "third"})
s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID())
s.subscribe("", s.TestContentTopic, s.FullNodeHost.ID())
s.log.Info("Testing Autoshard:SubscriptionPing")
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Log.Info("Testing Autoshard:SubscriptionPing")
err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID())
s.Require().NoError(err)
// Test ModifySubscription Subscribe to another content_topic
s.log.Info("Testing Autoshard:ModifySubscription")
s.Log.Info("Testing Autoshard:ModifySubscription")
newContentTopic := "0/test/1/testTopic1/proto"
s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID())
s.subscribe("", newContentTopic, s.FullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
s.waitForMsg(&WakuMsg{s.TestTopic, newContentTopic, ""})
}, s.subDetails[0].C)
_, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.testTopic,
_, err = s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.TestTopic,
ContentTopics: protocol.NewContentTopicSet(newContentTopic),
})
s.Require().NoError(err)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestLightNodeIsListening() {
messages := prepareData(2, true, true, false, nil)
messages := s.prepareData(2, true, true, false, nil)
// Subscribe with the first message only
s.subDetails = s.subscribe(messages[0].pubSubTopic, messages[0].contentTopic, s.fullNodeHost.ID())
s.subscribe(messages[0].PubSubTopic, messages[0].ContentTopic, s.FullNodeHost.ID())
// IsListening returns true for the first message
listenStatus := s.lightNode.IsListening(messages[0].pubSubTopic, messages[0].contentTopic)
listenStatus := s.LightNode.IsListening(messages[0].PubSubTopic, messages[0].ContentTopic)
s.Require().True(listenStatus)
// IsListening returns false for the second message
listenStatus = s.lightNode.IsListening(messages[1].pubSubTopic, messages[1].contentTopic)
listenStatus = s.LightNode.IsListening(messages[1].PubSubTopic, messages[1].ContentTopic)
s.Require().False(listenStatus)
// IsListening returns false for combination as well
listenStatus = s.lightNode.IsListening(messages[0].pubSubTopic, messages[1].contentTopic)
listenStatus = s.LightNode.IsListening(messages[0].PubSubTopic, messages[1].ContentTopic)
s.Require().False(listenStatus)
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) BeforeTest(suiteName, testName string) {
s.log.Info("Executing ", zap.String("testName", testName))
s.Log.Info("Executing ", zap.String("testName", testName))
}
func (s *FilterTestSuite) AfterTest(suiteName, testName string) {
s.log.Info("Finished executing ", zap.String("testName", testName))
s.Log.Info("Finished executing ", zap.String("testName", testName))
}
func (s *FilterTestSuite) TestStaticSharding() {
log := utils.Logger()
s.log = log
s.Log = log
s.wg = &sync.WaitGroup{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
@ -554,56 +195,40 @@ func (s *FilterTestSuite) TestStaticSharding() {
s.ctxCancel = cancel
// Gen pubsub topic "/waku/2/rs/100/100"
s.testTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String()
s.TestTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String()
// Pubsub topics for neg. test cases
testTopics := []string{
"/waku/2/rs/100/1024",
"/waku/2/rs/100/101",
}
s.testContentTopic = "/test/10/my-filter-app/proto"
s.TestContentTopic = "/test/10/my-filter-app/proto"
// Prepare new nodes
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.MakeWakuFilterFullNode(s.TestTopic, false)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
s.ConnectHosts(s.LightNodeHost, s.FullNodeHost)
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
msg := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// Test positive case for static shard pubsub topic - message gets received
s.waitForMsg(func() {
_, err := s.relayNode.Publish(s.ctx, msg, relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""})
// Test two negative cases for static shard pubsub topic - message times out
s.waitForTimeout(func() {
_, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[0]))
s.Require().NoError(err)
s.waitForTimeout(&WakuMsg{testTopics[0], s.TestContentTopic, ""})
}, s.subDetails[0].C)
s.waitForTimeout(func() {
_, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[1]))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{testTopics[1], s.TestContentTopic, ""})
// Cleanup
_, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.testTopic,
ContentTopics: protocol.NewContentTopicSet(s.testContentTopic),
_, err := s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.TestTopic,
ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic),
})
s.Require().NoError(err)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}

View File

@ -3,13 +3,14 @@ package filter
import (
"context"
"crypto/rand"
"strconv"
"time"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
"strconv"
"time"
)
func (s *FilterTestSuite) TestUnsubscribeSingleContentTopic() {
@ -17,68 +18,54 @@ func (s *FilterTestSuite) TestUnsubscribeSingleContentTopic() {
var newContentTopic = "TopicB"
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
s.subscribe(s.TestTopic, newContentTopic, s.FullNodeHost.ID())
// Message is possible to receive for original contentTopic
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "test_msg"})
// Message is possible to receive for new contentTopic
s.waitForMsg(func() {
s.publishMsg(s.testTopic, newContentTopic, "test_msg")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, newContentTopic, "test_msg"})
_ = s.unsubscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
_ = s.unsubscribe(s.TestTopic, newContentTopic, s.FullNodeHost.ID())
// Message should not be received for new contentTopic as it was unsubscribed
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, newContentTopic, "test_msg")
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{s.TestTopic, newContentTopic, "test_msg"})
// Message is still possible to receive for original contentTopic
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "test_msg2")
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "test_msg2"})
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestUnsubscribeMultiContentTopic() {
var messages = prepareData(3, false, true, true, nil)
var messages = s.prepareData(3, false, true, true, nil)
// Subscribe with 3 content topics
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Unsubscribe with the last 2 content topics
for _, m := range messages[1:] {
_ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
_ = s.unsubscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// Messages should not be received for the last two contentTopics as it was unsubscribed
for _, m := range messages[1:] {
s.waitForTimeout(func() {
s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload)
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload})
}
// Message is still possible to receive for the first contentTopic
s.waitForMsg(func() {
s.publishMsg(messages[0].pubSubTopic, messages[0].contentTopic, messages[0].payload)
}, s.subDetails[0].C)
s.waitForMsg(&WakuMsg{messages[0].PubSubTopic, messages[0].ContentTopic, messages[0].Payload})
_, err := s.lightNode.UnsubscribeAll(s.ctx)
_, err := s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
@ -87,39 +74,36 @@ func (s *FilterTestSuite) TestUnsubscribeMultiPubSubMultiContentTopic() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second)
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
s.MakeWakuFilterFullNode(s.TestTopic, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
messages := prepareData(2, true, true, true, nil)
messages := s.prepareData(2, true, true, true, nil)
// Subscribe
for _, m := range messages {
s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...)
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.subDetails = append(s.subDetails, s.getSub(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())...)
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic))
s.Require().NoError(err)
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Unsubscribe
for _, m := range messages {
_ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
_ = s.unsubscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// No messages can be received with previous subscriptions
for i, m := range messages {
s.waitForTimeout(func() {
s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload)
}, s.subDetails[i].C)
s.waitForTimeoutFromChan(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload}, s.subDetails[i].C)
}
}
@ -127,100 +111,93 @@ func (s *FilterTestSuite) TestUnsubscribeErrorHandling() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second)
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
s.MakeWakuFilterFullNode(s.TestTopic, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
var messages, invalidMessages []WakuMsg
messages = prepareData(2, false, true, true, nil)
messages = s.prepareData(2, false, true, true, nil)
// Prepare "invalid" data for unsubscribe
invalidMessages = append(invalidMessages,
WakuMsg{
pubSubTopic: "",
contentTopic: messages[0].contentTopic,
payload: "N/A",
PubSubTopic: "",
ContentTopic: messages[0].ContentTopic,
Payload: "N/A",
},
WakuMsg{
pubSubTopic: messages[0].pubSubTopic,
contentTopic: "",
payload: "N/A",
PubSubTopic: messages[0].PubSubTopic,
ContentTopic: "",
Payload: "N/A",
},
WakuMsg{
pubSubTopic: "/waku/2/go/filter/not_subscribed",
contentTopic: "not_subscribed_topic",
payload: "N/A",
PubSubTopic: "/waku/2/go/filter/not_subscribed",
ContentTopic: "not_subscribed_topic",
Payload: "N/A",
})
// Subscribe with valid topics
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic))
s.Require().NoError(err)
}
// All messages should be possible to receive for subscribed topics
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Unsubscribe with empty pubsub
contentFilter := protocol.ContentFilter{PubsubTopic: invalidMessages[0].pubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[0].contentTopic)}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
contentFilter := protocol.ContentFilter{PubsubTopic: invalidMessages[0].PubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[0].ContentTopic)}
_, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().Error(err)
// Unsubscribe with empty content topic
contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[1].pubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[1].contentTopic)}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[1].PubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[1].ContentTopic)}
_, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().Error(err)
// Unsubscribe with non-existent topics, expect no error to prevent attacker from topic guessing
contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[2].pubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[2].contentTopic)}
_, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[2].PubSubTopic,
ContentTopics: protocol.NewContentTopicSet(invalidMessages[2].ContentTopic)}
_, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
// All messages should be still possible to receive for subscribed topics
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TestUnsubscribeAllWithoutContentTopics() {
var messages = prepareData(2, false, true, true, nil)
var messages = s.prepareData(2, false, true, true, nil)
// Subscribe with 2 content topics
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Unsubscribe all with peer specification
_, err := s.lightNode.UnsubscribeAll(s.ctx, WithPeer(s.fullNodeHost.ID()))
_, err := s.LightNode.UnsubscribeAll(s.ctx, WithPeer(s.FullNodeHost.ID()))
s.Require().NoError(err)
// Messages should not be received for any contentTopics
for _, m := range messages {
s.waitForTimeout(func() {
s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload)
}, s.subDetails[0].C)
s.waitForTimeout(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload})
}
}
@ -228,75 +205,68 @@ func (s *FilterTestSuite) TestUnsubscribeAllDiffPubSubContentTopics() {
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second)
s.lightNode = s.makeWakuFilterLightNode(true, true)
s.MakeWakuFilterLightNode()
s.StartLightNode()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true)
s.MakeWakuFilterFullNode(s.TestTopic, true)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL)
err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
messages := prepareData(2, true, true, true, nil)
messages := s.prepareData(2, true, true, true, nil)
// Subscribe
for _, m := range messages {
s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...)
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic))
s.subDetails = append(s.subDetails, s.getSub(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())...)
_, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic))
s.Require().NoError(err)
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Unsubscribe all without any specification
_, err = s.lightNode.UnsubscribeAll(s.ctx)
_, err = s.LightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
// No messages can be received with previous subscriptions
for i, m := range messages {
s.waitForTimeout(func() {
s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload)
}, s.subDetails[i].C)
s.waitForTimeoutFromChan(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload}, s.subDetails[i].C)
}
}
func (s *FilterTestSuite) TestUnsubscribeAllUnrelatedPeer() {
var messages = prepareData(2, false, true, false, nil)
var messages = s.prepareData(2, false, true, false, nil)
// Subscribe with 2 content topics
for _, m := range messages {
s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())
s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())
}
// All messages should be received
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Create new host - not related to any node
host, err := tests.MakeHost(context.Background(), 12345, rand.Reader)
s.Require().NoError(err)
s.log.Info("Host ID", logging.HostID("FullNode", s.fullNodeHost.ID()))
s.log.Info("Host ID", logging.HostID("LightNode", s.lightNodeHost.ID()))
s.log.Info("Host ID", logging.HostID("Unrelated", host.ID()))
s.Log.Info("Host ID", logging.HostID("FullNode", s.FullNodeHost.ID()))
s.Log.Info("Host ID", logging.HostID("LightNode", s.LightNodeHost.ID()))
s.Log.Info("Host ID", logging.HostID("Unrelated", host.ID()))
// Unsubscribe all with unrelated peer specification
pushResult, err := s.lightNode.UnsubscribeAll(s.ctx, WithPeer(host.ID()))
pushResult, err := s.LightNode.UnsubscribeAll(s.ctx, WithPeer(host.ID()))
for e := range pushResult.errs {
s.log.Info("Push Result ", zap.String("error", strconv.Itoa(e)))
s.Log.Info("Push Result ", zap.String("error", strconv.Itoa(e)))
}
// All messages should be received because peer ID used was not related to any subscription
s.waitForMessages(func() {
s.publishMessages(messages)
}, s.subDetails, messages)
s.waitForMessages(messages)
// Expect error for unsubscribe from non existing peer
s.Require().Error(err)

View File

@ -128,7 +128,7 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream
wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start))
logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String()))
logger.Info("received request", zap.Stringer("serverID", wf.h.ID()), zap.Stringer("requestType", subscribeRequest.FilterSubscribeType))
}
}

View File

@ -0,0 +1,390 @@
package filter
import (
"context"
"crypto/rand"
"fmt"
"strconv"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type LightNodeData struct {
LightNode *WakuFilterLightNode
LightNodeHost host.Host
}
type FullNodeData struct {
relayNode *relay.WakuRelay
RelaySub *relay.Subscription
FullNodeHost host.Host
Broadcaster relay.Broadcaster
fullNode *WakuFilterFullNode
}
type FilterTestSuite struct {
suite.Suite
LightNodeData
FullNodeData
TestTopic string
TestContentTopic string
ctx context.Context
ctxCancel context.CancelFunc
wg *sync.WaitGroup
contentFilter protocol.ContentFilter
subDetails []*subscription.SubscriptionDetails
Log *zap.Logger
}
const DefaultTestPubSubTopic = "/waku/2/go/filter/test"
const DefaultTestContentTopic = "/test/10/my-app"
type WakuMsg struct {
PubSubTopic string
ContentTopic string
Payload string
}
func (s *FilterTestSuite) SetupTest() {
log := utils.Logger() //.Named("filterv2-test")
s.Log = log
s.Log.Info("SetupTest()")
// Use a pointer to WaitGroup so that to avoid copying
// https://pkg.go.dev/sync#WaitGroup
s.wg = &sync.WaitGroup{}
// Create test context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel
s.TestTopic = DefaultTestPubSubTopic
s.TestContentTopic = DefaultTestContentTopic
s.MakeWakuFilterLightNode()
s.StartLightNode()
//TODO: Add tests to verify broadcaster.
s.MakeWakuFilterFullNode(s.TestTopic, false)
s.ConnectHosts(s.LightNodeHost, s.FullNodeHost)
}
func (s *FilterTestSuite) TearDownTest() {
s.fullNode.Stop()
s.LightNode.Stop()
s.RelaySub.Unsubscribe()
s.LightNode.Stop()
s.ctxCancel()
}
func (s *FilterTestSuite) ConnectHosts(h1, h2 host.Host) {
h1.Peerstore().AddAddr(h2.ID(), tests.GetHostAddress(h2), peerstore.PermanentAddrTTL)
err := h1.Peerstore().AddProtocols(h2.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
}
func (s *FilterTestSuite) GetWakuRelay(topic string) FullNodeData {
broadcaster := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster.Start(context.Background()))
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, relay.WithMaxMsgSize(1024*1024))
relay.SetHost(host)
err = relay.Start(context.Background())
s.Require().NoError(err)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic))
s.Require().NoError(err)
return FullNodeData{relay, sub[0], host, broadcaster, nil}
}
func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bool) FullNodeData {
nodeData := s.GetWakuRelay(topic)
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
node2Filter.SetHost(nodeData.FullNodeHost)
var sub *relay.Subscription
if withRegisterAll {
sub = nodeData.Broadcaster.RegisterForAll()
} else {
sub = nodeData.Broadcaster.Register(protocol.NewContentFilter(topic))
}
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)
nodeData.fullNode = node2Filter
return nodeData
}
func (s *FilterTestSuite) MakeWakuFilterFullNode(topic string, withRegisterAll bool) {
nodeData := s.GetWakuFilterFullNode(topic, withRegisterAll)
s.FullNodeData = nodeData
}
func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
filterPush.SetHost(host)
return LightNodeData{filterPush, host}
}
func (s *FilterTestSuite) MakeWakuFilterLightNode() {
s.LightNodeData = s.GetWakuFilterLightNode()
}
func (s *FilterTestSuite) StartLightNode() {
err := s.LightNode.Start(context.Background())
s.Require().NoError(err)
}
func (s *FilterTestSuite) waitForMsg(msg *WakuMsg) {
s.waitForMsgFromChan(msg, s.subDetails[0].C)
}
func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Envelope) {
s.wg.Add(1)
var msgFound = false
go func() {
defer s.wg.Done()
select {
case env := <-ch:
for _, topic := range s.contentFilter.ContentTopicsList() {
if topic == env.Message().GetContentTopic() {
msgFound = true
}
}
s.Require().True(msgFound)
case <-time.After(1 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}()
if msg != nil {
s.PublishMsg(msg)
}
s.wg.Wait()
}
func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool {
for _, m := range many {
if m.PubSubTopic == one.PubSubTopic &&
m.ContentTopic == one.ContentTopic &&
m.Payload == one.Payload {
return true
}
}
return false
}
func (s *FilterTestSuite) waitForMessages(msgs []WakuMsg) {
s.wg.Add(1)
msgCount := len(msgs)
found := 0
subs := s.subDetails
s.Log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount)))
s.Log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs))))
go func() {
defer s.wg.Done()
for _, sub := range subs {
s.Log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic))
for i := 0; i < msgCount; i++ {
select {
case env, ok := <-sub.C:
if !ok {
continue
}
received := WakuMsg{
PubSubTopic: env.PubsubTopic(),
ContentTopic: env.Message().GetContentTopic(),
Payload: string(env.Message().GetPayload()),
}
s.Log.Debug("received message ", zap.String("pubSubTopic", received.PubSubTopic), zap.String("contentTopic", received.ContentTopic), zap.String("payload", received.Payload))
if matchOneOfManyMsg(received, msgs) {
found++
}
case <-time.After(3 * time.Second):
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}
}
}()
if msgs != nil {
s.publishMessages(msgs)
}
s.wg.Wait()
s.Require().Equal(msgCount, found)
}
func (s *FilterTestSuite) waitForTimeout(msg *WakuMsg) {
s.waitForTimeoutFromChan(msg, s.subDetails[0].C)
}
func (s *FilterTestSuite) waitForTimeoutFromChan(msg *WakuMsg, ch chan *protocol.Envelope) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case env, ok := <-ch:
if ok {
s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload)))
}
case <-time.After(1 * time.Second):
// Timeout elapsed, all good
case <-s.ctx.Done():
s.Require().Fail("waitForTimeout test exceeded allocated time")
}
}()
s.PublishMsg(msg)
s.wg.Wait()
}
func (s *FilterTestSuite) getSub(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
contentFilter := protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)}
subDetails, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(peer))
s.Require().NoError(err)
time.Sleep(1 * time.Second)
return subDetails
}
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
sub.Add(contentTopic)
s.contentFilter = sub.ContentFilter
subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.subDetails = subDetails
s.Require().NoError(err)
return
}
}
s.subDetails = s.getSub(pubsubTopic, contentTopic, peer)
s.contentFilter = s.subDetails[0].ContentFilter
}
func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
for _, sub := range s.subDetails {
if sub.ContentFilter.PubsubTopic == pubsubTopic {
topicsCount := len(sub.ContentFilter.ContentTopicsList())
if topicsCount == 1 {
_, err := s.LightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer))
s.Require().NoError(err)
} else {
sub.Remove(contentTopic)
}
s.contentFilter = sub.ContentFilter
}
}
return s.LightNode.Subscriptions()
}
func (s *FilterTestSuite) PublishMsg(msg *WakuMsg) {
if len(msg.Payload) == 0 {
msg.Payload = "123"
}
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic))
s.Require().NoError(err)
}
func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) {
for _, m := range msgs {
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.ContentTopic, utils.GetUnixEpoch(), m.Payload), relay.WithPubSubTopic(m.PubSubTopic))
s.Require().NoError(err)
}
}
func (s *FilterTestSuite) prepareData(quantity int, topics, contentTopics, payloads bool, sg tests.StringGenerator) []WakuMsg {
var (
pubsubTopic = s.TestTopic // Has to be the same with initial s.testTopic
contentTopic = s.TestContentTopic // Has to be the same with initial s.testContentTopic
payload = "test_msg"
messages []WakuMsg
strMaxLenght = 4097
generatedString = ""
)
for i := 0; i < quantity; i++ {
msg := WakuMsg{
PubSubTopic: pubsubTopic,
ContentTopic: contentTopic,
Payload: payload,
}
if sg != nil {
generatedString, _ = sg(strMaxLenght)
}
if topics {
msg.PubSubTopic = fmt.Sprintf("%s%02d%s", pubsubTopic, i, generatedString)
}
if contentTopics {
msg.ContentTopic = fmt.Sprintf("%s%02d%s", contentTopic, i, generatedString)
}
if payloads {
msg.Payload = fmt.Sprintf("%s%02d%s", payload, i, generatedString)
}
messages = append(messages, msg)
}
return messages
}