diff --git a/VERSION b/VERSION index 7b4813b0c..7faf444b0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.171.41 +0.172.0 diff --git a/go.mod b/go.mod index f3bcefd22..8c6248000 100644 --- a/go.mod +++ b/go.mod @@ -88,7 +88,7 @@ require ( github.com/mutecomm/go-sqlcipher/v4 v4.4.2 github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 - github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3 + github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 @@ -120,6 +120,7 @@ require ( github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 // indirect github.com/anacrolix/utp v0.1.0 // indirect github.com/andybalholm/cascadia v1.2.0 // indirect + github.com/avast/retry-go/v4 v4.5.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/benbjohnson/immutable v0.3.0 // indirect @@ -138,7 +139,7 @@ require ( github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect @@ -259,7 +260,7 @@ require ( github.com/urfave/cli/v2 v2.24.4 // indirect github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect - github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd // indirect + github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230916171929-1dd9494ff065 // indirect github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230916171518-2a77c3734dd1 // indirect diff --git a/go.sum b/go.sum index 0e515f5ba..aca8a6c69 100644 --- a/go.sum +++ b/go.sum @@ -345,6 +345,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= +github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -710,8 +712,9 @@ github.com/dop251/goja v0.0.0-20211011172007-d99e4b8cbf48/go.mod h1:R9ET47fwRVRP github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dvyukov/go-fuzz v0.0.0-20200318091601-be3528f3a813/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= @@ -2096,10 +2099,10 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04= -github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3 h1:ck3j0not5znCThv4E76bJlUZHDn96IXABH5fg7CfEQc= -github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3/go.mod h1:hem2hnXK5BdabxwJULszM0Rh1Yj+gD9IxjwLCGPPaxs= -github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd h1:cu7CsUo7BK6ac/v193RIaqAzUcmpa6MNY4xYW9AenQI= -github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= +github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe h1:2D97fbaKlIQRjWMz/iTjnYcxi2z6ekKvspTGtcuPHgU= +github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe/go.mod h1:+b5fPPJ4YUIAPJtPOtwB7bTrOQ9lF15I2LnQjV6NMIA= +github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= +github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48= github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230916171929-1dd9494ff065 h1:Sd7QD/1Yo2o2M1MY49F8Zr4KNBPUEK5cz5HoXQVJbrs= diff --git a/vendor/github.com/avast/retry-go/v4/.gitignore b/vendor/github.com/avast/retry-go/v4/.gitignore new file mode 100644 index 000000000..c40eb23f9 --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/.gitignore @@ -0,0 +1,21 @@ +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ + +# dep +vendor/ +Gopkg.lock + +# cover +coverage.txt diff --git a/vendor/github.com/avast/retry-go/v4/.godocdown.tmpl b/vendor/github.com/avast/retry-go/v4/.godocdown.tmpl new file mode 100644 index 000000000..e914ca4af --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/.godocdown.tmpl @@ -0,0 +1,38 @@ +# {{ .Name }} + +[![Release](https://img.shields.io/github/release/avast/retry-go.svg?style=flat-square)](https://github.com/avast/retry-go/releases/latest) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square)](LICENSE.md) +![GitHub Actions](https://github.com/avast/retry-go/actions/workflows/workflow.yaml/badge.svg) +[![Go Report Card](https://goreportcard.com/badge/github.com/avast/retry-go?style=flat-square)](https://goreportcard.com/report/github.com/avast/retry-go) +[![GoDoc](https://godoc.org/github.com/avast/retry-go?status.svg&style=flat-square)](http://godoc.org/github.com/avast/retry-go) +[![codecov.io](https://codecov.io/github/avast/retry-go/coverage.svg?branch=master)](https://codecov.io/github/avast/retry-go?branch=master) +[![Sourcegraph](https://sourcegraph.com/github.com/avast/retry-go/-/badge.svg)](https://sourcegraph.com/github.com/avast/retry-go?badge) + +{{ .EmitSynopsis }} + +{{ .EmitUsage }} + +## Contributing + +Contributions are very much welcome. + +### Makefile + +Makefile provides several handy rules, like README.md `generator` , `setup` for prepare build/dev environment, `test`, `cover`, etc... + +Try `make help` for more information. + +### Before pull request + +> maybe you need `make setup` in order to setup environment + +please try: +* run tests (`make test`) +* run linter (`make lint`) +* if your IDE don't automaticaly do `go fmt`, run `go fmt` (`make fmt`) + +### README + +README.md are generate from template [.godocdown.tmpl](.godocdown.tmpl) and code documentation via [godocdown](https://github.com/robertkrimen/godocdown). + +Never edit README.md direct, because your change will be lost. diff --git a/vendor/github.com/avast/retry-go/v4/LICENSE b/vendor/github.com/avast/retry-go/v4/LICENSE new file mode 100644 index 000000000..f63fca814 --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 Avast + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/avast/retry-go/v4/Makefile b/vendor/github.com/avast/retry-go/v4/Makefile new file mode 100644 index 000000000..86544d239 --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/Makefile @@ -0,0 +1,59 @@ +SOURCE_FILES?=$$(go list ./... | grep -v /vendor/) +TEST_PATTERN?=. +TEST_OPTIONS?= +VERSION?=$$(cat VERSION) +LINTER?=$$(which golangci-lint) +LINTER_VERSION=1.50.0 + +ifeq ($(OS),Windows_NT) + LINTER_FILE=golangci-lint-$(LINTER_VERSION)-windows-amd64.zip + LINTER_UNPACK= >| app.zip; unzip -j app.zip -d $$GOPATH/bin; rm app.zip +else ifeq ($(OS), Darwin) + LINTER_FILE=golangci-lint-$(LINTER_VERSION)-darwin-amd64.tar.gz + LINTER_UNPACK= | tar xzf - -C $$GOPATH/bin --wildcards --strip 1 "**/golangci-lint" +else + LINTER_FILE=golangci-lint-$(LINTER_VERSION)-linux-amd64.tar.gz + LINTER_UNPACK= | tar xzf - -C $$GOPATH/bin --wildcards --strip 1 "**/golangci-lint" +endif + +setup: + go install github.com/pierrre/gotestcover@latest + go install golang.org/x/tools/cmd/cover@latest + go install github.com/robertkrimen/godocdown/godocdown@latest + go mod download + +generate: ## Generate README.md + godocdown >| README.md + +test: generate test_and_cover_report lint + +test_and_cover_report: + gotestcover $(TEST_OPTIONS) -covermode=atomic -coverprofile=coverage.txt $(SOURCE_FILES) -run $(TEST_PATTERN) -timeout=2m + +cover: test ## Run all the tests and opens the coverage report + go tool cover -html=coverage.txt + +fmt: ## gofmt and goimports all go files + find . -name '*.go' -not -wholename './vendor/*' | while read -r file; do gofmt -w -s "$$file"; goimports -w "$$file"; done + +lint: ## Run all the linters + @if [ "$(LINTER)" = "" ]; then\ + curl -L https://github.com/golangci/golangci-lint/releases/download/v$(LINTER_VERSION)/$(LINTER_FILE) $(LINTER_UNPACK) ;\ + chmod +x $$GOPATH/bin/golangci-lint;\ + fi + + golangci-lint run + +ci: test_and_cover_report ## Run all the tests but no linters - use https://golangci.com integration instead + +build: + go build + +release: ## Release new version + git tag | grep -q $(VERSION) && echo This version was released! Increase VERSION! || git tag $(VERSION) && git push origin $(VERSION) && git tag v$(VERSION) && git push origin v$(VERSION) + +# Absolutely awesome: http://marmelab.com/blog/2016/02/29/auto-documented-makefile.html +help: + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' + +.DEFAULT_GOAL := build diff --git a/vendor/github.com/avast/retry-go/v4/README.md b/vendor/github.com/avast/retry-go/v4/README.md new file mode 100644 index 000000000..c7736c070 --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/README.md @@ -0,0 +1,485 @@ +# retry + +[![Release](https://img.shields.io/github/release/avast/retry-go.svg?style=flat-square)](https://github.com/avast/retry-go/releases/latest) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square)](LICENSE.md) +![GitHub Actions](https://github.com/avast/retry-go/actions/workflows/workflow.yaml/badge.svg) +[![Go Report Card](https://goreportcard.com/badge/github.com/avast/retry-go?style=flat-square)](https://goreportcard.com/report/github.com/avast/retry-go) +[![GoDoc](https://godoc.org/github.com/avast/retry-go?status.svg&style=flat-square)](http://godoc.org/github.com/avast/retry-go) +[![codecov.io](https://codecov.io/github/avast/retry-go/coverage.svg?branch=master)](https://codecov.io/github/avast/retry-go?branch=master) +[![Sourcegraph](https://sourcegraph.com/github.com/avast/retry-go/-/badge.svg)](https://sourcegraph.com/github.com/avast/retry-go?badge) + +Simple library for retry mechanism + +slightly inspired by +[Try::Tiny::Retry](https://metacpan.org/pod/Try::Tiny::Retry) + +# SYNOPSIS + +http get with retry: + + url := "http://example.com" + var body []byte + + err := retry.Do( + func() error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + return nil + }, + ) + if err != nil { + // handle error + } + + fmt.Println(string(body)) + +http get with retry with data: + + url := "http://example.com" + + body, err := retry.DoWithData( + func() ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return body, nil + }, + ) + if err != nil { + // handle error + } + + fmt.Println(string(body)) + +[next examples](https://github.com/avast/retry-go/tree/master/examples) + +# SEE ALSO + +* [giantswarm/retry-go](https://github.com/giantswarm/retry-go) - slightly +complicated interface. + +* [sethgrid/pester](https://github.com/sethgrid/pester) - only http retry for +http calls with retries and backoff + +* [cenkalti/backoff](https://github.com/cenkalti/backoff) - Go port of the +exponential backoff algorithm from Google's HTTP Client Library for Java. Really +complicated interface. + +* [rafaeljesus/retry-go](https://github.com/rafaeljesus/retry-go) - looks good, +slightly similar as this package, don't have 'simple' `Retry` method + +* [matryer/try](https://github.com/matryer/try) - very popular package, +nonintuitive interface (for me) + +# BREAKING CHANGES + +* 4.0.0 + + - infinity retry is possible by set `Attempts(0)` by PR [#49](https://github.com/avast/retry-go/pull/49) + +* 3.0.0 + + - `DelayTypeFunc` accepts a new parameter `err` - this breaking change affects only your custom Delay Functions. This change allow [make delay functions based on error](examples/delay_based_on_error_test.go). + +* 1.0.2 -> 2.0.0 + + - argument of `retry.Delay` is final delay (no multiplication by `retry.Units` anymore) + - function `retry.Units` are removed + - [more about this breaking change](https://github.com/avast/retry-go/issues/7) + +* 0.3.0 -> 1.0.0 + + - `retry.Retry` function are changed to `retry.Do` function + - `retry.RetryCustom` (OnRetry) and `retry.RetryCustomWithOpts` functions are now implement via functions produces Options (aka `retry.OnRetry`) + +## Usage + +#### func BackOffDelay + +```go +func BackOffDelay(n uint, _ error, config *Config) time.Duration +``` +BackOffDelay is a DelayType which increases delay between consecutive retries + +#### func Do + +```go +func Do(retryableFunc RetryableFunc, opts ...Option) error +``` + +#### func DoWithData + +```go +func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error) +``` + +#### func FixedDelay + +```go +func FixedDelay(_ uint, _ error, config *Config) time.Duration +``` +FixedDelay is a DelayType which keeps delay the same through all iterations + +#### func IsRecoverable + +```go +func IsRecoverable(err error) bool +``` +IsRecoverable checks if error is an instance of `unrecoverableError` + +#### func RandomDelay + +```go +func RandomDelay(_ uint, _ error, config *Config) time.Duration +``` +RandomDelay is a DelayType which picks a random delay up to config.maxJitter + +#### func Unrecoverable + +```go +func Unrecoverable(err error) error +``` +Unrecoverable wraps an error in `unrecoverableError` struct + +#### type Config + +```go +type Config struct { +} +``` + + +#### type DelayTypeFunc + +```go +type DelayTypeFunc func(n uint, err error, config *Config) time.Duration +``` + +DelayTypeFunc is called to return the next delay to wait after the retriable +function fails on `err` after `n` attempts. + +#### func CombineDelay + +```go +func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc +``` +CombineDelay is a DelayType the combines all of the specified delays into a new +DelayTypeFunc + +#### type Error + +```go +type Error []error +``` + +Error type represents list of errors in retry + +#### func (Error) As + +```go +func (e Error) As(target interface{}) bool +``` + +#### func (Error) Error + +```go +func (e Error) Error() string +``` +Error method return string representation of Error It is an implementation of +error interface + +#### func (Error) Is + +```go +func (e Error) Is(target error) bool +``` + +#### func (Error) Unwrap + +```go +func (e Error) Unwrap() error +``` +Unwrap the last error for compatibility with `errors.Unwrap()`. When you need to +unwrap all errors, you should use `WrappedErrors()` instead. + + err := Do( + func() error { + return errors.New("original error") + }, + Attempts(1), + ) + + fmt.Println(errors.Unwrap(err)) # "original error" is printed + +Added in version 4.2.0. + +#### func (Error) WrappedErrors + +```go +func (e Error) WrappedErrors() []error +``` +WrappedErrors returns the list of errors that this Error is wrapping. It is an +implementation of the `errwrap.Wrapper` interface in package +[errwrap](https://github.com/hashicorp/errwrap) so that `retry.Error` can be +used with that library. + +#### type OnRetryFunc + +```go +type OnRetryFunc func(n uint, err error) +``` + +Function signature of OnRetry function n = count of attempts + +#### type Option + +```go +type Option func(*Config) +``` + +Option represents an option for retry. + +#### func Attempts + +```go +func Attempts(attempts uint) Option +``` +Attempts set count of retry. Setting to 0 will retry until the retried function +succeeds. default is 10 + +#### func AttemptsForError + +```go +func AttemptsForError(attempts uint, err error) Option +``` +AttemptsForError sets count of retry in case execution results in given `err` +Retries for the given `err` are also counted against total retries. The retry +will stop if any of given retries is exhausted. + +added in 4.3.0 + +#### func Context + +```go +func Context(ctx context.Context) Option +``` +Context allow to set context of retry default are Background context + +example of immediately cancellation (maybe it isn't the best example, but it +describes behavior enough; I hope) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + retry.Do( + func() error { + ... + }, + retry.Context(ctx), + ) + +#### func Delay + +```go +func Delay(delay time.Duration) Option +``` +Delay set delay between retry default is 100ms + +#### func DelayType + +```go +func DelayType(delayType DelayTypeFunc) Option +``` +DelayType set type of the delay between retries default is BackOff + +#### func LastErrorOnly + +```go +func LastErrorOnly(lastErrorOnly bool) Option +``` +return the direct last error that came from the retried function default is +false (return wrapped errors with everything) + +#### func MaxDelay + +```go +func MaxDelay(maxDelay time.Duration) Option +``` +MaxDelay set maximum delay between retry does not apply by default + +#### func MaxJitter + +```go +func MaxJitter(maxJitter time.Duration) Option +``` +MaxJitter sets the maximum random Jitter between retries for RandomDelay + +#### func OnRetry + +```go +func OnRetry(onRetry OnRetryFunc) Option +``` +OnRetry function callback are called each retry + +log each retry example: + + retry.Do( + func() error { + return errors.New("some error") + }, + retry.OnRetry(func(n uint, err error) { + log.Printf("#%d: %s\n", n, err) + }), + ) + +#### func RetryIf + +```go +func RetryIf(retryIf RetryIfFunc) Option +``` +RetryIf controls whether a retry should be attempted after an error (assuming +there are any retry attempts remaining) + +skip retry if special error example: + + retry.Do( + func() error { + return errors.New("special error") + }, + retry.RetryIf(func(err error) bool { + if err.Error() == "special error" { + return false + } + return true + }), + ) + +By default RetryIf stops execution if the error is wrapped using +`retry.Unrecoverable`, so above example may also be shortened to: + + retry.Do( + func() error { + return retry.Unrecoverable(errors.New("special error")) + } + ) + +#### func WithTimer + +```go +func WithTimer(t Timer) Option +``` +WithTimer provides a way to swap out timer module implementations. This +primarily is useful for mocking/testing, where you may not want to explicitly +wait for a set duration for retries. + +example of augmenting time.After with a print statement + + type struct MyTimer {} + + func (t *MyTimer) After(d time.Duration) <- chan time.Time { + fmt.Print("Timer called!") + return time.After(d) + } + + retry.Do( + func() error { ... }, + retry.WithTimer(&MyTimer{}) + ) + +#### func WrapContextErrorWithLastError + +```go +func WrapContextErrorWithLastError(wrapContextErrorWithLastError bool) Option +``` +WrapContextErrorWithLastError allows the context error to be returned wrapped +with the last error that the retried function returned. This is only applicable +when Attempts is set to 0 to retry indefinitly and when using a context to +cancel / timeout + +default is false + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + retry.Do( + func() error { + ... + }, + retry.Context(ctx), + retry.Attempts(0), + retry.WrapContextErrorWithLastError(true), + ) + +#### type RetryIfFunc + +```go +type RetryIfFunc func(error) bool +``` + +Function signature of retry if function + +#### type RetryableFunc + +```go +type RetryableFunc func() error +``` + +Function signature of retryable function + +#### type RetryableFuncWithData + +```go +type RetryableFuncWithData[T any] func() (T, error) +``` + +Function signature of retryable function with data + +#### type Timer + +```go +type Timer interface { + After(time.Duration) <-chan time.Time +} +``` + +Timer represents the timer used to track time for a retry. + +## Contributing + +Contributions are very much welcome. + +### Makefile + +Makefile provides several handy rules, like README.md `generator` , `setup` for prepare build/dev environment, `test`, `cover`, etc... + +Try `make help` for more information. + +### Before pull request + +> maybe you need `make setup` in order to setup environment + +please try: +* run tests (`make test`) +* run linter (`make lint`) +* if your IDE don't automaticaly do `go fmt`, run `go fmt` (`make fmt`) + +### README + +README.md are generate from template [.godocdown.tmpl](.godocdown.tmpl) and code documentation via [godocdown](https://github.com/robertkrimen/godocdown). + +Never edit README.md direct, because your change will be lost. diff --git a/vendor/github.com/avast/retry-go/v4/VERSION b/vendor/github.com/avast/retry-go/v4/VERSION new file mode 100644 index 000000000..4404a17ba --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/VERSION @@ -0,0 +1 @@ +4.5.1 diff --git a/vendor/github.com/avast/retry-go/v4/options.go b/vendor/github.com/avast/retry-go/v4/options.go new file mode 100644 index 000000000..9d98a1d43 --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/options.go @@ -0,0 +1,274 @@ +package retry + +import ( + "context" + "math" + "math/rand" + "time" +) + +// Function signature of retry if function +type RetryIfFunc func(error) bool + +// Function signature of OnRetry function +// n = count of attempts +type OnRetryFunc func(n uint, err error) + +// DelayTypeFunc is called to return the next delay to wait after the retriable function fails on `err` after `n` attempts. +type DelayTypeFunc func(n uint, err error, config *Config) time.Duration + +// Timer represents the timer used to track time for a retry. +type Timer interface { + After(time.Duration) <-chan time.Time +} + +type Config struct { + attempts uint + attemptsForError map[error]uint + delay time.Duration + maxDelay time.Duration + maxJitter time.Duration + onRetry OnRetryFunc + retryIf RetryIfFunc + delayType DelayTypeFunc + lastErrorOnly bool + context context.Context + timer Timer + wrapContextErrorWithLastError bool + + maxBackOffN uint +} + +// Option represents an option for retry. +type Option func(*Config) + +func emptyOption(c *Config) {} + +// return the direct last error that came from the retried function +// default is false (return wrapped errors with everything) +func LastErrorOnly(lastErrorOnly bool) Option { + return func(c *Config) { + c.lastErrorOnly = lastErrorOnly + } +} + +// Attempts set count of retry. Setting to 0 will retry until the retried function succeeds. +// default is 10 +func Attempts(attempts uint) Option { + return func(c *Config) { + c.attempts = attempts + } +} + +// AttemptsForError sets count of retry in case execution results in given `err` +// Retries for the given `err` are also counted against total retries. +// The retry will stop if any of given retries is exhausted. +// +// added in 4.3.0 +func AttemptsForError(attempts uint, err error) Option { + return func(c *Config) { + c.attemptsForError[err] = attempts + } +} + +// Delay set delay between retry +// default is 100ms +func Delay(delay time.Duration) Option { + return func(c *Config) { + c.delay = delay + } +} + +// MaxDelay set maximum delay between retry +// does not apply by default +func MaxDelay(maxDelay time.Duration) Option { + return func(c *Config) { + c.maxDelay = maxDelay + } +} + +// MaxJitter sets the maximum random Jitter between retries for RandomDelay +func MaxJitter(maxJitter time.Duration) Option { + return func(c *Config) { + c.maxJitter = maxJitter + } +} + +// DelayType set type of the delay between retries +// default is BackOff +func DelayType(delayType DelayTypeFunc) Option { + if delayType == nil { + return emptyOption + } + return func(c *Config) { + c.delayType = delayType + } +} + +// BackOffDelay is a DelayType which increases delay between consecutive retries +func BackOffDelay(n uint, _ error, config *Config) time.Duration { + // 1 << 63 would overflow signed int64 (time.Duration), thus 62. + const max uint = 62 + + if config.maxBackOffN == 0 { + if config.delay <= 0 { + config.delay = 1 + } + + config.maxBackOffN = max - uint(math.Floor(math.Log2(float64(config.delay)))) + } + + if n > config.maxBackOffN { + n = config.maxBackOffN + } + + return config.delay << n +} + +// FixedDelay is a DelayType which keeps delay the same through all iterations +func FixedDelay(_ uint, _ error, config *Config) time.Duration { + return config.delay +} + +// RandomDelay is a DelayType which picks a random delay up to config.maxJitter +func RandomDelay(_ uint, _ error, config *Config) time.Duration { + return time.Duration(rand.Int63n(int64(config.maxJitter))) +} + +// CombineDelay is a DelayType the combines all of the specified delays into a new DelayTypeFunc +func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc { + const maxInt64 = uint64(math.MaxInt64) + + return func(n uint, err error, config *Config) time.Duration { + var total uint64 + for _, delay := range delays { + total += uint64(delay(n, err, config)) + if total > maxInt64 { + total = maxInt64 + } + } + + return time.Duration(total) + } +} + +// OnRetry function callback are called each retry +// +// log each retry example: +// +// retry.Do( +// func() error { +// return errors.New("some error") +// }, +// retry.OnRetry(func(n uint, err error) { +// log.Printf("#%d: %s\n", n, err) +// }), +// ) +func OnRetry(onRetry OnRetryFunc) Option { + if onRetry == nil { + return emptyOption + } + return func(c *Config) { + c.onRetry = onRetry + } +} + +// RetryIf controls whether a retry should be attempted after an error +// (assuming there are any retry attempts remaining) +// +// skip retry if special error example: +// +// retry.Do( +// func() error { +// return errors.New("special error") +// }, +// retry.RetryIf(func(err error) bool { +// if err.Error() == "special error" { +// return false +// } +// return true +// }) +// ) +// +// By default RetryIf stops execution if the error is wrapped using `retry.Unrecoverable`, +// so above example may also be shortened to: +// +// retry.Do( +// func() error { +// return retry.Unrecoverable(errors.New("special error")) +// } +// ) +func RetryIf(retryIf RetryIfFunc) Option { + if retryIf == nil { + return emptyOption + } + return func(c *Config) { + c.retryIf = retryIf + } +} + +// Context allow to set context of retry +// default are Background context +// +// example of immediately cancellation (maybe it isn't the best example, but it describes behavior enough; I hope) +// +// ctx, cancel := context.WithCancel(context.Background()) +// cancel() +// +// retry.Do( +// func() error { +// ... +// }, +// retry.Context(ctx), +// ) +func Context(ctx context.Context) Option { + return func(c *Config) { + c.context = ctx + } +} + +// WithTimer provides a way to swap out timer module implementations. +// This primarily is useful for mocking/testing, where you may not want to explicitly wait for a set duration +// for retries. +// +// example of augmenting time.After with a print statement +// +// type struct MyTimer {} +// +// func (t *MyTimer) After(d time.Duration) <- chan time.Time { +// fmt.Print("Timer called!") +// return time.After(d) +// } +// +// retry.Do( +// func() error { ... }, +// retry.WithTimer(&MyTimer{}) +// ) +func WithTimer(t Timer) Option { + return func(c *Config) { + c.timer = t + } +} + +// WrapContextErrorWithLastError allows the context error to be returned wrapped with the last error that the +// retried function returned. This is only applicable when Attempts is set to 0 to retry indefinitly and when +// using a context to cancel / timeout +// +// default is false +// +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() +// +// retry.Do( +// func() error { +// ... +// }, +// retry.Context(ctx), +// retry.Attempts(0), +// retry.WrapContextErrorWithLastError(true), +// ) +func WrapContextErrorWithLastError(wrapContextErrorWithLastError bool) Option { + return func(c *Config) { + c.wrapContextErrorWithLastError = wrapContextErrorWithLastError + } +} diff --git a/vendor/github.com/avast/retry-go/v4/retry.go b/vendor/github.com/avast/retry-go/v4/retry.go new file mode 100644 index 000000000..56bc9dd15 --- /dev/null +++ b/vendor/github.com/avast/retry-go/v4/retry.go @@ -0,0 +1,347 @@ +/* +Simple library for retry mechanism + +slightly inspired by [Try::Tiny::Retry](https://metacpan.org/pod/Try::Tiny::Retry) + +# SYNOPSIS + +http get with retry: + + url := "http://example.com" + var body []byte + + err := retry.Do( + func() error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + return nil + }, + ) + if err != nil { + // handle error + } + + fmt.Println(string(body)) + +http get with retry with data: + + url := "http://example.com" + + body, err := retry.DoWithData( + func() ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return body, nil + }, + ) + if err != nil { + // handle error + } + + fmt.Println(string(body)) + +[next examples](https://github.com/avast/retry-go/tree/master/examples) + +# SEE ALSO + +* [giantswarm/retry-go](https://github.com/giantswarm/retry-go) - slightly complicated interface. + +* [sethgrid/pester](https://github.com/sethgrid/pester) - only http retry for http calls with retries and backoff + +* [cenkalti/backoff](https://github.com/cenkalti/backoff) - Go port of the exponential backoff algorithm from Google's HTTP Client Library for Java. Really complicated interface. + +* [rafaeljesus/retry-go](https://github.com/rafaeljesus/retry-go) - looks good, slightly similar as this package, don't have 'simple' `Retry` method + +* [matryer/try](https://github.com/matryer/try) - very popular package, nonintuitive interface (for me) + +# BREAKING CHANGES + +* 4.0.0 + - infinity retry is possible by set `Attempts(0)` by PR [#49](https://github.com/avast/retry-go/pull/49) + +* 3.0.0 + - `DelayTypeFunc` accepts a new parameter `err` - this breaking change affects only your custom Delay Functions. This change allow [make delay functions based on error](examples/delay_based_on_error_test.go). + +* 1.0.2 -> 2.0.0 + - argument of `retry.Delay` is final delay (no multiplication by `retry.Units` anymore) + - function `retry.Units` are removed + - [more about this breaking change](https://github.com/avast/retry-go/issues/7) + +* 0.3.0 -> 1.0.0 + - `retry.Retry` function are changed to `retry.Do` function + - `retry.RetryCustom` (OnRetry) and `retry.RetryCustomWithOpts` functions are now implement via functions produces Options (aka `retry.OnRetry`) +*/ +package retry + +import ( + "context" + "errors" + "fmt" + "strings" + "time" +) + +// Function signature of retryable function +type RetryableFunc func() error + +// Function signature of retryable function with data +type RetryableFuncWithData[T any] func() (T, error) + +// Default timer is a wrapper around time.After +type timerImpl struct{} + +func (t *timerImpl) After(d time.Duration) <-chan time.Time { + return time.After(d) +} + +func Do(retryableFunc RetryableFunc, opts ...Option) error { + retryableFuncWithData := func() (any, error) { + return nil, retryableFunc() + } + + _, err := DoWithData(retryableFuncWithData, opts...) + return err +} + +func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error) { + var n uint + var emptyT T + + // default + config := newDefaultRetryConfig() + + // apply opts + for _, opt := range opts { + opt(config) + } + + if err := config.context.Err(); err != nil { + return emptyT, err + } + + // Setting attempts to 0 means we'll retry until we succeed + var lastErr error + if config.attempts == 0 { + for { + t, err := retryableFunc() + if err == nil { + return t, nil + } + + if !IsRecoverable(err) { + return emptyT, err + } + + if !config.retryIf(err) { + return emptyT, err + } + + lastErr = err + + n++ + config.onRetry(n, err) + select { + case <-config.timer.After(delay(config, n, err)): + case <-config.context.Done(): + if config.wrapContextErrorWithLastError { + return emptyT, Error{config.context.Err(), lastErr} + } + return emptyT, config.context.Err() + } + } + } + + errorLog := Error{} + + attemptsForError := make(map[error]uint, len(config.attemptsForError)) + for err, attempts := range config.attemptsForError { + attemptsForError[err] = attempts + } + + shouldRetry := true + for shouldRetry { + t, err := retryableFunc() + if err == nil { + return t, nil + } + + errorLog = append(errorLog, unpackUnrecoverable(err)) + + if !config.retryIf(err) { + break + } + + config.onRetry(n, err) + + for errToCheck, attempts := range attemptsForError { + if errors.Is(err, errToCheck) { + attempts-- + attemptsForError[errToCheck] = attempts + shouldRetry = shouldRetry && attempts > 0 + } + } + + // if this is last attempt - don't wait + if n == config.attempts-1 { + break + } + + select { + case <-config.timer.After(delay(config, n, err)): + case <-config.context.Done(): + if config.lastErrorOnly { + return emptyT, config.context.Err() + } + + return emptyT, append(errorLog, config.context.Err()) + } + + n++ + shouldRetry = shouldRetry && n < config.attempts + } + + if config.lastErrorOnly { + return emptyT, errorLog.Unwrap() + } + return emptyT, errorLog +} + +func newDefaultRetryConfig() *Config { + return &Config{ + attempts: uint(10), + attemptsForError: make(map[error]uint), + delay: 100 * time.Millisecond, + maxJitter: 100 * time.Millisecond, + onRetry: func(n uint, err error) {}, + retryIf: IsRecoverable, + delayType: CombineDelay(BackOffDelay, RandomDelay), + lastErrorOnly: false, + context: context.Background(), + timer: &timerImpl{}, + } +} + +// Error type represents list of errors in retry +type Error []error + +// Error method return string representation of Error +// It is an implementation of error interface +func (e Error) Error() string { + logWithNumber := make([]string, len(e)) + for i, l := range e { + if l != nil { + logWithNumber[i] = fmt.Sprintf("#%d: %s", i+1, l.Error()) + } + } + + return fmt.Sprintf("All attempts fail:\n%s", strings.Join(logWithNumber, "\n")) +} + +func (e Error) Is(target error) bool { + for _, v := range e { + if errors.Is(v, target) { + return true + } + } + return false +} + +func (e Error) As(target interface{}) bool { + for _, v := range e { + if errors.As(v, target) { + return true + } + } + return false +} + +/* +Unwrap the last error for compatibility with `errors.Unwrap()`. +When you need to unwrap all errors, you should use `WrappedErrors()` instead. + + err := Do( + func() error { + return errors.New("original error") + }, + Attempts(1), + ) + + fmt.Println(errors.Unwrap(err)) # "original error" is printed + +Added in version 4.2.0. +*/ +func (e Error) Unwrap() error { + return e[len(e)-1] +} + +// WrappedErrors returns the list of errors that this Error is wrapping. +// It is an implementation of the `errwrap.Wrapper` interface +// in package [errwrap](https://github.com/hashicorp/errwrap) so that +// `retry.Error` can be used with that library. +func (e Error) WrappedErrors() []error { + return e +} + +type unrecoverableError struct { + error +} + +func (e unrecoverableError) Error() string { + if e.error == nil { + return "unrecoverable error" + } + return e.error.Error() +} + +func (e unrecoverableError) Unwrap() error { + return e.error +} + +// Unrecoverable wraps an error in `unrecoverableError` struct +func Unrecoverable(err error) error { + return unrecoverableError{err} +} + +// IsRecoverable checks if error is an instance of `unrecoverableError` +func IsRecoverable(err error) bool { + return !errors.Is(err, unrecoverableError{}) +} + +// Adds support for errors.Is usage on unrecoverableError +func (unrecoverableError) Is(err error) bool { + _, isUnrecoverable := err.(unrecoverableError) + return isUnrecoverable +} + +func unpackUnrecoverable(err error) error { + if unrecoverable, isUnrecoverable := err.(unrecoverableError); isUnrecoverable { + return unrecoverable.error + } + + return err +} + +func delay(config *Config, n uint, err error) time.Duration { + delayTime := config.delayType(n, err, config) + if config.maxDelay > 0 && delayTime > config.maxDelay { + delayTime = config.maxDelay + } + + return delayTime +} diff --git a/vendor/github.com/dustin/go-humanize/.travis.yml b/vendor/github.com/dustin/go-humanize/.travis.yml index ba95cdd15..ac12e485a 100644 --- a/vendor/github.com/dustin/go-humanize/.travis.yml +++ b/vendor/github.com/dustin/go-humanize/.travis.yml @@ -1,12 +1,12 @@ sudo: false language: go +go_import_path: github.com/dustin/go-humanize go: - - 1.3.x - - 1.5.x - - 1.6.x - - 1.7.x - - 1.8.x - - 1.9.x + - 1.13.x + - 1.14.x + - 1.15.x + - 1.16.x + - stable - master matrix: allow_failures: @@ -15,7 +15,7 @@ matrix: install: - # Do nothing. This is needed to prevent default install action "go get -t -v ./..." from happening here (we want it to happen inside script step). script: - - go get -t -v ./... - diff -u <(echo -n) <(gofmt -d -s .) - - go tool vet . + - go vet . + - go install -v -race ./... - go test -v -race ./... diff --git a/vendor/github.com/dustin/go-humanize/README.markdown b/vendor/github.com/dustin/go-humanize/README.markdown index 91b4ae564..7d0b16b34 100644 --- a/vendor/github.com/dustin/go-humanize/README.markdown +++ b/vendor/github.com/dustin/go-humanize/README.markdown @@ -5,7 +5,7 @@ Just a few functions for helping humanize times and sizes. `go get` it as `github.com/dustin/go-humanize`, import it as `"github.com/dustin/go-humanize"`, use it as `humanize`. -See [godoc](https://godoc.org/github.com/dustin/go-humanize) for +See [godoc](https://pkg.go.dev/github.com/dustin/go-humanize) for complete documentation. ## Sizes diff --git a/vendor/github.com/dustin/go-humanize/bigbytes.go b/vendor/github.com/dustin/go-humanize/bigbytes.go index 1a2bf6172..3b015fd59 100644 --- a/vendor/github.com/dustin/go-humanize/bigbytes.go +++ b/vendor/github.com/dustin/go-humanize/bigbytes.go @@ -28,6 +28,10 @@ var ( BigZiByte = (&big.Int{}).Mul(BigEiByte, bigIECExp) // BigYiByte is 1,024 z bytes in bit.Ints BigYiByte = (&big.Int{}).Mul(BigZiByte, bigIECExp) + // BigRiByte is 1,024 y bytes in bit.Ints + BigRiByte = (&big.Int{}).Mul(BigYiByte, bigIECExp) + // BigQiByte is 1,024 r bytes in bit.Ints + BigQiByte = (&big.Int{}).Mul(BigRiByte, bigIECExp) ) var ( @@ -51,6 +55,10 @@ var ( BigZByte = (&big.Int{}).Mul(BigEByte, bigSIExp) // BigYByte is 1,000 SI z bytes in big.Ints BigYByte = (&big.Int{}).Mul(BigZByte, bigSIExp) + // BigRByte is 1,000 SI y bytes in big.Ints + BigRByte = (&big.Int{}).Mul(BigYByte, bigSIExp) + // BigQByte is 1,000 SI r bytes in big.Ints + BigQByte = (&big.Int{}).Mul(BigRByte, bigSIExp) ) var bigBytesSizeTable = map[string]*big.Int{ @@ -71,6 +79,10 @@ var bigBytesSizeTable = map[string]*big.Int{ "zb": BigZByte, "yib": BigYiByte, "yb": BigYByte, + "rib": BigRiByte, + "rb": BigRByte, + "qib": BigQiByte, + "qb": BigQByte, // Without suffix "": BigByte, "ki": BigKiByte, @@ -89,6 +101,10 @@ var bigBytesSizeTable = map[string]*big.Int{ "zi": BigZiByte, "y": BigYByte, "yi": BigYiByte, + "r": BigRByte, + "ri": BigRiByte, + "q": BigQByte, + "qi": BigQiByte, } var ten = big.NewInt(10) @@ -115,7 +131,7 @@ func humanateBigBytes(s, base *big.Int, sizes []string) string { // // BigBytes(82854982) -> 83 MB func BigBytes(s *big.Int) string { - sizes := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"} + sizes := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB", "RB", "QB"} return humanateBigBytes(s, bigSIExp, sizes) } @@ -125,7 +141,7 @@ func BigBytes(s *big.Int) string { // // BigIBytes(82854982) -> 79 MiB func BigIBytes(s *big.Int) string { - sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"} + sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "RiB", "QiB"} return humanateBigBytes(s, bigIECExp, sizes) } diff --git a/vendor/github.com/dustin/go-humanize/commaf.go b/vendor/github.com/dustin/go-humanize/commaf.go index 620690dec..2bc83a03c 100644 --- a/vendor/github.com/dustin/go-humanize/commaf.go +++ b/vendor/github.com/dustin/go-humanize/commaf.go @@ -1,3 +1,4 @@ +//go:build go1.6 // +build go1.6 package humanize diff --git a/vendor/github.com/dustin/go-humanize/ftoa.go b/vendor/github.com/dustin/go-humanize/ftoa.go index 1c62b640d..bce923f37 100644 --- a/vendor/github.com/dustin/go-humanize/ftoa.go +++ b/vendor/github.com/dustin/go-humanize/ftoa.go @@ -6,6 +6,9 @@ import ( ) func stripTrailingZeros(s string) string { + if !strings.ContainsRune(s, '.') { + return s + } offset := len(s) - 1 for offset > 0 { if s[offset] == '.' { diff --git a/vendor/github.com/dustin/go-humanize/number.go b/vendor/github.com/dustin/go-humanize/number.go index dec618659..6470d0d47 100644 --- a/vendor/github.com/dustin/go-humanize/number.go +++ b/vendor/github.com/dustin/go-humanize/number.go @@ -73,7 +73,7 @@ func FormatFloat(format string, n float64) string { if n > math.MaxFloat64 { return "Infinity" } - if n < -math.MaxFloat64 { + if n < (0.0 - math.MaxFloat64) { return "-Infinity" } diff --git a/vendor/github.com/dustin/go-humanize/si.go b/vendor/github.com/dustin/go-humanize/si.go index ae659e0e4..8b8501984 100644 --- a/vendor/github.com/dustin/go-humanize/si.go +++ b/vendor/github.com/dustin/go-humanize/si.go @@ -8,6 +8,8 @@ import ( ) var siPrefixTable = map[float64]string{ + -30: "q", // quecto + -27: "r", // ronto -24: "y", // yocto -21: "z", // zepto -18: "a", // atto @@ -25,6 +27,8 @@ var siPrefixTable = map[float64]string{ 18: "E", // exa 21: "Z", // zetta 24: "Y", // yotta + 27: "R", // ronna + 30: "Q", // quetta } var revSIPrefixTable = revfmap(siPrefixTable) diff --git a/vendor/github.com/waku-org/go-waku/waku/persistence/store.go b/vendor/github.com/waku-org/go-waku/waku/persistence/store.go index 8992b2bd3..574d1877d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/persistence/store.go +++ b/vendor/github.com/waku-org/go-waku/waku/persistence/store.go @@ -49,7 +49,7 @@ type DBStore struct { MessageProvider db *sql.DB - migrationFn func(db *sql.DB) error + migrationFn func(db *sql.DB, logger *zap.Logger) error metrics Metrics timesource timesource.Timesource @@ -121,7 +121,7 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { } } -type MigrationFn func(db *sql.DB) error +type MigrationFn func(db *sql.DB, logger *zap.Logger) error // WithMigrations is a DBOption used to determine if migrations should // be executed, and what driver to use @@ -157,7 +157,7 @@ func NewDBStore(reg prometheus.Registerer, log *zap.Logger, options ...DBOption) } if result.enableMigrations { - err := result.migrationFn(result.db) + err := result.migrationFn(result.db, log) if err != nil { return nil, err } @@ -211,7 +211,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error { // Delete older messages if d.maxDuration > 0 { start := time.Now() - sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1` + sqlStmt := `DELETE FROM message WHERE storedAt < $1` _, err := d.db.Exec(sqlStmt, d.timesource.Now().Add(-d.maxDuration).UnixNano()) if err != nil { d.metrics.RecordError(retPolicyFailure) @@ -240,7 +240,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error { } func (d *DBStore) getDeleteOldRowsQuery() string { - sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC %s OFFSET $1)` + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY storedAt DESC %s OFFSET $1)` switch GetDriverType(d.db) { case SQLiteDriver: sqlStmt = fmt.Sprintf(sqlStmt, "LIMIT -1") @@ -282,7 +282,12 @@ func (d *DBStore) Stop() { // Validate validates the message to be stored against possible fradulent conditions. func (d *DBStore) Validate(env *protocol.Envelope) error { - n := time.Unix(0, env.Index().ReceiverTime) + timestamp := env.Message().GetTimestamp() + if timestamp == 0 { + return nil + } + + n := time.Unix(0, timestamp) upperBound := n.Add(MaxTimeVariance) lowerBound := n.Add(-MaxTimeVariance) @@ -300,17 +305,20 @@ func (d *DBStore) Validate(env *protocol.Envelope) error { // Put inserts a WakuMessage into the DB func (d *DBStore) Put(env *protocol.Envelope) error { - stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)") + + stmt, err := d.db.Prepare("INSERT INTO message (id, messageHash, storedAt, timestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)") if err != nil { d.metrics.RecordError(insertFailure) return err } - cursor := env.Index() - dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest) + storedAt := env.Message().GetTimestamp() + if storedAt == 0 { + storedAt = env.Index().ReceiverTime + } start := time.Now() - _, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion()) + _, err = stmt.Exec(env.Index().Digest, env.Hash(), storedAt, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion()) if err != nil { return err } @@ -329,36 +337,33 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi usesCursor := false if query.PagingInfo.Cursor != nil { usesCursor = true + var exists bool - cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) - - err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)", - cursorDBKey.Bytes(), + err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE storedAt = $1 AND id = $2)", + query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest, ).Scan(&exists) - if err != nil { return nil, nil, err } - if exists { - eqOp := ">" - if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { - eqOp = "<" - } - *paramCnt++ - conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, *paramCnt)) - - parameters = append(parameters, cursorDBKey.Bytes()) - } else { + if !exists { return nil, nil, ErrInvalidCursor } + + eqOp := ">" + if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD { + eqOp = "<" + } + conditions = append(conditions, fmt.Sprintf("(storedAt, id) %s ($%d, $%d)", eqOp, *paramCnt+1, *paramCnt+2)) + *paramCnt += 2 + + parameters = append(parameters, query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest) } handleTimeParam := func(time int64, op string) { *paramCnt++ - conditions = append(conditions, fmt.Sprintf("id %s $%d", op, *paramCnt)) - timeDBKey := NewDBKey(uint64(time), 0, "", []byte{}) - parameters = append(parameters, timeDBKey.Bytes()) + conditions = append(conditions, fmt.Sprintf("storedAt %s $%d", op, *paramCnt)) + parameters = append(parameters, time) } startTime := query.GetStartTime() @@ -378,10 +383,10 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi } func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}, error) { - sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version + sqlQuery := `SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version FROM message %s - ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s ` + ORDER BY timestamp %s, id %s, pubsubTopic %s, storedAt %s ` var conditions []string //var parameters []interface{} @@ -428,7 +433,7 @@ func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{} parameters = append(parameters, pageSize) sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection) - d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery)) + d.log.Debug(fmt.Sprintf("sqlQuery: %s", sqlQuery)) return sqlQuery, parameters, nil @@ -490,12 +495,12 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err return cursor, result, nil } -// MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp +// MostRecentTimestamp returns an unix timestamp with the most recent timestamp // in the message table func (d *DBStore) MostRecentTimestamp() (int64, error) { result := sql.NullInt64{} - err := d.db.QueryRow(`SELECT max(senderTimestamp) FROM message`).Scan(&result) + err := d.db.QueryRow(`SELECT max(timestamp) FROM message`).Scan(&result) if err != nil && err != sql.ErrNoRows { return 0, err } @@ -520,7 +525,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { d.log.Info("loading records from the DB", zap.Duration("duration", elapsed)) }() - rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") + rows, err := d.db.Query("SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY timestamp ASC") if err != nil { return nil, err } @@ -550,14 +555,14 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { // GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage` func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) { var id []byte - var receiverTimestamp int64 - var senderTimestamp int64 + var storedAt int64 + var timestamp int64 var contentTopic string var payload []byte var version uint32 var pubsubTopic string - err := row.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version) + err := row.Scan(&id, &storedAt, ×tamp, &contentTopic, &pubsubTopic, &payload, &version) if err != nil { d.log.Error("scanning messages from db", zap.Error(err)) return StoredMessage{}, err @@ -567,8 +572,8 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) { msg.ContentTopic = contentTopic msg.Payload = payload - if senderTimestamp != 0 { - msg.Timestamp = proto.Int64(senderTimestamp) + if timestamp != 0 { + msg.Timestamp = proto.Int64(timestamp) } if version > 0 { @@ -578,7 +583,7 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) { record := StoredMessage{ ID: id, PubsubTopic: pubsubTopic, - ReceiverTime: receiverTimestamp, + ReceiverTime: storedAt, Message: msg, } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go index cf014adc2..5fe8d5c4e 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go @@ -272,16 +272,14 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool { if node == nil { return false } - d.log.Debug("found a peer", logging.ENode("enr", node)) // node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage if !isWakuNode(node) { d.log.Debug("peer is not waku node", logging.ENode("enr", node)) return false } - d.log.Debug("peer is a waku node", logging.ENode("enr", node)) - _, err := wenr.EnodeToPeerInfo(node) + _, err := wenr.EnodeToPeerInfo(node) if err != nil { d.metrics.RecordError(peerInfoFailure) d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err)) @@ -411,20 +409,17 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate { } if nodeRS == nil { - d.log.Debug("node has no shards registered", logging.ENode("node", n)) // Node has no shards registered. return false } if nodeRS.ClusterID != localRS.ClusterID { - d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err)) return false } // Contains any for _, idx := range localRS.ShardIDs { if nodeRS.Contains(localRS.ClusterID, idx) { - d.log.Debug("shards match for discovered node", logging.ENode("node", n)) return true } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go index e1d5321ad..7aa509609 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go @@ -82,6 +82,7 @@ func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { } c.metrics.RecordPeerConnected() + c.metrics.SetPeerStoreSize(c.h.Peerstore().Peers().Len()) } // Disconnected is called when a connection closed @@ -96,6 +97,7 @@ func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { c.log.Warn("subscriber is too slow") } } + c.metrics.SetPeerStoreSize(c.h.Peerstore().Peers().Len()) } // OpenedStream is called when a stream opened diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/metrics.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/metrics.go index cad8c27c6..7bcf51223 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/metrics.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/metrics.go @@ -27,10 +27,17 @@ var connectedPeers = prometheus.NewGauge( Help: "Number of connected peers", }) +var peerStoreSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_peer_store_size", + Help: "Size of Peer Store", + }) + var collectors = []prometheus.Collector{ gitVersion, peerDials, connectedPeers, + peerStoreSize, } // Metrics exposes the functions required to update prometheus metrics for the waku node @@ -39,6 +46,7 @@ type Metrics interface { RecordDial() RecordPeerConnected() RecordPeerDisconnected() + SetPeerStoreSize(int) } type metricsImpl struct { @@ -72,3 +80,7 @@ func (m *metricsImpl) RecordPeerConnected() { func (m *metricsImpl) RecordPeerDisconnected() { connectedPeers.Dec() } + +func (m *metricsImpl) SetPeerStoreSize(size int) { + peerStoreSize.Set(float64(size)) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 8321d64c4..578d4cdd9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -280,8 +280,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) - - w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, w.opts.pubsubOpts...) + w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, + relay.WithPubSubOptions(w.opts.pubsubOpts), + relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes)) if w.opts.enableRelay { err = w.setupRLNRelay() @@ -528,6 +529,7 @@ func (w *WakuNode) Stop() { w.store.Stop() w.legacyFilter.Stop() w.filterFullNode.Stop() + w.filterLightNode.Stop() if w.opts.enableDiscV5 { w.discoveryV5.Stop() @@ -976,3 +978,7 @@ func GetDiscv5Option(dnsDiscoveredNodes []dnsdisc.DiscoveredNode, discv5Nodes [] return WithDiscoveryV5(port, bootnodes, autoUpdate), nil } + +func (w *WakuNode) ClusterID() uint16 { + return w.opts.clusterID +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 39f04fe30..dd6d9958e 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -80,6 +80,7 @@ type WakuNodeParameters struct { pubsubOpts []pubsub.Option minRelayPeersToPublish int + maxMsgSizeBytes int enableStore bool messageProvider store.MessageProvider @@ -358,6 +359,13 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option) } } +func WithMaxMsgSize(maxMsgSizeBytes int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.maxMsgSizeBytes = maxMsgSizeBytes + return nil + } +} + func WithMaxPeerConnections(maxPeers int) WakuNodeOption { return func(params *WakuNodeParameters) error { params.maxPeerConnections = maxPeers diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 4eddd0905..85806d05f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -351,10 +351,18 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { if err == nil { enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID) // Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen) - if err == nil && enr.Record().Seq() > p.ENR.Seq() { - pm.logger.Debug("found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) + if err == nil && enr.Record().Seq() >= p.ENR.Seq() { return } + if err != nil { + //Peer is already in peer-store but it doesn't have an enr, but discovered peer has ENR + pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding", + logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq())) + } else { + //Peer is already in peer-store but stored ENR is older than discovered one. + pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored", + logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq())) + } } supportedProtos := []protocol.ID{} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 38449ae10..b6fc3092f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -112,19 +112,23 @@ func (wf *WakuFilterLightNode) Stop() { wf.CommonService.Stop(func() { wf.h.RemoveStreamHandler(FilterPushID_v20beta1) if wf.subscriptions.Count() > 0 { - res, err := wf.unsubscribeAll(wf.Context()) - if err != nil { - wf.log.Warn("unsubscribing from full nodes", zap.Error(err)) - } - - for _, r := range res.Errors() { - if r.Err != nil { - wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID)) + go func() { + defer func() { + _ = recover() + }() + res, err := wf.unsubscribeAll(wf.Context()) + if err != nil { + wf.log.Warn("unsubscribing from full nodes", zap.Error(err)) } - } - // - wf.subscriptions.Clear() + for _, r := range res.Errors() { + if r.Err != nil { + wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID)) + } + + } + wf.subscriptions.Clear() + }() } }) } @@ -132,7 +136,9 @@ func (wf *WakuFilterLightNode) Stop() { func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { peerID := stream.Conn().RemotePeer() - logger := wf.log.With(logging.HostID("peer", peerID)) + + logger := wf.log.With(logging.HostID("peerID", peerID)) + if !wf.subscriptions.IsSubscribedTo(peerID) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) @@ -177,10 +183,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } else { pubSubTopic = *messagePush.PubsubTopic } + + logger = messagePush.WakuMessage.Logger(logger, pubSubTopic) + if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { - logger.Warn("received messagepush with invalid subscription parameters", - zap.String("topic", pubSubTopic), - zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) + logger.Warn("received messagepush with invalid subscription parameters") wf.metrics.RecordError(invalidSubscriptionMessage) return } @@ -218,6 +225,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return err } + logger := wf.log.With(logging.HostID("peerID", params.selectedPeer)) + stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) @@ -227,13 +236,13 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) - wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) + logger.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) err = writer.WriteMsg(request) if err != nil { wf.metrics.RecordError(writeRequestFailure) - wf.log.Error("sending FilterSubscribeRequest", zap.Error(err)) + logger.Error("sending FilterSubscribeRequest", zap.Error(err)) if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return err } @@ -241,10 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr filterSubscribeResponse := &pb.FilterSubscribeResponse{} err = reader.ReadMsg(filterSubscribeResponse) if err != nil { - wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err)) + logger.Error("receiving FilterSubscribeResponse", zap.Error(err)) wf.metrics.RecordError(decodeRPCFailure) if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return err } @@ -253,6 +262,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr if err = filterSubscribeResponse.Validate(); err != nil { wf.metrics.RecordError(decodeRPCFailure) + logger.Error("validating response", zap.Error(err)) return err } @@ -617,6 +627,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte if params.wg != nil { params.wg.Done() } + _ = recover() }() paramsCopy := params.Copy() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/common.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/common.go index 398667680..bbccde566 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/common.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/common.go @@ -22,6 +22,18 @@ func NewFilterError(code int, message string) FilterError { } } +const errorStringFmt = "%d - %s" + func (e *FilterError) Error() string { - return fmt.Sprintf("%d - %s", e.Code, e.Message) + return fmt.Sprintf(errorStringFmt, e.Code, e.Message) +} + +func ExtractCodeFromFilterError(fErr string) int { + code := 0 + var message string + _, err := fmt.Sscanf(fErr, errorStringFmt, &code, &message) + if err != nil { + return -1 + } + return code } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 5cdcc6c34..322b4d9c3 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -53,6 +53,10 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.pm = pm wakuLP.metrics = newMetrics(reg) + if pm != nil { + wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) + } + return wakuLP } @@ -73,9 +77,6 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error { wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx)) wakuLP.log.Info("Light Push protocol started") - if wakuLP.pm != nil { - wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) - } return nil } @@ -299,8 +300,13 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa req.Message = message req.PubsubTopic = params.pubsubTopic + logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer)) + + logger.Debug("publishing message") + response, err := wakuLP.request(ctx, req, params) if err != nil { + logger.Error("could not publish message", zap.Error(err)) return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go index ba73b92b1..70de2e0d5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go @@ -1,10 +1,34 @@ package pb import ( + "encoding/binary" + + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/waku-org/go-waku/waku/v2/hash" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // Hash calculates the hash of a waku message func (msg *WakuMessage) Hash(pubsubTopic string) []byte { - return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta) + return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta, toBytes(msg.GetTimestamp())) +} + +func toBytes(i int64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + return b +} + +func (msg *WakuMessage) LogFields(pubsubTopic string) []zapcore.Field { + return []zapcore.Field{ + zap.String("hash", hexutil.Encode(msg.Hash(pubsubTopic))), + zap.String("pubsubTopic", pubsubTopic), + zap.String("contentTopic", msg.ContentTopic), + zap.Int64("timestamp", msg.GetTimestamp()), + } +} + +func (msg *WakuMessage) Logger(logger *zap.Logger, pubsubTopic string) *zap.Logger { + return logger.With(msg.LogFields(pubsubTopic)...) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/enr_cache.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/enr_cache.go index 06538d2b1..90fb096a4 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/enr_cache.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/enr_cache.go @@ -3,69 +3,45 @@ package peer_exchange import ( "bufio" "bytes" - "math/rand" - "sync" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/hashicorp/golang-lru/simplelru" + "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" - "go.uber.org/zap" ) // simpleLRU internal uses container/list, which is ring buffer(double linked list) type enrCache struct { - // using lru, saves us from periodically cleaning the cache to maintain a certain size - data *simplelru.LRU - rng *rand.Rand - mu sync.RWMutex - log *zap.Logger + // using lru, saves us from periodically cleaning the cache to mauintain a certain size + data *shardLRU } // err on negative size -func newEnrCache(size int, log *zap.Logger) (*enrCache, error) { - inner, err := simplelru.NewLRU(size, nil) +func newEnrCache(size int) *enrCache { + inner := newShardLRU(int(size)) return &enrCache{ data: inner, - rng: rand.New(rand.NewSource(rand.Int63())), - log: log.Named("enr-cache"), - }, err + } } // updating cache -func (c *enrCache) updateCache(node *enode.Node) { - c.mu.Lock() - defer c.mu.Unlock() - currNode, ok := c.data.Get(node.ID()) - if !ok || node.Seq() > currNode.(*enode.Node).Seq() { - c.data.Add(node.ID(), node) - c.log.Debug("discovered px peer via discv5", zap.Stringer("enr", node)) +func (c *enrCache) updateCache(node *enode.Node) error { + currNode := c.data.Get(node.ID()) + if currNode == nil || node.Seq() > currNode.Seq() { + return c.data.Add(node) } + return nil } // get `numPeers` records of enr -func (c *enrCache) getENRs(neededPeers int) ([]*pb.PeerInfo, error) { - c.mu.RLock() - defer c.mu.RUnlock() +func (c *enrCache) getENRs(neededPeers int, clusterIndex *ShardInfo) ([]*pb.PeerInfo, error) { // - availablePeers := c.data.Len() - if availablePeers == 0 { - return nil, nil - } - if availablePeers < neededPeers { - neededPeers = availablePeers - } - - perm := c.rng.Perm(availablePeers)[0:neededPeers] - keys := c.data.Keys() + nodes := c.data.GetRandomNodes(clusterIndex, neededPeers) result := []*pb.PeerInfo{} - for _, ind := range perm { - node, ok := c.data.Get(keys[ind]) - if !ok { - continue - } + for _, node := range nodes { + // var b bytes.Buffer writer := bufio.NewWriter(&b) - err := node.(*enode.Node).Record().EncodeRLP(writer) + err := node.Record().EncodeRLP(writer) if err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index e8af91c51..374cde3aa 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -57,17 +57,11 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.disc = disc wakuPX.metrics = newMetrics(reg) wakuPX.log = log.Named("wakupx") + wakuPX.enrCache = newEnrCache(MaxCacheSize) wakuPX.peerConnector = peerConnector wakuPX.pm = pm wakuPX.CommonService = service.NewCommonService() - newEnrCache, err := newEnrCache(MaxCacheSize, wakuPX.log) - if err != nil { - return nil, err - } - - wakuPX.enrCache = newEnrCache - return wakuPX, nil } @@ -108,7 +102,7 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { if requestRPC.Query != nil { logger.Info("request received") - records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers)) + records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers), nil) if err != nil { logger.Error("obtaining enrs from cache", zap.Error(err)) wakuPX.metrics.RecordError(pxFailure) @@ -161,7 +155,11 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { continue } - wakuPX.enrCache.updateCache(iterator.Node()) + err = wakuPX.enrCache.updateCache(iterator.Node()) + if err != nil { + wakuPX.log.Error("adding peer to cache", zap.Error(err)) + continue + } select { case <-ctx.Done(): diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/shard_lru.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/shard_lru.go new file mode 100644 index 000000000..fade60a9b --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/shard_lru.go @@ -0,0 +1,178 @@ +package peer_exchange + +import ( + "container/list" + "fmt" + "math/rand" + "sync" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/waku-org/go-waku/waku/v2/protocol" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type ShardInfo struct { + clusterID uint16 + shard uint16 +} +type shardLRU struct { + size int // number of nodes allowed per shard + idToNode map[enode.ID][]*list.Element + shardNodes map[ShardInfo]*list.List + rng *rand.Rand + mu sync.RWMutex +} + +func newShardLRU(size int) *shardLRU { + return &shardLRU{ + idToNode: map[enode.ID][]*list.Element{}, + shardNodes: map[ShardInfo]*list.List{}, + size: size, + rng: rand.New(rand.NewSource(rand.Int63())), + } +} + +type nodeWithShardInfo struct { + key ShardInfo + node *enode.Node +} + +// time complexity: O(number of previous indexes present for node.ID) +func (l *shardLRU) remove(node *enode.Node) { + elements := l.idToNode[node.ID()] + for _, element := range elements { + key := element.Value.(nodeWithShardInfo).key + l.shardNodes[key].Remove(element) + } + delete(l.idToNode, node.ID()) +} + +// if a node is removed for a list, remove it from idToNode too +func (l *shardLRU) removeFromIdToNode(ele *list.Element) { + nodeID := ele.Value.(nodeWithShardInfo).node.ID() + for ind, entries := range l.idToNode[nodeID] { + if entries == ele { + l.idToNode[nodeID] = append(l.idToNode[nodeID][:ind], l.idToNode[nodeID][ind+1:]...) + break + } + } + if len(l.idToNode[nodeID]) == 0 { + delete(l.idToNode, nodeID) + } +} + +func nodeToRelayShard(node *enode.Node) (*protocol.RelayShards, error) { + shard, err := wenr.RelaySharding(node.Record()) + if err != nil { + return nil, err + } + + if shard == nil { // if no shard info, then add to node to Cluster 0, Index 0 + shard = &protocol.RelayShards{ + ClusterID: 0, + ShardIDs: []uint16{0}, + } + } + + return shard, nil +} + +// time complexity: O(new number of indexes in node's shard) +func (l *shardLRU) add(node *enode.Node) error { + shard, err := nodeToRelayShard(node) + if err != nil { + return err + } + + elements := []*list.Element{} + for _, index := range shard.ShardIDs { + key := ShardInfo{ + shard.ClusterID, + index, + } + if l.shardNodes[key] == nil { + l.shardNodes[key] = list.New() + } + if l.shardNodes[key].Len() >= l.size { + oldest := l.shardNodes[key].Back() + l.removeFromIdToNode(oldest) + l.shardNodes[key].Remove(oldest) + } + entry := l.shardNodes[key].PushFront(nodeWithShardInfo{ + key: key, + node: node, + }) + elements = append(elements, entry) + + } + l.idToNode[node.ID()] = elements + + return nil +} + +// this will be called when the seq number of node is more than the one in cache +func (l *shardLRU) Add(node *enode.Node) error { + l.mu.Lock() + defer l.mu.Unlock() + // removing bcz previous node might be subscribed to different shards, we need to remove node from those shards + l.remove(node) + return l.add(node) +} + +// clusterIndex is nil when peers for no specific shard are requested +func (l *shardLRU) GetRandomNodes(clusterIndex *ShardInfo, neededPeers int) (nodes []*enode.Node) { + l.mu.Lock() + defer l.mu.Unlock() + + availablePeers := l.len(clusterIndex) + if availablePeers < neededPeers { + neededPeers = availablePeers + } + // if clusterIndex is nil, then return all nodes + var elements []*list.Element + if clusterIndex == nil { + elements = make([]*list.Element, 0, len(l.idToNode)) + for _, entries := range l.idToNode { + elements = append(elements, entries[0]) + } + } else if entries := l.shardNodes[*clusterIndex]; entries != nil && entries.Len() != 0 { + elements = make([]*list.Element, 0, entries.Len()) + for ent := entries.Back(); ent != nil; ent = ent.Prev() { + elements = append(elements, ent) + } + } + utils.Logger().Info(fmt.Sprintf("%d", len(elements))) + indexes := l.rng.Perm(len(elements))[0:neededPeers] + for _, ind := range indexes { + node := elements[ind].Value.(nodeWithShardInfo).node + nodes = append(nodes, node) + // this removes the node from all list (all cluster/shard pair that the node has) and adds it to the front + l.remove(node) + _ = l.add(node) + } + return nodes +} + +// if clusterIndex is not nil, return len of nodes maintained for a given shard +// if clusterIndex is nil, return count of all nodes maintained +func (l *shardLRU) len(clusterIndex *ShardInfo) int { + if clusterIndex == nil { + return len(l.idToNode) + } + if entries := l.shardNodes[*clusterIndex]; entries != nil { + return entries.Len() + } + return 0 +} + +// get the node with the given id, if it is present in cache +func (l *shardLRU) Get(id enode.ID) *enode.Node { + l.mu.RLock() + defer l.mu.RUnlock() + + if elements, ok := l.idToNode[id]; ok && len(elements) > 0 { + return elements[0].Value.(nodeWithShardInfo).node + } + return nil +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go index 35cddbc79..b642c4970 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/metrics.go @@ -22,14 +22,22 @@ var messageSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Buckets: []float64{0.0, 5.0, 15.0, 50.0, 100.0, 300.0, 700.0, 1000.0}, }) +var pubsubTopics = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_pubsub_topics", + Help: "Number of PubSub Topics node is subscribed to", + }) + var collectors = []prometheus.Collector{ messages, messageSize, + pubsubTopics, } // Metrics exposes the functions required to update prometheus metrics for relay protocol type Metrics interface { RecordMessage(envelope *waku_proto.Envelope) + SetPubSubTopics(int) } type metricsImpl struct { @@ -56,3 +64,7 @@ func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) { m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexBytes("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes)) }() } + +func (m *metricsImpl) SetPubSubTopics(size int) { + pubsubTopics.Set(float64(size)) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/options.go index 253a9fded..7d21f9273 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/options.go @@ -1,5 +1,7 @@ package relay +import pubsub "github.com/libp2p/go-libp2p-pubsub" + type publishParameters struct { pubsubTopic string } @@ -20,3 +22,31 @@ func WithDefaultPubsubTopic() PublishOption { params.pubsubTopic = DefaultWakuTopic } } + +type relayParameters struct { + pubsubOpts []pubsub.Option + maxMsgSizeBytes int +} + +type RelayOption func(*relayParameters) + +func WithPubSubOptions(opts []pubsub.Option) RelayOption { + return func(params *relayParameters) { + params.pubsubOpts = append(params.pubsubOpts, opts...) + } +} + +func WithMaxMsgSize(maxMsgSizeBytes int) RelayOption { + return func(params *relayParameters) { + if maxMsgSizeBytes == 0 { + maxMsgSizeBytes = defaultMaxMsgSizeBytes + } + params.maxMsgSizeBytes = maxMsgSizeBytes + } +} + +func defaultOptions() []RelayOption { + return []RelayOption{ + WithMaxMsgSize(defaultMaxMsgSizeBytes), + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go index a1d279df3..060f161c5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -27,13 +27,15 @@ import ( const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") const WakuRelayENRField = uint8(1 << 0) +const defaultMaxMsgSizeBytes = 150 * 1024 + // DefaultWakuTopic is the default pubsub topic used across all Waku protocols var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String() // WakuRelay is the implementation of the Waku Relay protocol type WakuRelay struct { host host.Host - opts []pubsub.Option + relayParams *relayParameters pubsub *pubsub.PubSub params pubsub.GossipSubParams peerScoreParams *pubsub.PeerScoreParams @@ -41,9 +43,8 @@ type WakuRelay struct { topicParams *pubsub.TopicScoreParams timesource timesource.Timesource metrics Metrics - - log *zap.Logger - logMessages *zap.Logger + log *zap.Logger + logMessages *zap.Logger bcaster Broadcaster @@ -75,7 +76,7 @@ type pubsubTopicSubscriptionDetails struct { // NewWakuRelay returns a new instance of a WakuRelay struct func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, - reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { + reg prometheus.Registerer, log *zap.Logger, opts ...RelayOption) *WakuRelay { w := new(WakuRelay) w.timesource = timesource w.topics = make(map[string]*pubsubTopicSubscriptionDetails) @@ -87,9 +88,16 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.logMessages = utils.MessagesLogger("relay") w.events = eventbus.NewBus() w.metrics = newMetrics(reg, w.logMessages) + w.relayParams = new(relayParameters) + w.relayParams.pubsubOpts = w.defaultPubsubOptions() - // default options required by WakuRelay - w.opts = append(w.defaultPubsubOptions(), opts...) + options := defaultOptions() + options = append(options, opts...) + for _, opt := range options { + opt(w.relayParams) + } + w.log.Info("relay config", zap.Int("max-msg-size-bytes", w.relayParams.maxMsgSizeBytes), + zap.Int("min-peers-to-publish", w.minPeersToPublish)) return w } @@ -123,7 +131,7 @@ func (w *WakuRelay) start() error { if w.bcaster == nil { return errors.New("broadcaster not specified for relay") } - ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.opts...) + ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.relayParams.pubsubOpts...) if err != nil { return err } @@ -235,14 +243,14 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscripti } w.log.Info("gossipsub subscription", zap.String("pubsubTopic", subscription.Topic())) - + w.metrics.SetPubSubTopics(len(w.topics)) result = w.topics[topic] } return result, nil } -// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic +// Publish is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic // specified in the message via autosharding. To publish to a specific pubsubTopic, the `WithPubSubTopic` option should // be provided func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) ([]byte, error) { @@ -289,7 +297,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . return nil, err } - if len(out) > pubsub.DefaultMaxMessageSize { + if len(out) > w.relayParams.maxMsgSizeBytes { return nil, errors.New("message size exceeds gossipsub max message size") } @@ -483,6 +491,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co if err != nil { return err } + w.metrics.SetPubSubTopics(len(w.topics)) } } return nil diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index 8faf29d18..419709840 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -45,13 +46,15 @@ type DynamicGroupManager struct { membershipIndexToLoad *uint } -func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error { +func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered, latestProcessBlock uint64) error { gm.lastBlockProcessedMutex.Lock() defer gm.lastBlockProcessedMutex.Unlock() toRemoveTable := om.New() toInsertTable := om.New() - + if gm.lastBlockProcessed == 0 { + gm.lastBlockProcessed = latestProcessBlock + } lastBlockProcessed := gm.lastBlockProcessed for _, event := range events { if event.Raw.Removed { @@ -130,19 +133,27 @@ func NewDynamicGroupManager( } func (gm *DynamicGroupManager) getMembershipFee(ctx context.Context) (*big.Int, error) { - fee, err := gm.web3Config.RLNContract.MEMBERSHIPDEPOSIT(&bind.CallOpts{Context: ctx}) - if err != nil { - return nil, fmt.Errorf("could not check if credential exits in contract: %w", err) - } - return fee, nil + return retry.DoWithData( + func() (*big.Int, error) { + fee, err := gm.web3Config.RLNContract.MEMBERSHIPDEPOSIT(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, fmt.Errorf("could not check if credential exits in contract: %w", err) + } + return fee, nil + }, retry.Attempts(3), + ) } func (gm *DynamicGroupManager) memberExists(ctx context.Context, idCommitment rln.IDCommitment) (bool, error) { - exists, err := gm.web3Config.RLNContract.MemberExists(&bind.CallOpts{Context: ctx}, rln.Bytes32ToBigInt(idCommitment)) - if err != nil { - return false, fmt.Errorf("could not check if credential exits in contract: %w", err) - } - return exists, nil + return retry.DoWithData( + func() (bool, error) { + exists, err := gm.web3Config.RLNContract.MemberExists(&bind.CallOpts{Context: ctx}, rln.Bytes32ToBigInt(idCommitment)) + if err != nil { + return false, fmt.Errorf("could not check if credential exits in contract: %w", err) + } + return exists, nil + }, retry.Attempts(3), + ) } func (gm *DynamicGroupManager) Start(ctx context.Context) error { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go index a418dde23..42c3b5f26 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" @@ -19,7 +20,7 @@ import ( ) // RegistrationEventHandler represents the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error +type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered, uint64) error // MembershipFetcher is used for getting membershipRegsitered Events from the eth rpc type MembershipFetcher struct { @@ -93,7 +94,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo t1Since := time.Since(t1) t2 := time.Now() - if err := handler(events); err != nil { + if err := handler(events, fromBlock+maxBatchSize); err != nil { return err } @@ -109,7 +110,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo // process all the fetched events t2 := time.Now() - err = handler(events) + err = handler(events, toBlock) if err != nil { return err } @@ -155,7 +156,7 @@ func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint6 fromBlock = toBlock + 1 } - err = handler(events) + err = handler(events, toBlock) if err != nil { mf.log.Error("processing rln log", zap.Error(err)) } @@ -207,26 +208,30 @@ func (mf *MembershipFetcher) getEvents(ctx context.Context, fromBlock uint64, to } func (mf *MembershipFetcher) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) { - logIterator, err := mf.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx}) - if err != nil { - return nil, err - } + return retry.DoWithData( + func() ([]*contracts.RLNMemberRegistered, error) { + logIterator, err := mf.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx}) + if err != nil { + return nil, err + } - var results []*contracts.RLNMemberRegistered + var results []*contracts.RLNMemberRegistered - for { - if !logIterator.Next() { - break - } + for { + if !logIterator.Next() { + break + } - if logIterator.Error() != nil { - return nil, logIterator.Error() - } + if logIterator.Error() != nil { + return nil, logIterator.Error() + } - results = append(results, logIterator.Event) - } + results = append(results, logIterator.Event) + } - return results, nil + return results, nil + }, retry.Attempts(3), + ) } // GetMetadata retrieves metadata from the zerokit's RLN database diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/metadata.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/metadata.go index a430a810c..f391fe3b9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/metadata.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic/metadata.go @@ -18,10 +18,10 @@ type RLNMetadata struct { } // Serialize converts a RLNMetadata into a binary format expected by zerokit's RLN -func (r RLNMetadata) Serialize() []byte { +func (r RLNMetadata) Serialize() ([]byte, error) { chainID := r.ChainID if chainID == nil { - chainID = big.NewInt(0) + return nil, errors.New("chain-id not specified and cannot be 0") } var result []byte @@ -34,7 +34,7 @@ func (r RLNMetadata) Serialize() []byte { result = binary.LittleEndian.AppendUint64(result, v.BlockNumber) } - return result + return result, nil } const lastProcessedBlockOffset = 0 @@ -76,6 +76,9 @@ func DeserializeMetadata(b []byte) (RLNMetadata, error) { // SetMetadata stores some metadata into the zerokit's RLN database func (gm *DynamicGroupManager) SetMetadata(meta RLNMetadata) error { - b := meta.Serialize() + b, err := meta.Serialize() + if err != nil { + return err + } return gm.rln.SetMetadata(b) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go b/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go index 22e3a7ff7..4616dfa24 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/utils/logger.go @@ -19,7 +19,7 @@ func Logger() *zap.Logger { return log } -// MessagesLogger returns a logger used for debug logging of receivent/sent messages +// MessagesLogger returns a logger used for debug logging of sent/received messages func MessagesLogger(prefix string) *zap.Logger { if messageLoggers == nil { messageLoggers = make(map[string]*zap.Logger) diff --git a/vendor/github.com/waku-org/go-zerokit-rln/rln/types.go b/vendor/github.com/waku-org/go-zerokit-rln/rln/types.go index f3114c36c..1d20b222c 100644 --- a/vendor/github.com/waku-org/go-zerokit-rln/rln/types.go +++ b/vendor/github.com/waku-org/go-zerokit-rln/rln/types.go @@ -154,7 +154,7 @@ func init() { // the root is created locally, using createMembershipList proc from waku_rln_relay_utils module, and the result is hardcoded in here const STATIC_GROUP_MERKLE_ROOT = "ca7290e49680fa14eeaeea709e4742a8a074a1bcbfd50a4b3976742ae8a6ca25" -const EPOCH_UNIT_SECONDS = uint64(10) // the rln-relay epoch length in seconds +const EPOCH_UNIT_SECONDS = uint64(1) // the rln-relay epoch length in seconds type Epoch [32]byte diff --git a/vendor/modules.txt b/vendor/modules.txt index cb0e7bc91..3aea2acbc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -116,6 +116,9 @@ github.com/andybalholm/brotli # github.com/andybalholm/cascadia v1.2.0 ## explicit; go 1.13 github.com/andybalholm/cascadia +# github.com/avast/retry-go/v4 v4.5.1 +## explicit; go 1.18 +github.com/avast/retry-go/v4 # github.com/bahlo/generic-list-go v0.2.0 ## explicit; go 1.18 github.com/bahlo/generic-list-go @@ -195,8 +198,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa # github.com/docker/go-units v0.5.0 ## explicit github.com/docker/go-units -# github.com/dustin/go-humanize v1.0.0 -## explicit +# github.com/dustin/go-humanize v1.0.1 +## explicit; go 1.16 github.com/dustin/go-humanize # github.com/edsrzf/mmap-go v1.0.0 ## explicit @@ -1008,7 +1011,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3 +# github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe ## explicit; go 1.19 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/waku/persistence @@ -1048,7 +1051,7 @@ github.com/waku-org/go-waku/waku/v2/rendezvous github.com/waku-org/go-waku/waku/v2/service github.com/waku-org/go-waku/waku/v2/timesource github.com/waku-org/go-waku/waku/v2/utils -# github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd +# github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 ## explicit; go 1.18 github.com/waku-org/go-zerokit-rln/rln github.com/waku-org/go-zerokit-rln/rln/link diff --git a/vendor/github.com/waku-org/go-waku/waku/persistence/db_key.go b/wakuv2/persistence/dbkey.go similarity index 100% rename from vendor/github.com/waku-org/go-waku/waku/persistence/db_key.go rename to wakuv2/persistence/dbkey.go diff --git a/wakuv2/persistence/dbstore.go b/wakuv2/persistence/dbstore.go index f6a5100d0..ec6ff0021 100644 --- a/wakuv2/persistence/dbstore.go +++ b/wakuv2/persistence/dbstore.go @@ -178,7 +178,7 @@ func (d *DBStore) Put(env *protocol.Envelope) error { } cursor := env.Index() - dbKey := gowakuPersistence.NewDBKey(uint64(cursor.SenderTime), uint64(env.Index().ReceiverTime), env.PubsubTopic(), env.Index().Digest) + dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(env.Index().ReceiverTime), env.PubsubTopic(), env.Index().Digest) _, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version) if err != nil { return err @@ -231,7 +231,7 @@ func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPe if query.PagingInfo.Cursor != nil { usesCursor = true var exists bool - cursorDBKey := gowakuPersistence.NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) + cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest) err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM store_messages WHERE id = $1)", cursorDBKey.Bytes(), @@ -259,7 +259,7 @@ func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPe if !usesCursor || query.PagingInfo.Direction == storepb.PagingInfo_BACKWARD { paramCnt++ conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt)) - startTimeDBKey := gowakuPersistence.NewDBKey(uint64(query.GetStartTime()), uint64(query.GetStartTime()), "", []byte{}) + startTimeDBKey := NewDBKey(uint64(query.GetStartTime()), uint64(query.GetStartTime()), "", []byte{}) parameters = append(parameters, startTimeDBKey.Bytes()) } @@ -269,7 +269,7 @@ func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPe if !usesCursor || query.PagingInfo.Direction == storepb.PagingInfo_FORWARD { paramCnt++ conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt)) - endTimeDBKey := gowakuPersistence.NewDBKey(uint64(query.GetEndTime()), uint64(query.GetEndTime()), "", []byte{}) + endTimeDBKey := NewDBKey(uint64(query.GetEndTime()), uint64(query.GetEndTime()), "", []byte{}) parameters = append(parameters, endTimeDBKey.Bytes()) } } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 72805b46a..dd9c532ae 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -292,6 +292,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s node.WithMaxPeerConnections(cfg.DiscoveryLimit), node.WithLogger(logger), node.WithClusterID(cfg.ClusterID), + node.WithMaxMsgSize(1024 * 1024), } if cfg.EnableDiscV5 {