chore: bump go-waku (#4505)

This commit is contained in:
richΛrd 2024-01-05 16:43:05 -04:00 committed by GitHub
parent 5c704b2ec2
commit 1ac99c2dcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1790 additions and 201 deletions

View File

@ -1 +1 @@
0.171.41
0.172.0

7
go.mod
View File

@ -88,7 +88,7 @@ require (
github.com/mutecomm/go-sqlcipher/v4 v4.4.2
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3
github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
@ -120,6 +120,7 @@ require (
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 // indirect
github.com/anacrolix/utp v0.1.0 // indirect
github.com/andybalholm/cascadia v1.2.0 // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/benbjohnson/immutable v0.3.0 // indirect
@ -138,7 +139,7 @@ require (
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
@ -259,7 +260,7 @@ require (
github.com/urfave/cli/v2 v2.24.4 // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd // indirect
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230916171929-1dd9494ff065 // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230916171518-2a77c3734dd1 // indirect

13
go.sum
View File

@ -345,6 +345,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
@ -710,8 +712,9 @@ github.com/dop251/goja v0.0.0-20211011172007-d99e4b8cbf48/go.mod h1:R9ET47fwRVRP
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dvyukov/go-fuzz v0.0.0-20200318091601-be3528f3a813/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
@ -2096,10 +2099,10 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04=
github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3 h1:ck3j0not5znCThv4E76bJlUZHDn96IXABH5fg7CfEQc=
github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3/go.mod h1:hem2hnXK5BdabxwJULszM0Rh1Yj+gD9IxjwLCGPPaxs=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd h1:cu7CsUo7BK6ac/v193RIaqAzUcmpa6MNY4xYW9AenQI=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe h1:2D97fbaKlIQRjWMz/iTjnYcxi2z6ekKvspTGtcuPHgU=
github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe/go.mod h1:+b5fPPJ4YUIAPJtPOtwB7bTrOQ9lF15I2LnQjV6NMIA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230916171929-1dd9494ff065 h1:Sd7QD/1Yo2o2M1MY49F8Zr4KNBPUEK5cz5HoXQVJbrs=

21
vendor/github.com/avast/retry-go/v4/.gitignore generated vendored Normal file
View File

@ -0,0 +1,21 @@
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
# dep
vendor/
Gopkg.lock
# cover
coverage.txt

38
vendor/github.com/avast/retry-go/v4/.godocdown.tmpl generated vendored Normal file
View File

@ -0,0 +1,38 @@
# {{ .Name }}
[![Release](https://img.shields.io/github/release/avast/retry-go.svg?style=flat-square)](https://github.com/avast/retry-go/releases/latest)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square)](LICENSE.md)
![GitHub Actions](https://github.com/avast/retry-go/actions/workflows/workflow.yaml/badge.svg)
[![Go Report Card](https://goreportcard.com/badge/github.com/avast/retry-go?style=flat-square)](https://goreportcard.com/report/github.com/avast/retry-go)
[![GoDoc](https://godoc.org/github.com/avast/retry-go?status.svg&style=flat-square)](http://godoc.org/github.com/avast/retry-go)
[![codecov.io](https://codecov.io/github/avast/retry-go/coverage.svg?branch=master)](https://codecov.io/github/avast/retry-go?branch=master)
[![Sourcegraph](https://sourcegraph.com/github.com/avast/retry-go/-/badge.svg)](https://sourcegraph.com/github.com/avast/retry-go?badge)
{{ .EmitSynopsis }}
{{ .EmitUsage }}
## Contributing
Contributions are very much welcome.
### Makefile
Makefile provides several handy rules, like README.md `generator` , `setup` for prepare build/dev environment, `test`, `cover`, etc...
Try `make help` for more information.
### Before pull request
> maybe you need `make setup` in order to setup environment
please try:
* run tests (`make test`)
* run linter (`make lint`)
* if your IDE don't automaticaly do `go fmt`, run `go fmt` (`make fmt`)
### README
README.md are generate from template [.godocdown.tmpl](.godocdown.tmpl) and code documentation via [godocdown](https://github.com/robertkrimen/godocdown).
Never edit README.md direct, because your change will be lost.

21
vendor/github.com/avast/retry-go/v4/LICENSE generated vendored Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 Avast
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

59
vendor/github.com/avast/retry-go/v4/Makefile generated vendored Normal file
View File

@ -0,0 +1,59 @@
SOURCE_FILES?=$$(go list ./... | grep -v /vendor/)
TEST_PATTERN?=.
TEST_OPTIONS?=
VERSION?=$$(cat VERSION)
LINTER?=$$(which golangci-lint)
LINTER_VERSION=1.50.0
ifeq ($(OS),Windows_NT)
LINTER_FILE=golangci-lint-$(LINTER_VERSION)-windows-amd64.zip
LINTER_UNPACK= >| app.zip; unzip -j app.zip -d $$GOPATH/bin; rm app.zip
else ifeq ($(OS), Darwin)
LINTER_FILE=golangci-lint-$(LINTER_VERSION)-darwin-amd64.tar.gz
LINTER_UNPACK= | tar xzf - -C $$GOPATH/bin --wildcards --strip 1 "**/golangci-lint"
else
LINTER_FILE=golangci-lint-$(LINTER_VERSION)-linux-amd64.tar.gz
LINTER_UNPACK= | tar xzf - -C $$GOPATH/bin --wildcards --strip 1 "**/golangci-lint"
endif
setup:
go install github.com/pierrre/gotestcover@latest
go install golang.org/x/tools/cmd/cover@latest
go install github.com/robertkrimen/godocdown/godocdown@latest
go mod download
generate: ## Generate README.md
godocdown >| README.md
test: generate test_and_cover_report lint
test_and_cover_report:
gotestcover $(TEST_OPTIONS) -covermode=atomic -coverprofile=coverage.txt $(SOURCE_FILES) -run $(TEST_PATTERN) -timeout=2m
cover: test ## Run all the tests and opens the coverage report
go tool cover -html=coverage.txt
fmt: ## gofmt and goimports all go files
find . -name '*.go' -not -wholename './vendor/*' | while read -r file; do gofmt -w -s "$$file"; goimports -w "$$file"; done
lint: ## Run all the linters
@if [ "$(LINTER)" = "" ]; then\
curl -L https://github.com/golangci/golangci-lint/releases/download/v$(LINTER_VERSION)/$(LINTER_FILE) $(LINTER_UNPACK) ;\
chmod +x $$GOPATH/bin/golangci-lint;\
fi
golangci-lint run
ci: test_and_cover_report ## Run all the tests but no linters - use https://golangci.com integration instead
build:
go build
release: ## Release new version
git tag | grep -q $(VERSION) && echo This version was released! Increase VERSION! || git tag $(VERSION) && git push origin $(VERSION) && git tag v$(VERSION) && git push origin v$(VERSION)
# Absolutely awesome: http://marmelab.com/blog/2016/02/29/auto-documented-makefile.html
help:
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
.DEFAULT_GOAL := build

485
vendor/github.com/avast/retry-go/v4/README.md generated vendored Normal file
View File

@ -0,0 +1,485 @@
# retry
[![Release](https://img.shields.io/github/release/avast/retry-go.svg?style=flat-square)](https://github.com/avast/retry-go/releases/latest)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square)](LICENSE.md)
![GitHub Actions](https://github.com/avast/retry-go/actions/workflows/workflow.yaml/badge.svg)
[![Go Report Card](https://goreportcard.com/badge/github.com/avast/retry-go?style=flat-square)](https://goreportcard.com/report/github.com/avast/retry-go)
[![GoDoc](https://godoc.org/github.com/avast/retry-go?status.svg&style=flat-square)](http://godoc.org/github.com/avast/retry-go)
[![codecov.io](https://codecov.io/github/avast/retry-go/coverage.svg?branch=master)](https://codecov.io/github/avast/retry-go?branch=master)
[![Sourcegraph](https://sourcegraph.com/github.com/avast/retry-go/-/badge.svg)](https://sourcegraph.com/github.com/avast/retry-go?badge)
Simple library for retry mechanism
slightly inspired by
[Try::Tiny::Retry](https://metacpan.org/pod/Try::Tiny::Retry)
# SYNOPSIS
http get with retry:
url := "http://example.com"
var body []byte
err := retry.Do(
func() error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return nil
},
)
if err != nil {
// handle error
}
fmt.Println(string(body))
http get with retry with data:
url := "http://example.com"
body, err := retry.DoWithData(
func() ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
},
)
if err != nil {
// handle error
}
fmt.Println(string(body))
[next examples](https://github.com/avast/retry-go/tree/master/examples)
# SEE ALSO
* [giantswarm/retry-go](https://github.com/giantswarm/retry-go) - slightly
complicated interface.
* [sethgrid/pester](https://github.com/sethgrid/pester) - only http retry for
http calls with retries and backoff
* [cenkalti/backoff](https://github.com/cenkalti/backoff) - Go port of the
exponential backoff algorithm from Google's HTTP Client Library for Java. Really
complicated interface.
* [rafaeljesus/retry-go](https://github.com/rafaeljesus/retry-go) - looks good,
slightly similar as this package, don't have 'simple' `Retry` method
* [matryer/try](https://github.com/matryer/try) - very popular package,
nonintuitive interface (for me)
# BREAKING CHANGES
* 4.0.0
- infinity retry is possible by set `Attempts(0)` by PR [#49](https://github.com/avast/retry-go/pull/49)
* 3.0.0
- `DelayTypeFunc` accepts a new parameter `err` - this breaking change affects only your custom Delay Functions. This change allow [make delay functions based on error](examples/delay_based_on_error_test.go).
* 1.0.2 -> 2.0.0
- argument of `retry.Delay` is final delay (no multiplication by `retry.Units` anymore)
- function `retry.Units` are removed
- [more about this breaking change](https://github.com/avast/retry-go/issues/7)
* 0.3.0 -> 1.0.0
- `retry.Retry` function are changed to `retry.Do` function
- `retry.RetryCustom` (OnRetry) and `retry.RetryCustomWithOpts` functions are now implement via functions produces Options (aka `retry.OnRetry`)
## Usage
#### func BackOffDelay
```go
func BackOffDelay(n uint, _ error, config *Config) time.Duration
```
BackOffDelay is a DelayType which increases delay between consecutive retries
#### func Do
```go
func Do(retryableFunc RetryableFunc, opts ...Option) error
```
#### func DoWithData
```go
func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error)
```
#### func FixedDelay
```go
func FixedDelay(_ uint, _ error, config *Config) time.Duration
```
FixedDelay is a DelayType which keeps delay the same through all iterations
#### func IsRecoverable
```go
func IsRecoverable(err error) bool
```
IsRecoverable checks if error is an instance of `unrecoverableError`
#### func RandomDelay
```go
func RandomDelay(_ uint, _ error, config *Config) time.Duration
```
RandomDelay is a DelayType which picks a random delay up to config.maxJitter
#### func Unrecoverable
```go
func Unrecoverable(err error) error
```
Unrecoverable wraps an error in `unrecoverableError` struct
#### type Config
```go
type Config struct {
}
```
#### type DelayTypeFunc
```go
type DelayTypeFunc func(n uint, err error, config *Config) time.Duration
```
DelayTypeFunc is called to return the next delay to wait after the retriable
function fails on `err` after `n` attempts.
#### func CombineDelay
```go
func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc
```
CombineDelay is a DelayType the combines all of the specified delays into a new
DelayTypeFunc
#### type Error
```go
type Error []error
```
Error type represents list of errors in retry
#### func (Error) As
```go
func (e Error) As(target interface{}) bool
```
#### func (Error) Error
```go
func (e Error) Error() string
```
Error method return string representation of Error It is an implementation of
error interface
#### func (Error) Is
```go
func (e Error) Is(target error) bool
```
#### func (Error) Unwrap
```go
func (e Error) Unwrap() error
```
Unwrap the last error for compatibility with `errors.Unwrap()`. When you need to
unwrap all errors, you should use `WrappedErrors()` instead.
err := Do(
func() error {
return errors.New("original error")
},
Attempts(1),
)
fmt.Println(errors.Unwrap(err)) # "original error" is printed
Added in version 4.2.0.
#### func (Error) WrappedErrors
```go
func (e Error) WrappedErrors() []error
```
WrappedErrors returns the list of errors that this Error is wrapping. It is an
implementation of the `errwrap.Wrapper` interface in package
[errwrap](https://github.com/hashicorp/errwrap) so that `retry.Error` can be
used with that library.
#### type OnRetryFunc
```go
type OnRetryFunc func(n uint, err error)
```
Function signature of OnRetry function n = count of attempts
#### type Option
```go
type Option func(*Config)
```
Option represents an option for retry.
#### func Attempts
```go
func Attempts(attempts uint) Option
```
Attempts set count of retry. Setting to 0 will retry until the retried function
succeeds. default is 10
#### func AttemptsForError
```go
func AttemptsForError(attempts uint, err error) Option
```
AttemptsForError sets count of retry in case execution results in given `err`
Retries for the given `err` are also counted against total retries. The retry
will stop if any of given retries is exhausted.
added in 4.3.0
#### func Context
```go
func Context(ctx context.Context) Option
```
Context allow to set context of retry default are Background context
example of immediately cancellation (maybe it isn't the best example, but it
describes behavior enough; I hope)
ctx, cancel := context.WithCancel(context.Background())
cancel()
retry.Do(
func() error {
...
},
retry.Context(ctx),
)
#### func Delay
```go
func Delay(delay time.Duration) Option
```
Delay set delay between retry default is 100ms
#### func DelayType
```go
func DelayType(delayType DelayTypeFunc) Option
```
DelayType set type of the delay between retries default is BackOff
#### func LastErrorOnly
```go
func LastErrorOnly(lastErrorOnly bool) Option
```
return the direct last error that came from the retried function default is
false (return wrapped errors with everything)
#### func MaxDelay
```go
func MaxDelay(maxDelay time.Duration) Option
```
MaxDelay set maximum delay between retry does not apply by default
#### func MaxJitter
```go
func MaxJitter(maxJitter time.Duration) Option
```
MaxJitter sets the maximum random Jitter between retries for RandomDelay
#### func OnRetry
```go
func OnRetry(onRetry OnRetryFunc) Option
```
OnRetry function callback are called each retry
log each retry example:
retry.Do(
func() error {
return errors.New("some error")
},
retry.OnRetry(func(n uint, err error) {
log.Printf("#%d: %s\n", n, err)
}),
)
#### func RetryIf
```go
func RetryIf(retryIf RetryIfFunc) Option
```
RetryIf controls whether a retry should be attempted after an error (assuming
there are any retry attempts remaining)
skip retry if special error example:
retry.Do(
func() error {
return errors.New("special error")
},
retry.RetryIf(func(err error) bool {
if err.Error() == "special error" {
return false
}
return true
}),
)
By default RetryIf stops execution if the error is wrapped using
`retry.Unrecoverable`, so above example may also be shortened to:
retry.Do(
func() error {
return retry.Unrecoverable(errors.New("special error"))
}
)
#### func WithTimer
```go
func WithTimer(t Timer) Option
```
WithTimer provides a way to swap out timer module implementations. This
primarily is useful for mocking/testing, where you may not want to explicitly
wait for a set duration for retries.
example of augmenting time.After with a print statement
type struct MyTimer {}
func (t *MyTimer) After(d time.Duration) <- chan time.Time {
fmt.Print("Timer called!")
return time.After(d)
}
retry.Do(
func() error { ... },
retry.WithTimer(&MyTimer{})
)
#### func WrapContextErrorWithLastError
```go
func WrapContextErrorWithLastError(wrapContextErrorWithLastError bool) Option
```
WrapContextErrorWithLastError allows the context error to be returned wrapped
with the last error that the retried function returned. This is only applicable
when Attempts is set to 0 to retry indefinitly and when using a context to
cancel / timeout
default is false
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
retry.Do(
func() error {
...
},
retry.Context(ctx),
retry.Attempts(0),
retry.WrapContextErrorWithLastError(true),
)
#### type RetryIfFunc
```go
type RetryIfFunc func(error) bool
```
Function signature of retry if function
#### type RetryableFunc
```go
type RetryableFunc func() error
```
Function signature of retryable function
#### type RetryableFuncWithData
```go
type RetryableFuncWithData[T any] func() (T, error)
```
Function signature of retryable function with data
#### type Timer
```go
type Timer interface {
After(time.Duration) <-chan time.Time
}
```
Timer represents the timer used to track time for a retry.
## Contributing
Contributions are very much welcome.
### Makefile
Makefile provides several handy rules, like README.md `generator` , `setup` for prepare build/dev environment, `test`, `cover`, etc...
Try `make help` for more information.
### Before pull request
> maybe you need `make setup` in order to setup environment
please try:
* run tests (`make test`)
* run linter (`make lint`)
* if your IDE don't automaticaly do `go fmt`, run `go fmt` (`make fmt`)
### README
README.md are generate from template [.godocdown.tmpl](.godocdown.tmpl) and code documentation via [godocdown](https://github.com/robertkrimen/godocdown).
Never edit README.md direct, because your change will be lost.

1
vendor/github.com/avast/retry-go/v4/VERSION generated vendored Normal file
View File

@ -0,0 +1 @@
4.5.1

274
vendor/github.com/avast/retry-go/v4/options.go generated vendored Normal file
View File

@ -0,0 +1,274 @@
package retry
import (
"context"
"math"
"math/rand"
"time"
)
// Function signature of retry if function
type RetryIfFunc func(error) bool
// Function signature of OnRetry function
// n = count of attempts
type OnRetryFunc func(n uint, err error)
// DelayTypeFunc is called to return the next delay to wait after the retriable function fails on `err` after `n` attempts.
type DelayTypeFunc func(n uint, err error, config *Config) time.Duration
// Timer represents the timer used to track time for a retry.
type Timer interface {
After(time.Duration) <-chan time.Time
}
type Config struct {
attempts uint
attemptsForError map[error]uint
delay time.Duration
maxDelay time.Duration
maxJitter time.Duration
onRetry OnRetryFunc
retryIf RetryIfFunc
delayType DelayTypeFunc
lastErrorOnly bool
context context.Context
timer Timer
wrapContextErrorWithLastError bool
maxBackOffN uint
}
// Option represents an option for retry.
type Option func(*Config)
func emptyOption(c *Config) {}
// return the direct last error that came from the retried function
// default is false (return wrapped errors with everything)
func LastErrorOnly(lastErrorOnly bool) Option {
return func(c *Config) {
c.lastErrorOnly = lastErrorOnly
}
}
// Attempts set count of retry. Setting to 0 will retry until the retried function succeeds.
// default is 10
func Attempts(attempts uint) Option {
return func(c *Config) {
c.attempts = attempts
}
}
// AttemptsForError sets count of retry in case execution results in given `err`
// Retries for the given `err` are also counted against total retries.
// The retry will stop if any of given retries is exhausted.
//
// added in 4.3.0
func AttemptsForError(attempts uint, err error) Option {
return func(c *Config) {
c.attemptsForError[err] = attempts
}
}
// Delay set delay between retry
// default is 100ms
func Delay(delay time.Duration) Option {
return func(c *Config) {
c.delay = delay
}
}
// MaxDelay set maximum delay between retry
// does not apply by default
func MaxDelay(maxDelay time.Duration) Option {
return func(c *Config) {
c.maxDelay = maxDelay
}
}
// MaxJitter sets the maximum random Jitter between retries for RandomDelay
func MaxJitter(maxJitter time.Duration) Option {
return func(c *Config) {
c.maxJitter = maxJitter
}
}
// DelayType set type of the delay between retries
// default is BackOff
func DelayType(delayType DelayTypeFunc) Option {
if delayType == nil {
return emptyOption
}
return func(c *Config) {
c.delayType = delayType
}
}
// BackOffDelay is a DelayType which increases delay between consecutive retries
func BackOffDelay(n uint, _ error, config *Config) time.Duration {
// 1 << 63 would overflow signed int64 (time.Duration), thus 62.
const max uint = 62
if config.maxBackOffN == 0 {
if config.delay <= 0 {
config.delay = 1
}
config.maxBackOffN = max - uint(math.Floor(math.Log2(float64(config.delay))))
}
if n > config.maxBackOffN {
n = config.maxBackOffN
}
return config.delay << n
}
// FixedDelay is a DelayType which keeps delay the same through all iterations
func FixedDelay(_ uint, _ error, config *Config) time.Duration {
return config.delay
}
// RandomDelay is a DelayType which picks a random delay up to config.maxJitter
func RandomDelay(_ uint, _ error, config *Config) time.Duration {
return time.Duration(rand.Int63n(int64(config.maxJitter)))
}
// CombineDelay is a DelayType the combines all of the specified delays into a new DelayTypeFunc
func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc {
const maxInt64 = uint64(math.MaxInt64)
return func(n uint, err error, config *Config) time.Duration {
var total uint64
for _, delay := range delays {
total += uint64(delay(n, err, config))
if total > maxInt64 {
total = maxInt64
}
}
return time.Duration(total)
}
}
// OnRetry function callback are called each retry
//
// log each retry example:
//
// retry.Do(
// func() error {
// return errors.New("some error")
// },
// retry.OnRetry(func(n uint, err error) {
// log.Printf("#%d: %s\n", n, err)
// }),
// )
func OnRetry(onRetry OnRetryFunc) Option {
if onRetry == nil {
return emptyOption
}
return func(c *Config) {
c.onRetry = onRetry
}
}
// RetryIf controls whether a retry should be attempted after an error
// (assuming there are any retry attempts remaining)
//
// skip retry if special error example:
//
// retry.Do(
// func() error {
// return errors.New("special error")
// },
// retry.RetryIf(func(err error) bool {
// if err.Error() == "special error" {
// return false
// }
// return true
// })
// )
//
// By default RetryIf stops execution if the error is wrapped using `retry.Unrecoverable`,
// so above example may also be shortened to:
//
// retry.Do(
// func() error {
// return retry.Unrecoverable(errors.New("special error"))
// }
// )
func RetryIf(retryIf RetryIfFunc) Option {
if retryIf == nil {
return emptyOption
}
return func(c *Config) {
c.retryIf = retryIf
}
}
// Context allow to set context of retry
// default are Background context
//
// example of immediately cancellation (maybe it isn't the best example, but it describes behavior enough; I hope)
//
// ctx, cancel := context.WithCancel(context.Background())
// cancel()
//
// retry.Do(
// func() error {
// ...
// },
// retry.Context(ctx),
// )
func Context(ctx context.Context) Option {
return func(c *Config) {
c.context = ctx
}
}
// WithTimer provides a way to swap out timer module implementations.
// This primarily is useful for mocking/testing, where you may not want to explicitly wait for a set duration
// for retries.
//
// example of augmenting time.After with a print statement
//
// type struct MyTimer {}
//
// func (t *MyTimer) After(d time.Duration) <- chan time.Time {
// fmt.Print("Timer called!")
// return time.After(d)
// }
//
// retry.Do(
// func() error { ... },
// retry.WithTimer(&MyTimer{})
// )
func WithTimer(t Timer) Option {
return func(c *Config) {
c.timer = t
}
}
// WrapContextErrorWithLastError allows the context error to be returned wrapped with the last error that the
// retried function returned. This is only applicable when Attempts is set to 0 to retry indefinitly and when
// using a context to cancel / timeout
//
// default is false
//
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// retry.Do(
// func() error {
// ...
// },
// retry.Context(ctx),
// retry.Attempts(0),
// retry.WrapContextErrorWithLastError(true),
// )
func WrapContextErrorWithLastError(wrapContextErrorWithLastError bool) Option {
return func(c *Config) {
c.wrapContextErrorWithLastError = wrapContextErrorWithLastError
}
}

347
vendor/github.com/avast/retry-go/v4/retry.go generated vendored Normal file
View File

@ -0,0 +1,347 @@
/*
Simple library for retry mechanism
slightly inspired by [Try::Tiny::Retry](https://metacpan.org/pod/Try::Tiny::Retry)
# SYNOPSIS
http get with retry:
url := "http://example.com"
var body []byte
err := retry.Do(
func() error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return nil
},
)
if err != nil {
// handle error
}
fmt.Println(string(body))
http get with retry with data:
url := "http://example.com"
body, err := retry.DoWithData(
func() ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
},
)
if err != nil {
// handle error
}
fmt.Println(string(body))
[next examples](https://github.com/avast/retry-go/tree/master/examples)
# SEE ALSO
* [giantswarm/retry-go](https://github.com/giantswarm/retry-go) - slightly complicated interface.
* [sethgrid/pester](https://github.com/sethgrid/pester) - only http retry for http calls with retries and backoff
* [cenkalti/backoff](https://github.com/cenkalti/backoff) - Go port of the exponential backoff algorithm from Google's HTTP Client Library for Java. Really complicated interface.
* [rafaeljesus/retry-go](https://github.com/rafaeljesus/retry-go) - looks good, slightly similar as this package, don't have 'simple' `Retry` method
* [matryer/try](https://github.com/matryer/try) - very popular package, nonintuitive interface (for me)
# BREAKING CHANGES
* 4.0.0
- infinity retry is possible by set `Attempts(0)` by PR [#49](https://github.com/avast/retry-go/pull/49)
* 3.0.0
- `DelayTypeFunc` accepts a new parameter `err` - this breaking change affects only your custom Delay Functions. This change allow [make delay functions based on error](examples/delay_based_on_error_test.go).
* 1.0.2 -> 2.0.0
- argument of `retry.Delay` is final delay (no multiplication by `retry.Units` anymore)
- function `retry.Units` are removed
- [more about this breaking change](https://github.com/avast/retry-go/issues/7)
* 0.3.0 -> 1.0.0
- `retry.Retry` function are changed to `retry.Do` function
- `retry.RetryCustom` (OnRetry) and `retry.RetryCustomWithOpts` functions are now implement via functions produces Options (aka `retry.OnRetry`)
*/
package retry
import (
"context"
"errors"
"fmt"
"strings"
"time"
)
// Function signature of retryable function
type RetryableFunc func() error
// Function signature of retryable function with data
type RetryableFuncWithData[T any] func() (T, error)
// Default timer is a wrapper around time.After
type timerImpl struct{}
func (t *timerImpl) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
func Do(retryableFunc RetryableFunc, opts ...Option) error {
retryableFuncWithData := func() (any, error) {
return nil, retryableFunc()
}
_, err := DoWithData(retryableFuncWithData, opts...)
return err
}
func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error) {
var n uint
var emptyT T
// default
config := newDefaultRetryConfig()
// apply opts
for _, opt := range opts {
opt(config)
}
if err := config.context.Err(); err != nil {
return emptyT, err
}
// Setting attempts to 0 means we'll retry until we succeed
var lastErr error
if config.attempts == 0 {
for {
t, err := retryableFunc()
if err == nil {
return t, nil
}
if !IsRecoverable(err) {
return emptyT, err
}
if !config.retryIf(err) {
return emptyT, err
}
lastErr = err
n++
config.onRetry(n, err)
select {
case <-config.timer.After(delay(config, n, err)):
case <-config.context.Done():
if config.wrapContextErrorWithLastError {
return emptyT, Error{config.context.Err(), lastErr}
}
return emptyT, config.context.Err()
}
}
}
errorLog := Error{}
attemptsForError := make(map[error]uint, len(config.attemptsForError))
for err, attempts := range config.attemptsForError {
attemptsForError[err] = attempts
}
shouldRetry := true
for shouldRetry {
t, err := retryableFunc()
if err == nil {
return t, nil
}
errorLog = append(errorLog, unpackUnrecoverable(err))
if !config.retryIf(err) {
break
}
config.onRetry(n, err)
for errToCheck, attempts := range attemptsForError {
if errors.Is(err, errToCheck) {
attempts--
attemptsForError[errToCheck] = attempts
shouldRetry = shouldRetry && attempts > 0
}
}
// if this is last attempt - don't wait
if n == config.attempts-1 {
break
}
select {
case <-config.timer.After(delay(config, n, err)):
case <-config.context.Done():
if config.lastErrorOnly {
return emptyT, config.context.Err()
}
return emptyT, append(errorLog, config.context.Err())
}
n++
shouldRetry = shouldRetry && n < config.attempts
}
if config.lastErrorOnly {
return emptyT, errorLog.Unwrap()
}
return emptyT, errorLog
}
func newDefaultRetryConfig() *Config {
return &Config{
attempts: uint(10),
attemptsForError: make(map[error]uint),
delay: 100 * time.Millisecond,
maxJitter: 100 * time.Millisecond,
onRetry: func(n uint, err error) {},
retryIf: IsRecoverable,
delayType: CombineDelay(BackOffDelay, RandomDelay),
lastErrorOnly: false,
context: context.Background(),
timer: &timerImpl{},
}
}
// Error type represents list of errors in retry
type Error []error
// Error method return string representation of Error
// It is an implementation of error interface
func (e Error) Error() string {
logWithNumber := make([]string, len(e))
for i, l := range e {
if l != nil {
logWithNumber[i] = fmt.Sprintf("#%d: %s", i+1, l.Error())
}
}
return fmt.Sprintf("All attempts fail:\n%s", strings.Join(logWithNumber, "\n"))
}
func (e Error) Is(target error) bool {
for _, v := range e {
if errors.Is(v, target) {
return true
}
}
return false
}
func (e Error) As(target interface{}) bool {
for _, v := range e {
if errors.As(v, target) {
return true
}
}
return false
}
/*
Unwrap the last error for compatibility with `errors.Unwrap()`.
When you need to unwrap all errors, you should use `WrappedErrors()` instead.
err := Do(
func() error {
return errors.New("original error")
},
Attempts(1),
)
fmt.Println(errors.Unwrap(err)) # "original error" is printed
Added in version 4.2.0.
*/
func (e Error) Unwrap() error {
return e[len(e)-1]
}
// WrappedErrors returns the list of errors that this Error is wrapping.
// It is an implementation of the `errwrap.Wrapper` interface
// in package [errwrap](https://github.com/hashicorp/errwrap) so that
// `retry.Error` can be used with that library.
func (e Error) WrappedErrors() []error {
return e
}
type unrecoverableError struct {
error
}
func (e unrecoverableError) Error() string {
if e.error == nil {
return "unrecoverable error"
}
return e.error.Error()
}
func (e unrecoverableError) Unwrap() error {
return e.error
}
// Unrecoverable wraps an error in `unrecoverableError` struct
func Unrecoverable(err error) error {
return unrecoverableError{err}
}
// IsRecoverable checks if error is an instance of `unrecoverableError`
func IsRecoverable(err error) bool {
return !errors.Is(err, unrecoverableError{})
}
// Adds support for errors.Is usage on unrecoverableError
func (unrecoverableError) Is(err error) bool {
_, isUnrecoverable := err.(unrecoverableError)
return isUnrecoverable
}
func unpackUnrecoverable(err error) error {
if unrecoverable, isUnrecoverable := err.(unrecoverableError); isUnrecoverable {
return unrecoverable.error
}
return err
}
func delay(config *Config, n uint, err error) time.Duration {
delayTime := config.delayType(n, err, config)
if config.maxDelay > 0 && delayTime > config.maxDelay {
delayTime = config.maxDelay
}
return delayTime
}

View File

@ -1,12 +1,12 @@
sudo: false
language: go
go_import_path: github.com/dustin/go-humanize
go:
- 1.3.x
- 1.5.x
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
- 1.13.x
- 1.14.x
- 1.15.x
- 1.16.x
- stable
- master
matrix:
allow_failures:
@ -15,7 +15,7 @@ matrix:
install:
- # Do nothing. This is needed to prevent default install action "go get -t -v ./..." from happening here (we want it to happen inside script step).
script:
- go get -t -v ./...
- diff -u <(echo -n) <(gofmt -d -s .)
- go tool vet .
- go vet .
- go install -v -race ./...
- go test -v -race ./...

View File

@ -5,7 +5,7 @@ Just a few functions for helping humanize times and sizes.
`go get` it as `github.com/dustin/go-humanize`, import it as
`"github.com/dustin/go-humanize"`, use it as `humanize`.
See [godoc](https://godoc.org/github.com/dustin/go-humanize) for
See [godoc](https://pkg.go.dev/github.com/dustin/go-humanize) for
complete documentation.
## Sizes

View File

@ -28,6 +28,10 @@ var (
BigZiByte = (&big.Int{}).Mul(BigEiByte, bigIECExp)
// BigYiByte is 1,024 z bytes in bit.Ints
BigYiByte = (&big.Int{}).Mul(BigZiByte, bigIECExp)
// BigRiByte is 1,024 y bytes in bit.Ints
BigRiByte = (&big.Int{}).Mul(BigYiByte, bigIECExp)
// BigQiByte is 1,024 r bytes in bit.Ints
BigQiByte = (&big.Int{}).Mul(BigRiByte, bigIECExp)
)
var (
@ -51,6 +55,10 @@ var (
BigZByte = (&big.Int{}).Mul(BigEByte, bigSIExp)
// BigYByte is 1,000 SI z bytes in big.Ints
BigYByte = (&big.Int{}).Mul(BigZByte, bigSIExp)
// BigRByte is 1,000 SI y bytes in big.Ints
BigRByte = (&big.Int{}).Mul(BigYByte, bigSIExp)
// BigQByte is 1,000 SI r bytes in big.Ints
BigQByte = (&big.Int{}).Mul(BigRByte, bigSIExp)
)
var bigBytesSizeTable = map[string]*big.Int{
@ -71,6 +79,10 @@ var bigBytesSizeTable = map[string]*big.Int{
"zb": BigZByte,
"yib": BigYiByte,
"yb": BigYByte,
"rib": BigRiByte,
"rb": BigRByte,
"qib": BigQiByte,
"qb": BigQByte,
// Without suffix
"": BigByte,
"ki": BigKiByte,
@ -89,6 +101,10 @@ var bigBytesSizeTable = map[string]*big.Int{
"zi": BigZiByte,
"y": BigYByte,
"yi": BigYiByte,
"r": BigRByte,
"ri": BigRiByte,
"q": BigQByte,
"qi": BigQiByte,
}
var ten = big.NewInt(10)
@ -115,7 +131,7 @@ func humanateBigBytes(s, base *big.Int, sizes []string) string {
//
// BigBytes(82854982) -> 83 MB
func BigBytes(s *big.Int) string {
sizes := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"}
sizes := []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB", "RB", "QB"}
return humanateBigBytes(s, bigSIExp, sizes)
}
@ -125,7 +141,7 @@ func BigBytes(s *big.Int) string {
//
// BigIBytes(82854982) -> 79 MiB
func BigIBytes(s *big.Int) string {
sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"}
sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "RiB", "QiB"}
return humanateBigBytes(s, bigIECExp, sizes)
}

View File

@ -1,3 +1,4 @@
//go:build go1.6
// +build go1.6
package humanize

View File

@ -6,6 +6,9 @@ import (
)
func stripTrailingZeros(s string) string {
if !strings.ContainsRune(s, '.') {
return s
}
offset := len(s) - 1
for offset > 0 {
if s[offset] == '.' {

View File

@ -73,7 +73,7 @@ func FormatFloat(format string, n float64) string {
if n > math.MaxFloat64 {
return "Infinity"
}
if n < -math.MaxFloat64 {
if n < (0.0 - math.MaxFloat64) {
return "-Infinity"
}

View File

@ -8,6 +8,8 @@ import (
)
var siPrefixTable = map[float64]string{
-30: "q", // quecto
-27: "r", // ronto
-24: "y", // yocto
-21: "z", // zepto
-18: "a", // atto
@ -25,6 +27,8 @@ var siPrefixTable = map[float64]string{
18: "E", // exa
21: "Z", // zetta
24: "Y", // yotta
27: "R", // ronna
30: "Q", // quetta
}
var revSIPrefixTable = revfmap(siPrefixTable)

View File

@ -49,7 +49,7 @@ type DBStore struct {
MessageProvider
db *sql.DB
migrationFn func(db *sql.DB) error
migrationFn func(db *sql.DB, logger *zap.Logger) error
metrics Metrics
timesource timesource.Timesource
@ -121,7 +121,7 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
}
}
type MigrationFn func(db *sql.DB) error
type MigrationFn func(db *sql.DB, logger *zap.Logger) error
// WithMigrations is a DBOption used to determine if migrations should
// be executed, and what driver to use
@ -157,7 +157,7 @@ func NewDBStore(reg prometheus.Registerer, log *zap.Logger, options ...DBOption)
}
if result.enableMigrations {
err := result.migrationFn(result.db)
err := result.migrationFn(result.db, log)
if err != nil {
return nil, err
}
@ -211,7 +211,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
// Delete older messages
if d.maxDuration > 0 {
start := time.Now()
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1`
sqlStmt := `DELETE FROM message WHERE storedAt < $1`
_, err := d.db.Exec(sqlStmt, d.timesource.Now().Add(-d.maxDuration).UnixNano())
if err != nil {
d.metrics.RecordError(retPolicyFailure)
@ -240,7 +240,7 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
}
func (d *DBStore) getDeleteOldRowsQuery() string {
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC %s OFFSET $1)`
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY storedAt DESC %s OFFSET $1)`
switch GetDriverType(d.db) {
case SQLiteDriver:
sqlStmt = fmt.Sprintf(sqlStmt, "LIMIT -1")
@ -282,7 +282,12 @@ func (d *DBStore) Stop() {
// Validate validates the message to be stored against possible fradulent conditions.
func (d *DBStore) Validate(env *protocol.Envelope) error {
n := time.Unix(0, env.Index().ReceiverTime)
timestamp := env.Message().GetTimestamp()
if timestamp == 0 {
return nil
}
n := time.Unix(0, timestamp)
upperBound := n.Add(MaxTimeVariance)
lowerBound := n.Add(-MaxTimeVariance)
@ -300,17 +305,20 @@ func (d *DBStore) Validate(env *protocol.Envelope) error {
// Put inserts a WakuMessage into the DB
func (d *DBStore) Put(env *protocol.Envelope) error {
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)")
stmt, err := d.db.Prepare("INSERT INTO message (id, messageHash, storedAt, timestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
if err != nil {
d.metrics.RecordError(insertFailure)
return err
}
cursor := env.Index()
dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest)
storedAt := env.Message().GetTimestamp()
if storedAt == 0 {
storedAt = env.Index().ReceiverTime
}
start := time.Now()
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion())
_, err = stmt.Exec(env.Index().Digest, env.Hash(), storedAt, env.Message().GetTimestamp(), env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().GetVersion())
if err != nil {
return err
}
@ -329,36 +337,33 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi
usesCursor := false
if query.PagingInfo.Cursor != nil {
usesCursor = true
var exists bool
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE id = $1)",
cursorDBKey.Bytes(),
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM message WHERE storedAt = $1 AND id = $2)",
query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest,
).Scan(&exists)
if err != nil {
return nil, nil, err
}
if exists {
eqOp := ">"
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
eqOp = "<"
}
*paramCnt++
conditions = append(conditions, fmt.Sprintf("id %s $%d", eqOp, *paramCnt))
parameters = append(parameters, cursorDBKey.Bytes())
} else {
if !exists {
return nil, nil, ErrInvalidCursor
}
eqOp := ">"
if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
eqOp = "<"
}
conditions = append(conditions, fmt.Sprintf("(storedAt, id) %s ($%d, $%d)", eqOp, *paramCnt+1, *paramCnt+2))
*paramCnt += 2
parameters = append(parameters, query.PagingInfo.Cursor.ReceiverTime, query.PagingInfo.Cursor.Digest)
}
handleTimeParam := func(time int64, op string) {
*paramCnt++
conditions = append(conditions, fmt.Sprintf("id %s $%d", op, *paramCnt))
timeDBKey := NewDBKey(uint64(time), 0, "", []byte{})
parameters = append(parameters, timeDBKey.Bytes())
conditions = append(conditions, fmt.Sprintf("storedAt %s $%d", op, *paramCnt))
parameters = append(parameters, time)
}
startTime := query.GetStartTime()
@ -378,10 +383,10 @@ func (d *DBStore) handleQueryCursor(query *pb.HistoryQuery, paramCnt *int, condi
}
func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}, error) {
sqlQuery := `SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version
sqlQuery := `SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version
FROM message
%s
ORDER BY senderTimestamp %s, id %s, pubsubTopic %s, receiverTimestamp %s `
ORDER BY timestamp %s, id %s, pubsubTopic %s, storedAt %s `
var conditions []string
//var parameters []interface{}
@ -428,7 +433,7 @@ func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}
parameters = append(parameters, pageSize)
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection)
d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery))
d.log.Debug(fmt.Sprintf("sqlQuery: %s", sqlQuery))
return sqlQuery, parameters, nil
@ -490,12 +495,12 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
return cursor, result, nil
}
// MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp
// MostRecentTimestamp returns an unix timestamp with the most recent timestamp
// in the message table
func (d *DBStore) MostRecentTimestamp() (int64, error) {
result := sql.NullInt64{}
err := d.db.QueryRow(`SELECT max(senderTimestamp) FROM message`).Scan(&result)
err := d.db.QueryRow(`SELECT max(timestamp) FROM message`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
@ -520,7 +525,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
d.log.Info("loading records from the DB", zap.Duration("duration", elapsed))
}()
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
rows, err := d.db.Query("SELECT id, storedAt, timestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY timestamp ASC")
if err != nil {
return nil, err
}
@ -550,14 +555,14 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
// GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage`
func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
var id []byte
var receiverTimestamp int64
var senderTimestamp int64
var storedAt int64
var timestamp int64
var contentTopic string
var payload []byte
var version uint32
var pubsubTopic string
err := row.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
err := row.Scan(&id, &storedAt, &timestamp, &contentTopic, &pubsubTopic, &payload, &version)
if err != nil {
d.log.Error("scanning messages from db", zap.Error(err))
return StoredMessage{}, err
@ -567,8 +572,8 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
msg.ContentTopic = contentTopic
msg.Payload = payload
if senderTimestamp != 0 {
msg.Timestamp = proto.Int64(senderTimestamp)
if timestamp != 0 {
msg.Timestamp = proto.Int64(timestamp)
}
if version > 0 {
@ -578,7 +583,7 @@ func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error) {
record := StoredMessage{
ID: id,
PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp,
ReceiverTime: storedAt,
Message: msg,
}

View File

@ -272,16 +272,14 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
if node == nil {
return false
}
d.log.Debug("found a peer", logging.ENode("enr", node))
// node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
if !isWakuNode(node) {
d.log.Debug("peer is not waku node", logging.ENode("enr", node))
return false
}
d.log.Debug("peer is a waku node", logging.ENode("enr", node))
_, err := wenr.EnodeToPeerInfo(node)
_, err := wenr.EnodeToPeerInfo(node)
if err != nil {
d.metrics.RecordError(peerInfoFailure)
d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
@ -411,20 +409,17 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate {
}
if nodeRS == nil {
d.log.Debug("node has no shards registered", logging.ENode("node", n))
// Node has no shards registered.
return false
}
if nodeRS.ClusterID != localRS.ClusterID {
d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err))
return false
}
// Contains any
for _, idx := range localRS.ShardIDs {
if nodeRS.Contains(localRS.ClusterID, idx) {
d.log.Debug("shards match for discovered node", logging.ENode("node", n))
return true
}
}

View File

@ -82,6 +82,7 @@ func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
}
c.metrics.RecordPeerConnected()
c.metrics.SetPeerStoreSize(c.h.Peerstore().Peers().Len())
}
// Disconnected is called when a connection closed
@ -96,6 +97,7 @@ func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) {
c.log.Warn("subscriber is too slow")
}
}
c.metrics.SetPeerStoreSize(c.h.Peerstore().Peers().Len())
}
// OpenedStream is called when a stream opened

View File

@ -27,10 +27,17 @@ var connectedPeers = prometheus.NewGauge(
Help: "Number of connected peers",
})
var peerStoreSize = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "waku_peer_store_size",
Help: "Size of Peer Store",
})
var collectors = []prometheus.Collector{
gitVersion,
peerDials,
connectedPeers,
peerStoreSize,
}
// Metrics exposes the functions required to update prometheus metrics for the waku node
@ -39,6 +46,7 @@ type Metrics interface {
RecordDial()
RecordPeerConnected()
RecordPeerDisconnected()
SetPeerStoreSize(int)
}
type metricsImpl struct {
@ -72,3 +80,7 @@ func (m *metricsImpl) RecordPeerConnected() {
func (m *metricsImpl) RecordPeerDisconnected() {
connectedPeers.Dec()
}
func (m *metricsImpl) SetPeerStoreSize(size int) {
peerStoreSize.Set(float64(size))
}

View File

@ -280,8 +280,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log, w.opts.pubsubOpts...)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.opts.prometheusReg, w.log,
relay.WithPubSubOptions(w.opts.pubsubOpts),
relay.WithMaxMsgSize(w.opts.maxMsgSizeBytes))
if w.opts.enableRelay {
err = w.setupRLNRelay()
@ -528,6 +529,7 @@ func (w *WakuNode) Stop() {
w.store.Stop()
w.legacyFilter.Stop()
w.filterFullNode.Stop()
w.filterLightNode.Stop()
if w.opts.enableDiscV5 {
w.discoveryV5.Stop()
@ -976,3 +978,7 @@ func GetDiscv5Option(dnsDiscoveredNodes []dnsdisc.DiscoveredNode, discv5Nodes []
return WithDiscoveryV5(port, bootnodes, autoUpdate), nil
}
func (w *WakuNode) ClusterID() uint16 {
return w.opts.clusterID
}

View File

@ -80,6 +80,7 @@ type WakuNodeParameters struct {
pubsubOpts []pubsub.Option
minRelayPeersToPublish int
maxMsgSizeBytes int
enableStore bool
messageProvider store.MessageProvider
@ -358,6 +359,13 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)
}
}
func WithMaxMsgSize(maxMsgSizeBytes int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.maxMsgSizeBytes = maxMsgSizeBytes
return nil
}
}
func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.maxPeerConnections = maxPeers

View File

@ -351,10 +351,18 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
if err == nil {
enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID)
// Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen)
if err == nil && enr.Record().Seq() > p.ENR.Seq() {
pm.logger.Debug("found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID))
if err == nil && enr.Record().Seq() >= p.ENR.Seq() {
return
}
if err != nil {
//Peer is already in peer-store but it doesn't have an enr, but discovered peer has ENR
pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()))
} else {
//Peer is already in peer-store but stored ENR is older than discovered one.
pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq()))
}
}
supportedProtos := []protocol.ID{}

View File

@ -112,19 +112,23 @@ func (wf *WakuFilterLightNode) Stop() {
wf.CommonService.Stop(func() {
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
if wf.subscriptions.Count() > 0 {
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}
for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
go func() {
defer func() {
_ = recover()
}()
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}
}
//
wf.subscriptions.Clear()
for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
}
}
wf.subscriptions.Clear()
}()
}
})
}
@ -132,7 +136,9 @@ func (wf *WakuFilterLightNode) Stop() {
func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
logger := wf.log.With(logging.HostID("peer", peerID))
logger := wf.log.With(logging.HostID("peerID", peerID))
if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
@ -177,10 +183,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
} else {
pubSubTopic = *messagePush.PubsubTopic
}
logger = messagePush.WakuMessage.Logger(logger, pubSubTopic)
if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
logger.Warn("received messagepush with invalid subscription parameters",
zap.String("topic", pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
logger.Warn("received messagepush with invalid subscription parameters")
wf.metrics.RecordError(invalidSubscriptionMessage)
return
}
@ -218,6 +225,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
return err
}
logger := wf.log.With(logging.HostID("peerID", params.selectedPeer))
stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
@ -227,13 +236,13 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
logger.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
wf.metrics.RecordError(writeRequestFailure)
wf.log.Error("sending FilterSubscribeRequest", zap.Error(err))
logger.Error("sending FilterSubscribeRequest", zap.Error(err))
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
logger.Error("resetting connection", zap.Error(err))
}
return err
}
@ -241,10 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
filterSubscribeResponse := &pb.FilterSubscribeResponse{}
err = reader.ReadMsg(filterSubscribeResponse)
if err != nil {
wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err))
logger.Error("receiving FilterSubscribeResponse", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
logger.Error("resetting connection", zap.Error(err))
}
return err
}
@ -253,6 +262,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
if err = filterSubscribeResponse.Validate(); err != nil {
wf.metrics.RecordError(decodeRPCFailure)
logger.Error("validating response", zap.Error(err))
return err
}
@ -617,6 +627,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
if params.wg != nil {
params.wg.Done()
}
_ = recover()
}()
paramsCopy := params.Copy()

View File

@ -22,6 +22,18 @@ func NewFilterError(code int, message string) FilterError {
}
}
const errorStringFmt = "%d - %s"
func (e *FilterError) Error() string {
return fmt.Sprintf("%d - %s", e.Code, e.Message)
return fmt.Sprintf(errorStringFmt, e.Code, e.Message)
}
func ExtractCodeFromFilterError(fErr string) int {
code := 0
var message string
_, err := fmt.Sscanf(fErr, errorStringFmt, &code, &message)
if err != nil {
return -1
}
return code
}

View File

@ -53,6 +53,10 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.pm = pm
wakuLP.metrics = newMetrics(reg)
if pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
return wakuLP
}
@ -73,9 +77,6 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx))
wakuLP.log.Info("Light Push protocol started")
if wakuLP.pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
return nil
}
@ -299,8 +300,13 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
req.Message = message
req.PubsubTopic = params.pubsubTopic
logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer))
logger.Debug("publishing message")
response, err := wakuLP.request(ctx, req, params)
if err != nil {
logger.Error("could not publish message", zap.Error(err))
return nil, err
}

View File

@ -1,10 +1,34 @@
package pb
import (
"encoding/binary"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/waku-org/go-waku/waku/v2/hash"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Hash calculates the hash of a waku message
func (msg *WakuMessage) Hash(pubsubTopic string) []byte {
return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta)
return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta, toBytes(msg.GetTimestamp()))
}
func toBytes(i int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(i))
return b
}
func (msg *WakuMessage) LogFields(pubsubTopic string) []zapcore.Field {
return []zapcore.Field{
zap.String("hash", hexutil.Encode(msg.Hash(pubsubTopic))),
zap.String("pubsubTopic", pubsubTopic),
zap.String("contentTopic", msg.ContentTopic),
zap.Int64("timestamp", msg.GetTimestamp()),
}
}
func (msg *WakuMessage) Logger(logger *zap.Logger, pubsubTopic string) *zap.Logger {
return logger.With(msg.LogFields(pubsubTopic)...)
}

View File

@ -3,69 +3,45 @@ package peer_exchange
import (
"bufio"
"bytes"
"math/rand"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"go.uber.org/zap"
)
// simpleLRU internal uses container/list, which is ring buffer(double linked list)
type enrCache struct {
// using lru, saves us from periodically cleaning the cache to maintain a certain size
data *simplelru.LRU
rng *rand.Rand
mu sync.RWMutex
log *zap.Logger
// using lru, saves us from periodically cleaning the cache to mauintain a certain size
data *shardLRU
}
// err on negative size
func newEnrCache(size int, log *zap.Logger) (*enrCache, error) {
inner, err := simplelru.NewLRU(size, nil)
func newEnrCache(size int) *enrCache {
inner := newShardLRU(int(size))
return &enrCache{
data: inner,
rng: rand.New(rand.NewSource(rand.Int63())),
log: log.Named("enr-cache"),
}, err
}
}
// updating cache
func (c *enrCache) updateCache(node *enode.Node) {
c.mu.Lock()
defer c.mu.Unlock()
currNode, ok := c.data.Get(node.ID())
if !ok || node.Seq() > currNode.(*enode.Node).Seq() {
c.data.Add(node.ID(), node)
c.log.Debug("discovered px peer via discv5", zap.Stringer("enr", node))
func (c *enrCache) updateCache(node *enode.Node) error {
currNode := c.data.Get(node.ID())
if currNode == nil || node.Seq() > currNode.Seq() {
return c.data.Add(node)
}
return nil
}
// get `numPeers` records of enr
func (c *enrCache) getENRs(neededPeers int) ([]*pb.PeerInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
func (c *enrCache) getENRs(neededPeers int, clusterIndex *ShardInfo) ([]*pb.PeerInfo, error) {
//
availablePeers := c.data.Len()
if availablePeers == 0 {
return nil, nil
}
if availablePeers < neededPeers {
neededPeers = availablePeers
}
perm := c.rng.Perm(availablePeers)[0:neededPeers]
keys := c.data.Keys()
nodes := c.data.GetRandomNodes(clusterIndex, neededPeers)
result := []*pb.PeerInfo{}
for _, ind := range perm {
node, ok := c.data.Get(keys[ind])
if !ok {
continue
}
for _, node := range nodes {
//
var b bytes.Buffer
writer := bufio.NewWriter(&b)
err := node.(*enode.Node).Record().EncodeRLP(writer)
err := node.Record().EncodeRLP(writer)
if err != nil {
return nil, err
}

View File

@ -57,17 +57,11 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.disc = disc
wakuPX.metrics = newMetrics(reg)
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = newEnrCache(MaxCacheSize)
wakuPX.peerConnector = peerConnector
wakuPX.pm = pm
wakuPX.CommonService = service.NewCommonService()
newEnrCache, err := newEnrCache(MaxCacheSize, wakuPX.log)
if err != nil {
return nil, err
}
wakuPX.enrCache = newEnrCache
return wakuPX, nil
}
@ -108,7 +102,7 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
if requestRPC.Query != nil {
logger.Info("request received")
records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers))
records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers), nil)
if err != nil {
logger.Error("obtaining enrs from cache", zap.Error(err))
wakuPX.metrics.RecordError(pxFailure)
@ -161,7 +155,11 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
continue
}
wakuPX.enrCache.updateCache(iterator.Node())
err = wakuPX.enrCache.updateCache(iterator.Node())
if err != nil {
wakuPX.log.Error("adding peer to cache", zap.Error(err))
continue
}
select {
case <-ctx.Done():

View File

@ -0,0 +1,178 @@
package peer_exchange
import (
"container/list"
"fmt"
"math/rand"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type ShardInfo struct {
clusterID uint16
shard uint16
}
type shardLRU struct {
size int // number of nodes allowed per shard
idToNode map[enode.ID][]*list.Element
shardNodes map[ShardInfo]*list.List
rng *rand.Rand
mu sync.RWMutex
}
func newShardLRU(size int) *shardLRU {
return &shardLRU{
idToNode: map[enode.ID][]*list.Element{},
shardNodes: map[ShardInfo]*list.List{},
size: size,
rng: rand.New(rand.NewSource(rand.Int63())),
}
}
type nodeWithShardInfo struct {
key ShardInfo
node *enode.Node
}
// time complexity: O(number of previous indexes present for node.ID)
func (l *shardLRU) remove(node *enode.Node) {
elements := l.idToNode[node.ID()]
for _, element := range elements {
key := element.Value.(nodeWithShardInfo).key
l.shardNodes[key].Remove(element)
}
delete(l.idToNode, node.ID())
}
// if a node is removed for a list, remove it from idToNode too
func (l *shardLRU) removeFromIdToNode(ele *list.Element) {
nodeID := ele.Value.(nodeWithShardInfo).node.ID()
for ind, entries := range l.idToNode[nodeID] {
if entries == ele {
l.idToNode[nodeID] = append(l.idToNode[nodeID][:ind], l.idToNode[nodeID][ind+1:]...)
break
}
}
if len(l.idToNode[nodeID]) == 0 {
delete(l.idToNode, nodeID)
}
}
func nodeToRelayShard(node *enode.Node) (*protocol.RelayShards, error) {
shard, err := wenr.RelaySharding(node.Record())
if err != nil {
return nil, err
}
if shard == nil { // if no shard info, then add to node to Cluster 0, Index 0
shard = &protocol.RelayShards{
ClusterID: 0,
ShardIDs: []uint16{0},
}
}
return shard, nil
}
// time complexity: O(new number of indexes in node's shard)
func (l *shardLRU) add(node *enode.Node) error {
shard, err := nodeToRelayShard(node)
if err != nil {
return err
}
elements := []*list.Element{}
for _, index := range shard.ShardIDs {
key := ShardInfo{
shard.ClusterID,
index,
}
if l.shardNodes[key] == nil {
l.shardNodes[key] = list.New()
}
if l.shardNodes[key].Len() >= l.size {
oldest := l.shardNodes[key].Back()
l.removeFromIdToNode(oldest)
l.shardNodes[key].Remove(oldest)
}
entry := l.shardNodes[key].PushFront(nodeWithShardInfo{
key: key,
node: node,
})
elements = append(elements, entry)
}
l.idToNode[node.ID()] = elements
return nil
}
// this will be called when the seq number of node is more than the one in cache
func (l *shardLRU) Add(node *enode.Node) error {
l.mu.Lock()
defer l.mu.Unlock()
// removing bcz previous node might be subscribed to different shards, we need to remove node from those shards
l.remove(node)
return l.add(node)
}
// clusterIndex is nil when peers for no specific shard are requested
func (l *shardLRU) GetRandomNodes(clusterIndex *ShardInfo, neededPeers int) (nodes []*enode.Node) {
l.mu.Lock()
defer l.mu.Unlock()
availablePeers := l.len(clusterIndex)
if availablePeers < neededPeers {
neededPeers = availablePeers
}
// if clusterIndex is nil, then return all nodes
var elements []*list.Element
if clusterIndex == nil {
elements = make([]*list.Element, 0, len(l.idToNode))
for _, entries := range l.idToNode {
elements = append(elements, entries[0])
}
} else if entries := l.shardNodes[*clusterIndex]; entries != nil && entries.Len() != 0 {
elements = make([]*list.Element, 0, entries.Len())
for ent := entries.Back(); ent != nil; ent = ent.Prev() {
elements = append(elements, ent)
}
}
utils.Logger().Info(fmt.Sprintf("%d", len(elements)))
indexes := l.rng.Perm(len(elements))[0:neededPeers]
for _, ind := range indexes {
node := elements[ind].Value.(nodeWithShardInfo).node
nodes = append(nodes, node)
// this removes the node from all list (all cluster/shard pair that the node has) and adds it to the front
l.remove(node)
_ = l.add(node)
}
return nodes
}
// if clusterIndex is not nil, return len of nodes maintained for a given shard
// if clusterIndex is nil, return count of all nodes maintained
func (l *shardLRU) len(clusterIndex *ShardInfo) int {
if clusterIndex == nil {
return len(l.idToNode)
}
if entries := l.shardNodes[*clusterIndex]; entries != nil {
return entries.Len()
}
return 0
}
// get the node with the given id, if it is present in cache
func (l *shardLRU) Get(id enode.ID) *enode.Node {
l.mu.RLock()
defer l.mu.RUnlock()
if elements, ok := l.idToNode[id]; ok && len(elements) > 0 {
return elements[0].Value.(nodeWithShardInfo).node
}
return nil
}

View File

@ -22,14 +22,22 @@ var messageSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: []float64{0.0, 5.0, 15.0, 50.0, 100.0, 300.0, 700.0, 1000.0},
})
var pubsubTopics = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "waku_pubsub_topics",
Help: "Number of PubSub Topics node is subscribed to",
})
var collectors = []prometheus.Collector{
messages,
messageSize,
pubsubTopics,
}
// Metrics exposes the functions required to update prometheus metrics for relay protocol
type Metrics interface {
RecordMessage(envelope *waku_proto.Envelope)
SetPubSubTopics(int)
}
type metricsImpl struct {
@ -56,3 +64,7 @@ func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) {
m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexBytes("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
}()
}
func (m *metricsImpl) SetPubSubTopics(size int) {
pubsubTopics.Set(float64(size))
}

View File

@ -1,5 +1,7 @@
package relay
import pubsub "github.com/libp2p/go-libp2p-pubsub"
type publishParameters struct {
pubsubTopic string
}
@ -20,3 +22,31 @@ func WithDefaultPubsubTopic() PublishOption {
params.pubsubTopic = DefaultWakuTopic
}
}
type relayParameters struct {
pubsubOpts []pubsub.Option
maxMsgSizeBytes int
}
type RelayOption func(*relayParameters)
func WithPubSubOptions(opts []pubsub.Option) RelayOption {
return func(params *relayParameters) {
params.pubsubOpts = append(params.pubsubOpts, opts...)
}
}
func WithMaxMsgSize(maxMsgSizeBytes int) RelayOption {
return func(params *relayParameters) {
if maxMsgSizeBytes == 0 {
maxMsgSizeBytes = defaultMaxMsgSizeBytes
}
params.maxMsgSizeBytes = maxMsgSizeBytes
}
}
func defaultOptions() []RelayOption {
return []RelayOption{
WithMaxMsgSize(defaultMaxMsgSizeBytes),
}
}

View File

@ -27,13 +27,15 @@ import (
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
const WakuRelayENRField = uint8(1 << 0)
const defaultMaxMsgSizeBytes = 150 * 1024
// DefaultWakuTopic is the default pubsub topic used across all Waku protocols
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String()
// WakuRelay is the implementation of the Waku Relay protocol
type WakuRelay struct {
host host.Host
opts []pubsub.Option
relayParams *relayParameters
pubsub *pubsub.PubSub
params pubsub.GossipSubParams
peerScoreParams *pubsub.PeerScoreParams
@ -41,9 +43,8 @@ type WakuRelay struct {
topicParams *pubsub.TopicScoreParams
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
logMessages *zap.Logger
log *zap.Logger
logMessages *zap.Logger
bcaster Broadcaster
@ -75,7 +76,7 @@ type pubsubTopicSubscriptionDetails struct {
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource,
reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
reg prometheus.Registerer, log *zap.Logger, opts ...RelayOption) *WakuRelay {
w := new(WakuRelay)
w.timesource = timesource
w.topics = make(map[string]*pubsubTopicSubscriptionDetails)
@ -87,9 +88,16 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.logMessages = utils.MessagesLogger("relay")
w.events = eventbus.NewBus()
w.metrics = newMetrics(reg, w.logMessages)
w.relayParams = new(relayParameters)
w.relayParams.pubsubOpts = w.defaultPubsubOptions()
// default options required by WakuRelay
w.opts = append(w.defaultPubsubOptions(), opts...)
options := defaultOptions()
options = append(options, opts...)
for _, opt := range options {
opt(w.relayParams)
}
w.log.Info("relay config", zap.Int("max-msg-size-bytes", w.relayParams.maxMsgSizeBytes),
zap.Int("min-peers-to-publish", w.minPeersToPublish))
return w
}
@ -123,7 +131,7 @@ func (w *WakuRelay) start() error {
if w.bcaster == nil {
return errors.New("broadcaster not specified for relay")
}
ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.opts...)
ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.relayParams.pubsubOpts...)
if err != nil {
return err
}
@ -235,14 +243,14 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscripti
}
w.log.Info("gossipsub subscription", zap.String("pubsubTopic", subscription.Topic()))
w.metrics.SetPubSubTopics(len(w.topics))
result = w.topics[topic]
}
return result, nil
}
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic
// Publish is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic
// specified in the message via autosharding. To publish to a specific pubsubTopic, the `WithPubSubTopic` option should
// be provided
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) ([]byte, error) {
@ -289,7 +297,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
return nil, err
}
if len(out) > pubsub.DefaultMaxMessageSize {
if len(out) > w.relayParams.maxMsgSizeBytes {
return nil, errors.New("message size exceeds gossipsub max message size")
}
@ -483,6 +491,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
if err != nil {
return err
}
w.metrics.SetPubSubTopics(len(w.topics))
}
}
return nil

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -45,13 +46,15 @@ type DynamicGroupManager struct {
membershipIndexToLoad *uint
}
func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error {
func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered, latestProcessBlock uint64) error {
gm.lastBlockProcessedMutex.Lock()
defer gm.lastBlockProcessedMutex.Unlock()
toRemoveTable := om.New()
toInsertTable := om.New()
if gm.lastBlockProcessed == 0 {
gm.lastBlockProcessed = latestProcessBlock
}
lastBlockProcessed := gm.lastBlockProcessed
for _, event := range events {
if event.Raw.Removed {
@ -130,19 +133,27 @@ func NewDynamicGroupManager(
}
func (gm *DynamicGroupManager) getMembershipFee(ctx context.Context) (*big.Int, error) {
fee, err := gm.web3Config.RLNContract.MEMBERSHIPDEPOSIT(&bind.CallOpts{Context: ctx})
if err != nil {
return nil, fmt.Errorf("could not check if credential exits in contract: %w", err)
}
return fee, nil
return retry.DoWithData(
func() (*big.Int, error) {
fee, err := gm.web3Config.RLNContract.MEMBERSHIPDEPOSIT(&bind.CallOpts{Context: ctx})
if err != nil {
return nil, fmt.Errorf("could not check if credential exits in contract: %w", err)
}
return fee, nil
}, retry.Attempts(3),
)
}
func (gm *DynamicGroupManager) memberExists(ctx context.Context, idCommitment rln.IDCommitment) (bool, error) {
exists, err := gm.web3Config.RLNContract.MemberExists(&bind.CallOpts{Context: ctx}, rln.Bytes32ToBigInt(idCommitment))
if err != nil {
return false, fmt.Errorf("could not check if credential exits in contract: %w", err)
}
return exists, nil
return retry.DoWithData(
func() (bool, error) {
exists, err := gm.web3Config.RLNContract.MemberExists(&bind.CallOpts{Context: ctx}, rln.Bytes32ToBigInt(idCommitment))
if err != nil {
return false, fmt.Errorf("could not check if credential exits in contract: %w", err)
}
return exists, nil
}, retry.Attempts(3),
)
}
func (gm *DynamicGroupManager) Start(ctx context.Context) error {

View File

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
@ -19,7 +20,7 @@ import (
)
// RegistrationEventHandler represents the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered, uint64) error
// MembershipFetcher is used for getting membershipRegsitered Events from the eth rpc
type MembershipFetcher struct {
@ -93,7 +94,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo
t1Since := time.Since(t1)
t2 := time.Now()
if err := handler(events); err != nil {
if err := handler(events, fromBlock+maxBatchSize); err != nil {
return err
}
@ -109,7 +110,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo
// process all the fetched events
t2 := time.Now()
err = handler(events)
err = handler(events, toBlock)
if err != nil {
return err
}
@ -155,7 +156,7 @@ func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint6
fromBlock = toBlock + 1
}
err = handler(events)
err = handler(events, toBlock)
if err != nil {
mf.log.Error("processing rln log", zap.Error(err))
}
@ -207,26 +208,30 @@ func (mf *MembershipFetcher) getEvents(ctx context.Context, fromBlock uint64, to
}
func (mf *MembershipFetcher) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) {
logIterator, err := mf.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx})
if err != nil {
return nil, err
}
return retry.DoWithData(
func() ([]*contracts.RLNMemberRegistered, error) {
logIterator, err := mf.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx})
if err != nil {
return nil, err
}
var results []*contracts.RLNMemberRegistered
var results []*contracts.RLNMemberRegistered
for {
if !logIterator.Next() {
break
}
for {
if !logIterator.Next() {
break
}
if logIterator.Error() != nil {
return nil, logIterator.Error()
}
if logIterator.Error() != nil {
return nil, logIterator.Error()
}
results = append(results, logIterator.Event)
}
results = append(results, logIterator.Event)
}
return results, nil
return results, nil
}, retry.Attempts(3),
)
}
// GetMetadata retrieves metadata from the zerokit's RLN database

View File

@ -18,10 +18,10 @@ type RLNMetadata struct {
}
// Serialize converts a RLNMetadata into a binary format expected by zerokit's RLN
func (r RLNMetadata) Serialize() []byte {
func (r RLNMetadata) Serialize() ([]byte, error) {
chainID := r.ChainID
if chainID == nil {
chainID = big.NewInt(0)
return nil, errors.New("chain-id not specified and cannot be 0")
}
var result []byte
@ -34,7 +34,7 @@ func (r RLNMetadata) Serialize() []byte {
result = binary.LittleEndian.AppendUint64(result, v.BlockNumber)
}
return result
return result, nil
}
const lastProcessedBlockOffset = 0
@ -76,6 +76,9 @@ func DeserializeMetadata(b []byte) (RLNMetadata, error) {
// SetMetadata stores some metadata into the zerokit's RLN database
func (gm *DynamicGroupManager) SetMetadata(meta RLNMetadata) error {
b := meta.Serialize()
b, err := meta.Serialize()
if err != nil {
return err
}
return gm.rln.SetMetadata(b)
}

View File

@ -19,7 +19,7 @@ func Logger() *zap.Logger {
return log
}
// MessagesLogger returns a logger used for debug logging of receivent/sent messages
// MessagesLogger returns a logger used for debug logging of sent/received messages
func MessagesLogger(prefix string) *zap.Logger {
if messageLoggers == nil {
messageLoggers = make(map[string]*zap.Logger)

View File

@ -154,7 +154,7 @@ func init() {
// the root is created locally, using createMembershipList proc from waku_rln_relay_utils module, and the result is hardcoded in here
const STATIC_GROUP_MERKLE_ROOT = "ca7290e49680fa14eeaeea709e4742a8a074a1bcbfd50a4b3976742ae8a6ca25"
const EPOCH_UNIT_SECONDS = uint64(10) // the rln-relay epoch length in seconds
const EPOCH_UNIT_SECONDS = uint64(1) // the rln-relay epoch length in seconds
type Epoch [32]byte

11
vendor/modules.txt vendored
View File

@ -116,6 +116,9 @@ github.com/andybalholm/brotli
# github.com/andybalholm/cascadia v1.2.0
## explicit; go 1.13
github.com/andybalholm/cascadia
# github.com/avast/retry-go/v4 v4.5.1
## explicit; go 1.18
github.com/avast/retry-go/v4
# github.com/bahlo/generic-list-go v0.2.0
## explicit; go 1.18
github.com/bahlo/generic-list-go
@ -195,8 +198,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa
# github.com/docker/go-units v0.5.0
## explicit
github.com/docker/go-units
# github.com/dustin/go-humanize v1.0.0
## explicit
# github.com/dustin/go-humanize v1.0.1
## explicit; go 1.16
github.com/dustin/go-humanize
# github.com/edsrzf/mmap-go v1.0.0
## explicit
@ -1008,7 +1011,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20231206134046-3d73afcd50a3
# github.com/waku-org/go-waku v0.8.1-0.20240104144340-585648c4eefe
## explicit; go 1.19
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence
@ -1048,7 +1051,7 @@ github.com/waku-org/go-waku/waku/v2/rendezvous
github.com/waku-org/go-waku/waku/v2/service
github.com/waku-org/go-waku/waku/v2/timesource
github.com/waku-org/go-waku/waku/v2/utils
# github.com/waku-org/go-zerokit-rln v0.1.14-0.20230916173259-d284a3d8f2fd
# github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59
## explicit; go 1.18
github.com/waku-org/go-zerokit-rln/rln
github.com/waku-org/go-zerokit-rln/rln/link

View File

@ -178,7 +178,7 @@ func (d *DBStore) Put(env *protocol.Envelope) error {
}
cursor := env.Index()
dbKey := gowakuPersistence.NewDBKey(uint64(cursor.SenderTime), uint64(env.Index().ReceiverTime), env.PubsubTopic(), env.Index().Digest)
dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(env.Index().ReceiverTime), env.PubsubTopic(), env.Index().Digest)
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version)
if err != nil {
return err
@ -231,7 +231,7 @@ func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPe
if query.PagingInfo.Cursor != nil {
usesCursor = true
var exists bool
cursorDBKey := gowakuPersistence.NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
cursorDBKey := NewDBKey(uint64(query.PagingInfo.Cursor.SenderTime), uint64(query.PagingInfo.Cursor.ReceiverTime), query.PagingInfo.Cursor.PubsubTopic, query.PagingInfo.Cursor.Digest)
err := d.db.QueryRow("SELECT EXISTS(SELECT 1 FROM store_messages WHERE id = $1)",
cursorDBKey.Bytes(),
@ -259,7 +259,7 @@ func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPe
if !usesCursor || query.PagingInfo.Direction == storepb.PagingInfo_BACKWARD {
paramCnt++
conditions = append(conditions, fmt.Sprintf("id >= $%d", paramCnt))
startTimeDBKey := gowakuPersistence.NewDBKey(uint64(query.GetStartTime()), uint64(query.GetStartTime()), "", []byte{})
startTimeDBKey := NewDBKey(uint64(query.GetStartTime()), uint64(query.GetStartTime()), "", []byte{})
parameters = append(parameters, startTimeDBKey.Bytes())
}
@ -269,7 +269,7 @@ func (d *DBStore) Query(query *storepb.HistoryQuery) (*storepb.Index, []gowakuPe
if !usesCursor || query.PagingInfo.Direction == storepb.PagingInfo_FORWARD {
paramCnt++
conditions = append(conditions, fmt.Sprintf("id <= $%d", paramCnt))
endTimeDBKey := gowakuPersistence.NewDBKey(uint64(query.GetEndTime()), uint64(query.GetEndTime()), "", []byte{})
endTimeDBKey := NewDBKey(uint64(query.GetEndTime()), uint64(query.GetEndTime()), "", []byte{})
parameters = append(parameters, endTimeDBKey.Bytes())
}
}

View File

@ -292,6 +292,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
node.WithMaxPeerConnections(cfg.DiscoveryLimit),
node.WithLogger(logger),
node.WithClusterID(cfg.ClusterID),
node.WithMaxMsgSize(1024 * 1024),
}
if cfg.EnableDiscV5 {