Compare commits

..

6 Commits

Author SHA1 Message Date
Igor Sirotin
f40bcd7e79
chore: update version file 2025-12-10 10:27:51 +00:00
Igor Sirotin
b0af7695bd
feat: single point of localnode parametrization (#1297) 2025-12-10 10:21:57 +00:00
Igor Sirotin
4b28d08451
fix: start watchTopicShards once (#1298) 2025-12-09 10:50:39 +00:00
Igor Sirotin
5635735da6
fix(PeerManager): SelectRandom filter by protocol (#1296) 2025-12-08 21:26:41 +00:00
Prem Chaitanya Prathi
84a4b1be7a
fix: use simple protocol and pubsubTopic based selection for peerExchange (#1295) 2025-10-15 06:37:06 +05:30
Igor Sirotin
06c9af60f3
chore: update go-discover (#1294) 2025-10-03 23:51:21 +01:00
30 changed files with 463 additions and 364 deletions

View File

@ -1 +1 @@
0.10.0
0.10.1

View File

@ -26,7 +26,7 @@ pkgs.buildGo123Module {
'' else "";
# FIXME: This needs to be manually changed when updating modules.
vendorHash = "sha256-2bMMcrg61qnVT1xIlAk6R/JAu7GpN2vr/Rlj4SaqjPQ=";
vendorHash = "sha256-uz9IVTEd+3UypZQc2CVWCFeLE4xEagn9YT9W2hr0K/o=";
# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };

View File

@ -141,7 +141,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -519,8 +519,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -141,7 +141,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -519,8 +519,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -155,7 +155,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -553,8 +553,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -154,7 +154,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -548,8 +548,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -136,7 +136,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -519,8 +519,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -138,7 +138,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -519,8 +519,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -137,7 +137,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 // indirect
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect

View File

@ -519,8 +519,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

2
go.mod
View File

@ -26,7 +26,7 @@ require (
)
require (
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971
golang.org/x/text v0.28.0 // indirect
)

4
go.sum
View File

@ -1503,8 +1503,8 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6 h1:c9ZwkW26T0+8pjevhUwWTZs+cujXTNur9MdnPnvwaL4=
github.com/waku-org/go-discover v0.0.0-20251002230139-b9b5a06121c6/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971 h1:BNw9UkCVftI0l0QWm5ofpM5HaLfRuv77Fcea4byk6wk=
github.com/waku-org/go-discover v0.0.0-20251003191045-8ee308fe7971/go.mod h1:oVHZeHiB/OJpc0k4r+Iq+5bJXvOV3Mpz1gPQh8Vswps=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku h1:ovXh1m76i0yPWXt95Z9ZRyEk0+WKSuPt5e4IddgwUrY=
github.com/waku-org/go-libp2p-pubsub v0.13.1-gowaku/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=

View File

@ -146,6 +146,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
PrivateKey: priv,
Bootnodes: bootnodes,
V5ProtocolID: &protocolID,
Log: logger.Named("go-discover"),
},
udpAddr: &net.UDPAddr{
IP: net.IPv4zero,

View File

@ -6,173 +6,23 @@ import (
"net"
"strconv"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"go.uber.org/zap"
ndoeutils "github.com/waku-org/go-waku/waku/v2/node/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool) error {
var options []wenr.ENROption
options = append(options, wenr.WithUDPPort(udpPort))
options = append(options, wenr.WithWakuBitfield(wakuFlags))
// Reset ENR fields
wenr.DeleteField(localnode, wenr.MultiaddrENRField)
wenr.DeleteField(localnode, enr.TCP(0).ENRKey())
wenr.DeleteField(localnode, enr.IPv4{}.ENRKey())
wenr.DeleteField(localnode, enr.IPv6{}.ENRKey())
if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
ipAddr, err := selectMostExternalAddress(advertiseAddr)
if err != nil {
return err
}
options = append(options, wenr.WithIP(ipAddr))
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, wenr.WithIP(ipAddr))
} else {
if ipAddr.Port != 0 {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.SetFallbackIP(ip4)
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
}
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
}
// Writing the IP + Port has priority over writting the multiaddress which might fail or not
// depending on the enr having space
options = append(options, wenr.WithMultiaddress(multiaddrs...))
return wenr.Update(w.log, localnode, options...)
}
func isPrivate(addr *net.TCPAddr) bool {
return addr.IP.IsPrivate()
}
func isExternal(addr *net.TCPAddr) bool {
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
}
func isLoopback(addr *net.TCPAddr) bool {
return addr.IP.IsLoopback()
}
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}
func extractIPAddressForENR(addr ma.Multiaddr) (*net.TCPAddr, error) {
// It's a p2p-circuit address. We shouldnt use these
// for building the ENR record default keys
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return nil, errors.New("can't use IP address from a p2p-circuit address")
}
// ws and wss addresses are handled by the multiaddr key
// they shouldnt be used for building the ENR record default keys
_, err = addr.ValueForProtocol(ma.P_WS)
if err == nil {
return nil, errors.New("can't use IP address from a ws address")
}
_, err = addr.ValueForProtocol(ma.P_WSS)
if err == nil {
return nil, errors.New("can't use IP address from a wss address")
}
var ipStr string
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func selectMostExternalAddress(addresses []ma.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipAddr, err := extractIPAddressForENR(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, ipAddr)
}
externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}
privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}
loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}
return nil, errors.New("could not obtain ip address")
func (w *WakuNode) updateLocalNode() error {
w.localNodeMutex.Lock()
defer w.localNodeMutex.Unlock()
return enr.UpdateLocalNode(w.log, w.localNode, &w.localNodeParams)
}
func decapsulateP2P(addr ma.Multiaddr) (ma.Multiaddr, error) {
@ -282,7 +132,7 @@ func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
}
func (w *WakuNode) getENRAddresses(ctx context.Context, addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) {
extAddr, err = selectMostExternalAddress(addrs)
extAddr, err = ndoeutils.SelectMostExternalAddress(addrs)
if err != nil {
return nil, nil, err
}
@ -320,7 +170,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err
}
err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate)
w.localNodeParams.Multiaddrs = multiaddresses
w.localNodeParams.IPAddr = ipAddr
err = w.updateLocalNode()
if err != nil {
w.log.Error("updating localnode ENR record", zap.Error(err))
return err
@ -340,7 +193,8 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
}
func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error {
err := wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs))
w.localNodeParams.RelayShards = rs
err := w.updateLocalNode()
if err != nil {
return err
}
@ -348,6 +202,10 @@ func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error {
}
func (w *WakuNode) watchTopicShards(ctx context.Context) error {
if !w.watchingRelayShards.CompareAndSwap(false, true) {
return nil
}
evtRelaySubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelaySubscribed))
if err != nil {
return err
@ -358,10 +216,13 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
return err
}
w.wg.Add(1)
go func() {
defer utils.LogOnPanic()
defer evtRelaySubscribed.Close()
defer evtRelayUnsubscribed.Close()
defer w.wg.Done()
for {
select {
@ -389,7 +250,8 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
}
err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0]))
w.localNodeParams.RelayShards = rs[0]
err = w.updateLocalNode()
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue

