mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-07 16:33:08 +00:00
feat: filter rate limit (#1258)
This commit is contained in:
parent
1608cf2b0b
commit
0c594b3140
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
|
func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
|
||||||
@ -37,7 +38,7 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
|
|||||||
|
|
||||||
// node2 connects to node1
|
// node2 connects to node1
|
||||||
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
|
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
|
||||||
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
|
node1 := createNode(t, node.WithWakuFilterFullNode(filter.WithFullNodeRateLimiter(rate.Inf, 0))) // full node filter
|
||||||
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
|
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
|
||||||
|
|
||||||
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
|
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
|
||||||
|
|||||||
@ -26,7 +26,7 @@ pkgs.buildGo121Module {
|
|||||||
'' else "";
|
'' else "";
|
||||||
|
|
||||||
# FIXME: This needs to be manually changed when updating modules.
|
# FIXME: This needs to be manually changed when updating modules.
|
||||||
vendorHash = "sha256-TrKlv3UHhFl+1HviEYFTmOpF+UiVdL6h53IkJXBFsRo=";
|
vendorHash = "sha256-yQ3anfZ/PU0M0KHiXqA9Ri8zFkg1nTYIk43jmcdGZYU=";
|
||||||
|
|
||||||
# Fix for 'nix run' trying to execute 'go-waku'.
|
# Fix for 'nix run' trying to execute 'go-waku'.
|
||||||
meta = { mainProgram = "waku"; };
|
meta = { mainProgram = "waku"; };
|
||||||
|
|||||||
@ -62,6 +62,7 @@ require (
|
|||||||
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.9 // indirect
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||||
|
|||||||
@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
|||||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
|||||||
@ -62,6 +62,7 @@ require (
|
|||||||
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.9 // indirect
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||||
|
|||||||
@ -337,6 +337,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
|||||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
|||||||
@ -70,6 +70,7 @@ require (
|
|||||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.9 // indirect
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||||
|
|||||||
@ -349,6 +349,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
|||||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
|||||||
@ -57,6 +57,7 @@ require (
|
|||||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.9 // indirect
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||||
|
|||||||
@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
|||||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
|||||||
@ -60,6 +60,7 @@ require (
|
|||||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.9 // indirect
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||||
|
|||||||
@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
|||||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
|||||||
@ -58,6 +58,7 @@ require (
|
|||||||
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.9 // indirect
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||||
|
|||||||
@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
|||||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
|||||||
1
go.mod
1
go.mod
@ -40,6 +40,7 @@ require (
|
|||||||
github.com/dustin/go-humanize v1.0.1
|
github.com/dustin/go-humanize v1.0.1
|
||||||
github.com/go-chi/chi/v5 v5.0.0
|
github.com/go-chi/chi/v5 v5.0.0
|
||||||
github.com/jackc/pgx/v5 v5.4.1
|
github.com/jackc/pgx/v5 v5.4.1
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0
|
||||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0
|
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0
|
||||||
github.com/waku-org/go-noise v0.0.4
|
github.com/waku-org/go-noise v0.0.4
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59
|
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59
|
||||||
|
|||||||
2
go.sum
2
go.sum
@ -949,6 +949,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
|
|||||||
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
|
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||||
|
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||||
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||||
|
|||||||
@ -54,6 +54,7 @@ type WakuFilterLightNode struct {
|
|||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
subscriptions *subscription.SubscriptionsMap
|
subscriptions *subscription.SubscriptionsMap
|
||||||
pm *peermanager.PeerManager
|
pm *peermanager.PeerManager
|
||||||
|
limiter *utils.RateLimiter
|
||||||
peerPingInterval time.Duration
|
peerPingInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -89,6 +90,7 @@ func NewWakuFilterLightNode(
|
|||||||
onlineChecker onlinechecker.OnlineChecker,
|
onlineChecker onlinechecker.OnlineChecker,
|
||||||
reg prometheus.Registerer,
|
reg prometheus.Registerer,
|
||||||
log *zap.Logger,
|
log *zap.Logger,
|
||||||
|
opts ...LightNodeOption,
|
||||||
) *WakuFilterLightNode {
|
) *WakuFilterLightNode {
|
||||||
wf := new(WakuFilterLightNode)
|
wf := new(WakuFilterLightNode)
|
||||||
wf.log = log.Named("filterv2-lightnode")
|
wf.log = log.Named("filterv2-lightnode")
|
||||||
@ -99,6 +101,14 @@ func NewWakuFilterLightNode(
|
|||||||
wf.CommonService = service.NewCommonService()
|
wf.CommonService = service.NewCommonService()
|
||||||
wf.metrics = newMetrics(reg)
|
wf.metrics = newMetrics(reg)
|
||||||
wf.peerPingInterval = 1 * time.Minute
|
wf.peerPingInterval = 1 * time.Minute
|
||||||
|
|
||||||
|
params := &LightNodeParameters{}
|
||||||
|
opts = append(DefaultLightNodeOptions(), opts...)
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(params)
|
||||||
|
}
|
||||||
|
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
|
||||||
|
|
||||||
return wf
|
return wf
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,6 +165,14 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
|
|||||||
|
|
||||||
logger := wf.log.With(logging.HostID("peerID", peerID))
|
logger := wf.log.With(logging.HostID("peerID", peerID))
|
||||||
|
|
||||||
|
if !wf.limiter.Allow(peerID) {
|
||||||
|
wf.metrics.RecordError(rateLimitFailure)
|
||||||
|
if err := stream.Reset(); err != nil {
|
||||||
|
wf.log.Error("resetting connection", zap.Error(err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if !wf.subscriptions.IsSubscribedTo(peerID) {
|
if !wf.subscriptions.IsSubscribedTo(peerID) {
|
||||||
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
|
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
|
||||||
wf.metrics.RecordError(unknownPeerMessagePush)
|
wf.metrics.RecordError(unknownPeerMessagePush)
|
||||||
@ -287,7 +305,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if filterSubscribeResponse.RequestId != request.RequestId {
|
if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId {
|
||||||
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
|
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
|
||||||
wf.metrics.RecordError(requestIDMismatch)
|
wf.metrics.RecordError(requestIDMismatch)
|
||||||
err := NewFilterError(300, "request_id_mismatch")
|
err := NewFilterError(300, "request_id_mismatch")
|
||||||
|
|||||||
@ -96,6 +96,7 @@ var (
|
|||||||
peerNotFoundFailure metricsErrCategory = "peer_not_found_failure"
|
peerNotFoundFailure metricsErrCategory = "peer_not_found_failure"
|
||||||
writeResponseFailure metricsErrCategory = "write_response_failure"
|
writeResponseFailure metricsErrCategory = "write_response_failure"
|
||||||
pushTimeoutFailure metricsErrCategory = "push_timeout_failure"
|
pushTimeoutFailure metricsErrCategory = "push_timeout_failure"
|
||||||
|
rateLimitFailure metricsErrCategory = "ratelimit_failure"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RecordError increases the counter for different error types
|
// RecordError increases the counter for different error types
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
|
func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
|
||||||
@ -57,13 +58,35 @@ type (
|
|||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
MaxSubscribers int
|
MaxSubscribers int
|
||||||
pm *peermanager.PeerManager
|
pm *peermanager.PeerManager
|
||||||
|
limitR rate.Limit
|
||||||
|
limitB int
|
||||||
}
|
}
|
||||||
|
|
||||||
Option func(*FilterParameters)
|
Option func(*FilterParameters)
|
||||||
|
|
||||||
|
LightNodeParameters struct {
|
||||||
|
limitR rate.Limit
|
||||||
|
limitB int
|
||||||
|
}
|
||||||
|
|
||||||
|
LightNodeOption func(*LightNodeParameters)
|
||||||
|
|
||||||
FilterSubscribeOption func(*FilterSubscribeParameters) error
|
FilterSubscribeOption func(*FilterSubscribeParameters) error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
|
||||||
|
return func(params *LightNodeParameters) {
|
||||||
|
params.limitR = r
|
||||||
|
params.limitB = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultLightNodeOptions() []LightNodeOption {
|
||||||
|
return []LightNodeOption{
|
||||||
|
WithLightNodeRateLimiter(1, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithTimeout(timeout time.Duration) Option {
|
func WithTimeout(timeout time.Duration) Option {
|
||||||
return func(params *FilterParameters) {
|
return func(params *FilterParameters) {
|
||||||
params.Timeout = timeout
|
params.Timeout = timeout
|
||||||
@ -202,9 +225,17 @@ func WithPeerManager(pm *peermanager.PeerManager) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithFullNodeRateLimiter(r rate.Limit, b int) Option {
|
||||||
|
return func(params *FilterParameters) {
|
||||||
|
params.limitR = r
|
||||||
|
params.limitB = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func DefaultOptions() []Option {
|
func DefaultOptions() []Option {
|
||||||
return []Option{
|
return []Option{
|
||||||
WithTimeout(DefaultIdleSubscriptionTimeout),
|
WithTimeout(DefaultIdleSubscriptionTimeout),
|
||||||
WithMaxSubscribers(DefaultMaxSubscribers),
|
WithMaxSubscribers(DefaultMaxSubscribers),
|
||||||
|
WithFullNodeRateLimiter(1, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -39,7 +39,7 @@ type (
|
|||||||
*service.CommonService
|
*service.CommonService
|
||||||
subscriptions *SubscribersMap
|
subscriptions *SubscribersMap
|
||||||
pm *peermanager.PeerManager
|
pm *peermanager.PeerManager
|
||||||
|
limiter *utils.RateLimiter
|
||||||
maxSubscriptions int
|
maxSubscriptions int
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -56,6 +56,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
|
|||||||
opt(params)
|
opt(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
|
||||||
wf.CommonService = service.NewCommonService()
|
wf.CommonService = service.NewCommonService()
|
||||||
wf.metrics = newMetrics(reg)
|
wf.metrics = newMetrics(reg)
|
||||||
wf.subscriptions = NewSubscribersMap(params.Timeout)
|
wf.subscriptions = NewSubscribersMap(params.Timeout)
|
||||||
@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {
|
|||||||
|
|
||||||
func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) {
|
func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) {
|
||||||
return func(stream network.Stream) {
|
return func(stream network.Stream) {
|
||||||
logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
|
peerID := stream.Conn().RemotePeer()
|
||||||
|
logger := wf.log.With(logging.HostID("peer", peerID))
|
||||||
|
|
||||||
|
if !wf.limiter.Allow(peerID) {
|
||||||
|
wf.metrics.RecordError(rateLimitFailure)
|
||||||
|
wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
|
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
|
||||||
|
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LightNodeData struct {
|
type LightNodeData struct {
|
||||||
@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo
|
|||||||
|
|
||||||
nodeData := s.GetWakuRelay(topic)
|
nodeData := s.GetWakuRelay(topic)
|
||||||
|
|
||||||
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
|
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0))
|
||||||
node2Filter.SetHost(nodeData.FullNodeHost)
|
node2Filter.SetHost(nodeData.FullNodeHost)
|
||||||
|
|
||||||
var sub *relay.Subscription
|
var sub *relay.Subscription
|
||||||
@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
|
|||||||
b := relay.NewBroadcaster(10)
|
b := relay.NewBroadcaster(10)
|
||||||
s.Require().NoError(b.Start(context.Background()))
|
s.Require().NoError(b.Start(context.Background()))
|
||||||
pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
|
pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
|
||||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
|
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0))
|
||||||
filterPush.SetHost(host)
|
filterPush.SetHost(host)
|
||||||
pm.SetHost(host)
|
pm.SetHost(host)
|
||||||
return LightNodeData{filterPush, host}
|
return LightNodeData{filterPush, host}
|
||||||
|
|||||||
@ -24,7 +24,6 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// LightPushID_v20beta1 is the current Waku LightPush protocol identifier
|
// LightPushID_v20beta1 is the current Waku LightPush protocol identifier
|
||||||
@ -40,7 +39,7 @@ var (
|
|||||||
type WakuLightPush struct {
|
type WakuLightPush struct {
|
||||||
h host.Host
|
h host.Host
|
||||||
relay *relay.WakuRelay
|
relay *relay.WakuRelay
|
||||||
limiter *rate.Limiter
|
limiter *utils.RateLimiter
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
pm *peermanager.PeerManager
|
pm *peermanager.PeerManager
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
@ -59,11 +58,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
|
|||||||
wakuLP.metrics = newMetrics(reg)
|
wakuLP.metrics = newMetrics(reg)
|
||||||
|
|
||||||
params := &LightpushParameters{}
|
params := &LightpushParameters{}
|
||||||
|
opts = append(DefaultLightpushOptions(), opts...)
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(params)
|
opt(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
wakuLP.limiter = params.limiter
|
wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
|
||||||
|
|
||||||
return wakuLP
|
return wakuLP
|
||||||
}
|
}
|
||||||
@ -106,7 +106,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
|
|||||||
Response: &pb.PushResponse{},
|
Response: &pb.PushResponse{},
|
||||||
}
|
}
|
||||||
|
|
||||||
if wakuLP.limiter != nil && !wakuLP.limiter.Allow() {
|
if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) {
|
||||||
wakuLP.metrics.RecordError(rateLimitFailure)
|
wakuLP.metrics.RecordError(rateLimitFailure)
|
||||||
responseMsg := "exceeds the rate limit"
|
responseMsg := "exceeds the rate limit"
|
||||||
responsePushRPC.Response.Info = &responseMsg
|
responsePushRPC.Response.Info = &responseMsg
|
||||||
|
|||||||
@ -14,7 +14,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type LightpushParameters struct {
|
type LightpushParameters struct {
|
||||||
limiter *rate.Limiter
|
limitR rate.Limit
|
||||||
|
limitB int
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*LightpushParameters)
|
type Option func(*LightpushParameters)
|
||||||
@ -22,7 +23,14 @@ type Option func(*LightpushParameters)
|
|||||||
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
|
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
|
||||||
func WithRateLimiter(r rate.Limit, b int) Option {
|
func WithRateLimiter(r rate.Limit, b int) Option {
|
||||||
return func(params *LightpushParameters) {
|
return func(params *LightpushParameters) {
|
||||||
params.limiter = rate.NewLimiter(r, b)
|
params.limitR = r
|
||||||
|
params.limitB = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultLightpushOptions() []Option {
|
||||||
|
return []Option{
|
||||||
|
WithRateLimiter(1, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/host"
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||||
@ -273,7 +274,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
|
|||||||
defer node2.Stop()
|
defer node2.Stop()
|
||||||
defer sub2.Unsubscribe()
|
defer sub2.Unsubscribe()
|
||||||
|
|
||||||
lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger())
|
lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0))
|
||||||
lightPushNode2.SetHost(host2)
|
lightPushNode2.SetHost(host2)
|
||||||
err := lightPushNode2.Start(ctx)
|
err := lightPushNode2.Start(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -358,7 +359,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {
|
|||||||
|
|
||||||
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
|
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0))
|
||||||
client.SetHost(clientHost)
|
client.SetHost(clientHost)
|
||||||
|
|
||||||
// Node2
|
// Node2
|
||||||
@ -366,7 +367,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {
|
|||||||
defer node2.Stop()
|
defer node2.Stop()
|
||||||
defer sub2.Unsubscribe()
|
defer sub2.Unsubscribe()
|
||||||
|
|
||||||
lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger())
|
lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0))
|
||||||
lightPushNode2.SetHost(host2)
|
lightPushNode2.SetHost(host2)
|
||||||
err = lightPushNode2.Start(ctx)
|
err = lightPushNode2.Start(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
@ -23,7 +23,6 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
|
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
|
||||||
@ -51,7 +50,7 @@ type WakuPeerExchange struct {
|
|||||||
|
|
||||||
peerConnector PeerConnector
|
peerConnector PeerConnector
|
||||||
enrCache *enrCache
|
enrCache *enrCache
|
||||||
limiter *rate.Limiter
|
limiter *utils.RateLimiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
|
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
|
||||||
@ -68,11 +67,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect
|
|||||||
wakuPX.CommonService = service.NewCommonService()
|
wakuPX.CommonService = service.NewCommonService()
|
||||||
|
|
||||||
params := &PeerExchangeParameters{}
|
params := &PeerExchangeParameters{}
|
||||||
|
opts = append(DefaultPeerExchangeOptions(), opts...)
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(params)
|
opt(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
wakuPX.limiter = params.limiter
|
wakuPX.limiter = utils.NewRateLimiter(params.limiterR, params.limiterB)
|
||||||
return wakuPX, nil
|
return wakuPX, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,9 +97,10 @@ func (wakuPX *WakuPeerExchange) start() error {
|
|||||||
|
|
||||||
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
|
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
|
||||||
return func(stream network.Stream) {
|
return func(stream network.Stream) {
|
||||||
logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
|
peerID := stream.Conn().RemotePeer()
|
||||||
|
logger := wakuPX.log.With(logging.HostID("peer", peerID))
|
||||||
|
|
||||||
if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
|
if wakuPX.limiter != nil && !wakuPX.limiter.Allow(peerID) {
|
||||||
wakuPX.metrics.RecordError(rateLimitFailure)
|
wakuPX.metrics.RecordError(rateLimitFailure)
|
||||||
wakuPX.log.Info("exceeds the rate limit")
|
wakuPX.log.Info("exceeds the rate limit")
|
||||||
// TODO: peer exchange protocol should contain an err field
|
// TODO: peer exchange protocol should contain an err field
|
||||||
|
|||||||
@ -12,7 +12,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type PeerExchangeParameters struct {
|
type PeerExchangeParameters struct {
|
||||||
limiter *rate.Limiter
|
limiterR rate.Limit
|
||||||
|
limiterB int
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*PeerExchangeParameters)
|
type Option func(*PeerExchangeParameters)
|
||||||
@ -20,7 +21,14 @@ type Option func(*PeerExchangeParameters)
|
|||||||
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
|
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
|
||||||
func WithRateLimiter(r rate.Limit, b int) Option {
|
func WithRateLimiter(r rate.Limit, b int) Option {
|
||||||
return func(params *PeerExchangeParameters) {
|
return func(params *PeerExchangeParameters) {
|
||||||
params.limiter = rate.NewLimiter(r, b)
|
params.limiterR = r
|
||||||
|
params.limiterB = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultPeerExchangeOptions() []Option {
|
||||||
|
return []Option{
|
||||||
|
WithRateLimiter(1, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
69
waku/v2/utils/limiter.go
Normal file
69
waku/v2/utils/limiter.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jellydator/ttlcache/v3"
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RateLimiter struct {
|
||||||
|
sync.Mutex
|
||||||
|
limiters *ttlcache.Cache[peer.ID, *rate.Limiter]
|
||||||
|
r rate.Limit
|
||||||
|
b int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRateLimiter(r rate.Limit, b int) *RateLimiter {
|
||||||
|
return &RateLimiter{
|
||||||
|
r: r,
|
||||||
|
b: b,
|
||||||
|
limiters: ttlcache.New[peer.ID, *rate.Limiter](
|
||||||
|
ttlcache.WithTTL[peer.ID, *rate.Limiter](30 * time.Minute),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RateLimiter) Start(ctx context.Context) {
|
||||||
|
go func() {
|
||||||
|
t := time.NewTicker(time.Hour)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
r.Lock()
|
||||||
|
r.limiters.DeleteExpired()
|
||||||
|
r.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RateLimiter) getOrCreate(peerID peer.ID) *rate.Limiter {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
|
var limiter *rate.Limiter
|
||||||
|
if !r.limiters.Has(peerID) {
|
||||||
|
limiter = rate.NewLimiter(r.r, r.b)
|
||||||
|
r.limiters.Set(peerID, limiter, ttlcache.DefaultTTL)
|
||||||
|
} else {
|
||||||
|
v := r.limiters.Get(peerID)
|
||||||
|
limiter = v.Value()
|
||||||
|
}
|
||||||
|
return limiter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RateLimiter) Allow(peerID peer.ID) bool {
|
||||||
|
return r.getOrCreate(peerID).Allow()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error {
|
||||||
|
return r.getOrCreate(peerID).Wait(ctx)
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user