From 6e47bd1cf06081f657ca74993b92f288055f8542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vit=E2=88=80ly=20Vlasov?= Date: Wed, 15 May 2024 10:33:59 +0300 Subject: [PATCH] feat: filter subscription management and multiplexing (#1048) Filter wrapper API that takes care of managing subscriptions and multiplex them onto a single data channel to be consumed by user. Co-authored-by: Prem Chaitanya Prathi --- Makefile | 8 +- examples/basic-relay/go.mod | 3 + examples/basic-relay/go.sum | 1 + examples/chat2/go.mod | 3 + examples/chat2/go.sum | 1 + examples/filter2/go.mod | 4 + examples/filter2/go.sum | 1 + examples/noise/go.mod | 4 + examples/noise/go.sum | 1 + examples/rln/go.mod | 4 + examples/rln/go.sum | 1 + go.sum | 2 - tests/connection_test.go | 78 --- waku/v2/api/filter.go | 233 +++++++++ waku/v2/api/filter_test.go | 77 +++ waku/v2/node/wakunode2.go | 2 +- waku/v2/peermanager/topic_event_handler.go | 3 +- waku/v2/peerstore/waku_peer_store.go | 50 +- waku/v2/protocol/content_filter.go | 23 +- waku/v2/protocol/filter/filter_ping_test.go | 14 +- .../filter/filter_proto_ident_test.go | 85 ++- waku/v2/protocol/filter/filter_push_test.go | 89 ++-- .../protocol/filter/filter_subscribe_test.go | 238 ++++----- waku/v2/protocol/filter/filter_test.go | 489 ++---------------- .../filter/filter_unsubscribe_test.go | 196 +++---- waku/v2/protocol/filter/server.go | 2 +- waku/v2/protocol/filter/test_utils.go | 390 ++++++++++++++ 27 files changed, 1103 insertions(+), 899 deletions(-) delete mode 100644 tests/connection_test.go create mode 100644 waku/v2/api/filter.go create mode 100644 waku/v2/api/filter_test.go create mode 100644 waku/v2/protocol/filter/test_utils.go diff --git a/Makefile b/Makefile index df2f096e..6cc4dde0 100644 --- a/Makefile +++ b/Makefile @@ -220,6 +220,12 @@ test-postgres: test-postgres-with-race: ${GOCMD} test -race -p 1 -v -count 1 -tags="${PG_BUILD_TAGS}" github.com/waku-org/go-waku/waku/persistence/... +test-filter: + ${GOCMD} test -v github.com/waku-org/go-waku/waku/v2/protocol/filter -run TestFilterSuite -count=1 + +test-filter-api: + ${GOCMD} test -v github.com/waku-org/go-waku/waku/v2/api -run TestFilterApiSuite + TEST_STOREV3_NODE ?= test-storev3: - TEST_STOREV3_NODE=${TEST_STOREV3_NODE} ${GOCMD} test -p 1 -v -count 1 -tags="${BUILD_TAGS} include_storev3_tests" github.com/waku-org/go-waku/waku/v2/protocol/store/... \ No newline at end of file + TEST_STOREV3_NODE=${TEST_STOREV3_NODE} ${GOCMD} test -p 1 -v -count 1 -tags="${BUILD_TAGS} include_storev3_tests" github.com/waku-org/go-waku/waku/v2/protocol/store/... diff --git a/examples/basic-relay/go.mod b/examples/basic-relay/go.mod index 1504a8b5..7930f9c1 100644 --- a/examples/basic-relay/go.mod +++ b/examples/basic-relay/go.mod @@ -30,6 +30,7 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect @@ -94,6 +95,7 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -106,6 +108,7 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect diff --git a/examples/basic-relay/go.sum b/examples/basic-relay/go.sum index 9d2621f8..ce224109 100644 --- a/examples/basic-relay/go.sum +++ b/examples/basic-relay/go.sum @@ -628,6 +628,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= diff --git a/examples/chat2/go.mod b/examples/chat2/go.mod index 2b8a2834..06b20908 100644 --- a/examples/chat2/go.mod +++ b/examples/chat2/go.mod @@ -38,6 +38,7 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect @@ -105,6 +106,7 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -118,6 +120,7 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect diff --git a/examples/chat2/go.sum b/examples/chat2/go.sum index d3022959..cf0be719 100644 --- a/examples/chat2/go.sum +++ b/examples/chat2/go.sum @@ -660,6 +660,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= diff --git a/examples/filter2/go.mod b/examples/filter2/go.mod index ad17d25b..a299123c 100644 --- a/examples/filter2/go.mod +++ b/examples/filter2/go.mod @@ -26,6 +26,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect @@ -90,6 +91,7 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -101,6 +103,7 @@ require ( github.com/raulk/go-watchdog v1.3.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect @@ -126,5 +129,6 @@ require ( golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.14.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/examples/filter2/go.sum b/examples/filter2/go.sum index 6bc4d876..b11572a7 100644 --- a/examples/filter2/go.sum +++ b/examples/filter2/go.sum @@ -625,6 +625,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= diff --git a/examples/noise/go.mod b/examples/noise/go.mod index 623b09c3..f838867b 100644 --- a/examples/noise/go.mod +++ b/examples/noise/go.mod @@ -28,6 +28,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect @@ -92,6 +93,7 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -103,6 +105,7 @@ require ( github.com/raulk/go-watchdog v1.3.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect @@ -127,5 +130,6 @@ require ( golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.14.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/examples/noise/go.sum b/examples/noise/go.sum index 62cc4a23..82d7eed7 100644 --- a/examples/noise/go.sum +++ b/examples/noise/go.sum @@ -625,6 +625,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= diff --git a/examples/rln/go.mod b/examples/rln/go.mod index 25e7bd68..60b5b235 100644 --- a/examples/rln/go.mod +++ b/examples/rln/go.mod @@ -26,6 +26,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect @@ -91,6 +92,7 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -102,6 +104,7 @@ require ( github.com/raulk/go-watchdog v1.3.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect @@ -126,5 +129,6 @@ require ( golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.14.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/examples/rln/go.sum b/examples/rln/go.sum index 6bc4d876..b11572a7 100644 --- a/examples/rln/go.sum +++ b/examples/rln/go.sum @@ -625,6 +625,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= diff --git a/go.sum b/go.sum index 946fc732..23b46338 100644 --- a/go.sum +++ b/go.sum @@ -1531,8 +1531,6 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/waku-org/go-discover v0.0.0-20240129014929-85f2c00b96a3 h1:Kk0KYXZE/uNnARF2TbCQyvyZ/w4SgF8VhquNdOVVsNU= -github.com/waku-org/go-discover v0.0.0-20240129014929-85f2c00b96a3/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= diff --git a/tests/connection_test.go b/tests/connection_test.go deleted file mode 100644 index 2efa1e50..00000000 --- a/tests/connection_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package tests - -import ( - "context" - "net" - "testing" - - "github.com/ethereum/go-ethereum/crypto" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/payload" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestBasicSendingReceiving(t *testing.T) { - hostAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") - require.NoError(t, err) - - key, err := RandomHex(32) - require.NoError(t, err) - - prvKey, err := crypto.HexToECDSA(key) - require.NoError(t, err) - - ctx := context.Background() - - wakuNode, err := node.New( - node.WithPrivateKey(prvKey), - node.WithHostAddress(hostAddr), - node.WithWakuRelay(), - ) - require.NoError(t, err) - require.NotNil(t, wakuNode) - - err = wakuNode.Start(ctx) - require.NoError(t, err) - - require.NoError(t, write(ctx, wakuNode, "test")) - - sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) - require.NoError(t, err) - - value := <-sub[0].Ch - payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None}) - require.NoError(t, err) - - require.Contains(t, string(payload.Data), "test") -} - -func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error { - var contentTopic string = "test" - version := uint32(0) - timestamp := utils.GetUnixEpoch() - - p := new(payload.Payload) - p.Data = []byte(wakuNode.ID() + ": " + msgContent) - p.Key = &payload.KeyInfo{Kind: payload.None} - - payload, err := p.Encode(version) - if err != nil { - return err - } - - msg := &pb.WakuMessage{ - Payload: payload, - Version: proto.Uint32(version), - ContentTopic: contentTopic, - Timestamp: timestamp, - } - - _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) - return err -} diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go new file mode 100644 index 00000000..f6f83f14 --- /dev/null +++ b/waku/v2/api/filter.go @@ -0,0 +1,233 @@ +package api + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +const FilterPingTimeout = 5 * time.Second +const MultiplexChannelBuffer = 100 + +type FilterConfig struct { + MaxPeers int `json:"maxPeers"` + Peers []peer.ID `json:"peers"` +} + +func (fc FilterConfig) String() string { + jsonStr, err := json.Marshal(fc) + if err != nil { + return "" + } + return string(jsonStr) +} + +type Sub struct { + ContentFilter protocol.ContentFilter + DataCh chan *protocol.Envelope + Config FilterConfig + subs subscription.SubscriptionSet + wf *filter.WakuFilterLightNode + ctx context.Context + cancel context.CancelFunc + log *zap.Logger +} + +// Subscribe +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { + sub := new(Sub) + sub.wf = wf + sub.ctx, sub.cancel = context.WithCancel(ctx) + sub.subs = make(subscription.SubscriptionSet) + sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) + sub.ContentFilter = contentFilter + sub.Config = config + sub.log = log.Named("filter-api") + sub.log.Debug("filter subscribe params", zap.Int("maxPeers", config.MaxPeers), zap.Stringer("contentFilter", contentFilter)) + subs, err := sub.subscribe(contentFilter, sub.Config.MaxPeers) + + if err != nil { + return nil, err + } + sub.multiplex(subs) + go sub.healthCheckLoop() + return sub, nil +} + +func (apiSub *Sub) Unsubscribe() { + apiSub.cancel() + +} + +func (apiSub *Sub) healthCheckLoop() { + // Health checks + ticker := time.NewTicker(FilterPingTimeout) + defer ticker.Stop() + for { + select { + case <-apiSub.ctx.Done(): + apiSub.log.Debug("healthCheckLoop: Done()") + apiSub.cleanup() + return + case <-ticker.C: + apiSub.log.Debug("healthCheckLoop: checkAliveness()") + topicCounts := apiSub.getTopicCounts() + apiSub.resubscribe(topicCounts) + } + } + +} + +func (apiSub *Sub) cleanup() { + apiSub.log.Debug("ENTER cleanup()") + defer func() { + apiSub.log.Debug("EXIT cleanup()") + }() + + for _, s := range apiSub.subs { + _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) + if err != nil { + //Logging with info as this is part of cleanup + apiSub.log.Info("failed to unsubscribe filter", zap.Error(err)) + } + } + close(apiSub.DataCh) + +} + +// Returns active sub counts for each pubsub topic +func (apiSub *Sub) getTopicCounts() map[string]int { + // Buffered chan for sub aliveness results + type CheckResult struct { + sub *subscription.SubscriptionDetails + alive bool + } + checkResults := make(chan CheckResult, len(apiSub.subs)) + var wg sync.WaitGroup + + // Run pings asynchronously + for _, s := range apiSub.subs { + wg.Add(1) + go func(sub *subscription.SubscriptionDetails) { + defer wg.Done() + ctx, cancelFunc := context.WithTimeout(apiSub.ctx, FilterPingTimeout) + defer cancelFunc() + err := apiSub.wf.IsSubscriptionAlive(ctx, sub) + + apiSub.log.Debug("Check result:", zap.Any("subID", sub.ID), zap.Bool("result", err == nil)) + checkResults <- CheckResult{sub, err == nil} + }(s) + } + + // Collect healthy topic counts + topicCounts := make(map[string]int) + + topicMap, _ := protocol.ContentFilterToPubSubTopicMap(apiSub.ContentFilter) + for _, t := range maps.Keys(topicMap) { + topicCounts[t] = 0 + } + wg.Wait() + close(checkResults) + for s := range checkResults { + if !s.alive { + // Close inactive subs + s.sub.Close() + delete(apiSub.subs, s.sub.ID) + } else { + topicCounts[s.sub.ContentFilter.PubsubTopic]++ + } + } + + return topicCounts +} + +// Attempts to resubscribe on topics that lack subscriptions +func (apiSub *Sub) resubscribe(topicCounts map[string]int) { + + // Delete healthy topics + for t, cnt := range topicCounts { + if cnt == apiSub.Config.MaxPeers { + delete(topicCounts, t) + } + } + + if len(topicCounts) == 0 { + // All topics healthy, return + return + } + var wg sync.WaitGroup + + // Re-subscribe asynchronously + newSubs := make(chan []*subscription.SubscriptionDetails) + + for t, cnt := range topicCounts { + cFilter := protocol.ContentFilter{PubsubTopic: t, ContentTopics: apiSub.ContentFilter.ContentTopics} + wg.Add(1) + go func(count int) { + defer wg.Done() + subs, err := apiSub.subscribe(cFilter, apiSub.Config.MaxPeers-count) + if err != nil { + return + } //Not handling scenario where all requested subs are not received as that will get handled in next cycle. + newSubs <- subs + }(cnt) + } + wg.Wait() + close(newSubs) + apiSub.log.Debug("resubscribe(): before range newSubs") + for subs := range newSubs { + if subs != nil { + apiSub.multiplex(subs) + } + } + apiSub.log.Debug("checkAliveness(): close(newSubs)") + //close(newSubs) +} + +func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int) ([]*subscription.SubscriptionDetails, error) { + // Low-level subscribe, returns a set of SubscriptionDetails + options := make([]filter.FilterSubscribeOption, 0) + options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) + for _, p := range apiSub.Config.Peers { + options = append(options, filter.WithPeer(p)) + } + subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) + + if err != nil { + if len(subs) > 0 { + // Partial Failure, for now proceed as we don't expect this to happen wrt specific topics. + // Rather it can happen in case subscription with one of the peer fails. + // This can further get automatically handled at resubscribe, + apiSub.log.Error("partial failure in Filter subscribe", zap.Error(err)) + return subs, nil + } + // In case of complete subscription failure, application or user needs to handle and probably retry based on error + // TODO: Once filter error handling indicates specific error, this can be addressed based on the error at this layer. + return nil, err + } + + return subs, nil +} + +func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { + + // Multiplex onto single channel + // Goroutines will exit once sub channels are closed + for _, subDetails := range subs { + apiSub.subs[subDetails.ID] = subDetails + go func(subDetails *subscription.SubscriptionDetails) { + apiSub.log.Debug("New multiplex", zap.String("subID", subDetails.ID)) + for env := range subDetails.C { + apiSub.DataCh <- env + } + }(subDetails) + } +} diff --git a/waku/v2/api/filter_test.go b/waku/v2/api/filter_test.go new file mode 100644 index 00000000..22432e7e --- /dev/null +++ b/waku/v2/api/filter_test.go @@ -0,0 +1,77 @@ +package api + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/suite" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +func TestFilterApiSuite(t *testing.T) { + suite.Run(t, new(FilterApiTestSuite)) +} + +type FilterApiTestSuite struct { + filter.FilterTestSuite +} + +func (s *FilterApiTestSuite) SetupTest() { + s.FilterTestSuite.SetupTest() + s.Log.Info("SetupTest()") +} + +func (s *FilterApiTestSuite) TearDownTest() { + s.FilterTestSuite.TearDownTest() +} + +func (s *FilterApiTestSuite) TestSubscribe() { + contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + + // We have one full node already created in SetupTest(), + // create another one + fullNodeData2 := s.GetWakuFilterFullNode(s.TestTopic, true) + s.ConnectHosts(s.LightNodeHost, fullNodeData2.FullNodeHost) + peers := []peer.ID{s.FullNodeHost.ID(), fullNodeData2.FullNodeHost.ID()} + s.Log.Info("FullNodeHost IDs:", zap.Any("peers", peers)) + // Make sure IDs are different + s.Require().True(peers[0] != peers[1]) + apiConfig := FilterConfig{MaxPeers: 2, Peers: peers} + + s.Require().Equal(apiConfig.MaxPeers, 2) + s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) + + s.Log.Info("About to perform API Subscribe()") + apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log) + s.Require().NoError(err) + s.Require().Equal(apiSub.ContentFilter, contentFilter) + s.Log.Info("Subscribed") + + s.Require().Len(apiSub.subs, 2) + for sub := range apiSub.subs { + s.Log.Info("SubDetails:", zap.String("id", sub)) + } + subsArray := maps.Keys(apiSub.subs) + s.Require().True(subsArray[0] != subsArray[1]) + // Publish msg and confirm it's received twice because of multiplexing + s.PublishMsg(&filter.WakuMsg{PubSubTopic: s.TestTopic, ContentTopic: s.TestContentTopic, Payload: "Test msg"}) + cnt := 0 + for msg := range apiSub.DataCh { + s.Log.Info("Received msg:", zap.Int("cnt", cnt), zap.String("payload", string(msg.Message().Payload))) + cnt++ + break + } + s.Require().Equal(cnt, 1) + + time.Sleep(2 * time.Second) + apiSub.Unsubscribe() + for range apiSub.DataCh { + } + s.Log.Info("DataCh is closed") + +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 323958e5..e2f95b7c 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -843,7 +843,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) { Protocols: protocols, Connected: connected, Addrs: addrs, - PubsubTopics: topics, + PubsubTopics: maps.Keys(topics), }) } return peers, nil diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 60e1f759..3060bf7b 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -13,6 +13,7 @@ import ( waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" + "golang.org/x/exp/maps" ) func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error { @@ -103,7 +104,7 @@ func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) { logging.HostID("peerID", peer)) continue } - if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic { + if len(peerTopics) == 1 && maps.Keys(peerTopics)[0] == pubsubTopic { err := pm.host.Network().ClosePeer(peer) if err != nil { pm.logger.Warn("Failed to disconnect connection towards peer", diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 402d265a..203b2310 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -8,6 +8,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" + "golang.org/x/exp/maps" ) // Origin is used to determine how the peer is identified, @@ -58,7 +60,7 @@ type WakuPeerstore interface { AddPubSubTopic(p peer.ID, topic string) error RemovePubSubTopic(p peer.ID, topic string) error - PubSubTopics(p peer.ID) ([]string, error) + PubSubTopics(p peer.ID) (protocol.TopicSet, error) SetPubSubTopics(p peer.ID, topics []string) error PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice @@ -175,13 +177,12 @@ func (ps *WakuPeerstoreImpl) AddPubSubTopic(p peer.ID, topic string) error { if err != nil { return err } - for _, t := range existingTopics { - if t == topic { - return nil - } + + if _, found := existingTopics[topic]; found { + return nil } - existingTopics = append(existingTopics, topic) - return ps.peerStore.Put(p, peerPubSubTopics, existingTopics) + existingTopics[topic] = struct{}{} + return ps.peerStore.Put(p, peerPubSubTopics, maps.Keys(existingTopics)) } // RemovePubSubTopic removes a pubSubTopic from the peer @@ -195,14 +196,9 @@ func (ps *WakuPeerstoreImpl) RemovePubSubTopic(p peer.ID, topic string) error { return nil } - for i := range existingTopics { - if existingTopics[i] == topic { - existingTopics = append(existingTopics[:i], existingTopics[i+1:]...) - break - } - } + delete(existingTopics, topic) - err = ps.SetPubSubTopics(p, existingTopics) + err = ps.SetPubSubTopics(p, maps.Keys(existingTopics)) if err != nil { return err } @@ -215,16 +211,16 @@ func (ps *WakuPeerstoreImpl) SetPubSubTopics(p peer.ID, topics []string) error { } // PubSubTopics fetches list of pubSubTopics for a peer -func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { +func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) (protocol.TopicSet, error) { result, err := ps.peerStore.Get(p, peerPubSubTopics) if err != nil { if errors.Is(err, peerstore.ErrNotFound) { - return nil, nil + return protocol.NewTopicSet(), nil } else { return nil, err } } - return result.([]string), nil + return protocol.NewTopicSet((result.([]string))...), nil } // PeersByPubSubTopic Returns list of peers that support list of pubSubTopics @@ -235,22 +231,16 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific } var result peer.IDSlice for _, p := range specificPeers { - topics, err := ps.PubSubTopics(p) + peerMatch := true + peerTopics, err := ps.PubSubTopics(p) if err == nil { - //Convoluted and crazy logic to find subset of topics - // Could not find a better way to do it? - peerTopicMap := make(map[string]struct{}) - match := true - for _, topic := range topics { - peerTopicMap[topic] = struct{}{} - } - for _, topic := range pubSubTopics { - if _, ok := peerTopicMap[topic]; !ok { - match = false + for _, t := range pubSubTopics { + if _, ok := peerTopics[t]; !ok { + peerMatch = false break } } - if match { + if peerMatch { result = append(result, p) } } //Note: skipping a peer in case of an error as there would be others available. @@ -268,7 +258,7 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeer for _, p := range specificPeers { topics, err := ps.PubSubTopics(p) if err == nil { - for _, topic := range topics { + for topic := range topics { if topic == pubSubTopic { result = append(result, p) } diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index f09cf52b..0a859e0d 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -1,11 +1,22 @@ package protocol -import "golang.org/x/exp/maps" +import ( + "golang.org/x/exp/maps" +) type PubsubTopicStr = string type ContentTopicStr = string type ContentTopicSet map[string]struct{} +type TopicSet map[string]struct{} + +func NewTopicSet(topics ...string) TopicSet { + s := make(TopicSet, len(topics)) + for _, t := range topics { + s[t] = struct{}{} + } + return s +} func NewContentTopicSet(contentTopics ...string) ContentTopicSet { s := make(ContentTopicSet, len(contentTopics)) @@ -28,6 +39,16 @@ type ContentFilter struct { ContentTopics ContentTopicSet `json:"contentTopics"` } +func (cf ContentFilter) String() string { + var ret string + ret += "{ pubsubTopic: " + cf.PubsubTopic + ", contentTopics: [ " + for ct := range cf.ContentTopics { + ret += ct + } + ret += " ] }" + return ret +} + func (cf ContentFilter) ContentTopicsList() []string { return cf.ContentTopics.ToList() } diff --git a/waku/v2/protocol/filter/filter_ping_test.go b/waku/v2/protocol/filter/filter_ping_test.go index 07fb8d83..cc6dfb2a 100644 --- a/waku/v2/protocol/filter/filter_ping_test.go +++ b/waku/v2/protocol/filter/filter_ping_test.go @@ -6,29 +6,29 @@ import ( ) func (s *FilterTestSuite) TestSubscriptionPing() { - err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + err := s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) s.Require().Error(err) filterErr, ok := err.(*FilterError) s.Require().True(ok) s.Require().Equal(filterErr.Code, http.StatusNotFound) contentTopic := "abc" - s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, contentTopic, s.FullNodeHost.ID()) - err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) s.Require().NoError(err) } func (s *FilterTestSuite) TestUnSubscriptionPing() { - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) - err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + err := s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) s.Require().NoError(err) - _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) - err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) s.Require().Error(err) } diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index 4fae951f..549071a1 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -30,52 +30,32 @@ import ( func (s *FilterTestSuite) TestCreateSubscription() { // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""}) } func (s *FilterTestSuite) TestModifySubscription() { // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""}) // Subscribe to another content_topic newContentTopic := "Topic_modified" - s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, newContentTopic, s.FullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, newContentTopic, ""}) } func (s *FilterTestSuite) TestMultipleMessages() { // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "first"}) - }, s.subDetails[0].C) - - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"}) } func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters, @@ -222,28 +202,29 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() { log := utils.Logger() - s.log = log + s.Log = log s.wg = &sync.WaitGroup{} // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - s.testTopic = defaultTestPubSubTopic - s.testContentTopic = defaultTestContentTopic + s.TestTopic = DefaultTestPubSubTopic + s.TestContentTopic = DefaultTestContentTopic - s.lightNode = s.makeWakuFilterLightNode(true, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + s.MakeWakuFilterFullNode(s.TestTopic, false) //Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) // Subscribe with incorrect SubscribeID - s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} - _, err := s.lightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + _, err := s.LightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -259,7 +240,7 @@ func (wf *WakuFilterLightNode) startWithIncorrectPushProto() error { func (s *FilterTestSuite) TestIncorrectPushIdentifier() { log := utils.Logger() - s.log = log + s.Log = log s.wg = &sync.WaitGroup{} // Create test context @@ -267,43 +248,43 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() { s.ctx = ctx s.ctxCancel = cancel - s.testTopic = defaultTestPubSubTopic - s.testContentTopic = defaultTestContentTopic + s.TestTopic = DefaultTestPubSubTopic + s.TestContentTopic = DefaultTestContentTopic - s.lightNode = s.makeWakuFilterLightNode(false, true) + s.MakeWakuFilterLightNode() - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + s.MakeWakuFilterFullNode(s.TestTopic, false) // Re-start light node with unsupported prefix for match func - s.lightNode.Stop() - err := s.lightNode.CommonService.Start(s.ctx, s.lightNode.startWithIncorrectPushProto) + s.LightNode.Stop() + err := s.LightNode.CommonService.Start(s.ctx, s.LightNode.startWithIncorrectPushProto) s.Require().NoError(err) // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) + err = s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) // Subscribe - s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} - s.subDetails, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + s.subDetails, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) time.Sleep(1 * time.Second) // Send message - _, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic)) + _, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.TestContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.TestTopic)) s.Require().NoError(err) // Message should never arrive -> exit after timeout select { case msg := <-s.subDetails[0].C: - s.log.Info("Light node received a msg") + s.Log.Info("Light node received a msg") s.Require().Nil(msg) case <-time.After(1 * time.Second): s.Require().True(true) } - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } diff --git a/waku/v2/protocol/filter/filter_push_test.go b/waku/v2/protocol/filter/filter_push_test.go index e5bc530f..e3aa106f 100644 --- a/waku/v2/protocol/filter/filter_push_test.go +++ b/waku/v2/protocol/filter/filter_push_test.go @@ -2,28 +2,27 @@ package filter import ( "context" + "strconv" + "time" + "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "strconv" - "time" ) func (s *FilterTestSuite) TestValidPayloadsASCII() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomASCIIString) + messages := s.prepareData(100, false, false, true, tests.GenerateRandomASCIIString) // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -31,17 +30,15 @@ func (s *FilterTestSuite) TestValidPayloadsASCII() { func (s *FilterTestSuite) TestValidPayloadsUTF8() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomUTF8String) + messages := s.prepareData(100, false, false, true, tests.GenerateRandomUTF8String) // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -49,17 +46,15 @@ func (s *FilterTestSuite) TestValidPayloadsUTF8() { func (s *FilterTestSuite) TestValidPayloadsBase64() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomBase64String) + messages := s.prepareData(100, false, false, true, tests.GenerateRandomBase64String) // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -67,17 +62,15 @@ func (s *FilterTestSuite) TestValidPayloadsBase64() { func (s *FilterTestSuite) TestValidPayloadsJSON() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomJSONString) + messages := s.prepareData(100, false, false, true, tests.GenerateRandomJSONString) // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -85,17 +78,15 @@ func (s *FilterTestSuite) TestValidPayloadsJSON() { func (s *FilterTestSuite) TestValidPayloadsURLEncoded() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString) + messages := s.prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString) // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -103,17 +94,15 @@ func (s *FilterTestSuite) TestValidPayloadsURLEncoded() { func (s *FilterTestSuite) TestValidPayloadsSQL() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomSQLInsert) + messages := s.prepareData(100, false, false, true, tests.GenerateRandomSQLInsert) // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -123,23 +112,21 @@ func (s *FilterTestSuite) TestLargePayloadsUTF8() { s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 40*time.Second) // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Prepare basic data - messages := prepareData(10, false, false, false, nil) + messages := s.prepareData(10, false, false, false, nil) // Generate large string for i := range messages { - messages[i].payload, _ = tests.GenerateRandomUTF8String(153600) - s.log.Info("Generated payload with ", zap.String("length", strconv.Itoa(len(messages[i].payload)))) + messages[i].Payload, _ = tests.GenerateRandomUTF8String(153600) + s.Log.Info("Generated payload with ", zap.String("length", strconv.Itoa(len(messages[i].Payload)))) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -147,19 +134,19 @@ func (s *FilterTestSuite) TestLargePayloadsUTF8() { func (s *FilterTestSuite) TestFuturePayloadEncryptionVersion() { // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) - message := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "test_payload") + message := tests.CreateWakuMessage(s.TestContentTopic, utils.GetUnixEpoch(), "test_payload") futureVersion := uint32(100) message.Version = &futureVersion // Should get accepted - _, err := s.relayNode.Publish(s.ctx, message, relay.WithPubSubTopic(s.testTopic)) + _, err := s.relayNode.Publish(s.ctx, message, relay.WithPubSubTopic(s.TestTopic)) s.Require().NoError(err) // Should be received - s.waitForMsg(nil, s.subDetails[0].C) + s.waitForMsg(nil) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } diff --git a/waku/v2/protocol/filter/filter_subscribe_test.go b/waku/v2/protocol/filter/filter_subscribe_test.go index b7579f8c..17e23a27 100644 --- a/waku/v2/protocol/filter/filter_subscribe_test.go +++ b/waku/v2/protocol/filter/filter_subscribe_test.go @@ -18,76 +18,61 @@ import ( func (s *FilterTestSuite) TestWakuFilter() { // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Should be received - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "first") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "first"}) // Wrong content topic - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, "TopicB", "second") - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"}) - _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err := s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) // Should not receive after unsubscribe - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "third") - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{s.TestTopic, s.TestContentTopic, "third"}) // Two new subscriptions with same [peer, contentFilter] - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) + + secondSub := s.getSub(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Assert that we have 2 subscriptions now - s.Require().Equal(len(s.lightNode.Subscriptions()), 2) + s.Require().Equal(len(s.LightNode.Subscriptions()), 2) // Should be received on both subscriptions - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "fourth") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "fourth"}) - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "fifth") - }, secondSub[0].C) + s.waitForMsgFromChan(&WakuMsg{s.TestTopic, s.TestContentTopic, "fifth"}, secondSub[0].C) - s.waitForMsg(nil, s.subDetails[0].C) - s.waitForMsg(nil, secondSub[0].C) + s.waitForMsg(nil) + s.waitForMsgFromChan(nil, secondSub[0].C) // Unsubscribe from second sub only - _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0]) + _, err = s.LightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0]) s.Require().NoError(err) // Should still receive - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "sixth") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "sixth"}) // Unsubscribe from first sub only - _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0]) + _, err = s.LightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0]) s.Require().NoError(err) - s.Require().Equal(len(s.lightNode.Subscriptions()), 0) + s.Require().Equal(len(s.LightNode.Subscriptions()), 0) // Should not receive after unsubscribe - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "seventh") - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{s.TestTopic, s.TestContentTopic, "seventh"}) } func (s *FilterTestSuite) TestPubSubSingleContentTopic() { // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Message should be received - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "test_msg") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "test_msg"}) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -97,19 +82,17 @@ func (s *FilterTestSuite) TestPubSubMultiContentTopic() { // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds - messages := prepareData(3, false, true, false, nil) + messages := s.prepareData(3, false, true, false, nil) // Subscribe for _, m := range messages { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -119,36 +102,35 @@ func (s *FilterTestSuite) TestMultiPubSubMultiContentTopic() { // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds - s.lightNode = s.makeWakuFilterLightNode(true, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true) + s.MakeWakuFilterFullNode(s.TestTopic, true) // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) + err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) - messages := prepareData(2, true, true, false, nil) + messages := s.prepareData(2, true, true, false, nil) // Subscribe for _, m := range messages { - s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...) - s.log.Info("Subscribing ", zap.String("PubSubTopic", m.pubSubTopic)) - _, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic)) + s.subDetails = append(s.subDetails, s.getSub(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())...) + s.Log.Info("Subscribing ", zap.String("PubSubTopic", m.PubSubTopic)) + _, err := s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic)) s.Require().NoError(err) } // Debug to see subscriptions in light node for _, sub := range s.subDetails { - s.log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0])) + s.Log.Info("Light Node subscription ", zap.String("PubSubTopic", sub.ContentFilter.PubsubTopic), zap.String("ContentTopic", sub.ContentFilter.ContentTopicsList()[0])) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -158,39 +140,35 @@ func (s *FilterTestSuite) TestPubSubMultiOverlapContentTopic() { // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 20 seconds - messages := prepareData(10, false, true, true, nil) + messages := s.prepareData(10, false, true, true, nil) // Subscribe for _, m := range messages { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } func (s *FilterTestSuite) TestSubscriptionRefresh() { - messages := prepareData(2, false, false, true, nil) + messages := s.prepareData(2, false, false, true, nil) // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Repeat the same subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Both messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -209,33 +187,31 @@ func (s *FilterTestSuite) TestContentTopicsLimit() { } } - messages := prepareData(maxContentTopics+1, false, true, true, nil) + messages := s.prepareData(maxContentTopics+1, false, true, true, nil) // Subscribe for _, m := range messages[:len(messages)-1] { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // All messages within limit should get received - s.waitForMessages(func() { - s.publishMessages(messages[:len(messages)-1]) - }, s.subDetails, messages[:len(messages)-1]) + s.waitForMessages(messages[:len(messages)-1]) // Adding over the limit contentTopic should fail for _, sub := range s.subDetails { - if sub.ContentFilter.PubsubTopic == messages[len(messages)-1].pubSubTopic { - sub.Add(messages[len(messages)-1].contentTopic) - _, err := s.lightNode.Subscribe(s.ctx, sub.ContentFilter, WithPeer(s.fullNodeHost.ID())) + if sub.ContentFilter.PubsubTopic == messages[len(messages)-1].PubSubTopic { + sub.Add(messages[len(messages)-1].ContentTopic) + _, err := s.LightNode.Subscribe(s.ctx, sub.ContentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) } } // Unsubscribe for cleanup for _, m := range messages { - _ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + _ = s.unsubscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -245,61 +221,61 @@ func (s *FilterTestSuite) TestSubscribeErrorHandling() { // Prepare data messages = append(messages, WakuMsg{ - pubSubTopic: "", - contentTopic: s.testContentTopic, - payload: "N/A", + PubSubTopic: "", + ContentTopic: s.TestContentTopic, + Payload: "N/A", }) messages = append(messages, WakuMsg{ - pubSubTopic: s.testTopic, - contentTopic: "", - payload: "N/A", + PubSubTopic: s.TestTopic, + ContentTopic: "", + Payload: "N/A", }) // Subscribe with empty pubsub - s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)} - _, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)} + _, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) // Subscribe with empty content topic - s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)} - _, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.contentFilter = protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)} + _, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) } func (s *FilterTestSuite) TestMultipleFullNodeSubscriptions() { log := utils.Logger() - s.log = log + s.Log = log s.wg = &sync.WaitGroup{} // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID())))) - _ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID())) + fullNodeIDHex := make([]byte, hex.EncodedLen(len([]byte(s.FullNodeHost.ID())))) + _ = hex.Encode(fullNodeIDHex, []byte(s.FullNodeHost.ID())) - s.log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex))) + s.Log.Info("Already subscribed to", zap.String("fullNode", string(fullNodeIDHex))) // This will overwrite values with the second node info - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + s.MakeWakuFilterFullNode(s.TestTopic, false) // Connect to second full and relay node - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) + err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) - fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.fullNodeHost.ID())))) - _ = hex.Encode(fullNodeIDHex, []byte(s.fullNodeHost.ID())) + fullNodeIDHex = make([]byte, hex.EncodedLen(len([]byte(s.FullNodeHost.ID())))) + _ = hex.Encode(fullNodeIDHex, []byte(s.FullNodeHost.ID())) - s.log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex))) + s.Log.Info("Subscribing to second", zap.String("fullNode", string(fullNodeIDHex))) // Subscribe to the second full node - s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} - _, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + s.contentFilter = protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} + _, err = s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -308,25 +284,28 @@ func (s *FilterTestSuite) TestSubscribeMultipleLightNodes() { // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - lightNode2 := s.makeWakuFilterLightNode(true, true) + lightNodeData := s.GetWakuFilterLightNode() + lightNode2 := lightNodeData.LightNode + err := lightNode2.Start(context.Background()) + s.Require().NoError(err) // Connect node2 - lightNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + lightNode2.h.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) - messages := prepareData(2, true, true, true, nil) + messages := s.prepareData(2, true, true, true, nil) // Subscribe separately: light node 1 -> full node - contentFilter := protocol.ContentFilter{PubsubTopic: messages[0].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].contentTopic)} - _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + contentFilter := protocol.ContentFilter{PubsubTopic: messages[0].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[0].ContentTopic)} + _, err = s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) // Subscribe separately: light node 2 -> full node - contentFilter2 := protocol.ContentFilter{PubsubTopic: messages[1].pubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].contentTopic)} - _, err = lightNode2.Subscribe(s.ctx, contentFilter2, WithPeer(s.fullNodeHost.ID())) + contentFilter2 := protocol.ContentFilter{PubsubTopic: messages[1].PubSubTopic, ContentTopics: protocol.NewContentTopicSet(messages[1].ContentTopic)} + _, err = lightNode2.Subscribe(s.ctx, contentFilter2, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) // Unsubscribe - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) _, err = lightNode2.UnsubscribeAll(s.ctx) @@ -344,13 +323,14 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() { // Create test context s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) - _, fullNode2 := s.makeWakuFilterFullNode(testTopic, false, false) + nodeData := s.GetWakuFilterFullNode(testTopic, false) + fullNode2 := nodeData.fullNode // Connect nodes - fullNode2.h.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + fullNode2.h.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) // Get stream - stream, err := fullNode2.h.NewStream(s.ctx, s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + stream, err := fullNode2.h.NewStream(s.ctx, s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) // Prepare subscribe request @@ -364,7 +344,7 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() { fullNode2.subscribe(s.ctx, stream, subscribeRequest) // Check the pubsub topic related to the first node is stored within the second node - pubsubTopics, hasTopics := fullNode2.subscriptions.Get(s.fullNodeHost.ID()) + pubsubTopics, hasTopics := fullNode2.subscriptions.Get(s.FullNodeHost.ID()) s.Require().True(hasTopics) // Check the pubsub topic is what we have set @@ -378,13 +358,13 @@ func (s *FilterTestSuite) TestSubscribeFullNode2FullNode() { } func (s *FilterTestSuite) TestIsSubscriptionAlive() { - messages := prepareData(2, false, true, false, nil) + messages := s.prepareData(2, false, true, false, nil) // Subscribe with the first message only - s.subDetails = s.subscribe(messages[0].pubSubTopic, messages[0].contentTopic, s.fullNodeHost.ID()) + s.subscribe(messages[0].PubSubTopic, messages[0].ContentTopic, s.FullNodeHost.ID()) // IsSubscriptionAlive returns no error for the first message - err := s.lightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0]) + err := s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0]) s.Require().NoError(err) // Create new host/peer - not related to any node @@ -395,25 +375,25 @@ func (s *FilterTestSuite) TestIsSubscriptionAlive() { s.subDetails[0].PeerID = host.ID() // IsSubscriptionAlive returns error for the second message, peer ID doesn't match - err = s.lightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0]) + err = s.LightNode.IsSubscriptionAlive(s.ctx, s.subDetails[0]) s.Require().Error(err) } func (s *FilterTestSuite) TestFilterSubscription() { - contentFilter := protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} + contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Returns no error and SubscriptionDetails for existing subscription - _, err := s.lightNode.FilterSubscription(s.fullNodeHost.ID(), contentFilter) + _, err := s.LightNode.FilterSubscription(s.FullNodeHost.ID(), contentFilter) s.Require().NoError(err) otherFilter := protocol.ContentFilter{PubsubTopic: "34583495", ContentTopics: protocol.NewContentTopicSet("sjfa402")} // Returns error and nil SubscriptionDetails for non existent subscription - nonSubscription, err := s.lightNode.FilterSubscription(s.fullNodeHost.ID(), otherFilter) + nonSubscription, err := s.LightNode.FilterSubscription(s.FullNodeHost.ID(), otherFilter) s.Require().Error(err) s.Require().Nil(nonSubscription) @@ -422,32 +402,32 @@ func (s *FilterTestSuite) TestFilterSubscription() { s.Require().NoError(err) // Returns error and nil SubscriptionDetails for unrelated host/peer - nonSubscription, err = s.lightNode.FilterSubscription(host.ID(), contentFilter) + nonSubscription, err = s.LightNode.FilterSubscription(host.ID(), contentFilter) s.Require().Error(err) s.Require().Nil(nonSubscription) } func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() { - contentFilter := protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)} + contentFilter := protocol.ContentFilter{PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic)} // Subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // With valid peer - opts := []FilterSubscribeOption{WithPeer(s.fullNodeHost.ID())} + opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())} // Positive case - _, _, err := s.lightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) + _, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) s.Require().NoError(err) - addr := s.fullNodeHost.Addrs()[0] + addr := s.FullNodeHost.Addrs()[0] // Combine mutually exclusive options - opts = []FilterSubscribeOption{WithPeer(s.fullNodeHost.ID()), WithPeerAddr(addr)} + opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)} // Should fail on wrong option combination - _, _, err = s.lightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) + _, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) s.Require().Error(err) } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index be5a0e2c..d3117118 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -2,25 +2,16 @@ package filter import ( "context" - "crypto/rand" "errors" - "fmt" - "strconv" "sync" "testing" "time" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/service" - "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -29,359 +20,20 @@ func TestFilterSuite(t *testing.T) { suite.Run(t, new(FilterTestSuite)) } -const defaultTestPubSubTopic = "/waku/2/go/filter/test" -const defaultTestContentTopic = "/test/10/my-app" - -type FilterTestSuite struct { - suite.Suite - - testTopic string - testContentTopic string - ctx context.Context - ctxCancel context.CancelFunc - lightNode *WakuFilterLightNode - lightNodeHost host.Host - relayNode *relay.WakuRelay - relaySub *relay.Subscription - fullNode *WakuFilterFullNode - fullNodeHost host.Host - wg *sync.WaitGroup - contentFilter protocol.ContentFilter - subDetails []*subscription.SubscriptionDetails - log *zap.Logger -} - -type WakuMsg struct { - pubSubTopic string - contentTopic string - payload string -} - -func (s *FilterTestSuite) makeWakuRelay(topic string, shared bool) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { - - broadcaster := relay.NewBroadcaster(10) - s.Require().NoError(broadcaster.Start(context.Background())) - - port, err := tests.FindFreePort(s.T(), "", 5) - s.Require().NoError(err) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - s.Require().NoError(err) - - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) - relay.SetHost(host) - - if shared { - s.fullNodeHost = host - } - - err = relay.Start(context.Background()) - s.Require().NoError(err) - - sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) - s.Require().NoError(err) - - return relay, sub[0], host, broadcaster -} - -func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bool) *WakuFilterLightNode { - port, err := tests.FindFreePort(s.T(), "", 5) - s.Require().NoError(err) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - s.Require().NoError(err) - var b relay.Broadcaster - if withBroadcaster { - b = relay.NewBroadcaster(10) - s.Require().NoError(b.Start(context.Background())) - } - filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) - filterPush.SetHost(host) - s.lightNodeHost = host - if start { - err = filterPush.Start(context.Background()) - s.Require().NoError(err) - } - - return filterPush -} - -func (s *FilterTestSuite) makeWakuFilterFullNode(topic string, withRegisterAll bool, shared bool) (*relay.WakuRelay, *WakuFilterFullNode) { - var sub *relay.Subscription - node, relaySub, host, broadcaster := s.makeWakuRelay(topic, shared) - - if shared { - s.relaySub = relaySub - } - - node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) - node2Filter.SetHost(host) - - if withRegisterAll { - sub = broadcaster.RegisterForAll() - } else { - sub = broadcaster.Register(protocol.NewContentFilter(topic)) - } - - err := node2Filter.Start(s.ctx, sub) - s.Require().NoError(err) - - return node, node2Filter -} - -func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { - s.wg.Add(1) - var msgFound = false - go func() { - defer s.wg.Done() - select { - case env := <-ch: - for _, topic := range s.contentFilter.ContentTopicsList() { - if topic == env.Message().GetContentTopic() { - msgFound = true - } - } - s.Require().True(msgFound) - case <-time.After(1 * time.Second): - s.Require().Fail("Message timeout") - case <-s.ctx.Done(): - s.Require().Fail("test exceeded allocated time") - } - }() - - if fn != nil { - fn() - } - - s.wg.Wait() -} - -func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool { - for _, m := range many { - if m.pubSubTopic == one.pubSubTopic && - m.contentTopic == one.contentTopic && - m.payload == one.payload { - return true - } - } - - return false -} - -func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.SubscriptionDetails, expected []WakuMsg) { - s.wg.Add(1) - msgCount := len(expected) - found := 0 - s.log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount))) - s.log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs)))) - - go func() { - defer s.wg.Done() - for _, sub := range subs { - s.log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic)) - for i := 0; i < msgCount; i++ { - select { - case env, ok := <-sub.C: - if !ok { - continue - } - received := WakuMsg{ - pubSubTopic: env.PubsubTopic(), - contentTopic: env.Message().GetContentTopic(), - payload: string(env.Message().GetPayload()), - } - s.log.Debug("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload)) - if matchOneOfManyMsg(received, expected) { - found++ - } - case <-time.After(3 * time.Second): - - case <-s.ctx.Done(): - s.Require().Fail("test exceeded allocated time") - } - } - } - }() - - if fn != nil { - fn() - } - - s.wg.Wait() - s.Require().True(msgCount == found) -} - -func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) { - s.wg.Add(1) - go func() { - defer s.wg.Done() - select { - case env, ok := <-ch: - if ok { - s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload))) - } - case <-time.After(1 * time.Second): - // Timeout elapsed, all good - case <-s.ctx.Done(): - s.Require().Fail("waitForTimeout test exceeded allocated time") - } - }() - - fn() - - s.wg.Wait() -} - -func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { - - for _, sub := range s.subDetails { - if sub.ContentFilter.PubsubTopic == pubsubTopic { - sub.Add(contentTopic) - s.contentFilter = sub.ContentFilter - subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) - s.Require().NoError(err) - return subDetails - } - } - - s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)} - - subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) - s.Require().NoError(err) - - // Sleep to make sure the filter is subscribed - time.Sleep(1 * time.Second) - - return subDetails -} - -func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { - - for _, sub := range s.subDetails { - if sub.ContentFilter.PubsubTopic == pubsubTopic { - topicsCount := len(sub.ContentFilter.ContentTopicsList()) - if topicsCount == 1 { - _, err := s.lightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer)) - s.Require().NoError(err) - } else { - sub.Remove(contentTopic) - } - s.contentFilter = sub.ContentFilter - } - } - - return s.lightNode.Subscriptions() -} - -func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) { - var payload string - if len(optionalPayload) > 0 { - payload = optionalPayload[0] - } else { - payload = "123" - } - - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), relay.WithPubSubTopic(topic)) - s.Require().NoError(err) -} - -func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) { - for _, m := range msgs { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), relay.WithPubSubTopic(m.pubSubTopic)) - s.Require().NoError(err) - } -} - -func prepareData(quantity int, topics, contentTopics, payloads bool, sg tests.StringGenerator) []WakuMsg { - var ( - pubsubTopic = defaultTestPubSubTopic // Has to be the same with initial s.testTopic - contentTopic = defaultTestContentTopic // Has to be the same with initial s.testContentTopic - payload = "test_msg" - messages []WakuMsg - strMaxLenght = 4097 - generatedString = "" - ) - - for i := 0; i < quantity; i++ { - msg := WakuMsg{ - pubSubTopic: pubsubTopic, - contentTopic: contentTopic, - payload: payload, - } - - if sg != nil { - generatedString, _ = sg(strMaxLenght) - - } - - if topics { - msg.pubSubTopic = fmt.Sprintf("%s%02d%s", pubsubTopic, i, generatedString) - } - - if contentTopics { - msg.contentTopic = fmt.Sprintf("%s%02d%s", contentTopic, i, generatedString) - } - - if payloads { - msg.payload = fmt.Sprintf("%s%02d%s", payload, i, generatedString) - } - - messages = append(messages, msg) - } - - return messages -} - -func (s *FilterTestSuite) SetupTest() { - log := utils.Logger() //.Named("filterv2-test") - s.log = log - // Use a pointer to WaitGroup so that to avoid copying - // https://pkg.go.dev/sync#WaitGroup - s.wg = &sync.WaitGroup{} - - // Create test context - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - s.ctx = ctx - s.ctxCancel = cancel - - s.testTopic = defaultTestPubSubTopic - s.testContentTopic = defaultTestContentTopic - - s.lightNode = s.makeWakuFilterLightNode(true, true) - - //TODO: Add tests to verify broadcaster. - - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) - - // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) - s.Require().NoError(err) - -} - -func (s *FilterTestSuite) TearDownTest() { - s.fullNode.Stop() - s.relayNode.Stop() - s.relaySub.Unsubscribe() - s.lightNode.Stop() - s.ctxCancel() -} - func (s *FilterTestSuite) TestRunningGuard() { - s.lightNode.Stop() + s.LightNode.Stop() contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")} - _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().ErrorIs(err, service.ErrNotStarted) - err = s.lightNode.Start(s.ctx) + err = s.LightNode.Start(s.ctx) s.Require().NoError(err) - _, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err = s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) } @@ -390,18 +42,18 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")} - _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) - result, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait()) + result, err := s.LightNode.Unsubscribe(s.ctx, contentFilter, DontWait()) s.Require().NoError(err) s.Require().Equal(0, len(result.Errors())) - _, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err = s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) wg := sync.WaitGroup{} - _, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithWaitGroup(&wg)) + _, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithWaitGroup(&wg)) wg.Wait() s.Require().NoError(err) } @@ -410,18 +62,18 @@ func (s *FilterTestSuite) TestStartStop() { var wg sync.WaitGroup wg.Add(2) - s.lightNode = s.makeWakuFilterLightNode(false, false) + s.MakeWakuFilterLightNode() stopNode := func() { for i := 0; i < 100000; i++ { - s.lightNode.Stop() + s.LightNode.Stop() } wg.Done() } startNode := func() { for i := 0; i < 100; i++ { - err := s.lightNode.Start(context.Background()) + err := s.LightNode.Start(context.Background()) if errors.Is(err, service.ErrAlreadyStarted) { continue } @@ -441,7 +93,7 @@ func (s *FilterTestSuite) TestAutoShard() { //Workaround as could not find a way to reuse setup test with params // Stop what is run in setup s.fullNode.Stop() - s.lightNode.Stop() + s.LightNode.Stop() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds s.ctx = ctx s.ctxCancel = cancel @@ -451,102 +103,91 @@ func (s *FilterTestSuite) TestAutoShard() { s.Require().NoError(err) //Computing pubSubTopic only for filterFullNode. pubSubTopic := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount) - s.testContentTopic = cTopic1Str - s.testTopic = pubSubTopic.String() + s.TestContentTopic = cTopic1Str + s.TestTopic = pubSubTopic.String() - s.lightNode = s.makeWakuFilterLightNode(true, false) - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String(), false, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() + s.MakeWakuFilterFullNode(pubSubTopic.String(), false) - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) + err = s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) - s.log.Info("Testing Autoshard:CreateSubscription") - s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.Log.Info("Testing Autoshard:CreateSubscription") + s.subscribe("", s.TestContentTopic, s.FullNodeHost.ID()) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""}) // Wrong content topic - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, "TopicB", "second") - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{s.TestTopic, "TopicB", "second"}) - _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) + _, err = s.LightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) time.Sleep(1 * time.Second) // Should not receive after unsubscribe - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "third") - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{s.TestTopic, s.TestContentTopic, "third"}) - s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) + s.subscribe("", s.TestContentTopic, s.FullNodeHost.ID()) - s.log.Info("Testing Autoshard:SubscriptionPing") - err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Log.Info("Testing Autoshard:SubscriptionPing") + err = s.LightNode.Ping(context.Background(), s.FullNodeHost.ID()) s.Require().NoError(err) // Test ModifySubscription Subscribe to another content_topic - s.log.Info("Testing Autoshard:ModifySubscription") + s.Log.Info("Testing Autoshard:ModifySubscription") newContentTopic := "0/test/1/testTopic1/proto" - s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID()) + s.subscribe("", newContentTopic, s.FullNodeHost.ID()) - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) + s.waitForMsg(&WakuMsg{s.TestTopic, newContentTopic, ""}) - }, s.subDetails[0].C) - - _, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ - PubsubTopic: s.testTopic, + _, err = s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ + PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(newContentTopic), }) s.Require().NoError(err) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } func (s *FilterTestSuite) TestLightNodeIsListening() { - messages := prepareData(2, true, true, false, nil) + messages := s.prepareData(2, true, true, false, nil) // Subscribe with the first message only - s.subDetails = s.subscribe(messages[0].pubSubTopic, messages[0].contentTopic, s.fullNodeHost.ID()) + s.subscribe(messages[0].PubSubTopic, messages[0].ContentTopic, s.FullNodeHost.ID()) // IsListening returns true for the first message - listenStatus := s.lightNode.IsListening(messages[0].pubSubTopic, messages[0].contentTopic) + listenStatus := s.LightNode.IsListening(messages[0].PubSubTopic, messages[0].ContentTopic) s.Require().True(listenStatus) // IsListening returns false for the second message - listenStatus = s.lightNode.IsListening(messages[1].pubSubTopic, messages[1].contentTopic) + listenStatus = s.LightNode.IsListening(messages[1].PubSubTopic, messages[1].ContentTopic) s.Require().False(listenStatus) // IsListening returns false for combination as well - listenStatus = s.lightNode.IsListening(messages[0].pubSubTopic, messages[1].contentTopic) + listenStatus = s.LightNode.IsListening(messages[0].PubSubTopic, messages[1].ContentTopic) s.Require().False(listenStatus) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } func (s *FilterTestSuite) BeforeTest(suiteName, testName string) { - s.log.Info("Executing ", zap.String("testName", testName)) + s.Log.Info("Executing ", zap.String("testName", testName)) } func (s *FilterTestSuite) AfterTest(suiteName, testName string) { - s.log.Info("Finished executing ", zap.String("testName", testName)) + s.Log.Info("Finished executing ", zap.String("testName", testName)) } func (s *FilterTestSuite) TestStaticSharding() { log := utils.Logger() - s.log = log + s.Log = log s.wg = &sync.WaitGroup{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds @@ -554,56 +195,40 @@ func (s *FilterTestSuite) TestStaticSharding() { s.ctxCancel = cancel // Gen pubsub topic "/waku/2/rs/100/100" - s.testTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String() + s.TestTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String() // Pubsub topics for neg. test cases testTopics := []string{ "/waku/2/rs/100/1024", "/waku/2/rs/100/101", } - s.testContentTopic = "/test/10/my-filter-app/proto" + s.TestContentTopic = "/test/10/my-filter-app/proto" // Prepare new nodes - s.lightNode = s.makeWakuFilterLightNode(true, true) - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() + s.MakeWakuFilterFullNode(s.TestTopic, false) // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) - s.Require().NoError(err) + s.ConnectHosts(s.LightNodeHost, s.FullNodeHost) - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - - msg := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()) - msg2 := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // Test positive case for static shard pubsub topic - message gets received - s.waitForMsg(func() { - _, err := s.relayNode.Publish(s.ctx, msg, relay.WithPubSubTopic(s.testTopic)) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""}) // Test two negative cases for static shard pubsub topic - message times out - s.waitForTimeout(func() { - _, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[0])) - s.Require().NoError(err) + s.waitForTimeout(&WakuMsg{testTopics[0], s.TestContentTopic, ""}) - }, s.subDetails[0].C) - - s.waitForTimeout(func() { - _, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[1])) - s.Require().NoError(err) - - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{testTopics[1], s.TestContentTopic, ""}) // Cleanup - _, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ - PubsubTopic: s.testTopic, - ContentTopics: protocol.NewContentTopicSet(s.testContentTopic), + _, err := s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ + PubsubTopic: s.TestTopic, + ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic), }) s.Require().NoError(err) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } diff --git a/waku/v2/protocol/filter/filter_unsubscribe_test.go b/waku/v2/protocol/filter/filter_unsubscribe_test.go index a498de0c..cac86615 100644 --- a/waku/v2/protocol/filter/filter_unsubscribe_test.go +++ b/waku/v2/protocol/filter/filter_unsubscribe_test.go @@ -3,13 +3,14 @@ package filter import ( "context" "crypto/rand" + "strconv" + "time" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" - "strconv" - "time" ) func (s *FilterTestSuite) TestUnsubscribeSingleContentTopic() { @@ -17,68 +18,54 @@ func (s *FilterTestSuite) TestUnsubscribeSingleContentTopic() { var newContentTopic = "TopicB" // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) + s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) + s.subscribe(s.TestTopic, newContentTopic, s.FullNodeHost.ID()) // Message is possible to receive for original contentTopic - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "test_msg") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "test_msg"}) // Message is possible to receive for new contentTopic - s.waitForMsg(func() { - s.publishMsg(s.testTopic, newContentTopic, "test_msg") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, newContentTopic, "test_msg"}) - _ = s.unsubscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) + _ = s.unsubscribe(s.TestTopic, newContentTopic, s.FullNodeHost.ID()) // Message should not be received for new contentTopic as it was unsubscribed - s.waitForTimeout(func() { - s.publishMsg(s.testTopic, newContentTopic, "test_msg") - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{s.TestTopic, newContentTopic, "test_msg"}) // Message is still possible to receive for original contentTopic - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic, "test_msg2") - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "test_msg2"}) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } func (s *FilterTestSuite) TestUnsubscribeMultiContentTopic() { - var messages = prepareData(3, false, true, true, nil) + var messages = s.prepareData(3, false, true, true, nil) // Subscribe with 3 content topics for _, m := range messages { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Unsubscribe with the last 2 content topics for _, m := range messages[1:] { - _ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + _ = s.unsubscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // Messages should not be received for the last two contentTopics as it was unsubscribed for _, m := range messages[1:] { - s.waitForTimeout(func() { - s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload) - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload}) } // Message is still possible to receive for the first contentTopic - s.waitForMsg(func() { - s.publishMsg(messages[0].pubSubTopic, messages[0].contentTopic, messages[0].payload) - }, s.subDetails[0].C) + s.waitForMsg(&WakuMsg{messages[0].PubSubTopic, messages[0].ContentTopic, messages[0].Payload}) - _, err := s.lightNode.UnsubscribeAll(s.ctx) + _, err := s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } @@ -87,39 +74,36 @@ func (s *FilterTestSuite) TestUnsubscribeMultiPubSubMultiContentTopic() { s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) - s.lightNode = s.makeWakuFilterLightNode(true, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true) + s.MakeWakuFilterFullNode(s.TestTopic, true) // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL) + err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) - messages := prepareData(2, true, true, true, nil) + messages := s.prepareData(2, true, true, true, nil) // Subscribe for _, m := range messages { - s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...) - _, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic)) + s.subDetails = append(s.subDetails, s.getSub(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())...) + _, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic)) s.Require().NoError(err) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Unsubscribe for _, m := range messages { - _ = s.unsubscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + _ = s.unsubscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // No messages can be received with previous subscriptions for i, m := range messages { - s.waitForTimeout(func() { - s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload) - }, s.subDetails[i].C) + s.waitForTimeoutFromChan(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload}, s.subDetails[i].C) } } @@ -127,100 +111,93 @@ func (s *FilterTestSuite) TestUnsubscribeErrorHandling() { s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) - s.lightNode = s.makeWakuFilterLightNode(true, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true) + s.MakeWakuFilterFullNode(s.TestTopic, true) // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.FullNodeHost), peerstore.PermanentAddrTTL) + err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) var messages, invalidMessages []WakuMsg - messages = prepareData(2, false, true, true, nil) + messages = s.prepareData(2, false, true, true, nil) // Prepare "invalid" data for unsubscribe invalidMessages = append(invalidMessages, WakuMsg{ - pubSubTopic: "", - contentTopic: messages[0].contentTopic, - payload: "N/A", + PubSubTopic: "", + ContentTopic: messages[0].ContentTopic, + Payload: "N/A", }, WakuMsg{ - pubSubTopic: messages[0].pubSubTopic, - contentTopic: "", - payload: "N/A", + PubSubTopic: messages[0].PubSubTopic, + ContentTopic: "", + Payload: "N/A", }, WakuMsg{ - pubSubTopic: "/waku/2/go/filter/not_subscribed", - contentTopic: "not_subscribed_topic", - payload: "N/A", + PubSubTopic: "/waku/2/go/filter/not_subscribed", + ContentTopic: "not_subscribed_topic", + Payload: "N/A", }) // Subscribe with valid topics for _, m := range messages { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) - _, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic)) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) + _, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic)) s.Require().NoError(err) } // All messages should be possible to receive for subscribed topics - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Unsubscribe with empty pubsub - contentFilter := protocol.ContentFilter{PubsubTopic: invalidMessages[0].pubSubTopic, - ContentTopics: protocol.NewContentTopicSet(invalidMessages[0].contentTopic)} - _, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + contentFilter := protocol.ContentFilter{PubsubTopic: invalidMessages[0].PubSubTopic, + ContentTopics: protocol.NewContentTopicSet(invalidMessages[0].ContentTopic)} + _, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) // Unsubscribe with empty content topic - contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[1].pubSubTopic, - ContentTopics: protocol.NewContentTopicSet(invalidMessages[1].contentTopic)} - _, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[1].PubSubTopic, + ContentTopics: protocol.NewContentTopicSet(invalidMessages[1].ContentTopic)} + _, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().Error(err) // Unsubscribe with non-existent topics, expect no error to prevent attacker from topic guessing - contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[2].pubSubTopic, - ContentTopics: protocol.NewContentTopicSet(invalidMessages[2].contentTopic)} - _, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + contentFilter = protocol.ContentFilter{PubsubTopic: invalidMessages[2].PubSubTopic, + ContentTopics: protocol.NewContentTopicSet(invalidMessages[2].ContentTopic)} + _, err = s.LightNode.Unsubscribe(s.ctx, contentFilter, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) // All messages should be still possible to receive for subscribed topics - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) } func (s *FilterTestSuite) TestUnsubscribeAllWithoutContentTopics() { - var messages = prepareData(2, false, true, true, nil) + var messages = s.prepareData(2, false, true, true, nil) // Subscribe with 2 content topics for _, m := range messages { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Unsubscribe all with peer specification - _, err := s.lightNode.UnsubscribeAll(s.ctx, WithPeer(s.fullNodeHost.ID())) + _, err := s.LightNode.UnsubscribeAll(s.ctx, WithPeer(s.FullNodeHost.ID())) s.Require().NoError(err) // Messages should not be received for any contentTopics for _, m := range messages { - s.waitForTimeout(func() { - s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload) - }, s.subDetails[0].C) + s.waitForTimeout(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload}) } } @@ -228,75 +205,68 @@ func (s *FilterTestSuite) TestUnsubscribeAllDiffPubSubContentTopics() { s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second) - s.lightNode = s.makeWakuFilterLightNode(true, true) + s.MakeWakuFilterLightNode() + s.StartLightNode() - s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, true, true) + s.MakeWakuFilterFullNode(s.TestTopic, true) // Connect nodes - s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL) - err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.LightNodeHost.Peerstore().AddAddr(s.FullNodeHost.ID(), tests.GetHostAddress(s.fullNode.h), peerstore.PermanentAddrTTL) + err := s.LightNodeHost.Peerstore().AddProtocols(s.FullNodeHost.ID(), FilterSubscribeID_v20beta1) s.Require().NoError(err) - messages := prepareData(2, true, true, true, nil) + messages := s.prepareData(2, true, true, true, nil) // Subscribe for _, m := range messages { - s.subDetails = append(s.subDetails, s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID())...) - _, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.pubSubTopic)) + s.subDetails = append(s.subDetails, s.getSub(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID())...) + _, err = s.relayNode.Subscribe(context.Background(), protocol.NewContentFilter(m.PubSubTopic)) s.Require().NoError(err) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Unsubscribe all without any specification - _, err = s.lightNode.UnsubscribeAll(s.ctx) + _, err = s.LightNode.UnsubscribeAll(s.ctx) s.Require().NoError(err) // No messages can be received with previous subscriptions for i, m := range messages { - s.waitForTimeout(func() { - s.publishMsg(m.pubSubTopic, m.contentTopic, m.payload) - }, s.subDetails[i].C) + s.waitForTimeoutFromChan(&WakuMsg{m.PubSubTopic, m.ContentTopic, m.Payload}, s.subDetails[i].C) } } func (s *FilterTestSuite) TestUnsubscribeAllUnrelatedPeer() { - var messages = prepareData(2, false, true, false, nil) + var messages = s.prepareData(2, false, true, false, nil) // Subscribe with 2 content topics for _, m := range messages { - s.subDetails = s.subscribe(m.pubSubTopic, m.contentTopic, s.fullNodeHost.ID()) + s.subscribe(m.PubSubTopic, m.ContentTopic, s.FullNodeHost.ID()) } // All messages should be received - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Create new host - not related to any node host, err := tests.MakeHost(context.Background(), 12345, rand.Reader) s.Require().NoError(err) - s.log.Info("Host ID", logging.HostID("FullNode", s.fullNodeHost.ID())) - s.log.Info("Host ID", logging.HostID("LightNode", s.lightNodeHost.ID())) - s.log.Info("Host ID", logging.HostID("Unrelated", host.ID())) + s.Log.Info("Host ID", logging.HostID("FullNode", s.FullNodeHost.ID())) + s.Log.Info("Host ID", logging.HostID("LightNode", s.LightNodeHost.ID())) + s.Log.Info("Host ID", logging.HostID("Unrelated", host.ID())) // Unsubscribe all with unrelated peer specification - pushResult, err := s.lightNode.UnsubscribeAll(s.ctx, WithPeer(host.ID())) + pushResult, err := s.LightNode.UnsubscribeAll(s.ctx, WithPeer(host.ID())) for e := range pushResult.errs { - s.log.Info("Push Result ", zap.String("error", strconv.Itoa(e))) + s.Log.Info("Push Result ", zap.String("error", strconv.Itoa(e))) } // All messages should be received because peer ID used was not related to any subscription - s.waitForMessages(func() { - s.publishMessages(messages) - }, s.subDetails, messages) + s.waitForMessages(messages) // Expect error for unsubscribe from non existing peer s.Require().Error(err) diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index aa18839b..2bf63bb5 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -128,7 +128,7 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start)) - logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String())) + logger.Info("received request", zap.Stringer("serverID", wf.h.ID()), zap.Stringer("requestType", subscribeRequest.FilterSubscribeType)) } } diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go new file mode 100644 index 00000000..2784c16b --- /dev/null +++ b/waku/v2/protocol/filter/test_utils.go @@ -0,0 +1,390 @@ +package filter + +import ( + "context" + "crypto/rand" + "fmt" + "strconv" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/suite" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +type LightNodeData struct { + LightNode *WakuFilterLightNode + LightNodeHost host.Host +} + +type FullNodeData struct { + relayNode *relay.WakuRelay + RelaySub *relay.Subscription + FullNodeHost host.Host + Broadcaster relay.Broadcaster + fullNode *WakuFilterFullNode +} + +type FilterTestSuite struct { + suite.Suite + LightNodeData + FullNodeData + + TestTopic string + TestContentTopic string + ctx context.Context + ctxCancel context.CancelFunc + wg *sync.WaitGroup + contentFilter protocol.ContentFilter + subDetails []*subscription.SubscriptionDetails + + Log *zap.Logger +} + +const DefaultTestPubSubTopic = "/waku/2/go/filter/test" +const DefaultTestContentTopic = "/test/10/my-app" + +type WakuMsg struct { + PubSubTopic string + ContentTopic string + Payload string +} + +func (s *FilterTestSuite) SetupTest() { + log := utils.Logger() //.Named("filterv2-test") + s.Log = log + + s.Log.Info("SetupTest()") + // Use a pointer to WaitGroup so that to avoid copying + // https://pkg.go.dev/sync#WaitGroup + s.wg = &sync.WaitGroup{} + + // Create test context + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel + + s.TestTopic = DefaultTestPubSubTopic + s.TestContentTopic = DefaultTestContentTopic + + s.MakeWakuFilterLightNode() + s.StartLightNode() + + //TODO: Add tests to verify broadcaster. + + s.MakeWakuFilterFullNode(s.TestTopic, false) + + s.ConnectHosts(s.LightNodeHost, s.FullNodeHost) + +} + +func (s *FilterTestSuite) TearDownTest() { + s.fullNode.Stop() + s.LightNode.Stop() + s.RelaySub.Unsubscribe() + s.LightNode.Stop() + s.ctxCancel() +} + +func (s *FilterTestSuite) ConnectHosts(h1, h2 host.Host) { + h1.Peerstore().AddAddr(h2.ID(), tests.GetHostAddress(h2), peerstore.PermanentAddrTTL) + err := h1.Peerstore().AddProtocols(h2.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) GetWakuRelay(topic string) FullNodeData { + + broadcaster := relay.NewBroadcaster(10) + s.Require().NoError(broadcaster.Start(context.Background())) + + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + s.Require().NoError(err) + + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, relay.WithMaxMsgSize(1024*1024)) + relay.SetHost(host) + + err = relay.Start(context.Background()) + s.Require().NoError(err) + + sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) + s.Require().NoError(err) + + return FullNodeData{relay, sub[0], host, broadcaster, nil} +} + +func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bool) FullNodeData { + + nodeData := s.GetWakuRelay(topic) + + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + node2Filter.SetHost(nodeData.FullNodeHost) + + var sub *relay.Subscription + if withRegisterAll { + sub = nodeData.Broadcaster.RegisterForAll() + } else { + sub = nodeData.Broadcaster.Register(protocol.NewContentFilter(topic)) + } + + err := node2Filter.Start(s.ctx, sub) + s.Require().NoError(err) + + nodeData.fullNode = node2Filter + + return nodeData +} + +func (s *FilterTestSuite) MakeWakuFilterFullNode(topic string, withRegisterAll bool) { + nodeData := s.GetWakuFilterFullNode(topic, withRegisterAll) + + s.FullNodeData = nodeData +} + +func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + s.Require().NoError(err) + b := relay.NewBroadcaster(10) + s.Require().NoError(b.Start(context.Background())) + filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + filterPush.SetHost(host) + + return LightNodeData{filterPush, host} +} + +func (s *FilterTestSuite) MakeWakuFilterLightNode() { + s.LightNodeData = s.GetWakuFilterLightNode() +} + +func (s *FilterTestSuite) StartLightNode() { + err := s.LightNode.Start(context.Background()) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) waitForMsg(msg *WakuMsg) { + s.waitForMsgFromChan(msg, s.subDetails[0].C) +} + +func (s *FilterTestSuite) waitForMsgFromChan(msg *WakuMsg, ch chan *protocol.Envelope) { + s.wg.Add(1) + var msgFound = false + go func() { + defer s.wg.Done() + select { + case env := <-ch: + for _, topic := range s.contentFilter.ContentTopicsList() { + if topic == env.Message().GetContentTopic() { + msgFound = true + } + } + s.Require().True(msgFound) + case <-time.After(1 * time.Second): + s.Require().Fail("Message timeout") + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + }() + + if msg != nil { + s.PublishMsg(msg) + } + + s.wg.Wait() +} + +func matchOneOfManyMsg(one WakuMsg, many []WakuMsg) bool { + for _, m := range many { + if m.PubSubTopic == one.PubSubTopic && + m.ContentTopic == one.ContentTopic && + m.Payload == one.Payload { + return true + } + } + + return false +} + +func (s *FilterTestSuite) waitForMessages(msgs []WakuMsg) { + s.wg.Add(1) + msgCount := len(msgs) + found := 0 + subs := s.subDetails + s.Log.Info("Expected messages ", zap.String("count", strconv.Itoa(msgCount))) + s.Log.Info("Existing subscriptions ", zap.String("count", strconv.Itoa(len(subs)))) + + go func() { + defer s.wg.Done() + for _, sub := range subs { + s.Log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic)) + for i := 0; i < msgCount; i++ { + select { + case env, ok := <-sub.C: + if !ok { + continue + } + received := WakuMsg{ + PubSubTopic: env.PubsubTopic(), + ContentTopic: env.Message().GetContentTopic(), + Payload: string(env.Message().GetPayload()), + } + s.Log.Debug("received message ", zap.String("pubSubTopic", received.PubSubTopic), zap.String("contentTopic", received.ContentTopic), zap.String("payload", received.Payload)) + if matchOneOfManyMsg(received, msgs) { + found++ + } + case <-time.After(3 * time.Second): + + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } + } + } + }() + + if msgs != nil { + s.publishMessages(msgs) + } + + s.wg.Wait() + s.Require().Equal(msgCount, found) +} + +func (s *FilterTestSuite) waitForTimeout(msg *WakuMsg) { + s.waitForTimeoutFromChan(msg, s.subDetails[0].C) +} + +func (s *FilterTestSuite) waitForTimeoutFromChan(msg *WakuMsg, ch chan *protocol.Envelope) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case env, ok := <-ch: + if ok { + s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload))) + } + case <-time.After(1 * time.Second): + // Timeout elapsed, all good + case <-s.ctx.Done(): + s.Require().Fail("waitForTimeout test exceeded allocated time") + } + }() + + s.PublishMsg(msg) + + s.wg.Wait() +} + +func (s *FilterTestSuite) getSub(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { + contentFilter := protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)} + + subDetails, err := s.LightNode.Subscribe(s.ctx, contentFilter, WithPeer(peer)) + s.Require().NoError(err) + + time.Sleep(1 * time.Second) + + return subDetails +} +func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) { + + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == pubsubTopic { + sub.Add(contentTopic) + s.contentFilter = sub.ContentFilter + subDetails, err := s.LightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) + s.subDetails = subDetails + s.Require().NoError(err) + return + } + } + + s.subDetails = s.getSub(pubsubTopic, contentTopic, peer) + s.contentFilter = s.subDetails[0].ContentFilter +} + +func (s *FilterTestSuite) unsubscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { + + for _, sub := range s.subDetails { + if sub.ContentFilter.PubsubTopic == pubsubTopic { + topicsCount := len(sub.ContentFilter.ContentTopicsList()) + if topicsCount == 1 { + _, err := s.LightNode.Unsubscribe(s.ctx, sub.ContentFilter, WithPeer(peer)) + s.Require().NoError(err) + } else { + sub.Remove(contentTopic) + } + s.contentFilter = sub.ContentFilter + } + } + + return s.LightNode.Subscriptions() +} + +func (s *FilterTestSuite) PublishMsg(msg *WakuMsg) { + if len(msg.Payload) == 0 { + msg.Payload = "123" + } + + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic)) + s.Require().NoError(err) +} + +func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) { + for _, m := range msgs { + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.ContentTopic, utils.GetUnixEpoch(), m.Payload), relay.WithPubSubTopic(m.PubSubTopic)) + s.Require().NoError(err) + } +} + +func (s *FilterTestSuite) prepareData(quantity int, topics, contentTopics, payloads bool, sg tests.StringGenerator) []WakuMsg { + var ( + pubsubTopic = s.TestTopic // Has to be the same with initial s.testTopic + contentTopic = s.TestContentTopic // Has to be the same with initial s.testContentTopic + payload = "test_msg" + messages []WakuMsg + strMaxLenght = 4097 + generatedString = "" + ) + + for i := 0; i < quantity; i++ { + msg := WakuMsg{ + PubSubTopic: pubsubTopic, + ContentTopic: contentTopic, + Payload: payload, + } + + if sg != nil { + generatedString, _ = sg(strMaxLenght) + + } + + if topics { + msg.PubSubTopic = fmt.Sprintf("%s%02d%s", pubsubTopic, i, generatedString) + } + + if contentTopics { + msg.ContentTopic = fmt.Sprintf("%s%02d%s", contentTopic, i, generatedString) + } + + if payloads { + msg.Payload = fmt.Sprintf("%s%02d%s", payload, i, generatedString) + } + + messages = append(messages, msg) + } + + return messages +}