106
waku/v2/node/utils/utils.go Normal file
View File

@ -0,0 +1,106 @@
package utils
import (
"errors"
"net"
"strconv"
"github.com/multiformats/go-multiaddr"
)
func ExtractIPAddressForENR(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
// It's a p2p-circuit address. We shouldnt use these
// for building the ENR record default keys
_, err := addr.ValueForProtocol(multiaddr.P_CIRCUIT)
if err == nil {
return nil, errors.New("can't use IP address from a p2p-circuit address")
}
// ws and wss addresses are handled by the multiaddr key
// they shouldnt be used for building the ENR record default keys
_, err = addr.ValueForProtocol(multiaddr.P_WS)
if err == nil {
return nil, errors.New("can't use IP address from a ws address")
}
_, err = addr.ValueForProtocol(multiaddr.P_WSS)
if err == nil {
return nil, errors.New("can't use IP address from a wss address")
}
var ipStr string
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func SelectMostExternalAddress(addresses []multiaddr.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipAddr, err := ExtractIPAddressForENR(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, ipAddr)
}
externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}
privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}
loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}
return nil, errors.New("could not obtain ip address")
}
func isPrivate(addr *net.TCPAddr) bool {
return addr.IP.IsPrivate()
}
func isExternal(addr *net.TCPAddr) bool {
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
}
func isLoopback(addr *net.TCPAddr) bool {
return addr.IP.IsLoopback()
}
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}

