refactor: service interface

This commit is contained in:
Richard Ramos 2023-01-06 18:37:57 -04:00 committed by RichΛrd
parent f10b1b0d7a
commit 40675ff204
56 changed files with 553 additions and 3946 deletions

View File

@ -41,15 +41,12 @@ require (
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-migrate/migrate/v4 v4.15.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
@ -82,7 +79,6 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mattn/go-sqlite3 v1.14.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect

File diff suppressed because it is too large Load Diff

View File

@ -44,7 +44,7 @@ func main() {
ctx := context.Background()
wakuNode, err := node.New(ctx,
wakuNode, err := node.New(
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithNTP(),
@ -55,7 +55,7 @@ func main() {
return
}
if err := wakuNode.Start(); err != nil {
if err := wakuNode.Start(ctx); err != nil {
log.Error(err)
return
}

View File

@ -91,7 +91,7 @@ func execute(options Options) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wakuNode, err := node.New(ctx, opts...)
wakuNode, err := node.New(opts...)
if err != nil {
fmt.Println(err.Error())
return
@ -115,7 +115,7 @@ func execute(options Options) {
return
}
if err := wakuNode.Start(); err != nil {
if err := wakuNode.Start(ctx); err != nil {
fmt.Println(err.Error())
return
}

View File

@ -56,14 +56,11 @@ require (
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-migrate/migrate/v4 v4.15.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
@ -96,7 +93,6 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-sqlite3 v1.14.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect

File diff suppressed because it is too large Load Diff

View File

@ -41,15 +41,12 @@ require (
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-migrate/migrate/v4 v4.15.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
@ -82,7 +79,6 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mattn/go-sqlite3 v1.14.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect

File diff suppressed because it is too large Load Diff

View File

@ -34,8 +34,8 @@ func main() {
}
logging.SetAllLoggers(lvl)
hostAddr1, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:60000"))
hostAddr2, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:60001"))
hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60000")
hostAddr2, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60001")
key1, err := randomHex(32)
if err != nil {
@ -62,19 +62,22 @@ func main() {
ctx := context.Background()
fullNode, err := node.New(ctx,
fullNode, err := node.New(
node.WithPrivateKey(prvKey1),
node.WithHostAddress(hostAddr1),
node.WithWakuRelay(),
node.WithWakuFilter(true),
)
err = fullNode.Start()
if err != nil {
panic(err)
}
lightNode, err := node.New(ctx,
err = fullNode.Start(ctx)
if err != nil {
panic(err)
}
lightNode, err := node.New(
node.WithPrivateKey(prvKey2),
node.WithHostAddress(hostAddr2),
node.WithWakuFilter(false),
@ -88,7 +91,7 @@ func main() {
log.Info("Error adding filter peer on light node ", err)
}
err = lightNode.Start()
err = lightNode.Start(ctx)
if err != nil {
panic(err)
}

14
go.sum
View File

@ -73,6 +73,7 @@ github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjN
github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/Azure/go-ansiterm v0.0.0-20210608223527-2377c96fe795/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
@ -111,6 +112,7 @@ github.com/Microsoft/go-winio v0.4.17-0.20210211115548-6eac466e5fa3/go.mod h1:JP
github.com/Microsoft/go-winio v0.4.17-0.20210324224401-5516f17a5958/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg=
github.com/Microsoft/hcsshim v0.8.7-0.20190325164909-8abdbb8205e4/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg=
@ -327,6 +329,7 @@ github.com/containerd/containerd v1.5.0-rc.0/go.mod h1:V/IXoMqNGgBlabz3tHD2TWDoT
github.com/containerd/containerd v1.5.1/go.mod h1:0DOxVqwDy2iZvrZp2JUx/E+hS0UNTVn7dJnIOwtYR4g=
github.com/containerd/containerd v1.5.7/go.mod h1:gyvv6+ugqY25TiXxcZC3L5yOeYgEw0QMhscqVp1AR9c=
github.com/containerd/containerd v1.5.8/go.mod h1:YdFSv5bTFLpG2HIYmfqDpSYYTDX+mc5qtSuYx1YUb/s=
github.com/containerd/containerd v1.6.1 h1:oa2uY0/0G+JX4X7hpGCYvkp9FjUancz56kSNnb1sG3o=
github.com/containerd/containerd v1.6.1/go.mod h1:1nJz5xCZPusx6jJU8Frfct988y0NpumIq9ODB0kLtoE=
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/containerd/continuity v0.0.0-20190815185530-f2a389ac0a02/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
@ -447,6 +450,7 @@ github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dhui/dktest v0.3.10 h1:0frpeeoM9pHouHjhLeZDuDTJ0PqjDTrycaHaMmkJAo8=
github.com/dhui/dktest v0.3.10/go.mod h1:h5Enh0nG3Qbo9WjNFRrwmKUaePEBhXMOygbz3Ww7Sz0=
github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
@ -455,11 +459,14 @@ github.com/docker/cli v0.0.0-20191017083524-a8ff7f821017/go.mod h1:JLrzqnKDaYBop
github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY=
github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v1.4.2-0.20190924003213-a8608b5b67c7/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v20.10.13+incompatible h1:5s7uxnKZG+b8hYWlPYUi6x1Sjpq2MSt96d15eLZeHyw=
github.com/docker/docker v20.10.13+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker-credential-helpers v0.6.3/go.mod h1:WRaJzqw3CTB9bk10avuGsjVBZsD05qeibJ1/TYlvc0Y=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-events v0.0.0-20170721190031-9461782956ad/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA=
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA=
@ -1147,6 +1154,7 @@ github.com/moby/sys/symlink v0.1.0/go.mod h1:GGDODQmbFOjFsXvfLVn3+ZRxkch54RkSiGq
github.com/moby/sys/symlink v0.2.0/go.mod h1:7uZVF2dqJjG/NsClqul95CqKOBRQyYSNnJ6BMgR/gFs=
github.com/moby/term v0.0.0-20200312100748-672ec06f55cd/go.mod h1:DdlQx2hp0Ss5/fLikoLlEeIYiATotOjgB//nb973jeo=
github.com/moby/term v0.0.0-20210610120745-9d4ed1856297/go.mod h1:vgPCkQMyxTZ7IDy8SXRufE172gr8+K/JE/7hHFxHW3A=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -1154,6 +1162,7 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
@ -1238,10 +1247,12 @@ github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0-rc1.0.20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.0/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/image-spec v1.0.2-0.20211117181255-693428a734f5/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/runc v0.0.0-20190115041553-12f6a991201f/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
@ -1418,6 +1429,7 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
@ -2214,6 +2226,7 @@ google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220314164441-57ef72a4c106 h1:ErU+UA6wxadoU8nWrsy5MZUVBs75K17zUCsUCIfrXCE=
google.golang.org/genproto v0.0.0-20220314164441-57ef72a4c106/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E=
google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
@ -2250,6 +2263,7 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M=
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

View File

@ -186,9 +186,7 @@ func NewNode(configJSON string) string {
}
logging.SetAllLoggers(lvl)
ctx := context.Background()
w, err := node.New(ctx, opts...)
w, err := node.New(opts...)
if err != nil {
return MakeJSONResponse(err)
}
@ -203,7 +201,8 @@ func Start() string {
return MakeJSONResponse(errWakuNodeNotReady)
}
if err := wakuNode.Start(); err != nil {
ctx := context.Background()
if err := wakuNode.Start(ctx); err != nil {
return MakeJSONResponse(err)
}

View File

@ -26,7 +26,7 @@ func TestBasicSendingReceiving(t *testing.T) {
ctx := context.Background()
wakuNode, err := node.New(ctx,
wakuNode, err := node.New(
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithWakuRelay(),
@ -34,7 +34,7 @@ func TestBasicSendingReceiving(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, wakuNode)
err = wakuNode.Start()
err = wakuNode.Start(ctx)
require.NoError(t, err)
require.NoError(t, write(ctx, wakuNode, "test"))

View File

@ -254,7 +254,7 @@ func Execute(options Options) {
checkForRLN(logger, options, &nodeOpts)
wakuNode, err := node.New(ctx, nodeOpts...)
wakuNode, err := node.New(nodeOpts...)
failOnErr(err, "Wakunode")
@ -262,7 +262,7 @@ func Execute(options Options) {
addPeers(wakuNode, options.LightPush.Nodes, string(lightpush.LightPushID_v20beta1))
addPeers(wakuNode, options.Filter.Nodes, string(filter.FilterID_v20beta1))
if err = wakuNode.Start(); err != nil {
if err = wakuNode.Start(ctx); err != nil {
logger.Fatal("starting waku node", zap.Error(err))
}

View File

@ -207,8 +207,11 @@ func (d *DiscoveryV5) Stop() {
d.Lock()
defer d.Unlock()
d.cancel()
if d.cancel == nil {
return
}
d.cancel()
d.started = false
if d.listener != nil {

View File

@ -91,12 +91,12 @@ func (w *WakuNode) sendConnStatus() {
}
func (w *WakuNode) connectednessListener() {
func (w *WakuNode) connectednessListener(ctx context.Context) {
defer w.wg.Done()
for {
select {
case <-w.ctx.Done():
case <-ctx.Done():
return
case <-w.protocolEventSub.Out():
case <-w.identificationEventSub.Out():

View File

@ -49,24 +49,24 @@ func TestConnectionStatusChanges(t *testing.T) {
// Node1: Only Relay
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
node1, err := New(ctx,
node1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithConnectionStatusChannel(connStatusChan),
)
require.NoError(t, err)
err = node1.Start()
err = node1.Start(ctx)
require.NoError(t, err)
// Node2: Relay
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
node2, err := New(ctx,
node2, err := New(
WithHostAddress(hostAddr2),
WithWakuRelay(),
)
require.NoError(t, err)
err = node2.Start()
err = node2.Start(ctx)
require.NoError(t, err)
db, migration, err := sqlite.NewDB(":memory:")
@ -77,14 +77,14 @@ func TestConnectionStatusChanges(t *testing.T) {
// Node3: Relay + Store
hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
node3, err := New(ctx,
node3, err := New(
WithHostAddress(hostAddr3),
WithWakuRelay(),
WithWakuStore(),
WithMessageProvider(dbStore),
)
require.NoError(t, err)
err = node3.Start()
err = node3.Start(ctx)
require.NoError(t, err)
var wg sync.WaitGroup

View File

@ -17,7 +17,7 @@ const maxPublishAttempt = 5
// startKeepAlive creates a go routine that periodically pings connected peers.
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
func (w *WakuNode) startKeepAlive(t time.Duration) {
func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
go func() {
defer w.wg.Done()
w.log.Info("setting up ping protocol", zap.Duration("duration", t))
@ -49,12 +49,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
for _, p := range w.host.Network().Peers() {
if p != w.host.ID() {
w.wg.Add(1)
go w.pingPeer(p)
go w.pingPeer(ctx, p)
}
}
lastTimeExecuted = w.timesource.Now()
case <-w.ctx.Done():
case <-ctx.Done():
w.log.Info("stopping ping protocol")
return
}
@ -62,12 +62,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}()
}
func (w *WakuNode) pingPeer(peer peer.ID) {
func (w *WakuNode) pingPeer(ctx context.Context, peer peer.ID) {
w.keepAliveMutex.Lock()
defer w.keepAliveMutex.Unlock()
defer w.wg.Done()
ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second)
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
defer cancel()
logger := w.log.With(logging.HostID("peer", peer))

View File

@ -35,7 +35,6 @@ func TestKeepAlive(t *testing.T) {
w := &WakuNode{
host: host1,
ctx: ctx2,
wg: wg,
log: utils.Logger(),
keepAliveMutex: sync.Mutex{},
@ -43,7 +42,7 @@ func TestKeepAlive(t *testing.T) {
}
w.wg.Add(1)
w.pingPeer(host2.ID())
w.pingPeer(ctx2, host2.ID())
require.NoError(t, ctx.Err())
}

View File

@ -1,6 +1,7 @@
package node
import (
"context"
"crypto/ecdsa"
"encoding/binary"
"errors"
@ -17,19 +18,12 @@ import (
"go.uber.org/zap"
)
func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
err = w.updateLocalNode(localnode, priv, wsAddr, ipAddr, udpPort, wakuFlags, advertiseAddr, log)
if err != nil {
return nil, err
}
return localnode, nil
return enode.NewLocalNode(db, priv), nil
}
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) error {
@ -215,7 +209,7 @@ func selectWSListenAddress(addresses []ma.Multiaddr, extAddr ma.Multiaddr) ([]ma
return result, nil
}
func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error {
func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
extAddr, ipAddr, err := selectMostExternalAddress(addrs)
if err != nil {
w.log.Error("obtaining external address", zap.Error(err))
@ -229,33 +223,23 @@ func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error {
}
// TODO: make this optional depending on DNS Disc being enabled
if w.opts.privKey != nil {
if w.localNode != nil {
err := w.updateLocalNode(w.localNode, w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
// Restarting DiscV5
if w.discoveryV5 != nil && w.discoveryV5.IsStarted() {
w.log.Info("restarting discv5")
w.discoveryV5.Stop()
err = w.discoveryV5.Start(w.ctx)
if err != nil {
w.log.Error("could not restart discv5", zap.Error(err))
return err
}
}
}
if w.opts.privKey != nil && w.localNode != nil {
err := w.updateLocalNode(w.localNode, w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
localNode, err := w.newLocalnode(w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
w.localNode = localNode
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
// Restarting DiscV5
discV5 := w.DiscV5()
if discV5 != nil && discV5.IsStarted() {
w.log.Info("restarting discv5")
w.discoveryV5.Stop()
err = w.discoveryV5.Start(ctx)
if err != nil {
w.log.Error("could not restart discv5", zap.Error(err))
return err
}
}
}
}

17
waku/v2/node/service.go Normal file
View File

@ -0,0 +1,17 @@
package node
import (
"context"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
type Service interface {
Start(ctx context.Context) error
Stop()
}
type ReceptorService interface {
Service
MessageChannel() chan *protocol.Envelope
}

View File

@ -70,21 +70,21 @@ type WakuNode struct {
log *zap.Logger
timesource timesource.Timesource
relay *relay.WakuRelay
filter *filter.WakuFilter
lightPush *lightpush.WakuLightPush
store store.Store
swap *swap.WakuSwap
rlnRelay RLNRelay
wakuFlag utils.WakuEnrBitfield
relay Service
lightPush Service
swap Service
discoveryV5 Service
peerExchange Service
filter ReceptorService
store ReceptorService
rlnRelay RLNRelay
wakuFlag utils.WakuEnrBitfield
localNode *enode.LocalNode
addrChan chan ma.Multiaddr
discoveryV5 *discv5.DiscoveryV5
peerExchange *peer_exchange.WakuPeerExchange
bcaster v2.Broadcaster
connectionNotif ConnectionNotifier
@ -95,7 +95,6 @@ type WakuNode struct {
keepAliveMutex sync.Mutex
keepAliveFails map[peer.ID]int
ctx context.Context // TODO: remove this
cancel context.CancelFunc
wg *sync.WaitGroup
@ -111,7 +110,7 @@ func defaultStoreFactory(w *WakuNode) store.Store {
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
func New(opts ...WakuNodeOption) (*WakuNode, error) {
params := new(WakuNodeParameters)
params.libP2POpts = DefaultLibP2POptions
@ -161,13 +160,9 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
w := new(WakuNode)
w.bcaster = v2.NewBroadcaster(1024)
w.host = host
w.cancel = cancel
w.ctx = ctx
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
@ -181,6 +176,31 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.timesource = timesource.NewDefaultClock()
}
w.localNode, err = w.newLocalnode(w.opts.privKey)
if err != nil {
w.log.Error("creating localnode", zap.Error(err))
}
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
return nil, err
}
}
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log)
if w.opts.enableSwap {
w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
swap.WithMode(w.opts.swapMode),
swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold),
}...)
}
if params.storeFactory != nil {
w.storeFactory = params.storeFactory
} else {
@ -203,19 +223,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.connStatusChan = params.connStatusC
}
w.connectionNotif = NewConnectionNotifier(ctx, host, w.log)
w.host.Network().Notify(w.connectionNotif)
w.wg.Add(2)
go w.connectednessListener()
go w.checkForAddressChanges()
go w.onAddrChange()
if w.opts.keepAliveInterval > time.Duration(0) {
w.wg.Add(1)
w.startKeepAlive(w.opts.keepAliveInterval)
}
return w, nil
}
@ -226,7 +233,7 @@ func (w *WakuNode) onAddrChange() {
}
}
func (w *WakuNode) checkForAddressChanges() {
func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
defer w.wg.Done()
addrs := w.ListenAddresses()
@ -234,7 +241,7 @@ func (w *WakuNode) checkForAddressChanges() {
first <- struct{}{}
for {
select {
case <-w.ctx.Done():
case <-ctx.Done():
close(w.addrChan)
return
case <-first:
@ -258,104 +265,110 @@ func (w *WakuNode) checkForAddressChanges() {
for _, addr := range addrs {
w.addrChan <- addr
}
_ = w.setupENR(addrs)
_ = w.setupENR(ctx, addrs)
}
}
}
}
// Start initializes all the protocols that were setup in the WakuNode
func (w *WakuNode) Start() error {
func (w *WakuNode) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
w.host.Network().Notify(w.connectionNotif)
w.wg.Add(2)
go w.connectednessListener(ctx)
go w.checkForAddressChanges(ctx)
go w.onAddrChange()
if w.opts.keepAliveInterval > time.Duration(0) {
w.wg.Add(1)
w.startKeepAlive(ctx, w.opts.keepAliveInterval)
}
if w.opts.enableNTP {
err := w.timesource.Start(w.ctx)
err := w.timesource.Start(ctx)
if err != nil {
return err
}
}
if w.opts.enableSwap {
w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
swap.WithMode(w.opts.swapMode),
swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold),
}...)
if w.opts.enableRelay {
err := w.relay.Start(ctx)
if err != nil {
return err
}
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
}
w.store = w.storeFactory(w)
if w.opts.enableStore {
err := w.startStore()
err := w.startStore(ctx)
if err != nil {
return err
}
w.log.Info("Subscribing store to broadcaster")
w.bcaster.Register(nil, w.store.MessageChannel())
}
if w.opts.enableLightPush {
if err := w.lightPush.Start(ctx); err != nil {
return err
}
}
if w.opts.enableFilter {
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
err := w.filter.Start(ctx)
if err != nil {
return err
}
w.filter = filter
w.log.Info("Subscribing filter to broadcaster")
w.bcaster.Register(nil, w.filter.MessageChannel())
}
err := w.setupENR(w.ListenAddresses())
err := w.setupENR(ctx, w.ListenAddresses())
if err != nil {
return err
}
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
return err
}
}
if w.opts.enablePeerExchange {
err := w.mountPeerExchange()
err := w.peerExchange.Start(ctx)
if err != nil {
return err
}
}
if w.opts.enableDiscV5 {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
if w.opts.enableRelay {
err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
if err != nil {
return err
}
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...))
}
if w.opts.enableRLN {
err = w.mountRlnRelay()
err = w.mountRlnRelay(ctx)
if err != nil {
return err
}
}
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log)
if w.opts.enableLightPush {
if err := w.lightPush.Start(); err != nil {
return err
}
}
// Subscribe store to topic
if w.opts.enableStore {
w.log.Info("Subscribing store to broadcaster")
w.bcaster.Register(nil, w.store.MessageChannel())
}
if w.filter != nil {
w.log.Info("Subscribing filter to broadcaster")
w.bcaster.Register(nil, w.filter.MsgC)
}
return nil
}
// Stop stops the WakuNode and closess all connections to the host
func (w *WakuNode) Stop() {
if w.cancel == nil {
return
}
w.cancel()
w.bcaster.Close()
@ -365,30 +378,19 @@ func (w *WakuNode) Stop() {
defer w.identificationEventSub.Close()
defer w.addressChangesSub.Close()
if w.filter != nil {
w.filter.Stop()
}
w.relay.Stop()
w.lightPush.Stop()
w.store.Stop()
w.filter.Stop()
w.peerExchange.Stop()
if w.peerExchange != nil {
w.peerExchange.Stop()
}
if w.discoveryV5 != nil {
if w.opts.enableDiscV5 {
w.discoveryV5.Stop()
}
if w.opts.enableRelay {
w.relay.Stop()
}
w.lightPush.Stop()
w.store.Stop()
_ = w.stopRlnRelay()
err := w.timesource.Stop()
if err != nil {
w.log.Error("stopping timesource", zap.Error(err))
}
w.timesource.Stop()
w.host.Close()
@ -428,32 +430,47 @@ func (w *WakuNode) Timesource() timesource.Timesource {
// Relay is used to access any operation related to Waku Relay protocol
func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
if result, ok := w.relay.(*relay.WakuRelay); ok {
return result
}
return nil
}
// Store is used to access any operation related to Waku Store protocol
func (w *WakuNode) Store() store.Store {
return w.store
return w.store.(store.Store)
}
// Filter is used to access any operation related to Waku Filter protocol
func (w *WakuNode) Filter() *filter.WakuFilter {
return w.filter
if result, ok := w.filter.(*filter.WakuFilter); ok {
return result
}
return nil
}
// Lightpush is used to access any operation related to Waku Lightpush protocol
func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
return w.lightPush
if result, ok := w.lightPush.(*lightpush.WakuLightPush); ok {
return result
}
return nil
}
// DiscV5 is used to access any operation related to DiscoveryV5
func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 {
return w.discoveryV5
if result, ok := w.discoveryV5.(*discv5.DiscoveryV5); ok {
return result
}
return nil
}
// PeerExchange is used to access any operation related to Peer Exchange
func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange {
return w.peerExchange
if result, ok := w.peerExchange.(*peer_exchange.WakuPeerExchange); ok {
return result
}
return nil
}
// Broadcaster is used to access the message broadcaster that is used to push
@ -472,16 +489,16 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
hash, _, _ := msg.Hash()
err := try.Do(func(attempt int) (bool, error) {
var err error
if !w.relay.EnoughPeersToPublish() {
if !w.lightPush.IsStarted() {
err = errors.New("not enought peers for relay and lightpush is not yet started")
} else {
w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash))
_, err = w.Lightpush().Publish(ctx, msg)
}
relay := w.Relay()
lightpush := w.Lightpush()
if relay == nil || !relay.EnoughPeersToPublish() {
w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash))
_, err = lightpush.Publish(ctx, msg)
} else {
w.log.Debug("publishing message via relay", logging.HexBytes("hash", hash))
_, err = w.Relay().Publish(ctx, msg)
_, err = relay.Publish(ctx, msg)
}
return attempt < maxPublishAttempt, err
@ -490,24 +507,6 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
return err
}
func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.timesource, w.log, opts...)
if err != nil {
return err
}
if w.opts.enableRelay {
sub, err := w.relay.Subscribe(w.ctx)
if err != nil {
return err
}
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
}
return err
}
func (w *WakuNode) mountDiscV5() error {
discV5Options := []discv5.DiscoveryV5Option{
discv5.WithBootnodes(w.opts.discV5bootnodes),
@ -525,13 +524,8 @@ func (w *WakuNode) mountDiscV5() error {
return err
}
func (w *WakuNode) mountPeerExchange() error {
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.discoveryV5, w.log)
return w.peerExchange.Start(w.ctx)
}
func (w *WakuNode) startStore() error {
err := w.store.Start(w.ctx)
func (w *WakuNode) startStore(ctx context.Context) error {
err := w.store.Start(ctx)
if err != nil {
w.log.Error("starting store", zap.Error(err))
return err
@ -554,9 +548,9 @@ func (w *WakuNode) startStore() error {
go func() {
defer w.wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second)
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
@ -673,7 +667,7 @@ func (w *WakuNode) PeerStats() PeerStats {
// Set the bootnodes on discv5
func (w *WakuNode) SetDiscV5Bootnodes(nodes []*enode.Node) error {
w.opts.discV5bootnodes = nodes
return w.discoveryV5.SetBootnodes(nodes)
return w.DiscV5().SetBootnodes(nodes)
}
// Peers return the list of peers, addresses, protocols supported and connection status

View File

@ -3,11 +3,13 @@
package node
import "context"
func (w *WakuNode) RLNRelay() RLNRelay {
return nil
}
func (w *WakuNode) mountRlnRelay() error {
func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
return nil
}

View File

@ -4,9 +4,9 @@
package node
import (
"context"
"encoding/hex"
"errors"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
@ -17,7 +17,7 @@ func (w *WakuNode) RLNRelay() RLNRelay {
return w.rlnRelay
}
func (w *WakuNode) mountRlnRelay() error {
func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
// check whether inputs are provided
// relay protocol is the prerequisite of rln-relay
if w.Relay() == nil {
@ -46,7 +46,7 @@ func (w *WakuNode) mountRlnRelay() error {
}
// mount rlnrelay in off-chain mode with a static group of users
rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
rlnRelay, err := rln.RlnRelayStatic(ctx, w.Relay(), groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
if err != nil {
return err
}
@ -80,7 +80,7 @@ func (w *WakuNode) mountRlnRelay() error {
// mount the rln relay protocol in the on-chain/dynamic mode
var err error
w.rlnRelay, err = rln.RlnRelayDynamic(w.ctx, w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
w.rlnRelay, err = rln.RlnRelayDynamic(ctx, w.Relay(), w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
if err != nil {
return err
}

View File

@ -39,17 +39,17 @@ func TestWakuNode2(t *testing.T) {
ctx := context.Background()
wakuNode, err := New(ctx,
wakuNode, err := New(
WithPrivateKey(prvKey),
WithHostAddress(hostAddr),
WithWakuRelay(),
)
require.NoError(t, err)
err = wakuNode.Start()
defer wakuNode.Stop()
err = wakuNode.Start(ctx)
require.NoError(t, err)
defer wakuNode.Stop()
}
func int2Bytes(i int) []byte {
@ -74,23 +74,23 @@ func Test5000(t *testing.T) {
key2, _ := tests.RandomHex(32)
prvKey2, _ := crypto.HexToECDSA(key2)
wakuNode1, err := New(ctx,
wakuNode1, err := New(
WithPrivateKey(prvKey1),
WithHostAddress(hostAddr1),
WithWakuRelay(),
)
require.NoError(t, err)
err = wakuNode1.Start()
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()
wakuNode2, err := New(ctx,
wakuNode2, err := New(
WithPrivateKey(prvKey2),
WithHostAddress(hostAddr2),
WithWakuRelay(),
)
require.NoError(t, err)
err = wakuNode2.Start()
err = wakuNode2.Start(ctx)
require.NoError(t, err)
defer wakuNode2.Stop()
@ -171,13 +171,13 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
// NODE1: Relay Node + Filter Server
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode1, err := New(ctx,
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithWakuFilter(true),
)
require.NoError(t, err)
err = wakuNode1.Start()
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()
@ -189,14 +189,14 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode2, err := New(ctx,
wakuNode2, err := New(
WithHostAddress(hostAddr2),
WithWakuFilter(false),
WithWakuStore(),
WithMessageProvider(dbStore),
)
require.NoError(t, err)
err = wakuNode2.Start()
err = wakuNode2.Start(ctx)
require.NoError(t, err)
defer wakuNode2.Stop()
@ -238,12 +238,12 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
// NODE3: Query from NODE2
hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode3, err := New(ctx,
wakuNode3, err := New(
WithHostAddress(hostAddr3),
WithWakuFilter(false),
)
require.NoError(t, err)
err = wakuNode3.Start()
err = wakuNode3.Start(ctx)
require.NoError(t, err)
defer wakuNode3.Stop()

View File

@ -41,6 +41,14 @@ func NewSubscribers(timeout time.Duration) *Subscribers {
}
}
func (sub *Subscribers) Clear() {
sub.Lock()
defer sub.Unlock()
sub.subscribers = nil
sub.failedPeers = make(map[peer.ID]time.Time)
}
func (sub *Subscribers) Append(s Subscriber) int {
sub.Lock()
defer sub.Unlock()

View File

@ -49,10 +49,10 @@ type (
}
WakuFilter struct {
ctx context.Context
cancel context.CancelFunc
h host.Host
isFullNode bool
MsgC chan *protocol.Envelope
msgC chan *protocol.Envelope
wg *sync.WaitGroup
log *zap.Logger
@ -65,16 +65,10 @@ type (
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilter(ctx context.Context, host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter {
wf := new(WakuFilter)
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
wf.log.Error("creating tag map", zap.Error(err))
return nil, errors.New("could not start waku filter")
}
params := new(FilterParameters)
optList := DefaultOptions()
optList = append(optList, opts...)
@ -82,87 +76,105 @@ func NewWakuFilter(ctx context.Context, host host.Host, broadcaster v2.Broadcast
opt(params)
}
wf.ctx = ctx
wf.wg = &sync.WaitGroup{}
wf.MsgC = make(chan *protocol.Envelope, 1024)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.timeout)
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
return wf
}
func (wf *WakuFilter) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
wf.log.Error("creating tag map", zap.Error(err))
return errors.New("could not start waku filter")
}
ctx, cancel := context.WithCancel(ctx)
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(ctx))
wf.cancel = cancel
wf.msgC = make(chan *protocol.Envelope, 1024)
wf.wg.Add(1)
go wf.filterListener()
go wf.filterListener(ctx)
wf.log.Info("filter protocol started")
return wf, nil
return nil
}
func (wf *WakuFilter) onRequest(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
filterRPCRequest := &pb.FilterRPC{}
filterRPCRequest := &pb.FilterRPC{}
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(filterRPCRequest)
if err != nil {
logger.Error("reading request", zap.Error(err))
return
}
logger.Info("received request")
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
for _, message := range filterRPCRequest.Push.Messages {
wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node
err := reader.ReadMsg(filterRPCRequest)
if err != nil {
logger.Error("reading request", zap.Error(err))
return
}
logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages)))
stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
} else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible
subscriber.filter.Topic = relay.DefaultWakuTopic
logger.Info("received request")
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
for _, message := range filterRPCRequest.Push.Messages {
wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node
}
len := wf.subscribers.Append(subscriber)
logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages)))
stats.Record(ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
} else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible
subscriber.filter.Topic = relay.DefaultWakuTopic
}
logger.Info("adding subscriber")
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len)))
len := wf.subscribers.Append(subscriber)
logger.Info("adding subscriber")
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(len)))
} else {
peerId := s.Conn().RemotePeer()
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters)
logger.Info("removing subscriber")
stats.Record(ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length())))
}
} else {
peerId := s.Conn().RemotePeer()
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters)
logger.Info("removing subscriber")
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length())))
logger.Error("can't serve request")
return
}
} else {
logger.Error("can't serve request")
return
}
}
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, msg *pb.WakuMessage) error {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
logger := wf.log.With(logging.HostID("peer", subscriber.peer))
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(wf.ctx, wf.h.Peerstore().PeerInfo(subscriber.peer))
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(subscriber.peer))
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
logger.Error("connecting to peer", zap.Error(err))
return err
}
conn, err := wf.h.NewStream(wf.ctx, subscriber.peer, FilterID_v20beta1)
conn, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1)
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
@ -184,7 +196,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er
return nil
}
func (wf *WakuFilter) filterListener() {
func (wf *WakuFilter) filterListener(ctx context.Context) {
defer wf.wg.Done()
// This function is invoked for each message received
@ -209,7 +221,7 @@ func (wf *WakuFilter) filterListener() {
// Do a message push to light node
logger.Info("pushing message to light node", zap.String("contentTopic", msg.ContentTopic))
g.Go(func() (err error) {
err = wf.pushMessage(subscriber, msg)
err = wf.pushMessage(ctx, subscriber, msg)
if err != nil {
logger.Error("pushing message", zap.Error(err))
}
@ -220,7 +232,7 @@ func (wf *WakuFilter) filterListener() {
return g.Wait()
}
for m := range wf.MsgC {
for m := range wf.msgC {
if err := handle(m); err != nil {
wf.log.Error("handling message", zap.Error(err))
}
@ -330,10 +342,18 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
// Stop unmounts the filter protocol
func (wf *WakuFilter) Stop() {
close(wf.MsgC)
if wf.cancel == nil {
return
}
wf.cancel()
close(wf.msgC)
wf.h.RemoveStreamHandler(FilterID_v20beta1)
wf.filters.RemoveAll()
wf.subscribers.Clear()
wf.wg.Wait()
}
@ -446,3 +466,7 @@ func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) e
return nil
}
func (wf *WakuFilter) MessageChannel() chan *protocol.Envelope {
return wf.msgC
}

View File

@ -24,7 +24,8 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.Background())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
@ -40,7 +41,9 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
filter, _ := NewWakuFilter(context.Background(), host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
filter := NewWakuFilter(host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
err = filter.Start(context.Background())
require.NoError(t, err)
return filter, host
}
@ -70,11 +73,14 @@ func TestWakuFilter(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, broadcaster, true, timesource.NewDefaultClock(), utils.Logger())
broadcaster.Register(&testTopic, node2Filter.MsgC)
node2Filter := NewWakuFilter(host2, broadcaster, true, timesource.NewDefaultClock(), utils.Logger())
err := node2Filter.Start(ctx)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))
err = host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))
require.NoError(t, err)
contentFilter := &ContentFilter{
@ -155,11 +161,14 @@ func TestWakuFilterPeerFailure(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, v2.NewBroadcaster(10), true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second))
broadcaster.Register(&testTopic, node2Filter.MsgC)
node2Filter := NewWakuFilter(host2, v2.NewBroadcaster(10), true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second))
err := node2Filter.Start(ctx)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))
err = host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))
require.NoError(t, err)
contentFilter := &ContentFilter{

View File

@ -27,20 +27,17 @@ var (
)
type WakuLightPush struct {
h host.Host
relay *relay.WakuRelay
ctx context.Context
h host.Host
relay *relay.WakuRelay
cancel context.CancelFunc
log *zap.Logger
started bool
}
// NewWakuRelay returns a new instance of Waku Lightpush struct
func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
func NewWakuLightPush(h host.Host, relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.ctx = ctx
wakuLP.h = h
wakuLP.log = log.Named("lightpush")
@ -48,14 +45,16 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay,
}
// Start inits the lighpush protocol
func (wakuLP *WakuLightPush) Start() error {
func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
if wakuLP.relayIsNotAvailable() {
return errors.New("relay is required, without it, it is only a client and cannot be started")
}
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
ctx, cancel := context.WithCancel(ctx)
wakuLP.cancel = cancel
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx))
wakuLP.log.Info("Light Push protocol started")
wakuLP.started = true
return nil
}
@ -65,67 +64,69 @@ func (wakuLp *WakuLightPush) relayIsNotAvailable() bool {
return wakuLp.relay == nil
}
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
defer s.Close()
logger := wakuLP.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
requestPushRPC := &pb.PushRPC{}
func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wakuLP.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
requestPushRPC := &pb.PushRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(requestPushRPC)
if err != nil {
logger.Error("reading request", zap.Error(err))
metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure")
return
}
logger.Info("request received")
if requestPushRPC.Query != nil {
logger.Info("push request")
response := new(pb.PushResponse)
if !wakuLP.relayIsNotAvailable() {
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic)
if err != nil {
logger.Error("publishing message", zap.Error(err))
response.IsSuccess = false
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
response.Info = "Totally" // TODO: ask about this
}
} else {
logger.Debug("no relay protocol present, unsuccessful push")
response.IsSuccess = false
response.Info = "No relay protocol"
}
responsePushRPC := &pb.PushRPC{}
responsePushRPC.RequestId = requestPushRPC.RequestId
responsePushRPC.Response = response
err = writer.WriteMsg(responsePushRPC)
err := reader.ReadMsg(requestPushRPC)
if err != nil {
logger.Error("writing response", zap.Error(err))
_ = s.Reset()
} else {
logger.Info("response sent")
logger.Error("reading request", zap.Error(err))
metrics.RecordLightpushError(ctx, "decodeRpcFailure")
return
}
}
if requestPushRPC.Response != nil {
if requestPushRPC.Response.IsSuccess {
logger.Info("request success")
} else {
logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info))
logger.Info("request received")
if requestPushRPC.Query != nil {
logger.Info("push request")
response := new(pb.PushResponse)
if !wakuLP.relayIsNotAvailable() {
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err := wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic)
if err != nil {
logger.Error("publishing message", zap.Error(err))
response.IsSuccess = false
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
response.Info = "Totally" // TODO: ask about this
}
} else {
logger.Debug("no relay protocol present, unsuccessful push")
response.IsSuccess = false
response.Info = "No relay protocol"
}
responsePushRPC := &pb.PushRPC{}
responsePushRPC.RequestId = requestPushRPC.RequestId
responsePushRPC.Response = response
err = writer.WriteMsg(responsePushRPC)
if err != nil {
logger.Error("writing response", zap.Error(err))
_ = s.Reset()
} else {
logger.Info("response sent")
}
}
if requestPushRPC.Response != nil {
if requestPushRPC.Response.IsSuccess {
logger.Info("request success")
} else {
logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info))
}
}
}
}
@ -142,7 +143,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
}
if params.selectedPeer == "" {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
metrics.RecordLightpushError(ctx, "dialError")
return nil, ErrNoPeersAvailable
}
@ -161,7 +162,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
metrics.RecordLightpushError(ctx, "dialError")
return nil, err
}
@ -169,7 +170,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
defer func() {
err := connOpt.Reset()
if err != nil {
metrics.RecordLightpushError(wakuLP.ctx, "dialError")
metrics.RecordLightpushError(ctx, "dialError")
logger.Error("resetting connection", zap.Error(err))
}
}()
@ -189,24 +190,21 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
err = reader.ReadMsg(pushResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure")
metrics.RecordLightpushError(ctx, "decodeRPCFailure")
return nil, err
}
return pushResponseRPC.Response, nil
}
// IsStarted returns if the lightpush protocol has been mounted or not
func (wakuLP *WakuLightPush) IsStarted() bool {
return wakuLP.started
}
// Stop unmounts the lightpush protocol
func (wakuLP *WakuLightPush) Stop() {
if wakuLP.started {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
wakuLP.started = false
if wakuLP.cancel == nil {
return
}
wakuLP.cancel()
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol

View File

@ -26,7 +26,9 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger())
require.NoError(t, err)
err = relay.Start(context.Background())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
@ -57,8 +59,8 @@ func TestWakuLightPush(t *testing.T) {
defer sub2.Unsubscribe()
ctx := context.Background()
lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger())
err := lightPushNode2.Start()
lightPushNode2 := NewWakuLightPush(host2, node2, utils.Logger())
err := lightPushNode2.Start(ctx)
require.NoError(t, err)
defer lightPushNode2.Stop()
@ -67,7 +69,7 @@ func TestWakuLightPush(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
client := NewWakuLightPush(clientHost, nil, utils.Logger())
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200))
@ -123,8 +125,8 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
err = client.Start()
client := NewWakuLightPush(clientHost, nil, utils.Logger())
err = client.Start(ctx)
require.Errorf(t, err, "relay is required")
}
@ -137,7 +139,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
client := NewWakuLightPush(clientHost, nil, utils.Logger())
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic)
require.Errorf(t, err, "no suitable remote peers")

View File

@ -126,6 +126,10 @@ func (r *NoiseWakuRelay) Publish(ctx context.Context, contentTopic string, paylo
}
func (r *NoiseWakuRelay) Stop() {
if r.cancel == nil {
return
}
r.cancel()
for _, contentTopicSubscriptions := range r.subscriptionChPerContentTopic {
for _, c := range contentTopicSubscriptions {

View File

@ -26,7 +26,8 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(1024), 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(host, v2.NewBroadcaster(1024), 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.Background())
require.NoError(t, err)
return host, relay

View File

@ -50,8 +50,9 @@ type WakuPeerExchange struct {
log *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
cancel context.CancelFunc
started bool
wg sync.WaitGroup
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCacheMutex sync.RWMutex
@ -71,9 +72,15 @@ func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger)
// Start inits the peer exchange protocol
func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
if wakuPX.started {
return errors.New("peer exchange already started")
}
wakuPX.wg.Wait() // Waiting for any go routines to stop
ctx, cancel := context.WithCancel(ctx)
wakuPX.cancel = cancel
wakuPX.started = true
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
wakuPX.log.Info("Peer exchange protocol started")
@ -192,6 +199,9 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
// Stop unmounts the peer exchange protocol
func (wakuPX *WakuPeerExchange) Stop() {
if wakuPX.cancel == nil {
return
}
wakuPX.cancel()
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
wakuPX.wg.Wait()

View File

@ -31,6 +31,7 @@ var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
type WakuRelay struct {
host host.Host
opts []pubsub.Option
pubsub *pubsub.PubSub
timesource timesource.Timesource
@ -56,7 +57,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) {
func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
w := new(WakuRelay)
w.host = h
w.timesource = timesource
@ -71,7 +72,6 @@ func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minP
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
opts = append(opts, pubsub.WithNoAuthor())
opts = append(opts, pubsub.WithMessageIdFn(msgIdFn))
opts = append(opts, pubsub.WithGossipSubProtocols(
[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
@ -86,15 +86,20 @@ func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minP
},
))
ps, err := pubsub.NewGossipSub(ctx, h, opts...)
w.opts = opts
return w
}
func (w *WakuRelay) Start(ctx context.Context) error {
ps, err := pubsub.NewGossipSub(ctx, w.host, w.opts...)
if err != nil {
return nil, err
return err
}
w.pubsub = ps
w.log.Info("Relay protocol started")
return w, nil
return nil
}
// PubSub returns the implementation of the pubsub system
@ -293,7 +298,10 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
for {
msg, err := sub.Next(ctx)
if err != nil {
w.log.Error("getting message from subscription", zap.Error(err))
if !errors.Is(err, context.Canceled) {
w.log.Error("getting message from subscription", zap.Error(err))
}
sub.Cancel()
close(msgChannel)
for _, subscription := range w.subscriptions[sub.Topic()] {

View File

@ -21,9 +21,10 @@ func TestWakuRelay(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
defer relay.Stop()
relay := NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
sub, err := relay.subscribe(testTopic)
defer sub.Cancel()

View File

@ -7,13 +7,12 @@ import (
"context"
"crypto/ecdsa"
"crypto/rand"
r "github.com/waku-org/go-zerokit-rln/rln"
"math/big"
"sync"
"testing"
"time"
r "github.com/waku-org/go-zerokit-rln/rln"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@ -22,6 +21,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -122,7 +122,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
s.Require().NoError(err)
wg := &sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
handler := func(pubkey r.IDCommitment, index r.MembershipIndex) error {
if pubkey == keyPair.IDCommitment || pubkey == keyPair2.IDCommitment {
@ -228,20 +228,21 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() {
host, err := tests.MakeHost(context.TODO(), port, rand.Reader)
s.Require().NoError(err)
relay, err := relay.NewWakuRelay(context.TODO(), host, nil, 0, utils.Logger())
defer relay.Stop()
relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.TODO())
s.Require().NoError(err)
defer relay.Stop()
sub, err := relay.SubscribeToTopic(context.TODO(), RLNRELAY_PUBSUB_TOPIC)
s.Require().NoError(err)
defer sub.Unsubscribe()
// mount the rln relay protocol in the on-chain/dynamic mode
rlnRelay, err := RlnRelayDynamic(context.TODO(), relay, ETH_CLIENT_ADDRESS, nil, s.rlnAddr, keyPair1, r.MembershipIndex(0), RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, utils.Logger())
rlnRelay, err := RlnRelayDynamic(context.TODO(), relay, ETH_CLIENT_ADDRESS, nil, s.rlnAddr, keyPair1, r.MembershipIndex(0), RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
// wait for the event to reach the group handler
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
// rln pks are inserted into the rln peer's Merkle tree and the resulting root
// is expected to be the same as the calculatedRoot i.e., the one calculated outside of the mountRlnRelayDynamic proc
@ -260,16 +261,17 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
host1, err := tests.MakeHost(context.TODO(), port1, rand.Reader)
s.Require().NoError(err)
relay1, err := relay.NewWakuRelay(context.TODO(), host1, nil, 0, utils.Logger())
defer relay1.Stop()
relay1 := relay.NewWakuRelay(host1, nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay1.Start(context.TODO())
s.Require().NoError(err)
defer relay1.Stop()
sub1, err := relay1.SubscribeToTopic(context.TODO(), RLNRELAY_PUBSUB_TOPIC)
s.Require().NoError(err)
defer sub1.Unsubscribe()
// mount the rln relay protocol in the on-chain/dynamic mode
rlnRelay1, err := RlnRelayDynamic(context.TODO(), relay1, ETH_CLIENT_ADDRESS, s.u1PrivKey, s.rlnAddr, nil, r.MembershipIndex(0), RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, utils.Logger())
rlnRelay1, err := RlnRelayDynamic(context.TODO(), relay1, ETH_CLIENT_ADDRESS, s.u1PrivKey, s.rlnAddr, nil, r.MembershipIndex(0), RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
// Node 2 ============================================================
@ -279,16 +281,17 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() {
host2, err := tests.MakeHost(context.TODO(), port2, rand.Reader)
s.Require().NoError(err)
relay2, err := relay.NewWakuRelay(context.TODO(), host2, nil, 0, utils.Logger())
defer relay2.Stop()
relay2 := relay.NewWakuRelay(host2, nil, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay2.Start(context.TODO())
s.Require().NoError(err)
defer relay2.Stop()
sub2, err := relay2.SubscribeToTopic(context.TODO(), RLNRELAY_PUBSUB_TOPIC)
s.Require().NoError(err)
defer sub2.Unsubscribe()
// mount the rln relay protocol in the on-chain/dynamic mode
rlnRelay2, err := RlnRelayDynamic(context.TODO(), relay2, ETH_CLIENT_ADDRESS, s.u2PrivKey, s.rlnAddr, nil, r.MembershipIndex(0), RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, utils.Logger())
rlnRelay2, err := RlnRelayDynamic(context.TODO(), relay2, ETH_CLIENT_ADDRESS, s.u2PrivKey, s.rlnAddr, nil, r.MembershipIndex(0), RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
// ==================================

View File

@ -116,6 +116,7 @@ func RlnRelayDynamic(
timesource: timesource,
nullifierLog: make(map[r.Epoch][]r.ProofMetadata),
registrationHandler: registrationHandler,
lastIndexLoaded: -1,
}
root, err := rlnPeer.RLN.GetMerkleRoot()

View File

@ -33,9 +33,10 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
defer relay.Stop()
relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.Background())
s.Require().NoError(err)
defer relay.Stop()
groupKeyPairs, root, err := r.CreateMembershipList(100)
s.Require().NoError(err)

View File

@ -55,7 +55,7 @@ type WakuRLNRelay struct {
// pubsubTopic is the topic for which rln relay is mounted
pubsubTopic string
contentTopic string
lastIndexLoaded r.MembershipIndex
lastIndexLoaded int64
validMerkleRoots []r.MerkleNode

View File

@ -114,13 +114,13 @@ func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler
var pubkey r.IDCommitment = r.Bytes32(evt.Pubkey.Bytes())
index := r.MembershipIndex(uint(evt.Index.Int64()))
index := evt.Index.Int64()
if index <= rln.lastIndexLoaded {
return nil
}
rln.lastIndexLoaded = index
return handler(pubkey, index)
return handler(pubkey, r.MembershipIndex(uint(evt.Index.Int64())))
}
// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract

View File

@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p/core/host"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
)
@ -46,6 +45,10 @@ var (
ErrEmptyResponse = errors.New("empty store response")
)
type WakuSwap interface {
// TODO: add functions
}
type WakuStore struct {
ctx context.Context
timesource timesource.Timesource
@ -59,11 +62,11 @@ type WakuStore struct {
msgProvider MessageProvider
h host.Host
swap *swap.WakuSwap
swap WakuSwap
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
func NewWakuStore(host host.Host, swap WakuSwap, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host

View File

@ -1,6 +1,7 @@
package swap
import (
"context"
"sync"
"github.com/libp2p/go-libp2p/core/protocol"
@ -73,3 +74,14 @@ func (s *WakuSwap) Debit(peerId string, n int) {
s.Accounting[peerId] += n
s.applyPolicy(peerId)
}
func (s *WakuSwap) Start(ctx context.Context) error {
return nil
}
func (s *WakuSwap) Stop() {
}
func (s *WakuSwap) IsStarted() bool {
return false
}

View File

@ -12,10 +12,10 @@ import (
)
func TestGetV1Info(t *testing.T) {
wakuNode1, err := node.New(context.Background())
wakuNode1, err := node.New()
require.NoError(t, err)
defer wakuNode1.Stop()
err = wakuNode1.Start()
err = wakuNode1.Start(context.Background())
require.NoError(t, err)
d := &DebugService{

View File

@ -81,6 +81,9 @@ func (r *RelayService) Start(ctx context.Context) {
}
func (r *RelayService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}

View File

@ -22,9 +22,9 @@ import (
func makeRelayService(t *testing.T) *RelayService {
options := node.WithWakuRelayAndMinPeers(0)
n, err := node.New(context.Background(), options)
n, err := node.New(options)
require.NoError(t, err)
err = n.Start()
err = n.Start(context.Background())
require.NoError(t, err)
mux := mux.NewRouter()

View File

@ -39,6 +39,9 @@ func (r *runnerService) Start(ctx context.Context) {
}
func (r *runnerService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
r.broadcaster.Unregister(nil, r.ch)
close(r.ch)

View File

@ -1,7 +1,6 @@
package rest
import (
"context"
"testing"
"github.com/stretchr/testify/require"
@ -11,7 +10,7 @@ import (
func TestWakuRest(t *testing.T) {
options := node.WithWakuStore()
n, err := node.New(context.Background(), options)
n, err := node.New(options)
require.NoError(t, err)
rpc := NewWakuRest(n, "127.0.0.1", 8080, true, true, false, 10, utils.Logger())

View File

@ -21,9 +21,9 @@ import (
func makeAdminService(t *testing.T) *AdminService {
options := node.WithWakuRelay()
n, err := node.New(context.Background(), options)
n, err := node.New(options)
require.NoError(t, err)
err = n.Start()
err = n.Start(context.Background())
require.NoError(t, err)
return &AdminService{n, utils.Logger()}
}
@ -34,7 +34,8 @@ func TestV1Peers(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()

View File

@ -16,10 +16,10 @@ func TestGetV1Info(t *testing.T) {
request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte("")))
require.NoError(t, err)
wakuNode1, err := node.New(context.Background())
wakuNode1, err := node.New()
require.NoError(t, err)
defer wakuNode1.Stop()
err = wakuNode1.Start()
err = wakuNode1.Start(context.Background())
require.NoError(t, err)
d := &DebugService{

View File

@ -29,9 +29,9 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
nodeOpts = append(nodeOpts, node.WithWakuRelay())
}
n, err := node.New(context.Background(), nodeOpts...)
n, err := node.New(nodeOpts...)
require.NoError(t, err)
err = n.Start()
err = n.Start(context.Background())
require.NoError(t, err)
if isFullNode {
@ -49,13 +49,16 @@ func TestFilterSubscription(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger())
node := relay.NewWakuRelay(host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger())
err = node.Start(context.Background())
require.NoError(t, err)
_, err = node.SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err)
_, _ = filter.NewWakuFilter(context.Background(), host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
f := filter.NewWakuFilter(host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
err = f.Start(context.Background())
require.NoError(t, err)
d := makeFilterService(t, true)
defer d.node.Stop()

View File

@ -13,9 +13,9 @@ import (
)
func makePrivateService(t *testing.T) *PrivateService {
n, err := node.New(context.Background(), node.WithWakuRelayAndMinPeers(0))
n, err := node.New(node.WithWakuRelayAndMinPeers(0))
require.NoError(t, err)
err = n.Start()
err = n.Start(context.Background())
require.NoError(t, err)
return NewPrivateService(n, 30, utils.Logger())

View File

@ -14,9 +14,9 @@ import (
func makeRelayService(t *testing.T) *RelayService {
options := node.WithWakuRelayAndMinPeers(0)
n, err := node.New(context.Background(), options)
n, err := node.New(options)
require.NoError(t, err)
err = n.Start()
err = n.Start(context.Background())
require.NoError(t, err)
return NewRelayService(n, 30, utils.Logger())
@ -61,6 +61,9 @@ func TestRelaySubscription(t *testing.T) {
func TestRelayGetV1Messages(t *testing.T) {
serviceA := makeRelayService(t)
go serviceA.Start()
defer serviceA.Stop()
var reply SuccessReply
serviceB := makeRelayService(t)

View File

@ -11,9 +11,9 @@ import (
func makeStoreService(t *testing.T) *StoreService {
options := node.WithWakuStore()
n, err := node.New(context.Background(), options)
n, err := node.New(options)
require.NoError(t, err)
err = n.Start()
err = n.Start(context.Background())
require.NoError(t, err)
return &StoreService{n, utils.Logger()}
}

View File

@ -1,7 +1,6 @@
package rpc
import (
"context"
"testing"
"github.com/stretchr/testify/require"
@ -11,7 +10,7 @@ import (
func TestWakuRpc(t *testing.T) {
options := node.WithWakuStore()
n, err := node.New(context.Background(), options)
n, err := node.New(options)
require.NoError(t, err)
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, false, 30, utils.Logger())

View File

@ -200,8 +200,10 @@ func (s *NTPTimeSource) Start(ctx context.Context) error {
}
// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() error {
func (s *NTPTimeSource) Stop() {
if s.cancel == nil {
return
}
s.cancel()
s.wg.Wait()
return nil
}

View File

@ -8,5 +8,5 @@ import (
type Timesource interface {
Now() time.Time
Start(ctx context.Context) error
Stop() error
Stop()
}

View File

@ -21,7 +21,6 @@ func (t *WallClockTimeSource) Start(ctx context.Context) error {
return nil
}
func (t *WallClockTimeSource) Stop() error {
func (t *WallClockTimeSource) Stop() {
// Do nothing
return nil
}