View File

@ -5,6 +5,7 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
backoffv4 "github.com/cenkalti/backoff/v4"
@ -105,11 +106,20 @@ type WakuNode struct {
store *store.WakuStore
rlnRelay RLNRelay
wakuFlag enr.WakuEnrBitfield
circuitRelayNodes chan peer.AddrInfo
localNode *enode.LocalNode
// localNodeParams are ENR parameters that will be applied to the localnode.
localNodeParams enr.LocalNodeParams
// LocalNode.Set is a lazy operation that only stores the entry, but does not sign the record.
// But the size of the record is only checked during `sign`, and if it's >300 bytes, it will panic.
// In WithMultiaddress we attempt to write as much addresses as possible, relying on existing local node entries.
// On the other hand, enr.WithWakuRelaySharding is called in a goroutine, so ther is a race condition.
// To make it work properly, we should make sure that entries are not added during WithMultiaddress run.
localNodeMutex *sync.Mutex
bcaster relay.Broadcaster
connectionNotif ConnectionNotifier
@ -122,6 +132,8 @@ type WakuNode struct {
storeFactory storeFactory
peermanager *peermanager.PeerManager
watchingRelayShards atomic.Bool
}
func defaultStoreFactory(w *WakuNode) legacy_store.Store {
@ -190,11 +202,18 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo)
w.metrics = newMetrics(params.prometheusReg)
w.metrics.RecordVersion(Version, GitCommit)
w.localNodeMutex = &sync.Mutex{}
w.localNodeParams = enr.LocalNodeParams{
WakuFlags: enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay),
UDPPort: w.opts.udpPort,
AdvertiseAddr: w.opts.advertiseAddrs,
ShouldAutoUpdate: w.opts.discV5autoUpdate,
}
// Setup peerstore wrapper
if params.peerstore != nil {
w.peerstore = wps.NewWakuPeerstore(params.peerstore)

View File

@ -96,6 +96,7 @@ type PeerSelection int
const (
Automatic PeerSelection = iota
LowestRTT
ProtoPubSubTopicOnly //This is added to address an issue with peerExchange where on-demand discovery cannot be used.
)
const maxFailedAttempts = 5

View File

@ -15,6 +15,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
@ -125,36 +126,38 @@ func TestPeerSelection(t *testing.T) {
defer h3.Close()
protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, protocol)
require.NoError(t, err)
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, protocol)
require.NoError(t, err)
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
var peerIDs peer.IDSlice
peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerIDs[0])
require.Len(t, peerIDs, 1) // Only 1 peer is returned randomly, because MaxPeers defaults to 1 when not set
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, utils.ErrNoPeersAvailable, err)
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err)
require.Len(t, peerIDs, 1)
require.Equal(t, h2.ID(), peerIDs[0]) // Only h2 has this pubsub topic
//Test for selectWithLowestRTT
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, utils.ErrNoPeersAvailable, err) // No peer has this pubsub topic
require.Empty(t, peerIDs)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
require.Len(t, peerIDs, 1) // Both h2 and h3 have this pubsub topic, but only 1 peer is returned randomly because MaxPeers defaults to 1 when not set
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
require.Len(t, peerIDs, 2)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
require.Len(t, peerIDs, 2)
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
@ -163,9 +166,14 @@ func TestPeerSelection(t *testing.T) {
require.NoError(t, err)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 3, peerIDs.Len())
require.Len(t, peerIDs, 3)
require.NoError(t, err)
//Test for selectWithLowestRTT
// NOTE: This test must go the last because it involves pinging peers, which modifies the list of supported protocols
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.NoError(t, err)
require.Len(t, peerIDs, 1) // With LowestRTT, only 1 peer is returned, even if MaxPeers is set
}
func TestDefaultProtocol(t *testing.T) {

View File

@ -7,11 +7,13 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type PeerSet map[peer.ID]struct{}
@ -46,7 +48,7 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic
return peers[0], nil
}
// SelectRandomPeer is used to return a random peer that supports a given protocol.
// SelectRandom is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
@ -57,26 +59,40 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
peerIDs, err := pm.selectServicePeer(criteria)
if err == nil && len(peerIDs) == criteria.MaxPeers {
return maps.Keys(peerIDs), nil
} else if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
}
if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot",
zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics),
zap.Error(err))
return nil, err
} else if len(peerIDs) == 0 {
}
if len(peerIDs) == 0 {
peerIDs = make(PeerSet)
}
// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)
// if not found in serviceSlots or proto == WakuRelayIDv200 (service slots don't support WakuRelayIDv200)
peerSet := criteria.SpecificPeers
if len(peerSet) == 0 {
peerSet = pm.host.Peerstore().Peers()
}
filteredPeers, err := pm.FilterPeersByProto(peerSet, criteria.ExcludePeers, criteria.Proto)
if err != nil {
return nil, err
}
if len(filteredPeers) == 0 && criteria.Proto != relay.WakuRelayID_v200 {
return nil, utils.ErrNoPeersAvailable
}
if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
}
//Not passing excludePeers as filterPeers are already considering excluded ones.
randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs))
randomPeers, err := selectRandomPeers(filteredPeers, nil, min(criteria.MaxPeers, len(peerIDs)))
if err != nil && len(peerIDs) == 0 {
return nil, err
}
@ -88,7 +104,6 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
}
func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) {
i := 0
selectedPeers := make(PeerSet)
for pID := range filter {
if PeerInSet(excludePeers, pID) {
@ -96,8 +111,7 @@ func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error)
}
//Map's iterator in golang works using randomness and hence not random function is being used.
selectedPeers[pID] = struct{}{}
i++
if i == count {
if len(selectedPeers) == count {
break
}
}
@ -121,41 +135,57 @@ func PeerSliceToMap(peers peer.IDSlice) PeerSet {
return peerSet
}
/*
selectServicePeer tries to select peer from serviceSlot first based on criteria.
serviceSlots is a map of peerMap by protocol.ID.
- Slots are created automatically in getPeers.
- Slots are not created for relay.WakuRelayID_v200.
Therefore for Relay protocol, selectServicePeer will always return ErrNoPeersAvailable.
If there is no pubsubTopics criteria, a random peer from the selected slot is returned.
Otherwise, peers from the slot are filtered by pubsubTopics and random peers are selected from the filtered list.
If no peer is found in the slot, on-demand discovery is triggered for the given pubsubTopics and protocol.
The function retries once to fetch peers from the slot after discovery.
*/
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) {
peers := make(PeerSet)
var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
} else { //PubsubTopic based selection
slot.mu.RLock()
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
if PeerInSet(criteria.ExcludePeers, i) {
continue
}
keys = append(keys, i)
}
slot.mu.RUnlock()
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
} else {
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
continue
}
}
slot := pm.serviceSlots.getPeers(criteria.Proto)
if slot == nil {
continue
}
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
}
//PubsubTopic based selection
slot.mu.RLock()
keys := make([]peer.ID, 0, len(slot.m))
for peerID := range slot.m {
if PeerInSet(criteria.ExcludePeers, peerID) {
continue
}
keys = append(keys, peerID)
}
slot.mu.RUnlock()
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
}
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover at least 1 peer for the criteria
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
}
if len(peers) == 0 {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
@ -204,6 +234,12 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice
}
//TODO: Update this once peer Ping cache PR is merged into this code.
return []peer.ID{peerID}, nil
case ProtoPubSubTopicOnly:
peers, err := pm.SelectPeersByProto(criteria.Proto, criteria.SpecificPeers, criteria.PubsubTopics)
if err != nil {
return nil, err
}
return peers, nil
default:
return nil, errors.New("unknown peer selection type specified")
}
@ -257,3 +293,16 @@ func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePee
}
return peers, nil
}
func (pm *PeerManager) SelectPeersByProto(protocol protocol.ID, specificPeers peer.IDSlice, pubsubTopics []string) (peer.IDSlice, error) {
var selectedPeers peer.IDSlice
selectedPeers, err := pm.FilterPeersByProto(specificPeers, nil, protocol)
if err != nil {
return nil, err
}
selectedPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, selectedPeers...)
if len(selectedPeers) == 0 {
return nil, utils.ErrNoPeersAvailable
}
return selectedPeers, nil
}

View File

@ -8,8 +8,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/maps"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Origin is used to determine how the peer is identified,
@ -233,17 +234,20 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
for _, p := range specificPeers {
peerMatch := true
peerTopics, err := ps.PubSubTopics(p)
if err == nil {
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
if err != nil {
//Note: skipping a peer in case of an error as there would be others available.
continue
}
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
if peerMatch {
result = append(result, p)
}
} //Note: skipping a peer in case of an error as there would be others available.
}
if peerMatch {
result = append(result, p)
}
}
return result
}

View File

@ -1,15 +1,15 @@
package enr
import (
"fmt"
"net"
"testing"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func TestEnodeToMultiAddr(t *testing.T) {
@ -22,75 +22,51 @@ func TestEnodeToMultiAddr(t *testing.T) {
require.Equal(t, expectedMultiAddr, actualMultiAddr.String())
}
// TODO: this function is duplicated in localnode.go. Remove duplication
func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool) error {
var options []ENROption
options = append(options, WithUDPPort(udpPort))
options = append(options, WithWakuBitfield(wakuFlags))
options = append(options, WithMultiaddress(multiaddrs...))
if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
nip := &net.TCPAddr{
IP: *advertiseAddr,
Port: ipAddr.Port,
}
options = append(options, WithIP(nip))
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, WithIP(ipAddr))
} else {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
}
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
return Update(utils.Logger(), localnode, options...)
}
func TestMultiaddr(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
key, _ := gcrypto.GenerateKey()
wakuFlag := NewWakuEnrBitfield(true, true, true, true)
//wss, _ := ma.NewMultiaddr("/dns4/www.somedomainname.com/tcp/443/wss")
wss, _ := ma.NewMultiaddr("/dns4/www.somedomainname.com/tcp/443/wss")
circuit1, _ := ma.NewMultiaddr("/dns4/node-02.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit2, _ := ma.NewMultiaddr("/dns4/node-01.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit3, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit4, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit5, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit6, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
multiaddrValues := []ma.Multiaddr{
//wss,
wss,
circuit1,
circuit2,
circuit3,
circuit4,
circuit5,
circuit6,
}
db, _ := enode.OpenDB("")
localNode := enode.NewLocalNode(db, key)
err := updateLocalNode(localNode, multiaddrValues, &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000}, 50000, wakuFlag, nil, false)
err = UpdateLocalNode(logger, localNode, &LocalNodeParams{
Multiaddrs: multiaddrValues,
IPAddr: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000},
UDPPort: 50000,
WakuFlags: wakuFlag,
AdvertiseAddr: nil,
ShouldAutoUpdate: false,
})
require.NoError(t, err)
_ = localNode.Node() // Should not panic
require.NotPanics(t, func() {
_ = localNode.Node()
})
_, _, err = Multiaddress(localNode.Node())
peerID, maddrs, err := Multiaddress(localNode.Node())
require.NoError(t, err)
fmt.Println("peerID: ", peerID)
fmt.Println("len maddrs: ", len(maddrs))
fmt.Println("maddrs: ", maddrs)
}

View File

@ -13,6 +13,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/node/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
@ -27,49 +30,60 @@ type ENROption func(*enode.LocalNode) error
func WithMultiaddress(multiaddrs ...multiaddr.Multiaddr) ENROption {
return func(localnode *enode.LocalNode) (err error) {
if len(multiaddrs) == 0 {
return nil
}
// Randomly shuffle multiaddresses
rand.Shuffle(len(multiaddrs), func(i, j int) { multiaddrs[i], multiaddrs[j] = multiaddrs[j], multiaddrs[i] })
// Testing how many multiaddresses we can write before we exceed the limit
// By simulating what the localnode does when signing the enr, but without
// causing a panic
privk, err := crypto.GenerateKey()
if err != nil {
return err
}
// Adding extra multiaddresses. Should probably not exceed the enr max size of 300bytes
failedOnceWritingENR := false
couldWriteENRatLeastOnce := false
successIdx := -1
for i := len(multiaddrs); i > 0; i-- {
cpy := localnode.Node().Record() // Record() creates a copy for the current iteration
cpy.Set(enr.WithEntry(MultiaddrENRField, marshalMultiaddress(multiaddrs[0:i])))
cpy.SetSeq(localnode.Seq() + 1)
err = enode.SignV4(cpy, privk)
if err == nil {
couldWriteENRatLeastOnce = true
successIdx = i
break
}
failedOnceWritingENR = true
}
if failedOnceWritingENR && couldWriteENRatLeastOnce {
// Could write a subset of multiaddresses but not all
writeMultiaddressField(localnode, multiaddrs[0:successIdx])
// Find the maximum number of multiaddresses that fit in the ENR size limit
maxFittingCount := findMaxFittingMultiaddrs(localnode, multiaddrs)
if maxFittingCount == 0 {
return errors.New("no multiaddress fit into ENR")
}
writeMultiaddressField(localnode, multiaddrs[0:maxFittingCount])
return nil
}
}
func WithCapabilities(lightpush, filter, store, relay bool) ENROption {
return func(localnode *enode.LocalNode) (err error) {
wakuflags := NewWakuEnrBitfield(lightpush, filter, store, relay)
return WithWakuBitfield(wakuflags)(localnode)
// findMaxFittingMultiaddrs determines how many multiaddresses can fit in the ENR
func findMaxFittingMultiaddrs(localnode *enode.LocalNode, multiaddrs []multiaddr.Multiaddr) int {
privk, err := crypto.GenerateKey()
if err != nil {
return 0
}
// Get the current committed record (after the Node() call above)
currentRecord := localnode.Node().Record()
// Binary search for optimal count
maxFitting := 0
for i := len(multiaddrs); i > 0; i-- {
if canFitMultiaddrsOnRecord(currentRecord, multiaddrs[0:i], privk, localnode.Seq()) {
// Return as soon as we can fit most of the addresses
return i
}
}
return maxFitting
}
// canFitMultiaddrsOnRecord tests if multiaddresses can fit on a specific record.
// ENR has a limit of 300 bytes. Later it will panic on signing, if the record is over the size limit.
// By simulating what the localnode does when signing the enr, but without causing a panic.
func canFitMultiaddrsOnRecord(baseRecord *enr.Record, addrs []multiaddr.Multiaddr, privk *ecdsa.PrivateKey, seq uint64) bool {
// Create a copy of the base record
testRecord := *baseRecord
// Add the multiaddress field
testRecord.Set(enr.WithEntry(MultiaddrENRField, marshalMultiaddress(addrs)))
testRecord.SetSeq(seq + 1)
// Try to sign - this will return an error if the record is too large
return enode.SignV4(&testRecord, privk) == nil
}
func WithWakuBitfield(flags WakuEnrBitfield) ENROption {
@ -105,6 +119,8 @@ func WithUDPPort(udpPort uint) ENROption {
}
}
// Update applies the given ENR options to the localnode.
// This function should only be called from UpdateLocalNode, to ensure the order of options applied.
func Update(logger *zap.Logger, localnode *enode.LocalNode, enrOptions ...ENROption) error {
for _, opt := range enrOptions {
err := opt(localnode)
@ -140,3 +156,76 @@ func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Mul
func DeleteField(localnode *enode.LocalNode, field string) {
localnode.Delete(enr.WithEntry(field, struct{}{}))
}
type LocalNodeParams struct {
Multiaddrs []multiaddr.Multiaddr
IPAddr *net.TCPAddr
UDPPort uint
WakuFlags WakuEnrBitfield
AdvertiseAddr []multiaddr.Multiaddr
ShouldAutoUpdate bool
RelayShards protocol.RelayShards
}
func UpdateLocalNode(logger *zap.Logger, localnode *enode.LocalNode, params *LocalNodeParams) error {
var options []ENROption
options = append(options, WithUDPPort(params.UDPPort))
options = append(options, WithWakuBitfield(params.WakuFlags))
// Reset ENR fields
DeleteField(localnode, MultiaddrENRField)
DeleteField(localnode, enr.TCP(0).ENRKey())
DeleteField(localnode, enr.IPv4{}.ENRKey())
DeleteField(localnode, enr.IPv6{}.ENRKey())
DeleteField(localnode, ShardingBitVectorEnrField)
DeleteField(localnode, ShardingIndicesListEnrField)
if params.AdvertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
ipAddr, err := utils.SelectMostExternalAddress(params.AdvertiseAddr)
if err != nil {
return err
}
options = append(options, WithIP(ipAddr))
} else if !params.ShouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, WithIP(params.IPAddr))
} else {
if params.IPAddr.Port != 0 {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := params.IPAddr.IP.To4()
ip6 := params.IPAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.SetFallbackIP(ip4)
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(params.IPAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
}
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(params.IPAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
}
options = append(options, WithWakuRelaySharding(params.RelayShards))
// Writing the IP + Port has priority over writing the multiaddress which might fail or not
// depending on the enr having space
options = append(options, WithMultiaddress(params.Multiaddrs...))
return Update(logger, localnode, options...)
}

View File

@ -1,10 +1,9 @@
package enr
import (
"errors"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
@ -43,21 +42,6 @@ func WithWakuRelaySharding(rs protocol.RelayShards) ENROption {
}
}
func WithWakuRelayShardingTopics(topics ...string) ENROption {
return func(localnode *enode.LocalNode) error {
rs, err := protocol.TopicsToRelayShards(topics...)
if err != nil {
return err
}
if len(rs) != 1 {
return errors.New("expected a single RelayShards")
}
return WithWakuRelaySharding(rs[0])(localnode)
}
}
// ENR record accessors
func RelayShardList(record *enr.Record) (*protocol.RelayShards, error) {

View File

@ -53,7 +53,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
}
selectedPeers, err := wakuPX.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
SelectionType: peermanager.ProtoPubSubTopicOnly, //Overriding selection type, this is hacky but to avoid refactor
Proto: PeerExchangeID_v20alpha1,
PubsubTopics: pubsubTopics,
SpecificPeers: params.preferredPeers,